From 45a9a9d01fad8d9446e10bc66274d0d2bcb109d3 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 11 Jan 2022 15:10:03 +0000 Subject: [PATCH 1/6] Perform logical replication after initial sync --- tap_postgres/__init__.py | 4 ++++ tests/test_full_table_interruption.py | 14 ++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 05bd565d..2039f9f1 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # finishing previously interrupted full-table (first stage of logical replication) lookup[stream['tap_stream_id']] = 'logical_initial_interrupted' traditional_steams.append(stream) + # do any required logical replication after inital sync is complete + logical_streams.append(stream) # inconsistent state elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \ @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # initial full-table phase of logical replication lookup[stream['tap_stream_id']] = 'logical_initial' traditional_steams.append(stream) + # do any required logical replication after inital sync is complete + logical_streams.append(stream) else: # no xmin but we have an lsn # initial stage of logical replication(full-table) has been completed. moving onto pure logical replication diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py index 34ed5371..60c8e409 100644 --- a/tests/test_full_table_interruption.py +++ b/tests/test_full_table_interruption.py @@ -1,4 +1,5 @@ import unittest +from unittest import mock import tap_postgres import tap_postgres.sync_strategies.full_table as full_table import tap_postgres.sync_strategies.common as pg_common @@ -62,7 +63,8 @@ def setUp(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self): + @mock.patch('tap_postgres.sync_logical_streams') + def test_catalog(self, mock_sync_logical_streams): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok @@ -90,7 +92,7 @@ def test_catalog(self): insert_record(cur, 'COW', cow_rec) conn.close() - + blew_up_on_cow = False state = {} #the initial phase of cows logical replication will be a full table. @@ -100,6 +102,8 @@ def test_catalog(self): except Exception: blew_up_on_cow = True + mock_sync_logical_streams.assert_not_called() + self.assertTrue(blew_up_on_cow) self.assertEqual(7, len(CAUGHT_MESSAGES)) @@ -151,6 +155,8 @@ def test_catalog(self): CAUGHT_MESSAGES.clear() tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, old_state) + mock_sync_logical_streams.assert_called_once() + self.assertEqual(8, len(CAUGHT_MESSAGES)) self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') @@ -238,7 +244,7 @@ def test_catalog(self): conn = get_test_connection() conn.autocommit = True - + with conn.cursor() as cur: cow_rec = {'name': 'betty', 'colour': 'blue'} insert_record(cur, 'COW', {'name': 'betty', 'colour': 'blue'}) @@ -256,7 +262,7 @@ def test_catalog(self): state = {} blew_up_on_cow = False - + #this will sync the CHICKEN but then blow up on the COW try: tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, state) From aa2ddd9f7d3f1ddc8d69fa23ca9a8457e1247928 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 11 Jan 2022 16:37:17 +0000 Subject: [PATCH 2/6] Add `use_replica` config flag --- README.md | 11 ++++++++--- tap_postgres/__init__.py | 18 +++++++++++++++++- tap_postgres/db.py | 12 +++++++++++- .../sync_strategies/logical_replication.py | 12 ++++++------ 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index a8d5d945..6895a2a1 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,11 @@ Full list of options in `config.json`: | tap_id | String | No | ID of the pipeline/tap (Default: None) | | itersize | Integer | No | Size of PG cursor iterator when doing INCREMENTAL or FULL_TABLE (Default: 20000) | | default_replication_method | String | No | Default replication method to use when no one is provided in the catalog (Values: `LOG_BASED`, `INCREMENTAL` or `FULL_TABLE`) (Default: None) | +| use_replica | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) | +| replica_host | String | No | PostgreSQL Replica host (required if `use_replica` is `True`) | +| replica_port | Integer | No | PostgreSQL Replica port (required if `use_replica` is `True`) | +| replica_user | String | No | PostgreSQL Replica user (Default: same as `user`) | +| replica_password | String | No | PostgreSQL Replica password (Default: same as `password`) | ### Run the tap in Discovery Mode @@ -142,7 +147,7 @@ to the tap for the next sync. ``` Restart your PostgreSQL service to ensure the changes take effect. - + **Note**: For `max_replication_slots` and `max_wal_senders`, we’re defaulting to a value of 5. This should be sufficient unless you have a large number of read replicas connected to the master instance. @@ -151,11 +156,11 @@ to the tap for the next sync. In PostgreSQL, a logical replication slot represents a stream of database changes that can then be replayed to a client in the order they were made on the original server. Each slot streams a sequence of changes from a single database. - + Login to the master instance as a superuser and using the `wal2json` plugin, create a logical replication slot: ``` SELECT * - FROM pg_create_logical_replication_slot('pipelinewise_', 'wal2json'); + FROM pg_create_logical_replication_slot('pipelinewise_', 'wal2json'); ``` **Note**: Replication slots are specific to a given database in a cluster. If you want to connect multiple diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 05bd565d..fd7258a9 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -405,9 +405,25 @@ def main_impl(): 'debug_lsn': args.config.get('debug_lsn') == 'true', 'max_run_seconds': args.config.get('max_run_seconds', 43200), 'break_at_end_lsn': args.config.get('break_at_end_lsn', True), - 'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)) + 'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)), + 'use_replica': args.config.get('use_replica', False), } + if conn_config['use_replica']: + try: + conn_config.update({ + # Host and Port are mandatory. + 'replica_host': args.config["replica_host"], + 'replica_port': args.config["replica_port"], + # User and Password default to the same as on the Primary. + 'replica_user': args.config.get("replica_user", conn_config['user']), + 'replica_password': args.config.get("replica_password", conn_config['password']), + }) + except KeyError as exc: + raise ValueError( + "When 'use_replica' enabled 'replica_host' and 'replical_port' must be defined." + ) from exc + if args.config.get('ssl') == 'true': conn_config['sslmode'] = 'require' diff --git a/tap_postgres/db.py b/tap_postgres/db.py index 41ab82de..56978f9a 100644 --- a/tap_postgres/db.py +++ b/tap_postgres/db.py @@ -36,7 +36,7 @@ def fully_qualified_table_name(schema, table): return f'"{canonicalize_identifier(schema)}"."{canonicalize_identifier(table)}"' -def open_connection(conn_config, logical_replication=False): +def open_connection(conn_config, logical_replication=False, prioritize_primary=False): cfg = { 'application_name': 'pipelinewise', 'host': conn_config['host'], @@ -47,6 +47,16 @@ def open_connection(conn_config, logical_replication=False): 'connect_timeout': 30 } + if conn_config.get('use_replica', False) and not prioritize_primary and not logical_replication: + # Try to use replica but fallback to primary if keys are missing. This is the same behavior as + # https://github.com/transferwise/pipelinewise/blob/master/pipelinewise/fastsync/commons/tap_postgres.py#L129 + cfg.update({ + 'host': conn_config.get("replica_host", conn_config['host']), + 'port': conn_config.get("replica_port", conn_config['port']), + 'user': conn_config.get("replica_user", conn_config['user']), + 'password': conn_config.get("replica_password", conn_config['password']), + }) + if conn_config.get('sslmode'): cfg['sslmode'] = conn_config['sslmode'] diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index 904fd801..eb690956 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -34,7 +34,7 @@ class UnsupportedPayloadKindError(Exception): # pylint: disable=invalid-name,missing-function-docstring,too-many-branches,too-many-statements,too-many-arguments def get_pg_version(conn_info): - with post_db.open_connection(conn_info, False) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'") version = cur.fetchone()[0] @@ -93,7 +93,7 @@ def fetch_current_lsn(conn_config): if version < 90400: raise Exception('Logical replication not supported before PostgreSQL 9.4') - with post_db.open_connection(conn_config, False) as conn: + with post_db.open_connection(conn_config, False, True) as conn: with conn.cursor() as cur: # Use version specific lsn command if version >= 100000: @@ -138,7 +138,7 @@ def create_hstore_elem_query(elem): def create_hstore_elem(conn_info, elem): - with post_db.open_connection(conn_info) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: query = create_hstore_elem_query(elem) cur.execute(query) @@ -151,7 +151,7 @@ def create_array_elem(elem, sql_datatype, conn_info): if elem is None: return None - with post_db.open_connection(conn_info) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: if sql_datatype == 'bit[]': cast_datatype = 'boolean[]' @@ -517,7 +517,7 @@ def locate_replication_slot_by_cur(cursor, dbname, tap_id=None): def locate_replication_slot(conn_info): - with post_db.open_connection(conn_info, False) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: return locate_replication_slot_by_cur(cur, conn_info['dbname'], conn_info['tap_id']) @@ -576,7 +576,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): version = get_pg_version(conn_info) # Create replication connection and cursor - conn = post_db.open_connection(conn_info, True) + conn = post_db.open_connection(conn_info, True, True) cur = conn.cursor() # Set session wal_sender_timeout for PG12 and above From 2383d7a4de46bf91ae89d3db61e29cdc9026b84f Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 12 Jan 2022 15:45:58 +0000 Subject: [PATCH 3/6] Add tests for `use_replica` option --- .github/workflows/ci.yaml | 4 +- Makefile | 2 +- docker-compose.yml | 28 ++- tests/test_discovery.py | 12 +- tests/test_full_table_interruption.py | 285 +++++++++++++++----------- tests/utils.py | 22 +- 6 files changed, 214 insertions(+), 139 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index cc254c9a..12cfa741 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -35,9 +35,11 @@ jobs: - name: Tests env: + TAP_POSTGRES_HOST: localhost TAP_POSTGRES_PORT: 5432 TAP_POSTGRES_USER: test_user TAP_POSTGRES_PASSWORD: my-secret-passwd - TAP_POSTGRES_HOST: localhost + TAP_POSTGRES_REPLICA_HOST: localhost + TAP_POSTGRES_REPLICA_PORT: 5433 LOGGING_CONF_FILE: ./sample_logging.conf run: make test diff --git a/Makefile b/Makefile index 24f00e25..bac92cd1 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ pylint: pylint --rcfile .pylintrc --disable duplicate-code tap_postgres/ start_db: - docker-compose up -d --build db + docker-compose up -d test: . ./venv/bin/activate ;\ diff --git a/docker-compose.yml b/docker-compose.yml index b15b2649..c416b85b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,13 +1,31 @@ version: "3.3" services: - db: - image: "debezium/postgres:12-alpine" - container_name: "" + db_primary: + image: "docker.io/bitnami/postgresql:12" + container_name: "primary" ports: - "5432:5432" environment: + - POSTGRESQL_REPLICATION_MODE=master + - POSTGRESQL_REPLICATION_USER=repl_user + - POSTGRESQL_REPLICATION_PASSWORD=repl_password - POSTGRES_USER=test_user - POSTGRES_PASSWORD=my-secret-passwd - - POSTGRES_DB=tap_postgres_test - command: -c "wal_level=logical" -c "max_replication_slots=5" -c "max_wal_senders=5" + - POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd + - POSTGRESQL_DATABASE=tap_postgres_test + - ALLOW_EMPTY_PASSWORD=yes + db_replica: + image: "docker.io/bitnami/postgresql:12" + container_name: replica + ports: + - "5433:5432" + depends_on: + - db_primary + environment: + - POSTGRESQL_REPLICATION_MODE=slave + - POSTGRESQL_REPLICATION_USER=repl_user + - POSTGRESQL_REPLICATION_PASSWORD=repl_password + - POSTGRESQL_MASTER_HOST=db_primary + - POSTGRESQL_MASTER_PORT_NUMBER=5432 + - ALLOW_EMPTY_PASSWORD=yes diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 1dd48aaa..bb9eb35a 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -331,7 +331,7 @@ def setUp(self): table_spec = {"columns": [{"name" : 'our_pk', "type" : "hstore", "primary_key" : True }, {"name" : 'our_hstore', "type" : "hstore" }], "name" : TestHStoreTable.table_name} - with get_test_connection() as conn: + with get_test_connection(superuser=True) as conn: cur = conn.cursor() cur.execute(""" SELECT installed_version FROM pg_available_extensions WHERE name = 'hstore' """) if cur.fetchone()[0] is None: @@ -536,7 +536,7 @@ class TestColumnGrants(unittest.TestCase): table_name = 'CHICKEN TIMES' user = 'tmp_user_for_grant_tests' password = 'password' - + def setUp(self): table_spec = {"columns": [{"name" : "id", "type" : "integer", "serial" : True}, {"name" : 'size integer', "type" : "integer", "quoted" : True}, @@ -545,7 +545,7 @@ def setUp(self): "name" : TestColumnGrants.table_name} ensure_test_table(table_spec) - with get_test_connection() as conn: + with get_test_connection(superuser=True) as conn: cur = conn.cursor() sql = """ DROP USER IF EXISTS {} """.format(self.user, self.password) @@ -560,8 +560,8 @@ def setUp(self): LOGGER.info("running sql: {}".format(sql)) cur.execute(sql) - - + + def test_catalog(self): conn_config = get_test_connection_config() @@ -587,7 +587,7 @@ def test_catalog(self): ('properties', 'id'): {'inclusion': 'available', 'selected-by-default': True, 'sql-datatype': 'integer'}}) - + self.assertEqual({'definitions' : BASE_RECURSIVE_SCHEMAS, 'type': 'object', 'properties': {'id': {'type': ['null', 'integer'], diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py index 34ed5371..c3249195 100644 --- a/tests/test_full_table_interruption.py +++ b/tests/test_full_table_interruption.py @@ -1,4 +1,7 @@ -import unittest +import re +import psycopg2 +import unittest.mock +import pytest import tap_postgres import tap_postgres.sync_strategies.full_table as full_table import tap_postgres.sync_strategies.common as pg_common @@ -45,10 +48,12 @@ def do_not_dump_catalog(catalog): tap_postgres.dump_catalog = do_not_dump_catalog full_table.UPDATE_BOOKMARK_PERIOD = 1 -class LogicalInterruption(unittest.TestCase): +@pytest.mark.parametrize('use_replica', [False, True]) +@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) +class TestLogicalInterruption: maxDiff = None - def setUp(self): + def setup_method(self): table_spec_1 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True}, {"name" : 'name', "type": "character varying"}, {"name" : 'colour', "type": "character varying"}, @@ -62,14 +67,28 @@ def setUp(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self): + def test_catalog(self, mock_connect, use_replica): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok - conn_config = get_test_connection_config() + conn_config = get_test_connection_config(use_replica=use_replica) streams = tap_postgres.do_discovery(conn_config) + + # Assert that we connected to the correct database + expected_connection = { + 'application_name': unittest.mock.ANY, + 'dbname': unittest.mock.ANY, + 'user': unittest.mock.ANY, + 'password': unittest.mock.ANY, + 'connect_timeout':unittest.mock.ANY, + 'host': conn_config['replica_host'] if use_replica else conn_config['host'], + 'port': conn_config['replica_port'] if use_replica else conn_config['port'], + } + mock_connect.assert_called_once_with(**expected_connection) + mock_connect.reset_mock() + cow_stream = [s for s in streams if s['table_name'] == 'COW'][0] - self.assertIsNotNone(cow_stream) + assert cow_stream is not None cow_stream = select_all_of_stream(cow_stream) cow_stream = set_replication_method_for_stream(cow_stream, 'LOG_BASED') @@ -90,56 +109,59 @@ def test_catalog(self): insert_record(cur, 'COW', cow_rec) conn.close() - + blew_up_on_cow = False state = {} #the initial phase of cows logical replication will be a full table. #it will sync the first record and then blow up on the 2nd record try: - tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, state) + tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, state) except Exception: blew_up_on_cow = True - self.assertTrue(blew_up_on_cow) + assert blew_up_on_cow is True + + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() - self.assertEqual(7, len(CAUGHT_MESSAGES)) + assert 7 == len(CAUGHT_MESSAGES) - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin')) - self.assertIsNotNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn')) + assert CAUGHT_MESSAGES[0]['type'] =='SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin') is None + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') is not None end_lsn = CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) + assert isinstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) new_version = CAUGHT_MESSAGES[2].version - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[3].record, { + assert isinstance(CAUGHT_MESSAGES[3], singer.RecordMessage) + assert CAUGHT_MESSAGES[3].record == { 'colour': 'blue', 'id': 1, 'name': 'betty', 'timestamp_ntz': '2020-09-01T10:40:59+00:00', 'timestamp_tz': '2020-08-31T22:50:59+00:00' - }) + } - self.assertEqual('public-COW', CAUGHT_MESSAGES[3].stream) + assert 'public-COW' == CAUGHT_MESSAGES[3].stream - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[4], singer.StateMessage) #xmin is set while we are processing the full table replication - self.assertIsNotNone(CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['xmin']) - self.assertEqual(CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['lsn'], end_lsn) + assert CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['xmin'] is not None + assert CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['lsn'] == end_lsn - self.assertEqual(CAUGHT_MESSAGES[5].record, { + assert CAUGHT_MESSAGES[5].record == { 'colour': 'brow', 'id': 2, 'name': 'smelly', 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' - }) + } - self.assertEqual('public-COW', CAUGHT_MESSAGES[5].stream) + assert 'public-COW' == CAUGHT_MESSAGES[5].stream - self.assertIsInstance(CAUGHT_MESSAGES[6], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[6], singer.StateMessage) last_xmin = CAUGHT_MESSAGES[6].value['bookmarks']['public-COW']['xmin'] old_state = CAUGHT_MESSAGES[6].value @@ -149,60 +171,65 @@ def test_catalog(self): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 CAUGHT_MESSAGES.clear() - tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, old_state) + tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, old_state) - self.assertEqual(8, len(CAUGHT_MESSAGES)) + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') + assert 8 == len(CAUGHT_MESSAGES) - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin'), last_xmin) - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('version'), new_version) + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[2].record, { + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin') == last_xmin + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('version') == new_version + + assert isinstance(CAUGHT_MESSAGES[2], singer.RecordMessage) + assert CAUGHT_MESSAGES[2].record == { 'colour': 'brow', 'id': 2, 'name': 'smelly', 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' - }) + } - self.assertEqual('public-COW', CAUGHT_MESSAGES[2].stream) + assert 'public-COW' == CAUGHT_MESSAGES[2].stream - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.StateMessage) - self.assertTrue(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('xmin'),last_xmin) - self.assertEqual(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('version'), new_version) + assert isinstance(CAUGHT_MESSAGES[3], singer.StateMessage) + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('xmin'),last_xmin + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('version') == new_version - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[4].record, { + assert isinstance(CAUGHT_MESSAGES[4], singer.RecordMessage) + assert CAUGHT_MESSAGES[4].record == { 'colour': 'green', 'id': 3, 'name': 'pooper', 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' - }) - self.assertEqual('public-COW', CAUGHT_MESSAGES[4].stream) + } + assert 'public-COW' == CAUGHT_MESSAGES[4].stream - self.assertIsInstance(CAUGHT_MESSAGES[5], singer.StateMessage) - self.assertTrue(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('xmin') > last_xmin) - self.assertEqual(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('version'), new_version) + assert isinstance(CAUGHT_MESSAGES[5], singer.StateMessage) + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('xmin') > last_xmin + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('version') == new_version - self.assertIsInstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) - self.assertEqual(CAUGHT_MESSAGES[6].version, new_version) + assert isinstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[6].version == new_version - self.assertIsInstance(CAUGHT_MESSAGES[7], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('xmin')) - self.assertEqual(CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version'), new_version) + assert isinstance(CAUGHT_MESSAGES[7], singer.StateMessage) + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('xmin') is None + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version') == new_version -class FullTableInterruption(unittest.TestCase): +@pytest.mark.parametrize('use_replica', [False, True]) +@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) +class TestFullTableInterruption: maxDiff = None - def setUp(self): + def setup_method(self): table_spec_1 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True}, {"name" : 'name', "type": "character varying"}, {"name" : 'colour', "type": "character varying"}], @@ -220,25 +247,39 @@ def setUp(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self): + def test_catalog(self, mock_connect, use_replica): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok - conn_config = get_test_connection_config() + conn_config = get_test_connection_config(use_replica=use_replica) streams = tap_postgres.do_discovery(conn_config) + + # Assert that we connected to the correct database + expected_connection = { + 'application_name': unittest.mock.ANY, + 'dbname': unittest.mock.ANY, + 'user': unittest.mock.ANY, + 'password': unittest.mock.ANY, + 'connect_timeout':unittest.mock.ANY, + 'host': conn_config['replica_host'] if use_replica else conn_config['host'], + 'port': conn_config['replica_port'] if use_replica else conn_config['port'], + } + mock_connect.assert_called_once_with(**expected_connection) + mock_connect.reset_mock() + cow_stream = [s for s in streams if s['table_name'] == 'COW'][0] - self.assertIsNotNone(cow_stream) + assert cow_stream is not None cow_stream = select_all_of_stream(cow_stream) cow_stream = set_replication_method_for_stream(cow_stream, 'FULL_TABLE') chicken_stream = [s for s in streams if s['table_name'] == 'CHICKEN'][0] - self.assertIsNotNone(chicken_stream) + assert chicken_stream is not None chicken_stream = select_all_of_stream(chicken_stream) chicken_stream = set_replication_method_for_stream(chicken_stream, 'FULL_TABLE') conn = get_test_connection() conn.autocommit = True - + with conn.cursor() as cur: cow_rec = {'name': 'betty', 'colour': 'blue'} insert_record(cur, 'COW', {'name': 'betty', 'colour': 'blue'}) @@ -256,64 +297,65 @@ def test_catalog(self): state = {} blew_up_on_cow = False - + #this will sync the CHICKEN but then blow up on the COW try: - tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, state) + tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, state) except Exception as ex: # LOGGER.exception(ex) blew_up_on_cow = True - self.assertTrue(blew_up_on_cow) - + assert blew_up_on_cow + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() - self.assertEqual(14, len(CAUGHT_MESSAGES)) + assert 14 == len(CAUGHT_MESSAGES) - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-CHICKEN'].get('xmin')) + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-CHICKEN'].get('xmin') is None - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) + assert isinstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) new_version = CAUGHT_MESSAGES[2].version - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.RecordMessage) - self.assertEqual('public-CHICKEN', CAUGHT_MESSAGES[3].stream) + assert isinstance(CAUGHT_MESSAGES[3], singer.RecordMessage) + assert 'public-CHICKEN' == CAUGHT_MESSAGES[3].stream - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[4], singer.StateMessage) #xmin is set while we are processing the full table replication - self.assertIsNotNone(CAUGHT_MESSAGES[4].value['bookmarks']['public-CHICKEN']['xmin']) + assert CAUGHT_MESSAGES[4].value['bookmarks']['public-CHICKEN']['xmin'] is not None - self.assertIsInstance(CAUGHT_MESSAGES[5], singer.ActivateVersionMessage) - self.assertEqual(CAUGHT_MESSAGES[5].version, new_version) + assert isinstance(CAUGHT_MESSAGES[5], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[5].version == new_version - self.assertIsInstance(CAUGHT_MESSAGES[6], singer.StateMessage) - self.assertEqual(None, singer.get_currently_syncing( CAUGHT_MESSAGES[6].value)) + assert isinstance(CAUGHT_MESSAGES[6], singer.StateMessage) + assert None == singer.get_currently_syncing( CAUGHT_MESSAGES[6].value) #xmin is cleared at the end of the full table replication - self.assertIsNone(CAUGHT_MESSAGES[6].value['bookmarks']['public-CHICKEN']['xmin']) + assert CAUGHT_MESSAGES[6].value['bookmarks']['public-CHICKEN']['xmin'] is None #cow messages - self.assertEqual(CAUGHT_MESSAGES[7]['type'], 'SCHEMA') + assert CAUGHT_MESSAGES[7]['type'] == 'SCHEMA' - self.assertEqual("public-COW", CAUGHT_MESSAGES[7]['stream']) - self.assertIsInstance(CAUGHT_MESSAGES[8], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[8].value['bookmarks']['public-COW'].get('xmin')) - self.assertEqual("public-COW", CAUGHT_MESSAGES[8].value['currently_syncing']) + assert "public-COW" == CAUGHT_MESSAGES[7]['stream'] + assert isinstance(CAUGHT_MESSAGES[8], singer.StateMessage) + assert CAUGHT_MESSAGES[8].value['bookmarks']['public-COW'].get('xmin') is None + assert "public-COW" == CAUGHT_MESSAGES[8].value['currently_syncing'] - self.assertIsInstance(CAUGHT_MESSAGES[9], singer.ActivateVersionMessage) + assert isinstance(CAUGHT_MESSAGES[9], singer.ActivateVersionMessage) cow_version = CAUGHT_MESSAGES[9].version - self.assertIsInstance(CAUGHT_MESSAGES[10], singer.RecordMessage) + assert isinstance(CAUGHT_MESSAGES[10], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[10].record['name'], 'betty') - self.assertEqual('public-COW', CAUGHT_MESSAGES[10].stream) + assert CAUGHT_MESSAGES[10].record['name'] == 'betty' + assert 'public-COW' == CAUGHT_MESSAGES[10].stream - self.assertIsInstance(CAUGHT_MESSAGES[11], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[11], singer.StateMessage) #xmin is set while we are processing the full table replication - self.assertIsNotNone(CAUGHT_MESSAGES[11].value['bookmarks']['public-COW']['xmin']) + assert CAUGHT_MESSAGES[11].value['bookmarks']['public-COW']['xmin'] is not None - self.assertEqual(CAUGHT_MESSAGES[12].record['name'], 'smelly') - self.assertEqual('public-COW', CAUGHT_MESSAGES[12].stream) + assert CAUGHT_MESSAGES[12].record['name'] == 'smelly' + assert 'public-COW' == CAUGHT_MESSAGES[12].stream old_state = CAUGHT_MESSAGES[13].value #run another do_sync @@ -322,47 +364,44 @@ def test_catalog(self): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 - tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, old_state) + tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, old_state) - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() + + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) # because we were interrupted, we do not switch versions - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['version'], cow_version) - self.assertIsNotNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['xmin']) - self.assertEqual("public-COW", singer.get_currently_syncing(CAUGHT_MESSAGES[1].value)) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['version'] == cow_version + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['xmin'] is not None + assert "public-COW" == singer.get_currently_syncing(CAUGHT_MESSAGES[1].value) - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[2].record['name'], 'smelly') - self.assertEqual('public-COW', CAUGHT_MESSAGES[2].stream) + assert isinstance(CAUGHT_MESSAGES[2], singer.RecordMessage) + assert CAUGHT_MESSAGES[2].record['name'] == 'smelly' + assert 'public-COW' == CAUGHT_MESSAGES[2].stream #after record: activate version, state with no xmin or currently syncing - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[3], singer.StateMessage) #we still have an xmin for COW because are not yet done with the COW table - self.assertIsNotNone(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW']['xmin']) - self.assertEqual(singer.get_currently_syncing( CAUGHT_MESSAGES[3].value), 'public-COW') + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW']['xmin'] is not None + assert singer.get_currently_syncing( CAUGHT_MESSAGES[3].value) == 'public-COW' - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[4].record['name'], 'pooper') - self.assertEqual('public-COW', CAUGHT_MESSAGES[4].stream) + assert isinstance(CAUGHT_MESSAGES[4], singer.RecordMessage) + assert CAUGHT_MESSAGES[4].record['name'] == 'pooper' + assert 'public-COW' == CAUGHT_MESSAGES[4].stream - self.assertIsInstance(CAUGHT_MESSAGES[5], singer.StateMessage) - self.assertIsNotNone(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW']['xmin']) - self.assertEqual(singer.get_currently_syncing( CAUGHT_MESSAGES[5].value), 'public-COW') + assert isinstance(CAUGHT_MESSAGES[5], singer.StateMessage) + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW']['xmin'] is not None + assert singer.get_currently_syncing( CAUGHT_MESSAGES[5].value) == 'public-COW' #xmin is cleared because we are finished the full table replication - self.assertIsInstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) - self.assertEqual(CAUGHT_MESSAGES[6].version, cow_version) - - self.assertIsInstance(CAUGHT_MESSAGES[7], singer.StateMessage) - self.assertIsNone(singer.get_currently_syncing( CAUGHT_MESSAGES[7].value)) - self.assertIsNone(CAUGHT_MESSAGES[7].value['bookmarks']['public-CHICKEN']['xmin']) - self.assertIsNone(singer.get_currently_syncing( CAUGHT_MESSAGES[7].value)) - + assert isinstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[6].version == cow_version -if __name__== "__main__": - test1 = LogicalInterruption() - test1.setUp() - test1.test_catalog() + assert isinstance(CAUGHT_MESSAGES[7], singer.StateMessage) + assert singer.get_currently_syncing( CAUGHT_MESSAGES[7].value) is None + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-CHICKEN']['xmin'] is None + assert singer.get_currently_syncing( CAUGHT_MESSAGES[7].value) is None diff --git a/tests/utils.py b/tests/utils.py index 881cfd3c..79b1ce21 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,7 @@ LOGGER = get_logger() -def get_test_connection_config(target_db='postgres'): +def get_test_connection_config(target_db='postgres', use_replica=False): missing_envs = [x for x in [os.getenv('TAP_POSTGRES_HOST'), os.getenv('TAP_POSTGRES_USER'), os.getenv('TAP_POSTGRES_PASSWORD'), @@ -24,13 +24,29 @@ def get_test_connection_config(target_db='postgres'): 'port': os.environ.get('TAP_POSTGRES_PORT'), 'dbname': target_db} + if use_replica: + missing_envs = [x for x in [os.getenv('TAP_POSTGRES_REPLICA_HOST'), + os.getenv('TAP_POSTGRES_REPLICA_PORT')] if x == None] + + if len(missing_envs) != 0: + raise Exception( + "set TAP_POSTGRES_REPLICA_HOST, TAP_POSTGRES_REPLICA_PORT" + ) + + conn_config.update({ + 'use_replica': use_replica, + 'replica_host': os.getenv('TAP_POSTGRES_REPLICA_HOST'), + 'replica_port': os.getenv('TAP_POSTGRES_REPLICA_PORT'), + }) + return conn_config -def get_test_connection(target_db='postgres'): +def get_test_connection(target_db='postgres', superuser=False): conn_config = get_test_connection_config(target_db) + user = 'postgres' if superuser else conn_config['user'] conn_string = "host='{}' dbname='{}' user='{}' password='{}' port='{}'".format(conn_config['host'], conn_config['dbname'], - conn_config['user'], + user, conn_config['password'], conn_config['port']) LOGGER.info("connecting to {}".format(conn_config['host'])) From 58462ffdece7d5629aa69d064632f5cb8209e800 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 12 Jan 2022 15:53:57 +0000 Subject: [PATCH 4/6] Rename `replica_*` config options to `secondary_*` --- .github/workflows/ci.yaml | 4 ++-- README.md | 10 +++++----- tap_postgres/__init__.py | 14 +++++++------- tap_postgres/db.py | 10 +++++----- tests/test_full_table_interruption.py | 28 +++++++++++++-------------- tests/utils.py | 16 +++++++-------- 6 files changed, 41 insertions(+), 41 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 12cfa741..09b2b5ce 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -39,7 +39,7 @@ jobs: TAP_POSTGRES_PORT: 5432 TAP_POSTGRES_USER: test_user TAP_POSTGRES_PASSWORD: my-secret-passwd - TAP_POSTGRES_REPLICA_HOST: localhost - TAP_POSTGRES_REPLICA_PORT: 5433 + TAP_POSTGRES_SECONDARY_HOST: localhost + TAP_POSTGRES_SECONDARY_PORT: 5433 LOGGING_CONF_FILE: ./sample_logging.conf run: make test diff --git a/README.md b/README.md index 6895a2a1..b7c8c925 100644 --- a/README.md +++ b/README.md @@ -66,11 +66,11 @@ Full list of options in `config.json`: | tap_id | String | No | ID of the pipeline/tap (Default: None) | | itersize | Integer | No | Size of PG cursor iterator when doing INCREMENTAL or FULL_TABLE (Default: 20000) | | default_replication_method | String | No | Default replication method to use when no one is provided in the catalog (Values: `LOG_BASED`, `INCREMENTAL` or `FULL_TABLE`) (Default: None) | -| use_replica | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) | -| replica_host | String | No | PostgreSQL Replica host (required if `use_replica` is `True`) | -| replica_port | Integer | No | PostgreSQL Replica port (required if `use_replica` is `True`) | -| replica_user | String | No | PostgreSQL Replica user (Default: same as `user`) | -| replica_password | String | No | PostgreSQL Replica password (Default: same as `password`) | +| use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) | +| secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) | +| secondary_port | Integer | No | PostgreSQL Replica port (required if `use_secondary` is `True`) | +| secondary_user | String | No | PostgreSQL Replica user (Default: same as `user`) | +| secondary_password | String | No | PostgreSQL Replica password (Default: same as `password`) | ### Run the tap in Discovery Mode diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index fd7258a9..ddb7afb5 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -406,22 +406,22 @@ def main_impl(): 'max_run_seconds': args.config.get('max_run_seconds', 43200), 'break_at_end_lsn': args.config.get('break_at_end_lsn', True), 'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)), - 'use_replica': args.config.get('use_replica', False), + 'use_secondary': args.config.get('use_secondary', False), } - if conn_config['use_replica']: + if conn_config['use_secondary']: try: conn_config.update({ # Host and Port are mandatory. - 'replica_host': args.config["replica_host"], - 'replica_port': args.config["replica_port"], + 'secondary_host': args.config['secondary_host'], + 'secondary_port': args.config['secondary_port'], # User and Password default to the same as on the Primary. - 'replica_user': args.config.get("replica_user", conn_config['user']), - 'replica_password': args.config.get("replica_password", conn_config['password']), + 'secondary_user': args.config.get('secondary_user', conn_config['user']), + 'secondary_password': args.config.get('secondary_password', conn_config['password']), }) except KeyError as exc: raise ValueError( - "When 'use_replica' enabled 'replica_host' and 'replical_port' must be defined." + "When 'use_secondary' enabled 'secondary_host' and 'secondary_port' must be defined." ) from exc if args.config.get('ssl') == 'true': diff --git a/tap_postgres/db.py b/tap_postgres/db.py index 56978f9a..3228bbb8 100644 --- a/tap_postgres/db.py +++ b/tap_postgres/db.py @@ -47,14 +47,14 @@ def open_connection(conn_config, logical_replication=False, prioritize_primary=F 'connect_timeout': 30 } - if conn_config.get('use_replica', False) and not prioritize_primary and not logical_replication: + if conn_config.get('use_secondary', False) and not prioritize_primary and not logical_replication: # Try to use replica but fallback to primary if keys are missing. This is the same behavior as # https://github.com/transferwise/pipelinewise/blob/master/pipelinewise/fastsync/commons/tap_postgres.py#L129 cfg.update({ - 'host': conn_config.get("replica_host", conn_config['host']), - 'port': conn_config.get("replica_port", conn_config['port']), - 'user': conn_config.get("replica_user", conn_config['user']), - 'password': conn_config.get("replica_password", conn_config['password']), + 'host': conn_config.get("secondary_host", conn_config['host']), + 'port': conn_config.get("secondary_port", conn_config['port']), + 'user': conn_config.get("secondary_user", conn_config['user']), + 'password': conn_config.get("secondary_password", conn_config['password']), }) if conn_config.get('sslmode'): diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py index c3249195..c1417468 100644 --- a/tests/test_full_table_interruption.py +++ b/tests/test_full_table_interruption.py @@ -48,7 +48,7 @@ def do_not_dump_catalog(catalog): tap_postgres.dump_catalog = do_not_dump_catalog full_table.UPDATE_BOOKMARK_PERIOD = 1 -@pytest.mark.parametrize('use_replica', [False, True]) +@pytest.mark.parametrize('use_secondary', [False, True]) @unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) class TestLogicalInterruption: maxDiff = None @@ -67,11 +67,11 @@ def setup_method(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self, mock_connect, use_replica): + def test_catalog(self, mock_connect, use_secondary): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok - conn_config = get_test_connection_config(use_replica=use_replica) + conn_config = get_test_connection_config(use_secondary=use_secondary) streams = tap_postgres.do_discovery(conn_config) # Assert that we connected to the correct database @@ -81,8 +81,8 @@ def test_catalog(self, mock_connect, use_replica): 'user': unittest.mock.ANY, 'password': unittest.mock.ANY, 'connect_timeout':unittest.mock.ANY, - 'host': conn_config['replica_host'] if use_replica else conn_config['host'], - 'port': conn_config['replica_port'] if use_replica else conn_config['port'], + 'host': conn_config['secondary_host'] if use_secondary else conn_config['host'], + 'port': conn_config['secondary_port'] if use_secondary else conn_config['port'], } mock_connect.assert_called_once_with(**expected_connection) mock_connect.reset_mock() @@ -115,7 +115,7 @@ def test_catalog(self, mock_connect, use_replica): #the initial phase of cows logical replication will be a full table. #it will sync the first record and then blow up on the 2nd record try: - tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state) except Exception: blew_up_on_cow = True @@ -171,7 +171,7 @@ def test_catalog(self, mock_connect, use_replica): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 CAUGHT_MESSAGES.clear() - tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, old_state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state) mock_connect.assert_called_with(**expected_connection) mock_connect.reset_mock() @@ -225,7 +225,7 @@ def test_catalog(self, mock_connect, use_replica): assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn') == end_lsn assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version') == new_version -@pytest.mark.parametrize('use_replica', [False, True]) +@pytest.mark.parametrize('use_secondary', [False, True]) @unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) class TestFullTableInterruption: maxDiff = None @@ -247,11 +247,11 @@ def setup_method(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self, mock_connect, use_replica): + def test_catalog(self, mock_connect, use_secondary): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok - conn_config = get_test_connection_config(use_replica=use_replica) + conn_config = get_test_connection_config(use_secondary=use_secondary) streams = tap_postgres.do_discovery(conn_config) # Assert that we connected to the correct database @@ -261,8 +261,8 @@ def test_catalog(self, mock_connect, use_replica): 'user': unittest.mock.ANY, 'password': unittest.mock.ANY, 'connect_timeout':unittest.mock.ANY, - 'host': conn_config['replica_host'] if use_replica else conn_config['host'], - 'port': conn_config['replica_port'] if use_replica else conn_config['port'], + 'host': conn_config['secondary_host'] if use_secondary else conn_config['host'], + 'port': conn_config['secondary_port'] if use_secondary else conn_config['port'], } mock_connect.assert_called_once_with(**expected_connection) mock_connect.reset_mock() @@ -300,7 +300,7 @@ def test_catalog(self, mock_connect, use_replica): #this will sync the CHICKEN but then blow up on the COW try: - tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state) except Exception as ex: # LOGGER.exception(ex) blew_up_on_cow = True @@ -364,7 +364,7 @@ def test_catalog(self, mock_connect, use_replica): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 - tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, old_state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state) mock_connect.assert_called_with(**expected_connection) mock_connect.reset_mock() diff --git a/tests/utils.py b/tests/utils.py index 79b1ce21..ee678b85 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,7 @@ LOGGER = get_logger() -def get_test_connection_config(target_db='postgres', use_replica=False): +def get_test_connection_config(target_db='postgres', use_secondary=False): missing_envs = [x for x in [os.getenv('TAP_POSTGRES_HOST'), os.getenv('TAP_POSTGRES_USER'), os.getenv('TAP_POSTGRES_PASSWORD'), @@ -24,19 +24,19 @@ def get_test_connection_config(target_db='postgres', use_replica=False): 'port': os.environ.get('TAP_POSTGRES_PORT'), 'dbname': target_db} - if use_replica: - missing_envs = [x for x in [os.getenv('TAP_POSTGRES_REPLICA_HOST'), - os.getenv('TAP_POSTGRES_REPLICA_PORT')] if x == None] + if use_secondary: + missing_envs = [x for x in [os.getenv('TAP_POSTGRES_SECONDARY_HOST'), + os.getenv('TAP_POSTGRES_SECONDARY_PORT')] if x == None] if len(missing_envs) != 0: raise Exception( - "set TAP_POSTGRES_REPLICA_HOST, TAP_POSTGRES_REPLICA_PORT" + "set TAP_POSTGRES_SECONDARY_HOST, TAP_POSTGRES_SECONDARY_PORT" ) conn_config.update({ - 'use_replica': use_replica, - 'replica_host': os.getenv('TAP_POSTGRES_REPLICA_HOST'), - 'replica_port': os.getenv('TAP_POSTGRES_REPLICA_PORT'), + 'use_secondary': use_secondary, + 'secondary_host': os.getenv('TAP_POSTGRES_SECONDARY_HOST'), + 'secondary_port': os.getenv('TAP_POSTGRES_SECONDARY_PORT'), }) return conn_config From 09d6b9ccd3ef043542fb569fb65549e5a958a7fb Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 12 Jan 2022 15:55:44 +0000 Subject: [PATCH 5/6] Require primary and secondary to use the same user --- README.md | 2 -- tap_postgres/__init__.py | 3 --- tap_postgres/db.py | 2 -- 3 files changed, 7 deletions(-) diff --git a/README.md b/README.md index b7c8c925..9da3a6ca 100644 --- a/README.md +++ b/README.md @@ -69,8 +69,6 @@ Full list of options in `config.json`: | use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) | | secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) | | secondary_port | Integer | No | PostgreSQL Replica port (required if `use_secondary` is `True`) | -| secondary_user | String | No | PostgreSQL Replica user (Default: same as `user`) | -| secondary_password | String | No | PostgreSQL Replica password (Default: same as `password`) | ### Run the tap in Discovery Mode diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index ddb7afb5..5c7c2ac1 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -415,9 +415,6 @@ def main_impl(): # Host and Port are mandatory. 'secondary_host': args.config['secondary_host'], 'secondary_port': args.config['secondary_port'], - # User and Password default to the same as on the Primary. - 'secondary_user': args.config.get('secondary_user', conn_config['user']), - 'secondary_password': args.config.get('secondary_password', conn_config['password']), }) except KeyError as exc: raise ValueError( diff --git a/tap_postgres/db.py b/tap_postgres/db.py index 3228bbb8..1b39c974 100644 --- a/tap_postgres/db.py +++ b/tap_postgres/db.py @@ -53,8 +53,6 @@ def open_connection(conn_config, logical_replication=False, prioritize_primary=F cfg.update({ 'host': conn_config.get("secondary_host", conn_config['host']), 'port': conn_config.get("secondary_port", conn_config['port']), - 'user': conn_config.get("secondary_user", conn_config['user']), - 'password': conn_config.get("secondary_password", conn_config['password']), }) if conn_config.get('sslmode'): From e892687529422cdf0d95698e1d1b2210faf03392 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 12 Jan 2022 15:56:19 +0000 Subject: [PATCH 6/6] Add documentation --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 9da3a6ca..9a787307 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,8 @@ to the tap for the next sync. ``` export TAP_POSTGRES_HOST= export TAP_POSTGRES_PORT= + export TAP_POSTGRES_SECONDARY_HOST= + export TAP_POSTGRES_SECONDARY_PORT= export TAP_POSTGRES_USER= export TAP_POSTGRES_PASSWORD= ```