11from dataclasses import asdict , is_dataclass
2+ from datetime import datetime
23from logging import getLogger
34from typing import (
45 TYPE_CHECKING ,
1617from typing_extensions import ParamSpec
1718
1819from taskiq .abc .middleware import TaskiqMiddleware
20+ from taskiq .compat import model_dump
1921from taskiq .exceptions import SendTaskError
2022from taskiq .message import TaskiqMessage
23+ from taskiq .scheduler .scheduled_task import CronSpec , ScheduledTask
2124from taskiq .task import AsyncTaskiqTask
2225from taskiq .utils import maybe_awaitable
2326
2427if TYPE_CHECKING : # pragma: no cover
2528 from taskiq .abc .broker import AsyncBroker
29+ from taskiq .abc .schedule_source import ScheduleSource
2630
2731_T = TypeVar ("_T" )
2832_FuncParams = ParamSpec ("_FuncParams" )
@@ -142,6 +146,70 @@ async def kiq(
142146 result_backend = self .broker .result_backend ,
143147 )
144148
149+ async def schedule_cron (
150+ self ,
151+ source : "ScheduleSource" ,
152+ cron : Union [str , "CronSpec" ],
153+ * args : _FuncParams .args ,
154+ ** kwargs : _FuncParams .kwargs ,
155+ ) -> None :
156+ """
157+ Function to schedule task with cron.
158+
159+ :param source: schedule source.
160+ :param cron: cron expression.
161+ :param args: function's args.
162+ :param cron_offset: cron offset.
163+ :param kwargs: function's kwargs.
164+ """
165+ message = self ._prepare_message (* args , ** kwargs )
166+ cron_offset = None
167+ if isinstance (cron , CronSpec ):
168+ cron_str = cron .to_cron ()
169+ cron_offset = cron .offset
170+ else :
171+ cron_str = cron
172+ await maybe_awaitable (
173+ source .add_schedule (
174+ ScheduledTask (
175+ task_name = message .task_name ,
176+ labels = message .labels ,
177+ args = message .args ,
178+ kwargs = message .kwargs ,
179+ cron = cron_str ,
180+ cron_offset = cron_offset ,
181+ ),
182+ ),
183+ )
184+
185+ async def schedule_time (
186+ self ,
187+ source : "ScheduleSource" ,
188+ time : datetime ,
189+ * args : _FuncParams .args ,
190+ ** kwargs : _FuncParams .kwargs ,
191+ ) -> None :
192+ """
193+ Function to schedule task to run at specific time.
194+
195+ :param source: schedule source.
196+ :param time: time to run task at.
197+ :param args: function's args.
198+ :param kwargs: function's kwargs.
199+ """
200+ message = self ._prepare_message (* args , ** kwargs )
201+ await maybe_awaitable (
202+ source .add_schedule (
203+ ScheduledTask (
204+ task_name = message .task_name ,
205+ labels = message .labels ,
206+ args = message .args ,
207+ kwargs = message .kwargs ,
208+ time = time ,
209+ ),
210+ ),
211+ )
212+
145213 @classmethod
146214 def _prepare_arg (cls , arg : Any ) -> Any :
147215 """
@@ -154,7 +222,7 @@ def _prepare_arg(cls, arg: Any) -> Any:
154222 :return: Formatted argument.
155223 """
156224 if isinstance (arg , BaseModel ):
157- arg = arg . dict ( )
225+ arg = model_dump ( arg )
158226 if is_dataclass (arg ):
159227 arg = asdict (arg )
160228 return arg
0 commit comments