f385522
Create replication for partitioned table issue
abhi-airspace-intelligence Aug 28, 2025
9913523
Fix partitioned tables
abhi-airspace-intelligence Aug 28, 2025
5509568
Clippy
abhi-airspace-intelligence Aug 28, 2025
b423049
merge conflict
abhi-airspace-intelligence Sep 1, 2025
c4087b2
make sql lowercase
abhi-airspace-intelligence Sep 1, 2025
ae4f86f
fit new test style
abhi-airspace-intelligence Sep 2, 2025
ef58a4b
Switch to `publish_via_partition_root = true`
abhi-airspace-intelligence Sep 2, 2025
8272754
clippy
abhi-airspace-intelligence Sep 10, 2025
80ece8f
Merge conflict
abhi-airspace-intelligence Sep 22, 2025
f8be974
Use cargo nextest to speed up runs
abhi-airspace-intelligence Sep 15, 2025
8d2fd3c
add retries to test for nondeterministic tests
abhi-airspace-intelligence Sep 15, 2025
9bd46b5
Add slow timeout
abhi-airspace-intelligence Sep 15, 2025
3524085
Simplify COPY command
abhi-airspace-intelligence Sep 29, 2025
2089cc0
Use recursive query to get oids and write test
abhi-airspace-intelligence Sep 29, 2025
5ef0c6d
Remove unnecessary namespace check
abhi-airspace-intelligence Sep 29, 2025
82436de
Add test about dropping partitions
abhi-airspace-intelligence Sep 29, 2025
c00be91
fmt
abhi-airspace-intelligence Sep 29, 2025
c41ed85
Clippy
abhi-airspace-intelligence Sep 29, 2025
9974e25
wip
abhi-airspace-intelligence Aug 26, 2025
67657e8
Actually get working
abhi-airspace-intelligence Aug 26, 2025
e2f8e0c
Get stuff working
abhi-airspace-intelligence Aug 26, 2025
8141bf6
Get integration tests to pass
abhi-airspace-intelligence Aug 26, 2025
f7e466c
Trigger CI
abhi-airspace-intelligence Aug 26, 2025
56bb6d1
Pass storage_options directly instead of reading from env
abhi-airspace-intelligence Aug 27, 2025
a650584
Clippy
abhi-airspace-intelligence Aug 27, 2025
db87503
Implement proper support for datatypes
abhi-airspace-intelligence Aug 27, 2025
af4a628
Clippy
abhi-airspace-intelligence Aug 28, 2025
59135d6
sigh
abhi-airspace-intelligence Aug 28, 2025
8145044
Bump deltalake and delta_kernel to latest
abhi-airspace-intelligence Aug 28, 2025
61d6291
Fix deprecated usage
abhi-airspace-intelligence Aug 28, 2025
079804b
Disable coveralls debug mode
abhi-airspace-intelligence Aug 28, 2025
cd94766
Correctly parse decimal/numeric types
abhi-airspace-intelligence Aug 28, 2025
15d7713
Vibecode some stuff to clean up tomorrow
abhi-airspace-intelligence Aug 28, 2025
7babad3
blah
abhi-airspace-intelligence Aug 28, 2025
3022994
Merge conflicts + remove explict delta_kernel dependency
abhi-airspace-intelligence Sep 2, 2025
025d545
Disable test temporarily
abhi-airspace-intelligence Sep 2, 2025
51854ca
rename `delta` -> `deltalake`
abhi-airspace-intelligence Sep 2, 2025
c8952ab
Temporarily disable table truncation
abhi-airspace-intelligence Sep 3, 2025
46351d7
Clean up dependencies
abhi-airspace-intelligence Sep 3, 2025
205c963
Feature flag deltalake
abhi-airspace-intelligence Sep 4, 2025
ab7f08e
rebase conflict
abhi-airspace-intelligence Sep 5, 2025
87b18a5
Clippy
abhi-airspace-intelligence Sep 10, 2025
56aeea4
fmt
abhi-airspace-intelligence Sep 10, 2025
86e6b6f
wip
abhi-airspace-intelligence Sep 13, 2025
4191a75
more wip
abhi-airspace-intelligence Sep 14, 2025
2143c3b
wip
abhi-airspace-intelligence Sep 14, 2025
f873ef6
reduce clone frequency
abhi-airspace-intelligence Sep 14, 2025
3b9b420
more cleanup
abhi-airspace-intelligence Sep 14, 2025
7865987
more wip
abhi-airspace-intelligence Sep 14, 2025
10692d3
clippy/rename
abhi-airspace-intelligence Sep 14, 2025
20b126e
Clean up dependencies
abhi-airspace-intelligence Sep 14, 2025
8c3e565
Fix schema mapping
abhi-airspace-intelligence Sep 14, 2025
7e011ca
Make merges work
abhi-airspace-intelligence Sep 14, 2025
8e2ec2b
Enable rustls feature
abhi-airspace-intelligence Sep 14, 2025
1262555
Fix merges and switch to snapshot testing
abhi-airspace-intelligence Sep 15, 2025
9f071f2
merge conflict
abhi-airspace-intelligence Sep 15, 2025
8b9aa8b
another merge conflict
abhi-airspace-intelligence Sep 15, 2025
659e647
Move arrow encoding to a separate module
abhi-airspace-intelligence Sep 15, 2025
a1e9a46
add back parquet features that were removed
abhi-airspace-intelligence Sep 15, 2025
f0d6837
fmt
abhi-airspace-intelligence Sep 15, 2025
83430b2
Implement delete from table
abhi-airspace-intelligence Sep 15, 2025
29dde11
Add tests for appends
abhi-airspace-intelligence Sep 15, 2025
09bf24b
fmt
abhi-airspace-intelligence Sep 15, 2025
201381f
wip add delta benchmark
abhi-airspace-intelligence Sep 15, 2025
fc13149
Add tpch seeder image
abhi-airspace-intelligence Sep 16, 2025
db8b797
Remove more unncessary clones
abhi-airspace-intelligence Sep 16, 2025
4f39731
cleanup integration tests a bit
abhi-airspace-intelligence Sep 17, 2025
373f3e0
Add zorder/compact support
abhi-airspace-intelligence Sep 17, 2025
d1fc642
Bump to git deltalake
abhi-airspace-intelligence Sep 22, 2025
1175b18
fmt
abhi-airspace-intelligence Sep 22, 2025
8cfb9f0
Merge conflicts
abhi-airspace-intelligence Sep 22, 2025
711d08a
Increase disk size for CI/CD
abhi-airspace-intelligence Sep 22, 2025
d9fbf37
Rework maintenance tasks
abhi-airspace-intelligence Sep 24, 2025
e38846a
Update delta-rs version
abhi-airspace-intelligence Sep 24, 2025
273ab47
Remove maximize build space job
abhi-airspace-intelligence Sep 24, 2025
87b2f68
Fix imports
abhi-airspace-intelligence Sep 24, 2025
c08885e
Remove arrow dependency
abhi-airspace-intelligence Sep 24, 2025
2fdf2f5
wip refactor arrow schema
abhi-airspace-intelligence Sep 24, 2025
3795682
Continue refactor
abhi-airspace-intelligence Sep 24, 2025
1b2db55
Mostly finish the refactor
abhi-airspace-intelligence Sep 25, 2025
9de4f17
Fix Decimal128 mapping
abhi-airspace-intelligence Sep 25, 2025
e3c3383
Improve data mapping integration test
abhi-airspace-intelligence Sep 25, 2025
7f85c76
Add compaction test
abhi-airspace-intelligence Sep 25, 2025
f1521ae
Ensure keys are qualified
abhi-airspace-intelligence Sep 25, 2025
89ab186
Add more tracing and instrumentation
abhi-airspace-intelligence Sep 25, 2025
6 changes: 6 additions & 0 deletions .config/nextest.toml
@@ -0,0 +1,6 @@
[profile.default]
retries = 2
slow-timeout = { period = "15s", terminate-after = 4 }

[profile.no-bigquery]
default-filter = "not (package(etl-destinations) and binary(/bigquery_pipeline/))"
32 changes: 14 additions & 18 deletions .github/workflows/ci.yml
@@ -90,19 +90,14 @@ jobs:
sudo apt-get install libpq-dev -y
./etl-api/scripts/run_migrations.sh

- name: Install cargo-nextest
uses: taiki-e/install-action@v2
with:
tool: cargo-nextest

- name: Run Tests
run: |
cargo test \
--workspace \
--all-features \
--no-fail-fast \
--exclude etl-destinations \
&& \
cargo test \
-p etl-destinations \
--no-default-features \
--no-fail-fast \
--features iceberg
cargo nextest run --all-features --no-fail-fast --profile no-bigquery

test-full:
name: Tests (Full)
@@ -121,6 +116,8 @@ jobs:

- name: Set up Rust
uses: dtolnay/rust-toolchain@1.88.0
with:
components: llvm-tools-preview

- name: Cache Cargo
uses: Swatinem/rust-cache@v2
@@ -149,8 +146,10 @@ jobs:
sudo apt-get install libpq-dev -y
./etl-api/scripts/run_migrations.sh

- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Install cargo-llvm-cov and cargo-nextest
uses: taiki-e/install-action@v2
with:
tool: cargo-llvm-cov,cargo-nextest

- name: Set up BigQuery Credentials
run: |
@@ -161,15 +160,12 @@ jobs:
- name: Generate Code Coverage
id: coverage
run: |
cargo llvm-cov test \
--workspace --no-fail-fast \
--all-features \
--lcov --output-path lcov.info
cargo llvm-cov nextest --no-fail-fast \
--all-features --lcov --output-path lcov.info

- name: Upload Coverage to Coveralls
uses: coverallsapp/github-action@v2
with:
fail-on-error: false
github-token: ${{ secrets.GITHUB_TOKEN }}
path-to-lcov: lcov.info
debug: true
2 changes: 2 additions & 0 deletions .gitignore
@@ -33,3 +33,5 @@ pyvenv.cfg

# Log files
*.log

lcov.info
4 changes: 2 additions & 2 deletions AGENTS.md
@@ -6,8 +6,8 @@
- Tests live per crate (`src` unit tests, `tests` integration); benches in `etl-benchmarks/benches/`.

## Build and Test
- Build: `cargo build --workspace --all-targets --all-features`.
- Lint/format: `cargo fmt`; `cargo clippy --all-targets --all-features -- -D warnings`.
- Build: `just build`.
- Lint/format: `just fmt; just lint`.
- Use `ENABLE_TRACING=1` when running integration tests to see the logs.
- Use `RUST_LOG=[log-level]` if you need to see the logs with a specific log level.

10 changes: 8 additions & 2 deletions Cargo.toml
@@ -32,7 +32,7 @@ actix-web = { version = "4.11.0", default-features = false }
actix-web-httpauth = { version = "0.8.2", default-features = false }
actix-web-metrics = { version = "0.3.0", default-features = false }
anyhow = { version = "1.0.98", default-features = false }
arrow = { version = "55.0", default-features = false }
arrow = { version = "56.2.0", default-features = false }
async-trait = { version = "0.1.88" }
aws-lc-rs = { version = "1.13.3", default-features = false }
base64 = { version = "0.22.1", default-features = false }
@@ -43,6 +43,8 @@ clap = { version = "4.5.42", default-features = false }
config = { version = "0.14", default-features = false }
const-oid = { version = "0.9.6", default-features = false }
constant_time_eq = { version = "0.4.2" }
dashmap = { version = "6.1.0", default-features = false }
deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "d30b11f673b0111dbb0f904bf89d5b917ea652ed", default-features = false }
fail = { version = "0.5.1", default-features = false }
futures = { version = "0.3.31", default-features = false }
gcp-bigquery-client = { version = "0.27.0", default-features = false }
@@ -53,7 +55,7 @@ k8s-openapi = { version = "0.25.0", default-features = false }
kube = { version = "1.1.0", default-features = false }
metrics = { version = "0.24.2", default-features = false }
metrics-exporter-prometheus = { version = "0.17.2", default-features = false }
parquet = { version = "55.0", default-features = false }
parquet = { version = "55.0.0", default-features = false }
pg_escape = { version = "0.1.1", default-features = false }
pin-project-lite = { version = "0.2.16", default-features = false }
postgres-replication = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }
@@ -84,3 +86,7 @@ x509-cert = { version = "0.2.2", default-features = false }

[profile.bench]
debug = true

[profile.dev.package]
insta.opt-level = 3
similar.opt-level = 3
8 changes: 5 additions & 3 deletions docs/how-to/configure-postgres.md
@@ -107,15 +107,17 @@ Publications define which tables and operations to replicate.

```sql
-- Create publication for specific tables
CREATE PUBLICATION my_publication FOR TABLE users, orders;
CREATE PUBLICATION my_publication FOR TABLE users, orders WITH (publish_via_partition_root = true);

-- Create publication for all tables (use with caution)
CREATE PUBLICATION all_tables FOR ALL TABLES;
CREATE PUBLICATION all_tables FOR ALL TABLES WITH (publish_via_partition_root = true);

-- Include only specific operations
CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert');
CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert', publish_via_partition_root = true);
```

Setting [`publish_via_partition_root`](https://www.postgresql.org/docs/current/sql-createpublication.html#SQL-CREATEPUBLICATION-PARAMS-WITH-PUBLISH-VIA-PARTITION-ROOT) to `true` makes Postgres publish changes to a partitioned table using the identity and schema of the root table rather than those of the individual partitions, so logical replication sees a single table.
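
To make the effect concrete, here is a minimal Rust sketch of the idea, not Postgres's implementation: with the option enabled, a change made in any leaf partition is reported under its topmost ancestor. The `parents` map, the helper names, and the table names (`orders`, `orders_2025`, `orders_2025_q1`) are hypothetical and exist only for illustration.

```rust
use std::collections::HashMap;

/// Walks a child -> parent map upward and returns the topmost ancestor.
/// A table with no entry in `parents` is treated as its own root.
fn partition_root<'a>(parents: &HashMap<&'a str, &'a str>, table: &'a str) -> &'a str {
    let mut current = table;
    // Bound the walk by the map size so a malformed, cyclic map cannot loop forever.
    for _ in 0..=parents.len() {
        match parents.get(current) {
            Some(&parent) => current = parent,
            None => return current,
        }
    }
    current
}

/// Hypothetical two-level partition hierarchy (assumption, not from the PR).
fn example_hierarchy() -> HashMap<&'static str, &'static str> {
    HashMap::from([
        ("orders_2025", "orders"),
        ("orders_2025_q1", "orders_2025"),
    ])
}
```

In this sketch a change written to `orders_2025_q1` resolves to `orders`, which is the table a subscriber would see, while a non-partitioned table such as `users` resolves to itself.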

### Managing Publications

```sql
81 changes: 81 additions & 0 deletions etl-api/src/configs/destination.rs
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use etl_config::SerializableSecretString;
use etl_config::shared::DestinationConfig;
use secrecy::ExposeSecret;
@@ -14,6 +16,7 @@ const DEFAULT_MAX_CONCURRENT_STREAMS: usize = 8;

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum FullApiDestinationConfig {
Memory,
BigQuery {
@@ -30,6 +33,16 @@ pub enum FullApiDestinationConfig {
#[serde(skip_serializing_if = "Option::is_none")]
max_concurrent_streams: Option<usize>,
},
DeltaLake {
#[schema(example = "s3://my-bucket/my-path")]
base_uri: String,
#[schema(example = "{\"aws_endpoint\": \"https://my-endpoint.com\"}")]
storage_options: Option<HashMap<String, String>>,
#[schema(example = "{\"my_table\": [\"date\"]}")]
partition_columns: Option<HashMap<String, Vec<String>>>,
#[schema(example = 100)]
optimize_after_commits: Option<u64>,
},
}

impl From<StoredDestinationConfig> for FullApiDestinationConfig {
@@ -49,6 +62,17 @@ impl From<StoredDestinationConfig> for FullApiDestinationConfig {
max_staleness_mins,
max_concurrent_streams: Some(max_concurrent_streams),
},
StoredDestinationConfig::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
} => Self::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
},
}
}
}
@@ -64,6 +88,12 @@ pub enum StoredDestinationConfig {
max_staleness_mins: Option<u16>,
max_concurrent_streams: usize,
},
DeltaLake {
base_uri: String,
storage_options: Option<HashMap<String, String>>,
partition_columns: Option<HashMap<String, Vec<String>>>,
optimize_after_commits: Option<u64>,
},
}

impl StoredDestinationConfig {
@@ -83,6 +113,17 @@ impl StoredDestinationConfig {
max_staleness_mins,
max_concurrent_streams,
},
Self::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
} => DestinationConfig::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
},
}
}
}
@@ -105,6 +146,17 @@ impl From<FullApiDestinationConfig> for StoredDestinationConfig {
max_concurrent_streams: max_concurrent_streams
.unwrap_or(DEFAULT_MAX_CONCURRENT_STREAMS),
},
FullApiDestinationConfig::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
} => Self::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
},
}
}
}
@@ -136,12 +188,24 @@ impl Encrypt<EncryptedStoredDestinationConfig> for StoredDestinationConfig {
max_concurrent_streams,
})
}
Self::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
} => Ok(EncryptedStoredDestinationConfig::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
}),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum EncryptedStoredDestinationConfig {
Memory,
BigQuery {
@@ -151,6 +215,12 @@ pub enum EncryptedStoredDestinationConfig {
max_staleness_mins: Option<u16>,
max_concurrent_streams: usize,
},
DeltaLake {
base_uri: String,
storage_options: Option<HashMap<String, String>>,
partition_columns: Option<HashMap<String, Vec<String>>>,
optimize_after_commits: Option<u64>,
},
}

impl Store for EncryptedStoredDestinationConfig {}
@@ -182,6 +252,17 @@ impl Decrypt<StoredDestinationConfig> for EncryptedStoredDestinationConfig {
max_concurrent_streams,
})
}
Self::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
} => Ok(StoredDestinationConfig::DeltaLake {
base_uri,
storage_options,
partition_columns,
optimize_after_commits,
}),
}
}
}
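
The hunks above thread the new `DeltaLake` variant through each config layer unchanged, whereas `BigQuery` fills in a default when converting from the API shape to the stored shape. A minimal sketch of that pattern follows, assuming trimmed-down stand-in enums (`ApiConfig` and `StoredConfig` are hypothetical names, and most fields are omitted); only the default value of 8 comes from the diff.

```rust
use std::collections::HashMap;

/// Same value as `DEFAULT_MAX_CONCURRENT_STREAMS` in the diff.
const DEFAULT_MAX_CONCURRENT_STREAMS: usize = 8;

/// Hypothetical, trimmed-down stand-in for the API-facing config.
#[derive(Debug, Clone, PartialEq)]
enum ApiConfig {
    BigQuery {
        max_concurrent_streams: Option<usize>,
    },
    DeltaLake {
        base_uri: String,
        storage_options: Option<HashMap<String, String>>,
        optimize_after_commits: Option<u64>,
    },
}

/// Hypothetical, trimmed-down stand-in for the stored config.
#[derive(Debug, Clone, PartialEq)]
enum StoredConfig {
    BigQuery {
        max_concurrent_streams: usize,
    },
    DeltaLake {
        base_uri: String,
        storage_options: Option<HashMap<String, String>>,
        optimize_after_commits: Option<u64>,
    },
}

impl From<ApiConfig> for StoredConfig {
    fn from(value: ApiConfig) -> Self {
        match value {
            // BigQuery resolves the optional API field to a concrete default.
            ApiConfig::BigQuery { max_concurrent_streams } => Self::BigQuery {
                max_concurrent_streams: max_concurrent_streams
                    .unwrap_or(DEFAULT_MAX_CONCURRENT_STREAMS),
            },
            // DeltaLake fields pass through untouched, optional values included.
            ApiConfig::DeltaLake {
                base_uri,
                storage_options,
                optimize_after_commits,
            } => Self::DeltaLake {
                base_uri,
                storage_options,
                optimize_after_commits,
            },
        }
    }
}
```

Because each layer is a separate enum, adding a destination means touching every `match`; the compiler flags any conversion that misses the new variant.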
3 changes: 3 additions & 0 deletions etl-api/src/db/publications.rs
@@ -43,6 +43,9 @@ pub async fn create_publication(
}
}

// Ensure partitioned tables publish via ancestor/root schema for logical replication
query.push_str(" with (publish_via_partition_root = true)");

pool.execute(query.as_str()).await?;
Ok(())
}
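
The hunk above appends the option to the generated statement just before it is executed. A standalone sketch of that string assembly, assuming a simplified signature and a hand-rolled quoting helper (`quote_ident` and `create_publication_sql` are hypothetical and not the crate's actual functions):

```rust
/// Quotes a Postgres identifier: wraps it in double quotes and doubles embedded ones.
fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Builds a `create publication` statement for the given (schema, table) pairs.
/// The caller must pass at least one table; an empty slice would yield invalid SQL.
fn create_publication_sql(name: &str, tables: &[(&str, &str)]) -> String {
    let mut query = format!("create publication {} for table ", quote_ident(name));

    let table_list = tables
        .iter()
        .map(|(schema, table)| format!("{}.{}", quote_ident(schema), quote_ident(table)))
        .collect::<Vec<_>>()
        .join(", ");
    query.push_str(&table_list);

    // Publish partition changes under the root table, as in the hunk above.
    query.push_str(" with (publish_via_partition_root = true)");
    query
}
```

Note that Postgres accepts only one `with (...)` clause per statement, so any further publication options would need to be added inside the same parenthesized list.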
7 changes: 6 additions & 1 deletion etl-benchmarks/Cargo.toml
@@ -7,10 +7,15 @@ rust-version.workspace = true
repository.workspace = true
homepage.workspace = true

[features]
default = ["bigquery", "deltalake"]
bigquery = ["etl-destinations/bigquery"]
deltalake = ["etl-destinations/deltalake"]

[dev-dependencies]
etl = { workspace = true, features = ["test-utils"] }
etl-config = { workspace = true }
etl-destinations = { workspace = true, features = ["bigquery"] }
etl-destinations = { workspace = true }
etl-postgres = { workspace = true, features = ["sqlx"] }
etl-telemetry = { workspace = true }

14 changes: 14 additions & 0 deletions etl-benchmarks/Dockerfile.tpch-seeder
@@ -0,0 +1,14 @@
# Builds an image that bundles go-tpc along with psql client tools for loading TPC-H data.
FROM golang:1.22 AS builder

ARG GO_TPC_VERSION=latest

RUN apt-get update \
&& apt-get install -y --no-install-recommends postgresql-client \
&& rm -rf /var/lib/apt/lists/*

RUN go install github.com/pingcap/go-tpc/cmd/go-tpc@${GO_TPC_VERSION}

ENV PATH="/go/bin:${PATH}"

ENTRYPOINT ["/bin/sh", "-c"]