From aa2ddd9f7d3f1ddc8d69fa23ca9a8457e1247928 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 11 Jan 2022 16:37:17 +0000 Subject: [PATCH 01/20] Add `use_replica` config flag --- README.md | 11 ++++++++--- tap_postgres/__init__.py | 18 +++++++++++++++++- tap_postgres/db.py | 12 +++++++++++- .../sync_strategies/logical_replication.py | 12 ++++++------ 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index a8d5d945..6895a2a1 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,11 @@ Full list of options in `config.json`: | tap_id | String | No | ID of the pipeline/tap (Default: None) | | itersize | Integer | No | Size of PG cursor iterator when doing INCREMENTAL or FULL_TABLE (Default: 20000) | | default_replication_method | String | No | Default replication method to use when no one is provided in the catalog (Values: `LOG_BASED`, `INCREMENTAL` or `FULL_TABLE`) (Default: None) | +| use_replica | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) | +| replica_host | String | No | PostgreSQL Replica host (required if `use_replica` is `True`) | +| replica_port | Integer | No | PostgreSQL Replica port (required if `use_replica` is `True`) | +| replica_user | String | No | PostgreSQL Replica user (Default: same as `user`) | +| replica_password | String | No | PostgreSQL Replica password (Default: same as `password`) | ### Run the tap in Discovery Mode @@ -142,7 +147,7 @@ to the tap for the next sync. ``` Restart your PostgreSQL service to ensure the changes take effect. - + **Note**: For `max_replication_slots` and `max_wal_senders`, we’re defaulting to a value of 5. This should be sufficient unless you have a large number of read replicas connected to the master instance. @@ -151,11 +156,11 @@ to the tap for the next sync. 
In PostgreSQL, a logical replication slot represents a stream of database changes that can then be replayed to a client in the order they were made on the original server. Each slot streams a sequence of changes from a single database. - + Login to the master instance as a superuser and using the `wal2json` plugin, create a logical replication slot: ``` SELECT * - FROM pg_create_logical_replication_slot('pipelinewise_', 'wal2json'); + FROM pg_create_logical_replication_slot('pipelinewise_', 'wal2json'); ``` **Note**: Replication slots are specific to a given database in a cluster. If you want to connect multiple diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 05bd565d..fd7258a9 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -405,9 +405,25 @@ def main_impl(): 'debug_lsn': args.config.get('debug_lsn') == 'true', 'max_run_seconds': args.config.get('max_run_seconds', 43200), 'break_at_end_lsn': args.config.get('break_at_end_lsn', True), - 'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)) + 'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)), + 'use_replica': args.config.get('use_replica', False), } + if conn_config['use_replica']: + try: + conn_config.update({ + # Host and Port are mandatory. + 'replica_host': args.config["replica_host"], + 'replica_port': args.config["replica_port"], + # User and Password default to the same as on the Primary. + 'replica_user': args.config.get("replica_user", conn_config['user']), + 'replica_password': args.config.get("replica_password", conn_config['password']), + }) + except KeyError as exc: + raise ValueError( + "When 'use_replica' enabled 'replica_host' and 'replical_port' must be defined." 
+ ) from exc + if args.config.get('ssl') == 'true': conn_config['sslmode'] = 'require' diff --git a/tap_postgres/db.py b/tap_postgres/db.py index 41ab82de..56978f9a 100644 --- a/tap_postgres/db.py +++ b/tap_postgres/db.py @@ -36,7 +36,7 @@ def fully_qualified_table_name(schema, table): return f'"{canonicalize_identifier(schema)}"."{canonicalize_identifier(table)}"' -def open_connection(conn_config, logical_replication=False): +def open_connection(conn_config, logical_replication=False, prioritize_primary=False): cfg = { 'application_name': 'pipelinewise', 'host': conn_config['host'], @@ -47,6 +47,16 @@ def open_connection(conn_config, logical_replication=False): 'connect_timeout': 30 } + if conn_config.get('use_replica', False) and not prioritize_primary and not logical_replication: + # Try to use replica but fallback to primary if keys are missing. This is the same behavior as + # https://github.com/transferwise/pipelinewise/blob/master/pipelinewise/fastsync/commons/tap_postgres.py#L129 + cfg.update({ + 'host': conn_config.get("replica_host", conn_config['host']), + 'port': conn_config.get("replica_port", conn_config['port']), + 'user': conn_config.get("replica_user", conn_config['user']), + 'password': conn_config.get("replica_password", conn_config['password']), + }) + if conn_config.get('sslmode'): cfg['sslmode'] = conn_config['sslmode'] diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index 904fd801..eb690956 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -34,7 +34,7 @@ class UnsupportedPayloadKindError(Exception): # pylint: disable=invalid-name,missing-function-docstring,too-many-branches,too-many-statements,too-many-arguments def get_pg_version(conn_info): - with post_db.open_connection(conn_info, False) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: 
cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'") version = cur.fetchone()[0] @@ -93,7 +93,7 @@ def fetch_current_lsn(conn_config): if version < 90400: raise Exception('Logical replication not supported before PostgreSQL 9.4') - with post_db.open_connection(conn_config, False) as conn: + with post_db.open_connection(conn_config, False, True) as conn: with conn.cursor() as cur: # Use version specific lsn command if version >= 100000: @@ -138,7 +138,7 @@ def create_hstore_elem_query(elem): def create_hstore_elem(conn_info, elem): - with post_db.open_connection(conn_info) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: query = create_hstore_elem_query(elem) cur.execute(query) @@ -151,7 +151,7 @@ def create_array_elem(elem, sql_datatype, conn_info): if elem is None: return None - with post_db.open_connection(conn_info) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: if sql_datatype == 'bit[]': cast_datatype = 'boolean[]' @@ -517,7 +517,7 @@ def locate_replication_slot_by_cur(cursor, dbname, tap_id=None): def locate_replication_slot(conn_info): - with post_db.open_connection(conn_info, False) as conn: + with post_db.open_connection(conn_info, False, True) as conn: with conn.cursor() as cur: return locate_replication_slot_by_cur(cur, conn_info['dbname'], conn_info['tap_id']) @@ -576,7 +576,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): version = get_pg_version(conn_info) # Create replication connection and cursor - conn = post_db.open_connection(conn_info, True) + conn = post_db.open_connection(conn_info, True, True) cur = conn.cursor() # Set session wal_sender_timeout for PG12 and above From 2383d7a4de46bf91ae89d3db61e29cdc9026b84f Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 12 Jan 2022 15:45:58 +0000 Subject: [PATCH 02/20] Add tests for 
`use_replica` option --- .github/workflows/ci.yaml | 4 +- Makefile | 2 +- docker-compose.yml | 28 ++- tests/test_discovery.py | 12 +- tests/test_full_table_interruption.py | 285 +++++++++++++++----------- tests/utils.py | 22 +- 6 files changed, 214 insertions(+), 139 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index cc254c9a..12cfa741 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -35,9 +35,11 @@ jobs: - name: Tests env: + TAP_POSTGRES_HOST: localhost TAP_POSTGRES_PORT: 5432 TAP_POSTGRES_USER: test_user TAP_POSTGRES_PASSWORD: my-secret-passwd - TAP_POSTGRES_HOST: localhost + TAP_POSTGRES_REPLICA_HOST: localhost + TAP_POSTGRES_REPLICA_PORT: 5433 LOGGING_CONF_FILE: ./sample_logging.conf run: make test diff --git a/Makefile b/Makefile index 24f00e25..bac92cd1 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ pylint: pylint --rcfile .pylintrc --disable duplicate-code tap_postgres/ start_db: - docker-compose up -d --build db + docker-compose up -d test: . 
./venv/bin/activate ;\ diff --git a/docker-compose.yml b/docker-compose.yml index b15b2649..c416b85b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,13 +1,31 @@ version: "3.3" services: - db: - image: "debezium/postgres:12-alpine" - container_name: "" + db_primary: + image: "docker.io/bitnami/postgresql:12" + container_name: "primary" ports: - "5432:5432" environment: + - POSTGRESQL_REPLICATION_MODE=master + - POSTGRESQL_REPLICATION_USER=repl_user + - POSTGRESQL_REPLICATION_PASSWORD=repl_password - POSTGRES_USER=test_user - POSTGRES_PASSWORD=my-secret-passwd - - POSTGRES_DB=tap_postgres_test - command: -c "wal_level=logical" -c "max_replication_slots=5" -c "max_wal_senders=5" + - POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd + - POSTGRESQL_DATABASE=tap_postgres_test + - ALLOW_EMPTY_PASSWORD=yes + db_replica: + image: "docker.io/bitnami/postgresql:12" + container_name: replica + ports: + - "5433:5432" + depends_on: + - db_primary + environment: + - POSTGRESQL_REPLICATION_MODE=slave + - POSTGRESQL_REPLICATION_USER=repl_user + - POSTGRESQL_REPLICATION_PASSWORD=repl_password + - POSTGRESQL_MASTER_HOST=db_primary + - POSTGRESQL_MASTER_PORT_NUMBER=5432 + - ALLOW_EMPTY_PASSWORD=yes diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 1dd48aaa..bb9eb35a 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -331,7 +331,7 @@ def setUp(self): table_spec = {"columns": [{"name" : 'our_pk', "type" : "hstore", "primary_key" : True }, {"name" : 'our_hstore', "type" : "hstore" }], "name" : TestHStoreTable.table_name} - with get_test_connection() as conn: + with get_test_connection(superuser=True) as conn: cur = conn.cursor() cur.execute(""" SELECT installed_version FROM pg_available_extensions WHERE name = 'hstore' """) if cur.fetchone()[0] is None: @@ -536,7 +536,7 @@ class TestColumnGrants(unittest.TestCase): table_name = 'CHICKEN TIMES' user = 'tmp_user_for_grant_tests' password = 'password' - + def setUp(self): table_spec = 
{"columns": [{"name" : "id", "type" : "integer", "serial" : True}, {"name" : 'size integer', "type" : "integer", "quoted" : True}, @@ -545,7 +545,7 @@ def setUp(self): "name" : TestColumnGrants.table_name} ensure_test_table(table_spec) - with get_test_connection() as conn: + with get_test_connection(superuser=True) as conn: cur = conn.cursor() sql = """ DROP USER IF EXISTS {} """.format(self.user, self.password) @@ -560,8 +560,8 @@ def setUp(self): LOGGER.info("running sql: {}".format(sql)) cur.execute(sql) - - + + def test_catalog(self): conn_config = get_test_connection_config() @@ -587,7 +587,7 @@ def test_catalog(self): ('properties', 'id'): {'inclusion': 'available', 'selected-by-default': True, 'sql-datatype': 'integer'}}) - + self.assertEqual({'definitions' : BASE_RECURSIVE_SCHEMAS, 'type': 'object', 'properties': {'id': {'type': ['null', 'integer'], diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py index 34ed5371..c3249195 100644 --- a/tests/test_full_table_interruption.py +++ b/tests/test_full_table_interruption.py @@ -1,4 +1,7 @@ -import unittest +import re +import psycopg2 +import unittest.mock +import pytest import tap_postgres import tap_postgres.sync_strategies.full_table as full_table import tap_postgres.sync_strategies.common as pg_common @@ -45,10 +48,12 @@ def do_not_dump_catalog(catalog): tap_postgres.dump_catalog = do_not_dump_catalog full_table.UPDATE_BOOKMARK_PERIOD = 1 -class LogicalInterruption(unittest.TestCase): +@pytest.mark.parametrize('use_replica', [False, True]) +@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) +class TestLogicalInterruption: maxDiff = None - def setUp(self): + def setup_method(self): table_spec_1 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True}, {"name" : 'name', "type": "character varying"}, {"name" : 'colour', "type": "character varying"}, @@ -62,14 +67,28 @@ def setUp(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def 
test_catalog(self): + def test_catalog(self, mock_connect, use_replica): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok - conn_config = get_test_connection_config() + conn_config = get_test_connection_config(use_replica=use_replica) streams = tap_postgres.do_discovery(conn_config) + + # Assert that we connected to the correct database + expected_connection = { + 'application_name': unittest.mock.ANY, + 'dbname': unittest.mock.ANY, + 'user': unittest.mock.ANY, + 'password': unittest.mock.ANY, + 'connect_timeout':unittest.mock.ANY, + 'host': conn_config['replica_host'] if use_replica else conn_config['host'], + 'port': conn_config['replica_port'] if use_replica else conn_config['port'], + } + mock_connect.assert_called_once_with(**expected_connection) + mock_connect.reset_mock() + cow_stream = [s for s in streams if s['table_name'] == 'COW'][0] - self.assertIsNotNone(cow_stream) + assert cow_stream is not None cow_stream = select_all_of_stream(cow_stream) cow_stream = set_replication_method_for_stream(cow_stream, 'LOG_BASED') @@ -90,56 +109,59 @@ def test_catalog(self): insert_record(cur, 'COW', cow_rec) conn.close() - + blew_up_on_cow = False state = {} #the initial phase of cows logical replication will be a full table. 
#it will sync the first record and then blow up on the 2nd record try: - tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, state) + tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, state) except Exception: blew_up_on_cow = True - self.assertTrue(blew_up_on_cow) + assert blew_up_on_cow is True + + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() - self.assertEqual(7, len(CAUGHT_MESSAGES)) + assert 7 == len(CAUGHT_MESSAGES) - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin')) - self.assertIsNotNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn')) + assert CAUGHT_MESSAGES[0]['type'] =='SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin') is None + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') is not None end_lsn = CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) + assert isinstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) new_version = CAUGHT_MESSAGES[2].version - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[3].record, { + assert isinstance(CAUGHT_MESSAGES[3], singer.RecordMessage) + assert CAUGHT_MESSAGES[3].record == { 'colour': 'blue', 'id': 1, 'name': 'betty', 'timestamp_ntz': '2020-09-01T10:40:59+00:00', 'timestamp_tz': '2020-08-31T22:50:59+00:00' - }) + } - self.assertEqual('public-COW', CAUGHT_MESSAGES[3].stream) + assert 'public-COW' == CAUGHT_MESSAGES[3].stream - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[4], singer.StateMessage) #xmin is set while we are 
processing the full table replication - self.assertIsNotNone(CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['xmin']) - self.assertEqual(CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['lsn'], end_lsn) + assert CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['xmin'] is not None + assert CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['lsn'] == end_lsn - self.assertEqual(CAUGHT_MESSAGES[5].record, { + assert CAUGHT_MESSAGES[5].record == { 'colour': 'brow', 'id': 2, 'name': 'smelly', 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' - }) + } - self.assertEqual('public-COW', CAUGHT_MESSAGES[5].stream) + assert 'public-COW' == CAUGHT_MESSAGES[5].stream - self.assertIsInstance(CAUGHT_MESSAGES[6], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[6], singer.StateMessage) last_xmin = CAUGHT_MESSAGES[6].value['bookmarks']['public-COW']['xmin'] old_state = CAUGHT_MESSAGES[6].value @@ -149,60 +171,65 @@ def test_catalog(self): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 CAUGHT_MESSAGES.clear() - tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, old_state) + tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, old_state) - self.assertEqual(8, len(CAUGHT_MESSAGES)) + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') + assert 8 == len(CAUGHT_MESSAGES) - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin'), last_xmin) - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('version'), new_version) + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.RecordMessage) - 
self.assertEqual(CAUGHT_MESSAGES[2].record, { + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin') == last_xmin + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('version') == new_version + + assert isinstance(CAUGHT_MESSAGES[2], singer.RecordMessage) + assert CAUGHT_MESSAGES[2].record == { 'colour': 'brow', 'id': 2, 'name': 'smelly', 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' - }) + } - self.assertEqual('public-COW', CAUGHT_MESSAGES[2].stream) + assert 'public-COW' == CAUGHT_MESSAGES[2].stream - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.StateMessage) - self.assertTrue(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('xmin'),last_xmin) - self.assertEqual(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('version'), new_version) + assert isinstance(CAUGHT_MESSAGES[3], singer.StateMessage) + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('xmin'),last_xmin + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('version') == new_version - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[4].record, { + assert isinstance(CAUGHT_MESSAGES[4], singer.RecordMessage) + assert CAUGHT_MESSAGES[4].record == { 'colour': 'green', 'id': 3, 'name': 'pooper', 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' - }) - self.assertEqual('public-COW', CAUGHT_MESSAGES[4].stream) + } + assert 'public-COW' == CAUGHT_MESSAGES[4].stream - self.assertIsInstance(CAUGHT_MESSAGES[5], singer.StateMessage) - 
self.assertTrue(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('xmin') > last_xmin) - self.assertEqual(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('version'), new_version) + assert isinstance(CAUGHT_MESSAGES[5], singer.StateMessage) + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('xmin') > last_xmin + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('version') == new_version - self.assertIsInstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) - self.assertEqual(CAUGHT_MESSAGES[6].version, new_version) + assert isinstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[6].version == new_version - self.assertIsInstance(CAUGHT_MESSAGES[7], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('xmin')) - self.assertEqual(CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn'), end_lsn) - self.assertEqual(CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version'), new_version) + assert isinstance(CAUGHT_MESSAGES[7], singer.StateMessage) + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('xmin') is None + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version') == new_version -class FullTableInterruption(unittest.TestCase): +@pytest.mark.parametrize('use_replica', [False, True]) +@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) +class TestFullTableInterruption: maxDiff = None - def setUp(self): + def setup_method(self): table_spec_1 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True}, {"name" : 'name', "type": "character varying"}, {"name" : 'colour', "type": "character varying"}], @@ -220,25 +247,39 @@ def 
setUp(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self): + def test_catalog(self, mock_connect, use_replica): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok - conn_config = get_test_connection_config() + conn_config = get_test_connection_config(use_replica=use_replica) streams = tap_postgres.do_discovery(conn_config) + + # Assert that we connected to the correct database + expected_connection = { + 'application_name': unittest.mock.ANY, + 'dbname': unittest.mock.ANY, + 'user': unittest.mock.ANY, + 'password': unittest.mock.ANY, + 'connect_timeout':unittest.mock.ANY, + 'host': conn_config['replica_host'] if use_replica else conn_config['host'], + 'port': conn_config['replica_port'] if use_replica else conn_config['port'], + } + mock_connect.assert_called_once_with(**expected_connection) + mock_connect.reset_mock() + cow_stream = [s for s in streams if s['table_name'] == 'COW'][0] - self.assertIsNotNone(cow_stream) + assert cow_stream is not None cow_stream = select_all_of_stream(cow_stream) cow_stream = set_replication_method_for_stream(cow_stream, 'FULL_TABLE') chicken_stream = [s for s in streams if s['table_name'] == 'CHICKEN'][0] - self.assertIsNotNone(chicken_stream) + assert chicken_stream is not None chicken_stream = select_all_of_stream(chicken_stream) chicken_stream = set_replication_method_for_stream(chicken_stream, 'FULL_TABLE') conn = get_test_connection() conn.autocommit = True - + with conn.cursor() as cur: cow_rec = {'name': 'betty', 'colour': 'blue'} insert_record(cur, 'COW', {'name': 'betty', 'colour': 'blue'}) @@ -256,64 +297,65 @@ def test_catalog(self): state = {} blew_up_on_cow = False - + #this will sync the CHICKEN but then blow up on the COW try: - tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, state) + tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, state) except 
Exception as ex: # LOGGER.exception(ex) blew_up_on_cow = True - self.assertTrue(blew_up_on_cow) - + assert blew_up_on_cow + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() - self.assertEqual(14, len(CAUGHT_MESSAGES)) + assert 14 == len(CAUGHT_MESSAGES) - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-CHICKEN'].get('xmin')) + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-CHICKEN'].get('xmin') is None - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) + assert isinstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) new_version = CAUGHT_MESSAGES[2].version - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.RecordMessage) - self.assertEqual('public-CHICKEN', CAUGHT_MESSAGES[3].stream) + assert isinstance(CAUGHT_MESSAGES[3], singer.RecordMessage) + assert 'public-CHICKEN' == CAUGHT_MESSAGES[3].stream - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[4], singer.StateMessage) #xmin is set while we are processing the full table replication - self.assertIsNotNone(CAUGHT_MESSAGES[4].value['bookmarks']['public-CHICKEN']['xmin']) + assert CAUGHT_MESSAGES[4].value['bookmarks']['public-CHICKEN']['xmin'] is not None - self.assertIsInstance(CAUGHT_MESSAGES[5], singer.ActivateVersionMessage) - self.assertEqual(CAUGHT_MESSAGES[5].version, new_version) + assert isinstance(CAUGHT_MESSAGES[5], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[5].version == new_version - self.assertIsInstance(CAUGHT_MESSAGES[6], singer.StateMessage) - self.assertEqual(None, singer.get_currently_syncing( CAUGHT_MESSAGES[6].value)) + assert isinstance(CAUGHT_MESSAGES[6], singer.StateMessage) + assert None == 
singer.get_currently_syncing( CAUGHT_MESSAGES[6].value) #xmin is cleared at the end of the full table replication - self.assertIsNone(CAUGHT_MESSAGES[6].value['bookmarks']['public-CHICKEN']['xmin']) + assert CAUGHT_MESSAGES[6].value['bookmarks']['public-CHICKEN']['xmin'] is None #cow messages - self.assertEqual(CAUGHT_MESSAGES[7]['type'], 'SCHEMA') + assert CAUGHT_MESSAGES[7]['type'] == 'SCHEMA' - self.assertEqual("public-COW", CAUGHT_MESSAGES[7]['stream']) - self.assertIsInstance(CAUGHT_MESSAGES[8], singer.StateMessage) - self.assertIsNone(CAUGHT_MESSAGES[8].value['bookmarks']['public-COW'].get('xmin')) - self.assertEqual("public-COW", CAUGHT_MESSAGES[8].value['currently_syncing']) + assert "public-COW" == CAUGHT_MESSAGES[7]['stream'] + assert isinstance(CAUGHT_MESSAGES[8], singer.StateMessage) + assert CAUGHT_MESSAGES[8].value['bookmarks']['public-COW'].get('xmin') is None + assert "public-COW" == CAUGHT_MESSAGES[8].value['currently_syncing'] - self.assertIsInstance(CAUGHT_MESSAGES[9], singer.ActivateVersionMessage) + assert isinstance(CAUGHT_MESSAGES[9], singer.ActivateVersionMessage) cow_version = CAUGHT_MESSAGES[9].version - self.assertIsInstance(CAUGHT_MESSAGES[10], singer.RecordMessage) + assert isinstance(CAUGHT_MESSAGES[10], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[10].record['name'], 'betty') - self.assertEqual('public-COW', CAUGHT_MESSAGES[10].stream) + assert CAUGHT_MESSAGES[10].record['name'] == 'betty' + assert 'public-COW' == CAUGHT_MESSAGES[10].stream - self.assertIsInstance(CAUGHT_MESSAGES[11], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[11], singer.StateMessage) #xmin is set while we are processing the full table replication - self.assertIsNotNone(CAUGHT_MESSAGES[11].value['bookmarks']['public-COW']['xmin']) + assert CAUGHT_MESSAGES[11].value['bookmarks']['public-COW']['xmin'] is not None - self.assertEqual(CAUGHT_MESSAGES[12].record['name'], 'smelly') - self.assertEqual('public-COW', CAUGHT_MESSAGES[12].stream) + 
assert CAUGHT_MESSAGES[12].record['name'] == 'smelly' + assert 'public-COW' == CAUGHT_MESSAGES[12].stream old_state = CAUGHT_MESSAGES[13].value #run another do_sync @@ -322,47 +364,44 @@ def test_catalog(self): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 - tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, old_state) + tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, old_state) - self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA') - self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage) + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() + + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) # because we were interrupted, we do not switch versions - self.assertEqual(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['version'], cow_version) - self.assertIsNotNone(CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['xmin']) - self.assertEqual("public-COW", singer.get_currently_syncing(CAUGHT_MESSAGES[1].value)) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['version'] == cow_version + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['xmin'] is not None + assert "public-COW" == singer.get_currently_syncing(CAUGHT_MESSAGES[1].value) - self.assertIsInstance(CAUGHT_MESSAGES[2], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[2].record['name'], 'smelly') - self.assertEqual('public-COW', CAUGHT_MESSAGES[2].stream) + assert isinstance(CAUGHT_MESSAGES[2], singer.RecordMessage) + assert CAUGHT_MESSAGES[2].record['name'] == 'smelly' + assert 'public-COW' == CAUGHT_MESSAGES[2].stream #after record: activate version, state with no xmin or currently syncing - self.assertIsInstance(CAUGHT_MESSAGES[3], singer.StateMessage) + assert isinstance(CAUGHT_MESSAGES[3], singer.StateMessage) #we still have an xmin for COW because are not yet done with the COW table - 
self.assertIsNotNone(CAUGHT_MESSAGES[3].value['bookmarks']['public-COW']['xmin']) - self.assertEqual(singer.get_currently_syncing( CAUGHT_MESSAGES[3].value), 'public-COW') + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW']['xmin'] is not None + assert singer.get_currently_syncing( CAUGHT_MESSAGES[3].value) == 'public-COW' - self.assertIsInstance(CAUGHT_MESSAGES[4], singer.RecordMessage) - self.assertEqual(CAUGHT_MESSAGES[4].record['name'], 'pooper') - self.assertEqual('public-COW', CAUGHT_MESSAGES[4].stream) + assert isinstance(CAUGHT_MESSAGES[4], singer.RecordMessage) + assert CAUGHT_MESSAGES[4].record['name'] == 'pooper' + assert 'public-COW' == CAUGHT_MESSAGES[4].stream - self.assertIsInstance(CAUGHT_MESSAGES[5], singer.StateMessage) - self.assertIsNotNone(CAUGHT_MESSAGES[5].value['bookmarks']['public-COW']['xmin']) - self.assertEqual(singer.get_currently_syncing( CAUGHT_MESSAGES[5].value), 'public-COW') + assert isinstance(CAUGHT_MESSAGES[5], singer.StateMessage) + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW']['xmin'] is not None + assert singer.get_currently_syncing( CAUGHT_MESSAGES[5].value) == 'public-COW' #xmin is cleared because we are finished the full table replication - self.assertIsInstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) - self.assertEqual(CAUGHT_MESSAGES[6].version, cow_version) - - self.assertIsInstance(CAUGHT_MESSAGES[7], singer.StateMessage) - self.assertIsNone(singer.get_currently_syncing( CAUGHT_MESSAGES[7].value)) - self.assertIsNone(CAUGHT_MESSAGES[7].value['bookmarks']['public-CHICKEN']['xmin']) - self.assertIsNone(singer.get_currently_syncing( CAUGHT_MESSAGES[7].value)) - + assert isinstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[6].version == cow_version -if __name__== "__main__": - test1 = LogicalInterruption() - test1.setUp() - test1.test_catalog() + assert isinstance(CAUGHT_MESSAGES[7], singer.StateMessage) + assert singer.get_currently_syncing( 
CAUGHT_MESSAGES[7].value) is None + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-CHICKEN']['xmin'] is None + assert singer.get_currently_syncing( CAUGHT_MESSAGES[7].value) is None diff --git a/tests/utils.py b/tests/utils.py index 881cfd3c..79b1ce21 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,7 @@ LOGGER = get_logger() -def get_test_connection_config(target_db='postgres'): +def get_test_connection_config(target_db='postgres', use_replica=False): missing_envs = [x for x in [os.getenv('TAP_POSTGRES_HOST'), os.getenv('TAP_POSTGRES_USER'), os.getenv('TAP_POSTGRES_PASSWORD'), @@ -24,13 +24,29 @@ def get_test_connection_config(target_db='postgres'): 'port': os.environ.get('TAP_POSTGRES_PORT'), 'dbname': target_db} + if use_replica: + missing_envs = [x for x in [os.getenv('TAP_POSTGRES_REPLICA_HOST'), + os.getenv('TAP_POSTGRES_REPLICA_PORT')] if x == None] + + if len(missing_envs) != 0: + raise Exception( + "set TAP_POSTGRES_REPLICA_HOST, TAP_POSTGRES_REPLICA_PORT" + ) + + conn_config.update({ + 'use_replica': use_replica, + 'replica_host': os.getenv('TAP_POSTGRES_REPLICA_HOST'), + 'replica_port': os.getenv('TAP_POSTGRES_REPLICA_PORT'), + }) + return conn_config -def get_test_connection(target_db='postgres'): +def get_test_connection(target_db='postgres', superuser=False): conn_config = get_test_connection_config(target_db) + user = 'postgres' if superuser else conn_config['user'] conn_string = "host='{}' dbname='{}' user='{}' password='{}' port='{}'".format(conn_config['host'], conn_config['dbname'], - conn_config['user'], + user, conn_config['password'], conn_config['port']) LOGGER.info("connecting to {}".format(conn_config['host'])) From 58462ffdece7d5629aa69d064632f5cb8209e800 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 12 Jan 2022 15:53:57 +0000 Subject: [PATCH 03/20] Rename `replica_*` config options to `secondary_*` --- .github/workflows/ci.yaml | 4 ++-- README.md | 10 +++++----- 
tap_postgres/__init__.py | 14 +++++++------- tap_postgres/db.py | 10 +++++----- tests/test_full_table_interruption.py | 28 +++++++++++++-------------- tests/utils.py | 16 +++++++-------- 6 files changed, 41 insertions(+), 41 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 12cfa741..09b2b5ce 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -39,7 +39,7 @@ jobs: TAP_POSTGRES_PORT: 5432 TAP_POSTGRES_USER: test_user TAP_POSTGRES_PASSWORD: my-secret-passwd - TAP_POSTGRES_REPLICA_HOST: localhost - TAP_POSTGRES_REPLICA_PORT: 5433 + TAP_POSTGRES_SECONDARY_HOST: localhost + TAP_POSTGRES_SECONDARY_PORT: 5433 LOGGING_CONF_FILE: ./sample_logging.conf run: make test diff --git a/README.md b/README.md index 6895a2a1..b7c8c925 100644 --- a/README.md +++ b/README.md @@ -66,11 +66,11 @@ Full list of options in `config.json`: | tap_id | String | No | ID of the pipeline/tap (Default: None) | | itersize | Integer | No | Size of PG cursor iterator when doing INCREMENTAL or FULL_TABLE (Default: 20000) | | default_replication_method | String | No | Default replication method to use when no one is provided in the catalog (Values: `LOG_BASED`, `INCREMENTAL` or `FULL_TABLE`) (Default: None) | -| use_replica | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) | -| replica_host | String | No | PostgreSQL Replica host (required if `use_replica` is `True`) | -| replica_port | Integer | No | PostgreSQL Replica port (required if `use_replica` is `True`) | -| replica_user | String | No | PostgreSQL Replica user (Default: same as `user`) | -| replica_password | String | No | PostgreSQL Replica password (Default: same as `password`) | +| use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) | +| secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) | +| secondary_port | Integer | No 
| PostgreSQL Replica port (required if `use_secondary` is `True`) | +| secondary_user | String | No | PostgreSQL Replica user (Default: same as `user`) | +| secondary_password | String | No | PostgreSQL Replica password (Default: same as `password`) | ### Run the tap in Discovery Mode diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index fd7258a9..ddb7afb5 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -406,22 +406,22 @@ def main_impl(): 'max_run_seconds': args.config.get('max_run_seconds', 43200), 'break_at_end_lsn': args.config.get('break_at_end_lsn', True), 'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)), - 'use_replica': args.config.get('use_replica', False), + 'use_secondary': args.config.get('use_secondary', False), } - if conn_config['use_replica']: + if conn_config['use_secondary']: try: conn_config.update({ # Host and Port are mandatory. - 'replica_host': args.config["replica_host"], - 'replica_port': args.config["replica_port"], + 'secondary_host': args.config['secondary_host'], + 'secondary_port': args.config['secondary_port'], # User and Password default to the same as on the Primary. - 'replica_user': args.config.get("replica_user", conn_config['user']), - 'replica_password': args.config.get("replica_password", conn_config['password']), + 'secondary_user': args.config.get('secondary_user', conn_config['user']), + 'secondary_password': args.config.get('secondary_password', conn_config['password']), }) except KeyError as exc: raise ValueError( - "When 'use_replica' enabled 'replica_host' and 'replical_port' must be defined." + "When 'use_secondary' is enabled, 'secondary_host' and 'secondary_port' must be defined." 
) from exc if args.config.get('ssl') == 'true': diff --git a/tap_postgres/db.py b/tap_postgres/db.py index 56978f9a..3228bbb8 100644 --- a/tap_postgres/db.py +++ b/tap_postgres/db.py @@ -47,14 +47,14 @@ def open_connection(conn_config, logical_replication=False, prioritize_primary=F 'connect_timeout': 30 } - if conn_config.get('use_replica', False) and not prioritize_primary and not logical_replication: + if conn_config.get('use_secondary', False) and not prioritize_primary and not logical_replication: # Try to use replica but fallback to primary if keys are missing. This is the same behavior as # https://github.com/transferwise/pipelinewise/blob/master/pipelinewise/fastsync/commons/tap_postgres.py#L129 cfg.update({ - 'host': conn_config.get("replica_host", conn_config['host']), - 'port': conn_config.get("replica_port", conn_config['port']), - 'user': conn_config.get("replica_user", conn_config['user']), - 'password': conn_config.get("replica_password", conn_config['password']), + 'host': conn_config.get("secondary_host", conn_config['host']), + 'port': conn_config.get("secondary_port", conn_config['port']), + 'user': conn_config.get("secondary_user", conn_config['user']), + 'password': conn_config.get("secondary_password", conn_config['password']), }) if conn_config.get('sslmode'): diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py index c3249195..c1417468 100644 --- a/tests/test_full_table_interruption.py +++ b/tests/test_full_table_interruption.py @@ -48,7 +48,7 @@ def do_not_dump_catalog(catalog): tap_postgres.dump_catalog = do_not_dump_catalog full_table.UPDATE_BOOKMARK_PERIOD = 1 -@pytest.mark.parametrize('use_replica', [False, True]) +@pytest.mark.parametrize('use_secondary', [False, True]) @unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) class TestLogicalInterruption: maxDiff = None @@ -67,11 +67,11 @@ def setup_method(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self, 
mock_connect, use_replica): + def test_catalog(self, mock_connect, use_secondary): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok - conn_config = get_test_connection_config(use_replica=use_replica) + conn_config = get_test_connection_config(use_secondary=use_secondary) streams = tap_postgres.do_discovery(conn_config) # Assert that we connected to the correct database @@ -81,8 +81,8 @@ def test_catalog(self, mock_connect, use_replica): 'user': unittest.mock.ANY, 'password': unittest.mock.ANY, 'connect_timeout':unittest.mock.ANY, - 'host': conn_config['replica_host'] if use_replica else conn_config['host'], - 'port': conn_config['replica_port'] if use_replica else conn_config['port'], + 'host': conn_config['secondary_host'] if use_secondary else conn_config['host'], + 'port': conn_config['secondary_port'] if use_secondary else conn_config['port'], } mock_connect.assert_called_once_with(**expected_connection) mock_connect.reset_mock() @@ -115,7 +115,7 @@ def test_catalog(self, mock_connect, use_replica): #the initial phase of cows logical replication will be a full table. 
#it will sync the first record and then blow up on the 2nd record try: - tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state) except Exception: blew_up_on_cow = True @@ -171,7 +171,7 @@ def test_catalog(self, mock_connect, use_replica): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 CAUGHT_MESSAGES.clear() - tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, old_state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state) mock_connect.assert_called_with(**expected_connection) mock_connect.reset_mock() @@ -225,7 +225,7 @@ def test_catalog(self, mock_connect, use_replica): assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn') == end_lsn assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version') == new_version -@pytest.mark.parametrize('use_replica', [False, True]) +@pytest.mark.parametrize('use_secondary', [False, True]) @unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) class TestFullTableInterruption: maxDiff = None @@ -247,11 +247,11 @@ def setup_method(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self, mock_connect, use_replica): + def test_catalog(self, mock_connect, use_secondary): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok - conn_config = get_test_connection_config(use_replica=use_replica) + conn_config = get_test_connection_config(use_secondary=use_secondary) streams = tap_postgres.do_discovery(conn_config) # Assert that we connected to the correct database @@ -261,8 +261,8 @@ def test_catalog(self, mock_connect, use_replica): 'user': unittest.mock.ANY, 'password': unittest.mock.ANY, 'connect_timeout':unittest.mock.ANY, - 'host': 
conn_config['replica_host'] if use_replica else conn_config['host'], - 'port': conn_config['replica_port'] if use_replica else conn_config['port'], + 'host': conn_config['secondary_host'] if use_secondary else conn_config['host'], + 'port': conn_config['secondary_port'] if use_secondary else conn_config['port'], } mock_connect.assert_called_once_with(**expected_connection) mock_connect.reset_mock() @@ -300,7 +300,7 @@ def test_catalog(self, mock_connect, use_replica): #this will sync the CHICKEN but then blow up on the COW try: - tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state) except Exception as ex: # LOGGER.exception(ex) blew_up_on_cow = True @@ -364,7 +364,7 @@ def test_catalog(self, mock_connect, use_replica): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 - tap_postgres.do_sync(get_test_connection_config(use_replica=use_replica), {'streams' : streams}, None, old_state) + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state) mock_connect.assert_called_with(**expected_connection) mock_connect.reset_mock() diff --git a/tests/utils.py b/tests/utils.py index 79b1ce21..ee678b85 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,7 @@ LOGGER = get_logger() -def get_test_connection_config(target_db='postgres', use_replica=False): +def get_test_connection_config(target_db='postgres', use_secondary=False): missing_envs = [x for x in [os.getenv('TAP_POSTGRES_HOST'), os.getenv('TAP_POSTGRES_USER'), os.getenv('TAP_POSTGRES_PASSWORD'), @@ -24,19 +24,19 @@ def get_test_connection_config(target_db='postgres', use_replica=False): 'port': os.environ.get('TAP_POSTGRES_PORT'), 'dbname': target_db} - if use_replica: - missing_envs = [x for x in [os.getenv('TAP_POSTGRES_REPLICA_HOST'), - os.getenv('TAP_POSTGRES_REPLICA_PORT')] if x == 
None] + if use_secondary: + missing_envs = [x for x in [os.getenv('TAP_POSTGRES_SECONDARY_HOST'), + os.getenv('TAP_POSTGRES_SECONDARY_PORT')] if x == None] if len(missing_envs) != 0: raise Exception( - "set TAP_POSTGRES_REPLICA_HOST, TAP_POSTGRES_REPLICA_PORT" + "set TAP_POSTGRES_SECONDARY_HOST, TAP_POSTGRES_SECONDARY_PORT" ) conn_config.update({ - 'use_replica': use_replica, - 'replica_host': os.getenv('TAP_POSTGRES_REPLICA_HOST'), - 'replica_port': os.getenv('TAP_POSTGRES_REPLICA_PORT'), + 'use_secondary': use_secondary, + 'secondary_host': os.getenv('TAP_POSTGRES_SECONDARY_HOST'), + 'secondary_port': os.getenv('TAP_POSTGRES_SECONDARY_PORT'), }) return conn_config From 09d6b9ccd3ef043542fb569fb65549e5a958a7fb Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 12 Jan 2022 15:55:44 +0000 Subject: [PATCH 04/20] Require primary and secondary to use the same user --- README.md | 2 -- tap_postgres/__init__.py | 3 --- tap_postgres/db.py | 2 -- 3 files changed, 7 deletions(-) diff --git a/README.md b/README.md index b7c8c925..9da3a6ca 100644 --- a/README.md +++ b/README.md @@ -69,8 +69,6 @@ Full list of options in `config.json`: | use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) | | secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) | | secondary_port | Integer | No | PostgreSQL Replica port (required if `use_secondary` is `True`) | -| secondary_user | String | No | PostgreSQL Replica user (Default: same as `user`) | -| secondary_password | String | No | PostgreSQL Replica password (Default: same as `password`) | ### Run the tap in Discovery Mode diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index ddb7afb5..5c7c2ac1 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -415,9 +415,6 @@ def main_impl(): # Host and Port are mandatory. 
'secondary_host': args.config['secondary_host'], 'secondary_port': args.config['secondary_port'], - # User and Password default to the same as on the Primary. - 'secondary_user': args.config.get('secondary_user', conn_config['user']), - 'secondary_password': args.config.get('secondary_password', conn_config['password']), }) except KeyError as exc: raise ValueError( diff --git a/tap_postgres/db.py b/tap_postgres/db.py index 3228bbb8..1b39c974 100644 --- a/tap_postgres/db.py +++ b/tap_postgres/db.py @@ -53,8 +53,6 @@ def open_connection(conn_config, logical_replication=False, prioritize_primary=F cfg.update({ 'host': conn_config.get("secondary_host", conn_config['host']), 'port': conn_config.get("secondary_port", conn_config['port']), - 'user': conn_config.get("secondary_user", conn_config['user']), - 'password': conn_config.get("secondary_password", conn_config['password']), }) if conn_config.get('sslmode'): From e892687529422cdf0d95698e1d1b2210faf03392 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 12 Jan 2022 15:56:19 +0000 Subject: [PATCH 05/20] Add documentation --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 9da3a6ca..9a787307 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,8 @@ to the tap for the next sync. 
``` export TAP_POSTGRES_HOST= export TAP_POSTGRES_PORT= + export TAP_POSTGRES_SECONDARY_HOST= + export TAP_POSTGRES_SECONDARY_PORT= export TAP_POSTGRES_USER= export TAP_POSTGRES_PASSWORD= ``` From 0855fc734759dd413696a2835e847324060bace8 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 18 Jan 2022 15:07:23 +0000 Subject: [PATCH 06/20] Perform logical replication after initial sync --- tap_postgres/__init__.py | 4 ++++ tests/test_full_table_interruption.py | 9 +++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 5c7c2ac1..2cafea11 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # finishing previously interrupted full-table (first stage of logical replication) lookup[stream['tap_stream_id']] = 'logical_initial_interrupted' traditional_steams.append(stream) + # do any required logical replication after initial sync is complete + logical_streams.append(stream) # inconsistent state elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \ not get_bookmark(state, stream['tap_stream_id'], 'lsn'): @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # initial full-table phase of logical replication lookup[stream['tap_stream_id']] = 'logical_initial' traditional_steams.append(stream) + # do any required logical replication after initial sync is complete + logical_streams.append(stream) else: # no xmin but we have an lsn # initial stage of logical replication(full-table) has been completed. 
moving onto pure logical replication diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py index c1417468..06ba33a4 100644 --- a/tests/test_full_table_interruption.py +++ b/tests/test_full_table_interruption.py @@ -1,4 +1,3 @@ -import re import psycopg2 import unittest.mock import pytest @@ -49,6 +48,7 @@ def do_not_dump_catalog(catalog): full_table.UPDATE_BOOKMARK_PERIOD = 1 @pytest.mark.parametrize('use_secondary', [False, True]) +@unittest.mock.patch('tap_postgres.sync_logical_streams') @unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) class TestLogicalInterruption: maxDiff = None @@ -67,7 +67,7 @@ def setup_method(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self, mock_connect, use_secondary): + def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok @@ -124,6 +124,8 @@ def test_catalog(self, mock_connect, use_secondary): mock_connect.assert_called_with(**expected_connection) mock_connect.reset_mock() + mock_sync_logical_streams.assert_not_called() + assert 7 == len(CAUGHT_MESSAGES) assert CAUGHT_MESSAGES[0]['type'] =='SCHEMA' @@ -176,6 +178,9 @@ def test_catalog(self, mock_connect, use_secondary): mock_connect.assert_called_with(**expected_connection) mock_connect.reset_mock() + mock_sync_logical_streams.assert_called_once() + + assert 8 == len(CAUGHT_MESSAGES) assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' From c48300a195fe66cdeb6a95d9062a3eb5650804fe Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 18 Jan 2022 12:29:03 +0000 Subject: [PATCH 07/20] Actually run the initial `LOG_BASED` sync in test --- Dockerfile | 10 +++++ docker-compose.yml | 13 +++--- tests/test_full_table_interruption.py | 62 ++++++++++++++++++++++----- tests/utils.py | 6 ++- 4 files changed, 72 insertions(+), 19 deletions(-) create 
mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..1944d2db --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM docker.io/bitnami/postgresql:12 + +# Install wal2json +USER root +RUN install_packages curl ca-certificates gnupg && \ + curl https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor | tee /etc/apt/trusted.gpg.d/apt.postgresql.org.gpg >/dev/null && \ + echo "deb http://apt.postgresql.org/pub/repos/apt buster-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \ + install_packages postgresql-12-wal2json && \ + cp /usr/lib/postgresql/12/lib/wal2json.so /opt/bitnami/postgresql/lib/wal2json.so +USER 1001 diff --git a/docker-compose.yml b/docker-compose.yml index c416b85b..9503fe51 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,18 +2,17 @@ version: "3.3" services: db_primary: - image: "docker.io/bitnami/postgresql:12" + build: . container_name: "primary" ports: - "5432:5432" environment: - POSTGRESQL_REPLICATION_MODE=master - - POSTGRESQL_REPLICATION_USER=repl_user - - POSTGRESQL_REPLICATION_PASSWORD=repl_password - - POSTGRES_USER=test_user - - POSTGRES_PASSWORD=my-secret-passwd + - POSTGRESQL_REPLICATION_USER=test_user + - POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd - POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd - POSTGRESQL_DATABASE=tap_postgres_test + - POSTGRESQL_WAL_LEVEL=logical - ALLOW_EMPTY_PASSWORD=yes db_replica: image: "docker.io/bitnami/postgresql:12" @@ -24,8 +23,8 @@ services: - db_primary environment: - POSTGRESQL_REPLICATION_MODE=slave - - POSTGRESQL_REPLICATION_USER=repl_user - - POSTGRESQL_REPLICATION_PASSWORD=repl_password + - POSTGRESQL_REPLICATION_USER=test_user + - POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd - POSTGRESQL_MASTER_HOST=db_primary - POSTGRESQL_MASTER_PORT_NUMBER=5432 - ALLOW_EMPTY_PASSWORD=yes diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py index 06ba33a4..fce78146 100644 --- 
a/tests/test_full_table_interruption.py +++ b/tests/test_full_table_interruption.py @@ -2,6 +2,7 @@ import unittest.mock import pytest import tap_postgres +from tap_postgres.sync_strategies import logical_replication import tap_postgres.sync_strategies.full_table as full_table import tap_postgres.sync_strategies.common as pg_common import singer @@ -48,11 +49,29 @@ def do_not_dump_catalog(catalog): full_table.UPDATE_BOOKMARK_PERIOD = 1 @pytest.mark.parametrize('use_secondary', [False, True]) -@unittest.mock.patch('tap_postgres.sync_logical_streams') +@unittest.mock.patch('tap_postgres.sync_logical_streams', wraps=tap_postgres.sync_logical_streams) @unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) class TestLogicalInterruption: maxDiff = None + def setup_class(self): + conn_config = get_test_connection_config() + slot_name = logical_replication.generate_replication_slot_name( + dbname=conn_config['dbname'], tap_id=conn_config['tap_id'] + ) + with get_test_connection(superuser=True) as conn: + with conn.cursor() as cur: + cur.execute(f"SELECT * FROM pg_create_logical_replication_slot('{slot_name}', 'wal2json')") + + def teardown_class(self): + conn_config = get_test_connection_config() + slot_name = logical_replication.generate_replication_slot_name( + dbname=conn_config['dbname'], tap_id=conn_config['tap_id'] + ) + with get_test_connection(superuser=True) as conn: + with conn.cursor() as cur: + cur.execute(f"SELECT * FROM pg_drop_replication_slot('{slot_name}')") + def setup_method(self): table_spec_1 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True}, {"name" : 'name', "type": "character varying"}, @@ -75,7 +94,16 @@ def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary): streams = tap_postgres.do_discovery(conn_config) # Assert that we connected to the correct database - expected_connection = { + primary_connection = { + 'application_name': unittest.mock.ANY, + 'dbname': unittest.mock.ANY, + 'user': 
unittest.mock.ANY, + 'password': unittest.mock.ANY, + 'connect_timeout':unittest.mock.ANY, + 'host': conn_config['host'], + 'port': conn_config['port'], + } + secondary_connection = { 'application_name': unittest.mock.ANY, 'dbname': unittest.mock.ANY, 'user': unittest.mock.ANY, @@ -84,8 +112,8 @@ def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary): 'host': conn_config['secondary_host'] if use_secondary else conn_config['host'], 'port': conn_config['secondary_port'] if use_secondary else conn_config['port'], } - mock_connect.assert_called_once_with(**expected_connection) - mock_connect.reset_mock() + + mock_connect.assert_called_once_with(**secondary_connection) cow_stream = [s for s in streams if s['table_name'] == 'COW'][0] assert cow_stream is not None @@ -114,6 +142,7 @@ def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary): state = {} #the initial phase of cows logical replication will be a full table. #it will sync the first record and then blow up on the 2nd record + mock_connect.reset_mock() try: tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state) except Exception: @@ -121,11 +150,13 @@ def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary): assert blew_up_on_cow is True - mock_connect.assert_called_with(**expected_connection) - mock_connect.reset_mock() - mock_sync_logical_streams.assert_not_called() + mock_connect.assert_has_calls( + [unittest.mock.call(**primary_connection)] * 2 + \ + [unittest.mock.call(**secondary_connection)] * 4 + ) + assert 7 == len(CAUGHT_MESSAGES) assert CAUGHT_MESSAGES[0]['type'] =='SCHEMA' @@ -173,15 +204,17 @@ def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 CAUGHT_MESSAGES.clear() - tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state) - - 
mock_connect.assert_called_with(**expected_connection) mock_connect.reset_mock() + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state) mock_sync_logical_streams.assert_called_once() + mock_connect.assert_has_calls( + [unittest.mock.call(**primary_connection)] * 2 + \ + [unittest.mock.call(**secondary_connection)] * 4 + ) - assert 8 == len(CAUGHT_MESSAGES) + assert 10 == len(CAUGHT_MESSAGES) assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' @@ -230,6 +263,13 @@ def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary): assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn') == end_lsn assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version') == new_version + assert CAUGHT_MESSAGES[8]['type'] == 'SCHEMA' + + assert isinstance(CAUGHT_MESSAGES[9], singer.messages.StateMessage) + assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('xmin') is None + assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('version') == new_version + @pytest.mark.parametrize('use_secondary', [False, True]) @unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) class TestFullTableInterruption: diff --git a/tests/utils.py b/tests/utils.py index ee678b85..6bfd064c 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -18,7 +18,11 @@ def get_test_connection_config(target_db='postgres', use_secondary=False): if len(missing_envs) != 0: raise Exception("set TAP_POSTGRES_HOST, TAP_POSTGRES_USER, TAP_POSTGRES_PASSWORD, TAP_POSTGRES_PORT") - conn_config = {'host': os.environ.get('TAP_POSTGRES_HOST'), + conn_config = {'tap_id': 'test-postgres', + 'max_run_seconds': 5, + 'break_at_end_lsn': True, + 'logical_poll_total_seconds': 2, + 'host': os.environ.get('TAP_POSTGRES_HOST'), 'user': os.environ.get('TAP_POSTGRES_USER'), 'password': os.environ.get('TAP_POSTGRES_PASSWORD'), 'port': 
os.environ.get('TAP_POSTGRES_PORT'), From d542f96b42b22b820c4991c109cccb296c2abfd1 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 24 Jan 2022 14:04:02 +0000 Subject: [PATCH 08/20] Add ability to use `wal2json` `format-version` 2 This option removes the need to use `write-in-chunks`. --- README.md | 1 + tap_postgres/__init__.py | 1 + .../sync_strategies/logical_replication.py | 158 ++++++++++++++---- tests/test_full_table_interruption.py | 11 +- tests/test_logical_replication.py | 124 +++++++++++++- tests/utils.py | 5 +- 6 files changed, 258 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 9a787307..d523cab7 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,7 @@ Full list of options in `config.json`: | use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) | | secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) | | secondary_port | Integer | No | PostgreSQL Replica port (required if `use_secondary` is `True`) | +| wal2json_message_format | Integer | No | Which `wal2json` message format to use (Default: 1) | ### Run the tap in Discovery Mode diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 2cafea11..24948860 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -411,6 +411,7 @@ def main_impl(): 'break_at_end_lsn': args.config.get('break_at_end_lsn', True), 'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)), 'use_secondary': args.config.get('use_secondary', False), + 'wal2json_message_format': args.config.get('wal2json_message_format', 1) } if conn_config['use_secondary']: diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index eb690956..2ec5d6e5 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ 
b/tap_postgres/sync_strategies/logical_replication.py @@ -377,18 +377,24 @@ def row_to_singer_message(stream, row, version, columns, time_extracted, md_map, time_extracted=time_extracted) -# pylint: disable=unused-argument,too-many-locals -def consume_message(streams, state, msg, time_extracted, conn_info): - # Strip leading comma generated by write-in-chunks and parse valid JSON - try: - payload = json.loads(msg.payload.lstrip(',')) - except Exception: - return state +def check_for_new_columns(columns, target_stream, conn_info): + diff = set(columns).difference(target_stream['schema']['properties'].keys()) - lsn = msg.data_start + if diff: + LOGGER.info('Detected new columns "%s", refreshing schema of stream %s', diff, target_stream['stream']) + # encountered a column that is not in the schema + # refresh the stream schema and metadata by running discovery + refresh_streams_schema(conn_info, [target_stream]) - streams_lookup = {s['tap_stream_id']: s for s in streams} + # add the automatic properties back to the stream + add_automatic_properties(target_stream, conn_info.get('debug_lsn', False)) + # publish new schema + sync_common.send_schema_message(target_stream, ['lsn']) + + +# pylint: disable=too-many-locals +def consume_message_format_1(payload, conn_info, streams_lookup, state, time_extracted, lsn): tap_stream_id = post_db.compute_tap_stream_id(payload['schema'], payload['table']) if streams_lookup.get(tap_stream_id) is None: return state @@ -400,22 +406,8 @@ def consume_message(streams, state, msg, time_extracted, conn_info): # Get the additional fields in payload that are not in schema properties: # only inserts and updates have the list of columns that can be used to detect any different in columns - diff = set() if payload['kind'] in {'insert', 'update'}: - diff = set(payload['columnnames']).difference(target_stream['schema']['properties'].keys()) - - # if there is new columns in the payload that are not in the schema properties then refresh the stream 
schema - if diff: - LOGGER.info('Detected new columns "%s", refreshing schema of stream %s', diff, target_stream['stream']) - # encountered a column that is not in the schema - # refresh the stream schema and metadata by running discovery - refresh_streams_schema(conn_info, [target_stream]) - - # add the automatic properties back to the stream - add_automatic_properties(target_stream, conn_info.get('debug_lsn', False)) - - # publish new schema - sync_common.send_schema_message(target_stream, ['lsn']) + check_for_new_columns(payload['columnnames'], target_stream, conn_info) stream_version = get_stream_version(target_stream['tap_stream_id'], state) stream_md_map = metadata.to_map(target_stream['metadata']) @@ -476,6 +468,109 @@ def consume_message(streams, state, msg, time_extracted, conn_info): return state +def consume_message_format_2(payload, conn_info, streams_lookup, state, time_extracted, lsn): + ## Action Types: + # I = Insert + # U = Update + # D = Delete + # B = Begin Transaction + # C = Commit Transaction + # M = Message + # T = Truncate + action = payload['action'] + if action not in {'U', 'I', 'D'}: + raise UnsupportedPayloadKindError(f"unrecognized replication operation: {action}") + + tap_stream_id = post_db.compute_tap_stream_id(payload['schema'], payload['table']) + if streams_lookup.get(tap_stream_id) is not None: + target_stream = streams_lookup[tap_stream_id] + + # Get the additional fields in payload that are not in schema properties: + # only inserts and updates have the list of columns that can be used to detect any different in columns + if payload['action'] in {'I', 'U'}: + check_for_new_columns({column['name'] for column in payload['columns']}, target_stream, conn_info) + + stream_version = get_stream_version(target_stream['tap_stream_id'], state) + stream_md_map = metadata.to_map(target_stream['metadata']) + + desired_columns = {c for c in target_stream['schema']['properties'].keys() if sync_common.should_sync_column( + stream_md_map, c)} + 
+ stream_version = get_stream_version(target_stream['tap_stream_id'], state) + stream_md_map = metadata.to_map(target_stream['metadata']) + + desired_columns = [ + col for col in target_stream['schema']['properties'].keys() + if sync_common.should_sync_column(stream_md_map, col) + ] + + col_names = [] + col_vals = [] + if payload['action'] in ['I', 'U']: + for column in payload['columns']: + if column['name'] in set(desired_columns): + col_names.append(column['name']) + col_vals.append(column['value']) + + col_names = col_names + ['_sdc_deleted_at'] + col_vals = col_vals + [None] + + if conn_info.get('debug_lsn'): + col_names = col_names + ['_sdc_lsn'] + col_vals = col_vals + [str(lsn)] + + elif payload['action'] == 'D': + for column in payload['identity']: + if column['name'] in set(desired_columns): + col_names.append(column['name']) + col_vals.append(column['value']) + + col_names = col_names + ['_sdc_deleted_at'] + col_vals = col_vals + [singer.utils.strftime(singer.utils.strptime_to_utc(payload['timestamp']))] + + if conn_info.get('debug_lsn'): + col_vals = col_vals + [str(lsn)] + col_names = col_names + ['_sdc_lsn'] + + # Write 1 record to match the API of V1 + record_message = row_to_singer_message( + target_stream, + col_vals, + stream_version, + col_names, + time_extracted, + stream_md_map, + conn_info, + ) + + singer.write_message(record_message) + state = singer.write_bookmark(state, target_stream['tap_stream_id'], 'lsn', lsn) + + return state + + +def consume_message(streams, state, msg, time_extracted, conn_info): + # Strip leading comma generated by write-in-chunks and parse valid JSON + try: + payload = json.loads(msg.payload.lstrip(',')) + except Exception: + return state + + lsn = msg.data_start + + streams_lookup = {s['tap_stream_id']: s for s in streams} + + message_format = conn_info['wal2json_message_format'] + if message_format == 1: + state = consume_message_format_1(payload, conn_info, streams_lookup, state, time_extracted, lsn) + elif 
message_format == 2: + state = consume_message_format_2(payload, conn_info, streams_lookup, state, time_extracted, lsn) + else: + raise Exception(f"Unknown wal2json message format version: {message_format}") + + return state + + def generate_replication_slot_name(dbname, tap_id=None, prefix='pipelinewise'): """Generate replication slot name with @@ -591,14 +686,21 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): int_to_lsn(end_lsn), slot) # psycopg2 2.8.4 will send a keep-alive message to postgres every status_interval + options = { + 'add-tables': streams_to_wal2json_tables(logical_streams), + 'format-version': conn_info['wal2json_message_format'], + 'actions': 'insert,update,delete', + 'include-transaction': False, + 'include-timestamp': True, + 'include-types': False, + } + if options['format-version'] == 1: + options['write-in-chunks'] = 1 cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn, status_interval=poll_interval, - options={ - 'write-in-chunks': 1, - 'add-tables': streams_to_wal2json_tables(logical_streams) - }) + options=options) except psycopg2.ProgrammingError as ex: raise Exception(f"Unable to start replication with logical replication (slot {ex})") from ex diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py index fce78146..fffead69 100644 --- a/tests/test_full_table_interruption.py +++ b/tests/test_full_table_interruption.py @@ -48,7 +48,7 @@ def do_not_dump_catalog(catalog): tap_postgres.dump_catalog = do_not_dump_catalog full_table.UPDATE_BOOKMARK_PERIOD = 1 -@pytest.mark.parametrize('use_secondary', [False, True]) +@pytest.mark.parametrize('use_secondary,message_format', [(False, 1), (True, 2)]) @unittest.mock.patch('tap_postgres.sync_logical_streams', wraps=tap_postgres.sync_logical_streams) @unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) class TestLogicalInterruption: @@ -86,11 +86,11 @@ def setup_method(self): global CAUGHT_MESSAGES 
CAUGHT_MESSAGES.clear() - def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary): + def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary, message_format): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok - conn_config = get_test_connection_config(use_secondary=use_secondary) + conn_config = get_test_connection_config(use_secondary=use_secondary, message_format=message_format) streams = tap_postgres.do_discovery(conn_config) # Assert that we connected to the correct database @@ -144,7 +144,7 @@ def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary): #it will sync the first record and then blow up on the 2nd record mock_connect.reset_mock() try: - tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state) + tap_postgres.do_sync(conn_config, {'streams' : streams}, None, state) except Exception: blew_up_on_cow = True @@ -204,8 +204,7 @@ def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 CAUGHT_MESSAGES.clear() - mock_connect.reset_mock() - tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state) + tap_postgres.do_sync(conn_config, {'streams' : streams}, None, old_state) mock_sync_logical_streams.assert_called_once() diff --git a/tests/test_logical_replication.py b/tests/test_logical_replication.py index ae0f5832..21d9e804 100644 --- a/tests/test_logical_replication.py +++ b/tests/test_logical_replication.py @@ -137,7 +137,7 @@ def test_consume_with_message_payload_is_not_json_expect_same_state(self): self.WalMessage(payload='this is an invalid json message', data_start=None), None, - {} + {'wal2json_message_format': 1}, ) self.assertDictEqual({}, output) @@ -148,7 +148,19 @@ def 
test_consume_with_message_stream_in_payload_is_not_selected_expect_same_stat self.WalMessage(payload='{"schema": "myschema", "table": "notmytable"}', data_start='some lsn'), None, - {} + {'wal2json_message_format': 1}, + ) + + self.assertDictEqual({}, output) + + def test_consume_with_message_stream_in_payload_is_not_selected_expect_same_state_format_2(self): + output = logical_replication.consume_message( + [{'tap_stream_id': 'myschema-mytable'}], + {}, + self.WalMessage(payload='{"action": "U", "schema": "myschema", "table": "notmytable"}', + data_start='some lsn'), + None, + {'wal2json_message_format': 2}, ) self.assertDictEqual({}, output) @@ -161,7 +173,18 @@ def test_consume_with_payload_kind_is_not_supported_expect_exception(self): self.WalMessage(payload='{"kind":"truncate", "schema": "myschema", "table": "mytable"}', data_start='some lsn'), None, - {} + {'wal2json_message_format': 1}, + ) + + def test_consume_with_payload_kind_is_not_supported_expect_exception_format_2(self): + with self.assertRaises(UnsupportedPayloadKindError): + logical_replication.consume_message( + [{'tap_stream_id': 'myschema-mytable'}], + {}, + self.WalMessage(payload='{"action":"T", "schema": "myschema", "table": "mytable"}', + data_start='some lsn'), + None, + {'wal2json_message_format': 2}, ) @patch('tap_postgres.logical_replication.singer.write_message') @@ -231,11 +254,100 @@ def test_consume_message_with_new_column_in_payload_will_refresh_schema(self, '"schema": "myschema", ' '"table": "mytable",' '"columnnames": ["id", "date_created", "new_col"],' - '"columnnames": [1, null, "some random text"]' + '"columnvalues": [1, null, "some random text"]' + '}', + data_start='some lsn'), + None, + {'wal2json_message_format': 1}, + ) + + self.assertDictEqual(return_v, + { + 'bookmarks': { + "myschema-mytable": { + "last_replication_method": "LOG_BASED", + "lsn": "some lsn", + "version": 1000, + "xmin": None + } + } + }) + + 
refresh_schema_mock.assert_called_once_with({'wal2json_message_format': 1}, [streams[0]]) + send_schema_mock.assert_called_once() + write_message_mock.assert_called_once() + + @patch('tap_postgres.logical_replication.singer.write_message') + @patch('tap_postgres.logical_replication.sync_common.send_schema_message') + @patch('tap_postgres.logical_replication.refresh_streams_schema') + def test_consume_message_with_new_column_in_payload_will_refresh_schema_format_2(self, + refresh_schema_mock, + send_schema_mock, + write_message_mock): + streams = [ + { + 'tap_stream_id': 'myschema-mytable', + 'stream': 'mytable', + 'schema': { + 'properties': { + 'id': {}, + 'date_created': {} + } + }, + 'metadata': [ + { + 'breadcrumb': [], + 'metadata': { + 'is-view': False, + 'table-key-properties': ['id'], + 'schema-name': 'myschema' + } + }, + { + "breadcrumb": [ + "properties", + "id" + ], + "metadata": { + "sql-datatype": "integer", + "inclusion": "automatic", + } + }, + { + "breadcrumb": [ + "properties", + "date_created" + ], + "metadata": { + "sql-datatype": "datetime", + "inclusion": "available", + "selected": True + } + } + ], + } + ] + + return_v = logical_replication.consume_message( + streams, + { + 'bookmarks': { + "myschema-mytable": { + "last_replication_method": "LOG_BASED", + "lsn": None, + "version": 1000, + "xmin": None + } + } + }, + self.WalMessage(payload='{"action": "I", ' + '"schema": "myschema", ' + '"table": "mytable",' + '"columns": [{"name": "id", "value": 1}, {"name": "date_created", "value": null}, {"name": "new_col", "value": "some random text"}]' '}', data_start='some lsn'), None, - {} + {'wal2json_message_format': 2}, ) self.assertDictEqual(return_v, @@ -250,7 +362,7 @@ def test_consume_message_with_new_column_in_payload_will_refresh_schema(self, } }) - refresh_schema_mock.assert_called_once_with({}, [streams[0]]) + refresh_schema_mock.assert_called_once_with({'wal2json_message_format': 2}, [streams[0]]) send_schema_mock.assert_called_once() 
write_message_mock.assert_called_once() diff --git a/tests/utils.py b/tests/utils.py index 9b3c4fbb..467dbf36 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,7 @@ LOGGER = get_logger() -def get_test_connection_config(target_db='postgres', use_secondary=False): +def get_test_connection_config(target_db='postgres', use_secondary=False, message_format=1): try: conn_config = {'tap_id': 'test-postgres', 'max_run_seconds': 5, @@ -21,7 +21,8 @@ def get_test_connection_config(target_db='postgres', use_secondary=False): 'password': os.environ['TAP_POSTGRES_PASSWORD'], 'port': os.environ['TAP_POSTGRES_PORT'], 'dbname': target_db, - 'use_secondary': use_secondary,} + 'use_secondary': use_secondary, + 'wal2json_message_format': message_format} except KeyError as exc: raise Exception( "set TAP_POSTGRES_HOST, TAP_POSTGRES_USER, TAP_POSTGRES_PASSWORD, TAP_POSTGRES_PORT" From 42c558b3ba40948fa4339c6f69b2e4070baf9796 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 24 Jan 2022 16:36:02 +0000 Subject: [PATCH 09/20] Use fixed version of `wal2json` --- Dockerfile | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1944d2db..95fbeb42 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,10 +1,17 @@ FROM docker.io/bitnami/postgresql:12 -# Install wal2json USER root -RUN install_packages curl ca-certificates gnupg && \ + +# Git SHA of v2.4 +ENV WAL2JSON_COMMIT_ID=36fbee6cbb7e4bc1bf9ee4f55842ab51393e3ac0 + +# Compile the plugins from sources and install +RUN install_packages ca-certificates curl gcc git gnupg make pkgconf && \ curl https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor | tee /etc/apt/trusted.gpg.d/apt.postgresql.org.gpg >/dev/null && \ echo "deb http://apt.postgresql.org/pub/repos/apt buster-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \ - install_packages postgresql-12-wal2json && \ - cp /usr/lib/postgresql/12/lib/wal2json.so 
/opt/bitnami/postgresql/lib/wal2json.so + install_packages postgresql-server-dev-12 && \ + git clone https://github.com/eulerto/wal2json -b master --single-branch && \ + (cd /wal2json && git checkout $WAL2JSON_COMMIT_ID && make && make install) && \ + rm -rf wal2json + USER 1001 From ee9bedb4711bb27c9660892f56c8fb27f3770d2a Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 25 Jan 2022 17:43:49 +0000 Subject: [PATCH 10/20] Only set required `wal2json` options --- .../sync_strategies/logical_replication.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index 2ec5d6e5..cc7fb3f2 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -688,14 +688,19 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): # psycopg2 2.8.4 will send a keep-alive message to postgres every status_interval options = { 'add-tables': streams_to_wal2json_tables(logical_streams), - 'format-version': conn_info['wal2json_message_format'], - 'actions': 'insert,update,delete', - 'include-transaction': False, 'include-timestamp': True, 'include-types': False, } - if options['format-version'] == 1: - options['write-in-chunks'] = 1 + if conn_info['wal2json_message_format'] == 1: + options.update({'write-in-chunks': 1}) + else: + options.update( + { + 'format-version': conn_info['wal2json_message_format'], + 'include-transaction': False, + 'actions': 'insert,update,delete', + } + ) cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn, From 790524eea8f60148391359e8c5b2fd4406ffd281 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 4 Mar 2022 23:13:44 +0000 Subject: [PATCH 11/20] Use a more useful value for `extracted_at` --- 
.../sync_strategies/logical_replication.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index cc7fb3f2..2131b124 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -394,7 +394,7 @@ def check_for_new_columns(columns, target_stream, conn_info): # pylint: disable=too-many-locals -def consume_message_format_1(payload, conn_info, streams_lookup, state, time_extracted, lsn): +def consume_message_format_1(payload, conn_info, streams_lookup, state, lsn): tap_stream_id = post_db.compute_tap_stream_id(payload['schema'], payload['table']) if streams_lookup.get(tap_stream_id) is None: return state @@ -409,6 +409,8 @@ def consume_message_format_1(payload, conn_info, streams_lookup, state, time_ext if payload['kind'] in {'insert', 'update'}: check_for_new_columns(payload['columnnames'], target_stream, conn_info) + time_extracted = utils.now() + stream_version = get_stream_version(target_stream['tap_stream_id'], state) stream_md_map = metadata.to_map(target_stream['metadata']) @@ -468,7 +470,7 @@ def consume_message_format_1(payload, conn_info, streams_lookup, state, time_ext return state -def consume_message_format_2(payload, conn_info, streams_lookup, state, time_extracted, lsn): +def consume_message_format_2(payload, conn_info, streams_lookup, state, lsn): ## Action Types: # I = Insert # U = Update @@ -481,6 +483,8 @@ def consume_message_format_2(payload, conn_info, streams_lookup, state, time_ext if action not in {'U', 'I', 'D'}: raise UnsupportedPayloadKindError(f"unrecognized replication operation: {action}") + time_extracted = singer.utils.strptime_to_utc(payload['timestamp']) + tap_stream_id = post_db.compute_tap_stream_id(payload['schema'], payload['table']) if streams_lookup.get(tap_stream_id) is not None: target_stream = streams_lookup[tap_stream_id] 
@@ -526,7 +530,7 @@ def consume_message_format_2(payload, conn_info, streams_lookup, state, time_ext col_vals.append(column['value']) col_names = col_names + ['_sdc_deleted_at'] - col_vals = col_vals + [singer.utils.strftime(singer.utils.strptime_to_utc(payload['timestamp']))] + col_vals = col_vals + [singer.utils.strftime(time_extracted)] if conn_info.get('debug_lsn'): col_vals = col_vals + [str(lsn)] @@ -549,7 +553,7 @@ def consume_message_format_2(payload, conn_info, streams_lookup, state, time_ext return state -def consume_message(streams, state, msg, time_extracted, conn_info): +def consume_message(streams, state, msg, conn_info): # Strip leading comma generated by write-in-chunks and parse valid JSON try: payload = json.loads(msg.payload.lstrip(',')) @@ -562,9 +566,9 @@ def consume_message(streams, state, msg, time_extracted, conn_info): message_format = conn_info['wal2json_message_format'] if message_format == 1: - state = consume_message_format_1(payload, conn_info, streams_lookup, state, time_extracted, lsn) + state = consume_message_format_1(payload, conn_info, streams_lookup, state, lsn) elif message_format == 2: - state = consume_message_format_2(payload, conn_info, streams_lookup, state, time_extracted, lsn) + state = consume_message_format_2(payload, conn_info, streams_lookup, state, lsn) else: raise Exception(f"Unknown wal2json message format version: {message_format}") @@ -652,7 +656,6 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): lsn_comitted = min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams]) start_lsn = lsn_comitted lsn_to_flush = None - time_extracted = utils.now() slot = locate_replication_slot(conn_info) lsn_last_processed = None lsn_currently_processing = None @@ -739,7 +742,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): int_to_lsn(end_lsn)) break - state = consume_message(logical_streams, state, msg, time_extracted, conn_info) + state = 
consume_message(logical_streams, state, msg, conn_info) # When using wal2json with write-in-chunks, multiple messages can have the same lsn # This is to ensure we only flush to lsn that has completed entirely From 6771da14727fd532c50dd0a1a51db1490b023b7d Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 4 Mar 2022 23:32:18 +0000 Subject: [PATCH 12/20] Update tests --- tests/test_logical_replication.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/tests/test_logical_replication.py b/tests/test_logical_replication.py index 21d9e804..45b380e2 100644 --- a/tests/test_logical_replication.py +++ b/tests/test_logical_replication.py @@ -136,7 +136,6 @@ def test_consume_with_message_payload_is_not_json_expect_same_state(self): {}, self.WalMessage(payload='this is an invalid json message', data_start=None), - None, {'wal2json_message_format': 1}, ) self.assertDictEqual({}, output) @@ -147,7 +146,6 @@ def test_consume_with_message_stream_in_payload_is_not_selected_expect_same_stat {}, self.WalMessage(payload='{"schema": "myschema", "table": "notmytable"}', data_start='some lsn'), - None, {'wal2json_message_format': 1}, ) @@ -157,9 +155,8 @@ def test_consume_with_message_stream_in_payload_is_not_selected_expect_same_stat output = logical_replication.consume_message( [{'tap_stream_id': 'myschema-mytable'}], {}, - self.WalMessage(payload='{"action": "U", "schema": "myschema", "table": "notmytable"}', + self.WalMessage(payload='{"action": "U", "schema": "myschema", "table": "notmytable", "timestamp": "2022-03-04T19:41:29+0000"}', data_start='some lsn'), - None, {'wal2json_message_format': 2}, ) @@ -172,7 +169,6 @@ def test_consume_with_payload_kind_is_not_supported_expect_exception(self): {}, self.WalMessage(payload='{"kind":"truncate", "schema": "myschema", "table": "mytable"}', data_start='some lsn'), - None, {'wal2json_message_format': 1}, ) @@ -181,9 +177,8 @@ def 
test_consume_with_payload_kind_is_not_supported_expect_exception_format_2(se logical_replication.consume_message( [{'tap_stream_id': 'myschema-mytable'}], {}, - self.WalMessage(payload='{"action":"T", "schema": "myschema", "table": "mytable"}', + self.WalMessage(payload='{"action":"T", "schema": "myschema", "table": "mytable", "timestamp": "2022-03-04T19:41:29+0000"}', data_start='some lsn'), - None, {'wal2json_message_format': 2}, ) @@ -257,7 +252,6 @@ def test_consume_message_with_new_column_in_payload_will_refresh_schema(self, '"columnvalues": [1, null, "some random text"]' '}', data_start='some lsn'), - None, {'wal2json_message_format': 1}, ) @@ -343,10 +337,10 @@ def test_consume_message_with_new_column_in_payload_will_refresh_schema_format_2 self.WalMessage(payload='{"action": "I", ' '"schema": "myschema", ' '"table": "mytable",' - '"columns": [{"name": "id", "value": 1}, {"name": "date_created", "value": null}, {"name": "new_col", "value": "some random text"}]' + '"columns": [{"name": "id", "value": 1}, {"name": "date_created", "value": null}, {"name": "new_col", "value": "some random text"}],' + '"timestamp": "2022-03-04T19:41:29+0000"' '}', data_start='some lsn'), - None, {'wal2json_message_format': 2}, ) From 8438fafcca4fef1d0377570700c1cd115d220f64 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Sat, 5 Mar 2022 00:13:38 +0000 Subject: [PATCH 13/20] Fix tests --- tests/test_logical_replication.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/tests/test_logical_replication.py b/tests/test_logical_replication.py index c081fd6a..e6d3736b 100644 --- a/tests/test_logical_replication.py +++ b/tests/test_logical_replication.py @@ -678,7 +678,8 @@ def setUp(self) -> None: 'tap_id': 'tap_id_value', 'max_run_seconds': 10, 'break_at_end_lsn': False, - 'logical_poll_total_seconds': 1} + 'logical_poll_total_seconds': 1, + 'wal2json_message_format': 1} self.logical_streams = [{ 
'tap_stream_id': 'foo-bar', @@ -1108,7 +1109,7 @@ def update_payload(cls, kind): self.conn_info['debug_lsn'] = True for kind in ('insert', 'update'): msg.update_payload(kind) - actual_output = logical_replication.consume_message(streams, state, msg, time_extracted, self.conn_info) + actual_output = logical_replication.consume_message(streams, state, msg, self.conn_info) self.assertDictEqual(expected_output, actual_output) @patch('tap_postgres.sync_strategies.logical_replication.refresh_streams_schema') @@ -1134,11 +1135,10 @@ class msg: }] state = {'bookmarks': {'foo-bar': {'foo': 'bar', 'version': 'foo'}}} - time_extracted = datetime(2020, 9, 1, 23, 10, 59, tzinfo=tzoffset(None, 0)) self.conn_info['debug_lsn'] = True expected_message = f'Unable to find sql-datatype for stream {streams[0]}' with self.assertRaises(Exception) as exp: - logical_replication.consume_message(streams, state, msg, time_extracted, self.conn_info) + logical_replication.consume_message(streams, state, msg, self.conn_info) self.assertEqual(expected_message, str(exp.exception)) @@ -1201,7 +1201,12 @@ def test_sync_tables_if_poll_duration_greater_than_logical_poll_total_seconds(se decode=True, start_lsn=state['bookmarks']['foo-bar']['lsn'], status_interval=10, - options={'write-in-chunks': 1, 'add-tables': 'schema_name_value.table_name_value'} + options={ + 'write-in-chunks': 1, + 'add-tables': 'schema_name_value.table_name_value', + 'include-timestamp': True, + 'include-types': False, + }, ) @patch('tap_postgres.sync_strategies.logical_replication.datetime.datetime') @@ -1236,7 +1241,12 @@ def test_sync_tables_if_reached_max_run_seconds(self, decode=True, start_lsn=state['bookmarks']['foo-bar']['lsn'], status_interval=10, - options={'write-in-chunks': 1, 'add-tables': 'schema_name_value.table_name_value'} + options={ + 'write-in-chunks': 1, + 'add-tables': 'schema_name_value.table_name_value', + 'include-timestamp': True, + 'include-types': False, + }, ) 
@patch('tap_postgres.sync_strategies.logical_replication.locate_replication_slot') From 699d21b814e10646ea7d088a1ab2c44017ec9733 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 25 Mar 2022 13:08:32 +0000 Subject: [PATCH 14/20] Add Postgres range types to schema --- tap_postgres/discovery_utils.py | 4 ++++ tests/test_discovery.py | 37 +++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/tap_postgres/discovery_utils.py b/tap_postgres/discovery_utils.py index cd9a00c4..8239334e 100644 --- a/tap_postgres/discovery_utils.py +++ b/tap_postgres/discovery_utils.py @@ -23,6 +23,7 @@ INTEGER_TYPES = {'integer', 'smallint', 'bigint'} FLOAT_TYPES = {'real', 'double precision'} JSON_TYPES = {'json', 'jsonb'} +RANGE_TYPES = {'int4range', 'int8range', 'numrange', 'tsrange', 'tstzrange', 'daterange'} BASE_RECURSIVE_SCHEMAS = { 'sdc_recursive_integer_array': {'type': ['null', 'integer', 'array'], 'items': {'$ref': '#/definitions/sdc_recursive_integer_array'}}, @@ -280,6 +281,9 @@ def schema_for_column_datatype(col): schema['type'] = nullable_column('string', col.is_primary_key) return schema + if data_type in RANGE_TYPES: + schema['type'] = nullable_column('string', col.is_primary_key) + return schema diff --git a/tests/test_discovery.py b/tests/test_discovery.py index bb9eb35a..4542fbe5 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -452,6 +452,43 @@ def test_catalog(self): 'definitions' : BASE_RECURSIVE_SCHEMAS}, stream_dict.get('schema')) + +class TestRangeTable(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : 'id', "type" : "integer", "primary_key" : True }, + {"name" : 'date_range', "type" : "daterange" }], + "name" : TestHStoreTable.table_name} + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + 
chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + with get_test_connection() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute("""INSERT INTO "CHICKEN TIMES" (our_int_array_pk, our_string_array) VALUES ('{{1,2,3},{4,5,6}}', '{{"a","b","c"}}' )""") + cur.execute("""SELECT * FROM "CHICKEN TIMES" """) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': ['id'], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'id') : {'inclusion': 'automatic', 'sql-datatype' : 'integer', 'selected-by-default' : True}, + ('properties', 'date_range') : {'inclusion': 'available', 'sql-datatype' : 'daterange', 'selected-by-default' : True}}) + + + self.assertEqual({'properties': {'id': {'type': ['null', 'integer']}, + 'date_range': {'type': ['null', 'string']}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, + stream_dict.get('schema')) + + class TestArraysTable(unittest.TestCase): maxDiff = None table_name = 'CHICKEN TIMES' From 2216505248a7dd7c704c0dfafa448785a31eec69 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 25 Mar 2022 13:29:04 +0000 Subject: [PATCH 15/20] Fix test --- tests/test_discovery.py | 4 ++-- tests/test_unsupported_pk.py | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 4542fbe5..4b95e091 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -473,7 +473,7 @@ def test_catalog(self): with get_test_connection() as conn: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: - cur.execute("""INSERT INTO "CHICKEN TIMES" (our_int_array_pk, our_string_array) VALUES 
('{{1,2,3},{4,5,6}}', '{{"a","b","c"}}' )""") + cur.execute("""INSERT INTO "CHICKEN TIMES" (id, date_range) VALUES ('1', '[2010-01-01, 2010-01-10)' )""") cur.execute("""SELECT * FROM "CHICKEN TIMES" """) self.assertEqual(metadata.to_map(stream_dict.get('metadata')), @@ -482,7 +482,7 @@ def test_catalog(self): ('properties', 'date_range') : {'inclusion': 'available', 'sql-datatype' : 'daterange', 'selected-by-default' : True}}) - self.assertEqual({'properties': {'id': {'type': ['null', 'integer']}, + self.assertEqual({'properties': {'id': {'maximum': 2147483647, 'minimum': -2147483648, 'type': ['integer']}, 'date_range': {'type': ['null', 'string']}}, 'type': 'object', 'definitions' : BASE_RECURSIVE_SCHEMAS}, diff --git a/tests/test_unsupported_pk.py b/tests/test_unsupported_pk.py index 0ab72832..35df69b5 100644 --- a/tests/test_unsupported_pk.py +++ b/tests/test_unsupported_pk.py @@ -28,7 +28,6 @@ def setUp(self): {"name": "circle_col", "type": "circle"}, {"name": "xml_col", "type": "xml"}, {"name": "composite_col", "type": "person_composite"}, - {"name": "int_range_col", "type": "int4range"}, ], "name": Unsupported.table_name} with get_test_connection() as conn: @@ -53,7 +52,6 @@ def test_catalog(self): ('properties', 'bit_string_col'): {'sql-datatype': 'bit(5)', 'selected-by-default': False, 'inclusion': 'unsupported'}, ('properties', 'line_col'): {'sql-datatype': 'line', 'selected-by-default': False, 'inclusion': 'unsupported'}, ('properties', 'xml_col'): {'sql-datatype': 'xml', 'selected-by-default': False, 'inclusion': 'unsupported'}, - ('properties', 'int_range_col'): {'sql-datatype': 'int4range', 'selected-by-default': False, 'inclusion': 'unsupported'}, ('properties', 'circle_col'): {'sql-datatype': 'circle', 'selected-by-default': False, 'inclusion': 'unsupported'}, ('properties', 'polygon_col'): {'sql-datatype': 'polygon', 'selected-by-default': False, 'inclusion': 'unsupported'}, ('properties', 'box_col'): {'sql-datatype': 'box', 'selected-by-default': 
False, 'inclusion': 'unsupported'}, From ee2afc9a7f01372ef0cbdfefebc7be4f81c1b656 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 4 Oct 2022 16:13:49 +0100 Subject: [PATCH 16/20] Update Dockerfile --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 95fbeb42..fb6433d8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM docker.io/bitnami/postgresql:12 +FROM docker.io/bitnami/postgresql:12-debian-11 USER root @@ -8,7 +8,7 @@ ENV WAL2JSON_COMMIT_ID=36fbee6cbb7e4bc1bf9ee4f55842ab51393e3ac0 # Compile the plugins from sources and install RUN install_packages ca-certificates curl gcc git gnupg make pkgconf && \ curl https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor | tee /etc/apt/trusted.gpg.d/apt.postgresql.org.gpg >/dev/null && \ - echo "deb http://apt.postgresql.org/pub/repos/apt buster-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \ + echo "deb http://apt.postgresql.org/pub/repos/apt bullseye-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \ install_packages postgresql-server-dev-12 && \ git clone https://github.com/eulerto/wal2json -b master --single-branch && \ (cd /wal2json && git checkout $WAL2JSON_COMMIT_ID && make && make install) && \ From 620b3a50b2c07fe75ba52e32e193458f27765e06 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 31 Oct 2022 12:34:20 +0000 Subject: [PATCH 17/20] Add missing argument --- tap_postgres/sync_strategies/logical_replication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index f0a2136b..72d64f78 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -668,7 +668,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): 
int_to_lsn(end_lsn)) break - state = consume_message(logical_streams, state, msg) + state = consume_message(logical_streams, state, msg, conn_info) # When using wal2json with write-in-chunks, multiple messages can have the same lsn # This is to ensure we only flush to lsn that has completed entirely From 15bb16fe61d7fd695d0a3ea49dfa6c35dea3c4dd Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 31 Oct 2022 12:37:55 +0000 Subject: [PATCH 18/20] Remove redundant code --- .../sync_strategies/logical_replication.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index 72d64f78..8000ae3d 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -607,22 +607,6 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): int_to_lsn(start_lsn), int_to_lsn(end_lsn), slot) - # psycopg2 2.8.4 will send a keep-alive message to postgres every status_interval - options = { - 'add-tables': streams_to_wal2json_tables(logical_streams), - 'include-timestamp': True, - 'include-types': False, - } - if conn_info['wal2json_message_format'] == 1: - options.update({'write-in-chunks': 1}) - else: - options.update( - { - 'format-version': conn_info['wal2json_message_format'], - 'include-transaction': False, - 'actions': 'insert,update,delete', - } - ) cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn, From 2a7082e74656d8414af09cf22c22fdc5dbb5fc4e Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 31 Oct 2022 12:38:38 +0000 Subject: [PATCH 19/20] Remove unused import --- tap_postgres/sync_strategies/logical_replication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_postgres/sync_strategies/logical_replication.py 
b/tap_postgres/sync_strategies/logical_replication.py index 8000ae3d..d38a85a0 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -10,7 +10,7 @@ from select import select from psycopg2 import sql -from singer import metadata, utils, get_bookmark +from singer import metadata, get_bookmark from dateutil.parser import parse, UnknownTimezoneWarning, ParserError from functools import reduce From c44d03bd82e808e22ff70f9af3162c7f5ee2b2ed Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 31 Oct 2022 12:39:25 +0000 Subject: [PATCH 20/20] Update README.md --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 6c97b9ce..13bdaf42 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,6 @@ Full list of options in `config.json`: | use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) | | secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) | | secondary_port | Integer | No | PostgreSQL Replica port (required if `use_secondary` is `True`) | -| wal2json_message_format | Integer | No | Which `wal2json` message format to use (Default: 1) | ### Run the tap in Discovery Mode