From 99e67b72a2d377d7f7db97169f42d697a1b87422 Mon Sep 17 00:00:00 2001 From: andreyshalitkin Date: Mon, 9 Aug 2021 19:48:30 +0300 Subject: [PATCH 1/2] https://github.com/transferwise/pipelinewise-tap-postgres/issues/113 --- tap_postgres/__init__.py | 21 ++++++++++++++++----- tap_postgres/db.py | 1 + 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 75ed479f..e55a619c 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -56,11 +56,20 @@ def do_sync_full_table(conn_config, stream, state, desired_columns, md_map): """ LOGGER.info("Stream %s is using full_table replication", stream['tap_stream_id']) sync_common.send_schema_message(stream, []) - if md_map.get((), {}).get('is-view'): - state = full_table.sync_view(conn_config, stream, state, desired_columns, md_map) - else: - state = full_table.sync_table(conn_config, stream, state, desired_columns, md_map) - return state + attemp = 0 + while True: + try: + if md_map.get((), {}).get('is-view'): + state = full_table.sync_view(conn_config, stream, state, desired_columns, md_map) + else: + state = full_table.sync_table(conn_config, stream, state, desired_columns, md_map) + return state + except Exception as e: + LOGGER.warn("error on read for a stream: %s. Message: %s", stream['tap_stream_id'], e) + if attemp > post_db.TRY_NUMBER: + raise e + else: + attemp = attemp + 1 # Possible state keys: replication_key, replication_key_value, version @@ -411,6 +420,8 @@ def main_impl(): conn_config['sslmode'] = 'require' post_db.CURSOR_ITER_SIZE = int(args.config.get('itersize', post_db.CURSOR_ITER_SIZE)) + post_db.TRY_NUMBER = int(args.config.get('trynumber', post_db.TRY_NUMBER)) + if args.discover: do_discovery(conn_config) diff --git a/tap_postgres/db.py b/tap_postgres/db.py index c7711c42..737148da 100644 --- a/tap_postgres/db.py +++ b/tap_postgres/db.py @@ -14,6 +14,7 @@ LOGGER = singer.get_logger('tap_postgres') CURSOR_ITER_SIZE = 20000 +TRY_NUMBER = 10 # pylint: disable=invalid-name,missing-function-docstring From 0a7303cd2d2b3f3a2bfcd4d96d35e3006e9f1789 Mon Sep 17 00:00:00 2001 From: andreyshalitkin Date: Mon, 9 Aug 2021 19:50:46 +0300 Subject: [PATCH 2/2] https://github.com/transferwise/pipelinewise-tap-postgres/issues/113. Typo fix --- tap_postgres/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index e55a619c..7fb9ed73 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -56,7 +56,7 @@ def do_sync_full_table(conn_config, stream, state, desired_columns, md_map): """ LOGGER.info("Stream %s is using full_table replication", stream['tap_stream_id']) sync_common.send_schema_message(stream, []) - attemp = 0 + attempt = 0 while True: try: if md_map.get((), {}).get('is-view'): @@ -66,10 +66,10 @@ def do_sync_full_table(conn_config, stream, state, desired_columns, md_map): return state except Exception as e: LOGGER.warn("error on read for a stream: %s. Message: %s", stream['tap_stream_id'], e) - if attemp > post_db.TRY_NUMBER: + if attempt > post_db.TRY_NUMBER: raise e else: - attemp = attemp + 1 + attempt = attempt + 1 # Possible state keys: replication_key, replication_key_value, version