From ca23abd75b6f6f97bd2aac2add844086f3639637 Mon Sep 17 00:00:00 2001 From: Str1kez Date: Sun, 28 Sep 2025 23:32:03 +0300 Subject: [PATCH 1/4] Fix timeout retries --- arq/worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/arq/worker.py b/arq/worker.py index f1e613c9..58a3f43c 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -592,7 +592,7 @@ async def job_failed(exc: BaseException) -> None: # run repr(result) and extra inside try/except as they can raise exceptions try: result = await asyncio.wait_for(task, timeout_s) - except (Exception, asyncio.CancelledError) as e: + except (Exception, asyncio.CancelledError, asyncio.TimeoutError) as e: exc_extra = getattr(e, 'extra', None) if callable(exc_extra): exc_extra = exc_extra() @@ -602,7 +602,7 @@ async def job_failed(exc: BaseException) -> None: finally: del self.job_tasks[job_id] - except (Exception, asyncio.CancelledError) as e: + except (Exception, asyncio.CancelledError, asyncio.TimeoutError) as e: finished_ms = timestamp_ms() t = (finished_ms - start_ms) / 1000 if self.retry_jobs and isinstance(e, Retry): @@ -617,7 +617,7 @@ async def job_failed(exc: BaseException) -> None: finish = True self.aborting_tasks.remove(job_id) self.jobs_failed += 1 - elif self.retry_jobs and isinstance(e, (asyncio.CancelledError, RetryJob)): + elif self.retry_jobs and isinstance(e, (asyncio.TimeoutError, RetryJob)): logger.info('%6.2fs ↻ %s cancelled, will be run again', t, ref) self.jobs_retried += 1 else: From 5085659b0f024eca39104fdee91bc69a6497bf75 Mon Sep 17 00:00:00 2001 From: Str1kez Date: Mon, 29 Sep 2025 00:01:46 +0300 Subject: [PATCH 2/4] refactor(worker): handle error only on retries line --- arq/worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/arq/worker.py b/arq/worker.py index 58a3f43c..6058f66a 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -592,7 +592,7 @@ async def job_failed(exc: BaseException) -> None: # run repr(result) and extra inside try/except as they can raise exceptions try: result = await asyncio.wait_for(task, timeout_s) - except (Exception, asyncio.CancelledError, asyncio.TimeoutError) as e: + except (Exception, asyncio.CancelledError) as e: exc_extra = getattr(e, 'extra', None) if callable(exc_extra): exc_extra = exc_extra() @@ -602,7 +602,7 @@ async def job_failed(exc: BaseException) -> None: finally: del self.job_tasks[job_id] - except (Exception, asyncio.CancelledError, asyncio.TimeoutError) as e: + except (Exception, asyncio.CancelledError) as e: finished_ms = timestamp_ms() t = (finished_ms - start_ms) / 1000 if self.retry_jobs and isinstance(e, Retry): @@ -617,7 +617,7 @@ async def job_failed(exc: BaseException) -> None: finish = True self.aborting_tasks.remove(job_id) self.jobs_failed += 1 - elif self.retry_jobs and isinstance(e, (asyncio.TimeoutError, RetryJob)): + elif self.retry_jobs and isinstance(e, (asyncio.CancelledError, asyncio.TimeoutError, RetryJob)): logger.info('%6.2fs ↻ %s cancelled, will be run again', t, ref) self.jobs_retried += 1 else: From be8a4c60837d4e40cb132acaa258bdbed5d8cd61 Mon Sep 17 00:00:00 2001 From: Str1kez Date: Mon, 29 Sep 2025 00:12:32 +0300 Subject: [PATCH 3/4] fix(worker): test_job_timeout --- tests/test_worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 93fbc7f0..38f3196b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -980,10 +980,11 @@ async def longfunc(ctx): assert worker.jobs_complete == 0 assert worker.jobs_failed == 0 assert worker.jobs_retried == 0 + assert worker.retry_jobs await worker.main() assert worker.jobs_complete == 0 assert worker.jobs_failed == 1 - assert worker.jobs_retried == 0 + assert worker.jobs_retried == worker.max_tries log = re.sub(r'\d+.\d\ds', 'X.XXs', '\n'.join(r.message for r in caplog.records)) assert 'X.XXs ! testing:longfunc failed, TimeoutError:' in log From bdf2f1f72c9e2d9d859b3a8ffafa2495dfb81144 Mon Sep 17 00:00:00 2001 From: Str1kez Date: Mon, 29 Sep 2025 00:17:35 +0300 Subject: [PATCH 4/4] fix(worker): test_job_timeout --- tests/test_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 38f3196b..38e5e124 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -974,7 +974,7 @@ async def test_job_timeout(arq_redis: ArqRedis, worker, caplog): async def longfunc(ctx): await asyncio.sleep(0.3) - caplog.set_level(logging.ERROR) + caplog.set_level(logging.INFO) await arq_redis.enqueue_job('longfunc', _job_id='testing') worker: Worker = worker(functions=[func(longfunc, name='longfunc')], job_timeout=0.2, poll_delay=0.1) assert worker.jobs_complete == 0 @@ -986,7 +986,7 @@ async def longfunc(ctx): assert worker.jobs_failed == 1 assert worker.jobs_retried == worker.max_tries log = re.sub(r'\d+.\d\ds', 'X.XXs', '\n'.join(r.message for r in caplog.records)) - assert 'X.XXs ! testing:longfunc failed, TimeoutError:' in log + assert f'X.XXs ! testing:longfunc max retries {worker.max_tries} exceeded' in log async def test_on_job(arq_redis: ArqRedis, worker):