Skip to content

Commit f1af1d9

Browse files
authored
chore: Polish request queue Apify storage client (#695)
Follow-up to #643. It does not merge the clients; it only:
- eliminates duplicated docstrings,
- improves several docstrings,
- fixes method ordering.
1 parent 6b1e851 commit f1af1d9

File tree

3 files changed

+242
-366
lines changed

3 files changed

+242
-366
lines changed

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 77 additions & 132 deletions
Original file line number | Diff line number | Diff line change
@@ -25,10 +25,15 @@
2525

2626

2727
class ApifyRequestQueueClient(RequestQueueClient):
28-
"""Base class for Apify platform implementations of the request queue client."""
28+
"""Request queue client for the Apify platform.
29+
30+
This client provides access to request queues stored on the Apify platform, supporting both single-consumer
31+
and multi-consumer scenarios. It manages local caching, request fetching, and state synchronization with the
32+
platform's API.
33+
"""
2934

3035
_MAX_CACHED_REQUESTS: Final[int] = 1_000_000
31-
"""Maximum number of requests that can be cached."""
36+
"""Maximum number of requests that can be cached locally."""
3237

3338
def __init__(
3439
self,
@@ -45,7 +50,8 @@ def __init__(
4550
"""The Apify request queue client for API operations."""
4651

4752
self._implementation: ApifyRequestQueueSingleClient | ApifyRequestQueueSharedClient
48-
"""Internal implementation used to communicate with the Apify platform based Request Queue."""
53+
"""Internal implementation used to communicate with the Apify platform based request queue."""
54+
4955
if access == 'single':
5056
self._implementation = ApifyRequestQueueSingleClient(
5157
api_client=self._api_client, metadata=metadata, cache_size=self._MAX_CACHED_REQUESTS
@@ -60,119 +66,36 @@ def __init__(
6066
else:
6167
raise RuntimeError(f"Unsupported access type: {access}. Allowed values are 'single' or 'shared'.")
6268

63-
@property
64-
def _metadata(self) -> RequestQueueMetadata:
65-
return self._implementation.metadata
66-
67-
@override
68-
async def add_batch_of_requests(
69-
self,
70-
requests: Sequence[Request],
71-
*,
72-
forefront: bool = False,
73-
) -> AddRequestsResponse:
74-
"""Add a batch of requests to the queue.
75-
76-
Args:
77-
requests: The requests to add.
78-
forefront: Whether to add the requests to the beginning of the queue.
79-
80-
Returns:
81-
Response containing information about the added requests.
82-
"""
83-
return await self._implementation.add_batch_of_requests(requests, forefront=forefront)
84-
85-
@override
86-
async def fetch_next_request(self) -> Request | None:
87-
"""Return the next request in the queue to be processed.
88-
89-
Once you successfully finish processing of the request, you need to call `mark_request_as_handled`
90-
to mark the request as handled in the queue. If there was some error in processing the request, call
91-
`reclaim_request` instead, so that the queue will give the request to some other consumer
92-
in another call to the `fetch_next_request` method.
93-
94-
Returns:
95-
The request or `None` if there are no more pending requests.
96-
"""
97-
return await self._implementation.fetch_next_request()
98-
99-
@override
100-
async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
101-
"""Mark a request as handled after successful processing.
102-
103-
Handled requests will never again be returned by the `fetch_next_request` method.
104-
105-
Args:
106-
request: The request to mark as handled.
107-
108-
Returns:
109-
Information about the queue operation. `None` if the given request was not in progress.
110-
"""
111-
return await self._implementation.mark_request_as_handled(request)
112-
113-
@override
114-
async def get_request(self, unique_key: str) -> Request | None:
115-
"""Get a request by unique key.
116-
117-
Args:
118-
unique_key: Unique key of the request to get.
119-
120-
Returns:
121-
The request or None if not found.
122-
"""
123-
return await self._implementation.get_request(unique_key)
124-
125-
@override
126-
async def reclaim_request(
127-
self,
128-
request: Request,
129-
*,
130-
forefront: bool = False,
131-
) -> ProcessedRequest | None:
132-
"""Reclaim a failed request back to the queue.
133-
134-
The request will be returned for processing later again by another call to `fetch_next_request`.
135-
136-
Args:
137-
request: The request to return to the queue.
138-
forefront: Whether to add the request to the head or the end of the queue.
139-
140-
Returns:
141-
Information about the queue operation. `None` if the given request was not in progress.
142-
"""
143-
return await self._implementation.reclaim_request(request, forefront=forefront)
144-
145-
@override
146-
async def is_empty(self) -> bool:
147-
"""Check if the queue is empty.
148-
149-
Returns:
150-
True if the queue is empty, False otherwise.
151-
"""
152-
return await self._implementation.is_empty()
153-
15469
@override
15570
async def get_metadata(self) -> ApifyRequestQueueMetadata:
156-
"""Get metadata about the request queue.
71+
"""Retrieve current metadata about the request queue.
72+
73+
This method fetches metadata from the Apify API and merges it with local estimations to provide
74+
the most up-to-date statistics. Local estimations are used to compensate for potential delays
75+
in API data propagation (typically a few seconds).
15776
15877
Returns:
159-
Metadata from the API, merged with local estimation, because in some cases, the data from the API can
160-
be delayed.
78+
Request queue metadata with accurate counts and timestamps, combining API data with local estimates.
16179
"""
16280
response = await self._api_client.get()
81+
16382
if response is None:
16483
raise ValueError('Failed to fetch request queue metadata from the API.')
165-
# Enhance API response by local estimations (API can be delayed few seconds, while local estimation not.)
84+
85+
# Enhance API response with local estimations to account for propagation delays (API data can be delayed
86+
# by a few seconds, while local estimates are immediately accurate).
16687
return ApifyRequestQueueMetadata(
16788
id=response['id'],
16889
name=response['name'],
169-
total_request_count=max(response['totalRequestCount'], self._metadata.total_request_count),
170-
handled_request_count=max(response['handledRequestCount'], self._metadata.handled_request_count),
90+
total_request_count=max(response['totalRequestCount'], self._implementation.metadata.total_request_count),
91+
handled_request_count=max(
92+
response['handledRequestCount'], self._implementation.metadata.handled_request_count
93+
),
17194
pending_request_count=response['pendingRequestCount'],
172-
created_at=min(response['createdAt'], self._metadata.created_at),
173-
modified_at=max(response['modifiedAt'], self._metadata.modified_at),
174-
accessed_at=max(response['accessedAt'], self._metadata.accessed_at),
175-
had_multiple_clients=response['hadMultipleClients'] or self._metadata.had_multiple_clients,
95+
created_at=min(response['createdAt'], self._implementation.metadata.created_at),
96+
modified_at=max(response['modifiedAt'], self._implementation.metadata.modified_at),
97+
accessed_at=max(response['accessedAt'], self._implementation.metadata.accessed_at),
98+
had_multiple_clients=response['hadMultipleClients'] or self._implementation.metadata.had_multiple_clients,
17699
stats=RequestQueueStats.model_validate(response['stats'], by_alias=True),
177100
)
178101

@@ -188,39 +111,27 @@ async def open(
188111
) -> ApifyRequestQueueClient:
189112
"""Open an Apify request queue client.
190113
191-
This method creates and initializes a new instance of the Apify request queue client. It handles
192-
authentication, storage lookup/creation, and metadata retrieval, and sets up internal caching and queue
193-
management structures.
114+
This method creates and initializes a new request queue client instance, handling authentication,
115+
storage lookup or creation, metadata retrieval, and initialization of internal caching structures.
194116
195117
Args:
196-
id: The ID of the RQ to open. If provided, searches for existing RQ by ID.
197-
Mutually exclusive with name and alias.
198-
name: The name of the RQ to open (global scope, persists across runs).
199-
Mutually exclusive with id and alias.
200-
alias: The alias of the RQ to open (run scope, creates unnamed storage).
201-
Mutually exclusive with id and name.
202-
configuration: The configuration object containing API credentials and settings. Must include a valid
203-
`token` and `api_base_url`. May also contain a `default_request_queue_id` for fallback when neither
204-
`id`, `name`, nor `alias` is provided.
205-
access: Controls the implementation of the request queue client based on expected scenario:
206-
- 'single' is suitable for single consumer scenarios. It makes less API calls, is cheaper and faster.
207-
- 'shared' is suitable for multiple consumers scenarios at the cost of higher API usage.
208-
Detailed constraints for the 'single' access type:
209-
- Only one client is consuming the request queue at the time.
210-
- Multiple producers can put requests to the queue, but their forefront requests are not guaranteed to
211-
be handled so quickly as this client does not aggressively fetch the forefront and relies on local
212-
head estimation.
213-
- Requests are only added to the queue, never deleted by other clients. (Marking as handled is ok.)
214-
- Other producers can add new requests, but not modify existing ones.
215-
(Modifications would not be included in local cache)
118+
id: ID of an existing request queue to open. Mutually exclusive with `name` and `alias`.
119+
name: Name of the request queue to open or create (persists across Actor runs).
120+
Mutually exclusive with `id` and `alias`.
121+
alias: Alias for the request queue (scoped to current Actor run, creates unnamed storage).
122+
Mutually exclusive with `id` and `name`.
123+
configuration: Configuration object containing API credentials (`token`, `api_base_url`) and
124+
optionally a `default_request_queue_id` for fallback when no identifier is provided.
125+
access: Access mode controlling the client's behavior:
126+
- `single`: Optimized for single-consumer scenarios (lower API usage, better performance).
127+
- `shared`: Optimized for multi-consumer scenarios (more API calls, guaranteed consistency).
216128
217129
Returns:
218130
An instance for the opened or created storage client.
219131
220132
Raises:
221-
ValueError: If the configuration is missing required fields (token, api_base_url), if more than one of
222-
`id`, `name`, or `alias` is provided, or if none are provided and no default storage ID is available
223-
in the configuration.
133+
ValueError: If the configuration is missing required fields, if multiple identifiers (`id`, `name`,
134+
`alias`) are provided simultaneously, or if no identifier is provided and no default is configured.
224135
"""
225136
api_client = await create_storage_api_client(
226137
storage_type='RequestQueue',
@@ -230,10 +141,10 @@ async def open(
230141
id=id,
231142
)
232143

233-
# Fetch metadata separately
144+
# Fetch initial metadata from the API.
234145
raw_metadata = await api_client.get()
235146
if raw_metadata is None:
236-
raise ValueError('Failed to retrieve request queue metadata')
147+
raise ValueError('Failed to retrieve request queue metadata from the API.')
237148
metadata = ApifyRequestQueueMetadata.model_validate(raw_metadata)
238149

239150
return cls(
@@ -252,3 +163,37 @@ async def purge(self) -> None:
252163
@override
253164
async def drop(self) -> None:
254165
await self._api_client.delete()
166+
167+
@override
168+
async def add_batch_of_requests(
169+
self,
170+
requests: Sequence[Request],
171+
*,
172+
forefront: bool = False,
173+
) -> AddRequestsResponse:
174+
return await self._implementation.add_batch_of_requests(requests, forefront=forefront)
175+
176+
@override
177+
async def fetch_next_request(self) -> Request | None:
178+
return await self._implementation.fetch_next_request()
179+
180+
@override
181+
async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
182+
return await self._implementation.mark_request_as_handled(request)
183+
184+
@override
185+
async def get_request(self, unique_key: str) -> Request | None:
186+
return await self._implementation.get_request(unique_key)
187+
188+
@override
189+
async def reclaim_request(
190+
self,
191+
request: Request,
192+
*,
193+
forefront: bool = False,
194+
) -> ProcessedRequest | None:
195+
return await self._implementation.reclaim_request(request, forefront=forefront)
196+
197+
@override
198+
async def is_empty(self) -> bool:
199+
return await self._implementation.is_empty()

0 commit comments

Comments
 (0)