11import unittest
22import unittest .mock
33import tap_postgres
4+ from tap_postgres .sync_strategies import logical_replication
45import tap_postgres .sync_strategies .full_table as full_table
56import tap_postgres .sync_strategies .common as pg_common
67import singer
@@ -46,10 +47,31 @@ def do_not_dump_catalog(catalog):
4647tap_postgres .dump_catalog = do_not_dump_catalog
4748full_table .UPDATE_BOOKMARK_PERIOD = 1
4849
49- @unittest .mock .patch ('tap_postgres.sync_logical_streams' )
50+
51+ @unittest .mock .patch ('tap_postgres.sync_logical_streams' , wraps = tap_postgres .sync_logical_streams )
5052class LogicalInterruption (unittest .TestCase ):
5153 maxDiff = None
5254
55+ @classmethod
56+ def setUpClass (cls ):
57+ conn_config = get_test_connection_config ()
58+ slot_name = logical_replication .generate_replication_slot_name (
59+ dbname = conn_config ['dbname' ], tap_id = conn_config ['tap_id' ]
60+ )
61+ with get_test_connection () as conn :
62+ with conn .cursor () as cur :
63+ cur .execute (f"SELECT * FROM pg_create_logical_replication_slot('{ slot_name } ', 'wal2json')" )
64+
65+ @classmethod
66+ def tearDownClass (cls ):
67+ conn_config = get_test_connection_config ()
68+ slot_name = logical_replication .generate_replication_slot_name (
69+ dbname = conn_config ['dbname' ], tap_id = conn_config ['tap_id' ]
70+ )
71+ with get_test_connection () as conn :
72+ with conn .cursor () as cur :
73+ cur .execute (f"SELECT * FROM pg_drop_replication_slot('{ slot_name } ')" )
74+
5375 def setUp (self ):
5476 table_spec_1 = {"columns" : [{"name" : "id" , "type" : "serial" , "primary_key" : True },
5577 {"name" : 'name' , "type" : "character varying" },
@@ -70,6 +92,7 @@ def test_catalog(self, mock_sync_logical_streams):
7092
7193 conn_config = get_test_connection_config ()
7294 streams = tap_postgres .do_discovery (conn_config )
95+
7396 cow_stream = [s for s in streams if s ['table_name' ] == 'COW' ][0 ]
7497 self .assertIsNotNone (cow_stream )
7598 cow_stream = select_all_of_stream (cow_stream )
@@ -157,7 +180,7 @@ def test_catalog(self, mock_sync_logical_streams):
157180
158181 mock_sync_logical_streams .assert_called_once ()
159182
160- self .assertEqual (8 , len (CAUGHT_MESSAGES ))
183+ self .assertEqual (10 , len (CAUGHT_MESSAGES ))
161184
162185 self .assertEqual (CAUGHT_MESSAGES [0 ]['type' ], 'SCHEMA' )
163186
@@ -206,8 +229,16 @@ def test_catalog(self, mock_sync_logical_streams):
206229 self .assertEqual (CAUGHT_MESSAGES [7 ].value ['bookmarks' ]['public-COW' ].get ('lsn' ), end_lsn )
207230 self .assertEqual (CAUGHT_MESSAGES [7 ].value ['bookmarks' ]['public-COW' ].get ('version' ), new_version )
208231
232+ assert CAUGHT_MESSAGES [8 ]['type' ] == 'SCHEMA'
233+
234+ assert isinstance (CAUGHT_MESSAGES [9 ], singer .messages .StateMessage )
235+ assert CAUGHT_MESSAGES [9 ].value ['bookmarks' ]['public-COW' ].get ('xmin' ) is None
236+ assert CAUGHT_MESSAGES [9 ].value ['bookmarks' ]['public-COW' ].get ('lsn' ) == end_lsn
237+ assert CAUGHT_MESSAGES [9 ].value ['bookmarks' ]['public-COW' ].get ('version' ) == new_version
238+
209239class FullTableInterruption (unittest .TestCase ):
210240 maxDiff = None
241+
211242 def setUp (self ):
212243 table_spec_1 = {"columns" : [{"name" : "id" , "type" : "serial" , "primary_key" : True },
213244 {"name" : 'name' , "type" : "character varying" },
0 commit comments