36 changes: 36 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1548,13 +1548,15 @@ definitions:
loaders defined first taking precedence in the event of a conflict.
anyOf:
- "$ref": "#/definitions/InlineSchemaLoader"
- "$ref": "#/definitions/InferredSchemaLoader"
- "$ref": "#/definitions/DynamicSchemaLoader"
- "$ref": "#/definitions/JsonFileSchemaLoader"
- title: Multiple Schema Loaders
type: array
items:
anyOf:
- "$ref": "#/definitions/InlineSchemaLoader"
- "$ref": "#/definitions/InferredSchemaLoader"
- "$ref": "#/definitions/DynamicSchemaLoader"
- "$ref": "#/definitions/JsonFileSchemaLoader"
- "$ref": "#/definitions/CustomSchemaLoader"
@@ -2462,6 +2464,40 @@ definitions:
$parameters:
type: object
additionalProperties: true
InferredSchemaLoader:
title: Inferred Schema Loader
description: Infers a JSON Schema by reading a sample of records from the stream at discover time. This is useful for streams where the schema is not known in advance or changes dynamically.
type: object
required:
- type
- retriever
properties:
type:
type: string
enum: [InferredSchemaLoader]
retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/SimpleRetriever"
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
record_sample_size:
title: Record Sample Size
description: The maximum number of records to read for schema inference. Defaults to 100.
type: integer
default: 100
        examples:
- 100
- 500
- 1000
stream_name:
title: Stream Name
description: The name of the stream for which to infer the schema. If not provided, it will be inferred from the stream context.
type: string
$parameters:
type: object
additionalProperties: true
InlineSchemaLoader:
title: Inline Schema Loader
description: Loads a schema that is defined directly in the manifest file.
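For orientation before the generated-model changes below: a stream manifest could opt into the new loader roughly as follows. This is a minimal sketch, not taken from this PR — the `users` stream, base URL, and sample size are hypothetical — written as a Python dict that mirrors the YAML definition above.

# Hypothetical manifest fragment wiring a stream to the new InferredSchemaLoader.
schema_loader_definition = {
    "type": "InferredSchemaLoader",
    "record_sample_size": 500,  # sample up to 500 records at discover time
    "stream_name": "users",  # optional; otherwise taken from the stream context
    "retriever": {
        "type": "SimpleRetriever",
        "requester": {
            "type": "HttpRequester",
            "url_base": "https://api.example.com",  # hypothetical API
            "path": "/users",
            "http_method": "GET",
        },
        "record_selector": {
            "type": "RecordSelector",
            "extractor": {"type": "DpathExtractor", "field_path": []},
        },
    },
}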
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

@@ -2483,11 +2481,13 @@ class Config:
schema_loader: Optional[
Union[
InlineSchemaLoader,
InferredSchemaLoader,
DynamicSchemaLoader,
JsonFileSchemaLoader,
List[
Union[
InlineSchemaLoader,
InferredSchemaLoader,
DynamicSchemaLoader,
JsonFileSchemaLoader,
CustomSchemaLoader,
@@ -2753,6 +2753,27 @@ class DynamicSchemaLoader(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class InferredSchemaLoader(BaseModel):
type: Literal["InferredSchemaLoader"]
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
record_sample_size: Optional[int] = Field(
100,
description="The maximum number of records to read for schema inference. Defaults to 100.",
        examples=[100, 500, 1000],
title="Record Sample Size",
)
stream_name: Optional[str] = Field(
None,
description="The name of the stream for which to infer the schema. If not provided, it will be inferred from the stream context.",
title="Stream Name",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ParentStreamConfig(BaseModel):
type: Literal["ParentStreamConfig"]
stream: Union[DeclarativeStream, StateDelegatingStream] = Field(
@@ -3093,6 +3114,7 @@ class DynamicDeclarativeStream(BaseModel):
SessionTokenAuthenticator.update_forward_refs()
HttpRequester.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
InferredSchemaLoader.update_forward_refs()
ParentStreamConfig.update_forward_refs()
PropertiesFromEndpoint.update_forward_refs()
SimpleRetriever.update_forward_refs()
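A quick way to sanity-check the generated model is to validate the hypothetical `schema_loader_definition` dict from the sketch above with pydantic v1's `parse_obj` (the models in this module are pydantic v1); a sketch, not test code from the PR:

from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    InferredSchemaLoader as InferredSchemaLoaderModel,
)

# Validates the nested retriever definition and applies declared defaults;
# omitting record_sample_size would leave it at the default of 100.
model = InferredSchemaLoaderModel.parse_obj(schema_loader_definition)
assert model.record_sample_size == 500
assert model.stream_name == "users"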
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -301,6 +301,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
InferredSchemaLoader as InferredSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
InlineSchemaLoader as InlineSchemaLoaderModel,
)
@@ -549,6 +552,7 @@
ComplexFieldType,
DefaultSchemaLoader,
DynamicSchemaLoader,
InferredSchemaLoader,
InlineSchemaLoader,
JsonFileSchemaLoader,
SchemaLoader,
@@ -748,6 +752,7 @@ def _init_mappings(self) -> None:
HttpRequesterModel: self.create_http_requester,
HttpResponseFilterModel: self.create_http_response_filter,
InlineSchemaLoaderModel: self.create_inline_schema_loader,
InferredSchemaLoaderModel: self.create_inferred_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
@@ -2500,6 +2505,39 @@ def create_inline_schema_loader(
) -> InlineSchemaLoader:
return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})

def create_inferred_schema_loader(
self, model: InferredSchemaLoaderModel, config: Config, **kwargs: Any
) -> InferredSchemaLoader:
name = kwargs.get("name", "inferred_schema")
retriever = self._create_component_from_model(
model=model.retriever,
config=config,
name=name,
primary_key=None,
partition_router=self._build_stream_slicer_from_partition_router(
model.retriever, config
),
transformations=[],
use_cache=True,
log_formatter=(
lambda response: format_http_message(
response,
f"Schema loader '{name}' request",
f"Request performed in order to infer schema.",
name,
is_auxiliary=True,
)
),
)

return InferredSchemaLoader(
retriever=retriever,
config=config,
record_sample_size=model.record_sample_size or 100,
stream_name=model.stream_name or name,
parameters=model.parameters or {},
)

def create_complex_field_type(
self, model: ComplexFieldTypeModel, config: Config, **kwargs: Any
) -> ComplexFieldType:
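For context on how the `_init_mappings` entry above is consumed: the factory dispatches on the model class, so registering `InferredSchemaLoaderModel` is what routes a manifest's `InferredSchemaLoader` block to `create_inferred_schema_loader`. A simplified sketch of that dispatch, not the factory's verbatim code:

# Simplified dispatch: look up the builder registered for the model's class
# and delegate to it with the shared config and any extra kwargs (e.g. name).
def _create_component_from_model(self, model, config, **kwargs):
    builder = self.PYDANTIC_MODEL_TO_CONSTRUCTOR.get(type(model))
    if builder is None:
        raise ValueError(f"{type(model)} is not a registered component type")
    return builder(model=model, config=config, **kwargs)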
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/schema/__init__.py
@@ -9,6 +9,7 @@
SchemaTypeIdentifier,
TypesMap,
)
from airbyte_cdk.sources.declarative.schema.inferred_schema_loader import InferredSchemaLoader
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
@@ -18,6 +19,7 @@
"DefaultSchemaLoader",
"SchemaLoader",
"InlineSchemaLoader",
"InferredSchemaLoader",
"DynamicSchemaLoader",
"ComplexFieldType",
"TypesMap",
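With the re-export above in place, downstream code can import the loader straight from the schema package:

from airbyte_cdk.sources.declarative.schema import InferredSchemaLoader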
123 changes: 123 additions & 0 deletions airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py
@@ -0,0 +1,123 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import threading
from collections.abc import Mapping as ABCMapping
from collections.abc import Sequence
from dataclasses import InitVar, dataclass
from typing import Any, Mapping

from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer


def _to_builtin_types(value: Any) -> Any:
"""
Recursively convert Mapping-like and Sequence-like objects to plain Python types.
This is necessary because genson's schema inference doesn't handle custom Mapping
or Sequence implementations properly. We need to convert everything to plain dicts,
lists, and primitive types.
Args:
value: The value to convert
Returns:
The value converted to plain Python types
"""
if isinstance(value, ABCMapping):
return {k: _to_builtin_types(v) for k, v in value.items()}
elif isinstance(value, (list, tuple)):
return [_to_builtin_types(item) for item in value]
elif isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
return [_to_builtin_types(item) for item in value]
else:
return value
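
# Illustrative example of the normalization (not part of the module): a tuple
# nested in a dict comes back as a plain list, so
# _to_builtin_types({"tags": ("a", "b")}) returns {"tags": ["a", "b"]};
# strings and bytes are deliberately left untouched.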


@dataclass
class InferredSchemaLoader(SchemaLoader):
"""
Infers a JSON Schema by reading a sample of records from the stream at discover time.
This schema loader reads up to `record_sample_size` records from the stream and uses
the SchemaInferrer to generate a JSON schema based on the structure of those records.
This is useful for streams where the schema is not known in advance or changes dynamically.
Attributes:
retriever (Retriever): The retriever used to fetch records from the stream
config (Config): The user-provided configuration as specified by the source's spec
parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
record_sample_size (int): The maximum number of records to read for schema inference. Defaults to 100.
stream_name (str): The name of the stream for which to infer the schema
"""

retriever: Retriever
config: Config
parameters: InitVar[Mapping[str, Any]]
record_sample_size: int = 100
stream_name: str = ""

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
if not self.stream_name:
self.stream_name = parameters.get("name", "")
if not self.stream_name:
raise ValueError(
"stream_name must be provided either directly or via the 'name' parameter"
)
self._cached_schema: Mapping[str, Any] | None = None
self._lock = threading.Lock()

def get_json_schema(self) -> Mapping[str, Any]:
"""
Infers and returns a JSON schema by reading a sample of records from the stream.
This method reads up to `record_sample_size` records from the stream and uses
the SchemaInferrer to generate a JSON schema. The schema is cached after the first
call to avoid re-reading records on subsequent calls (e.g., during partition reads).
Returns:
A mapping representing the inferred JSON schema for the stream
"""
if self._cached_schema is not None:
return self._cached_schema

with self._lock:
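            # Double-checked locking: re-test the cache inside the lock so a
            # caller that was blocked while another thread inferred the schema
            # returns the cached result instead of re-reading records.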
if self._cached_schema is not None:
return self._cached_schema

schema_inferrer = SchemaInferrer()

record_count = 0
for stream_slice in self.retriever.stream_slices():
for record in self.retriever.read_records(records_schema={}, stream_slice=stream_slice):
if record_count >= self.record_sample_size:
break

# Convert all Mapping-like and Sequence-like objects to plain Python types
# This is necessary because genson doesn't handle custom implementations properly
record = _to_builtin_types(record)

airbyte_record = AirbyteRecordMessage(
stream=self.stream_name,
data=record, # type: ignore[arg-type]
emitted_at=0,
)

schema_inferrer.accumulate(airbyte_record)
record_count += 1

if record_count >= self.record_sample_size:
break

inferred_schema: Mapping[str, Any] | None = schema_inferrer.get_stream_schema(
self.stream_name
)

self._cached_schema = inferred_schema if inferred_schema else {}
return self._cached_schema
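To see the new loader end to end, here is a minimal usage sketch with a duck-typed stub retriever; the stub class, its records, and the `users` stream name are invented for illustration:

from airbyte_cdk.sources.declarative.schema import InferredSchemaLoader


class _StubRetriever:
    """Stand-in exposing only the two methods the loader calls."""

    def stream_slices(self):
        yield None  # a single, sliceless pass over the stream

    def read_records(self, records_schema, stream_slice=None):
        yield {"id": 1, "name": "ada"}
        yield {"id": 2, "name": "grace", "active": True}


loader = InferredSchemaLoader(
    retriever=_StubRetriever(),  # type: ignore[arg-type]  # duck-typed stub
    config={},
    parameters={"name": "users"},  # supplies stream_name via $parameters
    record_sample_size=10,
)
print(loader.get_json_schema())  # inferred JSON schema for the sampled records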
Comment on lines +87 to +123
⚠️ Potential issue | 🔴 Critical

Fix formatting to resolve pipeline failure.

The pipeline reports that ruff format requires changes here. The line at 98 is likely too long and needs wrapping, wdyt?

Apply formatting by running:

poetry run ruff format airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py

Or manually wrap the long line:

             for stream_slice in self.retriever.stream_slices():
-                for record in self.retriever.read_records(records_schema={}, stream_slice=stream_slice):
+                for record in self.retriever.read_records(
+                    records_schema={}, stream_slice=stream_slice
+                ):
                     if record_count >= self.record_sample_size:
                         break
🧰 Tools
🪛 GitHub Actions: Linters

[error] 95-109: ruff format check failed. 1 file would be reformatted. Run 'poetry run ruff format' to auto-format the file.

🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py around lines 87 to 123, the file fails ruff formatting (a line around line 98 is too long); run the formatter or wrap the long line to satisfy ruff. Fix by running: `poetry run ruff format airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py` (or manually break the long line into multiple shorter lines keeping the same logic and indentation), then re-run ruff/CI to confirm the formatting issue is resolved.
