 from pydantic_settings import BaseSettings as PydanticBaseSettings
 from pydantic_settings import PydanticBaseSettingsSource, SettingsConfigDict

-from .context import copy_context, set_message_context
+from .context import MessageContext, copy_context, set_message_context
 from .core.stream.functions.types import VoidExecutor
 from .dataframe import DataFrameRegistry, StreamingDataFrame
 from .error_callbacks import (
 )
 from .platforms.quix.env import QUIX_ENVIRONMENT
 from .processing import ProcessingContext
+from .processing.watermarking import WatermarkManager
 from .runtracker import RunTracker
 from .sinks import SinkManager
 from .sources import BaseSource, SourceException, SourceManager
 from .state import StateStoreManager
 from .state.recovery import RecoveryManager
 from .state.rocksdb import RocksDBOptionsType
+from .utils.format import format_timestamp
 from .utils.settings import BaseSettings

 __all__ = ("Application", "ApplicationConfig")
@@ -151,6 +153,8 @@ def __init__(
         topic_create_timeout: float = 60,
         processing_guarantee: ProcessingGuarantee = "at-least-once",
         max_partition_buffer_size: int = 10000,
+        watermarking_default_assignor_enabled: bool = True,
+        watermarking_interval: float = 1.0,
     ):
         """
         :param broker_address: Connection settings for Kafka.
@@ -219,6 +223,14 @@ def __init__(
             It is a soft limit, and the actual number of buffered messages can be up to x2 higher.
             Lower value decreases the memory use, but increases the latency.
             Default - `10000`.
+        :param watermarking_default_assignor_enabled: when True, the application extracts watermarks
+            from incoming messages by default (respecting the `Topic(timestamp_extractor)` if configured).
+            When disabled, no watermarks are emitted unless `StreamingDataFrame.set_timestamp()`
+            is called on each main StreamingDataFrame.
+            Default - `True`.
+
+        :param watermarking_interval: how often to emit watermark updates for assigned partitions (in seconds).
+            Default - `1.0`.

         <br><br>***Error Handlers***<br>
         To handle errors, `Application` accepts callbacks triggered when
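
For orientation, a minimal usage sketch of the two new options; the broker address is a placeholder and not part of this diff:

```python
# Sketch: keep the default watermark assignor (watermarks are extracted from
# incoming message timestamps) and emit watermark updates once per second.
from quixstreams import Application

app = Application(
    broker_address="localhost:9092",  # placeholder
    watermarking_default_assignor_enabled=True,
    watermarking_interval=1.0,
)
```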
@@ -338,6 +350,7 @@ def __init__(
             rocksdb_options=rocksdb_options,
             use_changelog_topics=use_changelog_topics,
             max_partition_buffer_size=max_partition_buffer_size,
+            watermarking_default_assignor_enabled=watermarking_default_assignor_enabled,
         )

         self._on_message_processed = on_message_processed
@@ -373,6 +386,11 @@ def __init__(
         self._source_manager = SourceManager()
         self._sink_manager = SinkManager()
         self._dataframe_registry = DataFrameRegistry()
+        self._watermark_manager = WatermarkManager(
+            producer=self._producer,
+            topic_manager=self._topic_manager,
+            interval=watermarking_interval,
+        )
         self._processing_context = ProcessingContext(
             commit_interval=self._config.commit_interval,
             commit_every=self._config.commit_every,
@@ -382,6 +400,7 @@ def __init__(
             exactly_once=self._config.exactly_once,
             sink_manager=self._sink_manager,
             dataframe_registry=self._dataframe_registry,
+            watermark_manager=self._watermark_manager,
         )
         self._run_tracker = RunTracker()
@@ -902,9 +921,19 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
         printer = self._processing_context.printer
         run_tracker = self._run_tracker
         consumer = self._consumer
+        producer = self._producer
+        producer_poll_timeout = self._config.producer_poll_timeout
+        watermark_manager = self._watermark_manager
+
+        # Set the topics to be tracked by the Watermark manager
+        watermark_manager.set_topics(topics=self._dataframe_registry.consumer_topics)

         consumer.subscribe(
-            topics=self._dataframe_registry.consumer_topics + changelog_topics,
+            topics=self._dataframe_registry.consumer_topics
+            + changelog_topics
+            + [
+                self._watermark_manager.watermarks_topic
+            ],  # TODO: We subscribe here because otherwise it can't deserialize a message. Maybe it's time to split poll() and deserialization
             on_assign=self._on_assign,
             on_revoke=self._on_revoke,
             on_lost=self._on_lost,
@@ -921,11 +950,14 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
                 state_manager.do_recovery()
                 run_tracker.timeout_refresh()
             else:
+                # Serve producer callbacks
+                producer.poll(producer_poll_timeout)
                 process_message(dataframes_composed)
                 processing_context.commit_checkpoint()
                 consumer.resume_backpressured()
                 source_manager.raise_for_error()
                 printer.print()
+                watermark_manager.produce()
                 run_tracker.update_status()

         logger.info("Stopping the application")
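
Note that `watermark_manager.produce()` runs on every loop iteration, so the `watermarking_interval` throttling presumably lives inside the manager. A minimal sketch of that pattern, assuming the manager tracks its last emit time (`WatermarkManager` itself is not shown in this diff):

```python
import time

class IntervalThrottle:
    """Allow an action at most once per `interval` seconds."""

    def __init__(self, interval: float):
        self._interval = interval
        self._last_emit = 0.0

    def ready(self) -> bool:
        # Use a monotonic clock so system clock changes don't affect pacing
        now = time.monotonic()
        if now - self._last_emit >= self._interval:
            self._last_emit = now
            return True
        return False

# Usage sketch: safe to call on every iteration of a tight loop.
throttle = IntervalThrottle(interval=1.0)
if throttle.ready():
    print("emit watermark updates for assigned partitions")
```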
@@ -953,9 +985,7 @@ def _quix_runtime_init(self):
         if self._state_manager.stores:
             check_state_management_enabled()

-    def _process_message(self, dataframe_composed):
-        # Serve producer callbacks
-        self._producer.poll(self._config.producer_poll_timeout)
+    def _process_message(self, dataframe_composed: dict[str, VoidExecutor]):
         rows = self._consumer.poll_row(
             timeout=self._config.consumer_poll_timeout,
             buffered=self._dataframe_registry.requires_time_alignment,
@@ -977,7 +1007,52 @@ def _process_message(self, dataframe_composed):
                 first_row.offset,
             )

+        if topic_name == self._watermark_manager.watermarks_topic.name:
+            watermark = self._watermark_manager.receive(message=first_row.value)
+            if watermark is None:
+                return
+
+            data_topics = self._topic_manager.non_changelog_topics
+            data_tps = [
+                tp for tp in self._consumer.assignment() if tp.topic in data_topics
+            ]
+            for tp in data_tps:
+                logger.info(
+                    f"Process watermark {format_timestamp(watermark)}. "
+                    f"topic={tp.topic} partition={tp.partition} timestamp={watermark}"
+                )
+                # Create a MessageContext to process a watermark update
+                # for each assigned TP
+                watermark_ctx = MessageContext(
+                    topic=tp.topic,
+                    partition=tp.partition,
+                    offset=None,
+                    size=0,
+                )
+                context = copy_context()
+                context.run(set_message_context, watermark_ctx)
+                # Execute StreamingDataFrame in a context
+                context.run(
+                    dataframe_composed[tp.topic],
+                    value=None,
+                    key=None,
+                    timestamp=watermark,
+                    headers=[],
+                    is_watermark=True,
+                )
+            return
+
         for row in rows:
+            if self._config.watermarking_default_assignor_enabled:
+                # Update the watermark with the current row's timestamp
+                # if the default watermark assignor is enabled (True by default).
+                self._processing_context.watermark_manager.store(
+                    topic=row.topic,
+                    partition=row.partition,
+                    timestamp=row.timestamp,
+                    default=True,
+                )
+
             context = copy_context()
             context.run(set_message_context, row.context)
             try:
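
`receive()` returns `None` for updates that do not advance the watermark, and an advanced value is broadcast to every assigned data partition with `is_watermark=True`. A hypothetical sketch of semantics consistent with this hunk, tracking per-partition timestamps and surfacing the global minimum only when it moves forward (this is not the actual `WatermarkManager` implementation):

```python
from typing import Optional

class WatermarkTracker:
    """Hypothetical: track per-partition watermarks, expose the global minimum."""

    def __init__(self) -> None:
        self._by_partition: dict[tuple[str, int], int] = {}
        self._current: Optional[int] = None

    def receive(self, topic: str, partition: int, timestamp: int) -> Optional[int]:
        self._by_partition[(topic, partition)] = timestamp
        new_min = min(self._by_partition.values())
        if self._current is None or new_min > self._current:
            self._current = new_min
            return new_min  # the watermark advanced
        return None  # no progress; the caller skips the broadcast

    def on_revoke(self, topic: str, partition: int) -> None:
        # Drop state for revoked partitions so an unassigned, stalled
        # partition cannot hold the global watermark back forever.
        self._by_partition.pop((topic, partition), None)
```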
@@ -1023,28 +1098,33 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
         self._source_manager.start_sources()

         # Assign partitions manually to pause the changelog topics
-        self._consumer.assign(topic_partitions)
-        # Pause changelog topic+partitions immediately after assignment
-        non_changelog_topics = self._topic_manager.non_changelog_topics
-        changelog_tps = [
-            tp for tp in topic_partitions if tp.topic not in non_changelog_topics
+        watermarks_partitions = [
+            TopicPartition(
+                topic=self._watermark_manager.watermarks_topic.name, partition=i
+            )
+            for i in range(
+                self._watermark_manager.watermarks_topic.broker_config.num_partitions
+            )
         ]
+        # TODO: The set is used because the watermark TP can already be present in "topic_partitions",
+        # because we use `subscribe()` earlier. Fix the mess later.
+        # TODO: Also, how to avoid reading the whole WM topic on each restart?
+        # We really need only the most recent data.
+        # Is it fine to read it from the end? The active partitions must still publish something.
+        # Or should we commit it?
+        self._consumer.assign(list(set(topic_partitions + watermarks_partitions)))
+
+        # Pause changelog topic+partitions immediately after assignment
+        changelog_topics = {t.name for t in self._topic_manager.changelog_topics_list}
+        changelog_tps = [tp for tp in topic_partitions if tp.topic in changelog_topics]
         self._consumer.pause(changelog_tps)

-        if self._state_manager.stores:
-            non_changelog_tps = [
-                tp for tp in topic_partitions if tp.topic in non_changelog_topics
-            ]
-            # Match the assigned TP with a stream ID via DataFrameRegistry
-            for tp in non_changelog_tps:
-                stream_ids = self._dataframe_registry.get_stream_ids(
-                    topic_name=tp.topic
-                )
-                # Assign store partitions for the given stream ids
-                for stream_id in stream_ids:
-                    self._state_manager.on_partition_assign(
-                        stream_id=stream_id, partition=tp.partition
-                    )
+        data_topics = self._topic_manager.non_changelog_topics
+        data_tps = [tp for tp in topic_partitions if tp.topic in data_topics]
+
+        for tp in data_tps:
+            self._assign_state_partitions(topic=tp.topic, partition=tp.partition)
+
         self._run_tracker.timeout_refresh()

     def _on_revoke(self, _, topic_partitions: List[TopicPartition]):
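
One possible answer to the restart TODO above, sketched under the assumption that live partitions republish their watermarks at least once per `watermarking_interval`: assign the watermark partitions from the log end instead of replaying history. The topic name and partition count are placeholders; this is an option, not a decision made in this PR:

```python
from confluent_kafka import OFFSET_END, TopicPartition

num_partitions = 1  # placeholder

# Start each watermark partition at the end of the log; the latest state is
# re-learned from the periodic re-publishes rather than from a full replay.
watermarks_partitions = [
    TopicPartition("watermarks-topic", p, OFFSET_END)  # topic name is a placeholder
    for p in range(num_partitions)
]
```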
@@ -1064,7 +1144,12 @@ def _on_revoke(self, _, topic_partitions: List[TopicPartition]):
         else:
             self._processing_context.commit_checkpoint(force=True)

-        self._revoke_state_partitions(topic_partitions=topic_partitions)
+        data_topics = self._topic_manager.non_changelog_topics
+        data_tps = [tp for tp in topic_partitions if tp.topic in data_topics]
+        for tp in data_tps:
+            self._watermark_manager.on_revoke(topic=tp.topic, partition=tp.partition)
+            self._revoke_state_partitions(topic=tp.topic, partition=tp.partition)
+
         self._consumer.reset_backpressure()

     def _on_lost(self, _, topic_partitions: List[TopicPartition]):
@@ -1073,23 +1158,34 @@ def _on_lost(self, _, topic_partitions: List[TopicPartition]):
         """
         logger.debug("Rebalancing: dropping lost partitions")

-        self._revoke_state_partitions(topic_partitions=topic_partitions)
+        data_tps = [
+            tp
+            for tp in topic_partitions
+            if tp.topic in self._topic_manager.non_changelog_topics
+        ]
+        for tp in data_tps:
+            self._watermark_manager.on_revoke(topic=tp.topic, partition=tp.partition)
+            self._revoke_state_partitions(topic=tp.topic, partition=tp.partition)
+
         self._consumer.reset_backpressure()

-    def _revoke_state_partitions(self, topic_partitions: List[TopicPartition]):
-        non_changelog_topics = self._topic_manager.non_changelog_topics
-        non_changelog_tps = [
-            tp for tp in topic_partitions if tp.topic in non_changelog_topics
-        ]
-        for tp in non_changelog_tps:
-            if self._state_manager.stores:
-                stream_ids = self._dataframe_registry.get_stream_ids(
-                    topic_name=tp.topic
+    def _assign_state_partitions(self, topic: str, partition: int):
+        if self._state_manager.stores:
+            # Match the assigned TP with a stream ID via DataFrameRegistry
+            stream_ids = self._dataframe_registry.get_stream_ids(topic_name=topic)
+            # Assign store partitions for the given stream ids
+            for stream_id in stream_ids:
+                self._state_manager.on_partition_assign(
+                    stream_id=stream_id, partition=partition
+                )
+
+    def _revoke_state_partitions(self, topic: str, partition: int):
+        if self._state_manager.stores:
+            stream_ids = self._dataframe_registry.get_stream_ids(topic_name=topic)
+            for stream_id in stream_ids:
+                self._state_manager.on_partition_revoke(
+                    stream_id=stream_id, partition=partition
                 )
-            for stream_id in stream_ids:
-                self._state_manager.on_partition_revoke(
-                    stream_id=stream_id, partition=tp.partition
-                )

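The helpers loop because one topic can back several stream ids (`DataFrameRegistry.get_stream_ids()` returns a list, e.g. when multiple StreamingDataFrames consume the same topic). A toy illustration of the fan-out these loops walk; the mapping values are illustrative, not the registry's real internals:

```python
# topic -> stream ids, as get_stream_ids() might resolve them (illustrative)
stream_ids_by_topic: dict[str, list[str]] = {
    "orders": ["orders", "orders--enriched"],
}

def assign_state_partitions(topic: str, partition: int) -> None:
    # One store partition is assigned per (stream_id, partition) pair
    for stream_id in stream_ids_by_topic.get(topic, []):
        print(f"assign store: stream_id={stream_id} partition={partition}")

assign_state_partitions("orders", 0)
```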
     def _setup_signal_handlers(self):
         signal.signal(signal.SIGINT, self._on_sigint)
@@ -1141,6 +1237,7 @@ class ApplicationConfig(BaseSettings):
     rocksdb_options: Optional[RocksDBOptionsType] = None
     use_changelog_topics: bool = True
     max_partition_buffer_size: int = 10000
+    watermarking_default_assignor_enabled: bool = True

     @classmethod
     def settings_customise_sources(