Skip to content

Commit 95ff3b8

Browse files
authored
feat(query_properties): Add configured catalog to SimpleRetriever and only fetch properties that are included in the stream schema (#788)
1 parent 20ae208 commit 95ff3b8

File tree

17 files changed

+756
-19
lines changed

17 files changed

+756
-19
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ def __init__(
169169
component_factory = ModelToComponentFactory(
170170
emit_connector_builder_messages=emit_connector_builder_messages,
171171
message_repository=ConcurrentMessageRepository(queue, message_repository),
172+
configured_catalog=catalog,
172173
connector_state_manager=self._connector_state_manager,
173174
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
174175
limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2520,6 +2520,34 @@ definitions:
25202520
type:
25212521
type: string
25222522
enum: [JsonlDecoder]
2523+
JsonSchemaPropertySelector:
2524+
title: Json Schema Property Selector
2525+
description: When configured, the JSON schema supplied in the catalog containing which columns are selected for the current stream will be used to reduce which query properties will be included in the outbound API request. This can improve the performance of API requests, especially for those requiring multiple requests to get a complete record.
2526+
type: object
2527+
required:
2528+
- type
2529+
properties:
2530+
type:
2531+
type: string
2532+
enum: [JsonSchemaPropertySelector]
2533+
transformations:
2534+
title: Transformations
2535+
description: A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.
2536+
linkable: true
2537+
type: array
2538+
items:
2539+
anyOf:
2540+
- "$ref": "#/definitions/AddFields"
2541+
- "$ref": "#/definitions/RemoveFields"
2542+
- "$ref": "#/definitions/KeysToLower"
2543+
- "$ref": "#/definitions/KeysToSnakeCase"
2544+
- "$ref": "#/definitions/FlattenFields"
2545+
- "$ref": "#/definitions/DpathFlattenFields"
2546+
- "$ref": "#/definitions/KeysReplace"
2547+
- "$ref": "#/definitions/CustomTransformation"
2548+
$parameters:
2549+
type: object
2550+
additionalProperties: true
25232551
KeysToLower:
25242552
title: Keys to Lower Case
25252553
description: A transformation that renames all keys to lower case.
@@ -3410,6 +3438,10 @@ definitions:
34103438
title: Property Chunking
34113439
description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.
34123440
"$ref": "#/definitions/PropertyChunking"
3441+
property_selector:
3442+
title: Property Selector
3443+
description: Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.
3444+
"$ref": "#/definitions/JsonSchemaPropertySelector"
34133445
$parameters:
34143446
type: object
34153447
additionalProperties: true
@@ -3746,7 +3778,7 @@ definitions:
37463778
properties:
37473779
type:
37483780
type: string
3749-
enum: [ PaginationReset ]
3781+
enum: [PaginationReset]
37503782
action:
37513783
type: string
37523784
enum:
@@ -3763,7 +3795,7 @@ definitions:
37633795
properties:
37643796
type:
37653797
type: string
3766-
enum: [ PaginationResetLimits ]
3798+
enum: [PaginationResetLimits]
37673799
number_of_records:
37683800
type: integer
37693801
GzipDecoder:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -2029,6 +2031,29 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel):
20292031
)
20302032

20312033

2034+
class JsonSchemaPropertySelector(BaseModel):
2035+
type: Literal["JsonSchemaPropertySelector"]
2036+
transformations: Optional[
2037+
List[
2038+
Union[
2039+
AddFields,
2040+
RemoveFields,
2041+
KeysToLower,
2042+
KeysToSnakeCase,
2043+
FlattenFields,
2044+
DpathFlattenFields,
2045+
KeysReplace,
2046+
CustomTransformation,
2047+
]
2048+
]
2049+
] = Field(
2050+
None,
2051+
description="A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.",
2052+
title="Transformations",
2053+
)
2054+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2055+
2056+
20322057
class ListPartitionRouter(BaseModel):
20332058
type: Literal["ListPartitionRouter"]
20342059
cursor_field: str = Field(
@@ -2799,6 +2824,11 @@ class QueryProperties(BaseModel):
27992824
description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.",
28002825
title="Property Chunking",
28012826
)
2827+
property_selector: Optional[JsonSchemaPropertySelector] = Field(
2828+
None,
2829+
description="Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.",
2830+
title="Property Selector",
2831+
)
28022832
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
28032833

28042834

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
get_type_hints,
2727
)
2828

