Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions tap_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# finishing previously interrupted full-table (first stage of logical replication)
lookup[stream['tap_stream_id']] = 'logical_initial_interrupted'
traditional_steams.append(stream)
# do any required logical replication after inital sync is complete
logical_streams.append(stream)

# inconsistent state
elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \
Expand All @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# initial full-table phase of logical replication
lookup[stream['tap_stream_id']] = 'logical_initial'
traditional_steams.append(stream)
# do any required logical replication after inital sync is complete
logical_streams.append(stream)

else: # no xmin but we have an lsn
# initial stage of logical replication(full-table) has been completed. moving onto pure logical replication
Expand Down
14 changes: 10 additions & 4 deletions tests/test_full_table_interruption.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
from unittest import mock
import tap_postgres
import tap_postgres.sync_strategies.full_table as full_table
import tap_postgres.sync_strategies.common as pg_common
Expand Down Expand Up @@ -62,7 +63,8 @@ def setUp(self):
global CAUGHT_MESSAGES
CAUGHT_MESSAGES.clear()

def test_catalog(self):
@mock.patch('tap_postgres.sync_logical_streams')
def test_catalog(self, mock_sync_logical_streams):
singer.write_message = singer_write_message_no_cow
pg_common.write_schema_message = singer_write_message_ok

Expand Down Expand Up @@ -90,7 +92,7 @@ def test_catalog(self):
insert_record(cur, 'COW', cow_rec)

conn.close()

blew_up_on_cow = False
state = {}
#the initial phase of cows logical replication will be a full table.
Expand All @@ -100,6 +102,8 @@ def test_catalog(self):
except Exception:
blew_up_on_cow = True

mock_sync_logical_streams.assert_not_called()

self.assertTrue(blew_up_on_cow)

self.assertEqual(7, len(CAUGHT_MESSAGES))
Expand Down Expand Up @@ -151,6 +155,8 @@ def test_catalog(self):
CAUGHT_MESSAGES.clear()
tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, old_state)

mock_sync_logical_streams.assert_called_once()

self.assertEqual(8, len(CAUGHT_MESSAGES))

self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA')
Expand Down Expand Up @@ -238,7 +244,7 @@ def test_catalog(self):

conn = get_test_connection()
conn.autocommit = True

with conn.cursor() as cur:
cow_rec = {'name': 'betty', 'colour': 'blue'}
insert_record(cur, 'COW', {'name': 'betty', 'colour': 'blue'})
Expand All @@ -256,7 +262,7 @@ def test_catalog(self):

state = {}
blew_up_on_cow = False

#this will sync the CHICKEN but then blow up on the COW
try:
tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, state)
Expand Down