4 changes: 3 additions & 1 deletion .github/workflows/ci.yaml
@@ -35,9 +35,11 @@ jobs:

- name: Tests
env:
TAP_POSTGRES_HOST: localhost
TAP_POSTGRES_PORT: 5432
TAP_POSTGRES_USER: test_user
TAP_POSTGRES_PASSWORD: my-secret-passwd
TAP_POSTGRES_HOST: localhost
TAP_POSTGRES_SECONDARY_HOST: localhost
TAP_POSTGRES_SECONDARY_PORT: 5433
LOGGING_CONF_FILE: ./sample_logging.conf
run: make test
2 changes: 1 addition & 1 deletion Makefile
@@ -9,7 +9,7 @@ pylint:
pylint --rcfile .pylintrc --disable duplicate-code tap_postgres/

start_db:
docker-compose up -d --build db
docker-compose up -d

test:
. ./venv/bin/activate ;\
11 changes: 8 additions & 3 deletions README.md
@@ -66,6 +66,9 @@ Full list of options in `config.json`:
| tap_id | String | No | ID of the pipeline/tap (Default: None) |
| itersize | Integer | No | Size of PG cursor iterator when doing INCREMENTAL or FULL_TABLE (Default: 20000) |
| default_replication_method | String | No | Default replication method to use when none is provided in the catalog (Values: `LOG_BASED`, `INCREMENTAL` or `FULL_TABLE`) (Default: None) |
| use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default: False) |
| secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) |
| secondary_port | Integer | No | PostgreSQL Replica port (required if `use_secondary` is `True`) |
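
As a sketch of how the new options fit together, a minimal `config.json` that routes `INCREMENTAL` and `FULL_TABLE` reads to a replica might look like this (host names, ports and credentials are placeholders; the remaining keys follow the options listed above):

```
{
  "host": "postgres-primary.example.com",
  "port": 5432,
  "user": "my_user",
  "password": "secret",
  "dbname": "my_database",
  "default_replication_method": "INCREMENTAL",
  "use_secondary": true,
  "secondary_host": "postgres-replica.example.com",
  "secondary_port": 5433
}
```

Note that `LOG_BASED` replication still reads from the primary, since logical replication slots only exist there.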


### Run the tap in Discovery Mode
@@ -142,7 +145,7 @@ to the tap for the next sync.
```

Restart your PostgreSQL service to ensure the changes take effect.

**Note**: For `max_replication_slots` and `max_wal_senders`, we’re defaulting to a value of 5.
This should be sufficient unless you have a large number of read replicas connected to the master instance.

@@ -151,11 +154,11 @@
In PostgreSQL, a logical replication slot represents a stream of database changes that can then be replayed to a
client in the order they were made on the original server. Each slot streams a sequence of changes from a single
database.

Login to the master instance as a superuser and using the `wal2json` plugin, create a logical replication slot:
```
SELECT *
FROM pg_create_logical_replication_slot('pipelinewise_<database_name>', 'wal2json');
FROM pg_create_logical_replication_slot('pipelinewise_<database_name>', 'wal2json');
```

**Note**: Replication slots are specific to a given database in a cluster. If you want to connect multiple
@@ -172,6 +175,8 @@ to the tap for the next sync.
```
export TAP_POSTGRES_HOST=<postgres-host>
export TAP_POSTGRES_PORT=<postgres-port>
export TAP_POSTGRES_SECONDARY_HOST=<postgres-replica-host>
export TAP_POSTGRES_SECONDARY_PORT=<postgres-replica-port>
export TAP_POSTGRES_USER=<postgres-user>
export TAP_POSTGRES_PASSWORD=<postgres-password>
```
28 changes: 23 additions & 5 deletions docker-compose.yml
@@ -1,13 +1,31 @@
version: "3.3"

services:
db:
image: "debezium/postgres:12-alpine"
container_name: ""
db_primary:
image: "docker.io/bitnami/postgresql:12"
container_name: "primary"
ports:
- "5432:5432"
environment:
- POSTGRESQL_REPLICATION_MODE=master
- POSTGRESQL_REPLICATION_USER=repl_user
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
- POSTGRES_USER=test_user
- POSTGRES_PASSWORD=my-secret-passwd
- POSTGRES_DB=tap_postgres_test
command: -c "wal_level=logical" -c "max_replication_slots=5" -c "max_wal_senders=5"
- POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd
- POSTGRESQL_DATABASE=tap_postgres_test
- ALLOW_EMPTY_PASSWORD=yes
db_replica:
image: "docker.io/bitnami/postgresql:12"
container_name: replica
ports:
- "5433:5432"
depends_on:
- db_primary
environment:
- POSTGRESQL_REPLICATION_MODE=slave
- POSTGRESQL_REPLICATION_USER=repl_user
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
- POSTGRESQL_MASTER_HOST=db_primary
- POSTGRESQL_MASTER_PORT_NUMBER=5432
- ALLOW_EMPTY_PASSWORD=yes
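
With this compose file the primary stays on host port 5432 and the replica is published on 5433, matching the `TAP_POSTGRES_SECONDARY_PORT` used in CI. A quick way to check that both containers are up and that the second one really is a standby — a sketch, assuming `psql` is installed locally and the containers have finished starting — is:

```
make start_db   # runs docker-compose up -d: starts db_primary and db_replica

# pg_is_in_recovery() returns false on the primary and true on the streaming replica
PGPASSWORD=my-secret-passwd psql -h localhost -p 5432 -U test_user -d tap_postgres_test \
  -c 'SELECT pg_is_in_recovery();'
PGPASSWORD=my-secret-passwd psql -h localhost -p 5433 -U test_user -d tap_postgres_test \
  -c 'SELECT pg_is_in_recovery();'
```
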
19 changes: 18 additions & 1 deletion tap_postgres/__init__.py
@@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# finishing previously interrupted full-table (first stage of logical replication)
lookup[stream['tap_stream_id']] = 'logical_initial_interrupted'
traditional_steams.append(stream)
# do any required logical replication after initial sync is complete
logical_streams.append(stream)

# inconsistent state
elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \
@@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# initial full-table phase of logical replication
lookup[stream['tap_stream_id']] = 'logical_initial'
traditional_steams.append(stream)
# do any required logical replication after initial sync is complete
logical_streams.append(stream)

else: # no xmin but we have an lsn
# initial stage of logical replication(full-table) has been completed. moving onto pure logical replication
@@ -405,9 +409,22 @@ def main_impl():
'debug_lsn': args.config.get('debug_lsn') == 'true',
'max_run_seconds': args.config.get('max_run_seconds', 43200),
'break_at_end_lsn': args.config.get('break_at_end_lsn', True),
'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0))
'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)),
'use_secondary': args.config.get('use_secondary', False),
}

if conn_config['use_secondary']:
try:
conn_config.update({
# Host and Port are mandatory.
'secondary_host': args.config['secondary_host'],
'secondary_port': args.config['secondary_port'],
})
except KeyError as exc:
raise ValueError(
"When 'use_secondary' enabled 'secondary_host' and 'secondary_port' must be defined."
) from exc

if args.config.get('ssl') == 'true':
conn_config['sslmode'] = 'require'

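
The net effect of the `use_secondary` block above is a fail-fast check at startup. A minimal sketch of that behaviour (simplified from `main_impl`, which reads the same keys from `args.config`; this is not code from the PR itself):

```
# Sketch only: mirrors the validation added to main_impl().
def secondary_settings(config):
    if not config.get('use_secondary', False):
        return {'use_secondary': False}
    try:
        return {
            'use_secondary': True,
            'secondary_host': config['secondary_host'],
            'secondary_port': config['secondary_port'],
        }
    except KeyError as exc:
        raise ValueError(
            "When 'use_secondary' is enabled, 'secondary_host' and 'secondary_port' must be defined."
        ) from exc

secondary_settings({'use_secondary': True, 'secondary_host': 'replica', 'secondary_port': 5433})  # ok
secondary_settings({'use_secondary': True})  # raises ValueError
```
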
10 changes: 9 additions & 1 deletion tap_postgres/db.py
@@ -36,7 +36,7 @@ def fully_qualified_table_name(schema, table):
return f'"{canonicalize_identifier(schema)}"."{canonicalize_identifier(table)}"'


def open_connection(conn_config, logical_replication=False):
def open_connection(conn_config, logical_replication=False, prioritize_primary=False):
cfg = {
'application_name': 'pipelinewise',
'host': conn_config['host'],
@@ -47,6 +47,14 @@ def open_connection(conn_config, logical_replication=False, prioritize_primary=False):
'connect_timeout': 30
}

if conn_config.get('use_secondary', False) and not prioritize_primary and not logical_replication:
# Try to use the replica but fall back to the primary if keys are missing. This is the same behavior as
# https://github.com/transferwise/pipelinewise/blob/master/pipelinewise/fastsync/commons/tap_postgres.py#L129
cfg.update({
'host': conn_config.get("secondary_host", conn_config['host']),
'port': conn_config.get("secondary_port", conn_config['port']),
})

if conn_config.get('sslmode'):
cfg['sslmode'] = conn_config['sslmode']

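In practice the new `prioritize_primary` flag gives call sites three routing outcomes. A sketch of how a configured tap resolves the host for each kind of connection (placeholder config; the calls obviously need reachable servers to succeed):

```
from tap_postgres import db as post_db

conn_config = {
    'host': 'primary.example.com', 'port': 5432, 'dbname': 'my_database',
    'user': 'my_user', 'password': 'secret',
    'use_secondary': True, 'secondary_host': 'replica.example.com', 'secondary_port': 5433,
}

post_db.open_connection(conn_config)                            # replica: INCREMENTAL / FULL_TABLE reads
post_db.open_connection(conn_config, prioritize_primary=True)   # primary: version checks, LSNs, slot lookups
post_db.open_connection(conn_config, logical_replication=True)  # primary: the replication connection itself
```
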
12 changes: 6 additions & 6 deletions tap_postgres/sync_strategies/logical_replication.py
@@ -34,7 +34,7 @@ class UnsupportedPayloadKindError(Exception):

# pylint: disable=invalid-name,missing-function-docstring,too-many-branches,too-many-statements,too-many-arguments
def get_pg_version(conn_info):
with post_db.open_connection(conn_info, False) as conn:
with post_db.open_connection(conn_info, False, True) as conn:
with conn.cursor() as cur:
cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'")
version = cur.fetchone()[0]
@@ -93,7 +93,7 @@ def fetch_current_lsn(conn_config):
if version < 90400:
raise Exception('Logical replication not supported before PostgreSQL 9.4')

with post_db.open_connection(conn_config, False) as conn:
with post_db.open_connection(conn_config, False, True) as conn:
with conn.cursor() as cur:
# Use version specific lsn command
if version >= 100000:
@@ -138,7 +138,7 @@ def create_hstore_elem_query(elem):


def create_hstore_elem(conn_info, elem):
with post_db.open_connection(conn_info) as conn:
with post_db.open_connection(conn_info, False, True) as conn:
with conn.cursor() as cur:
query = create_hstore_elem_query(elem)
cur.execute(query)
@@ -151,7 +151,7 @@ def create_array_elem(elem, sql_datatype, conn_info):
if elem is None:
return None

with post_db.open_connection(conn_info) as conn:
with post_db.open_connection(conn_info, False, True) as conn:
with conn.cursor() as cur:
if sql_datatype == 'bit[]':
cast_datatype = 'boolean[]'
@@ -517,7 +517,7 @@ def locate_replication_slot_by_cur(cursor, dbname, tap_id=None):


def locate_replication_slot(conn_info):
with post_db.open_connection(conn_info, False) as conn:
with post_db.open_connection(conn_info, False, True) as conn:
with conn.cursor() as cur:
return locate_replication_slot_by_cur(cur, conn_info['dbname'], conn_info['tap_id'])

@@ -576,7 +576,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
version = get_pg_version(conn_info)

# Create replication connection and cursor
conn = post_db.open_connection(conn_info, True)
conn = post_db.open_connection(conn_info, True, True)
cur = conn.cursor()

# Set session wal_sender_timeout for PG12 and above
12 changes: 6 additions & 6 deletions tests/test_discovery.py
@@ -331,7 +331,7 @@ def setUp(self):
table_spec = {"columns": [{"name" : 'our_pk', "type" : "hstore", "primary_key" : True },
{"name" : 'our_hstore', "type" : "hstore" }],
"name" : TestHStoreTable.table_name}
with get_test_connection() as conn:
with get_test_connection(superuser=True) as conn:
cur = conn.cursor()
cur.execute(""" SELECT installed_version FROM pg_available_extensions WHERE name = 'hstore' """)
if cur.fetchone()[0] is None:
@@ -536,7 +536,7 @@ class TestColumnGrants(unittest.TestCase):
table_name = 'CHICKEN TIMES'
user = 'tmp_user_for_grant_tests'
password = 'password'

def setUp(self):
table_spec = {"columns": [{"name" : "id", "type" : "integer", "serial" : True},
{"name" : 'size integer', "type" : "integer", "quoted" : True},
@@ -545,7 +545,7 @@ def setUp(self):
"name" : TestColumnGrants.table_name}
ensure_test_table(table_spec)

with get_test_connection() as conn:
with get_test_connection(superuser=True) as conn:
cur = conn.cursor()

sql = """ DROP USER IF EXISTS {} """.format(self.user, self.password)
@@ -560,8 +560,8 @@ def setUp(self):
LOGGER.info("running sql: {}".format(sql))
cur.execute(sql)




def test_catalog(self):
conn_config = get_test_connection_config()
@@ -587,7 +587,7 @@ def test_catalog(self):
('properties', 'id'): {'inclusion': 'available',
'selected-by-default': True,
'sql-datatype': 'integer'}})

self.assertEqual({'definitions' : BASE_RECURSIVE_SCHEMAS,
'type': 'object',
'properties': {'id': {'type': ['null', 'integer'],
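
The tests now ask the connection helper for a superuser session, presumably because creating the `hstore` extension and the temporary grant-test user requires superuser rights. The helper itself is not part of this diff; a hypothetical sketch of what `get_test_connection(superuser=True)` might do, using the environment variables from the CI workflow above:

```
import os
import psycopg2

def get_test_connection(superuser=False):
    # Hypothetical sketch - the real helper lives in this repo's test utilities.
    # The compose file gives the 'postgres' superuser the same password as test_user.
    user = 'postgres' if superuser else os.environ['TAP_POSTGRES_USER']
    return psycopg2.connect(
        host=os.environ['TAP_POSTGRES_HOST'],
        port=int(os.environ.get('TAP_POSTGRES_PORT', 5432)),
        user=user,
        password=os.environ['TAP_POSTGRES_PASSWORD'],
        dbname='tap_postgres_test',
    )
```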