diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index cc254c9a..09b2b5ce 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -35,9 +35,11 @@ jobs: - name: Tests env: + TAP_POSTGRES_HOST: localhost TAP_POSTGRES_PORT: 5432 TAP_POSTGRES_USER: test_user TAP_POSTGRES_PASSWORD: my-secret-passwd - TAP_POSTGRES_HOST: localhost + TAP_POSTGRES_SECONDARY_HOST: localhost + TAP_POSTGRES_SECONDARY_PORT: 5433 LOGGING_CONF_FILE: ./sample_logging.conf run: make test diff --git a/Makefile b/Makefile index 24f00e25..bac92cd1 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ pylint: pylint --rcfile .pylintrc --disable duplicate-code tap_postgres/ start_db: - docker-compose up -d --build db + docker-compose up -d test: . ./venv/bin/activate ;\ diff --git a/README.md b/README.md index a8d5d945..9a787307 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,9 @@ Full list of options in `config.json`: | tap_id | String | No | ID of the pipeline/tap (Default: None) | | itersize | Integer | No | Size of PG cursor iterator when doing INCREMENTAL or FULL_TABLE (Default: 20000) | | default_replication_method | String | No | Default replication method to use when no one is provided in the catalog (Values: `LOG_BASED`, `INCREMENTAL` or `FULL_TABLE`) (Default: None) | +| use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) | +| secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) | +| secondary_port | Integer | No | PostgreSQL Replica port (required if `use_secondary` is `True`) | ### Run the tap in Discovery Mode @@ -142,7 +145,7 @@ to the tap for the next sync. ``` Restart your PostgreSQL service to ensure the changes take effect. - + **Note**: For `max_replication_slots` and `max_wal_senders`, we’re defaulting to a value of 5. This should be sufficient unless you have a large number of read replicas connected to the master instance. @@ -151,11 +154,11 @@ to the tap for the next sync. In PostgreSQL, a logical replication slot represents a stream of database changes that can then be replayed to a client in the order they were made on the original server. Each slot streams a sequence of changes from a single database. - + Login to the master instance as a superuser and using the `wal2json` plugin, create a logical replication slot: ``` SELECT * - FROM pg_create_logical_replication_slot('pipelinewise_', 'wal2json'); + FROM pg_create_logical_replication_slot('pipelinewise_', 'wal2json'); ``` **Note**: Replication slots are specific to a given database in a cluster. If you want to connect multiple @@ -172,6 +175,8 @@ to the tap for the next sync. 
``` export TAP_POSTGRES_HOST= export TAP_POSTGRES_PORT= + export TAP_POSTGRES_SECONDARY_HOST= + export TAP_POSTGRES_SECONDARY_PORT= export TAP_POSTGRES_USER= export TAP_POSTGRES_PASSWORD= ``` diff --git a/docker-compose.yml b/docker-compose.yml index b15b2649..c416b85b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,13 +1,31 @@ version: "3.3" services: - db: - image: "debezium/postgres:12-alpine" - container_name: "" + db_primary: + image: "docker.io/bitnami/postgresql:12" + container_name: "primary" ports: - "5432:5432" environment: + - POSTGRESQL_REPLICATION_MODE=master + - POSTGRESQL_REPLICATION_USER=repl_user + - POSTGRESQL_REPLICATION_PASSWORD=repl_password - POSTGRES_USER=test_user - POSTGRES_PASSWORD=my-secret-passwd - - POSTGRES_DB=tap_postgres_test - command: -c "wal_level=logical" -c "max_replication_slots=5" -c "max_wal_senders=5" + - POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd + - POSTGRESQL_DATABASE=tap_postgres_test + - ALLOW_EMPTY_PASSWORD=yes + db_replica: + image: "docker.io/bitnami/postgresql:12" + container_name: replica + ports: + - "5433:5432" + depends_on: + - db_primary + environment: + - POSTGRESQL_REPLICATION_MODE=slave + - POSTGRESQL_REPLICATION_USER=repl_user + - POSTGRESQL_REPLICATION_PASSWORD=repl_password + - POSTGRESQL_MASTER_HOST=db_primary + - POSTGRESQL_MASTER_PORT_NUMBER=5432 + - ALLOW_EMPTY_PASSWORD=yes diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 05bd565d..2cafea11 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # finishing previously interrupted full-table (first stage of logical replication) lookup[stream['tap_stream_id']] = 'logical_initial_interrupted' traditional_steams.append(stream) + # do any required logical replication after inital sync is complete + logical_streams.append(stream) # inconsistent state elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \ @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # initial full-table phase of logical replication lookup[stream['tap_stream_id']] = 'logical_initial' traditional_steams.append(stream) + # do any required logical replication after inital sync is complete + logical_streams.append(stream) else: # no xmin but we have an lsn # initial stage of logical replication(full-table) has been completed. moving onto pure logical replication @@ -405,9 +409,22 @@ def main_impl(): 'debug_lsn': args.config.get('debug_lsn') == 'true', 'max_run_seconds': args.config.get('max_run_seconds', 43200), 'break_at_end_lsn': args.config.get('break_at_end_lsn', True), - 'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)) + 'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)), + 'use_secondary': args.config.get('use_secondary', False), } + if conn_config['use_secondary']: + try: + conn_config.update({ + # Host and Port are mandatory. + 'secondary_host': args.config['secondary_host'], + 'secondary_port': args.config['secondary_port'], + }) + except KeyError as exc: + raise ValueError( + "When 'use_secondary' enabled 'secondary_host' and 'secondary_port' must be defined." 
+ ) from exc + if args.config.get('ssl') == 'true': conn_config['sslmode'] = 'require' diff --git a/tap_postgres/db.py b/tap_postgres/db.py index 41ab82de..1b39c974 100644 --- a/tap_postgres/db.py +++ b/tap_postgres/db.py @@ -36,7 +36,7 @@ def fully_qualified_table_name(schema, table): return f'"{canonicalize_identifier(schema)}"."{canonicalize_identifier(table)}"' -def open_connection(conn_config, logical_replication=False): +def open_connection(conn_config, logical_replication=False, prioritize_primary=False): cfg = { 'application_name': 'pipelinewise', 'host': conn_config['host'], @@ -47,6 +47,14 @@ def open_connection(conn_config, logical_replication=False): 'connect_timeout': 30 } + if conn_config.get('use_secondary', False) and not prioritize_primary and not logical_replication: + # Try to use replica but fallback to primary if keys are missing. This is the same behavior as + # https://github.com/transferwise/pipelinewise/blob/master/pipelinewise/fastsync/commons/tap_postgres.py#L129 + cfg.update({ + 'host': conn_config.get("secondary_host", conn_config['host']), + 'port': conn_config.get("secondary_port", conn_config['port']), + }) + if conn_config.get('sslmode'): cfg['sslmode'] = conn_config['sslmode'] diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index 904fd801..eb690956 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -34,7 +34,7 @@ class UnsupportedPayloadKindError(Exception): # pylint: disable=invalid-name,missing-function-docstring,too-many-branches,too-many-statements,too-many-arguments def get_pg_version(conn_info): - with post_db.open_connection(conn_info, False) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'") version = cur.fetchone()[0] @@ -93,7 +93,7 @@ def fetch_current_lsn(conn_config): if version < 90400: raise Exception('Logical replication not supported before PostgreSQL 9.4') - with post_db.open_connection(conn_config, False) as conn: + with post_db.open_connection(conn_config, False, True) as conn: with conn.cursor() as cur: # Use version specific lsn command if version >= 100000: @@ -138,7 +138,7 @@ def create_hstore_elem_query(elem): def create_hstore_elem(conn_info, elem): - with post_db.open_connection(conn_info) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: query = create_hstore_elem_query(elem) cur.execute(query) @@ -151,7 +151,7 @@ def create_array_elem(elem, sql_datatype, conn_info): if elem is None: return None - with post_db.open_connection(conn_info) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: if sql_datatype == 'bit[]': cast_datatype = 'boolean[]' @@ -517,7 +517,7 @@ def locate_replication_slot_by_cur(cursor, dbname, tap_id=None): def locate_replication_slot(conn_info): - with post_db.open_connection(conn_info, False) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: return locate_replication_slot_by_cur(cur, conn_info['dbname'], conn_info['tap_id']) @@ -576,7 +576,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): version = get_pg_version(conn_info) # Create replication connection and cursor - conn = post_db.open_connection(conn_info, True) + conn = 
post_db.open_connection(conn_info, True, True) cur = conn.cursor() # Set session wal_sender_timeout for PG12 and above diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 1dd48aaa..bb9eb35a 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -331,7 +331,7 @@ def setUp(self): table_spec = {"columns": [{"name" : 'our_pk', "type" : "hstore", "primary_key" : True }, {"name" : 'our_hstore', "type" : "hstore" }], "name" : TestHStoreTable.table_name} - with get_test_connection() as conn: + with get_test_connection(superuser=True) as conn: cur = conn.cursor() cur.execute(""" SELECT installed_version FROM pg_available_extensions WHERE name = 'hstore' """) if cur.fetchone()[0] is None: @@ -536,7 +536,7 @@ class TestColumnGrants(unittest.TestCase): table_name = 'CHICKEN TIMES' user = 'tmp_user_for_grant_tests' password = 'password' - + def setUp(self): table_spec = {"columns": [{"name" : "id", "type" : "integer", "serial" : True}, {"name" : 'size integer', "type" : "integer", "quoted" : True}, @@ -545,7 +545,7 @@ def setUp(self): "name" : TestColumnGrants.table_name} ensure_test_table(table_spec) - with get_test_connection() as conn: + with get_test_connection(superuser=True) as conn: cur = conn.cursor() sql = """ DROP USER IF EXISTS {} """.format(self.user, self.password) @@ -560,8 +560,8 @@ def setUp(self): LOGGER.info("running sql: {}".format(sql)) cur.execute(sql) - - + + def test_catalog(self): conn_config = get_test_connection_config() @@ -587,7 +587,7 @@ def test_catalog(self): ('properties', 'id'): {'inclusion': 'available', 'selected-by-default': True, 'sql-datatype': 'integer'}}) - + self.assertEqual({'definitions' : BASE_RECURSIVE_SCHEMAS, 'type': 'object', 'properties': {'id': {'type': ['null', 'integer'], diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py index 34ed5371..c58d286b 100644 --- a/tests/test_full_table_interruption.py +++ b/tests/test_full_table_interruption.py @@ -1,4 +1,6 @@ -import unittest +import psycopg2 +import unittest.mock +import pytest import tap_postgres import tap_postgres.sync_strategies.full_table as full_table import tap_postgres.sync_strategies.common as pg_common @@ -45,10 +47,12 @@ def do_not_dump_catalog(catalog): tap_postgres.dump_catalog = do_not_dump_catalog full_table.UPDATE_BOOKMARK_PERIOD = 1 -class LogicalInterruption(unittest.TestCase): +@pytest.mark.parametrize('use_secondary', [False, True]) +@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) +class TestLogicalInterruption: maxDiff = None - def setUp(self): + def setup_method(self): table_spec_1 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True}, {"name" : 'name', "type": "character varying"}, {"name" : 'colour', "type": "character varying"}, @@ -62,14 +66,29 @@ def setUp(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self): + @unittest.mock.patch('tap_postgres.sync_logical_streams') + def test_catalog(self, mock_connect, use_secondary, mock_sync_logical_streams): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok - conn_config = get_test_connection_config() + conn_config = get_test_connection_config(use_secondary=use_secondary) streams = tap_postgres.do_discovery(conn_config) + + # Assert that we connected to the correct database + expected_connection = { + 'application_name': unittest.mock.ANY, + 'dbname': unittest.mock.ANY, + 'user': unittest.mock.ANY, + 'password': unittest.mock.ANY, + 
'connect_timeout':unittest.mock.ANY, + 'host': conn_config['secondary_host'] if use_secondary else conn_config['host'], + 'port': conn_config['secondary_port'] if use_secondary else conn_config['port'], + } + mock_connect.assert_called_once_with(**expected_connection) + mock_connect.reset_mock() + cow_stream = [s for s in streams if s['table_name'] == 'COW'][0] - self.assertIsNotNone(cow_stream) + assert cow_stream is not None cow_stream = select_all_of_stream(cow_stream) cow_stream = set_replication_method_for_stream(cow_stream, 'LOG_BASED') @@ -90,56 +109,61 @@ def test_catalog(self): insert_record(cur, 'COW', cow_rec) conn.close() - + blew_up_on_cow = False state = {} #the initial phase of cows logical replication will be a full table. #it will sync the first record and then blow up on the 2nd record try: - tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state) except Exception: blew_up_on_cow = True - self.assertTrue(blew_up_on_cow) + assert blew_up_on_cow is True + + mock_sync_logical_streams.assert_not_called() - self.assertEqual(7, len(CAUGHT_MESSAGES)) + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin')) - self.assertIsNotNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn')) + assert 7 == len(CAUGHT_MESSAGES) + + assert CAUGHT_MESSAGES[0]['type'] =='SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin') is None + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') is not None end_lsn = CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) + assert isinstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) new_version = CAUGHT_MESSAGES[2].version - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[3].record, { + assert isinstance(CAUGHT_MESSAGES[3], singer.RecordMessage) + assert CAUGHT_MESSAGES[3].record == { 'colour': 'blue', 'id': 1, 'name': 'betty', 'timestamp_ntz': '2020-09-01T10:40:59+00:00', 'timestamp_tz': '2020-08-31T22:50:59+00:00' - }) + } - self.assertEqual('public-COW', CAUGHT_MESSAGES[3].stream) + assert 'public-COW' == CAUGHT_MESSAGES[3].stream - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[4], singer.StateMessage) #xmin is set while we are processing the full table replication - self.assertIsNotNone(CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['xmin']) - self.assertEqual(CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['lsn'], end_lsn) + assert CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['xmin'] is not None + assert CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['lsn'] == end_lsn - self.assertEqual(CAUGHT_MESSAGES[5].record, { + assert CAUGHT_MESSAGES[5].record == { 'colour': 'brow', 'id': 2, 'name': 'smelly', 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' - }) + } - self.assertEqual('public-COW', CAUGHT_MESSAGES[5].stream) + assert 'public-COW' == CAUGHT_MESSAGES[5].stream - 
self.assertIsInstance(CAUGHT_MESSAGES[6], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[6], singer.StateMessage) last_xmin = CAUGHT_MESSAGES[6].value['bookmarks']['public-COW']['xmin'] old_state = CAUGHT_MESSAGES[6].value @@ -149,60 +173,67 @@ def test_catalog(self): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 CAUGHT_MESSAGES.clear() - tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, old_state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state) + + mock_sync_logical_streams.assert_called_once() - self.assertEqual(8, len(CAUGHT_MESSAGES)) + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') + assert 8 == len(CAUGHT_MESSAGES) - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin'), last_xmin) - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('version'), new_version) + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[2].record, { + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin') == last_xmin + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('version') == new_version + + assert isinstance(CAUGHT_MESSAGES[2], singer.RecordMessage) + assert CAUGHT_MESSAGES[2].record == { 'colour': 'brow', 'id': 2, 'name': 'smelly', 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' - }) + } - self.assertEqual('public-COW', CAUGHT_MESSAGES[2].stream) + assert 'public-COW' == CAUGHT_MESSAGES[2].stream - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.StateMessage) - self.assertTrue(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('xmin'),last_xmin) - self.assertEqual(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('version'), new_version) + assert isinstance(CAUGHT_MESSAGES[3], singer.StateMessage) + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('xmin'),last_xmin + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('version') == new_version - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[4].record, { + assert isinstance(CAUGHT_MESSAGES[4], singer.RecordMessage) + assert CAUGHT_MESSAGES[4].record == { 'colour': 'green', 'id': 3, 'name': 'pooper', 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' - }) - self.assertEqual('public-COW', CAUGHT_MESSAGES[4].stream) + } + assert 'public-COW' == CAUGHT_MESSAGES[4].stream - self.assertIsInstance(CAUGHT_MESSAGES[5], singer.StateMessage) - self.assertTrue(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('xmin') > last_xmin) - self.assertEqual(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('version'), new_version) + 
assert isinstance(CAUGHT_MESSAGES[5], singer.StateMessage) + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('xmin') > last_xmin + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('version') == new_version - self.assertIsInstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) - self.assertEqual(CAUGHT_MESSAGES[6].version, new_version) + assert isinstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[6].version == new_version - self.assertIsInstance(CAUGHT_MESSAGES[7], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('xmin')) - self.assertEqual(CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version'), new_version) + assert isinstance(CAUGHT_MESSAGES[7], singer.StateMessage) + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('xmin') is None + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version') == new_version -class FullTableInterruption(unittest.TestCase): +@pytest.mark.parametrize('use_secondary', [False, True]) +@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) +class TestFullTableInterruption: maxDiff = None - def setUp(self): + def setup_method(self): table_spec_1 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True}, {"name" : 'name', "type": "character varying"}, {"name" : 'colour', "type": "character varying"}], @@ -220,25 +251,39 @@ def setUp(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self): + def test_catalog(self, mock_connect, use_secondary): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok - conn_config = get_test_connection_config() + conn_config = get_test_connection_config(use_secondary=use_secondary) streams = tap_postgres.do_discovery(conn_config) + + # Assert that we connected to the correct database + expected_connection = { + 'application_name': unittest.mock.ANY, + 'dbname': unittest.mock.ANY, + 'user': unittest.mock.ANY, + 'password': unittest.mock.ANY, + 'connect_timeout':unittest.mock.ANY, + 'host': conn_config['secondary_host'] if use_secondary else conn_config['host'], + 'port': conn_config['secondary_port'] if use_secondary else conn_config['port'], + } + mock_connect.assert_called_once_with(**expected_connection) + mock_connect.reset_mock() + cow_stream = [s for s in streams if s['table_name'] == 'COW'][0] - self.assertIsNotNone(cow_stream) + assert cow_stream is not None cow_stream = select_all_of_stream(cow_stream) cow_stream = set_replication_method_for_stream(cow_stream, 'FULL_TABLE') chicken_stream = [s for s in streams if s['table_name'] == 'CHICKEN'][0] - self.assertIsNotNone(chicken_stream) + assert chicken_stream is not None chicken_stream = select_all_of_stream(chicken_stream) chicken_stream = set_replication_method_for_stream(chicken_stream, 'FULL_TABLE') conn = get_test_connection() conn.autocommit = True - + with conn.cursor() as cur: cow_rec = {'name': 'betty', 'colour': 'blue'} insert_record(cur, 'COW', {'name': 'betty', 'colour': 'blue'}) @@ -256,64 +301,65 @@ def test_catalog(self): state = {} blew_up_on_cow = False - + #this will sync the CHICKEN but then blow up on the COW try: - 
tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state) except Exception as ex: # LOGGER.exception(ex) blew_up_on_cow = True - self.assertTrue(blew_up_on_cow) - + assert blew_up_on_cow + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() - self.assertEqual(14, len(CAUGHT_MESSAGES)) + assert 14 == len(CAUGHT_MESSAGES) - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-CHICKEN'].get('xmin')) + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-CHICKEN'].get('xmin') is None - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) + assert isinstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) new_version = CAUGHT_MESSAGES[2].version - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.RecordMessage) - self.assertEqual('public-CHICKEN', CAUGHT_MESSAGES[3].stream) + assert isinstance(CAUGHT_MESSAGES[3], singer.RecordMessage) + assert 'public-CHICKEN' == CAUGHT_MESSAGES[3].stream - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[4], singer.StateMessage) #xmin is set while we are processing the full table replication - self.assertIsNotNone(CAUGHT_MESSAGES[4].value['bookmarks']['public-CHICKEN']['xmin']) + assert CAUGHT_MESSAGES[4].value['bookmarks']['public-CHICKEN']['xmin'] is not None - self.assertIsInstance(CAUGHT_MESSAGES[5], singer.ActivateVersionMessage) - self.assertEqual(CAUGHT_MESSAGES[5].version, new_version) + assert isinstance(CAUGHT_MESSAGES[5], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[5].version == new_version - self.assertIsInstance(CAUGHT_MESSAGES[6], singer.StateMessage) - self.assertEqual(None, singer.get_currently_syncing( CAUGHT_MESSAGES[6].value)) + assert isinstance(CAUGHT_MESSAGES[6], singer.StateMessage) + assert None == singer.get_currently_syncing( CAUGHT_MESSAGES[6].value) #xmin is cleared at the end of the full table replication - self.assertIsNone(CAUGHT_MESSAGES[6].value['bookmarks']['public-CHICKEN']['xmin']) + assert CAUGHT_MESSAGES[6].value['bookmarks']['public-CHICKEN']['xmin'] is None #cow messages - self.assertEqual(CAUGHT_MESSAGES[7]['type'], 'SCHEMA') + assert CAUGHT_MESSAGES[7]['type'] == 'SCHEMA' - self.assertEqual("public-COW", CAUGHT_MESSAGES[7]['stream']) - self.assertIsInstance(CAUGHT_MESSAGES[8], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[8].value['bookmarks']['public-COW'].get('xmin')) - self.assertEqual("public-COW", CAUGHT_MESSAGES[8].value['currently_syncing']) + assert "public-COW" == CAUGHT_MESSAGES[7]['stream'] + assert isinstance(CAUGHT_MESSAGES[8], singer.StateMessage) + assert CAUGHT_MESSAGES[8].value['bookmarks']['public-COW'].get('xmin') is None + assert "public-COW" == CAUGHT_MESSAGES[8].value['currently_syncing'] - self.assertIsInstance(CAUGHT_MESSAGES[9], singer.ActivateVersionMessage) + assert isinstance(CAUGHT_MESSAGES[9], singer.ActivateVersionMessage) cow_version = CAUGHT_MESSAGES[9].version - self.assertIsInstance(CAUGHT_MESSAGES[10], singer.RecordMessage) + assert isinstance(CAUGHT_MESSAGES[10], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[10].record['name'], 'betty') - self.assertEqual('public-COW', 
CAUGHT_MESSAGES[10].stream) + assert CAUGHT_MESSAGES[10].record['name'] == 'betty' + assert 'public-COW' == CAUGHT_MESSAGES[10].stream - self.assertIsInstance(CAUGHT_MESSAGES[11], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[11], singer.StateMessage) #xmin is set while we are processing the full table replication - self.assertIsNotNone(CAUGHT_MESSAGES[11].value['bookmarks']['public-COW']['xmin']) + assert CAUGHT_MESSAGES[11].value['bookmarks']['public-COW']['xmin'] is not None - self.assertEqual(CAUGHT_MESSAGES[12].record['name'], 'smelly') - self.assertEqual('public-COW', CAUGHT_MESSAGES[12].stream) + assert CAUGHT_MESSAGES[12].record['name'] == 'smelly' + assert 'public-COW' == CAUGHT_MESSAGES[12].stream old_state = CAUGHT_MESSAGES[13].value #run another do_sync @@ -322,47 +368,44 @@ def test_catalog(self): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 - tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, old_state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state) - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() + + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) # because we were interrupted, we do not switch versions - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['version'], cow_version) - self.assertIsNotNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['xmin']) - self.assertEqual("public-COW", singer.get_currently_syncing(CAUGHT_MESSAGES[1].value)) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['version'] == cow_version + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['xmin'] is not None + assert "public-COW" == singer.get_currently_syncing(CAUGHT_MESSAGES[1].value) - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[2].record['name'], 'smelly') - self.assertEqual('public-COW', CAUGHT_MESSAGES[2].stream) + assert isinstance(CAUGHT_MESSAGES[2], singer.RecordMessage) + assert CAUGHT_MESSAGES[2].record['name'] == 'smelly' + assert 'public-COW' == CAUGHT_MESSAGES[2].stream #after record: activate version, state with no xmin or currently syncing - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[3], singer.StateMessage) #we still have an xmin for COW because are not yet done with the COW table - self.assertIsNotNone(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW']['xmin']) - self.assertEqual(singer.get_currently_syncing( CAUGHT_MESSAGES[3].value), 'public-COW') + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW']['xmin'] is not None + assert singer.get_currently_syncing( CAUGHT_MESSAGES[3].value) == 'public-COW' - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[4].record['name'], 'pooper') - self.assertEqual('public-COW', CAUGHT_MESSAGES[4].stream) + assert isinstance(CAUGHT_MESSAGES[4], singer.RecordMessage) + assert CAUGHT_MESSAGES[4].record['name'] == 'pooper' + assert 'public-COW' == CAUGHT_MESSAGES[4].stream - self.assertIsInstance(CAUGHT_MESSAGES[5], singer.StateMessage) - self.assertIsNotNone(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW']['xmin']) - self.assertEqual(singer.get_currently_syncing( CAUGHT_MESSAGES[5].value), 'public-COW') + 
assert isinstance(CAUGHT_MESSAGES[5], singer.StateMessage) + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW']['xmin'] is not None + assert singer.get_currently_syncing( CAUGHT_MESSAGES[5].value) == 'public-COW' #xmin is cleared because we are finished the full table replication - self.assertIsInstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) - self.assertEqual(CAUGHT_MESSAGES[6].version, cow_version) - - self.assertIsInstance(CAUGHT_MESSAGES[7], singer.StateMessage) - self.assertIsNone(singer.get_currently_syncing( CAUGHT_MESSAGES[7].value)) - self.assertIsNone(CAUGHT_MESSAGES[7].value['bookmarks']['public-CHICKEN']['xmin']) - self.assertIsNone(singer.get_currently_syncing( CAUGHT_MESSAGES[7].value)) - + assert isinstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[6].version == cow_version -if __name__== "__main__": - test1 = LogicalInterruption() - test1.setUp() - test1.test_catalog() + assert isinstance(CAUGHT_MESSAGES[7], singer.StateMessage) + assert singer.get_currently_syncing( CAUGHT_MESSAGES[7].value) is None + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-CHICKEN']['xmin'] is None + assert singer.get_currently_syncing( CAUGHT_MESSAGES[7].value) is None diff --git a/tests/utils.py b/tests/utils.py index 881cfd3c..ee678b85 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,7 @@ LOGGER = get_logger() -def get_test_connection_config(target_db='postgres'): +def get_test_connection_config(target_db='postgres', use_secondary=False): missing_envs = [x for x in [os.getenv('TAP_POSTGRES_HOST'), os.getenv('TAP_POSTGRES_USER'), os.getenv('TAP_POSTGRES_PASSWORD'), @@ -24,13 +24,29 @@ def get_test_connection_config(target_db='postgres'): 'port': os.environ.get('TAP_POSTGRES_PORT'), 'dbname': target_db} + if use_secondary: + missing_envs = [x for x in [os.getenv('TAP_POSTGRES_SECONDARY_HOST'), + os.getenv('TAP_POSTGRES_SECONDARY_PORT')] if x == None] + + if len(missing_envs) != 0: + raise Exception( + "set TAP_POSTGRES_SECONDARY_HOST, TAP_POSTGRES_SECONDARY_PORT" + ) + + conn_config.update({ + 'use_secondary': use_secondary, + 'secondary_host': os.getenv('TAP_POSTGRES_SECONDARY_HOST'), + 'secondary_port': os.getenv('TAP_POSTGRES_SECONDARY_PORT'), + }) + return conn_config -def get_test_connection(target_db='postgres'): +def get_test_connection(target_db='postgres', superuser=False): conn_config = get_test_connection_config(target_db) + user = 'postgres' if superuser else conn_config['user'] conn_string = "host='{}' dbname='{}' user='{}' password='{}' port='{}'".format(conn_config['host'], conn_config['dbname'], - conn_config['user'], + user, conn_config['password'], conn_config['port']) LOGGER.info("connecting to {}".format(conn_config['host']))
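
Editor's note: for readers following the connection-routing change in `tap_postgres/db.py`, the sketch below distills the host/port selection that the patch adds to `open_connection` into a standalone function. It is an illustration only, not part of the patch; the config keys (`use_secondary`, `secondary_host`, `secondary_port`, `host`, `port`), the service names `db_primary`/`db_replica`, and the ports 5432/5433 come from the diff above, while the helper name `select_endpoint` is hypothetical.

```python
# Illustrative sketch only (not part of the patch): mirrors the host/port
# selection the patch adds to tap_postgres.db.open_connection.
# The helper name `select_endpoint` is hypothetical.

def select_endpoint(conn_config, logical_replication=False, prioritize_primary=False):
    """Return the (host, port) a connection would use under the new settings."""
    use_secondary = conn_config.get('use_secondary', False)
    if use_secondary and not prioritize_primary and not logical_replication:
        # INCREMENTAL / FULL_TABLE reads may go to the replica; fall back to
        # the primary if the secondary keys are missing.
        return (conn_config.get('secondary_host', conn_config['host']),
                conn_config.get('secondary_port', conn_config['port']))
    # Logical replication and the prioritize_primary call sites
    # (version, LSN and replication-slot lookups) stay on the primary.
    return conn_config['host'], conn_config['port']


if __name__ == '__main__':
    cfg = {
        'host': 'db_primary', 'port': 5432,
        'use_secondary': True,
        'secondary_host': 'db_replica', 'secondary_port': 5433,
    }
    assert select_endpoint(cfg) == ('db_replica', 5433)                             # table sync
    assert select_endpoint(cfg, logical_replication=True) == ('db_primary', 5432)   # wal2json stream
    assert select_endpoint(cfg, prioritize_primary=True) == ('db_primary', 5432)    # slot/LSN lookups
```

Falling back to the primary when the secondary keys are absent matches the behaviour referenced in the patch's own comment (the PipelineWise FastSync PostgreSQL tap), which is also why `main_impl` validates `secondary_host` and `secondary_port` up front when `use_secondary` is set.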