Skip to content

Commit 1c7a9e6

Browse files
authored
fix: Synchronize charge operations to prevent race conditions (#684)
Closes: #666
1 parent f1af1d9 commit 1c7a9e6

File tree

2 files changed

+32
-19
lines changed

2 files changed

+32
-19
lines changed

src/apify/_actor.py

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,14 @@ def event_manager(self) -> EventManager:
354354
def _charging_manager_implementation(self) -> ChargingManagerImplementation:
355355
return ChargingManagerImplementation(self.configuration, self.apify_client)
356356

357+
@cached_property
358+
def _charge_lock(self) -> asyncio.Lock:
359+
"""Lock to synchronize charge operations.
360+
361+
Prevents race conditions between Actor.charge and Actor.push_data calls.
362+
"""
363+
return asyncio.Lock()
364+
357365
@cached_property
358366
def _storage_client(self) -> SmartApifyStorageClient:
359367
"""Storage client used by the Actor.
@@ -606,30 +614,34 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non
606614

607615
data = data if isinstance(data, list) else [data]
608616

609-
max_charged_count = (
610-
self.get_charging_manager().calculate_max_event_charge_count_within_limit(charged_event_name)
611-
if charged_event_name is not None
612-
else None
613-
)
617+
# No charging, just push the data without locking.
618+
if charged_event_name is None:
619+
dataset = await self.open_dataset()
620+
await dataset.push_data(data)
621+
return None
614622

615-
# Push as many items as we can charge for
616-
pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data)
623+
# If charging is requested, acquire the charge lock to prevent race conditions between concurrent
624+
# push_data calls. We need to hold the lock for the entire push_data + charge sequence.
625+
async with self._charge_lock:
626+
max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit(
627+
charged_event_name
628+
)
617629

618-
dataset = await self.open_dataset()
630+
# Push as many items as we can charge for.
631+
pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data)
619632

620-
if pushed_items_count < len(data):
621-
await dataset.push_data(data[:pushed_items_count])
622-
elif pushed_items_count > 0:
623-
await dataset.push_data(data)
633+
dataset = await self.open_dataset()
634+
635+
if pushed_items_count < len(data):
636+
await dataset.push_data(data[:pushed_items_count])
637+
elif pushed_items_count > 0:
638+
await dataset.push_data(data)
624639

625-
if charged_event_name:
626640
return await self.get_charging_manager().charge(
627641
event_name=charged_event_name,
628642
count=pushed_items_count,
629643
)
630644

631-
return None
632-
633645
async def get_input(self) -> Any:
634646
"""Get the Actor input value from the default key-value store associated with the current Actor run."""
635647
self._raise_if_not_initialized()
@@ -692,7 +704,9 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
692704
count: Number of events to charge for.
693705
"""
694706
self._raise_if_not_initialized()
695-
return await self.get_charging_manager().charge(event_name, count)
707+
# Acquire lock to prevent race conditions with concurrent charge/push_data calls.
708+
async with self._charge_lock:
709+
return await self.get_charging_manager().charge(event_name, count)
696710

697711
@overload
698712
def on(

src/apify/_charging.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,8 @@ def calculate_chargeable() -> dict[str, int | None]:
239239
pricing_info = self._pricing_info.get(
240240
event_name,
241241
PricingInfoItem(
242-
price=Decimal()
243-
if self._is_at_home
244-
else Decimal(1), # Use a nonzero price for local development so that the maximum budget can be reached,
242+
# Use a nonzero price for local development so that the maximum budget can be reached.
243+
price=Decimal() if self._is_at_home else Decimal(1),
245244
title=f"Unknown event '{event_name}'",
246245
),
247246
)

0 commit comments

Comments
 (0)