diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
index f5e9a8548..889a3e4df 100644
--- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
+++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1548,6 +1548,7 @@ definitions:
         loaders defined first taking precedence in the event of a conflict.
       anyOf:
         - "$ref": "#/definitions/InlineSchemaLoader"
+        - "$ref": "#/definitions/InferredSchemaLoader"
         - "$ref": "#/definitions/DynamicSchemaLoader"
         - "$ref": "#/definitions/JsonFileSchemaLoader"
         - title: Multiple Schema Loaders
@@ -1555,6 +1556,7 @@ definitions:
           items:
             anyOf:
               - "$ref": "#/definitions/InlineSchemaLoader"
+              - "$ref": "#/definitions/InferredSchemaLoader"
               - "$ref": "#/definitions/DynamicSchemaLoader"
               - "$ref": "#/definitions/JsonFileSchemaLoader"
               - "$ref": "#/definitions/CustomSchemaLoader"
@@ -2462,6 +2464,40 @@ definitions:
     $parameters:
       type: object
       additionalProperties: true
+  InferredSchemaLoader:
+    title: Inferred Schema Loader
+    description: Infers a JSON Schema by reading a sample of records from the stream at discover time. This is useful for streams where the schema is not known in advance or changes dynamically.
+    type: object
+    required:
+      - type
+      - retriever
+    properties:
+      type:
+        type: string
+        enum: [InferredSchemaLoader]
+      retriever:
+        title: Retriever
+        description: Component used to coordinate how records are extracted across stream slices and request pages.
+        anyOf:
+          - "$ref": "#/definitions/SimpleRetriever"
+          - "$ref": "#/definitions/AsyncRetriever"
+          - "$ref": "#/definitions/CustomRetriever"
+      record_sample_size:
+        title: Record Sample Size
+        description: The maximum number of records to read for schema inference. Defaults to 100.
+        type: integer
+        default: 100
+        examples:
+          - 100
+          - 500
+          - 1000
+      stream_name:
+        title: Stream Name
+        description: The name of the stream for which to infer the schema. If not provided, it will be inferred from the stream context.
+        type: string
+      $parameters:
+        type: object
+        additionalProperties: true
   InlineSchemaLoader:
     title: Inline Schema Loader
     description: Loads a schema that is defined directly in the manifest file.
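For context, a stream definition that opts into the new loader might look like the following. This is a hypothetical sketch in the Python dict form commonly used in CDK unit tests; the endpoint, stream name, and retriever details are invented for illustration:

```python
# Hypothetical manifest fragment. Any valid retriever definition works the
# same way; the sampling retriever here simply mirrors the stream's own.
users_retriever = {
    "type": "SimpleRetriever",
    "requester": {
        "type": "HttpRequester",
        "url_base": "https://api.example.com",
        "path": "/users",
        "http_method": "GET",
    },
    "record_selector": {
        "type": "RecordSelector",
        "extractor": {"type": "DpathExtractor", "field_path": []},
    },
}

users_stream = {
    "type": "DeclarativeStream",
    "name": "users",
    "retriever": users_retriever,
    # The schema is discovered by sampling up to 50 records at discover time
    # instead of being declared inline.
    "schema_loader": {
        "type": "InferredSchemaLoader",
        "record_sample_size": 50,
        "retriever": users_retriever,
    },
}
```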
diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
index 35186ef71..721409c36 100644
--- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
+++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -1,5 +1,3 @@
-# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
-
 # generated by datamodel-codegen:
 #   filename:  declarative_component_schema.yaml
 
@@ -2483,11 +2481,13 @@ class Config:
     schema_loader: Optional[
         Union[
             InlineSchemaLoader,
+            InferredSchemaLoader,
             DynamicSchemaLoader,
             JsonFileSchemaLoader,
             List[
                 Union[
                     InlineSchemaLoader,
+                    InferredSchemaLoader,
                     DynamicSchemaLoader,
                     JsonFileSchemaLoader,
                     CustomSchemaLoader,
@@ -2753,6 +2753,27 @@ class DynamicSchemaLoader(BaseModel):
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
 
 
+class InferredSchemaLoader(BaseModel):
+    type: Literal["InferredSchemaLoader"]
+    retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
+        ...,
+        description="Component used to coordinate how records are extracted across stream slices and request pages.",
+        title="Retriever",
+    )
+    record_sample_size: Optional[int] = Field(
+        100,
+        description="The maximum number of records to read for schema inference. Defaults to 100.",
+        examples=[100, 500, 1000],
+        title="Record Sample Size",
+    )
+    stream_name: Optional[str] = Field(
+        None,
+        description="The name of the stream for which to infer the schema. If not provided, it will be inferred from the stream context.",
+        title="Stream Name",
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
 class ParentStreamConfig(BaseModel):
     type: Literal["ParentStreamConfig"]
     stream: Union[DeclarativeStream, StateDelegatingStream] = Field(
@@ -3093,6 +3114,7 @@ class DynamicDeclarativeStream(BaseModel):
 SessionTokenAuthenticator.update_forward_refs()
 HttpRequester.update_forward_refs()
 DynamicSchemaLoader.update_forward_refs()
+InferredSchemaLoader.update_forward_refs()
 ParentStreamConfig.update_forward_refs()
 PropertiesFromEndpoint.update_forward_refs()
 SimpleRetriever.update_forward_refs()
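The generated Pydantic model can be exercised on its own to confirm the defaults declared in the YAML. A small sketch, reusing the invented endpoint from the manifest example above:

```python
# Sketch: validating a component definition against the generated model
# before it ever reaches the factory. The endpoint is invented.
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    InferredSchemaLoader as InferredSchemaLoaderModel,
)

model = InferredSchemaLoaderModel.parse_obj(
    {
        "type": "InferredSchemaLoader",
        "retriever": {
            "type": "SimpleRetriever",
            "requester": {
                "type": "HttpRequester",
                "url_base": "https://api.example.com",
                "path": "/users",
            },
            "record_selector": {
                "type": "RecordSelector",
                "extractor": {"type": "DpathExtractor", "field_path": []},
            },
        },
    }
)
assert model.record_sample_size == 100  # default applied from the schema
assert model.stream_name is None  # resolved later from the stream context
```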
diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index fdaf26bba..e75d5f169 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -301,6 +301,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     IncrementingCountCursor as IncrementingCountCursorModel,
 )
+from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
+    InferredSchemaLoader as InferredSchemaLoaderModel,
+)
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     InlineSchemaLoader as InlineSchemaLoaderModel,
 )
@@ -549,6 +552,7 @@
     ComplexFieldType,
     DefaultSchemaLoader,
     DynamicSchemaLoader,
+    InferredSchemaLoader,
     InlineSchemaLoader,
     JsonFileSchemaLoader,
     SchemaLoader,
@@ -748,6 +752,7 @@ def _init_mappings(self) -> None:
             HttpRequesterModel: self.create_http_requester,
             HttpResponseFilterModel: self.create_http_response_filter,
+            InferredSchemaLoaderModel: self.create_inferred_schema_loader,
             InlineSchemaLoaderModel: self.create_inline_schema_loader,
             JsonDecoderModel: self.create_json_decoder,
             JsonlDecoderModel: self.create_jsonl_decoder,
             JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
@@ -2500,6 +2505,39 @@ def create_inline_schema_loader(
     ) -> InlineSchemaLoader:
         return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})
 
+    def create_inferred_schema_loader(
+        self, model: InferredSchemaLoaderModel, config: Config, **kwargs: Any
+    ) -> InferredSchemaLoader:
+        name = kwargs.get("name", "inferred_schema")
+        retriever = self._create_component_from_model(
+            model=model.retriever,
+            config=config,
+            name=name,
+            primary_key=None,
+            partition_router=self._build_stream_slicer_from_partition_router(
+                model.retriever, config
+            ),
+            transformations=[],
+            use_cache=True,
+            log_formatter=(
+                lambda response: format_http_message(
+                    response,
+                    f"Schema loader '{name}' request",
+                    "Request performed in order to infer schema.",
+                    name,
+                    is_auxiliary=True,
+                )
+            ),
+        )
+
+        return InferredSchemaLoader(
+            retriever=retriever,
+            config=config,
+            record_sample_size=model.record_sample_size or 100,
+            stream_name=model.stream_name or name,
+            parameters=model.parameters or {},
+        )
 
     def create_complex_field_type(
         self, model: ComplexFieldTypeModel, config: Config, **kwargs: Any
     ) -> ComplexFieldType:
diff --git a/airbyte_cdk/sources/declarative/schema/__init__.py b/airbyte_cdk/sources/declarative/schema/__init__.py
index cad0c2f06..c0e5ce64d 100644
--- a/airbyte_cdk/sources/declarative/schema/__init__.py
+++ b/airbyte_cdk/sources/declarative/schema/__init__.py
@@ -9,6 +9,7 @@
     SchemaTypeIdentifier,
     TypesMap,
 )
+from airbyte_cdk.sources.declarative.schema.inferred_schema_loader import InferredSchemaLoader
 from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
 from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
 from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
@@ -18,6 +19,7 @@
     "DefaultSchemaLoader",
     "SchemaLoader",
     "InlineSchemaLoader",
+    "InferredSchemaLoader",
    "DynamicSchemaLoader",
     "ComplexFieldType",
     "TypesMap",
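End to end, the new factory mapping should resolve a manifest definition into a runtime loader roughly like this. An untested sketch: the endpoint is invented, and the `name` kwarg stands in for what stream construction would normally pass down:

```python
# Sketch of the new factory path, from manifest dict to runtime component.
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    InferredSchemaLoader as InferredSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)

loader = ModelToComponentFactory().create_component(
    model_type=InferredSchemaLoaderModel,
    component_definition={
        "type": "InferredSchemaLoader",
        "record_sample_size": 50,
        "retriever": {
            "type": "SimpleRetriever",
            "requester": {
                "type": "HttpRequester",
                "url_base": "https://api.example.com",
                "path": "/users",
            },
            "record_selector": {
                "type": "RecordSelector",
                "extractor": {"type": "DpathExtractor", "field_path": []},
            },
        },
    },
    config={},
    name="users",
)
assert loader.record_sample_size == 50
assert loader.stream_name == "users"  # falls back to the `name` kwarg
```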
diff --git a/airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py b/airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py
new file mode 100644
index 000000000..21407518c
--- /dev/null
+++ b/airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py
@@ -0,0 +1,123 @@
+#
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+#
+
+import threading
+from collections.abc import Mapping as ABCMapping
+from collections.abc import Sequence
+from dataclasses import InitVar, dataclass
+from typing import Any, Mapping
+
+from airbyte_cdk.models import AirbyteRecordMessage
+from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
+from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
+from airbyte_cdk.sources.types import Config
+from airbyte_cdk.utils.schema_inferrer import SchemaInferrer
+
+
+def _to_builtin_types(value: Any) -> Any:
+    """
+    Recursively convert Mapping-like and Sequence-like objects to plain Python types.
+
+    This is necessary because genson's schema inference doesn't handle custom Mapping
+    or Sequence implementations properly. We need to convert everything to plain dicts,
+    lists, and primitive types.
+
+    Args:
+        value: The value to convert
+
+    Returns:
+        The value converted to plain Python types
+    """
+    if isinstance(value, ABCMapping):
+        return {k: _to_builtin_types(v) for k, v in value.items()}
+    elif isinstance(value, (list, tuple)):
+        return [_to_builtin_types(item) for item in value]
+    elif isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
+        return [_to_builtin_types(item) for item in value]
+    else:
+        return value
+
+
+@dataclass
+class InferredSchemaLoader(SchemaLoader):
+    """
+    Infers a JSON Schema by reading a sample of records from the stream at discover time.
+
+    This schema loader reads up to `record_sample_size` records from the stream and uses
+    the SchemaInferrer to generate a JSON schema based on the structure of those records.
+    This is useful for streams where the schema is not known in advance or changes dynamically.
+
+    Attributes:
+        retriever (Retriever): The retriever used to fetch records from the stream
+        config (Config): The user-provided configuration as specified by the source's spec
+        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
+        record_sample_size (int): The maximum number of records to read for schema inference. Defaults to 100.
+        stream_name (str): The name of the stream for which to infer the schema
+    """
+
+    retriever: Retriever
+    config: Config
+    parameters: InitVar[Mapping[str, Any]]
+    record_sample_size: int = 100
+    stream_name: str = ""
+
+    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
+        self._parameters = parameters
+        if not self.stream_name:
+            self.stream_name = parameters.get("name", "")
+        if not self.stream_name:
+            raise ValueError(
+                "stream_name must be provided either directly or via the 'name' parameter"
+            )
+        self._cached_schema: Mapping[str, Any] | None = None
+        self._lock = threading.Lock()
+
+    def get_json_schema(self) -> Mapping[str, Any]:
+        """
+        Infers and returns a JSON schema by reading a sample of records from the stream.
+
+        This method reads up to `record_sample_size` records from the stream and uses
+        the SchemaInferrer to generate a JSON schema. The schema is cached after the first
+        call to avoid re-reading records on subsequent calls (e.g., during partition reads).
+
+        Returns:
+            A mapping representing the inferred JSON schema for the stream
+        """
+        if self._cached_schema is not None:
+            return self._cached_schema
+
+        with self._lock:
+            if self._cached_schema is not None:
+                return self._cached_schema
+
+            schema_inferrer = SchemaInferrer()
+
+            record_count = 0
+            for stream_slice in self.retriever.stream_slices():
+                for record in self.retriever.read_records(records_schema={}, stream_slice=stream_slice):
+                    if record_count >= self.record_sample_size:
+                        break
+
+                    # Convert all Mapping-like and Sequence-like objects to plain Python types
+                    # This is necessary because genson doesn't handle custom implementations properly
+                    record = _to_builtin_types(record)
+
+                    airbyte_record = AirbyteRecordMessage(
+                        stream=self.stream_name,
+                        data=record,  # type: ignore[arg-type]
+                        emitted_at=0,
+                    )
+
+                    schema_inferrer.accumulate(airbyte_record)
+                    record_count += 1
+
+                if record_count >= self.record_sample_size:
+                    break
+
+            inferred_schema: Mapping[str, Any] | None = schema_inferrer.get_stream_schema(
+                self.stream_name
+            )
+
+            self._cached_schema = inferred_schema if inferred_schema else {}
+            return self._cached_schema
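Because the loader only calls `stream_slices()` and `read_records()` on its retriever, it can be sketched in isolation with a minimal stand-in. `StubRetriever` below is hypothetical and not part of the CDK; any object exposing those two methods in the same shape works here:

```python
# Minimal sketch of the loader driven by a stub retriever.
from airbyte_cdk.sources.declarative.schema import InferredSchemaLoader


class StubRetriever:
    def stream_slices(self):
        yield None

    def read_records(self, records_schema, stream_slice=None):
        yield {"id": 1, "name": "Ada", "tags": ["admin"]}
        yield {"id": 2, "name": "Grace", "tags": ["user"]}


loader = InferredSchemaLoader(
    retriever=StubRetriever(),
    config={},
    parameters={"name": "users"},  # supplies stream_name via __post_init__
    record_sample_size=2,
)

schema = loader.get_json_schema()
assert "id" in schema["properties"]  # inferred once, then cached for later calls
```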
diff --git a/unit_tests/sources/declarative/schema/test_inferred_schema_loader.py b/unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
new file mode 100644
index 000000000..2d4c917f4
--- /dev/null
+++ b/unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
@@ -0,0 +1,216 @@
+#
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+#
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from airbyte_cdk.sources.declarative.schema import InferredSchemaLoader
+
+
+@pytest.fixture
+def mock_retriever():
+    """Create a mock retriever that returns sample records."""
+    retriever = MagicMock()
+    retriever.stream_slices.return_value = iter([None])
+    retriever.read_records.return_value = iter(
+        [
+            {"id": 1, "name": "Alice", "age": 30, "active": True},
+            {"id": 2, "name": "Bob", "age": 25, "active": False},
+            {"id": 3, "name": "Charlie", "age": 35, "active": True},
+        ]
+    )
+    return retriever
+
+
+@pytest.fixture
+def inferred_schema_loader(mock_retriever):
+    """Create an InferredSchemaLoader with a mock retriever."""
+    config = MagicMock()
+    parameters = {"name": "users"}
+    return InferredSchemaLoader(
+        retriever=mock_retriever,
+        config=config,
+        parameters=parameters,
+        record_sample_size=3,
+        stream_name="users",
+    )
+
+
+def test_inferred_schema_loader_basic(inferred_schema_loader):
+    """Test that InferredSchemaLoader correctly infers a schema from sample records."""
+    schema = inferred_schema_loader.get_json_schema()
+
+    assert "$schema" in schema
+    assert schema["type"] == "object"
+    assert "properties" in schema
+
+    assert "id" in schema["properties"]
+    assert "name" in schema["properties"]
+    assert "age" in schema["properties"]
+    assert "active" in schema["properties"]
+
+    assert "number" in schema["properties"]["id"]["type"]
+    assert "string" in schema["properties"]["name"]["type"]
+    assert "number" in schema["properties"]["age"]["type"]
+    assert "boolean" in schema["properties"]["active"]["type"]
+
+
+def test_inferred_schema_loader_empty_records():
+    """Test that InferredSchemaLoader returns an empty schema when no records are available."""
+    retriever = MagicMock()
+    retriever.stream_slices.return_value = iter([None])
+    retriever.read_records.return_value = iter([])
+
+    config = MagicMock()
+    parameters = {"name": "users"}
+    loader = InferredSchemaLoader(
+        retriever=retriever,
+        config=config,
+        parameters=parameters,
+        record_sample_size=100,
+        stream_name="users",
+    )
+
+    schema = loader.get_json_schema()
+
+    assert schema == {}
+
+
+def test_inferred_schema_loader_respects_sample_size():
+    """Test that InferredSchemaLoader still produces a schema when more records are available than record_sample_size allows."""
+    retriever = MagicMock()
+    records = [{"id": i, "name": f"User{i}"} for i in range(10)]
+    retriever.stream_slices.return_value = iter([None])
+    retriever.read_records.return_value = iter(records)
+
+    config = MagicMock()
+    parameters = {"name": "users"}
+    loader = InferredSchemaLoader(
+        retriever=retriever,
+        config=config,
+        parameters=parameters,
+        record_sample_size=5,
+        stream_name="users",
+    )
+
+    schema = loader.get_json_schema()
+
+    assert "properties" in schema
+    assert "id" in schema["properties"]
+    assert "name" in schema["properties"]
+
+
+def test_inferred_schema_loader_handles_errors():
+    """Test that InferredSchemaLoader propagates errors from the retriever."""
+    retriever = MagicMock()
+    retriever.stream_slices.return_value = iter([None])
+    retriever.read_records.side_effect = Exception("API Error")
+
+    config = MagicMock()
+    parameters = {"name": "users"}
+    loader = InferredSchemaLoader(
+        retriever=retriever,
+        config=config,
+        parameters=parameters,
+        record_sample_size=100,
+        stream_name="users",
+    )
+
+    with pytest.raises(Exception, match="API Error"):
+        loader.get_json_schema()
+
+
+def test_inferred_schema_loader_with_nested_objects():
+    """Test that InferredSchemaLoader handles nested objects correctly."""
+    retriever = MagicMock()
+    retriever.stream_slices.return_value = iter([None])
+    retriever.read_records.return_value = iter(
+        [
+            {
+                "id": 1,
+                "name": "Alice",
+                "address": {"street": "123 Main St", "city": "Springfield", "zip": "12345"},
+            },
+            {
+                "id": 2,
+                "name": "Bob",
+                "address": {"street": "456 Oak Ave", "city": "Shelbyville", "zip": "67890"},
+            },
+        ]
+    )
+
+    config = MagicMock()
+    parameters = {"name": "users"}
+    loader = InferredSchemaLoader(
+        retriever=retriever,
+        config=config,
+        parameters=parameters,
+        record_sample_size=2,
+        stream_name="users",
+    )
+
+    schema = loader.get_json_schema()
+
+    assert "properties" in schema
+    assert "address" in schema["properties"]
+    assert "object" in schema["properties"]["address"]["type"]
+
+
+def test_inferred_schema_loader_with_arrays():
+    """Test that InferredSchemaLoader handles arrays correctly."""
+    retriever = MagicMock()
+    retriever.stream_slices.return_value = iter([None])
+    retriever.read_records.return_value = iter(
+        [
+            {"id": 1, "name": "Alice", "tags": ["admin", "user"]},
+            {"id": 2, "name": "Bob", "tags": ["user", "guest"]},
+        ]
+    )
+
+    config = MagicMock()
+    parameters = {"name": "users"}
+    loader = InferredSchemaLoader(
+        retriever=retriever,
+        config=config,
+        parameters=parameters,
+        record_sample_size=2,
+        stream_name="users",
+    )
+
+    schema = loader.get_json_schema()
+
+    assert "properties" in schema
+    assert "tags" in schema["properties"]
+    assert "array" in schema["properties"]["tags"]["type"]
+
+
+def test_inferred_schema_loader_caches_schema():
+    """Test that InferredSchemaLoader caches the schema and doesn't re-read records on subsequent calls."""
+    retriever = MagicMock()
+    retriever.stream_slices.return_value = iter([None])
+    retriever.read_records.return_value = iter(
+        [
+            {"id": 1, "name": "Alice"},
+            {"id": 2, "name": "Bob"},
+        ]
+    )
+
+    config = MagicMock()
+    parameters = {"name": "users"}
+    loader = InferredSchemaLoader(
+        retriever=retriever,
+        config=config,
+        parameters=parameters,
+        record_sample_size=2,
+        stream_name="users",
+    )
+
+    schema1 = loader.get_json_schema()
+    schema2 = loader.get_json_schema()
+    schema3 = loader.get_json_schema()
+
+    assert schema1 == schema2 == schema3
+    assert retriever.stream_slices.call_count == 1
+    assert retriever.read_records.call_count == 1
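One gap the suite above leaves open is sampling across slice boundaries. A possible follow-up test, offered as a sketch and not part of this diff, that relies only on behavior visible in the implementation (the loader breaks out of both loops once `record_sample_size` records have been accumulated):

```python
from unittest.mock import MagicMock

from airbyte_cdk.sources.declarative.schema import InferredSchemaLoader


def test_inferred_schema_loader_counts_across_slices():
    """Sketch: the sample budget should span multiple stream slices."""
    retriever = MagicMock()
    retriever.stream_slices.return_value = iter([None, None])
    # One record in the first slice, two in the second; the budget of 2 is
    # exhausted partway through the second slice.
    retriever.read_records.side_effect = [
        iter([{"id": 1}]),
        iter([{"id": 2}, {"id": 3}]),
    ]

    loader = InferredSchemaLoader(
        retriever=retriever,
        config=MagicMock(),
        parameters={"name": "users"},
        record_sample_size=2,
        stream_name="users",
    )

    schema = loader.get_json_schema()

    assert "id" in schema["properties"]
    assert retriever.read_records.call_count == 2  # both slices were visited
```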