diff --git a/pyproject.toml b/pyproject.toml index 5393888860..e6170e5478 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ keywords = [ "scraping", ] dependencies = [ + "async-timeout>=5.0.1", "cachetools>=5.5.0", "colorama>=0.4.0", "impit>=0.8.0", diff --git a/src/crawlee/_utils/time.py b/src/crawlee/_utils/time.py index dc2521f6b5..f6aa7bc57f 100644 --- a/src/crawlee/_utils/time.py +++ b/src/crawlee/_utils/time.py @@ -3,11 +3,14 @@ import time from contextlib import contextmanager from dataclasses import dataclass +from datetime import timedelta from typing import TYPE_CHECKING +from async_timeout import Timeout, timeout + if TYPE_CHECKING: from collections.abc import Iterator - from datetime import timedelta + from types import TracebackType _SECONDS_PER_MINUTE = 60 _SECONDS_PER_HOUR = 3600 @@ -35,6 +38,43 @@ def measure_time() -> Iterator[TimerResult]: result.cpu = after_cpu - before_cpu +class SharedTimeout: + """Keeps track of a time budget shared by multiple independent async operations. + + Provides a reusable, non-reentrant async context manager interface. + """ + + def __init__(self, timeout: timedelta) -> None: + self._remaining_timeout = timeout + self._active_timeout: Timeout | None = None + self._activation_timestamp: float | None = None + + async def __aenter__(self) -> timedelta: + if self._active_timeout is not None or self._activation_timestamp is not None: + raise RuntimeError('A shared timeout context cannot be entered twice at the same time') + + self._activation_timestamp = time.monotonic() + self._active_timeout = new_timeout = timeout(self._remaining_timeout.total_seconds()) + await new_timeout.__aenter__() + return self._remaining_timeout + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + exc_traceback: TracebackType | None, + ) -> None: + if self._active_timeout is None or self._activation_timestamp is None: + raise RuntimeError('Cannot exit a shared timeout context that has not been entered') + + await self._active_timeout.__aexit__(exc_type, exc_value, exc_traceback) + elapsed = time.monotonic() - self._activation_timestamp + self._remaining_timeout = self._remaining_timeout - timedelta(seconds=elapsed) + + self._active_timeout = None + self._activation_timestamp = None + + def format_duration(duration: timedelta | None) -> str: """Format a timedelta into a human-readable string with appropriate units.""" if duration is None: diff --git a/src/crawlee/crawlers/__init__.py b/src/crawlee/crawlers/__init__.py index 504e6acef4..ec280f94c4 100644 --- a/src/crawlee/crawlers/__init__.py +++ b/src/crawlee/crawlers/__init__.py @@ -1,7 +1,7 @@ from crawlee._utils.try_import import install_import_hook as _install_import_hook from crawlee._utils.try_import import try_import as _try_import -from ._abstract_http import AbstractHttpCrawler, AbstractHttpParser, ParsedHttpCrawlingContext +from ._abstract_http import AbstractHttpCrawler, AbstractHttpParser, HttpCrawlerOptions, ParsedHttpCrawlingContext from ._basic import BasicCrawler, BasicCrawlerOptions, BasicCrawlingContext, ContextPipeline from ._http import HttpCrawler, HttpCrawlingContext, HttpCrawlingResult @@ -51,6 +51,7 @@ 'BeautifulSoupParserType', 'ContextPipeline', 'HttpCrawler', + 'HttpCrawlerOptions', 'HttpCrawlingContext', 'HttpCrawlingResult', 'ParsedHttpCrawlingContext', diff --git a/src/crawlee/crawlers/_abstract_http/__init__.py b/src/crawlee/crawlers/_abstract_http/__init__.py index 1e684fd948..85e3c3b0b7 100644 --- a/src/crawlee/crawlers/_abstract_http/__init__.py +++
b/src/crawlee/crawlers/_abstract_http/__init__.py @@ -1,9 +1,10 @@ -from ._abstract_http_crawler import AbstractHttpCrawler +from ._abstract_http_crawler import AbstractHttpCrawler, HttpCrawlerOptions from ._abstract_http_parser import AbstractHttpParser from ._http_crawling_context import ParsedHttpCrawlingContext __all__ = [ 'AbstractHttpCrawler', 'AbstractHttpParser', + 'HttpCrawlerOptions', 'ParsedHttpCrawlingContext', ] diff --git a/src/crawlee/crawlers/_abstract_http/_abstract_http_crawler.py b/src/crawlee/crawlers/_abstract_http/_abstract_http_crawler.py index b2678149df..d4cebcb9fe 100644 --- a/src/crawlee/crawlers/_abstract_http/_abstract_http_crawler.py +++ b/src/crawlee/crawlers/_abstract_http/_abstract_http_crawler.py @@ -3,14 +3,16 @@ import asyncio import logging from abc import ABC +from datetime import timedelta from typing import TYPE_CHECKING, Any, Generic from more_itertools import partition from pydantic import ValidationError -from typing_extensions import TypeVar +from typing_extensions import NotRequired, TypeVar from crawlee._request import Request, RequestOptions from crawlee._utils.docs import docs_group +from crawlee._utils.time import SharedTimeout from crawlee._utils.urls import to_absolute_url_iterator from crawlee.crawlers._basic import BasicCrawler, BasicCrawlerOptions, ContextPipeline from crawlee.errors import SessionError @@ -32,6 +34,19 @@ TStatisticsState = TypeVar('TStatisticsState', bound=StatisticsState, default=StatisticsState) +class HttpCrawlerOptions( + BasicCrawlerOptions[TCrawlingContext, TStatisticsState], + Generic[TCrawlingContext, TStatisticsState], +): + """Arguments for the `AbstractHttpCrawler` constructor. + + It is intended for typing forwarded `__init__` arguments in subclasses. + """ + + navigation_timeout: NotRequired[timedelta | None] + """Timeout for navigation (pre-navigation hooks and the HTTP request).""" + + @docs_group('Crawlers') class AbstractHttpCrawler( BasicCrawler[TCrawlingContext, StatisticsState], @@ -56,10 +71,13 @@ def __init__( self, *, parser: AbstractHttpParser[TParseResult, TSelectResult], + navigation_timeout: timedelta | None = None, **kwargs: Unpack[BasicCrawlerOptions[TCrawlingContext, StatisticsState]], ) -> None: self._parser = parser + self._navigation_timeout = navigation_timeout or timedelta(minutes=1) self._pre_navigation_hooks: list[Callable[[BasicCrawlingContext], Awaitable[None]]] = [] + self._shared_navigation_timeouts: dict[int, SharedTimeout] = {} if '_context_pipeline' not in kwargs: raise ValueError( @@ -112,9 +130,17 @@ def _create_static_content_crawler_pipeline(self) -> ContextPipeline[ParsedHttpC async def _execute_pre_navigation_hooks( self, context: BasicCrawlingContext ) -> AsyncGenerator[BasicCrawlingContext, None]: - for hook in self._pre_navigation_hooks: - await hook(context) - yield context + context_id = id(context) + self._shared_navigation_timeouts[context_id] = SharedTimeout(self._navigation_timeout) + + try: + for hook in self._pre_navigation_hooks: + async with self._shared_navigation_timeouts[context_id]: + await hook(context) + + yield context + finally: + self._shared_navigation_timeouts.pop(context_id, None) async def _parse_http_response( self, context: HttpCrawlingContext @@ -216,12 +242,14 @@ async def _make_http_request(self, context: BasicCrawlingContext) -> AsyncGenera Yields: The original crawling context enhanced by HTTP response.
""" - result = await self._http_client.crawl( - request=context.request, - session=context.session, - proxy_info=context.proxy_info, - statistics=self._statistics, - ) + async with self._shared_navigation_timeouts[id(context)] as remaining_timeout: + result = await self._http_client.crawl( + request=context.request, + session=context.session, + proxy_info=context.proxy_info, + statistics=self._statistics, + timeout=remaining_timeout, + ) yield HttpCrawlingContext.from_basic_crawling_context(context=context, http_response=result.http_response) diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index 79027aeba0..6e4574277e 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -1507,12 +1507,15 @@ async def __run_task_function(self) -> None: raise async def _run_request_handler(self, context: BasicCrawlingContext) -> None: - await wait_for( - lambda: self._context_pipeline(context, self.router), - timeout=self._request_handler_timeout, - timeout_message=f'{self._request_handler_timeout_text}' - f' {self._request_handler_timeout.total_seconds()} seconds', - logger=self._logger, + await self._context_pipeline( + context, + lambda final_context: wait_for( + lambda: self.router(final_context), + timeout=self._request_handler_timeout, + timeout_message=f'{self._request_handler_timeout_text}' + f' {self._request_handler_timeout.total_seconds()} seconds', + logger=self._logger, + ), ) def _raise_for_error_status_code(self, status_code: int) -> None: diff --git a/src/crawlee/crawlers/_beautifulsoup/_beautifulsoup_crawler.py b/src/crawlee/crawlers/_beautifulsoup/_beautifulsoup_crawler.py index 2e0f155fa2..919f26221e 100644 --- a/src/crawlee/crawlers/_beautifulsoup/_beautifulsoup_crawler.py +++ b/src/crawlee/crawlers/_beautifulsoup/_beautifulsoup_crawler.py @@ -5,7 +5,7 @@ from bs4 import BeautifulSoup, Tag from crawlee._utils.docs import docs_group -from crawlee.crawlers import AbstractHttpCrawler, BasicCrawlerOptions +from crawlee.crawlers import AbstractHttpCrawler, HttpCrawlerOptions from ._beautifulsoup_crawling_context import BeautifulSoupCrawlingContext from ._beautifulsoup_parser import BeautifulSoupParser, BeautifulSoupParserType @@ -58,7 +58,7 @@ def __init__( self, *, parser: BeautifulSoupParserType = 'lxml', - **kwargs: Unpack[BasicCrawlerOptions[BeautifulSoupCrawlingContext]], + **kwargs: Unpack[HttpCrawlerOptions[BeautifulSoupCrawlingContext]], ) -> None: """Initialize a new instance. diff --git a/src/crawlee/crawlers/_parsel/_parsel_crawler.py b/src/crawlee/crawlers/_parsel/_parsel_crawler.py index 7be2a36540..ac8e9c9f09 100644 --- a/src/crawlee/crawlers/_parsel/_parsel_crawler.py +++ b/src/crawlee/crawlers/_parsel/_parsel_crawler.py @@ -5,7 +5,7 @@ from parsel import Selector from crawlee._utils.docs import docs_group -from crawlee.crawlers import AbstractHttpCrawler, BasicCrawlerOptions +from crawlee.crawlers import AbstractHttpCrawler, HttpCrawlerOptions from ._parsel_crawling_context import ParselCrawlingContext from ._parsel_parser import ParselParser @@ -56,7 +56,7 @@ async def request_handler(context: ParselCrawlingContext) -> None: def __init__( self, - **kwargs: Unpack[BasicCrawlerOptions[ParselCrawlingContext]], + **kwargs: Unpack[HttpCrawlerOptions[ParselCrawlingContext]], ) -> None: """Initialize a new instance. 
diff --git a/src/crawlee/crawlers/_playwright/_playwright_crawler.py b/src/crawlee/crawlers/_playwright/_playwright_crawler.py index 8bb5c7a219..bf7a27af16 100644 --- a/src/crawlee/crawlers/_playwright/_playwright_crawler.py +++ b/src/crawlee/crawlers/_playwright/_playwright_crawler.py @@ -3,19 +3,25 @@ import asyncio import logging import warnings +from datetime import timedelta from functools import partial from typing import TYPE_CHECKING, Any, Generic, Literal +import playwright.async_api from more_itertools import partition from pydantic import ValidationError from typing_extensions import NotRequired, TypedDict, TypeVar from crawlee import service_locator from crawlee._request import Request, RequestOptions -from crawlee._types import ConcurrencySettings +from crawlee._types import ( + BasicCrawlingContext, + ConcurrencySettings, +) from crawlee._utils.blocked import RETRY_CSS_SELECTORS from crawlee._utils.docs import docs_group from crawlee._utils.robots import RobotsTxtFile +from crawlee._utils.time import SharedTimeout from crawlee._utils.urls import to_absolute_url_iterator from crawlee.browsers import BrowserPool from crawlee.crawlers._basic import BasicCrawler, BasicCrawlerOptions, ContextPipeline @@ -44,7 +50,6 @@ from crawlee import RequestTransformAction from crawlee._types import ( - BasicCrawlingContext, EnqueueLinksKwargs, ExtractLinksFunction, HttpHeaders, @@ -106,6 +111,7 @@ def __init__( fingerprint_generator: FingerprintGenerator | None | Literal['default'] = 'default', headless: bool | None = None, use_incognito_pages: bool | None = None, + navigation_timeout: timedelta | None = None, **kwargs: Unpack[BasicCrawlerOptions[PlaywrightCrawlingContext, StatisticsState]], ) -> None: """Initialize a new instance. @@ -134,12 +140,16 @@ def __init__( use_incognito_pages: By default pages share the same browser context. If set to True each page uses its own context that is destroyed once the page is closed or crashes. This option should not be used if `browser_pool` is provided. + navigation_timeout: Timeout for navigation (the process between opening a Playwright page and calling + the request handler) kwargs: Additional keyword arguments to pass to the underlying `BasicCrawler`. """ configuration = kwargs.pop('configuration', None) if configuration is not None: service_locator.set_configuration(configuration) + self._shared_navigation_timeouts: dict[int, SharedTimeout] = {} + if browser_pool: # Raise an exception if browser_pool is provided together with other browser-related arguments. 
if any( @@ -202,6 +212,8 @@ def __init__( if 'concurrency_settings' not in kwargs or kwargs['concurrency_settings'] is None: kwargs['concurrency_settings'] = ConcurrencySettings(desired_concurrency=1) + self._navigation_timeout = navigation_timeout or timedelta(minutes=1) + super().__init__(**kwargs) async def _open_page( @@ -228,10 +240,18 @@ async def _open_page( block_requests=partial(block_requests, page=crawlee_page.page), ) - async with browser_page_context(crawlee_page.page): - for hook in self._pre_navigation_hooks: - await hook(pre_navigation_context) - yield pre_navigation_context + context_id = id(pre_navigation_context) + self._shared_navigation_timeouts[context_id] = SharedTimeout(self._navigation_timeout) + + try: + async with browser_page_context(crawlee_page.page): + for hook in self._pre_navigation_hooks: + async with self._shared_navigation_timeouts[context_id]: + await hook(pre_navigation_context) + + yield pre_navigation_context + finally: + self._shared_navigation_timeouts.pop(context_id, None) def _prepare_request_interceptor( self, @@ -266,6 +286,7 @@ async def _navigate( Raises: ValueError: If the browser pool is not initialized. SessionError: If the URL cannot be loaded by the browser. + TimeoutError: If navigation does not succeed within the navigation timeout. Yields: The enhanced crawling context with the Playwright-specific features (page, response, enqueue_links, @@ -297,7 +318,13 @@ async def _navigate( # Set route_handler only for current request await context.page.route(context.request.url, route_handler) - response = await context.page.goto(context.request.url) + try: + async with self._shared_navigation_timeouts[id(context)] as remaining_timeout: + response = await context.page.goto( + context.request.url, timeout=remaining_timeout.total_seconds() * 1000 + ) + except playwright.async_api.TimeoutError as exc: + raise asyncio.TimeoutError from exc if response is None: raise SessionError(f'Failed to load the URL: {context.request.url}') diff --git a/src/crawlee/crawlers/_playwright/_playwright_http_client.py b/src/crawlee/crawlers/_playwright/_playwright_http_client.py index 5a72b80506..e522b6d63a 100644 --- a/src/crawlee/crawlers/_playwright/_playwright_http_client.py +++ b/src/crawlee/crawlers/_playwright/_playwright_http_client.py @@ -59,6 +59,7 @@ async def crawl( session: Session | None = None, proxy_info: ProxyInfo | None = None, statistics: Statistics | None = None, + timeout: timedelta | None = None, ) -> HttpCrawlingResult: raise NotImplementedError('The `crawl` method should not be used for `PlaywrightHttpClient`') @@ -72,6 +73,7 @@ async def send_request( payload: HttpPayload | None = None, session: Session | None = None, proxy_info: ProxyInfo | None = None, + timeout: timedelta | None = None, ) -> HttpResponse: # `proxy_info` are not used because `APIRequestContext` inherits the proxy from `BrowserContext` # TODO: Use `session` to restore all the fingerprint headers according to the `BrowserContext`, after resolved @@ -87,7 +89,11 @@ async def send_request( # Proxies appropriate to the browser context are used response = await browser_context.request.fetch( - url_or_request=url, method=method.lower(), headers=dict(headers) if headers else None, data=payload + url_or_request=url, + method=method.lower(), + headers=dict(headers) if headers else None, + data=payload, + timeout=timeout.total_seconds() if timeout else None, ) return await PlaywrightHttpResponse.from_playwright_response(response, protocol='') diff --git 
a/src/crawlee/http_clients/_base.py b/src/crawlee/http_clients/_base.py index 77db1e8617..0bc53ec761 100644 --- a/src/crawlee/http_clients/_base.py +++ b/src/crawlee/http_clients/_base.py @@ -104,6 +104,7 @@ async def crawl( session: Session | None = None, proxy_info: ProxyInfo | None = None, statistics: Statistics | None = None, + timeout: timedelta | None = None, ) -> HttpCrawlingResult: """Perform the crawling for a given request. @@ -114,6 +115,7 @@ async def crawl( session: The session associated with the request. proxy_info: The information about the proxy to be used. statistics: The statistics object to register status codes. + timeout: Maximum time allowed to process the request. Raises: ProxyError: Raised if a proxy-related error occurs. @@ -132,6 +134,7 @@ async def send_request( payload: HttpPayload | None = None, session: Session | None = None, proxy_info: ProxyInfo | None = None, + timeout: timedelta | None = None, ) -> HttpResponse: """Send an HTTP request via the client. @@ -144,6 +147,7 @@ async def send_request( payload: The data to be sent as the request body. session: The session associated with the request. proxy_info: The information about the proxy to be used. + timeout: Maximum time allowed to process the request. Raises: ProxyError: Raised if a proxy-related error occurs. diff --git a/src/crawlee/http_clients/_curl_impersonate.py b/src/crawlee/http_clients/_curl_impersonate.py index 63c0314f0b..b4eff2421b 100644 --- a/src/crawlee/http_clients/_curl_impersonate.py +++ b/src/crawlee/http_clients/_curl_impersonate.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any @@ -10,6 +11,7 @@ from curl_cffi.requests.cookies import CurlMorsel from curl_cffi.requests.exceptions import ProxyError as CurlProxyError from curl_cffi.requests.exceptions import RequestException as CurlRequestError +from curl_cffi.requests.exceptions import Timeout from curl_cffi.requests.impersonate import DEFAULT_CHROME as CURL_DEFAULT_CHROME from typing_extensions import override @@ -147,6 +149,7 @@ async def crawl( session: Session | None = None, proxy_info: ProxyInfo | None = None, statistics: Statistics | None = None, + timeout: timedelta | None = None, ) -> HttpCrawlingResult: client = self._get_client(proxy_info.url if proxy_info else None) @@ -157,7 +160,10 @@ async def crawl( headers=request.headers, data=request.payload, cookies=session.cookies.jar if session else None, + timeout=timeout.total_seconds() if timeout else None, ) + except Timeout as exc: + raise asyncio.TimeoutError from exc except CurlRequestError as exc: if self._is_proxy_error(exc): raise ProxyError from exc @@ -186,6 +192,7 @@ async def send_request( payload: HttpPayload | None = None, session: Session | None = None, proxy_info: ProxyInfo | None = None, + timeout: timedelta | None = None, ) -> HttpResponse: if isinstance(headers, dict) or headers is None: headers = HttpHeaders(headers or {}) @@ -200,7 +207,10 @@ async def send_request( headers=dict(headers) if headers else None, data=payload, cookies=session.cookies.jar if session else None, + timeout=timeout.total_seconds() if timeout else None, ) + except Timeout as exc: + raise asyncio.TimeoutError from exc except CurlRequestError as exc: if self._is_proxy_error(exc): raise ProxyError from exc @@ -241,6 +251,8 @@ async def stream( stream=True, timeout=timeout.total_seconds() if timeout else None, ) + except Timeout as exc: + raise asyncio.TimeoutError from exc except 
CurlRequestError as exc: if self._is_proxy_error(exc): raise ProxyError from exc diff --git a/src/crawlee/http_clients/_httpx.py b/src/crawlee/http_clients/_httpx.py index 7f6d8a17fd..257bfa10ae 100644 --- a/src/crawlee/http_clients/_httpx.py +++ b/src/crawlee/http_clients/_httpx.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from contextlib import asynccontextmanager from logging import getLogger from typing import TYPE_CHECKING, Any, cast @@ -146,6 +147,7 @@ async def crawl( session: Session | None = None, proxy_info: ProxyInfo | None = None, statistics: Statistics | None = None, + timeout: timedelta | None = None, ) -> HttpCrawlingResult: client = self._get_client(proxy_info.url if proxy_info else None) headers = self._combine_headers(request.headers) @@ -157,10 +159,13 @@ async def crawl( content=request.payload, cookies=session.cookies.jar if session else None, extensions={'crawlee_session': session if self._persist_cookies_per_session else None}, + timeout=timeout.total_seconds() if timeout is not None else httpx.USE_CLIENT_DEFAULT, ) try: response = await client.send(http_request) + except httpx.TimeoutException as exc: + raise asyncio.TimeoutError from exc except httpx.TransportError as exc: if self._is_proxy_error(exc): raise ProxyError from exc @@ -185,6 +190,7 @@ async def send_request( payload: HttpPayload | None = None, session: Session | None = None, proxy_info: ProxyInfo | None = None, + timeout: timedelta | None = None, ) -> HttpResponse: client = self._get_client(proxy_info.url if proxy_info else None) @@ -195,10 +201,13 @@ async def send_request( headers=headers, payload=payload, session=session, + timeout=httpx.Timeout(timeout.total_seconds()) if timeout is not None else None, ) try: response = await client.send(http_request) + except httpx.TimeoutException as exc: + raise asyncio.TimeoutError from exc except httpx.TransportError as exc: if self._is_proxy_error(exc): raise ProxyError from exc @@ -228,10 +237,13 @@ async def stream( headers=headers, payload=payload, session=session, - timeout=timeout, + timeout=httpx.Timeout(None, connect=timeout.total_seconds()) if timeout else None, ) - response = await client.send(http_request, stream=True) + try: + response = await client.send(http_request, stream=True) + except httpx.TimeoutException as exc: + raise asyncio.TimeoutError from exc try: yield _HttpxResponse(response) @@ -246,7 +258,7 @@ def _build_request( headers: HttpHeaders | dict[str, str] | None, payload: HttpPayload | None, session: Session | None = None, - timeout: timedelta | None = None, + timeout: httpx.Timeout | None = None, ) -> httpx.Request: """Build an `httpx.Request` using the provided parameters.""" if isinstance(headers, dict) or headers is None: @@ -254,15 +266,13 @@ def _build_request( headers = self._combine_headers(headers) - httpx_timeout = httpx.Timeout(None, connect=timeout.total_seconds()) if timeout else None - return client.build_request( url=url, method=method, headers=dict(headers) if headers else None, content=payload, extensions={'crawlee_session': session if self._persist_cookies_per_session else None}, - timeout=httpx_timeout, + timeout=timeout if timeout else httpx.USE_CLIENT_DEFAULT, ) def _get_client(self, proxy_url: str | None) -> httpx.AsyncClient: diff --git a/src/crawlee/http_clients/_impit.py b/src/crawlee/http_clients/_impit.py index 39edbab75d..0fca9c94ef 100644 --- a/src/crawlee/http_clients/_impit.py +++ b/src/crawlee/http_clients/_impit.py @@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Any, 
TypedDict from cachetools import LRUCache -from impit import AsyncClient, Browser, HTTPError, Response, TransportError +from impit import AsyncClient, Browser, HTTPError, Response, TimeoutException, TransportError from impit import ProxyError as ImpitProxyError from typing_extensions import override @@ -125,6 +125,7 @@ async def crawl( session: Session | None = None, proxy_info: ProxyInfo | None = None, statistics: Statistics | None = None, + timeout: timedelta | None = None, ) -> HttpCrawlingResult: client = self._get_client(proxy_info.url if proxy_info else None, session.cookies.jar if session else None) @@ -134,7 +135,10 @@ async def crawl( method=request.method, content=request.payload, headers=dict(request.headers) if request.headers else None, + timeout=timeout.total_seconds() if timeout else None, ) + except TimeoutException as exc: + raise asyncio.TimeoutError from exc except (TransportError, HTTPError) as exc: if self._is_proxy_error(exc): raise ProxyError from exc @@ -157,6 +161,7 @@ async def send_request( payload: HttpPayload | None = None, session: Session | None = None, proxy_info: ProxyInfo | None = None, + timeout: timedelta | None = None, ) -> HttpResponse: if isinstance(headers, dict) or headers is None: headers = HttpHeaders(headers or {}) @@ -165,8 +170,14 @@ async def send_request( try: response = await client.request( - method=method, url=url, content=payload, headers=dict(headers) if headers else None + method=method, + url=url, + content=payload, + headers=dict(headers) if headers else None, + timeout=timeout.total_seconds() if timeout else None, ) + except TimeoutException as exc: + raise asyncio.TimeoutError from exc except (TransportError, HTTPError) as exc: if self._is_proxy_error(exc): raise ProxyError from exc @@ -189,14 +200,18 @@ async def stream( ) -> AsyncGenerator[HttpResponse]: client = self._get_client(proxy_info.url if proxy_info else None, session.cookies.jar if session else None) - response = await client.request( - method=method, - url=url, - content=payload, - headers=dict(headers) if headers else None, - timeout=timeout.total_seconds() if timeout else None, - stream=True, - ) + try: + response = await client.request( + method=method, + url=url, + content=payload, + headers=dict(headers) if headers else None, + timeout=timeout.total_seconds() if timeout else None, + stream=True, + ) + except TimeoutException as exc: + raise asyncio.TimeoutError from exc + try: yield _ImpitResponse(response) finally: diff --git a/tests/unit/_utils/test_shared_timeout.py b/tests/unit/_utils/test_shared_timeout.py new file mode 100644 index 0000000000..beea7779ad --- /dev/null +++ b/tests/unit/_utils/test_shared_timeout.py @@ -0,0 +1,57 @@ +import asyncio +from datetime import timedelta + +import pytest + +from crawlee._utils.time import SharedTimeout, measure_time + + +async def test_shared_timeout_tracks_elapsed_time() -> None: + timeout_duration = timedelta(seconds=1) + shared_timeout = SharedTimeout(timeout_duration) + + # First usage + async with shared_timeout: + await asyncio.sleep(0.2) + + # Second usage - should have less time remaining + async with shared_timeout as remaining: + assert remaining < timedelta(seconds=0.85) + assert remaining > timedelta(seconds=0) + + +async def test_shared_timeout_expires() -> None: + timeout_duration = timedelta(seconds=0.1) + shared_timeout = SharedTimeout(timeout_duration) + + with measure_time() as elapsed, pytest.raises(asyncio.TimeoutError): + async with shared_timeout: + await asyncio.sleep(0.5) + + assert elapsed.wall 
is not None + assert elapsed.wall < 0.3 + + +async def test_shared_timeout_cannot_be_nested() -> None: + timeout_duration = timedelta(seconds=1) + shared_timeout = SharedTimeout(timeout_duration) + + async with shared_timeout: + with pytest.raises(RuntimeError, match='cannot be entered twice'): + async with shared_timeout: + pass + + +async def test_shared_timeout_multiple_sequential_uses() -> None: + """Test that SharedTimeout can be used multiple times sequentially.""" + timeout_duration = timedelta(seconds=1) + shared_timeout = SharedTimeout(timeout_duration) + + for _ in range(5): + async with shared_timeout: + await asyncio.sleep(0.05) + + # Should have consumed roughly 0.25 seconds + async with shared_timeout as remaining: + assert remaining < timedelta(seconds=0.8) + assert remaining > timedelta(seconds=0) diff --git a/tests/unit/_utils/test_timedelata_ms.py b/tests/unit/_utils/test_timedelta_ms.py similarity index 100% rename from tests/unit/_utils/test_timedelata_ms.py rename to tests/unit/_utils/test_timedelta_ms.py diff --git a/tests/unit/crawlers/_beautifulsoup/test_beautifulsoup_crawler.py b/tests/unit/crawlers/_beautifulsoup/test_beautifulsoup_crawler.py index 6b79eef895..71447ec20c 100644 --- a/tests/unit/crawlers/_beautifulsoup/test_beautifulsoup_crawler.py +++ b/tests/unit/crawlers/_beautifulsoup/test_beautifulsoup_crawler.py @@ -1,5 +1,7 @@ from __future__ import annotations +import asyncio +from datetime import timedelta from typing import TYPE_CHECKING from unittest import mock @@ -341,3 +343,60 @@ async def handler(context: BeautifulSoupCrawlingContext) -> None: await context.enqueue_links(rq_id=queue_id, rq_name=queue_name, rq_alias=queue_alias) await crawler.run([str(server_url / 'start_enqueue')]) + + +async def test_navigation_timeout_on_slow_request(server_url: URL, http_client: HttpClient) -> None: + """Test that navigation_timeout causes TimeoutError on slow HTTP requests.""" + crawler = BeautifulSoupCrawler( + http_client=http_client, + navigation_timeout=timedelta(seconds=1), + max_request_retries=0, + ) + + failed_request_handler = mock.AsyncMock() + crawler.failed_request_handler(failed_request_handler) + + request_handler = mock.AsyncMock() + crawler.router.default_handler(request_handler) + + # Request endpoint that delays 5 seconds - should timeout at 1 second + await crawler.run([str(server_url.with_path('/slow').with_query(delay=5))]) + + assert failed_request_handler.call_count == 1 + assert isinstance(failed_request_handler.call_args[0][1], asyncio.TimeoutError) + + +async def test_navigation_timeout_applies_to_hooks(server_url: URL) -> None: + crawler = BeautifulSoupCrawler( + navigation_timeout=timedelta(seconds=1), + max_request_retries=0, + ) + + request_handler = mock.AsyncMock() + crawler.router.default_handler(request_handler) + crawler.pre_navigation_hook(lambda _: asyncio.sleep(1)) + + # Pre-navigation hook takes 1 second (exceeds navigation timeout), so the URL will not be handled + result = await crawler.run([str(server_url)]) + + assert result.requests_failed == 1 + assert result.requests_finished == 0 + assert request_handler.call_count == 0 + + +async def test_slow_navigation_does_not_count_toward_handler_timeout(server_url: URL, http_client: HttpClient) -> None: + crawler = BeautifulSoupCrawler( + http_client=http_client, + request_handler_timeout=timedelta(seconds=0.5), + max_request_retries=0, + ) + + request_handler = mock.AsyncMock() + crawler.router.default_handler(request_handler) + + # Navigation takes 1 second (exceeds handler 
timeout), but should still succeed + result = await crawler.run([str(server_url.with_path('/slow').with_query(delay=1))]) + + assert result.requests_failed == 0 + assert result.requests_finished == 1 + assert request_handler.call_count == 1 diff --git a/tests/unit/crawlers/_playwright/test_playwright_crawler.py b/tests/unit/crawlers/_playwright/test_playwright_crawler.py index c7346ef20f..90b4af293c 100644 --- a/tests/unit/crawlers/_playwright/test_playwright_crawler.py +++ b/tests/unit/crawlers/_playwright/test_playwright_crawler.py @@ -4,11 +4,13 @@ from __future__ import annotations +import asyncio import json import logging +from datetime import timedelta from typing import TYPE_CHECKING, Any, Literal from unittest import mock -from unittest.mock import Mock +from unittest.mock import AsyncMock, Mock import pytest @@ -925,3 +927,61 @@ async def handler(context: PlaywrightCrawlingContext) -> None: await context.enqueue_links(rq_id=queue_id, rq_name=queue_name, rq_alias=queue_alias) await crawler.run([str(server_url / 'start_enqueue')]) + + +async def test_navigation_timeout_on_slow_page_load(server_url: URL) -> None: + crawler = PlaywrightCrawler( + navigation_timeout=timedelta(seconds=1), + max_request_retries=0, + ) + + request_handler = AsyncMock() + crawler.router.default_handler(request_handler) + + failed_request_handler = AsyncMock() + crawler.failed_request_handler(failed_request_handler) + + result = await crawler.run([str((server_url / 'slow').with_query(delay=2))]) + + assert result.requests_failed == 1 + assert result.requests_finished == 0 + + assert request_handler.call_count == 0 + + assert failed_request_handler.call_count == 1 + assert isinstance(failed_request_handler.call_args[0][1], asyncio.TimeoutError) + + +async def test_navigation_timeout_applies_to_hooks(server_url: URL) -> None: + crawler = PlaywrightCrawler( + navigation_timeout=timedelta(seconds=0.5), + max_request_retries=0, + ) + + request_handler = AsyncMock() + crawler.router.default_handler(request_handler) + crawler.pre_navigation_hook(lambda _: asyncio.sleep(1)) + + # Pre-navigation hook takes 1 second (exceeds navigation timeout), so the URL will not be handled + result = await crawler.run([str(server_url)]) + + assert result.requests_failed == 1 + assert result.requests_finished == 0 + assert request_handler.call_count == 0 + + +async def test_slow_navigation_does_not_count_toward_handler_timeout(server_url: URL) -> None: + crawler = PlaywrightCrawler( + request_handler_timeout=timedelta(seconds=0.5), + max_request_retries=0, + ) + + request_handler = AsyncMock() + crawler.router.default_handler(request_handler) + + # Navigation takes 1 second (exceeds handler timeout), but should still succeed + result = await crawler.run([str((server_url / 'slow').with_query(delay=1))]) + + assert result.requests_failed == 0 + assert result.requests_finished == 1 + assert request_handler.call_count == 1 diff --git a/tests/unit/server.py b/tests/unit/server.py index 879c9407fd..8f37472872 100644 --- a/tests/unit/server.py +++ b/tests/unit/server.py @@ -123,6 +123,7 @@ async def app(scope: dict[str, Any], receive: Receive, send: Send) -> None: 'xml': hello_world_xml, 'robots.txt': robots_txt, 'get_compressed': get_compressed, + 'slow': slow_response, 'infinite_scroll': infinite_scroll_endpoint, 'resource_loading_page': resource_loading_endpoint, } @@ -415,6 +416,15 @@ async def get_compressed(_scope: dict[str, Any], _receive: Receive, send: Send) await send({'type': 'http.response.body', 'body': 
gzip.compress(HELLO_WORLD * 1000)}) +async def slow_response(scope: dict[str, Any], _receive: Receive, send: Send) -> None: + """Handle requests with a configurable delay to test timeouts.""" + query_params = get_query_params(scope.get('query_string', b'')) + delay = float(query_params.get('delay', '5')) # Default 5 second delay + + await asyncio.sleep(delay) + await send_html_response(send, HELLO_WORLD) + + async def infinite_scroll_endpoint(_scope: dict[str, Any], _receive: Receive, send: Send) -> None: """Handle requests for the infinite scroll page.""" await send_html_response( diff --git a/uv.lock b/uv.lock index 1c4501c039..8139d61837 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.10" resolution-markers = [ "python_full_version >= '3.13'", @@ -694,6 +694,7 @@ name = "crawlee" version = "1.1.1" source = { editable = "." } dependencies = [ + { name = "async-timeout" }, { name = "cachetools" }, { name = "colorama" }, { name = "impit" }, @@ -821,6 +822,7 @@ requires-dist = [ { name = "apify-fingerprint-datapoints", marker = "extra == 'adaptive-crawler'", specifier = ">=0.0.2" }, { name = "apify-fingerprint-datapoints", marker = "extra == 'httpx'", specifier = ">=0.0.2" }, { name = "apify-fingerprint-datapoints", marker = "extra == 'playwright'", specifier = ">=0.0.2" }, + { name = "async-timeout", specifier = ">=5.0.1" }, { name = "asyncpg", marker = "python_full_version < '3.14' and extra == 'sql-postgres'", specifier = ">=0.24.0" }, { name = "beautifulsoup4", extras = ["lxml"], marker = "extra == 'beautifulsoup'", specifier = ">=4.12.0" }, { name = "browserforge", marker = "extra == 'adaptive-crawler'", specifier = ">=1.2.3" }, @@ -1044,7 +1046,7 @@ name = "exceptiongroup" version = "1.3.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "typing-extensions", marker = "python_full_version < '3.13'" }, + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/50/79/66800aadf48771f6b62f7eb014e352e5d06856655206165d775e675a02c9/exceptiongroup-1.3.1.tar.gz", hash = "sha256:8b412432c6055b0b7d14c310000ae93352ed6754f70fa8f7c34141f91c4e3219", size = 30371, upload-time = "2025-11-21T23:01:54.787Z" } wheels = [
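Reviewer note, not part of the diff: a short sketch of how the new navigation_timeout option introduced by this change combines with the existing request_handler_timeout, mirroring the unit tests above; the target URL and handler body are illustrative.

import asyncio
from datetime import timedelta

from crawlee.crawlers import BeautifulSoupCrawler, BeautifulSoupCrawlingContext


async def main() -> None:
    crawler = BeautifulSoupCrawler(
        # Budget shared by pre-navigation hooks and the HTTP request.
        navigation_timeout=timedelta(seconds=5),
        # The request handler is timed separately from navigation.
        request_handler_timeout=timedelta(seconds=30),
    )

    @crawler.router.default_handler
    async def handler(context: BeautifulSoupCrawlingContext) -> None:
        context.log.info(f'Title: {context.soup.title}')

    await crawler.run(['https://crawlee.dev'])


asyncio.run(main())

The navigation budget covers the pre-navigation hooks plus the page load or HTTP request, while the request handler keeps its own separate timeout.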