diff --git a/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py b/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py
index a3139d3da1..c51180e1fc 100644
--- a/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py
+++ b/src/crawlee/crawlers/_adaptive_playwright/_adaptive_playwright_crawler.py
@@ -71,7 +71,6 @@ def __init__(self) -> None:
     async def __aenter__(self) -> Self:
         self._active = True
         await self._state.initialize()
-        self._after_initialize()
         return self
 
     async def __aexit__(
diff --git a/src/crawlee/statistics/_statistics.py b/src/crawlee/statistics/_statistics.py
index 68b4ff6551..667b96eebe 100644
--- a/src/crawlee/statistics/_statistics.py
+++ b/src/crawlee/statistics/_statistics.py
@@ -1,6 +1,7 @@
 # Inspiration: https://github.com/apify/crawlee/blob/v3.9.2/packages/core/src/crawlers/statistics.ts
 from __future__ import annotations
 
+import asyncio
 import math
 import time
 from datetime import datetime, timedelta, timezone
@@ -84,8 +85,6 @@ def __init__(
         self._id = Statistics.__next_id
         Statistics.__next_id += 1
 
-        self._instance_start: datetime | None = None
-
         self.error_tracker = ErrorTracker(
             save_error_snapshots=save_error_snapshots,
             snapshot_kvs_name=persist_state_kvs_name,
@@ -111,6 +110,9 @@ def __init__(
         # Flag to indicate the context state.
         self._active = False
 
+        # Pre-existing runtime offset, which can be non-zero when restoring serialized state from the KVS.
+        self._runtime_offset = timedelta(seconds=0)
+
     def replace_state_model(self, state_model: type[TNewStatisticsState]) -> Statistics[TNewStatisticsState]:
         """Create near copy of the `Statistics` with replaced `state_model`."""
         new_statistics: Statistics[TNewStatisticsState] = Statistics(
@@ -165,14 +167,17 @@ async def __aenter__(self) -> Self:
         if self._active:
             raise RuntimeError(f'The {self.__class__.__name__} is already active.')
 
-        self._active = True
-        self._instance_start = datetime.now(timezone.utc)
-
         await self._state.initialize()
-        self._after_initialize()
+        self._runtime_offset = self.state.crawler_runtime
+
+        # Start periodic logging and let it print the initial state before activation.
         self._periodic_logger.start()
+        await asyncio.sleep(0.01)
+        self._active = True
+        self.state.crawler_last_started_at = datetime.now(timezone.utc)
+        self.state.crawler_started_at = self.state.crawler_started_at or self.state.crawler_last_started_at
 
         return self
 
     async def __aexit__(
@@ -191,14 +196,16 @@
         if not self.state.crawler_last_started_at:
             raise RuntimeError('Statistics.state.crawler_last_started_at not set.')
 
-        self.state.crawler_finished_at = datetime.now(timezone.utc)
-        self.state.crawler_runtime += self.state.crawler_finished_at - self.state.crawler_last_started_at
-
-        await self._state.teardown()
+        # Stop logging and deactivate the statistics to prevent further changes to crawler_runtime.
         await self._periodic_logger.stop()
+        self.state.crawler_finished_at = datetime.now(timezone.utc)
+        self.state.crawler_runtime = (
+            self._runtime_offset + self.state.crawler_finished_at - self.state.crawler_last_started_at
+        )
         self._active = False
+        await self._state.teardown()
 
     @property
     def state(self) -> TStatisticsState:
@@ -255,10 +262,19 @@ def record_request_processing_failure(self, request_id_or_key: str) -> None:
 
         del self._requests_in_progress[request_id_or_key]
 
+    def _update_crawler_runtime(self) -> None:
+        current_run_duration = (
+            (datetime.now(timezone.utc) - self.state.crawler_last_started_at)
+            if self.state.crawler_last_started_at
+            else timedelta()
+        )
+        self.state.crawler_runtime = current_run_duration + self._runtime_offset
+
     def calculate(self) -> FinalStatistics:
         """Calculate the current statistics."""
-        if self._instance_start is None:
-            raise RuntimeError('The Statistics object is not initialized')
+        if self._active:
+            # Only update the state when active. If not, just report the last known runtime.
+            self._update_crawler_runtime()
 
         total_minutes = self.state.crawler_runtime.total_seconds() / 60
         state = self._state.current_value
@@ -291,21 +307,6 @@ def _log(self) -> None:
         else:
             self._periodic_message_logger.info(self._log_message, extra=stats.to_dict())
 
-    def _after_initialize(self) -> None:
-        state = self._state.current_value
-
-        if state.crawler_started_at is None:
-            state.crawler_started_at = datetime.now(timezone.utc)
-
-        if state.stats_persisted_at is not None and state.crawler_last_started_at:
-            self._instance_start = datetime.now(timezone.utc) - (
-                state.stats_persisted_at - state.crawler_last_started_at
-            )
-        elif state.crawler_last_started_at:
-            self._instance_start = state.crawler_last_started_at
-
-        state.crawler_last_started_at = self._instance_start
-
     def _save_retry_count_for_request(self, record: RequestProcessingRecord) -> None:
         retry_count = record.retry_count
         state = self._state.current_value
diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py
index b2b75e50f7..8173ec324b 100644
--- a/tests/unit/crawlers/_basic/test_basic_crawler.py
+++ b/tests/unit/crawlers/_basic/test_basic_crawler.py
@@ -1701,3 +1701,29 @@ async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
 
     assert first_run_state.crawler_finished_at < second_run_state.crawler_finished_at
     assert first_run_state.crawler_runtime < second_run_state.crawler_runtime
+
+
+async def test_crawler_intermediate_statistics() -> None:
+    """Test that crawler statistics correctly update the total runtime on every `calculate` call."""
+    crawler = BasicCrawler()
+    check_time = timedelta(seconds=0.1)
+
+    async def wait_for_statistics_initialization() -> None:
+        while not crawler.statistics.active:  # noqa: ASYNC110  # It is OK for tests.
+            await asyncio.sleep(0.1)
+
+    @crawler.router.default_handler
+    async def handler(_: BasicCrawlingContext) -> None:
+        await asyncio.sleep(check_time.total_seconds() * 5)
+
+    # Start the crawler and wait until the statistics are initialized.
+    crawler_task = asyncio.create_task(crawler.run(['https://a.placeholder.com']))
+    await wait_for_statistics_initialization()
+
+    # Wait some time and check that the runtime is updated.
+    await asyncio.sleep(check_time.total_seconds())
+    crawler.statistics.calculate()
+    assert crawler.statistics.state.crawler_runtime >= check_time
+
+    # Wait for the crawler to finish.
+    await crawler_task
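
A minimal standalone sketch of the offset-based runtime accounting this patch introduces, assuming only what the diff shows: `crawler_runtime` becomes the current run's duration plus a `_runtime_offset` restored from persisted state, so both resumed runs and intermediate `calculate()` calls report a growing total. The `RuntimeAccounting` class and its methods below are hypothetical illustrations, not the crawlee API.

import time
from datetime import datetime, timedelta, timezone


class RuntimeAccounting:
    """Toy model of the offset-based runtime bookkeeping (hypothetical, for illustration)."""

    def __init__(self, persisted_runtime: timedelta = timedelta()) -> None:
        # Offset restored from a previous run's serialized state (zero on a fresh run).
        self._runtime_offset = persisted_runtime
        self._last_started_at: datetime | None = None
        self.crawler_runtime = persisted_runtime

    def start(self) -> None:
        # Mirrors `crawler_last_started_at` being stamped in `__aenter__`.
        self._last_started_at = datetime.now(timezone.utc)

    def update(self) -> timedelta:
        # Mirrors `_update_crawler_runtime`: current run duration plus the offset.
        current_run = (
            datetime.now(timezone.utc) - self._last_started_at
            if self._last_started_at
            else timedelta()
        )
        self.crawler_runtime = current_run + self._runtime_offset
        return self.crawler_runtime


# First run: accumulate some runtime, then "persist" the total.
first = RuntimeAccounting()
first.start()
time.sleep(0.05)
persisted = first.update()

# Resumed run: the persisted total becomes the new offset, so the reported
# runtime keeps growing instead of restarting from zero.
second = RuntimeAccounting(persisted_runtime=persisted)
second.start()
time.sleep(0.05)
assert second.update() >= persisted + timedelta(seconds=0.05)

Computing the total as offset plus current-run duration, rather than incrementing a stored counter, keeps repeated `calculate()` calls idempotent: elapsed time is never double-counted no matter how often the statistics are read.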