Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def status(self) -> AsyncJobStatus:
return AsyncJobStatus.FAILED
elif AsyncJobStatus.TIMED_OUT in statuses:
return AsyncJobStatus.TIMED_OUT
elif AsyncJobStatus.IGNORE in statuses:
return AsyncJobStatus.IGNORE
else:
return AsyncJobStatus.RUNNING

Expand Down Expand Up @@ -149,6 +151,7 @@ class AsyncJobOrchestrator:
AsyncJobStatus.FAILED,
AsyncJobStatus.RUNNING,
AsyncJobStatus.TIMED_OUT,
AsyncJobStatus.IGNORE,
}
_RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}

Expand Down Expand Up @@ -364,6 +367,11 @@ def _process_running_partitions_and_yield_completed_ones(
case _ if partition.has_reached_max_attempt():
self._stop_partition(partition)
self._process_partitions_with_errors(partition)
case AsyncJobStatus.IGNORE:
self._stop_partition(partition)
LOGGER.warning(
f"Stopping processing partition: {partition.stream_slice} due to received {AsyncJobStatus.IGNORE} status."
)
case _:
self._stop_timed_out_jobs(partition)
# re-allocate FAILED jobs, but TIMEOUT jobs are not re-allocated
Expand Down
1 change: 1 addition & 0 deletions airbyte_cdk/sources/declarative/async_job/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class AsyncJobStatus(Enum):
COMPLETED = ("COMPLETED", _TERMINAL)
FAILED = ("FAILED", _TERMINAL)
TIMED_OUT = ("TIMED_OUT", _TERMINAL)
IGNORE = ("IGNORE", _TERMINAL)

def __init__(self, value: str, is_terminal: bool) -> None:
self._value = value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3863,6 +3863,10 @@ definitions:
type: array
items:
type: string
ignore:
type: array
items:
type: string
AsyncRetriever:
title: Asynchronous Retriever
description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,7 @@ class AsyncJobStatusMap(BaseModel):
completed: List[str]
failed: List[str]
timeout: List[str]
ignore: Optional[List[str]] = None


class ValueType(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3539,6 +3539,10 @@ def _create_async_job_status_mapping(
# This is an element of the dict because of the typing of the CDK but it is not a CDK status
continue

if api_statuses is None and cdk_status == "ignore":
# ignore status is not required
continue

for status in api_statuses:
if status in api_status_to_cdk_status:
raise ValueError(
Expand All @@ -3557,6 +3561,8 @@ def _get_async_job_status(self, status: str) -> AsyncJobStatus:
return AsyncJobStatus.FAILED
case "timeout":
return AsyncJobStatus.TIMED_OUT
case "ignore":
return AsyncJobStatus.IGNORE
case _:
raise ValueError(f"Unsupported CDK status {status}")

Expand Down
Loading