diff --git a/Dockerfile b/Dockerfile index d6f68eb..fb6433d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,14 +1,17 @@ -FROM docker.io/bitnami/postgresql:12 +FROM docker.io/bitnami/postgresql:12-debian-11 USER root -RUN apt-get update \ - && apt-get -y install git build-essential \ - && git clone --depth 1 --branch wal2json_2_3 https://github.com/eulerto/wal2json.git \ - && cd /wal2json \ - && make && make install \ - && cd / \ - && rm -rf wal2json \ - && rm -r /var/lib/apt/lists /var/cache/apt/archives +# Git SHA of v2.4 +ENV WAL2JSON_COMMIT_ID=36fbee6cbb7e4bc1bf9ee4f55842ab51393e3ac0 + +# Compile the plugins from sources and install +RUN install_packages ca-certificates curl gcc git gnupg make pkgconf && \ + curl https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor | tee /etc/apt/trusted.gpg.d/apt.postgresql.org.gpg >/dev/null && \ + echo "deb http://apt.postgresql.org/pub/repos/apt bullseye-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \ + install_packages postgresql-server-dev-12 && \ + git clone https://github.com/eulerto/wal2json -b master --single-branch && \ + (cd /wal2json && git checkout $WAL2JSON_COMMIT_ID && make && make install) && \ + rm -rf wal2json USER 1001 diff --git a/docker-compose.yml b/docker-compose.yml index 3174750..81fd644 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,20 +2,17 @@ version: "3.3" services: db_primary: + build: . container_name: "primary" - build: - context: . 
- dockerfile: Dockerfile ports: - "5432:5432" environment: - POSTGRESQL_REPLICATION_MODE=master - - POSTGRESQL_REPLICATION_USER=repl_user - - POSTGRESQL_REPLICATION_PASSWORD=repl_password - - POSTGRES_USER=test_user - - POSTGRES_PASSWORD=my-secret-passwd + - POSTGRESQL_REPLICATION_USER=test_user + - POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd - POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd - POSTGRESQL_DATABASE=tap_postgres_test + - POSTGRESQL_WAL_LEVEL=logical - ALLOW_EMPTY_PASSWORD=yes volumes: - ./db_setup/config:/bitnami/postgresql/conf @@ -29,8 +26,8 @@ services: - db_primary environment: - POSTGRESQL_REPLICATION_MODE=slave - - POSTGRESQL_REPLICATION_USER=repl_user - - POSTGRESQL_REPLICATION_PASSWORD=repl_password + - POSTGRESQL_REPLICATION_USER=test_user + - POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd - POSTGRESQL_MASTER_HOST=db_primary - POSTGRESQL_MASTER_PORT_NUMBER=5432 - ALLOW_EMPTY_PASSWORD=yes diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index bd52be3..d20c275 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # finishing previously interrupted full-table (first stage of logical replication) lookup[stream['tap_stream_id']] = 'logical_initial_interrupted' traditional_steams.append(stream) + # do any required logical replication after initial sync is complete + logical_streams.append(stream) # inconsistent state elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \ @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # initial full-table phase of logical replication lookup[stream['tap_stream_id']] = 'logical_initial' traditional_steams.append(stream) + # do any required logical replication after initial sync is complete + logical_streams.append(stream) else: # no xmin but we have an lsn + # initial stage of logical replication(full-table) has been completed. 
moving onto pure logical replication diff --git a/tap_postgres/discovery_utils.py b/tap_postgres/discovery_utils.py index cd9a00c..8239334 100644 --- a/tap_postgres/discovery_utils.py +++ b/tap_postgres/discovery_utils.py @@ -23,6 +23,7 @@ INTEGER_TYPES = {'integer', 'smallint', 'bigint'} FLOAT_TYPES = {'real', 'double precision'} JSON_TYPES = {'json', 'jsonb'} +RANGE_TYPES = {'int4range', 'int8range', 'numrange', 'tsrange', 'tstzrange', 'daterange'} BASE_RECURSIVE_SCHEMAS = { 'sdc_recursive_integer_array': {'type': ['null', 'integer', 'array'], 'items': {'$ref': '#/definitions/sdc_recursive_integer_array'}}, @@ -280,6 +281,9 @@ def schema_for_column_datatype(col): schema['type'] = nullable_column('string', col.is_primary_key) return schema + if data_type in RANGE_TYPES: + schema['type'] = nullable_column('string', col.is_primary_key) + return schema diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index a594b5a..d38a85a 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -10,7 +10,7 @@ from select import select from psycopg2 import sql -from singer import metadata, utils, get_bookmark +from singer import metadata, get_bookmark from dateutil.parser import parse, UnknownTimezoneWarning, ParserError from functools import reduce @@ -377,12 +377,14 @@ def row_to_singer_message(stream, row, version, columns, time_extracted, md_map, # pylint: disable=unused-argument,too-many-locals -def consume_message(streams, state, msg, time_extracted, conn_info): +def consume_message(streams, state, msg, conn_info): try: payload = json.loads(msg.payload) except Exception: return state + time_extracted = singer.utils.strptime_to_utc(payload['timestamp']) + lsn = msg.data_start streams_lookup = {s['tap_stream_id']: s for s in streams} @@ -575,7 +577,6 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): lsn_comitted = 
min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams]) start_lsn = lsn_comitted lsn_to_flush = None - time_extracted = utils.now() slot = locate_replication_slot(conn_info) lsn_last_processed = None lsn_currently_processing = None @@ -606,7 +607,6 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): int_to_lsn(start_lsn), int_to_lsn(end_lsn), slot) - # psycopg2 2.8.4 will send a keep-alive message to postgres every status_interval cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn, @@ -652,7 +652,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): int_to_lsn(end_lsn)) break - state = consume_message(logical_streams, state, msg, time_extracted, conn_info) + state = consume_message(logical_streams, state, msg, conn_info) # When using wal2json with write-in-chunks, multiple messages can have the same lsn # This is to ensure we only flush to lsn that has completed entirely diff --git a/tests/integration/test_unsupported_pk.py b/tests/integration/test_unsupported_pk.py index 0ab7283..35df69b 100644 --- a/tests/integration/test_unsupported_pk.py +++ b/tests/integration/test_unsupported_pk.py @@ -28,7 +28,6 @@ def setUp(self): {"name": "circle_col", "type": "circle"}, {"name": "xml_col", "type": "xml"}, {"name": "composite_col", "type": "person_composite"}, - {"name": "int_range_col", "type": "int4range"}, ], "name": Unsupported.table_name} with get_test_connection() as conn: @@ -53,7 +52,6 @@ def test_catalog(self): ('properties', 'bit_string_col'): {'sql-datatype': 'bit(5)', 'selected-by-default': False, 'inclusion': 'unsupported'}, ('properties', 'line_col'): {'sql-datatype': 'line', 'selected-by-default': False, 'inclusion': 'unsupported'}, ('properties', 'xml_col'): {'sql-datatype': 'xml', 'selected-by-default': False, 'inclusion': 'unsupported'}, - ('properties', 'int_range_col'): {'sql-datatype': 'int4range', 'selected-by-default': False, 'inclusion': 
'unsupported'}, ('properties', 'circle_col'): {'sql-datatype': 'circle', 'selected-by-default': False, 'inclusion': 'unsupported'}, ('properties', 'polygon_col'): {'sql-datatype': 'polygon', 'selected-by-default': False, 'inclusion': 'unsupported'}, ('properties', 'box_col'): {'sql-datatype': 'box', 'selected-by-default': False, 'inclusion': 'unsupported'}, diff --git a/tests/test_discovery.py b/tests/test_discovery.py new file mode 100644 index 0000000..4b95e09 --- /dev/null +++ b/tests/test_discovery.py @@ -0,0 +1,633 @@ +import unittest +import psycopg2 +import tap_postgres +from tap_postgres.discovery_utils import BASE_RECURSIVE_SCHEMAS + +import tap_postgres.db as post_db +from singer import get_logger, metadata +from psycopg2.extensions import quote_ident +try: + from tests.utils import get_test_connection, ensure_test_table, get_test_connection_config +except ImportError: + from utils import get_test_connection, ensure_test_table, get_test_connection_config + +LOGGER = get_logger() + +def do_not_dump_catalog(catalog): + pass + +tap_postgres.dump_catalog = do_not_dump_catalog + +class TestStringTableWithPK(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : "id", "type" : "integer", "primary_key" : True, "serial" : True}, + {"name" : '"character-varying_name"', "type": "character varying"}, + {"name" : '"varchar-name"', "type": "varchar(28)"}, + {"name" : 'char_name', "type": "char(10)"}, + {"name" : '"text-name"', "type": "text"}], + "name" : TestStringTableWithPK.table_name} + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == "public-CHICKEN TIMES"] + + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + self.assertEqual(TestStringTableWithPK.table_name, stream_dict.get('table_name')) + 
self.assertEqual(TestStringTableWithPK.table_name, stream_dict.get('stream')) + + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': ['id'], 'database-name': 'postgres', + 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'character-varying_name') : {'inclusion': 'available', 'sql-datatype' : 'character varying', 'selected-by-default' : True}, + ('properties', 'id') : {'inclusion': 'automatic', 'sql-datatype' : 'integer', 'selected-by-default' : True}, + ('properties', 'varchar-name') : {'inclusion': 'available', 'sql-datatype' : 'character varying', 'selected-by-default' : True}, + ('properties', 'text-name') : {'inclusion': 'available', 'sql-datatype' : 'text', 'selected-by-default' : True}, + ('properties', 'char_name'): {'selected-by-default': True, 'inclusion': 'available', 'sql-datatype': 'character'}}) + + self.assertEqual({'properties': {'id': {'type': ['integer'], + 'maximum': 2147483647, + 'minimum': -2147483648}, + 'character-varying_name': {'type': ['null', 'string']}, + 'varchar-name': {'type': ['null', 'string'], 'maxLength': 28}, + 'char_name': {'type': ['null', 'string'], 'maxLength': 10}, + 'text-name': {'type': ['null', 'string']}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, stream_dict.get('schema')) + + +class TestIntegerTable(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : "id", "type" : "integer", "serial" : True}, + {"name" : 'size integer', "type" : "integer", "quoted" : True}, + {"name" : 'size smallint', "type" : "smallint", "quoted" : True}, + {"name" : 'size bigint', "type" : "bigint", "quoted" : True}], + "name" : TestIntegerTable.table_name} + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + 
chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + + self.assertEqual(TestStringTableWithPK.table_name, stream_dict.get('table_name')) + self.assertEqual(TestStringTableWithPK.table_name, stream_dict.get('stream')) + + + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': [], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'id') : {'inclusion': 'available', 'sql-datatype' : 'integer', 'selected-by-default' : True}, + ('properties', 'size integer') : {'inclusion': 'available', 'sql-datatype' : 'integer', 'selected-by-default' : True}, + ('properties', 'size smallint') : {'inclusion': 'available', 'sql-datatype' : 'smallint', 'selected-by-default' : True}, + ('properties', 'size bigint') : {'inclusion': 'available', 'sql-datatype' : 'bigint', 'selected-by-default' : True}}) + + self.assertEqual({'definitions' : BASE_RECURSIVE_SCHEMAS, + 'type': 'object', + 'properties': {'id': {'type': ['null', 'integer'], 'minimum': -2147483648, 'maximum': 2147483647}, + 'size smallint': {'type': ['null', 'integer'], 'minimum': -32768, 'maximum': 32767}, + 'size integer': {'type': ['null', 'integer'], 'minimum': -2147483648, 'maximum': 2147483647}, + 'size bigint': {'type': ['null', 'integer'], 'minimum': -9223372036854775808, 'maximum': 9223372036854775807}}}, + stream_dict.get('schema')) + + + +class TestDecimalPK(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : 'our_decimal', "type" : "numeric", "primary_key": True}, + {"name" : 'our_decimal_10_2', "type" : "decimal(10,2)"}, + {"name" : 'our_decimal_38_4', "type" : "decimal(38,4)"}], + "name" : TestDecimalPK.table_name} + ensure_test_table(table_spec) + + def 
test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': ['our_decimal'], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'our_decimal') : {'inclusion': 'automatic', 'sql-datatype' : 'numeric', 'selected-by-default' : True}, + ('properties', 'our_decimal_38_4') : {'inclusion': 'available', 'sql-datatype' : 'numeric', 'selected-by-default' : True}, + ('properties', 'our_decimal_10_2') : {'inclusion': 'available', 'sql-datatype' : 'numeric', 'selected-by-default' : True}}) + + self.assertEqual({'properties': {'our_decimal': {'exclusiveMaximum': True, + 'exclusiveMinimum': True, + 'multipleOf': 10 ** (0 - post_db.MAX_SCALE), + 'maximum': 10 ** (post_db.MAX_PRECISION - post_db.MAX_SCALE), + 'minimum': -10 ** (post_db.MAX_PRECISION - post_db.MAX_SCALE), + 'type': ['number']}, + 'our_decimal_10_2': {'exclusiveMaximum': True, + 'exclusiveMinimum': True, + 'maximum': 100000000, + 'minimum': -100000000, + 'multipleOf': 0.01, + 'type': ['null', 'number']}, + 'our_decimal_38_4': {'exclusiveMaximum': True, + 'exclusiveMinimum': True, + 'maximum': 10000000000000000000000000000000000, + 'minimum': -10000000000000000000000000000000000, + 'multipleOf': 0.0001, + 'type': ['null', 'number']}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, + stream_dict.get('schema')) + + + +class TestDatesTablePK(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : 'our_date', "type" : "DATE", "primary_key": True }, + {"name" : 'our_ts', "type" : "TIMESTAMP"}, + 
{"name" : 'our_ts_tz', "type" : "TIMESTAMP WITH TIME ZONE"}, + {"name" : 'our_time', "type" : "TIME"}, + {"name" : 'our_time_tz', "type" : "TIME WITH TIME ZONE"}], + "name" : TestDatesTablePK.table_name} + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': ['our_date'], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'our_date') : {'inclusion': 'automatic', 'sql-datatype' : 'date', 'selected-by-default' : True}, + ('properties', 'our_ts') : {'inclusion': 'available', 'sql-datatype' : 'timestamp without time zone', 'selected-by-default' : True}, + ('properties', 'our_ts_tz') : {'inclusion': 'available', 'sql-datatype' : 'timestamp with time zone', 'selected-by-default' : True}, + ('properties', 'our_time') : {'inclusion': 'available', 'sql-datatype' : 'time without time zone', 'selected-by-default' : True}, + ('properties', 'our_time_tz') : {'inclusion': 'available', 'sql-datatype' : 'time with time zone', 'selected-by-default' : True}}) + + self.assertEqual({'properties': {'our_date': {'type': ['string'], 'format' : 'date-time'}, + 'our_ts': {'type': ['null', 'string'], 'format' : 'date-time'}, + 'our_ts_tz': {'type': ['null', 'string'], 'format' : 'date-time'}, + 'our_time': {'format': 'time', 'type': ['null', 'string']}, + 'our_time_tz': {'format': 'time', 'type': ['null', 'string']}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, + stream_dict.get('schema')) + +class TestFloatTablePK(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + 
def setUp(self): + table_spec = {"columns": [{"name" : 'our_float', "type" : "float", "primary_key": True }, + {"name" : 'our_real', "type" : "real"}, + {"name" : 'our_double', "type" : "double precision"}], + "name" : TestFloatTablePK.table_name} + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': ['our_float'], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'our_float') : {'inclusion': 'automatic', 'sql-datatype' : 'double precision', 'selected-by-default' : True}, + ('properties', 'our_real') : {'inclusion': 'available', 'sql-datatype' : 'real', 'selected-by-default' : True}, + ('properties', 'our_double') : {'inclusion': 'available', 'sql-datatype' : 'double precision', 'selected-by-default' : True}}) + + + self.assertEqual({'properties': {'our_float': {'type': ['number']}, + 'our_real': {'type': ['null', 'number']}, + 'our_double': {'type': ['null', 'number']}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, + stream_dict.get('schema')) + +class TestBoolsAndBits(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : 'our_bool', "type" : "boolean" }, + {"name" : 'our_bit', "type" : "bit" }], + "name" : TestBoolsAndBits.table_name} + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + 
self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': [], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'our_bool') : {'inclusion': 'available', 'sql-datatype' : 'boolean', 'selected-by-default' : True}, + ('properties', 'our_bit') : {'inclusion': 'available', 'sql-datatype' : 'bit', 'selected-by-default' : True}}) + + + self.assertEqual({'properties': {'our_bool': {'type': ['null', 'boolean']}, + 'our_bit': {'type': ['null', 'boolean']}}, + 'definitions' : BASE_RECURSIVE_SCHEMAS, + 'type': 'object'}, + stream_dict.get('schema')) + +class TestJsonTables(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : 'our_secrets', "type" : "json" }, + {"name" : 'our_secrets_b', "type" : "jsonb" }], + "name" : TestJsonTables.table_name} + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': [], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'our_secrets') : {'inclusion': 'available', 'sql-datatype' : 'json', 'selected-by-default' : True}, + ('properties', 'our_secrets_b') : {'inclusion': 'available', 'sql-datatype' : 'jsonb', 'selected-by-default' : True}}) + + + self.assertEqual({'properties': {'our_secrets': {'type': ['null', 'object', 'array']}, + 'our_secrets_b': 
{'type': ['null', 'object', 'array']}}, + 'definitions' : BASE_RECURSIVE_SCHEMAS, + 'type': 'object'}, + stream_dict.get('schema')) + + +class TestUUIDTables(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : 'our_pk', "type" : "uuid", "primary_key" : True }, + {"name" : 'our_uuid', "type" : "uuid" }], + "name" : TestUUIDTables.table_name} + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == "public-CHICKEN TIMES"] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': ['our_pk'], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'our_pk') : {'inclusion': 'automatic', 'sql-datatype' : 'uuid', 'selected-by-default' : True}, + ('properties', 'our_uuid') : {'inclusion': 'available', 'sql-datatype' : 'uuid', 'selected-by-default' : True}}) + + + self.assertEqual({'properties': {'our_uuid': {'type': ['null', 'string']}, + 'our_pk': {'type': ['string']}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, + stream_dict.get('schema')) + +class TestHStoreTable(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : 'our_pk', "type" : "hstore", "primary_key" : True }, + {"name" : 'our_hstore', "type" : "hstore" }], + "name" : TestHStoreTable.table_name} + with get_test_connection(superuser=True) as conn: + cur = conn.cursor() + cur.execute(""" SELECT installed_version FROM pg_available_extensions WHERE name = 'hstore' """) + if cur.fetchone()[0] is None: + cur.execute(""" CREATE EXTENSION hstore; """) + + + 
ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + with get_test_connection() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute("""INSERT INTO "CHICKEN TIMES" (our_pk, our_hstore) VALUES ('size=>"small",name=>"betty"', 'size=>"big",name=>"fred"')""") + cur.execute("""SELECT * FROM "CHICKEN TIMES" """) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': ['our_pk'], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'our_pk') : {'inclusion': 'automatic', 'sql-datatype' : 'hstore', 'selected-by-default' : True}, + ('properties', 'our_hstore') : {'inclusion': 'available', 'sql-datatype' : 'hstore', 'selected-by-default' : True}}) + + + self.assertEqual({'properties': {'our_hstore': {'type': ['null', 'object'], 'properties' : {}}, + 'our_pk': {'type': ['object'], 'properties': {}}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, + stream_dict.get('schema')) + + def test_escaping_values(self): + key = 'nickname' + value = "Dave's Courtyard" + elem = '"{}"=>"{}"'.format(key, value) + + with get_test_connection() as conn: + with conn.cursor() as cur: + query = tap_postgres.sync_strategies.logical_replication.create_hstore_elem_query(elem) + self.assertEqual(query.as_string(cur), "SELECT hstore_to_array('\"nickname\"=>\"Dave''s Courtyard\"')") + + +class TestEnumTable(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : 'our_mood_enum_pk', "type" : "mood_enum", "primary_key" : True }, + {"name" : 
'our_mood_enum', "type" : "mood_enum" }], + "name" : TestHStoreTable.table_name} + with get_test_connection() as conn: + cur = conn.cursor() + cur.execute(""" DROP TYPE IF EXISTS mood_enum CASCADE """) + cur.execute(""" CREATE TYPE mood_enum AS ENUM ('sad', 'ok', 'happy'); """) + + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + with get_test_connection() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute("""INSERT INTO "CHICKEN TIMES" (our_mood_enum_pk, our_mood_enum) VALUES ('sad', 'happy')""") + cur.execute("""SELECT * FROM "CHICKEN TIMES" """) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': ['our_mood_enum_pk'], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'our_mood_enum_pk') : {'inclusion': 'automatic', 'sql-datatype' : 'mood_enum', 'selected-by-default' : True}, + ('properties', 'our_mood_enum') : {'inclusion': 'available', 'sql-datatype' : 'mood_enum', 'selected-by-default' : True}}) + + + self.assertEqual({'properties': {'our_mood_enum': {'type': ['null', 'string']}, + 'our_mood_enum_pk': {'type': ['string']}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, + stream_dict.get('schema')) + + +class TestMoney(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : 'our_money_pk', "type" : "money", "primary_key" : True }, + {"name" : 'our_money', "type" : "money" }], + "name" : TestHStoreTable.table_name} + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = 
get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + with get_test_connection() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute("""INSERT INTO "CHICKEN TIMES" (our_money_pk, our_money) VALUES ('$1.24', '$777.63')""") + cur.execute("""SELECT * FROM "CHICKEN TIMES" """) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': ['our_money_pk'], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'our_money_pk') : {'inclusion': 'automatic', 'sql-datatype' : 'money', 'selected-by-default' : True}, + ('properties', 'our_money') : {'inclusion': 'available', 'sql-datatype' : 'money', 'selected-by-default' : True}}) + + + self.assertEqual({'properties': {'our_money': {'type': ['null', 'string']}, + 'our_money_pk': {'type': ['string']}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, + stream_dict.get('schema')) + + +class TestRangeTable(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : 'id', "type" : "integer", "primary_key" : True }, + {"name" : 'date_range', "type" : "daterange" }], + "name" : TestHStoreTable.table_name} + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + with get_test_connection() as conn: + with 
conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute("""INSERT INTO "CHICKEN TIMES" (id, date_range) VALUES ('1', '[2010-01-01, 2010-01-10)' )""") + cur.execute("""SELECT * FROM "CHICKEN TIMES" """) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': ['id'], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'id') : {'inclusion': 'automatic', 'sql-datatype' : 'integer', 'selected-by-default' : True}, + ('properties', 'date_range') : {'inclusion': 'available', 'sql-datatype' : 'daterange', 'selected-by-default' : True}}) + + + self.assertEqual({'properties': {'id': {'maximum': 2147483647, 'minimum': -2147483648, 'type': ['integer']}, + 'date_range': {'type': ['null', 'string']}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, + stream_dict.get('schema')) + + +class TestArraysTable(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + + def setUp(self): + table_spec = {"columns": [{"name" : 'our_int_array_pk', "type" : "integer[]", "primary_key" : True }, + {"name" : 'our_string_array', "type" : "varchar[]" }], + "name" : TestHStoreTable.table_name} + ensure_test_table(table_spec) + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + with get_test_connection() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute("""INSERT INTO "CHICKEN TIMES" (our_int_array_pk, our_string_array) VALUES ('{{1,2,3},{4,5,6}}', '{{"a","b","c"}}' )""") + cur.execute("""SELECT * FROM "CHICKEN TIMES" """) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : 
{'table-key-properties': ['our_int_array_pk'], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': False, 'row-count': 0}, + ('properties', 'our_int_array_pk') : {'inclusion': 'automatic', 'sql-datatype' : 'integer[]', 'selected-by-default' : True}, + ('properties', 'our_string_array') : {'inclusion': 'available', 'sql-datatype' : 'character varying[]', 'selected-by-default' : True}}) + + + self.assertEqual({'properties': {'our_int_array_pk': {'type': ['null', 'array'], 'items' : {'$ref': '#/definitions/sdc_recursive_integer_array'}}, + 'our_string_array': {'type': ['null', 'array'], 'items' : {'$ref': '#/definitions/sdc_recursive_string_array'}}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, + stream_dict.get('schema')) + + +class TestArraysLikeTable(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + like_table_name = 'LIKE CHICKEN TIMES' + + def setUp(self): + with get_test_connection('postgres') as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute('DROP MATERIALIZED VIEW IF EXISTS "LIKE CHICKEN TIMES"') + table_spec = {"columns": [{"name" : 'our_int_array_pk', "type" : "integer[]", "primary_key" : True }, + {"name" : 'our_text_array', "type" : "text[]" }], + "name" : TestArraysLikeTable.table_name} + ensure_test_table(table_spec) + + with get_test_connection('postgres') as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + create_sql = "CREATE MATERIALIZED VIEW {} AS SELECT * FROM {}\n".format(quote_ident(TestArraysLikeTable.like_table_name, cur), + quote_ident(TestArraysLikeTable.table_name, cur)) + + + cur.execute(create_sql) + + + def test_catalog(self): + conn_config = get_test_connection_config() + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-LIKE CHICKEN TIMES'] + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + 
stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + with get_test_connection() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {() : {'table-key-properties': [], 'database-name': 'postgres', 'schema-name': 'public', 'is-view': True, 'row-count': 0}, + ('properties', 'our_int_array_pk') : {'inclusion': 'available', 'sql-datatype' : 'integer[]', 'selected-by-default' : True}, + ('properties', 'our_text_array') : {'inclusion': 'available', 'sql-datatype' : 'text[]', 'selected-by-default' : True}}) + self.assertEqual({'properties': {'our_int_array_pk': {'type': ['null', 'array'], 'items' : {'$ref': '#/definitions/sdc_recursive_integer_array'}}, + 'our_text_array': {'type': ['null', 'array'], 'items' : {'$ref': '#/definitions/sdc_recursive_string_array'}}}, + 'type': 'object', + 'definitions' : BASE_RECURSIVE_SCHEMAS}, + stream_dict.get('schema')) + +class TestColumnGrants(unittest.TestCase): + maxDiff = None + table_name = 'CHICKEN TIMES' + user = 'tmp_user_for_grant_tests' + password = 'password' + + def setUp(self): + table_spec = {"columns": [{"name" : "id", "type" : "integer", "serial" : True}, + {"name" : 'size integer', "type" : "integer", "quoted" : True}, + {"name" : 'size smallint', "type" : "smallint", "quoted" : True}, + {"name" : 'size bigint', "type" : "bigint", "quoted" : True}], + "name" : TestColumnGrants.table_name} + ensure_test_table(table_spec) + + with get_test_connection(superuser=True) as conn: + cur = conn.cursor() + + sql = """ DROP USER IF EXISTS {} """.format(self.user, self.password) + LOGGER.info(sql) + cur.execute(sql) + + sql = """ CREATE USER {} WITH PASSWORD '{}' """.format(self.user, self.password) + LOGGER.info(sql) + cur.execute(sql) + + sql = """ GRANT SELECT ("id") ON "{}" TO {}""".format(TestColumnGrants.table_name, self.user) + LOGGER.info("running sql: {}".format(sql)) + cur.execute(sql) + + + + + def 
test_catalog(self): + conn_config = get_test_connection_config() + conn_config['user'] = self.user + conn_config['password'] = self.password + streams = tap_postgres.do_discovery(conn_config) + chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES'] + + self.assertEqual(len(chicken_streams), 1) + stream_dict = chicken_streams[0] + + self.assertEqual(TestStringTableWithPK.table_name, stream_dict.get('table_name')) + self.assertEqual(TestStringTableWithPK.table_name, stream_dict.get('stream')) + + stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb']) + + self.assertEqual(metadata.to_map(stream_dict.get('metadata')), + {(): {'table-key-properties': [], + 'database-name': 'postgres', + 'schema-name': 'public', + 'is-view': False, + 'row-count': 0}, + ('properties', 'id'): {'inclusion': 'available', + 'selected-by-default': True, + 'sql-datatype': 'integer'}}) + + self.assertEqual({'definitions' : BASE_RECURSIVE_SCHEMAS, + 'type': 'object', + 'properties': {'id': {'type': ['null', 'integer'], + 'minimum': -2147483648, + 'maximum': 2147483647}}}, + stream_dict.get('schema')) diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py new file mode 100644 index 0000000..fffead6 --- /dev/null +++ b/tests/test_full_table_interruption.py @@ -0,0 +1,451 @@ +import psycopg2 +import unittest.mock +import pytest +import tap_postgres +from tap_postgres.sync_strategies import logical_replication +import tap_postgres.sync_strategies.full_table as full_table +import tap_postgres.sync_strategies.common as pg_common +import singer +from singer import get_logger, metadata, write_bookmark +try: + from tests.utils import get_test_connection, ensure_test_table, select_all_of_stream, set_replication_method_for_stream, insert_record, get_test_connection_config +except ImportError: + from utils import get_test_connection, ensure_test_table, select_all_of_stream, set_replication_method_for_stream, insert_record, 
get_test_connection_config + + +LOGGER = get_logger() + +CAUGHT_MESSAGES = [] +COW_RECORD_COUNT = 0 + +def singer_write_message_no_cow(message): + global COW_RECORD_COUNT + + if isinstance(message, singer.RecordMessage) and message.stream == 'public-COW': + COW_RECORD_COUNT = COW_RECORD_COUNT + 1 + if COW_RECORD_COUNT > 2: + raise Exception("simulated exception") + CAUGHT_MESSAGES.append(message) + else: + CAUGHT_MESSAGES.append(message) + +def singer_write_schema_ok(message): + CAUGHT_MESSAGES.append(message) + +def singer_write_message_ok(message): + CAUGHT_MESSAGES.append(message) + +def expected_record(fixture_row): + expected_record = {} + for k,v in fixture_row.items(): + expected_record[k.replace('"', '')] = v + + return expected_record + +def do_not_dump_catalog(catalog): + pass + +tap_postgres.dump_catalog = do_not_dump_catalog +full_table.UPDATE_BOOKMARK_PERIOD = 1 + +@pytest.mark.parametrize('use_secondary,message_format', [(False, 1), (True, 2)]) +@unittest.mock.patch('tap_postgres.sync_logical_streams', wraps=tap_postgres.sync_logical_streams) +@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) +class TestLogicalInterruption: + maxDiff = None + + def setup_class(self): + conn_config = get_test_connection_config() + slot_name = logical_replication.generate_replication_slot_name( + dbname=conn_config['dbname'], tap_id=conn_config['tap_id'] + ) + with get_test_connection(superuser=True) as conn: + with conn.cursor() as cur: + cur.execute(f"SELECT * FROM pg_create_logical_replication_slot('{slot_name}', 'wal2json')") + + def teardown_class(self): + conn_config = get_test_connection_config() + slot_name = logical_replication.generate_replication_slot_name( + dbname=conn_config['dbname'], tap_id=conn_config['tap_id'] + ) + with get_test_connection(superuser=True) as conn: + with conn.cursor() as cur: + cur.execute(f"SELECT * FROM pg_drop_replication_slot('{slot_name}')") + + def setup_method(self): + table_spec_1 = {"columns": [{"name": "id", 
"type" : "serial", "primary_key" : True}, + {"name" : 'name', "type": "character varying"}, + {"name" : 'colour', "type": "character varying"}, + {"name" : 'timestamp_ntz', "type": "timestamp without time zone"}, + {"name" : 'timestamp_tz', "type": "timestamp with time zone"}, + ], + "name" : 'COW'} + ensure_test_table(table_spec_1) + global COW_RECORD_COUNT + COW_RECORD_COUNT = 0 + global CAUGHT_MESSAGES + CAUGHT_MESSAGES.clear() + + def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary, message_format): + singer.write_message = singer_write_message_no_cow + pg_common.write_schema_message = singer_write_message_ok + + conn_config = get_test_connection_config(use_secondary=use_secondary, message_format=message_format) + streams = tap_postgres.do_discovery(conn_config) + + # Assert that we connected to the correct database + primary_connection = { + 'application_name': unittest.mock.ANY, + 'dbname': unittest.mock.ANY, + 'user': unittest.mock.ANY, + 'password': unittest.mock.ANY, + 'connect_timeout':unittest.mock.ANY, + 'host': conn_config['host'], + 'port': conn_config['port'], + } + secondary_connection = { + 'application_name': unittest.mock.ANY, + 'dbname': unittest.mock.ANY, + 'user': unittest.mock.ANY, + 'password': unittest.mock.ANY, + 'connect_timeout':unittest.mock.ANY, + 'host': conn_config['secondary_host'] if use_secondary else conn_config['host'], + 'port': conn_config['secondary_port'] if use_secondary else conn_config['port'], + } + + mock_connect.assert_called_once_with(**secondary_connection) + + cow_stream = [s for s in streams if s['table_name'] == 'COW'][0] + assert cow_stream is not None + cow_stream = select_all_of_stream(cow_stream) + cow_stream = set_replication_method_for_stream(cow_stream, 'LOG_BASED') + + conn = get_test_connection() + conn.autocommit = True + + with conn.cursor() as cur: + cow_rec = {'name': 'betty', 'colour': 'blue', + 'timestamp_ntz': '2020-09-01 10:40:59', 'timestamp_tz': '2020-09-01 
00:50:59+02'} + insert_record(cur, 'COW', cow_rec) + + cow_rec = {'name': 'smelly', 'colour': 'brow', + 'timestamp_ntz': '2020-09-01 10:40:59 BC', 'timestamp_tz': '2020-09-01 00:50:59+02 BC'} + insert_record(cur, 'COW', cow_rec) + + cow_rec = {'name': 'pooper', 'colour': 'green', + 'timestamp_ntz': '30000-09-01 10:40:59', 'timestamp_tz': '10000-09-01 00:50:59+02'} + insert_record(cur, 'COW', cow_rec) + + conn.close() + + blew_up_on_cow = False + state = {} + #the initial phase of cows logical replication will be a full table. + #it will sync the first record and then blow up on the 2nd record + mock_connect.reset_mock() + try: + tap_postgres.do_sync(conn_config, {'streams' : streams}, None, state) + except Exception: + blew_up_on_cow = True + + assert blew_up_on_cow is True + + mock_sync_logical_streams.assert_not_called() + + mock_connect.assert_has_calls( + [unittest.mock.call(**primary_connection)] * 2 + \ + [unittest.mock.call(**secondary_connection)] * 4 + ) + + assert 7 == len(CAUGHT_MESSAGES) + + assert CAUGHT_MESSAGES[0]['type'] =='SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin') is None + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') is not None + end_lsn = CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') + + assert isinstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) + new_version = CAUGHT_MESSAGES[2].version + + assert isinstance(CAUGHT_MESSAGES[3], singer.RecordMessage) + assert CAUGHT_MESSAGES[3].record == { + 'colour': 'blue', + 'id': 1, + 'name': 'betty', + 'timestamp_ntz': '2020-09-01T10:40:59+00:00', + 'timestamp_tz': '2020-08-31T22:50:59+00:00' + } + + assert 'public-COW' == CAUGHT_MESSAGES[3].stream + + assert isinstance(CAUGHT_MESSAGES[4], singer.StateMessage) + #xmin is set while we are processing the full table replication + assert CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['xmin'] is not None + 
assert CAUGHT_MESSAGES[4].value['bookmarks']['public-COW']['lsn'] == end_lsn + + assert CAUGHT_MESSAGES[5].record == { + 'colour': 'brow', + 'id': 2, + 'name': 'smelly', + 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', + 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' + } + + assert 'public-COW' == CAUGHT_MESSAGES[5].stream + + assert isinstance(CAUGHT_MESSAGES[6], singer.StateMessage) + last_xmin = CAUGHT_MESSAGES[6].value['bookmarks']['public-COW']['xmin'] + old_state = CAUGHT_MESSAGES[6].value + + #run another do_sync, should get the remaining record which effectively finishes the initial full_table + #replication portion of the logical replication + singer.write_message = singer_write_message_ok + global COW_RECORD_COUNT + COW_RECORD_COUNT = 0 + CAUGHT_MESSAGES.clear() + tap_postgres.do_sync(conn_config, {'streams' : streams}, None, old_state) + + mock_sync_logical_streams.assert_called_once() + + mock_connect.assert_has_calls( + [unittest.mock.call(**primary_connection)] * 2 + \ + [unittest.mock.call(**secondary_connection)] * 4 + ) + + assert 10 == len(CAUGHT_MESSAGES) + + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' + + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('xmin') == last_xmin + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW'].get('version') == new_version + + assert isinstance(CAUGHT_MESSAGES[2], singer.RecordMessage) + assert CAUGHT_MESSAGES[2].record == { + 'colour': 'brow', + 'id': 2, + 'name': 'smelly', + 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', + 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' + } + + assert 'public-COW' == CAUGHT_MESSAGES[2].stream + + assert isinstance(CAUGHT_MESSAGES[3], singer.StateMessage) + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('xmin') == last_xmin + assert 
CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW'].get('version') == new_version + + assert isinstance(CAUGHT_MESSAGES[4], singer.RecordMessage) + assert CAUGHT_MESSAGES[4].record == { + 'colour': 'green', + 'id': 3, + 'name': 'pooper', + 'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00', + 'timestamp_tz': '9999-12-31T23:59:59.999000+00:00' + } + assert 'public-COW' == CAUGHT_MESSAGES[4].stream + + assert isinstance(CAUGHT_MESSAGES[5], singer.StateMessage) + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('xmin') > last_xmin + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW'].get('version') == new_version + + + assert isinstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[6].version == new_version + + assert isinstance(CAUGHT_MESSAGES[7], singer.StateMessage) + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('xmin') is None + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version') == new_version + + assert CAUGHT_MESSAGES[8]['type'] == 'SCHEMA' + + assert isinstance(CAUGHT_MESSAGES[9], singer.messages.StateMessage) + assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('xmin') is None + assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('lsn') == end_lsn + assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('version') == new_version + +@pytest.mark.parametrize('use_secondary', [False, True]) +@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) +class TestFullTableInterruption: + maxDiff = None + def setup_method(self): + table_spec_1 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True}, + {"name" : 'name', "type": "character varying"}, + {"name" : 'colour', "type": 
"character varying"}], + "name" : 'COW'} + ensure_test_table(table_spec_1) + + table_spec_2 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True}, + {"name" : 'name', "type": "character varying"}, + {"name" : 'colour', "type": "character varying"}], + "name" : 'CHICKEN'} + ensure_test_table(table_spec_2) + + global COW_RECORD_COUNT + COW_RECORD_COUNT = 0 + global CAUGHT_MESSAGES + CAUGHT_MESSAGES.clear() + + def test_catalog(self, mock_connect, use_secondary): + singer.write_message = singer_write_message_no_cow + pg_common.write_schema_message = singer_write_message_ok + + conn_config = get_test_connection_config(use_secondary=use_secondary) + streams = tap_postgres.do_discovery(conn_config) + + # Assert that we connected to the correct database + expected_connection = { + 'application_name': unittest.mock.ANY, + 'dbname': unittest.mock.ANY, + 'user': unittest.mock.ANY, + 'password': unittest.mock.ANY, + 'connect_timeout':unittest.mock.ANY, + 'host': conn_config['secondary_host'] if use_secondary else conn_config['host'], + 'port': conn_config['secondary_port'] if use_secondary else conn_config['port'], + } + mock_connect.assert_called_once_with(**expected_connection) + mock_connect.reset_mock() + + cow_stream = [s for s in streams if s['table_name'] == 'COW'][0] + assert cow_stream is not None + cow_stream = select_all_of_stream(cow_stream) + cow_stream = set_replication_method_for_stream(cow_stream, 'FULL_TABLE') + + chicken_stream = [s for s in streams if s['table_name'] == 'CHICKEN'][0] + assert chicken_stream is not None + chicken_stream = select_all_of_stream(chicken_stream) + chicken_stream = set_replication_method_for_stream(chicken_stream, 'FULL_TABLE') + + conn = get_test_connection() + conn.autocommit = True + + with conn.cursor() as cur: + cow_rec = {'name': 'betty', 'colour': 'blue'} + insert_record(cur, 'COW', {'name': 'betty', 'colour': 'blue'}) + + cow_rec = {'name': 'smelly', 'colour': 'brow'} + insert_record(cur, 'COW', cow_rec) 
+ + cow_rec = {'name': 'pooper', 'colour': 'green'} + insert_record(cur, 'COW', cow_rec) + + chicken_rec = {'name': 'fred', 'colour': 'red'} + insert_record(cur, 'CHICKEN', chicken_rec) + + conn.close() + + state = {} + blew_up_on_cow = False + + #this will sync the CHICKEN but then blow up on the COW + try: + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state) + except Exception as ex: + # LOGGER.exception(ex) + blew_up_on_cow = True + + assert blew_up_on_cow + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() + + assert 14 == len(CAUGHT_MESSAGES) + + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-CHICKEN'].get('xmin') is None + + assert isinstance(CAUGHT_MESSAGES[2], singer.ActivateVersionMessage) + new_version = CAUGHT_MESSAGES[2].version + + assert isinstance(CAUGHT_MESSAGES[3], singer.RecordMessage) + assert 'public-CHICKEN' == CAUGHT_MESSAGES[3].stream + + assert isinstance(CAUGHT_MESSAGES[4], singer.StateMessage) + #xmin is set while we are processing the full table replication + assert CAUGHT_MESSAGES[4].value['bookmarks']['public-CHICKEN']['xmin'] is not None + + assert isinstance(CAUGHT_MESSAGES[5], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[5].version == new_version + + assert isinstance(CAUGHT_MESSAGES[6], singer.StateMessage) + assert None == singer.get_currently_syncing( CAUGHT_MESSAGES[6].value) + #xmin is cleared at the end of the full table replication + assert CAUGHT_MESSAGES[6].value['bookmarks']['public-CHICKEN']['xmin'] is None + + + #cow messages + assert CAUGHT_MESSAGES[7]['type'] == 'SCHEMA' + + assert "public-COW" == CAUGHT_MESSAGES[7]['stream'] + assert isinstance(CAUGHT_MESSAGES[8], singer.StateMessage) + assert CAUGHT_MESSAGES[8].value['bookmarks']['public-COW'].get('xmin') is None + assert "public-COW" == 
CAUGHT_MESSAGES[8].value['currently_syncing'] + + assert isinstance(CAUGHT_MESSAGES[9], singer.ActivateVersionMessage) + cow_version = CAUGHT_MESSAGES[9].version + assert isinstance(CAUGHT_MESSAGES[10], singer.RecordMessage) + + assert CAUGHT_MESSAGES[10].record['name'] == 'betty' + assert 'public-COW' == CAUGHT_MESSAGES[10].stream + + assert isinstance(CAUGHT_MESSAGES[11], singer.StateMessage) + #xmin is set while we are processing the full table replication + assert CAUGHT_MESSAGES[11].value['bookmarks']['public-COW']['xmin'] is not None + + + assert CAUGHT_MESSAGES[12].record['name'] == 'smelly' + assert 'public-COW' == CAUGHT_MESSAGES[12].stream + old_state = CAUGHT_MESSAGES[13].value + + #run another do_sync + singer.write_message = singer_write_message_ok + CAUGHT_MESSAGES.clear() + global COW_RECORD_COUNT + COW_RECORD_COUNT = 0 + + tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state) + + mock_connect.assert_called_with(**expected_connection) + mock_connect.reset_mock() + + assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' + assert isinstance(CAUGHT_MESSAGES[1], singer.StateMessage) + + # because we were interrupted, we do not switch versions + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['version'] == cow_version + assert CAUGHT_MESSAGES[1].value['bookmarks']['public-COW']['xmin'] is not None + assert "public-COW" == singer.get_currently_syncing(CAUGHT_MESSAGES[1].value) + + assert isinstance(CAUGHT_MESSAGES[2], singer.RecordMessage) + assert CAUGHT_MESSAGES[2].record['name'] == 'smelly' + assert 'public-COW' == CAUGHT_MESSAGES[2].stream + + + #after record: activate version, state with no xmin or currently syncing + assert isinstance(CAUGHT_MESSAGES[3], singer.StateMessage) + #we still have an xmin for COW because are not yet done with the COW table + assert CAUGHT_MESSAGES[3].value['bookmarks']['public-COW']['xmin'] is not None + assert singer.get_currently_syncing( 
CAUGHT_MESSAGES[3].value) == 'public-COW' + + assert isinstance(CAUGHT_MESSAGES[4], singer.RecordMessage) + assert CAUGHT_MESSAGES[4].record['name'] == 'pooper' + assert 'public-COW' == CAUGHT_MESSAGES[4].stream + + assert isinstance(CAUGHT_MESSAGES[5], singer.StateMessage) + assert CAUGHT_MESSAGES[5].value['bookmarks']['public-COW']['xmin'] is not None + assert singer.get_currently_syncing( CAUGHT_MESSAGES[5].value) == 'public-COW' + + + #xmin is cleared because we are finished the full table replication + assert isinstance(CAUGHT_MESSAGES[6], singer.ActivateVersionMessage) + assert CAUGHT_MESSAGES[6].version == cow_version + + assert isinstance(CAUGHT_MESSAGES[7], singer.StateMessage) + assert singer.get_currently_syncing( CAUGHT_MESSAGES[7].value) is None + assert CAUGHT_MESSAGES[7].value['bookmarks']['public-CHICKEN']['xmin'] is None + assert singer.get_currently_syncing( CAUGHT_MESSAGES[7].value) is None diff --git a/tests/unit/test_logical_replication.py b/tests/unit/test_logical_replication.py index 55e019a..1bf73ea 100644 --- a/tests/unit/test_logical_replication.py +++ b/tests/unit/test_logical_replication.py @@ -160,8 +160,7 @@ def test_consume_with_message_payload_is_not_json_expect_same_state(self): {}, self.WalMessage(payload='this is an invalid json message', data_start=None), - None, - {} + {}, ) self.assertDictEqual({}, output) @@ -169,10 +168,9 @@ def test_consume_with_message_stream_in_payload_is_not_selected_expect_same_stat output = logical_replication.consume_message( [{'tap_stream_id': 'myschema-mytable'}], {}, - self.WalMessage(payload='{"schema": "myschema", "table": "notmytable"}', + self.WalMessage(payload='{"action": "U", "schema": "myschema", "table": "notmytable", "timestamp": "2022-03-04T19:41:29+0000"}', data_start='some lsn'), - None, - {} + {}, ) self.assertDictEqual({}, output) @@ -182,10 +180,9 @@ def test_consume_with_payload_kind_is_not_supported_expect_exception(self): logical_replication.consume_message( [{'tap_stream_id': 
'myschema-mytable'}], {}, - self.WalMessage(payload='{"action":"truncate", "schema": "myschema", "table": "mytable"}', + self.WalMessage(payload='{"action":"T", "schema": "myschema", "table": "mytable", "timestamp": "2022-03-04T19:41:29+0000"}', data_start='some lsn'), - None, - {} + {}, ) @patch('tap_postgres.logical_replication.singer.write_message') @@ -251,18 +248,14 @@ def test_consume_message_with_new_column_in_payload_will_refresh_schema(self, } } }, - self.WalMessage( - payload='{"action": "I", ' - '"schema": "myschema", ' - '"table": "mytable",' - '"columns": [' - '{"name": "id", "value": 1}, ' - '{"name": "date_created", "value": null}, ' - '{"name": "new_col", "value": "some random text"}' - ']}', - data_start='some lsn'), - None, - {} + self.WalMessage(payload='{"action": "I", ' + '"schema": "myschema", ' + '"table": "mytable",' + '"columns": [{"name": "id", "value": 1}, {"name": "date_created", "value": null}, {"name": "new_col", "value": "some random text"}],' + '"timestamp": "2022-03-04T19:41:29+0000"' + '}', + data_start='some lsn'), + {}, ) self.assertDictEqual(return_v, @@ -972,6 +965,7 @@ def test_consume_message_if_payload_kind_insert_or_update(self, *args): 'table': 'bar', 'action': 'I', 'columns': [{'name': '_sdc_deleted_at', 'value': 'foo_column'}], + 'timestamp': '2022-03-04T19:41:29+0000', } ) ) @@ -983,6 +977,7 @@ def test_consume_message_if_payload_kind_insert_or_update(self, *args): 'table': 'bar', 'action': 'U', 'columns': [{'name': '_sdc_deleted_at', 'value': 'foo_column'}], + 'timestamp': '2022-03-04T19:41:29+0000', } ) ) @@ -995,7 +990,6 @@ def test_consume_message_if_payload_kind_insert_or_update(self, *args): }] state = {'bookmarks': {'foo-bar': {'foo': 'bar', 'version': 'foo_version'}}} - time_extracted = datetime(2020, 9, 1, 23, 10, 59, tzinfo=tzoffset(None, -3600)) expected_output = { 'bookmarks': { @@ -1007,10 +1001,10 @@ def test_consume_message_if_payload_kind_insert_or_update(self, *args): } } self.conn_info['debug_lsn'] = 
True - actual_output = logical_replication.consume_message(streams, state, insert_msg, time_extracted, self.conn_info) + actual_output = logical_replication.consume_message(streams, state, insert_msg, self.conn_info) self.assertDictEqual(expected_output, actual_output) - actual_output = logical_replication.consume_message(streams, state, update_msg, time_extracted, self.conn_info) + actual_output = logical_replication.consume_message(streams, state, update_msg, self.conn_info) self.assertDictEqual(expected_output, actual_output) @patch('tap_postgres.sync_strategies.logical_replication.refresh_streams_schema') @@ -1024,6 +1018,7 @@ def test_consume_message_raises_exception_if_delete_and_no_datatype_for_stream(s 'table': 'bar', 'action': 'D', 'identity': [{'name': 'foo_desired', 'value': 'bar_value'}], + 'timestamp': '2022-03-04T19:41:29+0000', }) ) @@ -1035,11 +1030,10 @@ def test_consume_message_raises_exception_if_delete_and_no_datatype_for_stream(s }] state = {'bookmarks': {'foo-bar': {'foo': 'bar', 'version': 'foo'}}} - time_extracted = datetime(2020, 9, 1, 23, 10, 59, tzinfo=tzoffset(None, 0)) self.conn_info['debug_lsn'] = True expected_message = f'Unable to find sql-datatype for stream {streams[0]}' with self.assertRaises(Exception) as exp: - logical_replication.consume_message(streams, state, delete_msg, time_extracted, self.conn_info) + logical_replication.consume_message(streams, state, delete_msg, self.conn_info) self.assertEqual(expected_message, str(exp.exception)) diff --git a/tests/utils.py b/tests/utils.py index ff07e21..f68abcd 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -51,9 +51,13 @@ def __init__(self, *args, **kwargs): pass -def get_test_connection_config(target_db='postgres', use_secondary=False): +def get_test_connection_config(target_db='postgres', use_secondary=False, message_format=1): try: - conn_config = {'host': os.environ['TAP_POSTGRES_HOST'], + conn_config = {'tap_id': 'test-postgres', + 'max_run_seconds': 5, + 
'break_at_end_lsn': True, + 'logical_poll_total_seconds': 2, + 'host': os.environ['TAP_POSTGRES_HOST'], 'user': os.environ['TAP_POSTGRES_USER'], 'password': os.environ['TAP_POSTGRES_PASSWORD'], 'postgres_password': os.environ['TAP_POSTGRES_PG_PASSWORD'], @@ -81,6 +85,21 @@ def get_test_connection_config(target_db='postgres', use_secondary=False): "set TAP_POSTGRES_SECONDARY_HOST, TAP_POSTGRES_SECONDARY_PORT" ) from exc + if use_secondary: + missing_envs = [x for x in [os.getenv('TAP_POSTGRES_SECONDARY_HOST'), + os.getenv('TAP_POSTGRES_SECONDARY_PORT')] if x == None] + + if len(missing_envs) != 0: + raise Exception( + "set TAP_POSTGRES_SECONDARY_HOST, TAP_POSTGRES_SECONDARY_PORT" + ) + + conn_config.update({ + 'use_secondary': use_secondary, + 'secondary_host': os.getenv('TAP_POSTGRES_SECONDARY_HOST'), + 'secondary_port': os.getenv('TAP_POSTGRES_SECONDARY_PORT'), + }) + return conn_config