29+
from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream
2930
from isodate import parse_duration
3031
from pydantic.v1 import BaseModel
3132
from requests import Response
@@ -42,6 +43,7 @@
4243
AirbyteStateMessage,
4344
AirbyteStateType,
4445
AirbyteStreamState,
46+
ConfiguredAirbyteCatalog,
4547
FailureType,
4648
Level,
4749
StreamDescriptor,
@@ -314,6 +316,9 @@
314316
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
315317
JsonlDecoder as JsonlDecoderModel,
316318
)
319+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
320+
JsonSchemaPropertySelector as JsonSchemaPropertySelectorModel,
321+
)
317322
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
318323
JwtAuthenticator as JwtAuthenticatorModel,
319324
)
@@ -501,6 +506,9 @@
501506
from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import (
502507
PropertyLimitType,
503508
)
509+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import (
510+
JsonSchemaPropertySelector,
511+
)
504512
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import (
505513
GroupByKey,
506514
)
@@ -668,6 +676,7 @@ def __init__(
668676
message_repository: Optional[MessageRepository] = None,
669677
connector_state_manager: Optional[ConnectorStateManager] = None,
670678
max_concurrent_async_job_count: Optional[int] = None,
679+
configured_catalog: Optional[ConfiguredAirbyteCatalog] = None,
671680
):
672681
self._init_mappings()
673682
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -678,6 +687,9 @@ def __init__(
678687
self._message_repository = message_repository or InMemoryMessageRepository(
679688
self._evaluate_log_level(emit_connector_builder_messages)
680689
)
690+
self._stream_name_to_configured_stream = self._create_stream_name_to_configured_stream(
691+
configured_catalog
692+
)
681693
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
682694
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
683695
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
@@ -734,6 +746,7 @@ def _init_mappings(self) -> None:
734746
InlineSchemaLoaderModel: self.create_inline_schema_loader,
735747
JsonDecoderModel: self.create_json_decoder,
736748
JsonlDecoderModel: self.create_jsonl_decoder,
749+
JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
737750
GzipDecoderModel: self.create_gzip_decoder,
738751
KeysToLowerModel: self.create_keys_to_lower_transformation,
739752
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
@@ -796,6 +809,16 @@ def _init_mappings(self) -> None:
796809
# Needed for the case where we need to perform a second parse on the fields of a custom component
797810
self.TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in self.PYDANTIC_MODEL_TO_CONSTRUCTOR}
798811

812+
@staticmethod
813+
def _create_stream_name_to_configured_stream(
814+
configured_catalog: Optional[ConfiguredAirbyteCatalog],
815+
) -> Mapping[str, ConfiguredAirbyteStream]:
816+
return (
817+
{stream.stream.name: stream for stream in configured_catalog.streams}
818+
if configured_catalog
819+
else {}
820+
)
821+
799822
def create_component(
800823
self,
801824
model_type: Type[BaseModel],
@@ -2987,7 +3010,7 @@ def create_property_chunking(
29873010
)
29883011

29893012
def create_query_properties(
2990-
self, model: QueryPropertiesModel, config: Config, **kwargs: Any
3013+
self, model: QueryPropertiesModel, config: Config, *, stream_name: str, **kwargs: Any
29913014
) -> QueryProperties:
29923015
if isinstance(model.property_list, list):
29933016
property_list = model.property_list
@@ -3004,10 +3027,43 @@ def create_query_properties(
30043027
else None
30053028
)
30063029

3030+
property_selector = (
3031+
self._create_component_from_model(
3032+
model=model.property_selector, config=config, stream_name=stream_name, **kwargs
3033+
)
3034+
if model.property_selector
3035+
else None
3036+
)
3037+
30073038
return QueryProperties(
30083039
property_list=property_list,
30093040
always_include_properties=model.always_include_properties,
30103041
property_chunking=property_chunking,
3042+
property_selector=property_selector,
3043+
config=config,
3044+
parameters=model.parameters or {},
3045+
)
3046+
3047+
def create_json_schema_property_selector(
3048+
self,
3049+
model: JsonSchemaPropertySelectorModel,
3050+
config: Config,
3051+
*,
3052+
stream_name: str,
3053+
**kwargs: Any,
3054+
) -> JsonSchemaPropertySelector:
3055+
configured_stream = self._stream_name_to_configured_stream.get(stream_name)
3056+
3057+
transformations = []
3058+
if model.transformations:
3059+
for transformation_model in model.transformations:
3060+
transformations.append(
3061+
self._create_component_from_model(model=transformation_model, config=config)
3062+
)
3063+
3064+
return JsonSchemaPropertySelector(
3065+
configured_stream=configured_stream,
3066+
properties_transformations=transformations,
30113067
config=config,
30123068
parameters=model.parameters or {},
30133069
)
@@ -3235,7 +3291,7 @@ def _get_url(req: Requester) -> str:
32353291

32363292
if len(query_properties_definitions) == 1:
32373293
query_properties = self._create_component_from_model(
3238-
model=query_properties_definitions[0], config=config
3294+
model=query_properties_definitions[0], stream_name=name, config=config
32393295
)
32403296

32413297
# Removes QueryProperties components from the interpolated mappings because it has been designed
@@ -3261,11 +3317,13 @@ def _get_url(req: Requester) -> str:
32613317

32623318
query_properties = self.create_query_properties(
32633319
model=query_properties_definition,
3320+
stream_name=name,
32643321
config=config,
32653322
)
32663323
elif hasattr(model.requester, "query_properties") and model.requester.query_properties:
32673324
query_properties = self.create_query_properties(
32683325
model=model.requester.query_properties,
3326+
stream_name=name,
32693327
config=config,
32703328
)
32713329

airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from dataclasses import InitVar, dataclass
44
from enum import Enum
5-
from typing import Any, Iterable, List, Mapping, Optional
5+
from typing import Any, Iterable, List, Mapping, Optional, Set
66

77
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
88
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
@@ -40,7 +40,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4040
)
4141

