Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions tap_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# finishing previously interrupted full-table (first stage of logical replication)
lookup[stream['tap_stream_id']] = 'logical_initial_interrupted'
traditional_steams.append(stream)
# do any required logical replication after inital sync is complete
logical_streams.append(stream)

# inconsistent state
elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \
Expand All @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# initial full-table phase of logical replication
lookup[stream['tap_stream_id']] = 'logical_initial'
traditional_steams.append(stream)
# do any required logical replication after inital sync is complete
logical_streams.append(stream)

else: # no xmin but we have an lsn
# initial stage of logical replication(full-table) has been completed. moving onto pure logical replication
Expand Down
5 changes: 3 additions & 2 deletions tap_postgres/sync_strategies/logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ def streams_to_wal2json_tables(streams):
The output is compatible with the 'filter-tables' and 'add-tables' option of wal2json plugin.

Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash.
Schema and table are case-sensitive. Table "public"."Foo bar" should be specified as "public.Foo\ bar".
Schema and table are case-sensitive. Table "public"."Foo bar" should be specified as "public.Foo\\ bar".
Documentation in wal2json plugin: https://github.com/eulerto/wal2json/blob/master/README.md#parameters

:param streams: List of singer stream dictionaries
Expand Down Expand Up @@ -702,7 +702,8 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
LOGGER.debug('Unable to open and parse %s', state_file)
finally:
lsn_comitted = min(
[get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams])
get_bookmark(state_comitted, s['tap_stream_id'], 'lsn', start_lsn) for s in logical_streams
)
if (lsn_currently_processing > lsn_comitted) and (lsn_comitted > lsn_to_flush):
lsn_to_flush = lsn_comitted
LOGGER.info('Confirming write up to %s, flush to %s',
Expand Down