Skip to content

Commit 0b7d976

Browse files
fix(low-code CDK): fix checkpointing for declarative streams (#177)
Co-authored-by: darynaishchenko <darina.ishchenko17@gmail.com>
1 parent 4f8e9d8 commit 0b7d976

File tree

3 files changed

+224
-6
lines changed

3 files changed

+224
-6
lines changed

airbyte_cdk/sources/streams/core.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -223,17 +223,17 @@ def read( # type: ignore # ignoring typing for ConnectorStateManager because o
223223
record_counter += 1
224224

225225
checkpoint_interval = self.state_checkpoint_interval
226-
checkpoint = checkpoint_reader.get_checkpoint()
227226
if (
228227
should_checkpoint
229228
and checkpoint_interval
230229
and record_counter % checkpoint_interval == 0
231-
and checkpoint is not None
232230
):
233-
airbyte_state_message = self._checkpoint_state(
234-
checkpoint, state_manager=state_manager
235-
)
236-
yield airbyte_state_message
231+
checkpoint = checkpoint_reader.get_checkpoint()
232+
if checkpoint:
233+
airbyte_state_message = self._checkpoint_state(
234+
checkpoint, state_manager=state_manager
235+
)
236+
yield airbyte_state_message
237237

238238
if internal_config.is_limit_reached(record_counter):
239239
break

unit_tests/sources/declarative/test_manifest_declarative_source.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,3 +1998,145 @@ def validate_refs(yaml_file: str) -> List[str]:
19981998
/ "airbyte_cdk/sources/declarative/declarative_component_schema.yaml"
19991999
)
20002000
assert not validate_refs(yaml_file_path)
2001+
2002+
2003+
@pytest.mark.parametrize(
2004+
"test_name, manifest, pages, expected_states_qty",
2005+
[
2006+
(
2007+
"test_with_pagination_and_partition_router",
2008+
{
2009+
"version": "0.34.2",
2010+
"type": "DeclarativeSource",
2011+
"check": {"type": "CheckStream", "stream_names": ["Rates"]},
2012+
"streams": [
2013+
{
2014+
"type": "DeclarativeStream",
2015+
"name": "Rates",
2016+
"primary_key": [],
2017+
"schema_loader": {
2018+
"type": "InlineSchemaLoader",
2019+
"schema": {
2020+
"$schema": "http://json-schema.org/schema#",
2021+
"properties": {
2022+
"ABC": {"type": "number"},
2023+
"AED": {"type": "number"},
2024+
"partition": {"type": "number"},
2025+
},
2026+
"type": "object",
2027+
},
2028+
},
2029+
"retriever": {
2030+
"type": "SimpleRetriever",
2031+
"requester": {
2032+
"type": "HttpRequester",
2033+
"url_base": "https://api.apilayer.com",
2034+
"path": "/exchangerates_data/latest",
2035+
"http_method": "GET",
2036+
"request_parameters": {},
2037+
"request_headers": {},
2038+
"request_body_json": {},
2039+
"authenticator": {
2040+
"type": "ApiKeyAuthenticator",
2041+
"header": "apikey",
2042+
"api_token": "{{ config['api_key'] }}",
2043+
},
2044+
},
2045+
"partition_router": {
2046+
"type": "ListPartitionRouter",
2047+
"values": ["0", "1"],
2048+
"cursor_field": "partition",
2049+
},
2050+
"record_selector": {
2051+
"type": "RecordSelector",
2052+
"extractor": {"type": "DpathExtractor", "field_path": ["rates"]},
2053+
},
2054+
"paginator": {
2055+
"type": "DefaultPaginator",
2056+
"page_size": 2,
2057+
"page_size_option": {
2058+
"inject_into": "request_parameter",
2059+
"field_name": "page_size",
2060+
},
2061+
"page_token_option": {"inject_into": "path", "type": "RequestPath"},
2062+
"pagination_strategy": {
2063+
"type": "CursorPagination",
2064+
"cursor_value": "{{ response._metadata.next }}",
2065+
"page_size": 2,
2066+
},
2067+
},
2068+
},
2069+
"incremental_sync": {
2070+
"type": "DatetimeBasedCursor",
2071+
"cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%S.%fZ"],
2072+
"datetime_format": "%Y-%m-%dT%H:%M:%S.%fZ",
2073+
"cursor_field": "updated_at",
2074+
"start_datetime": {
2075+
"datetime": "{{ config.get('start_date', '2020-10-16T00:00:00.000Z') }}"
2076+
},
2077+
},
2078+
}
2079+
],
2080+
"spec": {
2081+
"connection_specification": {
2082+
"$schema": "http://json-schema.org/draft-07/schema#",
2083+
"type": "object",
2084+
"required": ["api_key"],
2085+
"properties": {
2086+
"api_key": {
2087+
"type": "string",
2088+
"title": "API Key",
2089+
"airbyte_secret": True,
2090+
},
2091+
"start_date": {
2092+
"title": "Start Date",
2093+
"description": "UTC date and time in the format YYYY-MM-DDTHH:MM:SS.000Z. During incremental sync, any data generated before this date will not be replicated. If left blank, the start date will be set to 2 years before the present date.",
2094+
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
2095+
"pattern_descriptor": "YYYY-MM-DDTHH:MM:SS.000Z",
2096+
"examples": ["2020-11-16T00:00:00.000Z"],
2097+
"type": "string",
2098+
"format": "date-time",
2099+
},
2100+
},
2101+
"additionalProperties": True,
2102+
},
2103+
"documentation_url": "https://example.org",
2104+
"type": "Spec",
2105+
},
2106+
},
2107+
(
2108+
_create_page(
2109+
{
2110+
"rates": [
2111+
{"ABC": 0, "partition": 0, "updated_at": "2020-11-16T00:00:00.000Z"},
2112+
{"AED": 1, "partition": 0, "updated_at": "2020-11-16T00:00:00.000Z"},
2113+
],
2114+
"_metadata": {"next": "next"},
2115+
}
2116+
),
2117+
_create_page(
2118+
{
2119+
"rates": [
2120+
{"USD": 3, "partition": 0, "updated_at": "2020-11-16T00:00:00.000Z"}
2121+
],
2122+
"_metadata": {},
2123+
}
2124+
),
2125+
_create_page(
2126+
{
2127+
"rates": [
2128+
{"ABC": 2, "partition": 1, "updated_at": "2020-11-16T00:00:00.000Z"}
2129+
],
2130+
"_metadata": {},
2131+
}
2132+
),
2133+
),
2134+
2,
2135+
),
2136+
],
2137+
)
2138+
def test_slice_checkpoint(test_name, manifest, pages, expected_states_qty):
2139+
_stream_name = "Rates"
2140+
with patch.object(SimpleRetriever, "_fetch_next_page", side_effect=pages):
2141+
states = [message.state for message in _run_read(manifest, _stream_name) if message.state]
2142+
assert len(states) == expected_states_qty

