Skip to content

Conversation

@aaronsteers
Copy link
Contributor

@aaronsteers aaronsteers commented Nov 8, 2025

feat: Add InferredSchemaLoader for runtime schema inference

Summary

This PR adds a new InferredSchemaLoader component to the declarative CDK that enables automatic schema inference at discover time by reading a sample of records from the stream. This addresses the need for streams where the schema is not known in advance or changes dynamically.

Key changes:

  • New InferredSchemaLoader class that reads up to record_sample_size records (default 100) and uses SchemaInferrer to generate a JSON schema
  • Full integration with declarative component schema YAML and pydantic model generation via poe assemble
  • Wired into ModelToComponentFactory following the same pattern as DynamicSchemaLoader
  • Helper function _to_builtin_types to recursively convert Mapping-like and Sequence-like objects to plain Python types (required for genson compatibility)
  • 6 unit tests covering basic functionality, empty records, sample size limits, error propagation, nested objects, and arrays
  • Modern Python 3.10+ type hints (| None instead of Optional)

Example usage in manifest:

schema_loader:
  type: InferredSchemaLoader
  retriever:
    type: SimpleRetriever
    requester: ...
  record_sample_size: 100  # optional, defaults to 100

Review & Testing Checklist for Human

⚠️ CRITICAL - This PR has not been fully integration tested:

  • Test with a real declarative manifest - The HttpMocker integration tests were removed per maintainer request due to testing infrastructure issues. Please test this with an actual declarative connector to verify it works correctly in production discover flow.
  • Verify retriever configuration - Check that the retriever in create_inferred_schema_loader (model_to_component_factory.py:2508-2539) is properly configured with the right parameters (partition_router, use_cache, log_formatter, etc.)
  • Test error handling - Errors now propagate instead of being caught (per maintainer request). Verify this behavior is acceptable and doesn't break discover for streams with transient errors.
  • Performance testing - Test discover performance with streams that have large records or many records to ensure record_sample_size properly limits the work done.
  • Review auto-generated pydantic changes - The declarative_component_schema.py file has formatting changes including copyright header removal. Verify these are expected from running poe assemble.

Notes

  • Requested by: AJ Steers (aj@airbyte.io) @aaronsteers
  • Devin session: https://app.devin.ai/sessions/6fbe7bb84c894b32b04196b00c9db457
  • All 6 unit tests pass locally
  • Ruff format, lint, and MyPy checks pass
  • The poe assemble build process is not documented in README.md or CONTRIBUTING.md
  • HttpMocker integration tests were removed per maintainer request after encountering testing infrastructure issues
  • Broad exception handling was removed per maintainer code review feedback

Summary by CodeRabbit

  • New Features

    • Added InferredSchemaLoader: infers stream JSON schemas by sampling records at discovery. Configurable record sample size (default 100), stream name, retriever and parameters. Available as an option in schema loader choices and stream configurations and exposed in the public schema API.
  • Tests

    • Added tests covering basic inference, empty/error cases, nested objects, arrays, and sample-size behavior.

Add a new InferredSchemaLoader component that infers JSON schemas by reading
a sample of records from the stream at discover time. This enables declarative
connectors to automatically generate schemas for streams where the schema is
not known in advance or changes dynamically.

Key features:
- Reads up to record_sample_size records (default 100) from the stream
- Uses SchemaInferrer to generate JSON schema from sample records
- Handles errors gracefully by returning empty schema
- Fully integrated with declarative component schema and model factory
- Includes comprehensive unit tests

Requested by: AJ Steers (aj@airbyte.io) @aaronsteers

Co-Authored-By: AJ Steers <aj@airbyte.io>
Copilot AI review requested due to automatic review settings November 8, 2025 00:57
@devin-ai-integration
Copy link
Contributor

Original prompt from AJ Steers
Received message in Slack channel #proj-ai-connector-builder:

@Devin - Find the auto-schema detection in the Python CDK which is used in our declarative test read implementation. I believe we are inferring schema and returning it in either the "test read" implementation or the "resolve manifest" or "fully resolve manifest". Create a CDK PR which adds a new schema resolution option based on reading `n` records and inferring schema from those `n` records, basically the same thing being done (I think) in those dev-time implementation, except we'd run them (in production) at `discover` time.
Thread URL: https://airbytehq-team.slack.com/archives/C099FV37L2Z/p1762562420838709

@devin-ai-integration
Copy link
Contributor

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions github-actions bot added the enhancement New feature or request label Nov 8, 2025
@github-actions
Copy link

github-actions bot commented Nov 8, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1762562686-inferred-schema-loader#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1762562686-inferred-schema-loader

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

📝 Edit this welcome message.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces runtime schema inference capabilities to the declarative CDK by adding an InferredSchemaLoader component that automatically discovers stream schemas during the discover phase by sampling records.

