Skip to content

Commit 6277673

Browse files
paleolimbot and Copilot authored
feat(python/sedonadb): Implement GDAL/OGR formats via pyogrio (#283)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent ed81b0e commit 6277673

File tree

19 files changed

+1719
-26
lines changed

19 files changed

+1719
-26
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

python/sedonadb/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ futures = { workspace = true }
4242
pyo3 = { version = "0.25.1" }
4343
sedona = { workspace = true }
4444
sedona-adbc = { workspace = true }
45+
sedona-datasource = { workspace = true }
46+
sedona-geometry = { workspace = true }
4547
sedona-expr = { workspace = true }
4648
sedona-geoparquet = { workspace = true }
4749
sedona-schema = { workspace = true }

python/sedonadb/python/sedonadb/context.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,74 @@ def read_parquet(
152152
self.options,
153153
)
154154

155+
def read_pyogrio(
    self,
    table_paths: Union[str, Path, Iterable[str]],
    options: Optional[Dict[str, Any]] = None,
    extension: str = "",
) -> DataFrame:
    """Read spatial file formats using GDAL/OGR via pyogrio

    Build a DataFrame from one or more paths or URLs readable by
    [pyogrio](https://pyogrio.readthedocs.io/en/latest/) — the same package
    that backs `geopandas.read_file()`. Typical GDAL/OGR formats include
    FlatGeoBuf, GeoPackage, Shapefile, GeoJSON, and many others; see
    <https://gdal.org/en/stable/drivers/vector/index.html> for the full
    driver list.

    As with `read_parquet()`, globs and directories may be given alongside
    individual paths. Paths ending in `.zip` are automatically prefixed with
    `/vsizip/` (GDAL unzips them transparently), and HTTP(S) URLs are read
    through `/vsicurl/`.

    Args:
        table_paths: A str, Path, or iterable of paths containing URLs or
            paths. Globs (i.e., `path/*.gpkg`), directories, and zipped
            versions of otherwise readable files are supported.
        options: An optional mapping of key/value pairs (open options)
            passed to GDAL/OGR.
        extension: An optional file extension (e.g., `"fgb"`) used when
            `table_paths` specifies one or more directories or a glob
            that does not enforce a file extension.

    Examples:

        >>> import geopandas
        >>> import tempfile
        >>> sd = sedona.db.connect()
        >>> df = geopandas.GeoDataFrame({
        ...     "geometry": geopandas.GeoSeries.from_wkt(["POINT (0 1)"], crs=3857)
        ... })
        >>>
        >>> with tempfile.TemporaryDirectory() as td:
        ...     df.to_file(f"{td}/df.fgb")
        ...     sd.read_pyogrio(f"{td}/df.fgb").show()
        ...
        ┌──────────────┐
        │ wkb_geometry │
        │     geometry │
        ╞══════════════╡
        │  POINT(0 1)  │
        └──────────────┘

    """
    # Imported lazily so that merely importing sedonadb does not require
    # the datasource extras to be importable.
    from sedonadb.datasource import PyogrioFormatSpec

    # Normalize a single path/URL into the iterable form used below.
    paths = [table_paths] if isinstance(table_paths, (str, Path)) else table_paths

    format_spec = PyogrioFormatSpec(extension)
    if options is not None:
        format_spec = format_spec.with_options(options)

    impl_df = self._impl.read_external_format(
        format_spec, [str(p) for p in paths], False
    )
    return DataFrame(self._impl, impl_df, self.options)
222+
155223
def sql(self, sql: str) -> DataFrame:
156224
"""Create a [DataFrame][sedonadb.dataframe.DataFrame] by executing SQL
157225
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from typing import Any, Mapping
19+
20+
from sedonadb._lib import PyExternalFormat, PyProjectedRecordBatchReader
21+
22+
23+
class ExternalFormatSpec:
    """Python file format specification

    Defines an abstract "file format" mirroring the DataFusion `FileFormat`
    concept. It sits on top of the `TableProvider` layer and supplies the
    standard machinery for querying collections of files (globs or
    directories of files with compatible schemas). The abstraction gives
    basic pruning and partial filter pushdown (e.g., a bounding box is
    available when the underlying query provided one); data providers that
    need more advanced features may prefer to implement a `TableProvider`
    in Rust to reach a wider range of DataFusion features.

    Implementations only need to provide `open_reader()`; when opening a
    reader is expensive and a cheaper schema inference exists, they may
    also implement `infer_schema()`.

    This extension point is experimental and may evolve to serve the needs
    of various file formats.
    """

    @property
    def extension(self):
        """A file extension for files that match this format

        If this concept is not important for this format, returns an empty string.
        """
        return ""

    def with_options(self, options: Mapping[str, Any]):
        """Clone this instance and return a new instance with options applied

        Apply an arbitrary set of format-defined key/value options. It is
        useful to raise here when an option or value would fail later;
        implementations may, however, defer the error if the underlying
        producer requires it.

        The default implementation rejects any attempt to pass options.
        """
        raise NotImplementedError(
            f"key/value options not supported by {type(self).__name__}"
        )

    def open_reader(self, args: Any):
        """Open an ArrowArrayStream/RecordBatchReader of batches given input information

        The output stream must honor `args.file_projection` when present
        (`PyProjectedRecordBatchReader` may be used to enforce a set of
        output columns or apply an output projection on an input stream).

        The internals keep a strong (Python) reference to the returned
        object for as long as batches are being produced.

        Args:
            args: An object with attributes
                - `src`: An object/file abstraction. Currently, `.to_url()` is the
                  best way to extract the underlying URL from the source.
                - `filter`: An object representing the pushed-down filter
                  expression, if any. Currently, `.bounding_box(column_index)` is
                  the only way to interact with this object.
                - `file_schema`: An optional schema. If `None`, the implementation
                  must infer the schema.
                - `file_projection`: An optional list of integers of the columns of
                  `file_schema` that must be produced by this implementation (in
                  the exact order specified).
                - `batch_size`: An optional integer specifying the number of rows
                  requested for each output batch.

        """
        raise NotImplementedError()

    def infer_schema(self, src):
        """Infer the output schema

        May be left unimplemented, in which case the internals call
        `open_reader()` and query the provided schema without pulling any
        batches.

        Args:
            src: An object/file abstraction. Currently, `.to_url()` is the best
                way to extract the underlying URL from the source.
        """
        raise NotImplementedError()

    def __sedona_external_format__(self):
        # Wrap this spec in the Rust-side adapter consumed by the internals.
        return PyExternalFormat(self)
109+
110+
111+
class PyogrioFormatSpec(ExternalFormatSpec):
    """An `ExternalFormatSpec` implementation wrapping GDAL/OGR via pyogrio"""

    def __init__(self, extension=""):
        # Extension (e.g., "fgb") used when table paths are directories or
        # globs without an enforced extension; "" means unconstrained.
        self._extension = extension
        # Key/value open options forwarded to GDAL/OGR (see with_options()).
        self._options = {}

    def with_options(self, options):
        """Return a clone of this spec with `options` merged in

        Options from earlier `with_options()` calls are preserved so the
        method can be chained; the previous implementation dropped them.
        """
        cloned = type(self)(self.extension)
        cloned._options.update(self._options)
        cloned._options.update(options)
        return cloned

    @property
    def extension(self) -> str:
        return self._extension

    def open_reader(self, args):
        """Open an Arrow stream over `args.src` using pyogrio

        Maps the source URL onto a GDAL-openable path (`/vsicurl/` for
        HTTP(S), stripped `file://`, `/vsizip/` for `.zip`), applies the
        requested column projection and batch size, and pushes down a
        bounding-box filter when exactly one geometry column is present.
        """
        import pyogrio.raw

        url = args.src.to_url()
        if url is None:
            raise ValueError(f"Can't convert {args.src} to OGR-openable object")

        if url.startswith("http://") or url.startswith("https://"):
            ogr_src = f"/vsicurl/{url}"
        elif url.startswith("file://"):
            ogr_src = url.removeprefix("file://")
        else:
            raise ValueError(f"Can't open {url} with OGR")

        if ogr_src.endswith(".zip"):
            ogr_src = f"/vsizip/{ogr_src}"

        # Translate the integer file_projection into OGR column names.
        if args.is_projected():
            file_columns = args.file_schema.names
            columns = [file_columns[i] for i in args.file_projection]
        else:
            columns = None

        # pyogrio interprets a batch_size of 0 as "use the default".
        batch_size = args.batch_size if args.batch_size is not None else 0

        # A bounding box can only be pushed down when it can be attributed
        # to a single geometry column.
        if args.filter and args.file_schema is not None:
            geometry_column_indices = args.file_schema.geometry_column_indices
            if len(geometry_column_indices) == 1:
                bbox = args.filter.bounding_box(geometry_column_indices[0])
            else:
                bbox = None
        else:
            bbox = None

        # Forward the stored open options to GDAL/OGR. The previous version
        # passed a literal {} here, so options set via with_options() were
        # silently ignored.
        return PyogrioReaderShelter(
            pyogrio.raw.ogr_open_arrow(
                ogr_src,
                self._options,
                columns=columns,
                batch_size=batch_size,
                bbox=bbox,
            ),
            columns,
        )
167+
168+
169+
class PyogrioReaderShelter:
    """Keep the pyogrio context manager alive while its reader is in use

    `pyogrio.raw.ogr_open_arrow()` returns a context manager, but the
    internals can only hold plain (Rust-side) object references. This
    wrapper enters the context manager immediately and exits it when the
    wrapper itself is deleted — which happens as soon as the returned
    reader is no longer required.
    """

    def __init__(self, inner, output_names=None):
        self._inner = inner
        self._output_names = output_names
        # Entering yields (metadata, reader); both are retained here.
        self._meta, self._reader = self._inner.__enter__()

    def __del__(self):
        # Close the underlying OGR resources as soon as we are collected.
        self._inner.__exit__(None, None, None)

    def __arrow_c_stream__(self, requested_schema=None):
        # No projection requested: expose the raw reader's stream directly.
        if self._output_names is None:
            return self._reader.__arrow_c_stream__()

        # Otherwise wrap the reader so only the requested columns, in the
        # requested order, appear in the output stream.
        projected = PyProjectedRecordBatchReader(self._reader, None, self._output_names)
        return projected.__arrow_c_stream__()

python/sedonadb/src/context.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use tokio::runtime::Runtime;
2323

2424
use crate::{
2525
dataframe::InternalDataFrame,
26+
datasource::PyExternalFormat,
2627
error::PySedonaError,
2728
import_from::{import_ffi_scalar_udf, import_table_provider_from_any},
2829
runtime::wait_for_future,
@@ -107,6 +108,26 @@ impl InternalContext {
107108
Ok(InternalDataFrame::new(df, self.runtime.clone()))
108109
}
109110

111+
pub fn read_external_format<'py>(
112+
&self,
113+
py: Python<'py>,
114+
format_spec: Bound<PyAny>,
115+
table_paths: Vec<String>,
116+
check_extension: bool,
117+
) -> Result<InternalDataFrame, PySedonaError> {
118+
let spec = format_spec
119+
.call_method0("__sedona_external_format__")?
120+
.extract::<PyExternalFormat>()?;
121+
let df = wait_for_future(
122+
py,
123+
&self.runtime,
124+
self.inner
125+
.read_external_format(Arc::new(spec), table_paths, None, check_extension),
126+
)??;
127+
128+
Ok(InternalDataFrame::new(df, self.runtime.clone()))
129+
}
130+
110131
pub fn sql<'py>(
111132
&self,
112133
py: Python<'py>,

0 commit comments

Comments
 (0)