diff --git a/python/destinations/kafka/README.md b/python/destinations/kafka/README.md new file mode 100644 index 00000000..c2e96bd0 --- /dev/null +++ b/python/destinations/kafka/README.md @@ -0,0 +1,52 @@ +# Kafka Replicator Sink + +[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/kafka) demonstrates how to consume data from a Quix topic and produce it to an external Kafka cluster. + +This sink uses the `KafkaReplicatorSink` to serialize and produce messages to an external Kafka cluster, making it easy to replicate data between Kafka clusters or export data from Quix to other Kafka-based systems. + +## How to run + +Create a [Quix](https://portal.cloud.quix.io/signup?utm_campaign=github) account or log-in and visit the `Connectors` tab to use this connector. + +Clicking `Set up connector` allows you to enter your connection details and runtime parameters. + +Then either: +* click `Test connection & deploy` to deploy the pre-built and configured container into Quix. + +* or click `Customise connector` to inspect or alter the code before deployment. + +## Requirements / Prerequisites + +You'll need to have an external Kafka cluster accessible either locally or in the cloud. + +## Environment Variables + +The connector uses the following environment variables: + +### Required +- **input**: Name of the input topic to listen to. +- **SINK_OUTPUT_TOPIC**: The target Kafka topic name to produce to on the external Kafka cluster. +- **SINK_BOOTSTRAP_SERVERS**: The external Kafka broker address (e.g., localhost:9092 or broker.example.com:9092). + +### Optional +- **CONSUMER_GROUP**: Name of the consumer group for consuming from Quix. Default: "kafka_sink" +- **SINK_KEY_SERIALIZER**: Serializer to use for the message key. Options: json, bytes, string, double, integer. Default: "bytes" +- **SINK_VALUE_SERIALIZER**: Serializer to use for the message value. Options: json, bytes, string, double, integer. Default: "json" +- **SINK_AUTO_CREATE_TOPIC**: Whether to attempt to create the sink topic upon startup. Default: "true" + +### Authentication (Optional) +- **SINK_SECURITY_PROTOCOL**: Protocol used to communicate with brokers. Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL +- **SINK_SASL_MECHANISM**: SASL mechanism for authentication. Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER, AWS_MSK_IAM +- **SINK_SASL_USERNAME**: SASL username for external Kafka authentication. +- **SINK_SASL_PASSWORD**: SASL password for external Kafka authentication. +- **SINK_SSL_CA_LOCATION**: Path to the SSL CA certificate file for secure connections. + +## Contribute + +Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. + +## Open source + +This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. + +Please star us and mention us on social to show your appreciation. diff --git a/python/destinations/kafka/dockerfile b/python/destinations/kafka/dockerfile new file mode 100644 index 00000000..51b504f9 --- /dev/null +++ b/python/destinations/kafka/dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12.5-slim-bookworm + +# Set environment variables for non-interactive setup and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 \ + PYTHONPATH="/app" + +# Build argument for setting the main app path +ARG MAINAPPPATH=. 
+
+# Set working directory inside the container
+WORKDIR /app
+
+# Copy requirements to leverage Docker cache
+COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"
+
+# Install dependencies without caching
+RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"
+
+# Copy entire application into container
+COPY . .
+
+# Set working directory to main app path
+WORKDIR "/app/${MAINAPPPATH}"
+
+# Define the container's startup command
+ENTRYPOINT ["python3", "main.py"]
diff --git a/python/destinations/kafka/library.json b/python/destinations/kafka/library.json
new file mode 100644
index 00000000..166cabfd
--- /dev/null
+++ b/python/destinations/kafka/library.json
@@ -0,0 +1,113 @@
+{
+  "libraryItemId": "kafka-replicator-sink",
+  "name": "Kafka Replicator Sink",
+  "language": "Python",
+  "tags": {
+    "Pipeline Stage": ["Destination"],
+    "Type": ["Connectors"],
+    "Category": ["Data streaming"]
+  },
+  "shortDescription": "Consume data from a Quix topic and produce it to an external Kafka cluster",
+  "DefaultFile": "main.py",
+  "EntryPoint": "dockerfile",
+  "RunEntryPoint": "main.py",
+  "IconFile": "icon.png",
+  "Variables": [
+    {
+      "Name": "input",
+      "Type": "EnvironmentVariable",
+      "InputType": "InputTopic",
+      "Description": "Name of the input topic to listen to.",
+      "Required": true
+    },
+    {
+      "Name": "SINK_AUTO_CREATE_TOPIC",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "Whether to attempt to create the sink topic upon startup",
+      "DefaultValue": "true",
+      "Required": false
+    },
+    {
+      "Name": "CONSUMER_GROUP",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "Name of the consumer group for consuming from Quix",
+      "DefaultValue": "kafka_sink",
+      "Required": false
+    },
+    {
+      "Name": "SINK_OUTPUT_TOPIC",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "The target Kafka topic name to produce to on the external Kafka cluster",
+      "Required": true
+    },
+    {
+      "Name": "SINK_BOOTSTRAP_SERVERS",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "The external Kafka broker address (e.g., localhost:9092 or broker.example.com:9092)",
+      "Required": true
+    },
+    {
+      "Name": "SINK_KEY_SERIALIZER",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "Serializer to use for the message key. Options: json, bytes, string, double, integer",
+      "DefaultValue": "bytes",
+      "Required": false
+    },
+    {
+      "Name": "SINK_VALUE_SERIALIZER",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "Serializer to use for the message value. Options: json, bytes, string, double, integer",
+      "DefaultValue": "json",
+      "Required": false
+    },
+    {
+      "Name": "SINK_SECURITY_PROTOCOL",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "Protocol used to communicate with brokers. Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL",
+      "Required": false
+    },
+    {
+      "Name": "SINK_SASL_MECHANISM",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "SASL mechanism for authentication. Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER, AWS_MSK_IAM",
+      "Required": false
+    },
+    {
+      "Name": "SINK_SASL_USERNAME",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "SASL username for external Kafka authentication",
+      "Required": false
+    },
+    {
+      "Name": "SINK_SASL_PASSWORD",
+      "Type": "EnvironmentVariable",
+      "InputType": "Secret",
+      "Description": "SASL password for external Kafka authentication",
+      "Required": false
+    },
+    {
+      "Name": "SINK_SSL_CA_LOCATION",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "Path to the SSL CA certificate file for secure connections. If not provided, system default CA certificates will be used",
+      "Required": false
+    }
+  ],
+  "DeploySettings": {
+    "DeploymentType": "Service",
+    "CpuMillicores": 200,
+    "MemoryInMb": 200,
+    "Replicas": 1,
+    "PublicAccess": false,
+    "ValidateConnection": true
+  }
+}
diff --git a/python/destinations/kafka/main.py b/python/destinations/kafka/main.py
new file mode 100644
index 00000000..63dc2738
--- /dev/null
+++ b/python/destinations/kafka/main.py
@@ -0,0 +1,72 @@
+import os
+from typing import Tuple, Type
+
+from pydantic_settings import (
+    BaseSettings as PydanticBaseSettings,
+    PydanticBaseSettingsSource,
+    SettingsConfigDict
+)
+
+from quixstreams import Application
+from quixstreams.kafka.configuration import ConnectionConfig
+
+from sink import KafkaReplicatorSink
+
+
+class SinkConnectionConfig(ConnectionConfig):
+    """
+    A ConnectionConfig subclass that reads configuration from environment variables
+    with a SINK_ prefix.
+
+    This allows users to configure the sink's Kafka connection using environment
+    variables like SINK_BOOTSTRAP_SERVERS, SINK_SASL_USERNAME, etc.
+
+    Example:
+        export SINK_BOOTSTRAP_SERVERS=kafka:9092
+        export SINK_SECURITY_PROTOCOL=SASL_SSL
+        export SINK_SASL_MECHANISM=PLAIN
+        export SINK_SASL_USERNAME=myuser
+        export SINK_SASL_PASSWORD=mypass
+
+        # Then create the config
+        config = SinkConnectionConfig()
+        sink = KafkaReplicatorSink(broker_address=config, topic_name="output-topic")
+    """
+
+    model_config = SettingsConfigDict(
+        env_prefix="SINK_",
+    )
+
+    @classmethod
+    def settings_customise_sources(
+        cls,
+        settings_cls: Type[PydanticBaseSettings],
+        init_settings: PydanticBaseSettingsSource,
+        env_settings: PydanticBaseSettingsSource,
+        dotenv_settings: PydanticBaseSettingsSource,
+        file_secret_settings: PydanticBaseSettingsSource,
+    ) -> Tuple[PydanticBaseSettingsSource, ...]:
+        """
+        Enable reading values from environment variables with the SINK_ prefix.
+        """
+        return init_settings, env_settings
+
+
+app = Application(
+    consumer_group=os.environ["CONSUMER_GROUP"],
+    auto_offset_reset="earliest",
+)
+input_topic = app.topic(os.environ['input'])
+kafka_sink = KafkaReplicatorSink(
+    broker_address=SinkConnectionConfig(),
+    topic_name=os.environ["SINK_OUTPUT_TOPIC"],
+    key_serializer=os.getenv("SINK_KEY_SERIALIZER", "bytes"),
+    value_serializer=os.getenv("SINK_VALUE_SERIALIZER", "json"),
+    origin_topic=input_topic,
+    auto_create_sink_topic=os.getenv("SINK_AUTO_CREATE_TOPIC", "true").lower() == "true",
+)
+app.dataframe(input_topic).sink(kafka_sink)
+
+
+if __name__ == '__main__':
+    app.run()
diff --git a/python/destinations/kafka/requirements.txt b/python/destinations/kafka/requirements.txt
new file mode 100644
index 00000000..12b3aec5
--- /dev/null
+++ b/python/destinations/kafka/requirements.txt
@@ -0,0 +1,2 @@
+quixstreams==3.23.1
+python-dotenv
diff --git a/python/destinations/kafka/sink.py b/python/destinations/kafka/sink.py
new file mode 100644
index 00000000..24495b9e
--- /dev/null
+++ b/python/destinations/kafka/sink.py
@@ -0,0 +1,219 @@
+import logging
+from typing import Any, Optional, Union
+
+from quixstreams.internal_producer import InternalProducer
+from quixstreams.kafka.configuration import ConnectionConfig
+from quixstreams.models import Row, Topic, TopicAdmin
+from quixstreams.models.messagecontext import MessageContext
+from quixstreams.models.serializers import SerializerType
+from quixstreams.models.types import HeadersTuples
+from quixstreams.sinks import (
+    BaseSink,
+    ClientConnectFailureCallback,
+    ClientConnectSuccessCallback,
+    SinkBackpressureError,
+)
+
+__all__ = ("KafkaReplicatorSink",)
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaReplicatorSink(BaseSink):
+    """
+    A sink that produces data to an external Kafka cluster.
+
+    This sink uses the InternalProducer to serialize Row objects using the same
+    serializers as the Quix Application, making it easy to replicate data to
+    another Kafka cluster.
+
+    Example Snippet:
+
+    ```python
+    from quixstreams import Application
+    from sink import KafkaReplicatorSink
+
+    app = Application(
+        consumer_group="group",
+    )
+
+    topic = app.topic("input-topic")
+
+    # Define the external Kafka cluster configuration
+    kafka_sink = KafkaReplicatorSink(
+        broker_address="external-kafka:9092",
+        topic_name="output-topic",
+        value_serializer="json",
+        key_serializer="bytes",
+    )
+
+    sdf = app.dataframe(topic=topic)
+    sdf.sink(kafka_sink)
+
+    app.run()
+    ```
+    """
+
+    def __init__(
+        self,
+        broker_address: Union[str, ConnectionConfig],
+        topic_name: str,
+        value_serializer: SerializerType = "json",
+        key_serializer: SerializerType = "bytes",
+        producer_extra_config: Optional[dict] = None,
+        flush_timeout: float = 10.0,
+        origin_topic: Optional[Topic] = None,
+        auto_create_sink_topic: bool = True,
+        on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
+        on_client_connect_failure: Optional[ClientConnectFailureCallback] = None,
+    ) -> None:
+        """
+        :param broker_address: The connection settings for the external Kafka cluster.
+            Accepts a string with the Kafka broker host and port formatted as `<host>:<port>`,
+            or a ConnectionConfig object if authentication is required.
+        :param topic_name: The topic name to produce to on the external Kafka cluster.
+        :param value_serializer: The serializer type for values.
+            Default - `json`.
+        :param key_serializer: The serializer type for keys.
+            Default - `bytes`.
+        :param producer_extra_config: A dictionary with additional options that
+            will be passed to `confluent_kafka.Producer` as is.
+            Default - `None`.
+        :param flush_timeout: The time in seconds the producer waits for all messages
+            to be delivered during flush.
+            Default - 10.0.
+        :param origin_topic: If auto-creating the sink topic, can optionally pass the
+            source topic to use its configuration.
+        :param auto_create_sink_topic: Whether to try to create the sink topic upon startup.
+            Default - `True`.
+        :param on_client_connect_success: An optional callback made after successful
+            client authentication, primarily for additional logging.
+        :param on_client_connect_failure: An optional callback made after failed
+            client authentication (which should raise an Exception).
+            Callback should accept the raised Exception as an argument.
+            Callback must resolve (or propagate/re-raise) the Exception.
+        """
+        super().__init__(
+            on_client_connect_success=on_client_connect_success,
+            on_client_connect_failure=on_client_connect_failure,
+        )
+
+        self._broker_address = broker_address
+        self._topic_name = topic_name
+        self._value_serializer = value_serializer
+        self._key_serializer = key_serializer
+        self._producer_extra_config = producer_extra_config or {}
+        self._flush_timeout = flush_timeout
+        self._auto_create_sink_topic = auto_create_sink_topic
+        self._origin_topic = origin_topic
+
+        self._producer: Optional[InternalProducer] = None
+        self._topic: Optional[Topic] = None
+
+    def setup(self):
+        """
+        Initialize the InternalProducer and Topic for serialization.
+        """
+        logger.info(
+            f"Setting up KafkaSink: "
+            f'broker_address="{self._broker_address}" '
+            f'topic="{self._topic_name}" '
+            f'value_serializer="{self._value_serializer}" '
+            f'key_serializer="{self._key_serializer}"'
+        )
+
+        # Create the InternalProducer
+        self._producer = InternalProducer(
+            broker_address=self._broker_address,
+            extra_config=self._producer_extra_config,
+            flush_timeout=self._flush_timeout,
+            transactional=False,
+        )
+
+        # Create a Topic object for serialization; the topic itself is only
+        # created on the external cluster below if auto_create_sink_topic is enabled
+        self._topic = Topic(
+            name=self._topic_name,
+            value_serializer=self._value_serializer,
+            key_serializer=self._key_serializer,
+            create_config=self._origin_topic.broker_config
+            if self._origin_topic
+            else None,
+        )
+
+        if self._auto_create_sink_topic:
+            admin = TopicAdmin(
+                broker_address=self._broker_address,
+                extra_config=self._producer_extra_config,
+            )
+            admin.create_topics(topics=[self._topic])
+
+    def add(
+        self,
+        value: Any,
+        key: Any,
+        timestamp: int,
+        headers: HeadersTuples,
+        topic: str,
+        partition: int,
+        offset: int,
+    ) -> None:
+        """
+        Add a message to be produced to the external Kafka cluster.
+
+        This method converts the provided data into a Row object and uses
+        the InternalProducer to serialize and produce it.
+
+        :param value: The message value.
+        :param key: The message key.
+        :param timestamp: The message timestamp in milliseconds.
+        :param headers: The message headers.
+        :param topic: The source topic name.
+        :param partition: The source partition.
+        :param offset: The source offset.
+ """ + # Create a Row object from the provided data + # We need a MessageContext for the Row + context = MessageContext( + topic=topic, + partition=partition, + offset=offset, + size=0, + leader_epoch=None, + ) + + row = Row( + value=value, + key=key, + timestamp=timestamp, + context=context, + headers=headers, + ) + + # Use InternalProducer to serialize and produce the row + self._producer.produce_row( + row=row, + topic=self._topic, + timestamp=timestamp, + ) + + def flush(self) -> None: + """ + Flush the producer to ensure all messages are delivered. + + This method is triggered by the Checkpoint class when it commits. + If flush fails, the checkpoint will be aborted. + """ + logger.debug(f'Flushing KafkaSink for topic "{self._topic_name}"') + + # Flush all pending messages + result = self._producer.flush(timeout=self._flush_timeout) + + if result > 0: + logger.warning( + f"{result} messages were not delivered to Kafka topic " + f'"{self._topic_name}" within the flush timeout of {self._flush_timeout}s' + ) + raise SinkBackpressureError(retry_after=10.0) + + logger.debug(f'Successfully flushed KafkaSink for topic "{self._topic_name}"') diff --git a/tests/destinations/kafka/data.jsonlines b/tests/destinations/kafka/data.jsonlines new file mode 100644 index 00000000..492c61a9 --- /dev/null +++ b/tests/destinations/kafka/data.jsonlines @@ -0,0 +1,3 @@ +{"_key": "test0", "_value": {"k0": "v0", "k1": "v1"}} +{"_key": "test1", "_value": {"k0": "v0", "k1": "v1"}} +{"_key": "test2", "_value": {"k0": "v0", "k1": "v1"}} diff --git a/tests/destinations/kafka/docker-compose.test.yml b/tests/destinations/kafka/docker-compose.test.yml new file mode 100644 index 00000000..f00c6d6c --- /dev/null +++ b/tests/destinations/kafka/docker-compose.test.yml @@ -0,0 +1,130 @@ +# timeout: 60 +services: + # Quix Kafka instance (source) + quix-kafka: + image: docker.redpanda.com/redpandadata/redpanda:v24.2.4 + command: + - redpanda + - start + - --kafka-addr internal://0.0.0.0:9092 + - --advertise-kafka-addr internal://quix-kafka:9092 + - --mode dev-container + - --smp 1 + healthcheck: + test: ["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"] + interval: 5s + timeout: 10s + retries: 10 + networks: + - test-network + stop_grace_period: 3s + + # External Kafka instance (destination) + external-kafka: + image: docker.redpanda.com/redpandadata/redpanda:v24.2.4 + command: + - redpanda + - start + - --kafka-addr internal://0.0.0.0:9092 + - --advertise-kafka-addr internal://external-kafka:9092 + - --mode dev-container + - --smp 1 + healthcheck: + test: ["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"] + interval: 5s + timeout: 10s + retries: 10 + networks: + - test-network + stop_grace_period: 3s + + # Generate data to Quix Kafka + data-generator: + build: + context: ../../framework + dockerfile: Dockerfile + command: > + sh -c " + echo 'Waiting for Quix Kafka to be ready...' && + sleep 5 && + echo 'Generating test data to Quix Kafka...' && + python /tests/generate_data.py && + echo 'Test data generated successfully' && + echo 'Keeping data-generator alive to prevent test abort...' 
&& + tail -f /dev/null + " + volumes: + - ./generate_data.py:/tests/generate_data.py:ro + - ./data.jsonlines:/tests/data.jsonlines:ro + working_dir: /tests + networks: + - test-network + depends_on: + quix-kafka: + condition: service_healthy + stop_grace_period: 3s + + # Kafka replicator sink (consumes from quix, produces to external) + kafka-sink: + build: + context: ../../../python/destinations/kafka + dockerfile: dockerfile + entrypoint: > + sh -c " + echo 'Waiting for data generator to populate Quix Kafka...' && + sleep 10 && + echo 'Starting Kafka replicator sink...' && + python3 main.py + " + environment: + - Quix__Broker__Address=quix-kafka:9092 + - Quix__Consumer__Group=kafka-sink-test + - Quix__Deployment__Id=test-kafka-sink + - input=source-topic + - CONSUMER_GROUP=kafka-sink-test + - SINK_OUTPUT_TOPIC=replicated-topic + - SINK_BOOTSTRAP_SERVERS=external-kafka:9092 + - SINK_KEY_SERIALIZER=bytes + - SINK_VALUE_SERIALIZER=json + networks: + - test-network + depends_on: + quix-kafka: + condition: service_healthy + external-kafka: + condition: service_healthy + data-generator: + condition: service_started + stop_grace_period: 3s + + # Verify output in external Kafka + test-verifier: + build: + context: ../../framework + dockerfile: Dockerfile + command: > + sh -c " + echo 'Waiting for kafka sink to replicate data...' && + sleep 10 && + echo 'Verifying replicated data in external Kafka...' && + python /tests/verify_output.py + " + environment: + - EXTERNAL_BROKER_ADDRESS=external-kafka:9092 + - TEST_OUTPUT_TOPIC=replicated-topic + - TEST_TIMEOUT=40 + volumes: + - ./verify_output.py:/tests/verify_output.py:ro + working_dir: / + networks: + - test-network + depends_on: + external-kafka: + condition: service_healthy + kafka-sink: + condition: service_started + stop_grace_period: 3s + +networks: + test-network: + driver: bridge diff --git a/tests/destinations/kafka/generate_data.py b/tests/destinations/kafka/generate_data.py new file mode 100644 index 00000000..32b921c0 --- /dev/null +++ b/tests/destinations/kafka/generate_data.py @@ -0,0 +1,21 @@ +import datetime + +from quixstreams.sources.community.file.local import LocalFileSource +from quixstreams import Application + + +def ts_setter(row): + return int(datetime.datetime.now().timestamp()) + + +app = Application( + broker_address="quix-kafka:9092", + auto_offset_reset="earliest" +) +app.add_source( + source=LocalFileSource(filepath="./data.jsonlines", timestamp_setter=ts_setter), + topic=app.topic("source-topic"), +) + +if __name__ == "__main__": + app.run() diff --git a/tests/destinations/kafka/verify_output.py b/tests/destinations/kafka/verify_output.py new file mode 100644 index 00000000..7398e891 --- /dev/null +++ b/tests/destinations/kafka/verify_output.py @@ -0,0 +1,84 @@ +import os +import time +from quixstreams import Application +from quixstreams.sinks.core.list import ListSink + + +def main(): + broker_address = os.getenv("EXTERNAL_BROKER_ADDRESS", "external-kafka:9092") + output_topic = os.getenv("TEST_OUTPUT_TOPIC", "replicated-topic") + timeout = int(os.getenv("TEST_TIMEOUT", "40")) + expected_count = 3 + + print(f"Consuming from output topic: {output_topic}") + print(f"Expected {expected_count} messages") + + app = Application( + broker_address=broker_address, + consumer_group=f"test-consumer-{int(time.time())}", + auto_offset_reset="earliest" + ) + + topic = app.topic(output_topic) + list_sink = ListSink(metadata=True) + + sdf = app.dataframe(topic) + sdf.sink(list_sink) + + app.run(count=expected_count, 
timeout=timeout) + + message_count = len(list_sink) + print(f"Received {message_count} messages from output topic") + + if message_count < expected_count: + print(f"FAILED: Expected {expected_count} messages, got {message_count}") + exit(1) + + print("Verifying message structure...") + + # Expected data from data.jsonlines + expected_keys = ["test0", "test1", "test2"] + received_keys = [] + + for i, message in enumerate(list_sink): + print(f"Message {i}: {message}") + + # Verify the message has the expected structure + if "_key" not in message: + print(f"FAILED: Message {i} missing '_key' field") + exit(1) + + received_keys.append(message["_key"].decode()) + + # Verify _value structure + value = message + if not isinstance(value, dict): + print(f"FAILED: Message {i} _value should be a dict, got {type(value)}") + exit(1) + + if "k0" not in value or "k1" not in value: + print(f"FAILED: Message {i} _value missing expected fields (k0, k1)") + exit(1) + + if value["k0"] != "v0" or value["k1"] != "v1": + print(f"FAILED: Message {i} _value has unexpected values") + exit(1) + + # Verify all expected keys were received + print(f"Received keys: {received_keys}") + print(f"Expected keys: {expected_keys}") + + for expected_key in expected_keys: + if expected_key not in received_keys: + print(f"FAILED: Expected key '{expected_key}' not found in messages") + exit(1) + + print(f"Success: Verified {message_count} messages with correct structure") + print(f"- All messages have required fields: _key, _value") + print(f"- All expected keys present: {expected_keys}") + print(f"- All values have correct structure and content") + exit(0) + + +if __name__ == "__main__": + main()
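
For local testing outside the connector, the same sink can also be driven with an explicit `ConnectionConfig` instead of the `SINK_`-prefixed environment variables read by `SinkConnectionConfig`. Below is a minimal sketch, assuming `ConnectionConfig` exposes fields that mirror the documented `SINK_*` variables (`bootstrap_servers`, `security_protocol`, `sasl_mechanism`, `sasl_username`, `sasl_password`); the broker addresses, topic names, and credentials are placeholders, not values from this PR.

```python
# Minimal local sketch -- broker addresses, topic names, and credentials are
# placeholders; ConnectionConfig field names are assumed to mirror the SINK_*
# variables documented in the README.
from quixstreams import Application
from quixstreams.kafka.configuration import ConnectionConfig

from sink import KafkaReplicatorSink

# Source cluster to consume from (placeholder address).
app = Application(
    broker_address="localhost:9092",
    consumer_group="kafka_sink",
    auto_offset_reset="earliest",
)
input_topic = app.topic("input-topic")

# Explicit connection settings for a SASL_SSL-secured external cluster,
# equivalent to exporting SINK_BOOTSTRAP_SERVERS, SINK_SECURITY_PROTOCOL, etc.
external_cluster = ConnectionConfig(
    bootstrap_servers="broker.example.com:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username="myuser",
    sasl_password="mypass",
)

kafka_sink = KafkaReplicatorSink(
    broker_address=external_cluster,
    topic_name="replicated-topic",
    key_serializer="bytes",
    value_serializer="json",
    origin_topic=input_topic,
    auto_create_sink_topic=True,
)
app.dataframe(input_topic).sink(kafka_sink)

if __name__ == "__main__":
    app.run()
```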