Key changes:

  • New InferredSchemaLoader class that reads up to record_sample_size records (default 100) and generates a JSON schema using SchemaInferrer
  • Integration with the declarative component factory and YAML schema definition
  • Comprehensive unit test coverage for basic functionality, error handling, nested objects, and arrays

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py New schema loader implementation that infers schemas from sampled records
airbyte_cdk/sources/declarative/schema/init.py Exports the new InferredSchemaLoader class
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py Adds factory method to create InferredSchemaLoader instances from manifest models
airbyte_cdk/sources/declarative/models/declarative_component_schema.py Auto-generated pydantic model for InferredSchemaLoader with formatting changes
airbyte_cdk/sources/declarative/declarative_component_schema.yaml YAML schema definition for InferredSchemaLoader configuration
unit_tests/sources/declarative/schema/test_inferred_schema_loader.py Comprehensive unit tests covering various schema inference scenarios

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Co-Authored-By: AJ Steers <aj@airbyte.io>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 8, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Walkthrough

A new InferredSchemaLoader was added across the declarative CDK: YAML schema, Pydantic model, model-to-component factory registration, runtime loader implementation that samples records to infer JSON Schema, plus unit tests covering basic and edge cases.

Changes

Cohort / File(s) Summary
Schema Definition
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Added InferredSchemaLoader definition (type, retriever required; optional record_sample_size, stream_name, $parameters) and referenced it in InlineSchemaLoader.anyOf, DeclarativeStream.schema_loader.anyOf, and the stream-level loaders array.
Model & Type Definitions
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Added InferredSchemaLoader Pydantic model with type: Literal["InferredSchemaLoader"], retriever union, record_sample_size (default 100), stream_name, and parameters (alias $parameters); updated DeclarativeStream.schema_loader union and forward refs.
Factory Integration
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Imported InferredSchemaLoaderModel, added create_inferred_schema_loader to build and wire the runtime loader (retriever, caching/partitioning/log formatter), and registered the model → constructor mapping.
Public API Export
airbyte_cdk/sources/declarative/schema/__init__.py
Exported InferredSchemaLoader in module __all__.
Implementation
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py
New InferredSchemaLoader class (extends SchemaLoader) that samples up to record_sample_size records via a Retriever, converts records to builtin Python types, feeds them to SchemaInferrer, and returns the inferred JSON schema for the stream.
Tests
unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
Added tests: basic inference, empty records, sample-size respect, error handling, nested objects, and arrays.

Sequence Diagram

sequenceDiagram
    participant Discovery as Discover Flow
    participant Stream as DeclarativeStream
    participant ISL as InferredSchemaLoader
    participant Retriever as Retriever
    participant Inferrer as SchemaInferrer

    Discovery->>Stream: request schema
    Stream->>ISL: get_json_schema()
    ISL->>Retriever: stream_slices()
    loop read up to record_sample_size
        ISL->>Retriever: read_records(slice)
        Retriever-->>ISL: record(s)
        ISL->>ISL: _to_builtin_types(record)
        ISL->>Inferrer: accumulate(AirbyteRecordMessage)
    end
    ISL->>Inferrer: inferred_schema_for(stream_name)
    Inferrer-->>ISL: JSON schema
    ISL-->>Stream: schema
    Stream-->>Discovery: schema
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Files to focus on:
    • _to_builtin_types recursion and handling of non-serializable types
    • get_json_schema loop: slice iteration, record counting, early exit, and exception handling
    • stream_name resolution from parameters vs explicit field and error messaging
    • Model-to-factory wiring to ensure retriever construction matches other loader patterns

Possibly related PRs

Suggested reviewers

  • brianjlai
  • darynaishchenko

Would you like a short docstring example in InferredSchemaLoader showing a typical manifest snippet (e.g., record_sample_size, stream_name, $parameters)? Wdyt?

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 76.19% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and accurately summarizes the main change: adding a new InferredSchemaLoader component for runtime schema inference, which is the primary focus of this PR.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch devin/1762562686-inferred-schema-loader

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Co-Authored-By: AJ Steers <aj@airbyte.io>
@aaronsteers
Copy link
Contributor Author

aside -

@brianjlai and @maxi297 -

Devin auto-reply/auto-respond is enabled for this repo. Use "aside" prefix (example above) to skip Devin auto-replied.

The point of this PR is to enable a fully dynamic schema option for cases that require it.

Use cases:

  1. There is no REST method of getting the JSON Schema, or:
  2. The REST method for getting JSON Schema requires admin permissions and thereby is not viable for the creds that the Airbyte connector will be granted.

This came up in a user's ServiceNow use case on AI Connector Builder. It also unblocks streamlined (although less robust) AI connector builder implementations where we want to defer schema discovery and/or provide a minimal solution for dynamic schemas that otherwise would require custom python components.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (1)

5-5: Clean up unused imports and variables.

The static analysis tool has flagged several unused imports and variables:

  • import json (line 5)
  • ConcurrentDeclarativeSource (lines 10-12)
  • HttpMocker, HttpRequest, HttpResponse (line 14)
  • _CONFIG (lines 16-19)
  • _MANIFEST (lines 21-76)

Based on the PR objectives, these seem to be leftovers from the integration test that was removed due to HttpMocker issues. Perhaps they could be removed now, or kept if you're planning to add the integration test back soon?

Also applies to: 10-12, 14-14, 16-19, 21-76

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (2)

58-58: Clarify the type: ignore comment, wdyt?

The # type: ignore[call-overload] comment suggests there's a signature mismatch with read_records(). Looking at the Retriever interface, it expects records_schema: Mapping[str, Any] and stream_slice: Optional[StreamSlice].

