Implements retry logic to handle concurrent write conflicts.

- `batch`: The batch of data to write.
<a id="quixstreams.sinks.community.kafka"></a>

## quixstreams.sinks.community.kafka

<a id="quixstreams.sinks.community.kafka.KafkaReplicatorSink"></a>

### KafkaReplicatorSink

```python
class KafkaReplicatorSink(BaseSink)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kafka.py#L22)

A sink that produces data to an external Kafka cluster.

This sink uses the same serialization approach as the Quix Application.

Example Snippet:

```python
from quixstreams import Application
from quixstreams.sinks.community.kafka import KafkaReplicatorSink

app = Application(
    consumer_group="group",
)

topic = app.topic("input-topic")

# Define the external Kafka cluster configuration
kafka_sink = KafkaReplicatorSink(
    broker_address="external-kafka:9092",
    topic_name="output-topic",
    value_serializer="json",
    key_serializer="bytes",
)

sdf = app.dataframe(topic=topic)
sdf.sink(kafka_sink)

app.run()
```

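The snippet above connects without authentication. Per the `broker_address` parameter, a `ConnectionConfig` object can be passed instead of a plain `<host>:<port>` string when the external cluster requires authentication. The sketch below is an illustration only; the exact `ConnectionConfig` field names are assumptions and should be verified against the quixstreams reference:

```python
# Hedged sketch: passing a ConnectionConfig for an authenticated cluster.
# The field names below (bootstrap_servers, security_protocol, sasl_mechanism,
# sasl_username, sasl_password) are assumptions -- verify them against the
# quixstreams ConnectionConfig documentation before use.
from quixstreams.kafka import ConnectionConfig
from quixstreams.sinks.community.kafka import KafkaReplicatorSink

connection = ConnectionConfig(
    bootstrap_servers="external-kafka:9093",
    security_protocol="sasl_ssl",
    sasl_mechanism="PLAIN",
    sasl_username="replicator",
    sasl_password="secret",
)

kafka_sink = KafkaReplicatorSink(
    broker_address=connection,
    topic_name="output-topic",
)
```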
<a id="quixstreams.sinks.community.kafka.KafkaReplicatorSink.__init__"></a>

#### KafkaReplicatorSink.\_\_init\_\_

```python
def __init__(
    broker_address: Union[str, ConnectionConfig],
    topic_name: str,
    value_serializer: SerializerType = "json",
    key_serializer: SerializerType = "bytes",
    producer_extra_config: Optional[dict] = None,
    flush_timeout: float = 10.0,
    origin_topic: Optional[Topic] = None,
    auto_create_sink_topic: bool = True,
    on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
    on_client_connect_failure: Optional[ClientConnectFailureCallback] = None
) -> None
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kafka.py#L55)

**Arguments**:

- `broker_address`: The connection settings for the external Kafka cluster.
Accepts a string with the Kafka broker host and port formatted as `<host>:<port>`,
or a ConnectionConfig object if authentication is required.
- `topic_name`: The name of the topic to produce to on the external Kafka cluster.
- `value_serializer`: The serializer type for values.
Default - `json`.
- `key_serializer`: The serializer type for keys.
Default - `bytes`.
- `producer_extra_config`: A dictionary with additional options that
will be passed to `confluent_kafka.Producer` as-is.
Default - `None`.
- `flush_timeout`: The time in seconds the producer waits for all messages
to be delivered during flush.
Default - `10.0`.
- `origin_topic`: If auto-creating the sink topic, optionally pass the
source topic to reuse its configuration.
- `auto_create_sink_topic`: Whether to try to create the sink topic on startup.
Default - `True`.
- `on_client_connect_success`: An optional callback made after successful
client authentication, primarily for additional logging.
- `on_client_connect_failure`: An optional callback made after failed
client authentication (which should raise an Exception).
The callback should accept the raised Exception as an argument.
The callback must resolve (or propagate/re-raise) the Exception.

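The contract of the two connect callbacks can be summarized with a small stdlib-only sketch. The function and variable names here are hypothetical, not part of the library:

```python
# Hypothetical sketch of the connect-callback contract described above:
# on_success fires after a successful connect (e.g. for extra logging);
# on_failure receives the raised Exception and must handle or re-raise it.
from typing import Callable, Optional

def connect_with_callbacks(
    connect: Callable[[], None],
    on_success: Optional[Callable[[], None]] = None,
    on_failure: Optional[Callable[[Exception], None]] = None,
) -> None:
    """Run `connect`, routing the outcome to the optional callbacks."""
    try:
        connect()
    except Exception as exc:
        if on_failure is not None:
            # The failure callback decides: resolve, or re-raise to abort.
            on_failure(exc)
        else:
            raise
    else:
        if on_success is not None:
            on_success()

events = []
connect_with_callbacks(
    connect=lambda: None,
    on_success=lambda: events.append("connected"),
)
```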
<a id="quixstreams.sinks.community.kafka.KafkaReplicatorSink.setup"></a>

#### KafkaReplicatorSink.setup

```python
def setup()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kafka.py#L111)

Initialize the InternalProducer and Topic for serialization.

<a id="quixstreams.sinks.community.kafka.KafkaReplicatorSink.add"></a>

#### KafkaReplicatorSink.add

```python
def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples,
        topic: str, partition: int, offset: int) -> None
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kafka.py#L146)

Add a message to be produced to the external Kafka cluster.

This method converts the provided data into a Row object and uses
the InternalProducer to serialize and produce it.

**Arguments**:

- `value`: The message value.
- `key`: The message key.
- `timestamp`: The message timestamp in milliseconds.
- `headers`: The message headers.
- `topic`: The source topic name.
- `partition`: The source partition.
- `offset`: The source offset.

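The shape of the `add()` call can be illustrated with a stdlib-only sketch of a sink that simply buffers each message along with its source coordinates. This is a hypothetical stand-in, not the library's implementation (the real sink wraps the data in a Row and hands it to the InternalProducer instead of buffering):

```python
# Hypothetical sketch: a sink that records each add() call, keeping both the
# message payload and the (topic, partition, offset) it came from.
from dataclasses import dataclass, field
from typing import Any, List, Tuple

HeadersTuples = List[Tuple[str, bytes]]

@dataclass
class BufferingSinkSketch:
    buffer: List[dict] = field(default_factory=list)

    def add(self, value: Any, key: Any, timestamp: int,
            headers: HeadersTuples, topic: str, partition: int,
            offset: int) -> None:
        # Record the payload plus its source coordinates for traceability.
        self.buffer.append({
            "value": value,
            "key": key,
            "timestamp": timestamp,
            "headers": headers,
            "source": (topic, partition, offset),
        })

sink = BufferingSinkSketch()
sink.add(value={"temp": 21}, key=b"sensor-1", timestamp=1700000000000,
         headers=[("type", b"reading")], topic="input-topic",
         partition=0, offset=42)
```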
<a id="quixstreams.sinks.community.kafka.KafkaReplicatorSink.flush"></a>

#### KafkaReplicatorSink.flush

```python
def flush() -> None
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kafka.py#L190)

Flush the producer to ensure all messages are delivered.

This method is triggered by the Checkpoint class when it commits.
If flush fails, the checkpoint will be aborted.

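The flush-or-abort behavior can be sketched in plain Python: wait up to `flush_timeout` seconds for pending deliveries, and raise if any remain so the surrounding checkpoint aborts. This is a simplified stand-in for the producer's actual flush, with hypothetical names:

```python
# Hypothetical sketch of flush semantics: poll for delivery confirmations
# until the timeout elapses; raise if anything is still undelivered, which
# in the real sink causes the checkpoint to be aborted.
import time
from typing import List

class FlushSketch:
    def __init__(self, flush_timeout: float = 10.0) -> None:
        self.flush_timeout = flush_timeout
        self.pending: List[bytes] = []

    def _poll_deliveries(self) -> None:
        # Stand-in for the broker acknowledging everything immediately.
        self.pending.clear()

    def flush(self) -> None:
        deadline = time.monotonic() + self.flush_timeout
        while self.pending and time.monotonic() < deadline:
            self._poll_deliveries()
        if self.pending:
            # Surfacing the failure is what aborts the checkpoint.
            raise TimeoutError(f"{len(self.pending)} message(s) undelivered")

sink = FlushSketch(flush_timeout=0.1)
sink.pending = [b"m1", b"m2"]
sink.flush()
```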
<a id="quixstreams.sinks.community.pubsub"></a>

## quixstreams.sinks.community.pubsub