Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Commit 7fc8fa1

Browse files
committed
Fixes #107
1 parent 280aaf0 commit 7fc8fa1

File tree

1 file changed

+14
-3
lines changed

1 file changed

+14
-3
lines changed

tap_postgres/__init__.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ def sync_traditional_stream(conn_config, stream, state, sync_method, end_lsn):
194194
return state
195195

196196

197-
def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_file):
197+
# pylint: disable=too-many-arguments
198+
def sync_logical_streams(conn_config, logical_streams, traditional_streams, state, end_lsn, state_file):
198199
"""
199200
Sync streams that use LOG_BASED method
200201
"""
@@ -212,10 +213,20 @@ def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_fil
212213
selected_streams.add("{}".format(stream['tap_stream_id']))
213214

214215
new_state = dict(currently_syncing=state['currently_syncing'], bookmarks={})
216+
traditional_stream_ids = [s['tap_stream_id'] for s in traditional_streams]
215217

216218
for stream, bookmark in state['bookmarks'].items():
217-
if bookmark == {} or bookmark['last_replication_method'] != 'LOG_BASED' or stream in selected_streams:
219+
if (
220+
bookmark == {}
221+
or bookmark['last_replication_method'] != 'LOG_BASED'
222+
or stream in selected_streams
223+
# The first time a LOG_BASED stream runs it needs to do an
224+
# initial full table sync, and so will be treated as a
225+
# traditional stream.
226+
or (stream in traditional_stream_ids and bookmark['last_replication_method'] == 'LOG_BASED')
227+
):
218228
new_state['bookmarks'][stream] = bookmark
229+
219230
state = new_state
220231

221232
state = logical_replication.sync_tables(conn_config, logical_streams, state, end_lsn, state_file)
@@ -319,7 +330,7 @@ def do_sync(conn_config, catalog, default_replication_method, state, state_file=
319330
for dbname, streams in itertools.groupby(logical_streams,
320331
lambda s: metadata.to_map(s['metadata']).get(()).get('database-name')):
321332
conn_config['dbname'] = dbname
322-
state = sync_logical_streams(conn_config, list(streams), state, end_lsn, state_file)
333+
state = sync_logical_streams(conn_config, list(streams), traditional_streams, state, end_lsn, state_file)
323334
return state
324335

325336

0 commit comments

Comments
 (0)