From 7dcbc04ba72ebdacf7f772b0f757b8f265b4711c Mon Sep 17 00:00:00 2001
From: "maxime.c"
Date: Wed, 1 Oct 2025 15:38:33 -0400
Subject: [PATCH] PoC pagination reset

---
 .../parsers/model_to_component_factory.py    |  5 +
 .../error_handlers/default_error_handler.py  | 12 ++-
 .../paginators/default_paginator.py          | 17 +++-
 .../requesters/paginators/no_pagination.py   |  3 +
 .../requesters/paginators/paginator.py       |  4 +
 .../retrievers/simple_retriever.py           | 99 +++++++++++--------
 .../sources/streams/concurrent/cursor.py     | 17 ++++
 .../http/error_handlers/response_models.py   |  1 +
 .../sources/streams/http/http_client.py      |  7 ++
 9 files changed, 117 insertions(+), 48 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 040ac4689..e548ff995 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -2057,6 +2057,7 @@ def create_default_stream(
             client_side_incremental_sync={"cursor": concurrent_cursor}
             if self._is_client_side_filtering_enabled(model)
             else None,
+            cursor=concurrent_cursor,
             transformations=transformations,
             file_uploader=file_uploader,
             incremental_sync=model.incremental_sync,
@@ -2279,6 +2280,7 @@ def create_default_paginator(
         config: Config,
         *,
         url_base: str,
+        cursor: Cursor,
         extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
         decoder: Optional[Decoder] = None,
         cursor_used_for_stop_condition: Optional[Cursor] = None,
@@ -2316,6 +2318,7 @@ def create_default_paginator(
             page_token_option=page_token_option,
             pagination_strategy=pagination_strategy,
             url_base=url_base,
+            cursor=cursor,
             config=config,
             parameters=model.parameters or {},
         )
@@ -3149,6 +3152,7 @@ def create_simple_retriever(
         config: Config,
         *,
         name: str,
+        cursor: Cursor,
         primary_key: Optional[Union[str, List[str], List[List[str]]]],
         request_options_provider: Optional[RequestOptionsProvider] = None,
         stop_condition_cursor: Optional[Cursor] = None,
@@ -3271,6 +3275,7 @@ def _get_url(req: Requester) -> str:
                 extractor_model=model.record_selector.extractor,
                 decoder=decoder,
                 cursor_used_for_stop_condition=stop_condition_cursor or None,
+                cursor=cursor,
             )
             if model.paginator
             else NoPagination(parameters={})
diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py
index b70ceaaeb..290f6e515 100644
--- a/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py
+++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py
@@ -13,7 +13,7 @@
 from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import (
     HttpResponseFilter,
 )
-from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler
+from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler, ResponseAction
 from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
     SUCCESS_RESOLUTION,
     ErrorResolution,
@@ -106,6 +106,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         if not self.response_filters:
             self.response_filters = [HttpResponseFilter(config=self.config, parameters={})]
 
+        self.response_filters = [
+            # FIXME: I have not wired RESET_PAGINATION into the declarative model yet; assume it is wired up and that this filter is configured in the manifest.yaml...
+            HttpResponseFilter(
+                action=ResponseAction.RESET_PAGINATION,
+                http_codes={400},
+                error_message_contains="You cannot access tickets beyond the 300th page",
+                config=self.config,
+                parameters={}
+            )
+        ] + self.response_filters
         self._last_request_to_attempt_count: MutableMapping[requests.PreparedRequest, int] = {}
 
     def interpret_response(
diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py b/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py
index cb1072a8c..1d25b8e0e 100644
--- a/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py
+++ b/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py
@@ -22,11 +22,9 @@
     RequestOptionType,
 )
 from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
+from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
 from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
-from airbyte_cdk.utils.mapping_helpers import (
-    _validate_component_request_option_paths,
-    get_interpolation_context,
-)
+from airbyte_cdk.utils.mapping_helpers import _validate_component_request_option_paths
 
 
 @dataclass
@@ -100,6 +98,7 @@ class DefaultPaginator(Paginator):
     """
 
     pagination_strategy: PaginationStrategy
+    cursor: Cursor
     config: Config
     url_base: Union[InterpolatedString, str]
     parameters: InitVar[Mapping[str, Any]]
@@ -223,6 +222,9 @@ def _get_request_options(
 
         return options
 
+    def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
+        return self.cursor.reduce_slice_range(stream_slice)
+
 
 class PaginatorTestReadDecorator(Paginator):
     """
@@ -318,3 +320,10 @@ def get_request_body_json(
         return self._decorated.get_request_body_json(
             stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
         )
+
+    def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
+        """
+        We assume a pagination reset will not happen during a test read: the feature targets very long
+        paginations, and a test read stops at the maximum_number_of_pages limit well before that point.
+        """
+        return stream_slice
diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py b/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py
index b3b1d3b66..dd1165b25 100644
--- a/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py
+++ b/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py
@@ -74,3 +74,6 @@ def next_page_token(
         last_page_token_value: Optional[Any],
     ) -> Optional[Mapping[str, Any]]:
         return {}
+
+    def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
+        return stream_slice
diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py b/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py
index f8c31d4f5..58f4fd4c0 100644
--- a/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py
+++ b/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py
@@ -63,3 +63,7 @@ def path(
         :return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
         """
         pass
+
+    @abstractmethod
+    def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
+        """Return the stream slice to use when pagination restarts from the first page."""
diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
index ed83279de..23e633032 100644
--- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
+++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -3,6 +3,7 @@
 #
 
 import json
+import logging
 from collections import defaultdict
 from dataclasses import InitVar, dataclass, field
 from functools import partial
@@ -43,11 +44,14 @@
 from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
 from airbyte_cdk.sources.source import ExperimentalClassWarning
 from airbyte_cdk.sources.streams.core import StreamData
+from airbyte_cdk.sources.streams.http.http_client import PaginationResetRequiredException
 from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
 from airbyte_cdk.utils.mapping_helpers import combine_mappings
 
 FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete"
 
+LOGGER = logging.getLogger("airbyte")
+
 
 @dataclass
 class SimpleRetriever(Retriever):
@@ -392,56 +396,65 @@ def _read_pages(
                     extra_fields={"query_properties": properties},
                 )
 
-            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
-            for current_record in records_generator_fn(response):
-                if (
-                    current_record
-                    and self.additional_query_properties
-                    and self.additional_query_properties.property_chunking
-                ):
-                    merge_key = (
-                        self.additional_query_properties.property_chunking.get_merge_key(
-                            current_record
-                        )
-                    )
-                    if merge_key:
-                        _deep_merge(merged_records[merge_key], current_record)
-                    else:
-                        # We should still emit records even if the record did not have a merge key
-                        last_page_size += 1
-                        last_record = current_record
-                        yield current_record
-                else:
-                    last_page_size += 1
-                    last_record = current_record
-                    yield current_record
-
-            if (
-                self.additional_query_properties
-                and self.additional_query_properties.property_chunking
-            ):
-                for merged_record in merged_records.values():
-                    record = Record(
-                        data=merged_record, stream_name=self.name, associated_slice=stream_slice
-                    )
-                    last_page_size += 1
-                    last_record = record
-                    yield record
-
-            if not response:
-                pagination_complete = True
-            else:
-                last_page_token_value = (
-                    next_page_token.get("next_page_token") if next_page_token else None
-                )
-                next_page_token = self._next_page_token(
-                    response=response,
-                    last_page_size=last_page_size,
-                    last_record=last_record,
-                    last_page_token_value=last_page_token_value,
-                )
-                if not next_page_token:
-                    pagination_complete = True
+            try:
+                response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
+                for current_record in records_generator_fn(response):
+                    if (
+                        current_record
+                        and self.additional_query_properties
+                        and self.additional_query_properties.property_chunking
+                    ):
+                        merge_key = (
+                            self.additional_query_properties.property_chunking.get_merge_key(
+                                current_record
+                            )
+                        )
+                        if merge_key:
+                            _deep_merge(merged_records[merge_key], current_record)
+                        else:
+                            # We should still emit records even if the record did not have a merge key
+                            last_page_size += 1
+                            last_record = current_record
+                            yield current_record
+                    else:
+                        last_page_size += 1
+                        last_record = current_record
+                        yield current_record
+
+                if (
+                    self.additional_query_properties
+                    and self.additional_query_properties.property_chunking
+                ):
+                    for merged_record in merged_records.values():
+                        record = Record(
+                            data=merged_record, stream_name=self.name, associated_slice=stream_slice
+                        )
+                        last_page_size += 1
+                        last_record = record
+                        yield record
+
+                if not response:
+                    pagination_complete = True
+                else:
+                    last_page_token_value = (
+                        next_page_token.get("next_page_token") if next_page_token else None
+                    )
+                    next_page_token = self._next_page_token(
+                        response=response,
+                        last_page_size=last_page_size,
+                        last_record=last_record,
+                        last_page_token_value=last_page_token_value,
+                    )
+                    if not next_page_token:
+                        pagination_complete = True
+            except PaginationResetRequiredException:
+                initial_token = self._paginator.get_initial_token()
+                next_page_token = (
+                    {"next_page_token": initial_token} if initial_token is not None else None
+                )
+                previous_slice = stream_slice
+                stream_slice = self._paginator.generate_stream_slice_on_reset(stream_slice)
+                LOGGER.info(f"Pagination reset triggered: stream slice narrowed from {previous_slice} to {stream_slice}")
 
         # Always return an empty generator just in case no records were ever yielded
         yield from []
diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py
index ca63a6901..21e646124 100644
--- a/airbyte_cdk/sources/streams/concurrent/cursor.py
+++ b/airbyte_cdk/sources/streams/concurrent/cursor.py
@@ -86,6 +86,9 @@ def stream_slices(self) -> Iterable[StreamSlice]:
         """
         yield StreamSlice(partition={}, cursor_slice={})
 
+    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
+        return stream_slice
+
 
 class FinalStateCursor(Cursor):
     """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
@@ -516,3 +519,17 @@ def _log_for_record_without_cursor_value(self) -> None:
                 f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
             )
             self._should_be_synced_logger_triggered = True
+
+    def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
+        return StreamSlice(
+            partition=stream_slice.partition,
+            cursor_slice={
+                self._slice_boundary_fields_wrapper[
+                    self._START_BOUNDARY
+                ]: self._connector_state_converter.output_format(self._most_recent_cursor_value_per_partition[stream_slice]),
+                self._slice_boundary_fields_wrapper[
+                    self._END_BOUNDARY
+                ]: stream_slice.cursor_slice[self._slice_boundary_fields_wrapper[self._END_BOUNDARY]],
+            },
+            extra_fields=stream_slice.extra_fields,
+        )
diff --git a/airbyte_cdk/sources/streams/http/error_handlers/response_models.py b/airbyte_cdk/sources/streams/http/error_handlers/response_models.py
index e882b89bd..861549aa3 100644
--- a/airbyte_cdk/sources/streams/http/error_handlers/response_models.py
+++ b/airbyte_cdk/sources/streams/http/error_handlers/response_models.py
@@ -17,6 +17,7 @@ class ResponseAction(Enum):
     FAIL = "FAIL"
     IGNORE = "IGNORE"
     RATE_LIMITED = "RATE_LIMITED"
+    RESET_PAGINATION = "RESET_PAGINATION"
 
 
 @dataclass
diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py
index bade96c9c..f26c6cf62 100644
--- a/airbyte_cdk/sources/streams/http/http_client.py
+++ b/airbyte_cdk/sources/streams/http/http_client.py
@@ -79,6 +79,10 @@ def monkey_patched_get_item(self, key):  # type: ignore # this interface is a co
 requests_cache.SQLiteDict.__getitem__ = monkey_patched_get_item  # type: ignore # see the method doc for more information
 
 
+class PaginationResetRequiredException(Exception):
+    """Raised when the API refuses to paginate further and the paginator must restart from the first page."""
+
+
 class MessageRepresentationAirbyteTracedErrors(AirbyteTracedException):
     """
     Before the migration to the HttpClient in low-code, the exception raised was
@@ -428,6 +432,9 @@ def _handle_error_resolution(
         if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON:
             self._evict_key(request)
 
+        if error_resolution.response_action == ResponseAction.RESET_PAGINATION:
+            raise PaginationResetRequiredException()
+
         # Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
         if error_resolution.response_action == ResponseAction.RATE_LIMITED:
             # TODO: Update to handle with message repository when concurrent message repository is ready
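
Notes on the PoC. The three moving parts can be sketched in isolation. First, the error-handler/HTTP-client wiring that turns the API's page-cap error into a reset signal. This is a simplified sketch, not the real DefaultErrorHandler/HttpClient code: filter matching is reduced to a substring check and only two actions are shown.

    from enum import Enum


    class ResponseAction(Enum):
        FAIL = "FAIL"
        RESET_PAGINATION = "RESET_PAGINATION"


    class PaginationResetRequiredException(Exception):
        """Raised when pagination must restart from the first page."""


    def interpret_response(status_code: int, body: str) -> ResponseAction:
        # Mirrors the hard-coded HttpResponseFilter in __post_init__: a 400 whose
        # body contains the known message means "reset pagination", not "fail".
        if status_code == 400 and "You cannot access tickets beyond the 300th page" in body:
            return ResponseAction.RESET_PAGINATION
        return ResponseAction.FAIL


    def handle_error_resolution(action: ResponseAction) -> None:
        # Mirrors HttpClient._handle_error_resolution: surface the reset to the
        # retriever as an exception instead of retrying or failing the sync.
        if action == ResponseAction.RESET_PAGINATION:
            raise PaginationResetRequiredException()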
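
Second, the retriever side. A minimal, self-contained sketch of how _read_pages is meant to react to the reset signal; FakeCursor, FakePaginator, and fetch_page are illustrative stand-ins rather than CDK classes, and only the method names mirror the patch.

    from typing import Any, Callable, Iterable, Mapping, Optional, Tuple


    class PaginationResetRequiredException(Exception):
        pass


    class FakeCursor:
        def reduce_slice_range(self, stream_slice: Mapping[str, Any]) -> Mapping[str, Any]:
            # Move the lower boundary up to the most recent cursor value seen, so
            # the restarted pagination does not re-read already-synced records.
            return {**stream_slice, "start": stream_slice["most_recent_cursor_value"]}


    class FakePaginator:
        def __init__(self, cursor: FakeCursor) -> None:
            self._cursor = cursor

        def get_initial_token(self) -> Optional[Any]:
            return None  # the first page is requested without a token

        def generate_stream_slice_on_reset(
            self, stream_slice: Mapping[str, Any]
        ) -> Mapping[str, Any]:
            return self._cursor.reduce_slice_range(stream_slice)


    def read_pages(
        fetch_page: Callable[
            [Mapping[str, Any], Optional[Mapping[str, Any]]],
            Tuple[Iterable[Any], Optional[Mapping[str, Any]]],
        ],
        paginator: FakePaginator,
        stream_slice: Mapping[str, Any],
    ) -> Iterable[Any]:
        next_page_token: Optional[Mapping[str, Any]] = None
        while True:
            try:
                records, next_page_token = fetch_page(stream_slice, next_page_token)
                yield from records
                if not next_page_token:
                    return
            except PaginationResetRequiredException:
                # Restart from the first page, but over a narrowed slice.
                initial_token = paginator.get_initial_token()
                next_page_token = (
                    {"next_page_token": initial_token} if initial_token is not None else None
                )
                stream_slice = paginator.generate_stream_slice_on_reset(stream_slice)

The design point this mirrors is that the retriever owns the retry loop: the paginator only produces a fresh initial token and delegates slice-narrowing to the cursor.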
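
Third, a worked example of what ConcurrentCursor.reduce_slice_range is expected to produce, using plain dicts in place of StreamSlice. The "start"/"end" keys are hypothetical; the real boundary names come from _slice_boundary_fields_wrapper.

    # The slice that was being paginated and the highest cursor value
    # observed before the reset was triggered.
    original_cursor_slice = {"start": "2024-01-01", "end": "2024-12-31"}
    most_recent_cursor_value = "2024-06-15"

    # reduce_slice_range keeps the upper boundary and moves the lower boundary
    # up to the last value that was successfully synced.
    reduced_cursor_slice = {
        "start": most_recent_cursor_value,
        "end": original_cursor_slice["end"],
    }

    assert reduced_cursor_slice == {"start": "2024-06-15", "end": "2024-12-31"}

One caveat worth flagging: if the reset fires before any record with a cursor value was observed for the slice, _most_recent_cursor_value_per_partition[stream_slice] raises a KeyError, so the PoC likely needs a fallback to the original slice start.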