@@ -606,108 +606,110 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
606606 lsn_received_timestamp = datetime .datetime .utcnow ()
607607 poll_timestamp = datetime .datetime .utcnow ()
608608
609- while True :
610- # Disconnect when no data received for logical_poll_total_seconds
611- # needs to be long enough to wait for the largest single wal payload to avoid unplanned timeouts
612- poll_duration = (datetime .datetime .utcnow () - lsn_received_timestamp ).total_seconds ()
613- if poll_duration > logical_poll_total_seconds :
614- LOGGER .info ('Breaking - %i seconds of polling with no data' , poll_duration )
615- break
616-
617- if datetime .datetime .utcnow () >= (start_run_timestamp + datetime .timedelta (seconds = max_run_seconds )):
618- LOGGER .info ('Breaking - reached max_run_seconds of %i' , max_run_seconds )
619- break
620-
621- try :
622- msg = cur .read_message ()
623- except Exception as e :
624- LOGGER .error (e )
625- raise
609+ try :
610+ while True :
611+ # Disconnect when no data received for logical_poll_total_seconds
612+ # needs to be long enough to wait for the largest single wal payload to avoid unplanned timeouts
613+ poll_duration = (datetime .datetime .utcnow () - lsn_received_timestamp ).total_seconds ()
614+ if poll_duration > logical_poll_total_seconds :
615+ LOGGER .info ('Breaking - %i seconds of polling with no data' , poll_duration )
616+ break
626617
627- if msg :
628- if (break_at_end_lsn ) and (msg .data_start > end_lsn ):
629- LOGGER .info ('Breaking - latest wal message %s is past end_lsn %s' ,
630- int_to_lsn (msg .data_start ),
631- int_to_lsn (end_lsn ))
618+ if datetime .datetime .utcnow () >= (start_run_timestamp + datetime .timedelta (seconds = max_run_seconds )):
619+ LOGGER .info ('Breaking - reached max_run_seconds of %i' , max_run_seconds )
632620 break
633621
634- state = consume_message (logical_streams , state , msg , time_extracted , conn_info )
635-
636- # When using wal2json with write-in-chunks, multiple messages can have the same lsn
637- # This is to ensure we only flush to lsn that has completed entirely
638- if lsn_currently_processing is None :
639- lsn_currently_processing = msg .data_start
640- LOGGER .info ('First wal message received is %s' , int_to_lsn (lsn_currently_processing ))
641-
642- # Flush Postgres wal up to lsn comitted in previous run, or first lsn received in this run
643- lsn_to_flush = lsn_comitted
644- if lsn_currently_processing < lsn_to_flush :
645- lsn_to_flush = lsn_currently_processing
646- LOGGER .info ('Confirming write up to %s, flush to %s' ,
647- int_to_lsn (lsn_to_flush ),
648- int_to_lsn (lsn_to_flush ))
649- cur .send_feedback (write_lsn = lsn_to_flush , flush_lsn = lsn_to_flush , reply = True , force = True )
650-
651- elif int (msg .data_start ) > lsn_currently_processing :
652- lsn_last_processed = lsn_currently_processing
653- lsn_currently_processing = msg .data_start
654- lsn_received_timestamp = datetime .datetime .utcnow ()
655- lsn_processed_count = lsn_processed_count + 1
656- if lsn_processed_count >= UPDATE_BOOKMARK_PERIOD :
657- LOGGER .debug ('Updating bookmarks for all streams to lsn = %s (%s)' ,
658- lsn_last_processed ,
659- int_to_lsn (lsn_last_processed ))
660- for s in logical_streams :
661- state = singer .write_bookmark (state , s ['tap_stream_id' ], 'lsn' , lsn_last_processed )
662- singer .write_message (singer .StateMessage (value = copy .deepcopy (state )))
663- lsn_processed_count = 0
664- else :
665622 try :
666- # Wait for a second unless a message arrives
667- select ([cur ], [], [], 1 )
668- except InterruptedError :
669- pass
670-
671- # Every poll_interval, update latest comitted lsn position from the state_file
672- if datetime .datetime .utcnow () >= (poll_timestamp + datetime .timedelta (seconds = poll_interval )):
673- if lsn_currently_processing is None :
674- LOGGER .info ('Waiting for first wal message' )
623+ msg = cur .read_message ()
624+ except Exception as e :
625+ LOGGER .error (e )
626+ raise
627+
628+ if msg :
629+ if (break_at_end_lsn ) and (msg .data_start > end_lsn ):
630+ LOGGER .info ('Breaking - latest wal message %s is past end_lsn %s' ,
631+ int_to_lsn (msg .data_start ),
632+ int_to_lsn (end_lsn ))
633+ break
634+
635+ state = consume_message (logical_streams , state , msg , time_extracted , conn_info )
636+
637+ # When using wal2json with write-in-chunks, multiple messages can have the same lsn
638+ # This is to ensure we only flush to lsn that has completed entirely
639+ if lsn_currently_processing is None :
640+ lsn_currently_processing = msg .data_start
641+ LOGGER .info ('First wal message received is %s' , int_to_lsn (lsn_currently_processing ))
642+
643+ # Flush Postgres wal up to lsn comitted in previous run, or first lsn received in this run
644+ lsn_to_flush = lsn_comitted
645+ if lsn_currently_processing < lsn_to_flush :
646+ lsn_to_flush = lsn_currently_processing
647+ LOGGER .info ('Confirming write up to %s, flush to %s' ,
648+ int_to_lsn (lsn_to_flush ),
649+ int_to_lsn (lsn_to_flush ))
650+ cur .send_feedback (write_lsn = lsn_to_flush , flush_lsn = lsn_to_flush , reply = True , force = True )
651+
652+ elif int (msg .data_start ) > lsn_currently_processing :
653+ lsn_last_processed = lsn_currently_processing
654+ lsn_currently_processing = msg .data_start
655+ lsn_received_timestamp = datetime .datetime .utcnow ()
656+ lsn_processed_count = lsn_processed_count + 1
657+ if lsn_processed_count >= UPDATE_BOOKMARK_PERIOD :
658+ LOGGER .debug ('Updating bookmarks for all streams to lsn = %s (%s)' ,
659+ lsn_last_processed ,
660+ int_to_lsn (lsn_last_processed ))
661+ for s in logical_streams :
662+ state = singer .write_bookmark (state , s ['tap_stream_id' ], 'lsn' , lsn_last_processed )
663+ singer .write_message (singer .StateMessage (value = copy .deepcopy (state )))
664+ lsn_processed_count = 0
675665 else :
676- LOGGER .info ('Lastest wal message received was %s' , int_to_lsn (lsn_last_processed ))
677666 try :
678- with open (state_file , mode = "r" , encoding = "utf-8" ) as fh :
679- state_comitted = json .load (fh )
680- except Exception :
681- LOGGER .debug ('Unable to open and parse %s' , state_file )
682- finally :
683- lsn_comitted = min (
684- [get_bookmark (state_comitted , s ['tap_stream_id' ], 'lsn' ) for s in logical_streams ])
685- if (lsn_currently_processing > lsn_comitted ) and (lsn_comitted > lsn_to_flush ):
686- lsn_to_flush = lsn_comitted
687- LOGGER .info ('Confirming write up to %s, flush to %s' ,
688- int_to_lsn (lsn_to_flush ),
689- int_to_lsn (lsn_to_flush ))
690- cur .send_feedback (write_lsn = lsn_to_flush , flush_lsn = lsn_to_flush , reply = True , force = True )
691-
692- poll_timestamp = datetime .datetime .utcnow ()
693-
694- # Close replication connection and cursor
695- cur .close ()
696- conn .close ()
697-
698- if lsn_last_processed :
699- if lsn_comitted > lsn_last_processed :
700- lsn_last_processed = lsn_comitted
701- LOGGER .info ('Current lsn_last_processed %s is older than lsn_comitted %s' ,
702- int_to_lsn (lsn_last_processed ),
703- int_to_lsn (lsn_comitted ))
704-
705- LOGGER .info ('Updating bookmarks for all streams to lsn = %s (%s)' ,
706- lsn_last_processed ,
707- int_to_lsn (lsn_last_processed ))
708-
709- for s in logical_streams :
710- state = singer .write_bookmark (state , s ['tap_stream_id' ], 'lsn' , lsn_last_processed )
711-
712- singer .write_message (singer .StateMessage (value = copy .deepcopy (state )))
667+ # Wait for a second unless a message arrives
668+ select ([cur ], [], [], 1 )
669+ except InterruptedError :
670+ pass
671+
672+ # Every poll_interval, update latest comitted lsn position from the state_file
673+ if datetime .datetime .utcnow () >= (poll_timestamp + datetime .timedelta (seconds = poll_interval )):
674+ if lsn_currently_processing is None :
675+ LOGGER .info ('Waiting for first wal message' )
676+ else :
677+ LOGGER .info ('Lastest wal message received was %s' , int_to_lsn (lsn_last_processed ))
678+ try :
679+ with open (state_file , mode = "r" , encoding = "utf-8" ) as fh :
680+ state_comitted = json .load (fh )
681+ except Exception :
682+ LOGGER .debug ('Unable to open and parse %s' , state_file )
683+ finally :
684+ lsn_comitted = min (
685+ [get_bookmark (state_comitted , s ['tap_stream_id' ], 'lsn' ) for s in logical_streams ])
686+ if (lsn_currently_processing > lsn_comitted ) and (lsn_comitted > lsn_to_flush ):
687+ lsn_to_flush = lsn_comitted
688+ LOGGER .info ('Confirming write up to %s, flush to %s' ,
689+ int_to_lsn (lsn_to_flush ),
690+ int_to_lsn (lsn_to_flush ))
691+ cur .send_feedback (write_lsn = lsn_to_flush , flush_lsn = lsn_to_flush , reply = True , force = True )
692+
693+ poll_timestamp = datetime .datetime .utcnow ()
694+
695+ # Close replication connection and cursor
696+ cur .close ()
697+ conn .close ()
698+ finally :
699+ if lsn_last_processed :
700+ if lsn_comitted > lsn_last_processed :
701+ lsn_last_processed = lsn_comitted
702+ LOGGER .info ('Current lsn_last_processed %s is older than lsn_comitted %s' ,
703+ int_to_lsn (lsn_last_processed ),
704+ int_to_lsn (lsn_comitted ))
705+
706+ LOGGER .info ('Updating bookmarks for all streams to lsn = %s (%s)' ,
707+ lsn_last_processed ,
708+ int_to_lsn (lsn_last_processed ))
709+
710+ for s in logical_streams :
711+ state = singer .write_bookmark (state , s ['tap_stream_id' ], 'lsn' , lsn_last_processed )
712+
713+ singer .write_message (singer .StateMessage (value = copy .deepcopy (state )))
714+
713715 return state
0 commit comments