 import pytz
 import decimal
 import psycopg2
-from psycopg2 import sql
 import copy
 import json
 import singer
 import singer.metadata as metadata
+
+from psycopg2 import sql
 from singer import utils, get_bookmark
 from dateutil.parser import parse
 from functools import reduce
 
 import tap_postgres.db as post_db
 import tap_postgres.sync_strategies.common as sync_common
+from tap_postgres.stream_utils import refresh_streams_schema
 
 LOGGER = singer.get_logger('tap_postgres')
 
 class ReplicationSlotNotFoundError(Exception):
     """Custom exception when replication slot not found"""
 
+class UnsupportedPayloadKindError(Exception):
+    """Custom exception when waljson payload is not insert, update nor delete"""
+
 
 # pylint: disable=invalid-name,missing-function-docstring,too-many-branches,too-many-statements,too-many-arguments
 def get_pg_version(conn_info):
@@ -98,9 +103,10 @@ def fetch_current_lsn(conn_config):
             return lsn_to_int(current_lsn)
 
 
-def add_automatic_properties(stream, conn_config):
+def add_automatic_properties(stream, debug_lsn: bool = False):
     stream['schema']['properties']['_sdc_deleted_at'] = {'type': ['null', 'string'], 'format': 'date-time'}
-    if conn_config.get('debug_lsn'):
+
+    if debug_lsn:
         LOGGER.debug('debug_lsn is ON')
         stream['schema']['properties']['_sdc_lsn'] = {'type': ['null', 'string']}
     else:
@@ -295,7 +301,7 @@ def row_to_singer_message(stream, row, version, columns, time_extracted, md_map,
 
 
 # pylint: disable=unused-argument,too-many-locals
-def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn):
+def consume_message(streams, state, msg, time_extracted, conn_info):
     # Strip leading comma generated by write-in-chunks and parse valid JSON
     try:
         payload = json.loads(msg.payload.lstrip(','))
@@ -304,56 +310,58 @@ def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn):
 
     lsn = msg.data_start
 
-    streams_lookup = {}
-    for s in streams:
-        streams_lookup[s['tap_stream_id']] = s
+    streams_lookup = {s['tap_stream_id']: s for s in streams}
 
     tap_stream_id = post_db.compute_tap_stream_id(payload['schema'], payload['table'])
     if streams_lookup.get(tap_stream_id) is None:
         return state
 
     target_stream = streams_lookup[tap_stream_id]
+
+    if payload['kind'] not in {'insert', 'update', 'delete'}:
+        raise UnsupportedPayloadKindError("unrecognized replication operation: {}".format(payload['kind']))
+
+    # Get the additional fields in payload that are not in schema properties:
+    # only inserts and updates have the list of columns that can be used to detect any different in columns
+    diff = set()
+    if payload['kind'] in {'insert', 'update'}:
+        diff = set(payload['columnnames']).difference(target_stream['schema']['properties'].keys())
+
+    # if there is new columns in the payload that are not in the schema properties then refresh the stream schema
+    if diff:
+        LOGGER.info('Detected new columns "%s", refreshing schema of stream %s', diff, target_stream['stream'])
+        # encountered a column that is not in the schema
+        # refresh the stream schema and metadata by running discovery
+        refresh_streams_schema(conn_info, [target_stream])
+
+        # add the automatic properties back to the stream
+        add_automatic_properties(target_stream, conn_info.get('debug_lsn', False))
+
+        # publish new schema
+        sync_common.send_schema_message(target_stream, ['lsn'])
+
     stream_version = get_stream_version(target_stream['tap_stream_id'], state)
     stream_md_map = metadata.to_map(target_stream['metadata'])
 
-    desired_columns = [c for c in target_stream['schema']['properties'].keys() if sync_common.should_sync_column(
-        stream_md_map, c)]
+    desired_columns = {c for c in target_stream['schema']['properties'].keys() if sync_common.should_sync_column(
+        stream_md_map, c)}
 
-    if payload['kind'] == 'insert':
+    if payload['kind'] in {'insert', 'update'}:
         col_names = []
         col_vals = []
-        for idx, col in enumerate(payload['columnnames']):
-            if col in set(desired_columns):
-                col_names.append(col)
-                col_vals.append(payload['columnvalues'][idx])
 
-        col_names = col_names + ['_sdc_deleted_at']
-        col_vals = col_vals + [None]
-        if conn_info.get('debug_lsn'):
-            col_names = col_names + ['_sdc_lsn']
-            col_vals = col_vals + [str(lsn)]
-        record_message = row_to_singer_message(target_stream,
-                                               col_vals,
-                                               stream_version,
-                                               col_names,
-                                               time_extracted,
-                                               stream_md_map,
-                                               conn_info)
-
-    elif payload['kind'] == 'update':
-        col_names = []
-        col_vals = []
         for idx, col in enumerate(payload['columnnames']):
-            if col in set(desired_columns):
+            if col in desired_columns:
                 col_names.append(col)
                 col_vals.append(payload['columnvalues'][idx])
 
-        col_names = col_names + ['_sdc_deleted_at']
-        col_vals = col_vals + [None]
+        col_names.append('_sdc_deleted_at')
+        col_vals.append(None)
 
         if conn_info.get('debug_lsn'):
-            col_vals = col_vals + [str(lsn)]
-            col_names = col_names + ['_sdc_lsn']
+            col_names.append('_sdc_lsn')
+            col_vals.append(str(lsn))
+
         record_message = row_to_singer_message(target_stream,
                                                col_vals,
                                                stream_version,
@@ -366,15 +374,17 @@ def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn):
         col_names = []
         col_vals = []
         for idx, col in enumerate(payload['oldkeys']['keynames']):
-            if col in set(desired_columns):
+            if col in desired_columns:
                 col_names.append(col)
                 col_vals.append(payload['oldkeys']['keyvalues'][idx])
 
-        col_names = col_names + ['_sdc_deleted_at']
-        col_vals = col_vals + [singer.utils.strftime(time_extracted)]
+        col_names.append('_sdc_deleted_at')
+        col_vals.append(singer.utils.strftime(time_extracted))
+
         if conn_info.get('debug_lsn'):
-            col_vals = col_vals + [str(lsn)]
-            col_names = col_names + ['_sdc_lsn']
+            col_names.append('_sdc_lsn')
+            col_vals.append(str(lsn))
+
         record_message = row_to_singer_message(target_stream,
                                                col_vals,
                                                stream_version,
@@ -383,9 +393,6 @@ def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn):
                                                stream_md_map,
                                                conn_info)
 
-    else:
-        raise Exception("unrecognized replication operation: {}".format(payload['kind']))
-
     singer.write_message(record_message)
     state = singer.write_bookmark(state, target_stream['tap_stream_id'], 'lsn', lsn)
 
@@ -544,7 +551,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
                 LOGGER.info('Breaking - reached max_run_seconds of %i', max_run_seconds)
                 break
 
-            state = consume_message(logical_streams, state, msg, time_extracted, conn_info, end_lsn)
+            state = consume_message(logical_streams, state, msg, time_extracted, conn_info)
 
             # When using wal2json with write-in-chunks, multiple messages can have the same lsn
            # This is to ensure we only flush to lsn that has completed entirely
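
For reference, the wal2json change records that consume_message parses look roughly like the sketch below. This is not part of the commit: the dictionary keys mirror what the code above reads ('kind', 'schema', 'table', 'columnnames', 'columnvalues' for inserts and updates, 'oldkeys' for deletes), while the table name, columns and values are illustrative only.

# Illustrative wal2json payloads, as consume_message receives them after
# json.loads(msg.payload.lstrip(',')). Field names are taken from the code
# above; the concrete table, columns and values are made up for the example.
example_insert = {
    'kind': 'insert',
    'schema': 'public',
    'table': 'orders',          # hypothetical table
    'columnnames': ['id', 'status'],
    'columnvalues': [1, 'new'],
}

example_delete = {
    'kind': 'delete',
    'schema': 'public',
    'table': 'orders',
    'oldkeys': {'keynames': ['id'], 'keyvalues': [1]},
}

# The refresh branch added in this commit fires when the payload carries a
# column that is missing from the stream's schema properties:
schema_properties = {'id': {}, 'status': {}}   # simplified stream schema
new_columns = set(example_insert['columnnames']).difference(schema_properties)
assert not new_columns                         # nothing new, so no schema refresh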