Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Commit b78ba51

Browse files
authored
Add docker-compose file and bump pylint (#142)
1 parent 8a3e0a9 commit b78ba51

File tree

11 files changed

+84
-97
lines changed

11 files changed

+84
-97
lines changed

.github/workflows/ci.yaml

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,13 @@ jobs:
1515
matrix:
1616
python-version: [ 3.6, 3.7, 3.8 ]
1717

18-
services:
19-
test_postgres:
20-
image: postgres:12
21-
env:
22-
POSTGRES_USER: test_user
23-
POSTGRES_PASSWORD: my-secret-passwd
24-
POSTGRES_DB: tap_postgres_test
25-
ports:
26-
- 5432:5432
27-
2818
steps:
2919
- name: Checkout repository
3020
uses: actions/checkout@v2
3121

22+
- name: Build PG DB container
23+
run: make start_db
24+
3225
- name: Set up Python ${{ matrix.python-version }}
3326
uses: actions/setup-python@v2
3427
with:

Makefile

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
venv:
2-
python3 -m venv venv
3-
. ./venv/bin/activate
4-
pip install --upgrade pip setuptools wheel
2+
python3 -m venv venv ;\
3+
. ./venv/bin/activate ;\
4+
pip install --upgrade pip setuptools wheel;\
55
pip install -e .[test]
66

77
pylint:
8-
. ./venv/bin/activate
8+
. ./venv/bin/activate ;\
99
pylint --rcfile .pylintrc --disable duplicate-code tap_postgres/
1010

11+
start_db:
12+
docker-compose up -d --build db
13+
1114
test:
12-
. ./venv/bin/activate
15+
. ./venv/bin/activate ;\
1316
pytest --cov=tap_postgres --cov-fail-under=59 tests -v

README.md

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@ It's recommended to use a virtualenv:
3131
or
3232

3333
```bash
34-
python3 -m venv venv
35-
. venv/bin/activate
36-
pip install --upgrade pip
37-
pip install .
34+
make venv
3835
```
3936

4037
### Create a config.json
@@ -168,10 +165,7 @@ to the tap for the next sync.
168165
169166
1. Install python test dependencies in a virtual env:
170167
```
171-
python3 -m venv venv
172-
. venv/bin/activate
173-
pip install --upgrade pip
174-
pip install .[test]
168+
make venv
175169
```
176170
177171
2. You need to have a postgres database to run the tests and export its credentials:
@@ -182,6 +176,8 @@ to the tap for the next sync.
182176
export TAP_POSTGRES_PASSWORD=<postgres-password>
183177
```
184178
179+
You can make use of the local docker-compose to spin up a test database by running `make start_db`
180+
185181
Test objects will be created in the `postgres` database.
186182
187183
3. To run the tests:
@@ -193,11 +189,6 @@ Test objects will be created in the `postgres` database.
193189
194190
1. Install python dependencies and run python linter
195191
```
196-
python3 -m venv venv
197-
. venv/bin/activate
198-
pip install --upgrade pip
199-
pip install .[test]
192+
make venv
200193
make pylint
201194
```
202-
203-
---

docker-compose.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
version: "3.3"
2+
3+
services:
4+
db:
5+
image: "debezium/postgres:12-alpine"
6+
container_name: ""
7+
ports:
8+
- "5432:5432"
9+
environment:
10+
- POSTGRES_USER=test_user
11+
- POSTGRES_PASSWORD=my-secret-passwd
12+
- POSTGRES_DB=tap_postgres_test
13+
command: -c "wal_level=logical" -c "max_replication_slots=5" -c "max_wal_senders=5"

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
extras_require={
2525
"test": [
2626
'pytest==6.2.5',
27-
'pylint==2.10.2',
27+
'pylint==2.12.*',
2828
'pytest-cov==3.0.0'
2929
]
3030
},

tap_postgres/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def do_sync_incremental(conn_config, stream, state, desired_columns, md_map):
7878
illegal_bk_keys = set(stream_state.keys()).difference(
7979
{'replication_key', 'replication_key_value', 'version', 'last_replication_method'})
8080
if len(illegal_bk_keys) != 0:
81-
raise Exception("invalid keys found in state: {}".format(illegal_bk_keys))
81+
raise Exception(f"invalid keys found in state: {illegal_bk_keys}")
8282

8383
state = singer.write_bookmark(state, stream['tap_stream_id'], 'replication_key', replication_key)
8484

@@ -104,7 +104,7 @@ def sync_method_for_streams(streams, state, default_replication_method):
104104
state = clear_state_on_replication_change(state, stream['tap_stream_id'], replication_key, replication_method)
105105

106106
if replication_method not in {'LOG_BASED', 'FULL_TABLE', 'INCREMENTAL'}:
107-
raise Exception("Unrecognized replication_method {}".format(replication_method))
107+
raise Exception(f"Unrecognized replication_method {replication_method}")
108108

109109
md_map = metadata.to_map(stream['metadata'])
110110
desired_columns = [c for c in stream['schema']['properties'].keys() if
@@ -187,7 +187,7 @@ def sync_traditional_stream(conn_config, stream, state, sync_method, end_lsn):
187187
sync_common.send_schema_message(stream, [])
188188
state = full_table.sync_table(conn_config, stream, state, desired_columns, md_map)
189189
else:
190-
raise Exception("unknown sync method {} for stream {}".format(sync_method, stream['tap_stream_id']))
190+
raise Exception(f"unknown sync method {sync_method} for stream {stream['tap_stream_id']}")
191191

192192
state = singer.set_currently_syncing(state, None)
193193
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
@@ -209,7 +209,7 @@ def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_fil
209209
# This is to avoid sending very old starting and flushing positions to source
210210
selected_streams = set()
211211
for stream in logical_streams:
212-
selected_streams.add("{}".format(stream['tap_stream_id']))
212+
selected_streams.add(stream['tap_stream_id'])
213213

214214
new_state = dict(currently_syncing=state['currently_syncing'], bookmarks={})
215215

@@ -269,7 +269,7 @@ def register_type_adapters(conn_config):
269269
enum_oid = oid[0]
270270
psycopg2.extensions.register_type(
271271
psycopg2.extensions.new_array_type(
272-
(enum_oid,), 'ENUM_{}[]'.format(enum_oid), psycopg2.STRING))
272+
(enum_oid,), f'ENUM_{enum_oid}[]', psycopg2.STRING))
273273

274274

275275
def do_sync(conn_config, catalog, default_replication_method, state, state_file=None):

tap_postgres/db.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
# pylint: disable=invalid-name,missing-function-docstring
2020
def calculate_destination_stream_name(stream, md_map):
21-
return "{}-{}".format(md_map.get((), {}).get('schema-name'), stream['stream'])
21+
return f"{md_map.get((), {}).get('schema-name')}-{stream['stream']}"
2222

2323

2424
# from the postgres docs:
@@ -29,13 +29,11 @@ def canonicalize_identifier(identifier):
2929

3030

3131
def fully_qualified_column_name(schema, table, column):
32-
return '"{}"."{}"."{}"'.format(canonicalize_identifier(schema),
33-
canonicalize_identifier(table),
34-
canonicalize_identifier(column))
32+
return f'"{canonicalize_identifier(schema)}"."{canonicalize_identifier(table)}"."{canonicalize_identifier(column)}"'
3533

3634

3735
def fully_qualified_table_name(schema, table):
38-
return '"{}"."{}"'.format(canonicalize_identifier(schema), canonicalize_identifier(table))
36+
return f'"{canonicalize_identifier(schema)}"."{canonicalize_identifier(table)}"'
3937

4038

4139
def open_connection(conn_config, logical_replication=False):
@@ -60,7 +58,7 @@ def open_connection(conn_config, logical_replication=False):
6058
return conn
6159

6260
def prepare_columns_for_select_sql(c, md_map):
63-
column_name = ' "{}" '.format(canonicalize_identifier(c))
61+
column_name = f' "{canonicalize_identifier(c)}" '
6462

6563
if ('properties', c) in md_map:
6664
sql_datatype = md_map[('properties', c)]['sql-datatype']
@@ -73,17 +71,17 @@ def prepare_columns_for_select_sql(c, md_map):
7371
return column_name
7472

7573
def prepare_columns_sql(c):
76-
column_name = """ "{}" """.format(canonicalize_identifier(c))
74+
column_name = f""" "{canonicalize_identifier(c)}" """
7775
return column_name
7876

7977

8078
def filter_dbs_sql_clause(sql, filter_dbs):
81-
in_clause = " AND datname in (" + ",".join(["'{}'".format(b.strip(' ')) for b in filter_dbs.split(',')]) + ")"
79+
in_clause = " AND datname in (" + ",".join([f"'{b.strip(' ')}'" for b in filter_dbs.split(',')]) + ")"
8280
return sql + in_clause
8381

8482

8583
def filter_schemas_sql_clause(sql, filer_schemas):
86-
in_clause = " AND n.nspname in (" + ",".join(["'{}'".format(b.strip(' ')) for b in filer_schemas.split(',')]) + ")"
84+
in_clause = " AND n.nspname in (" + ",".join([f"'{b.strip(' ')}'" for b in filer_schemas.split(',')]) + ")"
8785
return sql + in_clause
8886

8987

@@ -152,11 +150,10 @@ def selected_value_to_singer_value_impl(elem, sql_datatype):
152150
if sql_datatype == 'hstore':
153151
cleaned_elem = elem
154152
else:
155-
raise Exception("do not know how to marshall a dict if its not an hstore or json: {}".format(sql_datatype))
153+
raise Exception(f"do not know how to marshall a dict if its not an hstore or json: {sql_datatype}")
156154
else:
157155
raise Exception(
158-
"do not know how to marshall value of class( {} ) and sql_datatype ( {} )".format(elem.__class__,
159-
sql_datatype))
156+
f"do not know how to marshall value of class( {elem.__class__} ) and sql_datatype ( {sql_datatype} )")
160157

161158
return cleaned_elem
162159

@@ -249,7 +246,7 @@ def numeric_min(precision, scale):
249246

250247

251248
def filter_tables_sql_clause(sql, tables: List[str]):
252-
in_clause = " AND pg_class.relname in (" + ",".join(["'{}'".format(b.strip(' ')) for b in tables]) + ")"
249+
in_clause = " AND pg_class.relname in (" + ",".join([f"'{b.strip(' ')}'" for b in tables]) + ")"
253250
return sql + in_clause
254251

255252
def get_database_name(connection):

tap_postgres/discovery_utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ def schema_for_column(col_info):
315315
scale = post_db.numeric_scale(col_info)
316316
precision = post_db.numeric_precision(col_info)
317317
schema_name = schema_name_for_numeric_array(precision, scale)
318-
column_schema['items'] = {'$ref': '#/definitions/{}'.format(schema_name)}
318+
column_schema['items'] = {'$ref': f'#/definitions/{schema_name}'}
319319
elif col_info.sql_data_type == 'double precision[]':
320320
column_schema['items'] = {'$ref': '#/definitions/sdc_recursive_number_array'}
321321
elif col_info.sql_data_type == 'hstore[]':
@@ -364,7 +364,7 @@ def nullable_column(col_type, pk):
364364

365365

366366
def schema_name_for_numeric_array(precision, scale):
367-
schema_name = 'sdc_recursive_decimal_{}_{}_array'.format(precision, scale)
367+
schema_name = f'sdc_recursive_decimal_{precision}_{scale}_array'
368368
return schema_name
369369

370370

@@ -382,7 +382,7 @@ def include_array_schemas(columns, schema):
382382
'maximum': post_db.numeric_max(precision, scale),
383383
'exclusiveMinimum': True,
384384
'minimum': post_db.numeric_min(precision, scale),
385-
'items': {'$ref': '#/definitions/{}'.format(schema_name)}}
385+
'items': {'$ref': f'#/definitions/{schema_name}'}}
386386

387387
return schema
388388

@@ -391,7 +391,7 @@ def write_sql_data_type_md(mdata, col_info):
391391
c_name = col_info.column_name
392392
if col_info.sql_data_type == 'bit' and col_info.character_maximum_length > 1:
393393
mdata = metadata.write(mdata, ('properties', c_name),
394-
'sql-datatype', "bit({})".format(col_info.character_maximum_length))
394+
'sql-datatype', f"bit({col_info.character_maximum_length})")
395395
else:
396396
mdata = metadata.write(mdata, ('properties', c_name), 'sql-datatype', col_info.sql_data_type)
397397

tap_postgres/sync_strategies/full_table.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,8 @@ def sync_view(conn_info, stream, state, desired_columns, md_map):
4444
with post_db.open_connection(conn_info) as conn:
4545
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor, name='stitch_cursor') as cur:
4646
cur.itersize = post_db.CURSOR_ITER_SIZE
47-
select_sql = 'SELECT {} FROM {}'.format(','.join(escaped_columns),
48-
post_db.fully_qualified_table_name(schema_name,
49-
stream['table_name']))
47+
select_sql = f"SELECT {','.join(escaped_columns)} FROM " \
48+
f"{post_db.fully_qualified_table_name(schema_name,stream['table_name'])}"
5049

5150
LOGGER.info("select %s with itersize %s", select_sql, cur.itersize)
5251
cur.execute(select_sql)
@@ -60,7 +59,7 @@ def sync_view(conn_info, stream, state, desired_columns, md_map):
6059
time_extracted,
6160
md_map)
6261
singer.write_message(record_message)
63-
rows_saved = rows_saved + 1
62+
rows_saved += 1
6463
if rows_saved % UPDATE_BOOKMARK_PERIOD == 0:
6564
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
6665

@@ -128,17 +127,15 @@ def sync_table(conn_info, stream, state, desired_columns, md_map):
128127
xmin = singer.get_bookmark(state, stream['tap_stream_id'], 'xmin')
129128
if xmin:
130129
LOGGER.info("Resuming Full Table replication %s from xmin %s", nascent_stream_version, xmin)
131-
select_sql = """SELECT {}, xmin::text::bigint
132-
FROM {} where age(xmin::xid) <= age('{}'::xid)
133-
ORDER BY xmin::text ASC""".format(','.join(escaped_columns),
134-
fq_table_name,
135-
xmin)
130+
select_sql = f"""
131+
SELECT {','.join(escaped_columns)}, xmin::text::bigint
132+
FROM {fq_table_name} where age(xmin::xid) <= age('{xmin}'::xid)
133+
ORDER BY xmin::text ASC"""
136134
else:
137135
LOGGER.info("Beginning new Full Table replication %s", nascent_stream_version)
138-
select_sql = """SELECT {}, xmin::text::bigint
139-
FROM {}
140-
ORDER BY xmin::text ASC""".format(','.join(escaped_columns),
141-
fq_table_name)
136+
select_sql = f"""SELECT {','.join(escaped_columns)}, xmin::text::bigint
137+
FROM {fq_table_name}
138+
ORDER BY xmin::text ASC"""
142139

143140
LOGGER.info("select %s with itersize %s", select_sql, cur.itersize)
144141
cur.execute(select_sql)
@@ -155,7 +152,7 @@ def sync_table(conn_info, stream, state, desired_columns, md_map):
155152
md_map)
156153
singer.write_message(record_message)
157154
state = singer.write_bookmark(state, stream['tap_stream_id'], 'xmin', xmin)
158-
rows_saved = rows_saved + 1
155+
rows_saved += 1
159156
if rows_saved % UPDATE_BOOKMARK_PERIOD == 0:
160157
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
161158

0 commit comments

Comments
 (0)