Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Commit 45a9a9d

Browse files
committed
Perform logical replication after initial sync
1 parent 6dc8394 commit 45a9a9d

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

tap_postgres/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
131131
# finishing previously interrupted full-table (first stage of logical replication)
132132
lookup[stream['tap_stream_id']] = 'logical_initial_interrupted'
133133
traditional_steams.append(stream)
134+
# do any required logical replication after inital sync is complete
135+
logical_streams.append(stream)
134136

135137
# inconsistent state
136138
elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \
@@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
142144
# initial full-table phase of logical replication
143145
lookup[stream['tap_stream_id']] = 'logical_initial'
144146
traditional_steams.append(stream)
147+
# do any required logical replication after inital sync is complete
148+
logical_streams.append(stream)
145149

146150
else: # no xmin but we have an lsn
147151
# initial stage of logical replication(full-table) has been completed. moving onto pure logical replication

tests/test_full_table_interruption.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import unittest
2+
from unittest import mock
23
import tap_postgres
34
import tap_postgres.sync_strategies.full_table as full_table
45
import tap_postgres.sync_strategies.common as pg_common
@@ -62,7 +63,8 @@ def setUp(self):
6263
global CAUGHT_MESSAGES
6364
CAUGHT_MESSAGES.clear()
6465

65-
def test_catalog(self):
66+
@mock.patch('tap_postgres.sync_logical_streams')
67+
def test_catalog(self, mock_sync_logical_streams):
6668
singer.write_message = singer_write_message_no_cow
6769
pg_common.write_schema_message = singer_write_message_ok
6870

@@ -90,7 +92,7 @@ def test_catalog(self):
9092
insert_record(cur, 'COW', cow_rec)
9193

9294
conn.close()
93-
95+
9496
blew_up_on_cow = False
9597
state = {}
9698
#the initial phase of cows logical replication will be a full table.
@@ -100,6 +102,8 @@ def test_catalog(self):
100102
except Exception:
101103
blew_up_on_cow = True
102104

105+
mock_sync_logical_streams.assert_not_called()
106+
103107
self.assertTrue(blew_up_on_cow)
104108

105109
self.assertEqual(7, len(CAUGHT_MESSAGES))
@@ -151,6 +155,8 @@ def test_catalog(self):
151155
CAUGHT_MESSAGES.clear()
152156
tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, old_state)
153157

158+
mock_sync_logical_streams.assert_called_once()
159+
154160
self.assertEqual(8, len(CAUGHT_MESSAGES))
155161

156162
self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA')
@@ -238,7 +244,7 @@ def test_catalog(self):
238244

239245
conn = get_test_connection()
240246
conn.autocommit = True
241-
247+
242248
with conn.cursor() as cur:
243249
cow_rec = {'name': 'betty', 'colour': 'blue'}
244250
insert_record(cur, 'COW', {'name': 'betty', 'colour': 'blue'})
@@ -256,7 +262,7 @@ def test_catalog(self):
256262

257263
state = {}
258264
blew_up_on_cow = False
259-
265+
260266
#this will sync the CHICKEN but then blow up on the COW
261267
try:
262268
tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, state)

0 commit comments

Comments
 (0)