Passing an empty dict for records_schema makes sense here since we're inferring the schema, but you might want to:

  1. Add a brief comment explaining why an empty schema is passed
  2. Consider passing stream_slice=None explicitly to match the full signature

Would that help make the intent clearer for future maintainers?


56-71: Consider performance implications for large streams.

The current implementation reads up to record_sample_size records, which is great for sampling. However, for APIs that return very large records or have slow response times, this could potentially take a while during discovery.

Have you considered adding:

  1. A timeout mechanism for the sampling process?
  2. An early exit if schema inference stabilizes (i.e., no new fields detected after N consecutive records)?

Just thinking about the user experience during discovery time. What do you think?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2508-2539: Consider using kwargs["name"] as a fallback for stream_name, wdyt?

The method extracts name from kwargs (line 2511) but only uses it for the retriever's name and log formatter. The stream_name parameter at line 2537 uses model.stream_name or "", which means if the model doesn't have stream_name set and parameters doesn't contain "name", the InferredSchemaLoader will use an empty string.

While the __post_init__ method provides a fallback via parameters.get("name", ""), it might be more intuitive to also check kwargs:

stream_name=model.stream_name or kwargs.get("name", ""),

This would provide a sensible default when the InferredSchemaLoader is created in contexts where the stream name is available in kwargs. However, if this isn't a typical usage pattern and parameters are the expected mechanism for stream name propagation, feel free to keep the current implementation.

Based on learnings: The pattern of using kwargs for name propagation varies across the codebase, so it's worth verifying whether this is needed for InferredSchemaLoader's typical usage scenarios.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6504148 and 81cd888.

📒 Files selected for processing (6)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
  • airbyte_cdk/sources/declarative/schema/__init__.py (2 hunks)
  • airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1 hunks)
  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (1 hunks)
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.

Applied to files:

  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
  • airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py
  • airbyte_cdk/sources/declarative/schema/__init__.py
  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.

Applied to files:

  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/sources/declarative/schema/__init__.py
  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
📚 Learning: 2024-11-15T00:59:08.154Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.

Applied to files:

  • airbyte_cdk/sources/declarative/schema/__init__.py
  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.

Applied to files:

  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
🧬 Code graph analysis (5)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (17)
  • InferredSchemaLoader (2756-2774)
  • Config (134-135)
  • Config (148-149)
  • Config (162-163)
  • Config (176-177)
  • Config (190-191)
  • Config (204-205)
  • Config (218-219)
  • Config (232-233)
  • Config (246-247)
  • Config (260-261)
  • Config (274-275)
  • Config (290-291)
  • Config (304-305)
  • Config (318-319)
  • Config (352-353)
  • Config (376-377)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1)
  • InferredSchemaLoader (16-77)
airbyte_cdk/sources/http_logger.py (1)
  • format_http_message (12-51)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1)
  • InferredSchemaLoader (16-77)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
  • SimpleRetriever (59-627)
airbyte_cdk/sources/declarative/retrievers/async_retriever.py (1)
  • AsyncRetriever (19-124)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (3)
airbyte_cdk/sources/declarative/retrievers/retriever.py (1)
  • Retriever (14-58)
airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
  • SchemaLoader (11-17)
airbyte_cdk/utils/schema_inferrer.py (2)
  • SchemaInferrer (81-270)
  • get_stream_schema (260-270)
airbyte_cdk/sources/declarative/schema/__init__.py (3)
unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (1)
  • inferred_schema_loader (94-104)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • InferredSchemaLoader (2756-2774)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1)
  • InferredSchemaLoader (16-77)
unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (2)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • InferredSchemaLoader (2756-2774)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (2)
  • InferredSchemaLoader (16-77)
  • get_json_schema (43-77)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-intercom
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (10)
airbyte_cdk/sources/declarative/schema/__init__.py (1)

12-12: LGTM! Clean API integration.

The import and export of InferredSchemaLoader follows the established pattern for other schema loaders. Nice and consistent!

Also applies to: 22-22

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

2467-2500: Well-structured YAML definition!

The InferredSchemaLoader definition follows the established pattern of other schema loaders. The field descriptions are clear, and the default value of 100 for record_sample_size seems reasonable for most use cases.

Based on learnings, this YAML file is the source for auto-generating the Python models, so it's the right place for this definition.

airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (2)

15-36: Nice use of dataclass and InitVar!

The class structure is clean and well-documented. Using InitVar for parameters is a good choice since it's only needed during initialization to populate stream_name.


38-41: Solid defensive programming.

The fallback logic for stream_name is well-thought-out and handles the case where it's not explicitly provided in the constructor.

unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (2)

79-104: Well-structured test fixtures!

The mock_retriever and inferred_schema_loader fixtures are cleanly separated and reusable. Nice use of MagicMock to simulate the retriever behavior.


107-248: Excellent test coverage!

The test suite is comprehensive and covers all the important scenarios:

  • ✅ Basic schema inference with primitive types
  • ✅ Empty records edge case
  • ✅ Sample size limits
  • ✅ Error handling (graceful degradation)
  • ✅ Nested objects
  • ✅ Arrays

The test names are descriptive and the docstrings make the intent clear. This gives good confidence that the implementation will behave correctly in various situations!

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

