airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -2057,6 +2057,7 @@ def create_default_stream(
client_side_incremental_sync={"cursor": concurrent_cursor}
if self._is_client_side_filtering_enabled(model)
else None,
cursor=concurrent_cursor,
transformations=transformations,
file_uploader=file_uploader,
incremental_sync=model.incremental_sync,
@@ -2279,6 +2280,7 @@ def create_default_paginator(
config: Config,
*,
url_base: str,
cursor: Cursor,
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
decoder: Optional[Decoder] = None,
cursor_used_for_stop_condition: Optional[Cursor] = None,
@@ -2316,6 +2318,7 @@ def create_default_paginator(
page_token_option=page_token_option,
pagination_strategy=pagination_strategy,
url_base=url_base,
cursor=cursor,
config=config,
parameters=model.parameters or {},
)
@@ -3149,6 +3152,7 @@ def create_simple_retriever(
config: Config,
*,
name: str,
cursor: Cursor,
primary_key: Optional[Union[str, List[str], List[List[str]]]],
request_options_provider: Optional[RequestOptionsProvider] = None,
stop_condition_cursor: Optional[Cursor] = None,
@@ -3271,6 +3275,7 @@ def _get_url(req: Requester) -> str:
extractor_model=model.record_selector.extractor,
decoder=decoder,
cursor_used_for_stop_condition=stop_condition_cursor or None,
cursor=cursor,
)
if model.paginator
else NoPagination(parameters={})
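Taken together, the factory changes above thread the stream's concurrent cursor from create_default_stream through create_simple_retriever into create_default_paginator, so the paginator can later narrow the stream slice on a pagination reset. A minimal sketch of the resulting call inside create_simple_retriever (the _create_component_from_model helper name is assumed from the factory; argument values elided):

paginator = (
    self._create_component_from_model(
        model=model.paginator,
        config=config,
        url_base=url_base,
        extractor_model=model.record_selector.extractor,
        decoder=decoder,
        cursor_used_for_stop_condition=stop_condition_cursor or None,
        cursor=cursor,  # new argument introduced by this diff
    )
    if model.paginator
    else NoPagination(parameters={})
)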
airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py
@@ -13,7 +13,7 @@
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import (
HttpResponseFilter,
)
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler, ResponseAction
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
SUCCESS_RESOLUTION,
ErrorResolution,
@@ -106,6 +106,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if not self.response_filters:
self.response_filters = [HttpResponseFilter(config=self.config, parameters={})]

self.response_filters = [
    # FIXME: RESET_PAGINATION is not wired into the declarative model yet. This hard-coded
    # filter stands in for what would eventually be configured in manifest.yaml.
    HttpResponseFilter(
        action=ResponseAction.RESET_PAGINATION,
        http_codes={400},
        error_message_contains="You cannot access tickets beyond the 300th page",
        config=self.config,
        parameters={},
    )
] + self.response_filters
self._last_request_to_attempt_count: MutableMapping[requests.PreparedRequest, int] = {}

def interpret_response(
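As a minimal illustration, a 400 response carrying the hard-coded message should now resolve to the new action. A sketch assuming a DefaultErrorHandler instance as defined in this file; the response body is hypothetical:

import requests

from airbyte_cdk.sources.streams.http.error_handlers import ResponseAction

handler = DefaultErrorHandler(config={}, parameters={})

response = requests.Response()
response.status_code = 400
response._content = b"You cannot access tickets beyond the 300th page"  # test-style content injection

resolution = handler.interpret_response(response)
assert resolution.response_action == ResponseAction.RESET_PAGINATION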
airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py
@@ -3,6 +3,7 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping, MutableMapping, Optional, Union

import requests
@@ -22,11 +23,9 @@
RequestOptionType,
)
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.utils.mapping_helpers import _validate_component_request_option_paths


@dataclass
@@ -100,6 +99,7 @@ class DefaultPaginator(Paginator):
"""

pagination_strategy: PaginationStrategy
cursor: Cursor
config: Config
url_base: Union[InterpolatedString, str]
parameters: InitVar[Mapping[str, Any]]
@@ -223,6 +223,9 @@ def _get_request_options(

return options

def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
return self.cursor.reduce_slice_range(stream_slice)


class PaginatorTestReadDecorator(Paginator):
"""
@@ -318,3 +321,10 @@ def get_request_body_json(
return self._decorated.get_request_body_json(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)

def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
"""
We assume a pagination reset will not happen during a test read: the feature targets very long
paginations, which the maximum_number_of_pages limit prevents us from reaching here.
"""
return stream_slice
airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py
@@ -74,3 +74,6 @@ def next_page_token(
last_page_token_value: Optional[Any],
) -> Optional[Mapping[str, Any]]:
return {}

def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
return stream_slice
airbyte_cdk/sources/declarative/requesters/paginators/paginator.py
@@ -63,3 +63,7 @@ def path(
:return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
"""
pass

@abstractmethod
def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
    """
    When pagination is reset, return the stream slice to resume from, typically narrowed so
    that records which were already synced are not fetched again.
    """
    pass
99 changes: 56 additions & 43 deletions airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -3,6 +3,7 @@
#

import json
import logging
from collections import defaultdict
from dataclasses import InitVar, dataclass, field
from functools import partial
@@ -43,11 +44,14 @@
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http.http_client import PaginationResetRequiredException
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.utils.mapping_helpers import combine_mappings

FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete"

LOGGER = logging.getLogger("airbyte")


@dataclass
class SimpleRetriever(Retriever):
@@ -392,56 +396,65 @@ def _read_pages(
extra_fields={"query_properties": properties},
)

try:
    response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
    for current_record in records_generator_fn(response):
        if (
            current_record
            and self.additional_query_properties
            and self.additional_query_properties.property_chunking
        ):
            merge_key = self.additional_query_properties.property_chunking.get_merge_key(
                current_record
            )
            if merge_key:
                _deep_merge(merged_records[merge_key], current_record)
            else:
                # We should still emit records even if the record did not have a merge key
                last_page_size += 1
                last_record = current_record
                yield current_record
        else:
            last_page_size += 1
            last_record = current_record
            yield current_record

    if (
        self.additional_query_properties
        and self.additional_query_properties.property_chunking
    ):
        for merged_record in merged_records.values():
            record = Record(
                data=merged_record, stream_name=self.name, associated_slice=stream_slice
            )
            last_page_size += 1
            last_record = record
            yield record

    if not response:
        pagination_complete = True
    else:
        last_page_token_value = (
            next_page_token.get("next_page_token") if next_page_token else None
        )
        next_page_token = self._next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )
        if not next_page_token:
            pagination_complete = True
except PaginationResetRequiredException:
    # Rewind to the first page and narrow the slice so already-synced records are skipped.
    initial_token = self._paginator.get_initial_token()
    next_page_token = (
        {"next_page_token": initial_token} if initial_token is not None else None
    )
    previous_slice = stream_slice
    stream_slice = self._paginator.generate_stream_slice_on_reset(stream_slice)
    LOGGER.info(
        f"Hitting PaginationReset event. StreamSlice used will go from {previous_slice} to {stream_slice}"
    )

# Always return an empty generator just in case no records were ever yielded
yield from []
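Because the diff above is dense, here is a condensed, self-contained sketch of the control flow _read_pages now implements; fetch_page and the record plumbing are simplified stand-ins, not the actual CDK API:

from typing import Any, Iterable, Mapping, Optional


class PaginationResetRequiredException(Exception):
    pass


def read_pages_sketch(fetch_page, paginator, stream_slice) -> Iterable[Any]:
    # On reset: rewind to the first page, but over a narrowed slice, so the
    # sync makes forward progress instead of failing or re-reading everything.
    next_page_token: Optional[Mapping[str, Any]] = None
    pagination_complete = False
    while not pagination_complete:
        try:
            records, next_page_token = fetch_page(stream_slice, next_page_token)
            yield from records
            if next_page_token is None:
                pagination_complete = True
        except PaginationResetRequiredException:
            next_page_token = None  # back to the first page
            stream_slice = paginator.generate_stream_slice_on_reset(stream_slice)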
17 changes: 17 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/cursor.py
@@ -86,6 +86,9 @@ def stream_slices(self) -> Iterable[StreamSlice]:
"""
yield StreamSlice(partition={}, cursor_slice={})

def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
return stream_slice


class FinalStateCursor(Cursor):
"""Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
@@ -516,3 +519,17 @@ def _log_for_record_without_cursor_value(self) -> None:
f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
)
self._should_be_synced_logger_triggered = True

def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
    start_key = self._slice_boundary_fields_wrapper[self._START_BOUNDARY]
    end_key = self._slice_boundary_fields_wrapper[self._END_BOUNDARY]
    return StreamSlice(
        partition=stream_slice.partition,
        cursor_slice={
            start_key: self._connector_state_converter.output_format(
                self._most_recent_cursor_value_per_partition[stream_slice]
            ),
            end_key: stream_slice.cursor_slice[end_key],
        },
        extra_fields=stream_slice.extra_fields,
    )
airbyte_cdk/sources/streams/http/error_handlers/response_models.py
@@ -17,6 +17,7 @@ class ResponseAction(Enum):
FAIL = "FAIL"
IGNORE = "IGNORE"
RATE_LIMITED = "RATE_LIMITED"
RESET_PAGINATION = "RESET_PAGINATION"


@dataclass
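For reference, a sketch of the ErrorResolution a response filter could produce with the new action (field values are illustrative):

from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
    ErrorResolution,
    ResponseAction,
)

resolution = ErrorResolution(
    response_action=ResponseAction.RESET_PAGINATION,
    failure_type=None,  # not a terminal failure: pagination restarts instead
    error_message="Pagination limit reached; restarting from the first page.",
)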
7 changes: 7 additions & 0 deletions airbyte_cdk/sources/streams/http/http_client.py
@@ -79,6 +79,10 @@ def monkey_patched_get_item(self, key): # type: ignore # this interface is a co
requests_cache.SQLiteDict.__getitem__ = monkey_patched_get_item # type: ignore # see the method doc for more information


class PaginationResetRequiredException(Exception):
pass


class MessageRepresentationAirbyteTracedErrors(AirbyteTracedException):
"""
Before the migration to the HttpClient in low-code, the exception raised was
@@ -428,6 +432,9 @@ def _handle_error_resolution(
if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON:
self._evict_key(request)

if error_resolution.response_action == ResponseAction.RESET_PAGINATION:
raise PaginationResetRequiredException()

# Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
if error_resolution.response_action == ResponseAction.RATE_LIMITED:
# TODO: Update to handle with message repository when concurrent message repository is ready