Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
aa2ddd9
Add `use_replica` config flag
judahrand Jan 11, 2022
2383d7a
Add tests for `use_replica` option
judahrand Jan 12, 2022
58462ff
Rename `replica_*` config options to `secondary_*`
judahrand Jan 12, 2022
09d6b9c
Require primary and secondary to use the same user
judahrand Jan 12, 2022
e892687
Add documentation
judahrand Jan 12, 2022
d2d9aa6
Merge branch 'master' into replica-read
judahrand Jan 17, 2022
595813b
Merge pull request #2 from judahrand/replica-read
judahrand Jan 18, 2022
0855fc7
Perform logical replication after initial sync
judahrand Jan 18, 2022
c48300a
Actually run the initial `LOG_BASED` sync in test
judahrand Jan 18, 2022
3256aaa
Merge branch 'transferwise:master' into master
judahrand Jan 20, 2022
e931b75
Merge pull request #3 from judahrand/thread-issue-107
judahrand Jan 20, 2022
6b8ff42
Merge branch 'master' of https://github.com/transferwise/pipelinewise…
judahrand Jan 21, 2022
d542f96
Add ability to use `wal2json` `format-version` 2
judahrand Jan 24, 2022
931384b
Merge pull request #5 from thread/thread-wal2json-format-2
judahrand Jan 24, 2022
42c558b
Use fixed version of `wal2json`
judahrand Jan 24, 2022
ee9bedb
Only set required `wal2json` options
judahrand Jan 25, 2022
3e22e7a
Merge branch 'transferwise:master' into master
judahrand Feb 24, 2022
790524e
Use a more useful value for `extracted_at`
judahrand Mar 4, 2022
6771da1
Update tests
judahrand Mar 4, 2022
ab57e34
Merge pull request #6 from thread/extracted_at
judahrand Mar 4, 2022
ac75c8e
Merge remote-tracking branch 'upstream/master'
judahrand Mar 4, 2022
8438faf
Fix tests
judahrand Mar 5, 2022
699d21b
Add Postgres range types to schema
judahrand Mar 25, 2022
62f8588
Merge pull request #7 from thread/range-types
judahrand Mar 25, 2022
2216505
Fix test
judahrand Mar 25, 2022
bee1d0c
Merge branch 'transferwise:master' into master
judahrand Oct 4, 2022
ee2afc9
Update Dockerfile
judahrand Oct 4, 2022
009dfa5
Merge branch 'master' of https://github.com/transferwise/pipelinewise…
judahrand Oct 31, 2022
620b3a5
Add missing argument
judahrand Oct 31, 2022
15bb16f
Remove redundant code
judahrand Oct 31, 2022
2a7082e
Remove unused import
judahrand Oct 31, 2022
c44d03b
Update README.md
judahrand Oct 31, 2022
cc562fe
fixed conflicts w/ master
josescuderoh Apr 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
FROM docker.io/bitnami/postgresql:12
FROM docker.io/bitnami/postgresql:12-debian-11

USER root

RUN apt-get update \
&& apt-get -y install git build-essential \
&& git clone --depth 1 --branch wal2json_2_3 https://github.com/eulerto/wal2json.git \
&& cd /wal2json \
&& make && make install \
&& cd / \
&& rm -rf wal2json \
&& rm -r /var/lib/apt/lists /var/cache/apt/archives
# Git SHA of v2.4
ENV WAL2JSON_COMMIT_ID=36fbee6cbb7e4bc1bf9ee4f55842ab51393e3ac0

# Compile the plugins from sources and install
RUN install_packages ca-certificates curl gcc git gnupg make pkgconf && \
curl https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor | tee /etc/apt/trusted.gpg.d/apt.postgresql.org.gpg >/dev/null && \
echo "deb http://apt.postgresql.org/pub/repos/apt bullseye-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \
install_packages postgresql-server-dev-12 && \
git clone https://github.com/eulerto/wal2json -b master --single-branch && \
(cd /wal2json && git checkout $WAL2JSON_COMMIT_ID && make && make install) && \
rm -rf wal2json

USER 1001
15 changes: 6 additions & 9 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@ version: "3.3"