2517-2519: Quick verification: Should schema inference respect partition routing?

The retriever is created with a partition_router built from the model (lines 2517-2519). For schema inference during discovery, we're sampling a small number of records (default 100) to infer the schema.

If the partition_router creates multiple partitions, the schema inference might only sample from the first partition(s), potentially missing fields that only appear in later partitions. This could lead to an incomplete inferred schema.

Two questions:

  1. Is this the intended behavior (sample from first partition only)?
  2. Or should we consider using SinglePartitionRouter(parameters={}) to simplify schema inference?

The current approach matches DynamicSchemaLoader, so it's consistent with existing patterns. Just want to confirm this is the desired behavior for schema inference. wdyt?


2508-2539: Overall: Excellent consistency with existing schema loader patterns! 👍

The implementation follows the established pattern from create_dynamic_schema_loader very closely:

  • ✅ Uses use_cache=True to avoid redundant API calls during discovery
  • ✅ Sets primary_key=None (appropriate for schema inference)
  • ✅ Uses empty transformations=[] (records should be sampled as-is)
  • ✅ Properly configures log_formatter with is_auxiliary=True
  • ✅ Provides sensible defaults (record_sample_size=100)

The code is clean, well-structured, and integrates smoothly with the factory pattern. Nice work!

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)

1-3: This file is auto-generated and changes should be made in the source YAML.

Based on learnings, this file is auto-generated from declarative_component_schema.yaml and manual changes should not be made here. If modifications are needed, they should be applied to the source YAML file and then regenerated using poe assemble (as mentioned in the PR description).

A previous review noted that a copyright header was removed. If a copyright header is required, it should be added to the generation template or source YAML, wdyt?

Based on learnings


2756-2775: Generated InferredSchemaLoader structure looks correct.

The generated InferredSchemaLoader class includes all the expected fields that match the implementation in inferred_schema_loader.py:

  • retriever field correctly accepts SimpleRetriever, AsyncRetriever, or CustomRetriever
  • record_sample_size defaults to 100 (matching the implementation)
  • stream_name and parameters fields are properly optional

The integration into DeclarativeStream.schema_loader unions (lines 2484, 2490) and forward reference update (line 3117) also appear correct.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1)

57-71: The broad exception handler still needs logging for debuggability.

This was flagged in previous reviews but remains unaddressed. When schema inference fails, returning an empty schema without any logging makes it nearly impossible to debug issues in production. Could we at least log the exception before returning {}, wdyt?

+import logging
+
+logger = logging.getLogger(__name__)
+
 # ... in get_json_schema method ...
     record_count = 0
     try:
         for record in self.retriever.read_records({}):  # type: ignore[call-overload]
             if record_count >= self.record_sample_size:
                 break

             airbyte_record = AirbyteRecordMessage(
                 stream=self.stream_name,
                 data=record,  # type: ignore[arg-type]
                 emitted_at=0,  # Not used for schema inference
             )

             schema_inferrer.accumulate(airbyte_record)
             record_count += 1
-    except Exception:
+    except Exception as e:
+        logger.warning(f"Failed to infer schema for stream {self.stream_name}: {e}", exc_info=True)
         return {}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 81cd888 and c671ccf.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.

Applied to files:

  • airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (4)
airbyte_cdk/sources/declarative/retrievers/retriever.py (1)
  • Retriever (14-58)
airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
  • SchemaLoader (11-17)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • InferredSchemaLoader (2756-2774)
airbyte_cdk/utils/schema_inferrer.py (1)
  • get_stream_schema (260-270)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Check: source-shopify
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (3)

1-13: LGTM - Clean imports and resolved copyright.

The imports are well-organized and complete for the implementation. The copyright year has been correctly updated to 2025 as discussed in previous comments.


15-36: Well-structured schema loader with clear documentation.

The dataclass design is clean, and using InitVar for parameters is the right approach since it's only needed during initialization. The defaults are sensible for schema inference use cases.


43-77: Solid schema inference implementation with clear logic flow.

The approach of sampling records, wrapping them in AirbyteRecordMessage, and using SchemaInferrer is well-designed. The type ignore comments are acceptable here since we're dealing with dynamic schema inference where exact types can't be known ahead of time. The fallback to empty schema when inference fails or no records exist is a sensible default.

@github-actions
Copy link

github-actions bot commented Nov 8, 2025

PyTest Results (Fast)

3 818 tests  +1   3 806 ✅ +1   6m 34s ⏱️ +2s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 66db70e. ± Comparison against base commit e8ab340.

