From b1d5b556da31affd645bd45551ed9e0336374f07 Mon Sep 17 00:00:00 2001
From: Dean Morin
Date: Wed, 6 Oct 2021 16:53:25 -0700
Subject: [PATCH 1/3] Replace tabs with spaces

A later commit modifies tests/test_streams_utils.py, and I noticed it
was the only file in this repo still indented with tabs. Changed it to
spaces, and replaced a few other stray tabs.
---
 LICENSE                     |   1 -
 tap_postgres/__init__.py    |   4 +-
 tests/test_streams_utils.py | 158 ++++++++++++++++++------------------
 3 files changed, 81 insertions(+), 82 deletions(-)

diff --git a/LICENSE b/LICENSE
index 627c3e96..4ec8c3f7 100644
--- a/LICENSE
+++ b/LICENSE
@@ -617,4 +617,3 @@ Program, unless a warranty or assumption of liability accompanies a copy of the
 Program in return for a fee.
 
                      END OF TERMS AND CONDITIONS
-
\ No newline at end of file

diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py
index 553f2d57..9f69ace2 100644
--- a/tap_postgres/__init__.py
+++ b/tap_postgres/__init__.py
@@ -90,8 +90,8 @@ def do_sync_incremental(conn_config, stream, state, desired_columns, md_map):
 
 def sync_method_for_streams(streams, state, default_replication_method):
     """
-	Determines the replication method of each stream
-	"""
+    Determines the replication method of each stream
+    """
     lookup = {}
     traditional_steams = []
     logical_streams = []

diff --git a/tests/test_streams_utils.py b/tests/test_streams_utils.py
index 114ab5c2..417c437c 100644
--- a/tests/test_streams_utils.py
+++ b/tests/test_streams_utils.py
@@ -8,92 +8,92 @@
 from tap_postgres import stream_utils
 
 try:
-	from tests.utils import get_test_connection, ensure_test_table, get_test_connection_config
+    from tests.utils import get_test_connection, ensure_test_table, get_test_connection_config
 except ImportError:
-	from utils import get_test_connection, ensure_test_table, get_test_connection_config
 
 
 def do_not_dump_catalog(catalog):
-	pass
+    pass
 
 
 tap_postgres.dump_catalog = do_not_dump_catalog
 
 
 class TestInit(unittest.TestCase):
-	maxDiff = None
-	table_name = 'CHICKEN TIMES'
-
-	def setUp(self):
-		table_spec = {"columns": [{"name": "id", "type": "integer", "primary_key": True, "serial": True},
-								  {"name": '"character-varying_name"', "type": "character varying"},
-								  {"name": '"varchar-name"', "type": "varchar(28)"},
-								  {"name": 'char_name', "type": "char(10)"},
-								  {"name": '"text-name"', "type": "text"}],
-					  "name": self.table_name}
-
-		ensure_test_table(table_spec)
-
-	def test_refresh_streams_schema(self):
-		conn_config = get_test_connection_config()
-
-		streams = [
-			{
-				'table_name': self.table_name,
-				'stream': self.table_name,
-				'tap_stream_id': f'public-{self.table_name}',
-				'schema': [],
-				'metadata': [
-					{
-						'breadcrumb': [],
-						'metadata': {
-							'replication-method': 'LOG_BASED',
-							'table-key-properties': ['some_id'],
-							'row-count': 1000,
-						}
-					}
-				]
-			}
-		]
-
-		stream_utils.refresh_streams_schema(conn_config, streams)
-
-		self.assertEqual(len(streams), 1)
-		self.assertEqual(self.table_name, streams[0].get('table_name'))
-		self.assertEqual(self.table_name, streams[0].get('stream'))
-
-		streams[0]['metadata'].sort(key=lambda md: md['breadcrumb'])
-
-		self.assertEqual(metadata.to_map(streams[0]['metadata']), {
-			(): {'table-key-properties': ['id'],
-				 'database-name': 'postgres',
-				 'schema-name': 'public',
-				 'is-view': False,
-				 'row-count': 0,
-				 'replication-method': 'LOG_BASED'
-				 },
-			('properties', 'character-varying_name'): {'inclusion': 'available',
-													   'sql-datatype': 'character varying',
-													   'selected-by-default': True},
-			('properties', 'id'): {'inclusion': 'automatic',
-								   'sql-datatype': 'integer',
-								   'selected-by-default': True},
-			('properties', 'varchar-name'): {'inclusion': 'available',
-											 'sql-datatype': 'character varying',
-											 'selected-by-default': True},
-			('properties', 'text-name'): {'inclusion': 'available',
-										  'sql-datatype': 'text',
-										  'selected-by-default': True},
-			('properties', 'char_name'): {'selected-by-default': True,
-										  'inclusion': 'available',
-										  'sql-datatype': 'character'}})
-
-		self.assertEqual({'properties': {'id': {'type': ['integer'],
-												'maximum': 2147483647,
-												'minimum': -2147483648},
-										 'character-varying_name': {'type': ['null', 'string']},
-										 'varchar-name': {'type': ['null', 'string'], 'maxLength': 28},
-										 'char_name': {'type': ['null', 'string'], 'maxLength': 10},
-										 'text-name': {'type': ['null', 'string']}},
-						 'type': 'object',
-						 'definitions': BASE_RECURSIVE_SCHEMAS}, streams[0].get('schema'))
+    from utils import get_test_connection, ensure_test_table, get_test_connection_config
+    maxDiff = None
+    table_name = 'CHICKEN TIMES'
+
+    def setUp(self):
+        table_spec = {"columns": [{"name": "id", "type": "integer", "primary_key": True, "serial": True},
+                                  {"name": '"character-varying_name"', "type": "character varying"},
+                                  {"name": '"varchar-name"', "type": "varchar(28)"},
+                                  {"name": 'char_name', "type": "char(10)"},
+                                  {"name": '"text-name"', "type": "text"}],
+                      "name": self.table_name}
+
+        ensure_test_table(table_spec)
+
+    def test_refresh_streams_schema(self):
+        conn_config = get_test_connection_config()
+
+        streams = [
+            {
+                'table_name': self.table_name,
+                'stream': self.table_name,
+                'tap_stream_id': f'public-{self.table_name}',
+                'schema': [],
+                'metadata': [
+                    {
+                        'breadcrumb': [],
+                        'metadata': {
+                            'replication-method': 'LOG_BASED',
+                            'table-key-properties': ['some_id'],
+                            'row-count': 1000,
+                        }
+                    }
+                ]
+            }
+        ]
+
+        stream_utils.refresh_streams_schema(conn_config, streams)
+
+        self.assertEqual(len(streams), 1)
+        self.assertEqual(self.table_name, streams[0].get('table_name'))
+        self.assertEqual(self.table_name, streams[0].get('stream'))
+
+        streams[0]['metadata'].sort(key=lambda md: md['breadcrumb'])
+
+        self.assertEqual(metadata.to_map(streams[0]['metadata']), {
+            (): {'table-key-properties': ['id'],
+                 'database-name': 'postgres',
+                 'schema-name': 'public',
+                 'is-view': False,
+                 'row-count': 0,
+                 'replication-method': 'LOG_BASED'
+                 },
+            ('properties', 'character-varying_name'): {'inclusion': 'available',
+                                                       'sql-datatype': 'character varying',
+                                                       'selected-by-default': True},
+            ('properties', 'id'): {'inclusion': 'automatic',
+                                   'sql-datatype': 'integer',
+                                   'selected-by-default': True},
+            ('properties', 'varchar-name'): {'inclusion': 'available',
+                                             'sql-datatype': 'character varying',
+                                             'selected-by-default': True},
+            ('properties', 'text-name'): {'inclusion': 'available',
+                                          'sql-datatype': 'text',
+                                          'selected-by-default': True},
+            ('properties', 'char_name'): {'selected-by-default': True,
+                                          'inclusion': 'available',
+                                          'sql-datatype': 'character'}})
+
+        self.assertEqual({'properties': {'id': {'type': ['integer'],
+                                                'maximum': 2147483647,
+                                                'minimum': -2147483648},
+                                         'character-varying_name': {'type': ['null', 'string']},
+                                         'varchar-name': {'type': ['null', 'string'], 'maxLength': 28},
+                                         'char_name': {'type': ['null', 'string'], 'maxLength': 10},
+                                         'text-name': {'type': ['null', 'string']}},
+                         'type': 'object',
+                         'definitions': BASE_RECURSIVE_SCHEMAS}, streams[0].get('schema'))
From cb914eb2a012ce351d48a7c4e705ec40c7fdd921 Mon Sep 17 00:00:00 2001
From: Dean Morin
Date: Wed, 6 Oct 2021 10:09:38 -0700
Subject: [PATCH 2/3] Allow overrides from catalog in refresh_streams_schema

Fixes https://github.com/transferwise/pipelinewise-tap-postgres/issues/128

- Moves the bulk of the code out of the context manager
  (open_connection), since only discovery needs the open connection.
- Adds private functions that merge the existing metadata and schema
  dicts with those in `new_discovery`.
- Aliases `metadata` (from singer) to `metadata_util`. While developing
  I kept accidentally shadowing it with a local variable named
  `metadata`, which broke things.
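To make the merge semantics concrete, here is a minimal, self-contained
sketch of the two merge directions. The dicts, column names, and the
jsonb discovery result below are invented for illustration; the real
functions additionally deep-copy their inputs and convert between
singer's list and map metadata representations:

    # Metadata: start from the catalog entry, let discovered values win,
    # and keep any catalog-only keys (replication method, arbitrary fields).
    catalog_md = {
        (): {'replication-method': 'LOG_BASED', 'row-count': 1000},
        ('properties', 'char_name'): {'arbitrary_field_metadata': 'kept'},
    }
    discovered_md = {
        (): {'row-count': 0, 'schema-name': 'public'},
        ('properties', 'id'): {'inclusion': 'automatic'},
    }
    merged = {crumb: dict(md) for crumb, md in catalog_md.items()}
    for crumb, md in discovered_md.items():
        merged.setdefault(crumb, {}).update(md)
    assert merged[()] == {'replication-method': 'LOG_BASED',
                          'row-count': 0,
                          'schema-name': 'public'}
    assert merged[('properties', 'char_name')] == {'arbitrary_field_metadata': 'kept'}

    # Schemas: the direction is reversed, catalog overrides win over the
    # discovered column schema (e.g. forcing a jsonb column to a string).
    discovered_schema = {'properties': {'json_name': {'type': ['null', 'object']}}}
    catalog_schema = {'properties': {'json_name': {'type': ['null', 'string']}}}
    for column, override in catalog_schema['properties'].items():
        if column in discovered_schema['properties']:
            discovered_schema['properties'][column].update(override)
    assert discovered_schema['properties']['json_name'] == {'type': ['null', 'string']}

Note the asymmetry: discovered values win for metadata, while catalog
overrides win for schemas. That is what lets a catalog pin a column's
type without losing freshly discovered metadata.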
---
 tap_postgres/stream_utils.py | 63 ++++++++++++++++++++++++------------
 tests/test_streams_utils.py  | 20 +++++++++---
 2 files changed, 58 insertions(+), 25 deletions(-)

diff --git a/tap_postgres/stream_utils.py b/tap_postgres/stream_utils.py
index 84a4f46e..c881be61 100644
--- a/tap_postgres/stream_utils.py
+++ b/tap_postgres/stream_utils.py
@@ -4,7 +4,7 @@
 import singer
 
 from typing import List, Dict
-from singer import metadata
+from singer import metadata as metadata_util
 
 from tap_postgres.db import open_connection
 from tap_postgres.discovery_utils import discover_db
@@ -29,7 +29,7 @@ def is_selected_via_metadata(stream: Dict) -> bool:
 
     Returns: True if selected, False otherwise.
     """
-    table_md = metadata.to_map(stream['metadata']).get((), {})
+    table_md = metadata_util.to_map(stream['metadata']).get((), {})
 
     return table_md.get('selected', False)
@@ -72,29 +72,50 @@ def refresh_streams_schema(conn_config: Dict, streams: List[Dict]):
             for stream in discover_db(conn, conn_config.get('filter_schemas'),
                                       [st['table_name'] for st in streams])
         }
 
-        LOGGER.debug('New discovery schemas %s', new_discovery)
+    LOGGER.debug('New discovery schemas %s', new_discovery)
 
-        # For every stream dictionary, update the schema and metadata from the new discovery
-        for idx, stream in enumerate(streams):
-            # update schema
-            streams[idx]['schema'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['schema'])
+    # For every stream, update the schema and metadata from the corresponding discovered stream
+    for idx, stream in enumerate(streams):
+        discovered_stream = new_discovery[stream['tap_stream_id']]
+        streams[idx]['schema'] = _merge_stream_schema(stream, discovered_stream)
+        streams[idx]['metadata'] = _merge_stream_metadata(stream, discovered_stream)
 
-            # Update metadata
-            #
-            # 1st step: new discovery doesn't contain non-discoverable metadata: e.g replication method & key, selected
-            # so let's copy those from the original stream object
-            md_map = metadata.to_map(stream['metadata'])
-            meta = md_map.get(())
+    LOGGER.debug('Updated streams schemas %s', streams)
+
+
+def _merge_stream_schema(stream, discovered_stream):
+    """
+    A discovered stream doesn't include any schema overrides from the catalog
+    file. Merges overrides from the catalog file into the discovered schema.
+    """
+    discovered_schema = copy.deepcopy(discovered_stream['schema'])
 
-            for idx_met, metadatum in enumerate(new_discovery[stream['tap_stream_id']]['metadata']):
-                if not metadatum['breadcrumb']:
-                    meta.update(new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'])
-                    new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'] = meta
+    for column, column_schema in stream['schema']['properties'].items():
+        if column in discovered_schema['properties'] and column_schema != discovered_schema['properties'][column]:
+            override = copy.deepcopy(stream['schema']['properties'][column])
+            LOGGER.debug('Overriding schema for %s with %s', column, override)
+            discovered_schema['properties'][column].update(override)
 
-            # 2nd step: now copy all the metadata from the updated new discovery to the original stream
-            streams[idx]['metadata'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['metadata'])
+    return discovered_schema
 
-        LOGGER.debug('Updated streams schemas %s', streams)
+
+def _merge_stream_metadata(stream, discovered_stream):
+    """
+    Discovered metadata for a stream doesn't contain non-discoverable
+    keys/values such as replication method, key, selected, or any other
+    arbitrary overridden metadata from the catalog file. Merges the discovered
+    metadata into the metadata from the catalog file.
+    """
+    stream_md = metadata_util.to_map(stream['metadata'])
+    discovery_md = metadata_util.to_map(discovered_stream['metadata'])
+
+    for breadcrumb, metadata in discovery_md.items():
+        if breadcrumb in stream_md:
+            stream_md[breadcrumb].update(metadata)
+        else:
+            stream_md[breadcrumb] = metadata
+
+    return copy.deepcopy(metadata_util.to_list(stream_md))
 
 
 def any_logical_streams(streams, default_replication_method):
@@ -102,7 +123,7 @@ def any_logical_streams(streams, default_replication_method):
     """
     Checks if streams list contains any stream with log_based method
    """
     for stream in streams:
-        stream_metadata = metadata.to_map(stream['metadata'])
+        stream_metadata = metadata_util.to_map(stream['metadata'])
         replication_method = stream_metadata.get((), {}).get('replication-method', default_replication_method)
         if replication_method == 'LOG_BASED':
             return True
diff --git a/tests/test_streams_utils.py b/tests/test_streams_utils.py
index 417c437c..2743e0c1 100644
--- a/tests/test_streams_utils.py
+++ b/tests/test_streams_utils.py
@@ -29,7 +29,8 @@ def setUp(self):
                                   {"name": '"character-varying_name"', "type": "character varying"},
                                   {"name": '"varchar-name"', "type": "varchar(28)"},
                                   {"name": 'char_name', "type": "char(10)"},
-                                  {"name": '"text-name"', "type": "text"}],
+                                  {"name": '"text-name"', "type": "text"},
+                                  {"name": "json_name", "type": "jsonb"}],
                       "name": self.table_name}
 
         ensure_test_table(table_spec)
@@ -42,7 +43,7 @@ def test_refresh_streams_schema(self):
                 'table_name': self.table_name,
                 'stream': self.table_name,
                 'tap_stream_id': f'public-{self.table_name}',
-                'schema': [],
+                'schema': {'properties': {'json_name': {'type': ['null', 'string']}}},
                 'metadata': [
                     {
                         'breadcrumb': [],
@@ -51,6 +52,12 @@ def test_refresh_streams_schema(self):
                             'table-key-properties': ['some_id'],
                             'row-count': 1000,
                         }
+                    },
+                    {
+                        'breadcrumb': ['properties', 'char_name'],
+                        'metadata': {
+                            'arbitrary_field_metadata': 'should be preserved'
+                        }
                     }
                 ]
             }
@@ -86,7 +93,11 @@ def test_refresh_streams_schema(self):
                                           'selected-by-default': True},
             ('properties', 'char_name'): {'selected-by-default': True,
                                           'inclusion': 'available',
-                                          'sql-datatype': 'character'}})
+                                          'sql-datatype': 'character',
+                                          'arbitrary_field_metadata': 'should be preserved'},
+            ('properties', 'json_name'): {'selected-by-default': True,
+                                          'inclusion': 'available',
+                                          'sql-datatype': 'jsonb'}})
 
         self.assertEqual({'properties': {'id': {'type': ['integer'],
@@ -94,6 +105,7 @@ def test_refresh_streams_schema(self):
                                          'character-varying_name': {'type': ['null', 'string']},
                                          'varchar-name': {'type': ['null', 'string'], 'maxLength': 28},
                                          'char_name': {'type': ['null', 'string'], 'maxLength': 10},
-                                         'text-name': {'type': ['null', 'string']}},
+                                         'text-name': {'type': ['null', 'string']},
+                                         'json_name': {'type': ['null', 'string']}},
                          'type': 'object',
                          'definitions': BASE_RECURSIVE_SCHEMAS}, streams[0].get('schema'))

From dff9d6c472f25d0e2e8ef1f5afdbdc6e5c6a6bef Mon Sep 17 00:00:00 2001
From: Dean Morin
Date: Thu, 7 Oct 2021 10:13:38 -0700
Subject: [PATCH 3/3] Improve logging message

Changed the level from debug to info. I deployed the changes from this
PR and wanted to make sure things were working as expected, but it was
in an environment with sensitive data, so I couldn't simply switch to
debug level, since that would leak the sensitive data into the logs.
Also added the affected column to the message.
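For illustration, here is roughly what the new call produces; the
stream and column names are made up, and the logger setup is only
there to make the snippet runnable on its own:

    import logging

    logging.basicConfig(level=logging.INFO)
    LOGGER = logging.getLogger('tap_postgres')

    # 'public-mytable' and 'json_name' are hypothetical names for this sketch.
    LOGGER.info('Overriding schema for %s.%s with %s',
                'public-mytable', 'json_name', {'type': ['null', 'string']})
    # INFO:tap_postgres:Overriding schema for public-mytable.json_name with {'type': ['null', 'string']}

At info level this surfaces in normal production logs, and the override
value itself comes from the catalog file, not from row data, so it is
safe to log in environments where debug logging is off.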
---
 tap_postgres/stream_utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tap_postgres/stream_utils.py b/tap_postgres/stream_utils.py
index c881be61..c5f0a564 100644
--- a/tap_postgres/stream_utils.py
+++ b/tap_postgres/stream_utils.py
@@ -93,7 +93,7 @@ def _merge_stream_schema(stream, discovered_stream):
     for column, column_schema in stream['schema']['properties'].items():
         if column in discovered_schema['properties'] and column_schema != discovered_schema['properties'][column]:
             override = copy.deepcopy(stream['schema']['properties'][column])
-            LOGGER.debug('Overriding schema for %s with %s', column, override)
+            LOGGER.info('Overriding schema for %s.%s with %s', stream['tap_stream_id'], column, override)
             discovered_schema['properties'][column].update(override)
 
     return discovered_schema