From 45a9a9d01fad8d9446e10bc66274d0d2bcb109d3 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 11 Jan 2022 15:10:03 +0000 Subject: [PATCH] Perform logical replication after initial sync --- tap_postgres/__init__.py | 4 ++++ tests/test_full_table_interruption.py | 14 ++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 05bd565d..2039f9f1 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # finishing previously interrupted full-table (first stage of logical replication) lookup[stream['tap_stream_id']] = 'logical_initial_interrupted' traditional_steams.append(stream) + # do any required logical replication after inital sync is complete + logical_streams.append(stream) # inconsistent state elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \ @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # initial full-table phase of logical replication lookup[stream['tap_stream_id']] = 'logical_initial' traditional_steams.append(stream) + # do any required logical replication after inital sync is complete + logical_streams.append(stream) else: # no xmin but we have an lsn # initial stage of logical replication(full-table) has been completed. moving onto pure logical replication diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py index 34ed5371..60c8e409 100644 --- a/tests/test_full_table_interruption.py +++ b/tests/test_full_table_interruption.py @@ -1,4 +1,5 @@ import unittest +from unittest import mock import tap_postgres import tap_postgres.sync_strategies.full_table as full_table import tap_postgres.sync_strategies.common as pg_common @@ -62,7 +63,8 @@ def setUp(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self): + @mock.patch('tap_postgres.sync_logical_streams') + def test_catalog(self, mock_sync_logical_streams): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok @@ -90,7 +92,7 @@ def test_catalog(self): insert_record(cur, 'COW', cow_rec) conn.close() - + blew_up_on_cow = False state = {} #the initial phase of cows logical replication will be a full table. @@ -100,6 +102,8 @@ def test_catalog(self): except Exception: blew_up_on_cow = True + mock_sync_logical_streams.assert_not_called() + self.assertTrue(blew_up_on_cow) self.assertEqual(7, len(CAUGHT_MESSAGES)) @@ -151,6 +155,8 @@ def test_catalog(self): CAUGHT_MESSAGES.clear() tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, old_state) + mock_sync_logical_streams.assert_called_once() + self.assertEqual(8, len(CAUGHT_MESSAGES)) self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') @@ -238,7 +244,7 @@ def test_catalog(self): conn = get_test_connection() conn.autocommit = True - + with conn.cursor() as cur: cow_rec = {'name': 'betty', 'colour': 'blue'} insert_record(cur, 'COW', {'name': 'betty', 'colour': 'blue'}) @@ -256,7 +262,7 @@ def test_catalog(self): state = {} blew_up_on_cow = False - + #this will sync the CHICKEN but then blow up on the COW try: tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, state)