4242
def get_request_property_chunks(
43-
self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
43+
self,
44+
property_fields: Iterable[str],
45+
always_include_properties: Optional[List[str]],
46+
configured_properties: Optional[Set[str]],
4447
) -> Iterable[List[str]]:
4548
if not self.property_limit:
4649
single_property_chunk = list(property_fields)
@@ -53,6 +56,8 @@ def get_request_property_chunks(
5356
for property_field in property_fields:
5457
# If property_limit_type is not defined, we default to property_count which is just an incrementing count
5558
# todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
59+
if configured_properties is not None and property_field not in configured_properties:
60+
continue
5661
property_field_size = (
5762
len(property_field)
5863
+ 3 # The +3 represents the extra characters for encoding the delimiter in between properties
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.json_schema_property_selector import (
4+
JsonSchemaPropertySelector,
5+
)
6+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import (
7+
PropertySelector,
8+
)
9+
10+
__all__ = ["JsonSchemaPropertySelector", "PropertySelector"]
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
import copy
3+
from dataclasses import InitVar, dataclass, field
4+
from typing import Any, List, Mapping, Optional, Set
5+
6+
from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream
7+
8+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import (
9+
PropertySelector,
10+
)
11+
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
12+
from airbyte_cdk.sources.types import Config
13+
14+
15+
@dataclass
16+
class JsonSchemaPropertySelector(PropertySelector):
17+
"""
18+
A class that contains a list of transformations to apply to properties.
19+
"""
20+
21+
config: Config
22+
parameters: InitVar[Mapping[str, Any]]
23+
# For other non-read operations, there is no configured catalog and therefore no schema selection
24+
configured_stream: Optional[ConfiguredAirbyteStream] = None
25+
properties_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
26+
27+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
28+
self._parameters = parameters
29+
30+
def select(self) -> Optional[Set[str]]:
31+
"""
32+
Returns the set of properties that have been selected for the configured stream. The intent being that
33+
we should only query for selected properties not all since disabled properties are discarded.
34+
35+
When configured_stream is None, then there was no incoming catalog and all fields should be retrieved.
36+
This is different from the empty set where the json_schema was empty and no schema fields were selected.
37+
"""
38+
39+
# For CHECK/DISCOVER operations, there is no catalog and therefore no configured stream or selected
40+
# columns. In this case we return None which is interpreted by the QueryProperties component to not
41+
# perform any filtering of schema properties and fetch all of them
42+
if self.configured_stream is None:
43+
return None
44+
45+
schema_properties = copy.deepcopy(
46+
self.configured_stream.stream.json_schema.get("properties", {})
47+
)
48+
if self.properties_transformations:
49+
for transformation in self.properties_transformations:
50+
transformation.transform(
51+
record=schema_properties,
52+
config=self.config,
53+
)
54+
return set(schema_properties.keys())
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
from abc import ABC, abstractmethod
4+
from dataclasses import dataclass
5+
from typing import Optional, Set
6+
7+
8+
@dataclass
9+
class PropertySelector(ABC):
10+
"""
11+
Describes the interface for selecting and transforming properties from a configured stream's schema
12+
to determine which properties should be queried from the API.
13+
"""
14+
15+
@abstractmethod
16+
def select(self) -> Optional[Set[str]]:
17+
"""
18+
Selects and returns the set of properties that should be queried from the API based on the
19+
configured stream's schema and any applicable transformations.
20+
21+
Returns:
22+
Set[str]: The set of property names to query
23+
"""
24+
pass

0 commit comments

Comments
 (0)