-
Notifications
You must be signed in to change notification settings - Fork 30
feat: Add InferredSchemaLoader for runtime schema inference #831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
62ba126
ac34204
81cd888
c671ccf
038b58f
32b512a
02050b8
b276315
ccdda0d
c288814
d99c2fa
ff51fa5
6906cb0
e118a20
66db70e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| # | ||
| # Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
| # | ||
|
|
||
| import threading | ||
| from collections.abc import Mapping as ABCMapping | ||
| from collections.abc import Sequence | ||
| from dataclasses import InitVar, dataclass | ||
| from typing import Any, Mapping | ||
|
|
||
| from airbyte_cdk.models import AirbyteRecordMessage | ||
| from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever | ||
| from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader | ||
| from airbyte_cdk.sources.types import Config | ||
| from airbyte_cdk.utils.schema_inferrer import SchemaInferrer | ||
|
|
||
|
|
||
| def _to_builtin_types(value: Any) -> Any: | ||
| """ | ||
| Recursively convert Mapping-like and Sequence-like objects to plain Python types. | ||
| This is necessary because genson's schema inference doesn't handle custom Mapping | ||
| or Sequence implementations properly. We need to convert everything to plain dicts, | ||
| lists, and primitive types. | ||
| Args: | ||
| value: The value to convert | ||
| Returns: | ||
| The value converted to plain Python types | ||
| """ | ||
| if isinstance(value, ABCMapping): | ||
| return {k: _to_builtin_types(v) for k, v in value.items()} | ||
| elif isinstance(value, (list, tuple)): | ||
| return [_to_builtin_types(item) for item in value] | ||
| elif isinstance(value, Sequence) and not isinstance(value, (str, bytes)): | ||
| return [_to_builtin_types(item) for item in value] | ||
| else: | ||
| return value | ||
|
|
||
|
|
||
@dataclass
class InferredSchemaLoader(SchemaLoader):
    """
    Infers a JSON Schema by reading a sample of records from the stream at discover time.

    This schema loader reads up to `record_sample_size` records from the stream and uses
    the SchemaInferrer to generate a JSON schema based on the structure of those records.
    This is useful for streams where the schema is not known in advance or changes dynamically.

    Attributes:
        retriever (Retriever): The retriever used to fetch records from the stream
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
        record_sample_size (int): The maximum number of records to read for schema inference. Defaults to 100.
        stream_name (str): The name of the stream for which to infer the schema
    """

    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_sample_size: int = 100
    stream_name: str = ""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        # Fall back to the stream name declared in the component's parameters.
        if not self.stream_name:
            self.stream_name = parameters.get("name", "")
        if not self.stream_name:
            raise ValueError(
                "stream_name must be provided either directly or via the 'name' parameter"
            )
        self._cached_schema: Mapping[str, Any] | None = None
        # Guards schema inference so concurrent callers don't read records twice.
        self._lock = threading.Lock()

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Infers and returns a JSON schema by reading a sample of records from the stream.

        This method reads up to `record_sample_size` records from the stream and uses
        the SchemaInferrer to generate a JSON schema. The schema is cached after the first
        call to avoid re-reading records on subsequent calls (e.g., during partition reads).

        Returns:
            A mapping representing the inferred JSON schema for the stream
        """
        # Fast path: schema was already inferred on a previous call.
        if self._cached_schema is not None:
            return self._cached_schema

        with self._lock:
            # Double-checked locking: another thread may have populated the
            # cache while this one was waiting on the lock.
            if self._cached_schema is not None:
                return self._cached_schema

            schema_inferrer = SchemaInferrer()

            record_count = 0
            for stream_slice in self.retriever.stream_slices():
                # NOTE: wrapped call satisfies ruff-format's line-length limit
                # (the single-line form failed the linters pipeline).
                for record in self.retriever.read_records(
                    records_schema={}, stream_slice=stream_slice
                ):
                    if record_count >= self.record_sample_size:
                        break

                    # Convert all Mapping-like and Sequence-like objects to plain
                    # Python types because genson doesn't handle custom
                    # implementations properly.
                    record = _to_builtin_types(record)

                    airbyte_record = AirbyteRecordMessage(
                        stream=self.stream_name,
                        data=record,  # type: ignore[arg-type]
                        emitted_at=0,
                    )

                    schema_inferrer.accumulate(airbyte_record)
                    record_count += 1

                # Stop iterating slices once enough records were sampled.
                if record_count >= self.record_sample_size:
                    break

            inferred_schema: Mapping[str, Any] | None = schema_inferrer.get_stream_schema(
                self.stream_name
            )

            # Cache an empty schema too, so an empty stream isn't re-read.
            self._cached_schema = inferred_schema if inferred_schema else {}
            return self._cached_schema
|
Comment on lines
+87
to
+123
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fix formatting to resolve the pipeline failure. The pipeline reports that ruff format requires changes here. The line at 98 is likely too long and needs wrapping, wdyt? Apply formatting by running: `poetry run ruff format airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py`. Or manually wrap the long line: for stream_slice in self.retriever.stream_slices():
- for record in self.retriever.read_records(records_schema={}, stream_slice=stream_slice):
+ for record in self.retriever.read_records(
+ records_schema={}, stream_slice=stream_slice
+ ):
if record_count >= self.record_sample_size:
break🧰 Tools🪛 GitHub Actions: Linters[error] 95-109: ruff format check failed. 1 file would be reformatted. Run 'poetry run ruff format' to auto-format the file. 🤖 Prompt for AI Agents |
||
Uh oh!
There was an error while loading. Please reload this page.