Skip to content

Commit e931b75

Browse files
authored
Merge pull request #3 from judahrand/thread-issue-107
Perform logical replication after initial sync
2 parents 3256aaa + c48300a commit e931b75

File tree

5 files changed

+80
-18
lines changed

5 files changed

+80
-18
lines changed

Dockerfile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
FROM docker.io/bitnami/postgresql:12
2+
3+
# Install wal2json
4+
USER root
5+
RUN install_packages curl ca-certificates gnupg && \
6+
curl https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor | tee /etc/apt/trusted.gpg.d/apt.postgresql.org.gpg >/dev/null && \
7+
echo "deb http://apt.postgresql.org/pub/repos/apt buster-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \
8+
install_packages postgresql-12-wal2json && \
9+
cp /usr/lib/postgresql/12/lib/wal2json.so /opt/bitnami/postgresql/lib/wal2json.so
10+
USER 1001

docker-compose.yml

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,17 @@ version: "3.3"
22

33
services:
44
db_primary:
5-
image: "docker.io/bitnami/postgresql:12"
5+
build: .
66
container_name: "primary"
77
ports:
88
- "5432:5432"
99
environment:
1010
- POSTGRESQL_REPLICATION_MODE=master
11-
- POSTGRESQL_REPLICATION_USER=repl_user
12-
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
13-
- POSTGRES_USER=test_user
14-
- POSTGRES_PASSWORD=my-secret-passwd
11+
- POSTGRESQL_REPLICATION_USER=test_user
12+
- POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd
1513
- POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd
1614
- POSTGRESQL_DATABASE=tap_postgres_test
15+
- POSTGRESQL_WAL_LEVEL=logical
1716
- ALLOW_EMPTY_PASSWORD=yes
1817
db_replica:
1918
image: "docker.io/bitnami/postgresql:12"
@@ -24,8 +23,8 @@ services:
2423
- db_primary
2524
environment:
2625
- POSTGRESQL_REPLICATION_MODE=slave
27-
- POSTGRESQL_REPLICATION_USER=repl_user
28-
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
26+
- POSTGRESQL_REPLICATION_USER=test_user
27+
- POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd
2928
- POSTGRESQL_MASTER_HOST=db_primary
3029
- POSTGRESQL_MASTER_PORT_NUMBER=5432
3130
- ALLOW_EMPTY_PASSWORD=yes

tap_postgres/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
131131
# finishing previously interrupted full-table (first stage of logical replication)
132132
lookup[stream['tap_stream_id']] = 'logical_initial_interrupted'
133133
traditional_steams.append(stream)
134+
# do any required logical replication after the initial sync is complete
135+
logical_streams.append(stream)
134136

135137
# inconsistent state
136138
elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \
@@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
142144
# initial full-table phase of logical replication
143145
lookup[stream['tap_stream_id']] = 'logical_initial'
144146
traditional_steams.append(stream)
147+
# do any required logical replication after the initial sync is complete
148+
logical_streams.append(stream)
145149

146150
else: # no xmin but we have an lsn
147151
# initial stage of logical replication (full-table) has been completed; moving on to pure logical replication

tests/test_full_table_interruption.py

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import re
21
import psycopg2
32
import unittest.mock
43
import pytest
54
import tap_postgres
5+
from tap_postgres.sync_strategies import logical_replication
66
import tap_postgres.sync_strategies.full_table as full_table
77
import tap_postgres.sync_strategies.common as pg_common
88
import singer
@@ -49,10 +49,29 @@ def do_not_dump_catalog(catalog):
4949
full_table.UPDATE_BOOKMARK_PERIOD = 1
5050

5151
@pytest.mark.parametrize('use_secondary', [False, True])
52+
@unittest.mock.patch('tap_postgres.sync_logical_streams', wraps=tap_postgres.sync_logical_streams)
5253
@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect)
5354
class TestLogicalInterruption:
5455
maxDiff = None
5556

57+
def setup_class(self):
58+
conn_config = get_test_connection_config()
59+
slot_name = logical_replication.generate_replication_slot_name(
60+
dbname=conn_config['dbname'], tap_id=conn_config['tap_id']
61+
)
62+
with get_test_connection(superuser=True) as conn:
63+
with conn.cursor() as cur:
64+
cur.execute(f"SELECT * FROM pg_create_logical_replication_slot('{slot_name}', 'wal2json')")
65+
66+
def teardown_class(self):
67+
conn_config = get_test_connection_config()
68+
slot_name = logical_replication.generate_replication_slot_name(
69+
dbname=conn_config['dbname'], tap_id=conn_config['tap_id']
70+
)
71+
with get_test_connection(superuser=True) as conn:
72+
with conn.cursor() as cur:
73+
cur.execute(f"SELECT * FROM pg_drop_replication_slot('{slot_name}')")
74+
5675
def setup_method(self):
5776
table_spec_1 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True},
5877
{"name" : 'name', "type": "character varying"},
@@ -67,15 +86,24 @@ def setup_method(self):
6786
global CAUGHT_MESSAGES
6887
CAUGHT_MESSAGES.clear()
6988

