
Commit 5a33229

feat(storage): Implement storage capabilities and pipelines for sync and async drivers (#231)
- Added `select_to_storage` method to all backends.
- Introduced `StorageDriverMixin` to provide common storage functionality across sync and async drivers.
- Enhanced `DatabaseConfigProtocol` to include storage capabilities and caching mechanisms.
- Created `StoragePipeline` classes for handling storage operations, including writing rows and Arrow tables.
- Added telemetry support for storage operations to track metrics such as bytes processed and duration.
- Implemented error handling for storage capabilities with `StorageCapabilityError`.
- Added unit tests for storage capabilities in configuration.
1 parent 83cb6b5 commit 5a33229
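The example script added in this commit (full listing below) exercises the new storage bridge end to end. Distilled from that script, the sync usage pattern looks roughly like the sketch below; the connection URI, table names, and paths are placeholders, and the capability keys returned by `storage_capabilities()` depend on the backend.

```python
from pathlib import Path

from sqlspec import SQLSpec
from sqlspec.adapters.adbc import AdbcConfig

# Register an ADBC config with the SQLSpec registry (placeholder URI).
db_manager = SQLSpec()
adbc_key = db_manager.add_config(AdbcConfig(connection_config={"uri": "postgres://user:pass@host:5432/db"}))

# Inspect the storage features the configured backend advertises.
print(db_manager.get_config(adbc_key).storage_capabilities())

# Ensure the staging directory exists before exporting.
Path("./tmp").mkdir(parents=True, exist_ok=True)

with db_manager.provide_session(adbc_key) as session:
    # Stage a query result as a Parquet artifact; the returned job carries StorageTelemetry.
    export_job = session.select_to_storage(
        "SELECT * FROM source_table", "./tmp/export.parquet", format_hint="parquet"
    )
    print(export_job.telemetry.get("rows_processed", 0))

    # Load the staged artifact back into a target table, overwriting it first.
    session.load_from_storage("target_table", "./tmp/export.parquet", file_format="parquet", overwrite=True)
```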

File tree

47 files changed: +3578 −125 lines

Lines changed: 161 additions & 0 deletions
@@ -0,0 +1,161 @@
```python
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "sqlspec[adbc]",
#     "pyarrow",
#     "rich",
#     "rich-click",
# ]
# ///
"""ADBC Postgres ingestion workflow leveraging the storage bridge.

This example exports arbitrary SELECT statements to a Parquet or Arrow artifact,
then loads the staged data back into a target table using the same ADBC driver.
Use it as a template for warehouse ↔ object-store fan-outs.
"""

from pathlib import Path
from typing import Any

import rich_click as click
from rich.console import Console
from rich.table import Table

from sqlspec import SQLSpec
from sqlspec.adapters.adbc import AdbcConfig
from sqlspec.storage import StorageTelemetry
from sqlspec.utils.serializers import to_json

__all__ = ("main",)


def _build_partitioner(rows_per_chunk: int | None, partitions: int | None) -> "dict[str, Any] | None":
    if rows_per_chunk and partitions:
        msg = "Use either --rows-per-chunk or --partitions, not both."
        raise click.BadParameter(msg, param_hint="--rows-per-chunk / --partitions")
    if rows_per_chunk:
        return {"kind": "rows_per_chunk", "rows_per_chunk": rows_per_chunk}
    if partitions:
        return {"kind": "fixed", "partitions": partitions}
    return None


def _write_telemetry(payload: "dict[str, Any]", output_path: Path) -> None:
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(to_json(payload), encoding="utf-8")


def _format_job(stage: str, telemetry: StorageTelemetry) -> str:
    rows = telemetry.get("rows_processed", 0)
    bytes_processed = telemetry.get("bytes_processed", 0)
    destination = telemetry.get("destination", "")
    return f"[{stage}] rows={rows} bytes={bytes_processed} destination={destination}"


def _render_capabilities(console: Console, config: AdbcConfig) -> None:
    capabilities = config.storage_capabilities()
    table = Table(title="Storage Capabilities", highlight=True)
    table.add_column("Capability", style="cyan")
    table.add_column("Enabled", style="green")
    for key, value in capabilities.items():
        table.add_row(str(key), str(value))
    console.print(table)


@click.command(context_settings={"help_option_names": ["-h", "--help"], "max_content_width": 100})
@click.option(
    "--uri",
    required=True,
    envvar="SQLSPEC_ADBC_URI",
    help="ADBC connection URI (e.g. postgres://user:pass@host:port/dbname)",
)
@click.option("--source-sql", required=True, help="SELECT statement to export")
@click.option("--target-table", required=True, help="Fully qualified destination table name")
@click.option(
    "--destination",
    type=click.Path(path_type=Path, dir_okay=False, writable=True, resolve_path=True),
    default=Path("./tmp/adbc_export.parquet"),
    show_default=True,
    help="Local path or mounted volume for the staged artifact",
)
@click.option(
    "--format",
    "file_format",
    type=click.Choice(["parquet", "arrow-ipc"], case_sensitive=False),
    default="parquet",
    show_default=True,
    help="Storage format used for export/import",
)
@click.option(
    "--rows-per-chunk",
    type=int,
    help="Rows per partition chunk. Combine with SQL predicates (e.g. `WHERE id BETWEEN ...`) per worker.",
)
@click.option(
    "--partitions",
    type=int,
    help="Fixed number of partitions. Pair with predicates like `MOD(id, N) = worker_id` when parallelizing.",
)
@click.option(
    "--overwrite/--no-overwrite", default=False, show_default=True, help="Overwrite the target table before load"
)
@click.option("--skip-load", is_flag=True, default=False, help="Export only and skip the load stage")
@click.option(
    "--output-telemetry",
    type=click.Path(path_type=Path, dir_okay=False, writable=True, resolve_path=True),
    help="Optional path to persist telemetry JSON",
)
@click.version_option(message="%(version)s")
def main(
    *,
    uri: str,
    source_sql: str,
    target_table: str,
    destination: Path,
    file_format: str,
    rows_per_chunk: int | None,
    partitions: int | None,
    overwrite: bool,
    skip_load: bool,
    output_telemetry: Path | None,
) -> None:
    """ADBC-powered export/import demo for Postgres-compatible backends."""

    console = Console()
    partitioner = _build_partitioner(rows_per_chunk, partitions)
    destination.parent.mkdir(parents=True, exist_ok=True)

    db_manager = SQLSpec()
    adbc_config = AdbcConfig(connection_config={"uri": uri})
    adbc_key = db_manager.add_config(adbc_config)

    _render_capabilities(console, db_manager.get_config(adbc_key))
    telemetry_records: list[dict[str, Any]] = []

    with db_manager.provide_session(adbc_key) as session:
        export_job = session.select_to_storage(
            source_sql, str(destination), format_hint=file_format, partitioner=partitioner
        )
        console.print(_format_job("export", export_job.telemetry))
        telemetry_records.append({"stage": "export", "metrics": export_job.telemetry})

        if not skip_load:
            load_job = session.load_from_storage(
                target_table, str(destination), file_format=file_format, overwrite=overwrite, partitioner=partitioner
            )
            console.print(_format_job("load", load_job.telemetry))
            telemetry_records.append({"stage": "load", "metrics": load_job.telemetry})

    if partitioner:
        console.print(
            "[dim]Tip:[/] launch multiple workers with mutually exclusive WHERE clauses ("
            "for example, `MOD(id, N) = worker_id`) so each process writes a distinct partition."
        )

    if output_telemetry:
        payload: dict[str, Any] = {"telemetry": telemetry_records}
        _write_telemetry(payload, output_telemetry)


if __name__ == "__main__":
    main()
```
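When `--output-telemetry` is set, the script persists the per-stage metrics it collected as JSON. The exact keys are defined by `StorageTelemetry` (the commit message also mentions bytes processed and duration); based on the fields the script itself reads, the payload looks roughly like this illustrative sketch:

```python
# Illustrative shape only: values are made up, and StorageTelemetry may carry
# additional keys beyond the three the script formats.
payload = {
    "telemetry": [
        {
            "stage": "export",
            "metrics": {
                "rows_processed": 10_000,
                "bytes_processed": 1_048_576,
                "destination": "/abs/path/to/tmp/adbc_export.parquet",
            },
        },
        # A "load" entry with the same metric keys follows when --skip-load is not set.
    ]
}
```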
