From 12c55ec219b6ca675ef6c0fb77f57b344a9deba9 Mon Sep 17 00:00:00 2001 From: Antonije Velevski Date: Sun, 7 Sep 2025 15:08:59 +0300 Subject: [PATCH 1/8] fix(job-runner): prevent jobs from being double-counted in queue and progress Previously, jobs were marked as "in progress" in `get_jobs()` immediately after being fetched from the server, before they were actually scheduled by `run_jobs()`. This caused jobs to appear both in the queue and in progress simultaneously, which inflated `current_occupancy()` and blocked new jobs from starting until earlier ones had finished. The fix moves the `job_progress.add(job)` call into `run_jobs()`, right after the job is dequeued and scheduled as a task. This ensures that: - Jobs are only marked "in progress" once they actually start running. - Queue and progress metrics no longer overlap. - Concurrency limits are enforced correctly (e.g. with concurrency=4, two jobs now run in parallel instead of sequentially). As a result, jobs are dispatched immediately when capacity is available, and status logging reflects the real state of the worker. --- runpod/serverless/modules/rp_scale.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index 5c7d79cc..70447c64 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -188,7 +188,6 @@ async def get_jobs(self, session: ClientSession): for job in acquired_jobs: await self.jobs_queue.put(job) - self.job_progress.add(job) log.debug("Job Queued", job["id"]) log.info(f"Jobs in queue: {self.jobs_queue.qsize()}") @@ -225,6 +224,7 @@ async def run_jobs(self, session: ClientSession): # Fetch as many jobs as the concurrency allows while len(tasks) < self.current_concurrency and not self.jobs_queue.empty(): job = await self.jobs_queue.get() + self.job_progress.add(job) # Create a new task for each job and add it to the task list task = asyncio.create_task(self.handle_job(session, job)) From 7625d91145287dc1060931a33743a50b024d4198 Mon Sep 17 00:00:00 2001 From: antonije Date: Sun, 7 Sep 2025 16:03:24 +0300 Subject: [PATCH 2/8] - Stop recreating the `job_queue` and don't set the size right away. --- runpod/serverless/modules/rp_scale.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index 70447c64..f3a9034d 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -48,7 +48,7 @@ def __init__(self, config: Dict[str, Any]): self.config = config self.job_progress = JobsProgress() # Cache the singleton instance - self.jobs_queue = asyncio.Queue(maxsize=self.current_concurrency) + self.jobs_queue = asyncio.Queue() self.concurrency_modifier = _default_concurrency_modifier self.jobs_fetcher = get_job @@ -72,9 +72,9 @@ def __init__(self, config: Dict[str, Any]): self.jobs_handler = jobs_handler async def set_scale(self): - self.current_concurrency = self.concurrency_modifier(self.current_concurrency) + new_concurrency = self.concurrency_modifier(self.current_concurrency) - if self.jobs_queue and (self.current_concurrency == self.jobs_queue.maxsize): + if new_concurrency == self.current_concurrency: # no need to resize return @@ -83,10 +83,8 @@ async def set_scale(self): await asyncio.sleep(1) continue - self.jobs_queue = asyncio.Queue(maxsize=self.current_concurrency) - log.debug( - f"JobScaler.set_scale | New concurrency set to: {self.current_concurrency}" - ) + self.current_concurrency = new_concurrency + log.debug(f"JobScaler.set_scale | New concurrency set to: {self.current_concurrency}") def start(self): """ @@ -221,10 +219,12 @@ async def run_jobs(self, session: ClientSession): tasks = [] # Store the tasks for concurrent job processing while self.is_alive() or not self.jobs_queue.empty(): + log.debug(f"Task count: {len(tasks)}, Queue size: {self.jobs_queue.qsize()}, Concurrency: {self.current_concurrency}") # Fetch as many jobs as the concurrency allows while len(tasks) < self.current_concurrency and not self.jobs_queue.empty(): job = await self.jobs_queue.get() self.job_progress.add(job) + log.info(f"Dequeued job {job['id']}, now running. Queue size: {self.jobs_queue.qsize()}") # Create a new task for each job and add it to the task list task = asyncio.create_task(self.handle_job(session, job)) From 763c843d35490dbb1ec17ed049a5a0f73516b153 Mon Sep 17 00:00:00 2001 From: antonije Date: Sun, 7 Sep 2025 17:06:46 +0300 Subject: [PATCH 3/8] - Stop recreating the `job_queue` and don't set the size right away. --- runpod/serverless/modules/rp_scale.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index f3a9034d..7a1d0a3b 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -222,9 +222,10 @@ async def run_jobs(self, session: ClientSession): log.debug(f"Task count: {len(tasks)}, Queue size: {self.jobs_queue.qsize()}, Concurrency: {self.current_concurrency}") # Fetch as many jobs as the concurrency allows while len(tasks) < self.current_concurrency and not self.jobs_queue.empty(): + log.debug(f"About to get a job from the queue. Queue size: {self.jobs_queue.qsize()}") job = await self.jobs_queue.get() self.job_progress.add(job) - log.info(f"Dequeued job {job['id']}, now running. Queue size: {self.jobs_queue.qsize()}") + log.debug(f"Dequeued job {job['id']}, now running. Queue size: {self.jobs_queue.qsize()}") # Create a new task for each job and add it to the task list task = asyncio.create_task(self.handle_job(session, job)) From 8d33f2eb4c546a28d1e58c9fb6dfeabe45042eae Mon Sep 17 00:00:00 2001 From: antonije Date: Sun, 7 Sep 2025 17:51:26 +0300 Subject: [PATCH 4/8] - No more blocking and waiting until a job completes, but instead prune finished tasks each tick. --- runpod/serverless/modules/rp_scale.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index 7a1d0a3b..ea838555 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -218,7 +218,7 @@ async def run_jobs(self, session: ClientSession): """ tasks = [] # Store the tasks for concurrent job processing - while self.is_alive() or not self.jobs_queue.empty(): + while self.is_alive() or not self.jobs_queue.empty() or tasks: log.debug(f"Task count: {len(tasks)}, Queue size: {self.jobs_queue.qsize()}, Concurrency: {self.current_concurrency}") # Fetch as many jobs as the concurrency allows while len(tasks) < self.current_concurrency and not self.jobs_queue.empty(): @@ -231,16 +231,14 @@ async def run_jobs(self, session: ClientSession): task = asyncio.create_task(self.handle_job(session, job)) tasks.append(task) - # Wait for any job to finish + # Prune completed tasks + tasks = [t for t in tasks if not t.done()] + if tasks: log.info(f"Jobs in progress: {len(tasks)}") - - done, pending = await asyncio.wait( - tasks, return_when=asyncio.FIRST_COMPLETED - ) - - # Remove completed tasks from the list - tasks = [t for t in tasks if t not in done] + else: + # If no jobs running, don’t spin CPU at 100% + await asyncio.sleep(0.5) # Yield control back to the event loop await asyncio.sleep(0) From 5520f0b2f3e071cb3de0730b410e222da66b674a Mon Sep 17 00:00:00 2001 From: antonije Date: Sun, 7 Sep 2025 18:26:08 +0300 Subject: [PATCH 5/8] - wait a bit when there are no jobs --- runpod/serverless/modules/rp_scale.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index ea838555..582c7559 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -219,10 +219,10 @@ async def run_jobs(self, session: ClientSession): tasks = [] # Store the tasks for concurrent job processing while self.is_alive() or not self.jobs_queue.empty() or tasks: - log.debug(f"Task count: {len(tasks)}, Queue size: {self.jobs_queue.qsize()}, Concurrency: {self.current_concurrency}") + # log.debug(f"Task count: {len(tasks)}, Queue size: {self.jobs_queue.qsize()}, Concurrency: {self.current_concurrency}") # Fetch as many jobs as the concurrency allows while len(tasks) < self.current_concurrency and not self.jobs_queue.empty(): - log.debug(f"About to get a job from the queue. Queue size: {self.jobs_queue.qsize()}") + # log.debug(f"About to get a job from the queue. Queue size: {self.jobs_queue.qsize()}") job = await self.jobs_queue.get() self.job_progress.add(job) log.debug(f"Dequeued job {job['id']}, now running. Queue size: {self.jobs_queue.qsize()}") @@ -234,14 +234,13 @@ async def run_jobs(self, session: ClientSession): # Prune completed tasks tasks = [t for t in tasks if not t.done()] - if tasks: - log.info(f"Jobs in progress: {len(tasks)}") + if tasks or not self.jobs_queue.empty(): + # log.info(f"Jobs in progress: {len(tasks)}") + await asyncio.sleep(0) # actively process work else: # If no jobs running, don’t spin CPU at 100% - await asyncio.sleep(0.5) - - # Yield control back to the event loop - await asyncio.sleep(0) + log.info("No jobs running, sleeping for 1 second.") + await asyncio.sleep(1) # Ensure all remaining tasks finish before stopping await asyncio.gather(*tasks) From b4f844dd926b5b6ef170c196232182e29a220f18 Mon Sep 17 00:00:00 2001 From: antonije Date: Mon, 8 Sep 2025 00:19:05 +0300 Subject: [PATCH 6/8] - wait a bit when there are jobs --- runpod/serverless/modules/rp_scale.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index 582c7559..2def4cb8 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -208,7 +208,7 @@ async def get_jobs(self, session: ClientSession): ) finally: # Yield control back to the event loop - await asyncio.sleep(0) + await asyncio.sleep(0.1) async def run_jobs(self, session: ClientSession): """ @@ -235,10 +235,10 @@ async def run_jobs(self, session: ClientSession): tasks = [t for t in tasks if not t.done()] if tasks or not self.jobs_queue.empty(): - # log.info(f"Jobs in progress: {len(tasks)}") - await asyncio.sleep(0) # actively process work + # Work is active → check often for new jobs + await asyncio.sleep(0.5) else: - # If no jobs running, don’t spin CPU at 100% + # Fully idle → sleep longer to save CPU log.info("No jobs running, sleeping for 1 second.") await asyncio.sleep(1) From 68041ce2fa532e0a5bc6b2da8216aeb493a3ea99 Mon Sep 17 00:00:00 2001 From: antonije Date: Thu, 11 Sep 2025 14:27:50 +0300 Subject: [PATCH 7/8] - make `tasks` be a set of asyncio.Task and auto-discarded when done - add a timeout of 100 milliseconds that doesn't block the code and accepts new jobs --- runpod/serverless/modules/rp_scale.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index 2def4cb8..aea5f9a0 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -216,10 +216,9 @@ async def run_jobs(self, session: ClientSession): Runs the block in an infinite loop while the worker is alive or jobs queue is not empty. """ - tasks = [] # Store the tasks for concurrent job processing + tasks: set[asyncio.Task] = set() # Store the tasks for concurrent job processing while self.is_alive() or not self.jobs_queue.empty() or tasks: - # log.debug(f"Task count: {len(tasks)}, Queue size: {self.jobs_queue.qsize()}, Concurrency: {self.current_concurrency}") # Fetch as many jobs as the concurrency allows while len(tasks) < self.current_concurrency and not self.jobs_queue.empty(): # log.debug(f"About to get a job from the queue. Queue size: {self.jobs_queue.qsize()}") @@ -229,18 +228,15 @@ async def run_jobs(self, session: ClientSession): # Create a new task for each job and add it to the task list task = asyncio.create_task(self.handle_job(session, job)) - tasks.append(task) + tasks.add(task) + task.add_done_callback(tasks.discard) - # Prune completed tasks - tasks = [t for t in tasks if not t.done()] - - if tasks or not self.jobs_queue.empty(): - # Work is active → check often for new jobs - await asyncio.sleep(0.5) + # 2. If jobs are running, wait a little for completions + if tasks: + await asyncio.wait(tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED) else: - # Fully idle → sleep longer to save CPU - log.info("No jobs running, sleeping for 1 second.") - await asyncio.sleep(1) + # Nothing running — don’t spin CPU + await asyncio.sleep(0.5) # Ensure all remaining tasks finish before stopping await asyncio.gather(*tasks) From 0c3c35c0e383a990a3a182845c20c5856a567518 Mon Sep 17 00:00:00 2001 From: antonije Date: Thu, 11 Sep 2025 16:45:49 +0300 Subject: [PATCH 8/8] - stop using `add_done_callback` so we can avoid mutation bugs --- runpod/serverless/modules/rp_scale.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index aea5f9a0..e9890cf3 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -229,11 +229,17 @@ async def run_jobs(self, session: ClientSession): # Create a new task for each job and add it to the task list task = asyncio.create_task(self.handle_job(session, job)) tasks.add(task) - task.add_done_callback(tasks.discard) # 2. If jobs are running, wait a little for completions if tasks: - await asyncio.wait(tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED) + # Wait for at least one task to finish + done, pending = await asyncio.wait( + tasks, + timeout=0.1, + return_when=asyncio.FIRST_COMPLETED, + ) + # Remove completed tasks + tasks.difference_update(done) else: # Nothing running — don’t spin CPU await asyncio.sleep(0.5)