This pull request removes 6 and adds 7 tests. Note that renamed tests count towards both.
unit_tests.legacy.sources.declarative.test_manifest_declarative_source ‑ test_read_manifest_declarative_source[test_no_pagination_with_partition_router-manifest4-pages4-expected_records4-expected_calls4]
unit_tests.legacy.sources.declarative.test_manifest_declarative_source ‑ test_read_manifest_declarative_source[test_read_manifest_no_pagination_no_partitions-manifest0-pages0-expected_records0-expected_calls0]
unit_tests.legacy.sources.declarative.test_manifest_declarative_source ‑ test_read_manifest_declarative_source[test_read_manifest_with_added_fields-manifest1-pages1-expected_records1-expected_calls1]
unit_tests.legacy.sources.declarative.test_manifest_declarative_source ‑ test_read_manifest_declarative_source[test_read_manifest_with_flatten_fields-manifest2-pages2-expected_records2-expected_calls2]
unit_tests.legacy.sources.declarative.test_manifest_declarative_source ‑ test_read_manifest_declarative_source[test_read_with_pagination_no_partitions-manifest3-pages3-expected_records3-expected_calls3]
unit_tests.legacy.sources.declarative.test_manifest_declarative_source ‑ test_read_manifest_declarative_source[test_with_pagination_and_partition_router-manifest5-pages5-expected_records5-expected_calls5]
unit_tests.sources.declarative.schema.test_inferred_schema_loader ‑ test_inferred_schema_loader_basic
unit_tests.sources.declarative.schema.test_inferred_schema_loader ‑ test_inferred_schema_loader_caches_schema
unit_tests.sources.declarative.schema.test_inferred_schema_loader ‑ test_inferred_schema_loader_empty_records
unit_tests.sources.declarative.schema.test_inferred_schema_loader ‑ test_inferred_schema_loader_handles_errors
unit_tests.sources.declarative.schema.test_inferred_schema_loader ‑ test_inferred_schema_loader_respects_sample_size
unit_tests.sources.declarative.schema.test_inferred_schema_loader ‑ test_inferred_schema_loader_with_arrays
unit_tests.sources.declarative.schema.test_inferred_schema_loader ‑ test_inferred_schema_loader_with_nested_objects

♻️ This comment has been updated with latest results.

@github-actions
Copy link

github-actions bot commented Nov 8, 2025

PyTest Results (Full)

3 821 tests   3 809 ✅  11m 2s ⏱️
    1 suites     12 💤
    1 files        0 ❌

Results for commit 66db70e.

♻️ This comment has been updated with latest results.

devin-ai-integration bot and others added 2 commits November 8, 2025 02:25
- Fix stream_name propagation in factory to use stream context name
- Add type conversion for Mapping-like wrapper objects to plain dicts
- Add 5 comprehensive integration tests using HttpMocker and JSONPlaceholder API
- Update test assertions to handle genson's nullable type format
- Add proper error handling for retriever failures
- Update all unit test mocks to include stream_slices() method

Co-Authored-By: AJ Steers <aj@airbyte.io>
Co-Authored-By: AJ Steers <aj@airbyte.io>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c671ccf and 038b58f.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
  • airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1 hunks)
  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (1 hunks)
  • unit_tests/sources/declarative/schema/test_inferred_schema_loader_integration.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
🧬 Code graph analysis (1)
unit_tests/sources/declarative/schema/test_inferred_schema_loader_integration.py (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
  • ConcurrentDeclarativeSource (128-653)
airbyte_cdk/test/mock_http/mocker.py (1)
  • HttpMocker (25-185)
🪛 GitHub Actions: Linters
unit_tests/sources/declarative/schema/test_inferred_schema_loader_integration.py

[error] 301-309: Ruff formatting required. The diff indicates changes would be applied by 'poetry run ruff format --diff .', and the process exited with code 1.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (python)

devin-ai-integration bot and others added 6 commits November 8, 2025 02:38
- Add _to_builtin_types helper function to recursively convert Mapping-like
  and Sequence-like objects to plain Python types
- This fixes CI test failure where nested objects (like address field) were
  not being properly converted for genson schema inference
- Genson doesn't handle custom Mapping/Sequence implementations properly,
  so we need to convert everything to plain dicts, lists, and primitives

Co-Authored-By: AJ Steers <aj@airbyte.io>
- Remove try/except block that was catching all exceptions in get_json_schema()
- Update test_inferred_schema_loader_handles_errors to verify errors now propagate
- This allows actual errors to surface for debugging instead of being silently suppressed
- Addresses PR comment from @aaronsteers

Co-Authored-By: AJ Steers <aj@airbyte.io>
Co-Authored-By: AJ Steers <aj@airbyte.io>
- Remove test_inferred_schema_loader_integration.py which contained failing integration tests
- HttpMocker tests were not matching requests properly in CI
- Keep 6 passing unit tests in test_inferred_schema_loader.py
- Core functionality remains fully tested with mocked retrievers

Co-Authored-By: AJ Steers <aj@airbyte.io>
- Remove unused imports from test file: json, ConcurrentDeclarativeSource, HttpMocker
- Remove unused global variables: _CONFIG, _MANIFEST
- Update Optional[Mapping[str, Any]] to Mapping[str, Any] | None per Python 3.10+ style
- All 6 unit tests pass locally

Co-Authored-By: AJ Steers <aj@airbyte.io>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (3)
unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (1)

1-186: Excellent test coverage!

The test suite comprehensively validates the InferredSchemaLoader functionality across basic inference, empty records, sample size limits, error propagation, nested objects, and arrays. The structure is clear and the assertions appropriately verify schema structure and types.

One optional enhancement to consider: adding a test case for records containing null values (e.g., {"id": 1, "name": None}) to verify the schema inferrer handles nullable fields correctly, wdyt?

airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (2)

71-110: Schema inference logic is well-implemented.

The method correctly iterates through stream slices, respects the sample size limit, converts records to plain Python types, and accumulates them in the SchemaInferrer. The nested loop structure properly breaks out when the sample size is reached.

One minor consideration: when no records are found, the method returns {} (line 110). While this is handled in tests, an empty object may not be a semantically valid JSON Schema. Would it make sense to return a minimal valid schema like {"type": "object"} instead for better downstream compatibility, wdyt?


71-110: Consider operational performance characteristics.

Since schema inference reads records synchronously at discover time, the performance will depend on the API's response time and the record_sample_size. The default of 100 records is reasonable, but for slow APIs or large sample sizes, discovery could take longer. This is expected behavior for discover-time operations, but worth keeping in mind for user experience and potentially documenting for connector builders.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 02050b8 and ff51fa5.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1 hunks)
  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (1 hunks)
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
📚 Learning: 2024-11-15T00:59:08.154Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.

Applied to files:

  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.

Applied to files:

  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.

Applied to files:

  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
🧬 Code graph analysis (2)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (3)
airbyte_cdk/sources/declarative/retrievers/retriever.py (1)
  • Retriever (14-58)
airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
  • SchemaLoader (11-17)
airbyte_cdk/utils/schema_inferrer.py (2)
  • SchemaInferrer (81-270)
  • get_stream_schema (260-270)
unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (2)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (2)
  • InferredSchemaLoader (42-110)
  • get_json_schema (71-110)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • InferredSchemaLoader (2756-2774)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py

[error] 66-69: poetry run ruff format --diff . reported formatting changes needed and exited with code 1. The raised ValueError was wrapped across multiple lines to satisfy formatting rules.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Analyze (python)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1)