services:
db_primary:
build: .
container_name: "primary"
build:
context: .
dockerfile: Dockerfile
ports:
- "5432:5432"
environment:
- POSTGRESQL_REPLICATION_MODE=master
- POSTGRESQL_REPLICATION_USER=repl_user
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
- POSTGRES_USER=test_user
- POSTGRES_PASSWORD=my-secret-passwd
- POSTGRESQL_REPLICATION_USER=test_user
- POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd
- POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd
- POSTGRESQL_DATABASE=tap_postgres_test
- POSTGRESQL_WAL_LEVEL=logical
- ALLOW_EMPTY_PASSWORD=yes
volumes:
- ./db_setup/config:/bitnami/postgresql/conf
Expand All @@ -29,8 +26,8 @@ services:
- db_primary
environment:
- POSTGRESQL_REPLICATION_MODE=slave
- POSTGRESQL_REPLICATION_USER=repl_user
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
- POSTGRESQL_REPLICATION_USER=test_user
- POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd
- POSTGRESQL_MASTER_HOST=db_primary
- POSTGRESQL_MASTER_PORT_NUMBER=5432
- ALLOW_EMPTY_PASSWORD=yes
4 changes: 4 additions & 0 deletions tap_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# finishing previously interrupted full-table (first stage of logical replication)
lookup[stream['tap_stream_id']] = 'logical_initial_interrupted'
traditional_steams.append(stream)
# do any required logical replication after initial sync is complete
logical_streams.append(stream)

# inconsistent state
elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \
Expand All @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# initial full-table phase of logical replication
lookup[stream['tap_stream_id']] = 'logical_initial'
traditional_steams.append(stream)
# do any required logical replication after initial sync is complete
logical_streams.append(stream)

else: # no xmin but we have an lsn
# initial stage of logical replication (full-table) has been completed; moving on to pure logical replication
Expand Down
4 changes: 4 additions & 0 deletions tap_postgres/discovery_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
INTEGER_TYPES = {'integer', 'smallint', 'bigint'}
FLOAT_TYPES = {'real', 'double precision'}
JSON_TYPES = {'json', 'jsonb'}
RANGE_TYPES = {'int4range', 'int8range', 'numrange', 'tsrange', 'tstzrange', 'daterange'}
BASE_RECURSIVE_SCHEMAS = {
'sdc_recursive_integer_array': {'type': ['null', 'integer', 'array'],
'items': {'$ref': '#/definitions/sdc_recursive_integer_array'}},
Expand Down Expand Up @@ -280,6 +281,9 @@ def schema_for_column_datatype(col):
schema['type'] = nullable_column('string', col.is_primary_key)
return schema

if data_type in RANGE_TYPES:
schema['type'] = nullable_column('string', col.is_primary_key)

return schema


Expand Down
10 changes: 5 additions & 5 deletions tap_postgres/sync_strategies/logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from select import select
from psycopg2 import sql
from singer import metadata, utils, get_bookmark
from singer import metadata, get_bookmark
from dateutil.parser import parse, UnknownTimezoneWarning, ParserError
from functools import reduce

Expand Down Expand Up @@ -377,12 +377,14 @@ def row_to_singer_message(stream, row, version, columns, time_extracted, md_map,


# pylint: disable=unused-argument,too-many-locals
def consume_message(streams, state, msg, time_extracted, conn_info):
def consume_message(streams, state, msg, conn_info):
try:
payload = json.loads(msg.payload)
except Exception:
return state

time_extracted = singer.utils.strptime_to_utc(payload['timestamp'])

lsn = msg.data_start

streams_lookup = {s['tap_stream_id']: s for s in streams}
Expand Down Expand Up @@ -575,7 +577,6 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
lsn_comitted = min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams])
start_lsn = lsn_comitted
lsn_to_flush = None
time_extracted = utils.now()
slot = locate_replication_slot(conn_info)
lsn_last_processed = None
lsn_currently_processing = None
Expand Down Expand Up @@ -606,7 +607,6 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
int_to_lsn(start_lsn),
int_to_lsn(end_lsn),
slot)
# psycopg2 2.8.4 will send a keep-alive message to postgres every status_interval
cur.start_replication(slot_name=slot,
decode=True,
start_lsn=start_lsn,
Expand Down Expand Up @@ -652,7 +652,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
int_to_lsn(end_lsn))
break

state = consume_message(logical_streams, state, msg, time_extracted, conn_info)
state = consume_message(logical_streams, state, msg, conn_info)

# When using wal2json with write-in-chunks, multiple messages can have the same lsn
# This is to ensure we only flush to lsn that has completed entirely
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/test_unsupported_pk.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def setUp(self):
{"name": "circle_col", "type": "circle"},
{"name": "xml_col", "type": "xml"},
{"name": "composite_col", "type": "person_composite"},
{"name": "int_range_col", "type": "int4range"},
],
"name": Unsupported.table_name}
with get_test_connection() as conn:
Expand All @@ -53,7 +52,6 @@ def test_catalog(self):
('properties', 'bit_string_col'): {'sql-datatype': 'bit(5)', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'line_col'): {'sql-datatype': 'line', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'xml_col'): {'sql-datatype': 'xml', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'int_range_col'): {'sql-datatype': 'int4range', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'circle_col'): {'sql-datatype': 'circle', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'polygon_col'): {'sql-datatype': 'polygon', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'box_col'): {'sql-datatype': 'box', 'selected-by-default': False, 'inclusion': 'unsupported'},
Expand Down
Loading