70-
def test_catalog(self, mock_connect, use_secondary):
89+
def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary):
7190
singer.write_message = singer_write_message_no_cow
7291
pg_common.write_schema_message = singer_write_message_ok
7392

7493
conn_config = get_test_connection_config(use_secondary=use_secondary)
7594
streams = tap_postgres.do_discovery(conn_config)
7695

7796
# Assert that we connected to the correct database
78-
expected_connection = {
97+
primary_connection = {
98+
'application_name': unittest.mock.ANY,
99+
'dbname': unittest.mock.ANY,
100+
'user': unittest.mock.ANY,
101+
'password': unittest.mock.ANY,
102+
'connect_timeout':unittest.mock.ANY,
103+
'host': conn_config['host'],
104+
'port': conn_config['port'],
105+
}
106+
secondary_connection = {
79107
'application_name': unittest.mock.ANY,
80108
'dbname': unittest.mock.ANY,
81109
'user': unittest.mock.ANY,
@@ -84,8 +112,8 @@ def test_catalog(self, mock_connect, use_secondary):
84112
'host': conn_config['secondary_host'] if use_secondary else conn_config['host'],
85113
'port': conn_config['secondary_port'] if use_secondary else conn_config['port'],
86114
}
87-
mock_connect.assert_called_once_with(**expected_connection)
88-
mock_connect.reset_mock()
115+
116+
mock_connect.assert_called_once_with(**secondary_connection)
89117

90118
cow_stream = [s for s in streams if s['table_name'] == 'COW'][0]
91119
assert cow_stream is not None
@@ -114,15 +142,20 @@ def test_catalog(self, mock_connect, use_secondary):
114142
state = {}
115143
# the initial phase of COW's logical replication will be a full-table sync.
116144
#it will sync the first record and then blow up on the 2nd record
145+
mock_connect.reset_mock()
117146
try:
118147
tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state)
119148
except Exception:
120149
blew_up_on_cow = True
121150

122151
assert blew_up_on_cow is True
123152

124-
mock_connect.assert_called_with(**expected_connection)
125-
mock_connect.reset_mock()
153+
mock_sync_logical_streams.assert_not_called()
154+
155+
mock_connect.assert_has_calls(
156+
[unittest.mock.call(**primary_connection)] * 2 + \
157+
[unittest.mock.call(**secondary_connection)] * 4
158+
)
126159

127160
assert 7 == len(CAUGHT_MESSAGES)
128161

@@ -171,12 +204,17 @@ def test_catalog(self, mock_connect, use_secondary):
171204
global COW_RECORD_COUNT
172205
COW_RECORD_COUNT = 0
173206
CAUGHT_MESSAGES.clear()
207+
mock_connect.reset_mock()
174208
tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state)
175209

176-
mock_connect.assert_called_with(**expected_connection)
177-
mock_connect.reset_mock()
210+
mock_sync_logical_streams.assert_called_once()
211+
212+
mock_connect.assert_has_calls(
213+
[unittest.mock.call(**primary_connection)] * 2 + \
214+
[unittest.mock.call(**secondary_connection)] * 4
215+
)
178216

179-
assert 8 == len(CAUGHT_MESSAGES)
217+
assert 10 == len(CAUGHT_MESSAGES)
180218

181219
assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA'
182220

@@ -225,6 +263,13 @@ def test_catalog(self, mock_connect, use_secondary):
225263
assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn') == end_lsn
226264
assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version') == new_version
227265

266+
assert CAUGHT_MESSAGES[8]['type'] == 'SCHEMA'
267+
268+
assert isinstance(CAUGHT_MESSAGES[9], singer.messages.StateMessage)
269+
assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('xmin') is None
270+
assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('lsn') == end_lsn
271+
assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('version') == new_version
272+
228273
@pytest.mark.parametrize('use_secondary', [False, True])
229274
@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect)
230275
class TestFullTableInterruption:

tests/utils.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ def get_test_connection_config(target_db='postgres', use_secondary=False):
1818
if len(missing_envs) != 0:
1919
raise Exception("set TAP_POSTGRES_HOST, TAP_POSTGRES_USER, TAP_POSTGRES_PASSWORD, TAP_POSTGRES_PORT")
2020

21-
conn_config = {'host': os.environ.get('TAP_POSTGRES_HOST'),
21+
conn_config = {'tap_id': 'test-postgres',
22+
'max_run_seconds': 5,
23+
'break_at_end_lsn': True,
24+
'logical_poll_total_seconds': 2,
25+
'host': os.environ.get('TAP_POSTGRES_HOST'),
2226
'user': os.environ.get('TAP_POSTGRES_USER'),
2327
'password': os.environ.get('TAP_POSTGRES_PASSWORD'),
2428
'port': os.environ.get('TAP_POSTGRES_PORT'),

0 commit comments

Comments
 (0)