17-38: Helper function handles type conversion correctly.

The recursive conversion logic properly handles Mapping, Sequence, and primitive types. The exclusion of str and bytes from Sequence handling (line 35) is correct since they shouldn't be decomposed character-by-character.

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1)

17-38: Nice helper function for type normalization!

The logic correctly handles the conversion of Mapping-like and Sequence-like objects to plain Python types for genson compatibility. The explicit handling of str/bytes exclusion is good defensive programming.

One minor observation: lines 33-34 handle (list, tuple) and lines 35-36 handle generic Sequence. Since lists and tuples are also sequences, there's a bit of redundancy here. The current approach works fine due to short-circuiting, but you could simplify by removing lines 33-34 and just relying on the Sequence check. That said, the explicit handling might be clearer for readability, wdyt?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ff51fa5 and 6906cb0.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.

Applied to files:

  • airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (4)
airbyte_cdk/sources/declarative/retrievers/retriever.py (1)
  • Retriever (14-58)
airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
  • SchemaLoader (11-17)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • InferredSchemaLoader (2756-2774)
airbyte_cdk/utils/schema_inferrer.py (2)
  • SchemaInferrer (81-270)
  • get_stream_schema (260-270)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (3)

41-62: Well-structured class definition!

The dataclass pattern is appropriate here, and the use of InitVar for parameters is the right approach. The default record_sample_size of 100 provides a good balance between schema accuracy and performance. The comprehensive docstring makes the purpose and usage crystal clear.


64-71: Solid validation logic!

The __post_init__ method correctly ensures that stream_name is always populated, either from the direct attribute or from the parameters. The ValueError provides a clear message if neither source is available, preventing subtle downstream issues. This addresses the concerns raised in earlier reviews.


73-112: Code pattern matches established conventions — no changes needed.

The verification confirms both concerns are non-issues:

  1. Empty schema is standard: The pattern read_records(records_schema={}, stream_slice=...) is consistently used across schema discovery components (properties_from_endpoint.py, dynamic_schema_loader.py, http_components_resolver.py), confirming that passing an empty schema is the intended approach when inferring schemas.

  2. stream_slices() usage is active: The method is still widely used throughout the codebase (including http_components_resolver.py), so the deprecation status—if any—appears informal and not yet requiring migration.

Your implementation follows the established patterns perfectly.

@aaronsteers
Copy link
Contributor Author

aaronsteers commented Nov 9, 2025

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/19207237224

@github-actions
Copy link

github-actions bot commented Nov 9, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1762562686-inferred-schema-loader#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1762562686-inferred-schema-loader

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

📝 Edit this welcome message.

devin-ai-integration bot added a commit to airbytehq/airbyte that referenced this pull request Nov 9, 2025
…7224

This commit updates the base Docker image to use the prerelease CDK version
that includes the new InferredSchemaLoader feature. This is the first commit
in a two-commit proof-of-concept test.

Related: airbytehq/airbyte-python-cdk#831
Co-Authored-By: AJ Steers <aj@airbyte.io>
devin-ai-integration bot added a commit to airbytehq/airbyte that referenced this pull request Nov 9, 2025
This commit replaces the InlineSchemaLoader with the new InferredSchemaLoader
on the pokemon stream. The InferredSchemaLoader will read 1 sample record at
discover time and infer the schema using genson.

This demonstrates the end-to-end functionality of the InferredSchemaLoader
feature from CDK PR airbytehq/airbyte-python-cdk#831.

Expected behavior:
- Discover should return a non-empty schema inferred from the sample record
- Schema may include nullable unions (e.g., ["string", "null"]) from genson
- Read operations should continue to work with the inferred schema

