diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py index 399f42430..54db0b6e8 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py +++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -100,6 +100,8 @@ def status(self) -> AsyncJobStatus: return AsyncJobStatus.FAILED elif AsyncJobStatus.TIMED_OUT in statuses: return AsyncJobStatus.TIMED_OUT + elif AsyncJobStatus.IGNORE in statuses: + return AsyncJobStatus.IGNORE else: return AsyncJobStatus.RUNNING @@ -149,6 +151,7 @@ class AsyncJobOrchestrator: AsyncJobStatus.FAILED, AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT, + AsyncJobStatus.IGNORE, } _RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT} @@ -364,6 +367,11 @@ def _process_running_partitions_and_yield_completed_ones( case _ if partition.has_reached_max_attempt(): self._stop_partition(partition) self._process_partitions_with_errors(partition) + case AsyncJobStatus.IGNORE: + self._stop_partition(partition) + LOGGER.warning( + f"Stopping processing partition: {partition.stream_slice} due to received {AsyncJobStatus.IGNORE} status." + ) case _: self._stop_timed_out_jobs(partition) # re-allocate FAILED jobs, but TIMEOUT jobs are not re-allocated diff --git a/airbyte_cdk/sources/declarative/async_job/status.py b/airbyte_cdk/sources/declarative/async_job/status.py index 586e79889..798740fec 100644 --- a/airbyte_cdk/sources/declarative/async_job/status.py +++ b/airbyte_cdk/sources/declarative/async_job/status.py @@ -11,6 +11,7 @@ class AsyncJobStatus(Enum): COMPLETED = ("COMPLETED", _TERMINAL) FAILED = ("FAILED", _TERMINAL) TIMED_OUT = ("TIMED_OUT", _TERMINAL) + IGNORE = ("IGNORE", _TERMINAL) def __init__(self, value: str, is_terminal: bool) -> None: self._value = value diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index f5e9a8548..524255d05 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3863,6 +3863,10 @@ definitions: type: array items: type: string + ignore: + type: array + items: + type: string AsyncRetriever: title: Asynchronous Retriever description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router." diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 35186ef71..64eab2a48 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1206,6 +1206,7 @@ class AsyncJobStatusMap(BaseModel): completed: List[str] failed: List[str] timeout: List[str] + ignore: Optional[List[str]] = None class ValueType(Enum): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index fdaf26bba..ca891e55d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3539,6 +3539,10 @@ def _create_async_job_status_mapping( # This is an element of the dict because of the typing of the CDK but it is not a CDK status continue + if api_statuses is None and cdk_status == "ignore": + # ignore status is not required + continue + for status in api_statuses: if status in api_status_to_cdk_status: raise ValueError( @@ -3557,6 +3561,8 @@ def _get_async_job_status(self, status: str) -> AsyncJobStatus: return AsyncJobStatus.FAILED case "timeout": return AsyncJobStatus.TIMED_OUT + case "ignore": + return AsyncJobStatus.IGNORE case _: raise ValueError(f"Unsupported CDK status {status}")