Skip to content

Commit 1f4b726

Browse files
committed
fix:bug worker added to registry
1 parent 8fb4b6e commit 1f4b726

File tree

3 files changed

+9
-9
lines changed

3 files changed

+9
-9
lines changed

scheduler/redis_models/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ def _deserialize(value: str, _type: Type) -> Any:
7676
class BaseModel:
7777
name: str
7878
_element_key_template: ClassVar[str] = ":element:{}"
79-
# fields that are not serializable using method above and should be dealt with in the subclass
80-
# e.g. args/kwargs for a job
79+
# fields that are not serializable using the method above and should be dealt with in the subclass
80+
# e.g., args/kwargs for a job
8181
_non_serializable_fields: ClassVar[Set[str]] = set()
8282

8383
@classmethod

scheduler/redis_models/registry/base_registry.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class DequeueTimeout(Exception):
1515
@dataclasses.dataclass(slots=True, kw_only=True)
1616
class ZSetModel(BaseModel):
1717
def cleanup(self, connection: ConnectionType, timestamp: Optional[float] = None) -> None:
18-
"""Remove expired jobs from registry."""
18+
"""Remove expired jobs from the registry."""
1919
score = timestamp or current_timestamp()
2020
connection.zremrangebyscore(self._key, 0, score)
2121

@@ -45,23 +45,23 @@ def __contains__(self, item: str) -> bool:
4545
return self.connection.zrank(self._key, item) is not None
4646

4747
def all(self, start: int = 0, end: int = -1) -> List[str]:
48-
"""Returns list of all job names.
48+
"""Returns a list of all job names.
4949
5050
:param start: Start score/timestamp, default to 0.
5151
:param end: End score/timestamp, default to -1 (i.e., no max score).
52-
:returns: Returns list of all job names with timestamp from start to end
52+
:returns: Returns a list of all job names with timestamp from start to end
5353
"""
5454
self.cleanup(self.connection)
5555
res = [as_str(job_name) for job_name in self.connection.zrange(self._key, start, end)]
5656
logger.debug(f"Getting jobs for registry {self._key}: {len(res)} found.")
5757
return res
5858

5959
def all_with_timestamps(self, start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
60-
"""Returns list of all job names with their timestamps.
60+
"""Returns a list of all job names with their timestamps.
6161
6262
:param start: Start score/timestamp, default to 0.
6363
:param end: End score/timestamp, default to -1 (i.e., no max score).
64-
:returns: Returns list of all job names with timestamp from start to end
64+
:returns: Returns a list of all job names with timestamp from start to end
6565
"""
6666
self.cleanup(self.connection)
6767
res = self.connection.zrange(self._key, start, end, withscores=True)

scheduler/worker/worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -691,10 +691,10 @@ def maintain_heartbeats(self, job: JobModel, queue: Queue) -> None:
691691
self._model.heartbeat(pipeline, self.job_monitoring_interval + 60)
692692
ttl = self.get_heartbeat_ttl(job)
693693

694-
queue.active_job_registry.add(pipeline, self.name, current_timestamp() + ttl, update_existing_only=False)
694+
queue.active_job_registry.add(pipeline, job.name, current_timestamp() + ttl, update_existing_only=False)
695695
results = pipeline.execute()
696696
if results[2] == 1:
697-
job.delete(self.connection)
697+
job.delete(queue.connection)
698698

699699
def execute_in_separate_process(self, job: JobModel, queue: Queue) -> None:
700700
"""This is the entry point of the newly spawned job execution process.

0 commit comments

Comments
 (0)