Related: airbytehq/airbyte-python-cdk#831
Co-Authored-By: AJ Steers <aj@airbyte.io>
schema_inferrer = SchemaInferrer()

record_count = 0
for stream_slice in self.retriever.stream_slices():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean that during a read, we will read some records twice?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! The InferredSchemaLoader reads records during discover time (when get_json_schema() is called), not during the read operation. So the flow is:

  1. Discover phase: InferredSchemaLoader reads up to record_sample_size records to infer the schema
  2. Read phase: The stream reads records normally (potentially including the same records from discover)

So yes, some records may be read twice - once during discover and once during read. This is intentional and consistent with how schema inference works. The alternative would be to cache records from discover and replay them during read, but that would add complexity and memory overhead.

This behavior is similar to how test_read works in the CDK - it also reads sample records separately from the main read operation.

Is this acceptable, or would you prefer a different approach (e.g., caching records from discover)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the schema loader called during read here which would mean that the HTTP requests are done once for self._schema_loader.get_json_schema() and another time for self._retriever.read_records(...)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent catch! You're absolutely right - get_json_schema() is called during read operations (in DeclarativePartition.read() line 92), not just during discover. This means InferredSchemaLoader would read up to record_sample_size records for every partition/slice during a sync, which is extremely inefficient.

I checked other schema loaders:

  • InlineSchemaLoader: Returns static schema (no issue)
  • DynamicSchemaLoader: Also reads records on every call to get_json_schema() without caching, but only reads 1 record so the impact is minimal

Proposed Solution:
Add internal memoization to InferredSchemaLoader with thread-safe caching:

  • Cache the inferred schema after first call
  • Use threading.Lock to prevent duplicate work under concurrent partition reads
  • Ensure the schema loader uses its own retriever instance (not shared with the stream) to avoid state side-effects

This will ensure schema inference happens only once per sync job, not once per partition.

I'll implement this fix now. Does this approach sound good?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented the caching solution! 🎉

Changes made:

  • Added threading.Lock and _cached_schema instance variables in __post_init__
  • Implemented double-checked locking pattern in get_json_schema() to prevent duplicate work under concurrent calls
  • Added unit test test_inferred_schema_loader_caches_schema() to verify the retriever is only called once even when get_json_schema() is called multiple times

Result:
Schema inference now happens only once per sync job (on first call to get_json_schema()), not once per partition. This prevents the performance issue where InferredSchemaLoader would read up to record_sample_size records for every partition/slice during a sync.

The implementation follows standard thread-safe memoization patterns and ensures the cached schema is returned on all subsequent calls.

- Add internal memoization with threading.Lock to prevent duplicate schema inference
- Cache schema after first call to avoid re-reading records on subsequent calls
- This addresses the issue where get_json_schema() is called during read operations
  (in DeclarativePartition.read()), not just during discover
- Add unit test to verify caching behavior (schema inference happens only once)

Fixes performance issue identified by @maxi297 where InferredSchemaLoader would
read up to record_sample_size records for every partition/slice during a sync.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1)

18-40: Consider simplifying the tuple handling logic, wdyt?

The function correctly converts custom Mapping and Sequence types to builtins. However, there's a small redundancy: line 34 explicitly handles (list, tuple), while line 36 would also catch tuples via the Sequence check. Since tuples match the first condition, this works fine—but you could simplify by removing tuple from line 34 and letting line 36 handle it uniformly. This would make the logic clearer. What do you think?

-    elif isinstance(value, (list, tuple)):
+    elif isinstance(value, list):
         return [_to_builtin_types(item) for item in value]
unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (2)

27-38: Minor: Redundant stream_name parameter, wdyt?

The stream_name is provided both in the parameters dict (line 31) and as a constructor argument (line 37). Since the constructor argument takes precedence in __post_init__, the value in parameters is unused here. While this doesn't cause issues, you could simplify by removing one of them for clarity. What do you think?

 @pytest.fixture
 def inferred_schema_loader(mock_retriever):
     """Create an InferredSchemaLoader with a mock retriever."""
     config = MagicMock()
     parameters = {"name": "users"}
     return InferredSchemaLoader(
         retriever=mock_retriever,
         config=config,
         parameters=parameters,
         record_sample_size=3,
-        stream_name="users",
     )

81-103: Test name suggests verification of sample size limit—should we add that check?

The test is named test_inferred_schema_loader_respects_sample_size and sets record_sample_size=5 with 10 available records. However, it only verifies that the schema contains the expected fields, not that the loader actually stopped at 5 records. Would it make sense to assert that only 5 records were processed, perhaps by checking retriever.read_records.call_count or by tracking how many times the mock iterator was consumed? This would make the test more accurately reflect its name, wdyt?

One approach:

 def test_inferred_schema_loader_respects_sample_size():
     """Test that InferredSchemaLoader respects the record_sample_size parameter."""
     retriever = MagicMock()
     records = [{"id": i, "name": f"User{i}"} for i in range(10)]
     retriever.stream_slices.return_value = iter([None])
