Commit c89c72b (1 parent: ff292d0)

tim-quix and gwaramadze authored

add community sink: quix -> another kafka (#1049)

* add community kafka sink
* adjust comments
* Correct logs

Co-authored-by: Remy Gwaramadze <gwaramadze@users.noreply.github.com>

1 file changed: 209 additions, 0 deletions
@@ -0,0 +1,209 @@
import logging
from typing import Any, Optional, Union

from quixstreams.internal_producer import InternalProducer
from quixstreams.kafka.configuration import ConnectionConfig
from quixstreams.models import Row, Topic, TopicAdmin
from quixstreams.models.messagecontext import MessageContext
from quixstreams.models.serializers import SerializerType
from quixstreams.models.types import HeadersTuples
from quixstreams.sinks import (
    BaseSink,
    ClientConnectFailureCallback,
    ClientConnectSuccessCallback,
    SinkBackpressureError,
)

__all__ = ("KafkaReplicatorSink",)

logger = logging.getLogger(__name__)


class KafkaReplicatorSink(BaseSink):
    """
    A sink that produces data to an external Kafka cluster.

    This sink uses the same serialization approach as the Quix Application.

    Example Snippet:

    ```python
    from quixstreams import Application
    from quixstreams.sinks.community.kafka import KafkaReplicatorSink

    app = Application(
        consumer_group="group",
    )

    topic = app.topic("input-topic")

    # Define the external Kafka cluster configuration
    kafka_sink = KafkaReplicatorSink(
        broker_address="external-kafka:9092",
        topic_name="output-topic",
        value_serializer="json",
        key_serializer="bytes",
    )

    sdf = app.dataframe(topic=topic)
    sdf.sink(kafka_sink)

    app.run()
    ```
    """

    def __init__(
        self,
        broker_address: Union[str, ConnectionConfig],
        topic_name: str,
        value_serializer: SerializerType = "json",
        key_serializer: SerializerType = "bytes",
        producer_extra_config: Optional[dict] = None,
        flush_timeout: float = 10.0,
        origin_topic: Optional[Topic] = None,
        auto_create_sink_topic: bool = True,
        on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
        on_client_connect_failure: Optional[ClientConnectFailureCallback] = None,
    ) -> None:
        """
        :param broker_address: The connection settings for the external Kafka cluster.
            Accepts a string with the Kafka broker host and port formatted as `<host>:<port>`,
            or a ConnectionConfig object if authentication is required.
        :param topic_name: The topic name to produce to on the external Kafka cluster.
        :param value_serializer: The serializer type for values.
            Default - `json`.
        :param key_serializer: The serializer type for keys.
            Default - `bytes`.
        :param producer_extra_config: A dictionary with additional options that
            will be passed to `confluent_kafka.Producer` as is.
            Default - `None`.
        :param flush_timeout: The time in seconds the producer waits for all messages
            to be delivered during flush.
            Default - 10.0.
        :param origin_topic: If auto-creating the sink topic, can optionally pass the
            source topic to use its configuration.
        :param auto_create_sink_topic: Whether to try to create the sink topic upon startup.
            Default - True.
        :param on_client_connect_success: An optional callback made after successful
            client authentication, primarily for additional logging.
        :param on_client_connect_failure: An optional callback made after failed
            client authentication (which should raise an Exception).
            Callback should accept the raised Exception as an argument.
            Callback must resolve (or propagate/re-raise) the Exception.
        """
        super().__init__(
            on_client_connect_success=on_client_connect_success,
            on_client_connect_failure=on_client_connect_failure,
        )

        self._broker_address = broker_address
        self._topic_name = topic_name
        self._value_serializer = value_serializer
        self._key_serializer = key_serializer
        self._producer_extra_config = producer_extra_config or {}
        self._flush_timeout = flush_timeout
        self._auto_create_sink_topic = auto_create_sink_topic
        self._origin_topic = origin_topic

        self._producer: Optional[InternalProducer] = None
        self._topic: Optional[Topic] = None

    def setup(self):
        """
        Initialize the InternalProducer and Topic for serialization.
        """
        logger.info(
            f"Setting up KafkaReplicatorSink: "
            f'broker_address="{self._broker_address}" '
            f'topic="{self._topic_name}" '
            f'value_serializer="{self._value_serializer}" '
            f'key_serializer="{self._key_serializer}"'
        )

        self._producer = InternalProducer(
            broker_address=self._broker_address,
            extra_config=self._producer_extra_config,
            flush_timeout=self._flush_timeout,
            transactional=False,
        )

        self._topic = Topic(
            name=self._topic_name,
            value_serializer=self._value_serializer,
            key_serializer=self._key_serializer,
            create_config=self._origin_topic.broker_config
            if self._origin_topic
            else None,
        )

        if self._auto_create_sink_topic:
            admin = TopicAdmin(
                broker_address=self._broker_address,
                extra_config=self._producer_extra_config,
            )
            admin.create_topics(topics=[self._topic])

    def add(
        self,
        value: Any,
        key: Any,
        timestamp: int,
        headers: HeadersTuples,
        topic: str,
        partition: int,
        offset: int,
    ) -> None:
        """
        Add a message to be produced to the external Kafka cluster.

        This method converts the provided data into a Row object and uses
        the InternalProducer to serialize and produce it.

        :param value: The message value.
        :param key: The message key.
        :param timestamp: The message timestamp in milliseconds.
        :param headers: The message headers.
        :param topic: The source topic name.
        :param partition: The source partition.
        :param offset: The source offset.
        """
        context = MessageContext(
            topic=topic,
            partition=partition,
            offset=offset,
            size=0,
            leader_epoch=None,
        )
        row = Row(
            value=value,
            key=key,
            timestamp=timestamp,
            context=context,
            headers=headers,
        )
        self._producer.produce_row(
            row=row,
            topic=self._topic,
            timestamp=timestamp,
        )

    def flush(self) -> None:
        """
        Flush the producer to ensure all messages are delivered.

        This method is triggered by the Checkpoint class when it commits.
        If flush fails, the checkpoint will be aborted.
        """
        logger.debug(f'Flushing KafkaReplicatorSink for topic "{self._topic_name}"')

        # Flush all pending messages
        result = self._producer.flush(timeout=self._flush_timeout)

        if result > 0:
            logger.warning(
                f"{result} messages were not delivered to Kafka topic "
                f'"{self._topic_name}" within the flush timeout of {self._flush_timeout}s'
            )
            raise SinkBackpressureError(retry_after=10.0)

        logger.debug(f'Successfully flushed KafkaReplicatorSink for topic "{self._topic_name}"')
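A minimal end-to-end sketch of the new sink, adapted from the class docstring above. The broker addresses, topic names, and consumer group are placeholders; passing the input topic as `origin_topic` (so the auto-created destination topic can reuse its configuration) follows the `origin_topic` and `auto_create_sink_topic` parameters described in the `__init__` docstring.

```python
from quixstreams import Application
from quixstreams.sinks.community.kafka import KafkaReplicatorSink

app = Application(
    broker_address="localhost:9092",  # placeholder source cluster
    consumer_group="replicator",
)

input_topic = app.topic("input-topic")

# Replicate the stream to another Kafka cluster, letting the sink
# auto-create the destination topic with the source topic's config.
sink = KafkaReplicatorSink(
    broker_address="external-kafka:9092",  # placeholder destination cluster
    topic_name="output-topic",
    value_serializer="json",
    key_serializer="bytes",
    origin_topic=input_topic,
    auto_create_sink_topic=True,
)

sdf = app.dataframe(topic=input_topic)
sdf.sink(sink)

app.run()
```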

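The `broker_address` parameter also accepts a `ConnectionConfig` when the external cluster requires authentication. Below is a rough sketch; the field names (`security_protocol`, `sasl_mechanism`, `sasl_username`, `sasl_password`) are assumptions based on librdkafka-style settings and are not shown in this diff, so check `quixstreams.kafka.configuration.ConnectionConfig` for the actual schema.

```python
from quixstreams.kafka.configuration import ConnectionConfig
from quixstreams.sinks.community.kafka import KafkaReplicatorSink

# Assumed field names mirroring librdkafka settings; verify against ConnectionConfig.
external_cluster = ConnectionConfig(
    bootstrap_servers="external-kafka:9092",
    security_protocol="sasl_ssl",
    sasl_mechanism="PLAIN",
    sasl_username="replicator",
    sasl_password="secret",
)

sink = KafkaReplicatorSink(
    broker_address=external_cluster,  # instead of a plain "<host>:<port>" string
    topic_name="output-topic",
)
```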