unit_tests/sources/streams/test_stream_read.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,82 @@ def test_incremental_read_two_slices():
472472
assert len(actual_records) == len(expected_records)
473473

474474

475+
@pytest.mark.parametrize(
476+
"timestamp, records",
477+
[
478+
pytest.param(
479+
"1708899428",
480+
[
481+
{"id": 2, "created_at": "1708899000"},
482+
{"id": 3, "created_at": "1708899001"},
483+
{"id": 4, "created_at": "1708899428"},
484+
],
485+
id="emits correct state when records are sorted by cursor",
486+
),
487+
pytest.param(
488+
"1708899428",
489+
[
490+
{"id": 1, "created_at": "1708899428"},
491+
{"id": 2, "created_at": "1708899000"},
492+
{"id": 3, "created_at": "1708899001"},
493+
{"id": 4, "created_at": "1708899002"},
494+
],
495+
id="emits correct state when records are not sorted by cursor",
496+
),
497+
pytest.param(
498+
"1708899428",
499+
[
500+
{"id": 1, "created_at": "1708899428"},
501+
{"id": 2, "created_at": "1708899000"},
502+
{"id": 3, "created_at": "1708899428"},
503+
{"id": 4, "created_at": "1708899002"},
504+
],
505+
id="not emit duplicated state",
506+
),
507+
],
508+
)
509+
def test_incremental_read_emit_state(timestamp, records):
510+
# This test verifies that a stream running in incremental mode emits state messages correctly
511+
configured_stream = ConfiguredAirbyteStream(
512+
stream=AirbyteStream(
513+
name="mock_stream",
514+
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
515+
json_schema={},
516+
),
517+
sync_mode=SyncMode.incremental,
518+
cursor_field=["created_at"],
519+
destination_sync_mode=DestinationSyncMode.overwrite,
520+
)
521+
internal_config = InternalConfig()
522+
logger = _mock_logger()
523+
slice_logger = DebugSliceLogger()
524+
message_repository = InMemoryMessageRepository(Level.INFO)
525+
state_manager = ConnectorStateManager()
526+
slice_to_partition = {1: records}
527+
stream = _incremental_stream(
528+
slice_to_partition, slice_logger, logger, message_repository, timestamp
529+
)
530+
531+
expected_records = [
532+
*records,
533+
_create_state_message("__mock_incremental_stream", {"created_at": timestamp}),
534+
]
535+
536+
actual_records = _read(
537+
stream,
538+
configured_stream,
539+
logger,
540+
slice_logger,
541+
message_repository,
542+
state_manager,
543+
internal_config,
544+
)
545+
546+
for record in expected_records:
547+
assert record in actual_records
548+
assert len(actual_records) == len(expected_records)
549+
550+
475551
def test_concurrent_incremental_read_two_slices():
476552
# This test verifies that an incremental concurrent stream manages state correctly for multiple slices syncing concurrently
477553
configured_stream = ConfiguredAirbyteStream(

0 commit comments

Comments
 (0)