-    retriever.read_records.return_value = iter(records)
+    
+    # Track how many records were consumed
+    consumed_count = 0
+    def record_generator():
+        nonlocal consumed_count
+        for record in records:
+            consumed_count += 1
+            yield record
+    
+    retriever.read_records.return_value = record_generator()
 
     config = MagicMock()
     parameters = {"name": "users"}
     loader = InferredSchemaLoader(
         retriever=retriever,
         config=config,
         parameters=parameters,
         record_sample_size=5,
         stream_name="users",
     )
 
     schema = loader.get_json_schema()
 
     assert "properties" in schema
     assert "id" in schema["properties"]
     assert "name" in schema["properties"]
+    assert consumed_count == 5, f"Expected 5 records to be consumed, but {consumed_count} were consumed"
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6906cb0 and 66db70e.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (1 hunks)
  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (1 hunks)
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
📚 Learning: 2024-11-15T00:59:08.154Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.

Applied to files:

  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.

Applied to files:

  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.

Applied to files:

  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
  • airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • unit_tests/sources/declarative/schema/test_inferred_schema_loader.py
🧬 Code graph analysis (2)
unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (2)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (2)
  • InferredSchemaLoader (43-123)
  • get_json_schema (76-123)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • InferredSchemaLoader (2756-2774)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (4)
airbyte_cdk/sources/declarative/retrievers/retriever.py (1)
  • Retriever (14-58)
airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
  • SchemaLoader (11-17)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • InferredSchemaLoader (2756-2774)
airbyte_cdk/utils/schema_inferrer.py (2)
  • SchemaInferrer (81-270)
  • get_stream_schema (260-270)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py

[error] 95-109: ruff format check failed. 1 file would be reformatted. Run 'poetry run ruff format' to auto-format the file.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.13, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Manifest Server Docker Image Build
  • GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py (2)

1-3: LGTM! Copyright year is correct.

The copyright year of 2025 aligns with the PR creation date. Nice work addressing the earlier feedback!


65-75: Nice caching implementation!

The thread-safe caching with _lock and _cached_schema is a great addition that addresses the concern from past reviews about re-reading records on every partition. The double-check locking pattern looks solid. The stream_name validation also prevents downstream issues. Well done!

unit_tests/sources/declarative/schema/test_inferred_schema_loader.py (2)

189-216: Excellent caching test coverage!

This test perfectly verifies that the schema is cached and stream_slices/read_records are called only once across multiple get_json_schema() invocations. This directly addresses the performance concern from past reviews about re-reading records on every partition. Great work!


12-187: Comprehensive test coverage overall!

The test suite covers the essential scenarios: basic inference, empty records, error propagation, nested objects, and arrays. Combined with the caching test, this provides solid confidence in the implementation. Nice job!

Comment on lines +87 to +123
if self._cached_schema is not None:
return self._cached_schema

with self._lock:
if self._cached_schema is not None:
return self._cached_schema

schema_inferrer = SchemaInferrer()

record_count = 0
for stream_slice in self.retriever.stream_slices():
for record in self.retriever.read_records(records_schema={}, stream_slice=stream_slice):
if record_count >= self.record_sample_size:
break

# Convert all Mapping-like and Sequence-like objects to plain Python types
# This is necessary because genson doesn't handle custom implementations properly
record = _to_builtin_types(record)

airbyte_record = AirbyteRecordMessage(
stream=self.stream_name,
data=record, # type: ignore[arg-type]
emitted_at=0,
)

schema_inferrer.accumulate(airbyte_record)
record_count += 1

if record_count >= self.record_sample_size:
break

inferred_schema: Mapping[str, Any] | None = schema_inferrer.get_stream_schema(
self.stream_name
)

self._cached_schema = inferred_schema if inferred_schema else {}
return self._cached_schema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix formatting to resolve pipeline failure.

The pipeline reports that ruff format requires changes here. The line at 98 is likely too long and needs wrapping, wdyt?

Apply formatting by running:

poetry run ruff format airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py

Or manually wrap the long line:

             for stream_slice in self.retriever.stream_slices():
-                for record in self.retriever.read_records(records_schema={}, stream_slice=stream_slice):
+                for record in self.retriever.read_records(
+                    records_schema={}, stream_slice=stream_slice
+                ):
                     if record_count >= self.record_sample_size:
                         break
🧰 Tools
🪛 GitHub Actions: Linters

[error] 95-109: ruff format check failed. 1 file would be reformatted. Run 'poetry run ruff format' to auto-format the file.

🤖 Prompt for AI Agents
In airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py around lines
87 to 123, the file fails ruff formatting (a line around line 98 is too long);
run the formatter or wrap the long line to satisfy ruff. Fix by running: `poetry
run ruff format
airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py` (or manually
break the long line into multiple shorter lines keeping the same logic and
indentation), then re-run ruff/CI to confirm the formatting issue is resolved.

Comment on lines +1 to +3
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Update copyright year to 2025, wdyt?

The PR was created in 2025-11-08, so the copyright year should be 2025 rather than 2024 for consistency.

-# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
🤖 Prompt for AI Agents
In unit_tests/sources/declarative/schema/test_inferred_schema_loader.py around
lines 1 to 3, update the file header copyright year from 2024 to 2025; edit the
top comment block so it reads "Copyright (c) 2025 Airbyte, Inc., all rights
reserved." ensuring punctuation and capitalization match the existing header
style.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants