
Commit 33f3a10

rebase
1 parent 517cf52 commit 33f3a10

7 files changed: 642 additions & 0 deletions
Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .api import JobStatus, Scheduler
from .config import CelerySchedulerConfig, InlineSchedulerConfig, SchedulerConfig


async def scheduler_impl(config: SchedulerConfig) -> Scheduler:
    """
    Factory function to instantiate scheduler implementations.

    Args:
        config: Scheduler configuration (InlineSchedulerConfig or CelerySchedulerConfig)

    Returns:
        Scheduler: An initialized scheduler instance

    Raises:
        ValueError: If the config type is unknown
    """
    impl: Scheduler
    if isinstance(config, InlineSchedulerConfig):
        from .inline import InlineSchedulerImpl

        impl = InlineSchedulerImpl(config)
    elif isinstance(config, CelerySchedulerConfig):
        from .celery import CelerySchedulerImpl

        impl = CelerySchedulerImpl(config)
    else:
        raise ValueError(f"Unknown scheduler config type: {type(config)}")

    await impl.initialize()
    return impl


__all__ = [
    "JobStatus",
    "Scheduler",
    "SchedulerConfig",
    "InlineSchedulerConfig",
    "CelerySchedulerConfig",
    "scheduler_impl",
]
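
Usage-wise, this factory is the only entry point callers need. The following is a minimal, non-authoritative sketch of wiring it up, assuming a pre-built InlineSchedulerConfig; constructing its KVStoreReference is elided because that type is defined outside this commit.

# Minimal usage sketch for the factory above (illustrative, not from the commit).
# `config` is assumed to be an already-validated InlineSchedulerConfig.
from .config import InlineSchedulerConfig  # defined later in this commit


async def run_with_scheduler(config: InlineSchedulerConfig) -> None:
    # The factory lazily imports the matching implementation and calls initialize().
    scheduler = await scheduler_impl(config)
    try:
        job_id = await scheduler.schedule_job("batch_processing", {"items": [1, 2, 3]})
        print(await scheduler.get_job_info(job_id))
    finally:
        await scheduler.shutdown()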
Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from collections.abc import Awaitable, Callable
from enum import StrEnum
from typing import Protocol


class JobStatus(StrEnum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class Scheduler(Protocol):
    """
    Abstract scheduler protocol for managing async jobs.

    This provides a pluggable backend for job scheduling and execution.
    """

    async def schedule_job(
        self,
        job_type: str,
        job_data: dict,
        metadata: dict | None = None,
    ) -> str:
        """
        Schedule a new job for execution.

        Args:
            job_type: Type of job (e.g., "batch_processing", "log_aggregation", "metrics_collection")
            job_data: Job-specific data and parameters
            metadata: Additional metadata for tracking

        Returns:
            job_id: UUID for tracking the job
        """
        ...

    async def get_job_info(self, job_id: str) -> dict:
        """
        Get complete information about a job.

        Returns:
            {
                "job_id": str,
                "job_type": str,
                "status": JobStatus,
                "created_at": datetime,
                "started_at": datetime | None,
                "completed_at": datetime | None,
                "progress": float,  # 0.0 to 1.0
                "metadata": dict,
                "error": str | None,  # Error message if status == FAILED
                "result": dict | None,  # Job result if status == COMPLETED
            }
        """
        ...

    async def cancel_job(self, job_id: str) -> bool:
        """
        Cancel a pending or running job.

        Args:
            job_id: Job to cancel

        Returns:
            True if job was cancelled, False if not found or already completed
        """
        ...

    async def delete_job(self, job_id: str) -> bool:
        """
        Delete a completed or cancelled job.

        Args:
            job_id: Job to delete

        Returns:
            True if job was deleted, False if not found
        """
        ...

    async def list_jobs(
        self,
        job_type: str | None = None,
        status: JobStatus | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[dict]:
        """
        List jobs with optional filtering.

        Args:
            job_type: Filter by job type (e.g., "batch_processing", "log_aggregation", "metrics_collection")
            status: Filter by status
            limit: Maximum number of jobs to return
            offset: Offset for pagination

        Returns:
            List of job info dictionaries
        """
        ...

    def register_job_executor(
        self,
        job_type: str,
        executor: Callable[[dict], Awaitable[dict]],
    ) -> None:
        """
        Register a job executor function for a specific job type.

        This allows components to register custom job execution logic with the scheduler.
        When a job of the registered type is executed, the scheduler will call the
        registered executor function.

        Args:
            job_type: The type of job (e.g., "vector_store_file_batch")
            executor: Async function that takes job_data dict and returns result dict
        """
        ...

    async def initialize(self) -> None:
        """Initialize the scheduler (connect to backend, etc.)"""
        ...

    async def shutdown(self) -> None:
        """Gracefully shutdown the scheduler"""
        ...
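
To make the flow concrete, here is a short sketch of how a component might drive this protocol; it is illustrative only, assuming `scheduler` is an initialized implementation (e.g. returned by scheduler_impl), and the job type, payload, and polling interval are made up for the example.

# Hedged sketch: driving the Scheduler protocol from a component (not part of the commit).
import asyncio


async def process_batch(job_data: dict) -> dict:
    # Executor: receives the job_data passed to schedule_job and returns the job result.
    return {"processed": len(job_data.get("items", []))}


async def run_batch(scheduler) -> dict:
    scheduler.register_job_executor("batch_processing", process_batch)
    job_id = await scheduler.schedule_job(
        "batch_processing",
        {"items": ["a", "b", "c"]},
        metadata={"requested_by": "example"},
    )
    # Poll until the job reaches a terminal state. JobStatus is a StrEnum,
    # so comparing against the plain string values is equivalent.
    while True:
        info = await scheduler.get_job_info(job_id)
        if info["status"] in ("completed", "failed", "cancelled"):
            return info
        await asyncio.sleep(0.1)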
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .scheduler import CelerySchedulerImpl

__all__ = ["CelerySchedulerImpl"]
Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from collections.abc import Awaitable, Callable

from ..api import JobStatus, Scheduler
from ..config import CelerySchedulerConfig


class CelerySchedulerImpl(Scheduler):
    """
    Celery-based scheduler implementation for distributed multi-worker deployments.

    This scheduler uses Celery to distribute jobs across multiple worker processes
    or machines. It provides:
    - Persistent job queue (via Redis/RabbitMQ broker)
    - Multi-worker coordination
    - Crash recovery (jobs survive server restarts)
    - Distributed task execution

    This is suitable for:
    - Production deployments
    - Multi-worker scenarios
    - High availability requirements
    """

    def __init__(self, config: CelerySchedulerConfig):
        self.config = config
        self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {}
        # TODO: Initialize Celery app with broker and result backend
        raise NotImplementedError("Celery scheduler not yet implemented")

    def register_job_executor(
        self,
        job_type: str,
        executor: Callable[[dict], Awaitable[dict]],
    ) -> None:
        """Register a job executor function for a specific job type."""
        self._job_executors[job_type] = executor

    async def initialize(self) -> None:
        """Initialize the Celery scheduler."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def shutdown(self) -> None:
        """Gracefully shutdown the Celery scheduler."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def schedule_job(
        self,
        job_type: str,
        job_data: dict,
        metadata: dict | None = None,
    ) -> str:
        """Schedule a new job for execution via Celery."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def get_job_info(self, job_id: str) -> dict:
        """Get complete information about a job from Celery result backend."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def cancel_job(self, job_id: str) -> bool:
        """Cancel a pending or running Celery job."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def delete_job(self, job_id: str) -> bool:
        """Delete a completed or cancelled job from Celery result backend."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def list_jobs(
        self,
        job_type: str | None = None,
        status: JobStatus | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[dict]:
        """List jobs from Celery result backend with optional filtering."""
        raise NotImplementedError("Celery scheduler not yet implemented")
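
The commit leaves the Celery backend as a stub. As a non-authoritative sketch of one direction the TODO could take (not the author's implementation), schedule_job could map onto Celery's send_task and get_job_info onto AsyncResult; a real implementation would also need the registered executors to be importable by the worker processes. The task name "scheduler.run_job" below is hypothetical.

# Hedged sketch only: one possible mapping of this protocol onto Celery's public API.
# Error handling, executor registration on workers, and translation of Celery states
# into JobStatus values are all omitted.
from celery import Celery
from celery.result import AsyncResult


def make_celery_app(broker_url: str, result_backend: str) -> Celery:
    # Corresponds to CelerySchedulerConfig.broker_url / result_backend from config.py.
    return Celery("llama_stack_scheduler", broker=broker_url, backend=result_backend)


def submit(app: Celery, job_type: str, job_data: dict) -> str:
    # A worker-side task registered under "scheduler.run_job" (hypothetical name) would
    # look up the executor for job_type and run it; send_task only needs the task name.
    result = app.send_task("scheduler.run_job", kwargs={"job_type": job_type, "job_data": job_data})
    return result.id


def raw_state(app: Celery, job_id: str) -> str:
    # Celery states ("PENDING", "STARTED", "SUCCESS", "FAILURE", "REVOKED") would be
    # translated into JobStatus values by get_job_info.
    return AsyncResult(job_id, app=app).state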
Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from typing import Annotated, Literal

from pydantic import BaseModel, Field

from llama_stack.core.storage.datatypes import KVStoreReference


class SchedulerConfig(BaseModel):
    """Base class for scheduler configurations."""

    type: str


class InlineSchedulerConfig(SchedulerConfig):
    """
    Configuration for inline (asyncio-based) scheduler.

    This scheduler runs jobs in the same process using asyncio tasks.
    Suitable for development and single-worker deployments.
    """

    type: Literal["inline"] = "inline"
    kvstore: KVStoreReference = Field(
        description="KVStore reference for persisting job state",
    )
    max_concurrent_jobs: int = Field(
        default=10,
        description="Maximum number of jobs that can run concurrently",
    )


class CelerySchedulerConfig(SchedulerConfig):
    """
    Configuration for Celery-based distributed scheduler.

    This scheduler distributes jobs across multiple worker processes/machines.
    Suitable for production and multi-worker deployments.
    """

    type: Literal["celery"] = "celery"
    broker_url: str = Field(
        description="Celery broker URL (e.g., 'redis://localhost:6379/0')",
    )
    result_backend: str = Field(
        description="Celery result backend URL (e.g., 'redis://localhost:6379/1')",
    )


SchedulerConfigUnion = Annotated[
    InlineSchedulerConfig | CelerySchedulerConfig,
    Field(discriminator="type"),
]
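
Because SchedulerConfigUnion is a discriminated union on the "type" field, a raw config dict can be validated directly into the right variant. A small sketch, assuming Pydantic v2's TypeAdapter and illustrative URLs:

# Hedged sketch: validating a raw dict against the discriminated union above.
# SchedulerConfigUnion and CelerySchedulerConfig come from the config module in this commit.
from pydantic import TypeAdapter

raw = {
    "type": "celery",
    "broker_url": "redis://localhost:6379/0",
    "result_backend": "redis://localhost:6379/1",
}

config = TypeAdapter(SchedulerConfigUnion).validate_python(raw)
assert isinstance(config, CelerySchedulerConfig)  # the "type" discriminator selects the variant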
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .scheduler import InlineSchedulerImpl

__all__ = ["InlineSchedulerImpl"]
