@@ -50,6 +50,7 @@ def __init__(
5050 self .broker = broker
5151 self .labels = labels
5252 self .custom_task_id : Optional [str ] = None
53+ self .custom_schedule_id : Optional [str ] = None
5354
5455 def with_labels (
5556 self ,
@@ -77,6 +78,19 @@ def with_task_id(self, task_id: str) -> "AsyncKicker[_FuncParams, _ReturnType]":
7778 self .custom_task_id = task_id
7879 return self
7980
81+ def with_schedule_id (
82+ self ,
83+ schedule_id : str ,
84+ ) -> "AsyncKicker[_FuncParams, _ReturnType]" :
85+ """
86+ Set schedule_id for current execution.
87+
88+ :param schedule_id: custom schedule id.
89+ :return: kicker with custom schedule id.
90+ """
91+ self .custom_schedule_id = schedule_id
92+ return self
93+
8094 def with_broker (
8195 self ,
8296 broker : "AsyncBroker" ,
@@ -166,7 +180,9 @@ async def schedule_by_cron(
166180
167181 :return: schedule id.
168182 """
169- schedule_id = self .broker .id_generator ()
183+ schedule_id = self .custom_schedule_id
184+ if schedule_id is None :
185+ schedule_id = self .broker .id_generator ()
170186 message = self ._prepare_message (* args , ** kwargs )
171187 cron_offset = None
172188 if isinstance (cron , CronSpec ):
@@ -201,7 +217,9 @@ async def schedule_by_time(
201217 :param args: function's args.
202218 :param kwargs: function's kwargs.
203219 """
204- schedule_id = self .broker .id_generator ()
220+ schedule_id = self .custom_schedule_id
221+ if schedule_id is None :
222+ schedule_id = self .broker .id_generator ()
205223 message = self ._prepare_message (* args , ** kwargs )
206224 scheduled = ScheduledTask (
207225 schedule_id = schedule_id ,
0 commit comments