Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Commit f8b4440

Browse files
committed
Fixes #107
1 parent 280aaf0 commit f8b4440

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

tap_postgres/__init__.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ def sync_traditional_stream(conn_config, stream, state, sync_method, end_lsn):
194194
return state
195195

196196

197-
def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_file):
197+
def sync_logical_streams(conn_config, logical_streams, traditional_streams, state, end_lsn, state_file):
198198
"""
199199
Sync streams that use LOG_BASED method
200200
"""
@@ -212,10 +212,20 @@ def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_fil
212212
selected_streams.add("{}".format(stream['tap_stream_id']))
213213

214214
new_state = dict(currently_syncing=state['currently_syncing'], bookmarks={})
215+
traditional_stream_ids = [s['tap_stream_id'] for s in traditional_streams]
215216

216217
for stream, bookmark in state['bookmarks'].items():
217-
if bookmark == {} or bookmark['last_replication_method'] != 'LOG_BASED' or stream in selected_streams:
218+
if (
219+
bookmark == {}
220+
or bookmark['last_replication_method'] != 'LOG_BASED'
221+
or stream in selected_streams
222+
# The first time a LOG_BASED stream runs it needs to do an
223+
# initial full table sync, and so will be treated as a
224+
# traditional stream.
225+
or (stream in traditional_stream_ids and bookmark['last_replication_method'] == 'LOG_BASED')
226+
):
218227
new_state['bookmarks'][stream] = bookmark
228+
219229
state = new_state
220230

221231
state = logical_replication.sync_tables(conn_config, logical_streams, state, end_lsn, state_file)
@@ -319,7 +329,7 @@ def do_sync(conn_config, catalog, default_replication_method, state, state_file=
319329
for dbname, streams in itertools.groupby(logical_streams,
320330
lambda s: metadata.to_map(s['metadata']).get(()).get('database-name')):
321331
conn_config['dbname'] = dbname
322-
state = sync_logical_streams(conn_config, list(streams), state, end_lsn, state_file)
332+
state = sync_logical_streams(conn_config, list(streams), traditional_streams, state, end_lsn, state_file)
323333
return state
324334

325335

0 commit comments

Comments
 (0)