From 5f1d7430f3e439bd12d4d574f14e897c96d946c5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 13 Oct 2025 21:30:50 +0000 Subject: [PATCH 1/2] fix: Add circular reference detection to schema_helpers._expand_refs() Fixes a ValueError: Circular reference detected error that occurs when serpyco-rs tries to serialize connector specs with circular patterns. The _expand_refs() function now tracks visited references during recursive expansion to prevent infinite loops when schemas contain circular references. This resolves issue airbytehq/oncall#9688 for the Facebook Marketing connector and any other connectors with similar circular reference patterns in their specs. Changes: - Add visited_refs parameter to _expand_refs() to track reference URLs - Skip expanding already-visited references to prevent infinite recursion - Remove references from visited set after expansion to allow reuse in different branches - Skip 'definitions' key during iteration to avoid re-expanding resolved definitions - Add unit tests for circular reference handling and simple reference expansion Co-Authored-By: unknown <> --- airbyte_cdk/manifest_migrations/README.md | 2 +- airbyte_cdk/manifest_server/README.md | 1 - .../declarative_component_schema.yaml | 4 +- airbyte_cdk/sources/utils/schema_helpers.py | 22 +++++--- cdk-migrations.md | 2 +- .../sources/utils/test_schema_helpers.py | 51 +++++++++++++++++++ 6 files changed, 71 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/manifest_migrations/README.md b/airbyte_cdk/manifest_migrations/README.md index ef85fc8be..6216279b6 100644 --- a/airbyte_cdk/manifest_migrations/README.md +++ b/airbyte_cdk/manifest_migrations/README.md @@ -21,7 +21,7 @@ This directory contains the logic and registry for manifest migrations in the Ai 3. **Register the Migration:** - Open `migrations/registry.yaml`. - Add an entry under the appropriate version, or create a new version section if needed. - - Version can be: "*", "==6.48.3", "~=1.2", ">=1.0.0,<2.0.0", "6.48.3" + - Version can be: "\*", "==6.48.3", "~=1.2", ">=1.0.0,<2.0.0", "6.48.3" - Each migration entry should include: - `name`: The filename (without `.py`) - `order`: The order in which this migration should be applied for the version diff --git a/airbyte_cdk/manifest_server/README.md b/airbyte_cdk/manifest_server/README.md index ee40ee55d..42724b708 100644 --- a/airbyte_cdk/manifest_server/README.md +++ b/airbyte_cdk/manifest_server/README.md @@ -175,4 +175,3 @@ This requires the `ddtrace` dependency, which is included in the `manifest-serve # Run with Datadog tracing enabled DD_ENABLED=true manifest-server start ``` - diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index a77bc34ea..01e9da113 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3746,7 +3746,7 @@ definitions: properties: type: type: string - enum: [ PaginationReset ] + enum: [PaginationReset] action: type: string enum: @@ -3763,7 +3763,7 @@ definitions: properties: type: type: string - enum: [ PaginationResetLimits ] + enum: [PaginationResetLimits] number_of_records: type: integer GzipDecoder: diff --git a/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte_cdk/sources/utils/schema_helpers.py index ab7fbbf45..551206e18 100644 --- a/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte_cdk/sources/utils/schema_helpers.py @@ -82,25 +82,35 @@ def get_ref_resolver_registry(schema: dict[str, Any]) -> Registry: ) -def _expand_refs(schema: Any, ref_resolver: Resolver) -> None: +def _expand_refs(schema: Any, ref_resolver: Resolver, visited_refs: set[str] | None = None) -> None: """Internal function to iterate over schema and replace all occurrences of $ref with their definitions. Recursive. :param schema: schema that will be patched + :param ref_resolver: resolver to look up references + :param visited_refs: set of already visited reference URLs to detect circular references """ + if visited_refs is None: + visited_refs = set() + if isinstance(schema, MutableMapping): if "$ref" in schema: ref_url = schema.pop("$ref") + + if ref_url in visited_refs: + return + + visited_refs.add(ref_url) definition = ref_resolver.lookup(ref_url).contents - _expand_refs( - definition, ref_resolver=ref_resolver - ) # expand refs in definitions as well + _expand_refs(definition, ref_resolver=ref_resolver, visited_refs=visited_refs) schema.update(definition) + visited_refs.remove(ref_url) else: for key, value in schema.items(): - _expand_refs(value, ref_resolver=ref_resolver) + if key != "definitions": + _expand_refs(value, ref_resolver=ref_resolver, visited_refs=visited_refs) elif isinstance(schema, List): for value in schema: - _expand_refs(value, ref_resolver=ref_resolver) + _expand_refs(value, ref_resolver=ref_resolver, visited_refs=visited_refs) def expand_refs(schema: Any) -> None: diff --git a/cdk-migrations.md b/cdk-migrations.md index 3445a04b8..b27f597db 100644 --- a/cdk-migrations.md +++ b/cdk-migrations.md @@ -14,7 +14,7 @@ Rationale: Our current interface for CustomIncrementalSync was assuming that the Migration steps: Ensures that you don't implement `Retriever.state` or relying on the field `SimpleRetriever.cursor`. For more information, see the point above. -Rationale: As mentioned above, the state has been moved outside the realm of the stream responsibilities. Therefore, it does not make sense for the retriever (which is a stream specific concept) to hold state information. This way, a connector developer wanting to implement a CustomRetriever will not have to bother about state management anymore. +Rationale: As mentioned above, the state has been moved outside the realm of the stream responsibilities. Therefore, it does not make sense for the retriever (which is a stream specific concept) to hold state information. This way, a connector developer wanting to implement a CustomRetriever will not have to bother about state management anymore. ### Inheriting from Substream Partition Routing diff --git a/unit_tests/sources/utils/test_schema_helpers.py b/unit_tests/sources/utils/test_schema_helpers.py index 3ccb13a90..168d1ec11 100644 --- a/unit_tests/sources/utils/test_schema_helpers.py +++ b/unit_tests/sources/utils/test_schema_helpers.py @@ -22,6 +22,7 @@ InternalConfig, ResourceSchemaLoader, check_config_against_spec_or_exit, + expand_refs, ) from airbyte_cdk.utils.traced_exception import AirbyteTracedException @@ -209,3 +210,53 @@ def test_shared_schemas_resolves_nested(): def test_internal_config(limit, record_count, expected): config = InternalConfig(_limit=limit) assert config.is_limit_reached(record_count) == expected + + +def test_expand_refs_handles_circular_references(): + schema = { + "definitions": { + "A": { + "type": "object", + "properties": { + "b": {"$ref": "#/definitions/B"} + } + }, + "B": { + "type": "object", + "properties": { + "a": {"$ref": "#/definitions/A"} + } + } + }, + "type": "object", + "properties": { + "root": {"$ref": "#/definitions/A"} + } + } + + expand_refs(schema) + + assert "root" in schema["properties"] + assert schema["properties"]["root"]["type"] == "object" + assert "definitions" not in schema + + +def test_expand_refs_expands_simple_refs(): + schema = { + "definitions": { + "StringType": { + "type": "string", + "minLength": 1 + } + }, + "type": "object", + "properties": { + "name": {"$ref": "#/definitions/StringType"} + } + } + + expand_refs(schema) + + assert schema["properties"]["name"]["type"] == "string" + assert schema["properties"]["name"]["minLength"] == 1 + assert "definitions" not in schema From 404800047cf1e7f8a9990a75c3de5804c68f99e5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 13 Oct 2025 21:57:10 +0000 Subject: [PATCH 2/2] style: Apply Ruff formatting to test_schema_helpers.py Co-Authored-By: unknown <> --- .../sources/utils/test_schema_helpers.py | 37 +++++-------------- 1 file changed, 9 insertions(+), 28 deletions(-) diff --git a/unit_tests/sources/utils/test_schema_helpers.py b/unit_tests/sources/utils/test_schema_helpers.py index 168d1ec11..0bb186e49 100644 --- a/unit_tests/sources/utils/test_schema_helpers.py +++ b/unit_tests/sources/utils/test_schema_helpers.py @@ -215,27 +215,15 @@ def test_internal_config(limit, record_count, expected): def test_expand_refs_handles_circular_references(): schema = { "definitions": { - "A": { - "type": "object", - "properties": { - "b": {"$ref": "#/definitions/B"} - } - }, - "B": { - "type": "object", - "properties": { - "a": {"$ref": "#/definitions/A"} - } - } + "A": {"type": "object", "properties": {"b": {"$ref": "#/definitions/B"}}}, + "B": {"type": "object", "properties": {"a": {"$ref": "#/definitions/A"}}}, }, "type": "object", - "properties": { - "root": {"$ref": "#/definitions/A"} - } + "properties": {"root": {"$ref": "#/definitions/A"}}, } - + expand_refs(schema) - + assert "root" in schema["properties"] assert schema["properties"]["root"]["type"] == "object" assert "definitions" not in schema @@ -243,20 +231,13 @@ def test_expand_refs_handles_circular_references(): def test_expand_refs_expands_simple_refs(): schema = { - "definitions": { - "StringType": { - "type": "string", - "minLength": 1 - } - }, + "definitions": {"StringType": {"type": "string", "minLength": 1}}, "type": "object", - "properties": { - "name": {"$ref": "#/definitions/StringType"} - } + "properties": {"name": {"$ref": "#/definitions/StringType"}}, } - + expand_refs(schema) - + assert schema["properties"]["name"]["type"] == "string" assert schema["properties"]["name"]["minLength"] == 1 assert "definitions" not in schema