1111 AsyncGenerator ,
1212 Awaitable ,
1313 Callable ,
14+ ClassVar ,
1415 DefaultDict ,
1516 Dict ,
1617 List ,
@@ -68,7 +69,7 @@ class AsyncBroker(ABC):
6869 in async mode.
6970 """
7071
71- available_tasks : Dict [str , AsyncTaskiqDecoratedTask [Any , Any ]] = {}
72+ global_task_registry : ClassVar [ Dict [str , AsyncTaskiqDecoratedTask [Any , Any ] ]] = {}
7273
7374 def __init__ (
7475 self ,
@@ -98,6 +99,7 @@ def __init__(
9899 self .decorator_class = AsyncTaskiqDecoratedTask
99100 self .formatter : "TaskiqFormatter" = JSONFormatter ()
100101 self .id_generator = task_id_generator
102+ self .local_task_registry : Dict [str , AsyncTaskiqDecoratedTask [Any , Any ]] = {}
101103 # Every event has a list of handlers.
102104 # Every handler is a function which takes state as a first argument.
103105 # And handler can be either sync or async.
@@ -112,6 +114,41 @@ def __init__(
112114 # True only if broker runs in scheduler process.
113115 self .is_scheduler_process : bool = False
114116
117+ def find_task (self , task_name : str ) -> Optional [AsyncTaskiqDecoratedTask [Any , Any ]]:
118+ """
119+ Returns task by name.
120+
121+ This method should be used to get task by name.
122+ Instead of accessing `available_tasks` or `local_available_tasks` directly.
123+
124+ It searches task by name in dict of tasks that
125+ were registered for this broker directly.
126+ If it fails, it checks global dict of all available tasks.
127+
128+ :param task_name: name of a task.
129+ :returns: found task or None.
130+ """
131+ return self .local_task_registry .get (
132+ task_name ,
133+ ) or self .global_task_registry .get (
134+ task_name ,
135+ )
136+
137+ def get_all_tasks (self ) -> Dict [str , AsyncTaskiqDecoratedTask [Any , Any ]]:
138+ """
139+ Method to fetch all tasks available in broker.
140+
141+ This method returns all tasks, globally and locally
142+ available in broker. With local tasks having higher priority.
143+
144+ So, if you have two tasks with the same name,
145+ one registered in global registry and one registered
146+ in local registry, then local task will be returned.
147+
148+ :return: dict of all tasks. Keys are task names, values are tasks.
149+ """
150+ return {** self .global_task_registry , ** self .local_task_registry }
151+
115152 def add_dependency_context (self , new_ctx : Dict [Any , Any ]) -> None :
116153 """
117154 Add first-level dependencies.
@@ -291,7 +328,7 @@ def inner(
291328 ),
292329 )
293330
294- self .available_tasks [ decorated_task .task_name ] = decorated_task
331+ self ._register_task ( decorated_task .task_name , decorated_task )
295332
296333 return decorated_task
297334
@@ -416,3 +453,19 @@ def with_event_handlers(
416453 """
417454 self .event_handlers [event ].extend (handlers )
418455 return self
456+
457+ def _register_task (
458+ self ,
459+ task_name : str ,
460+ task : AsyncTaskiqDecoratedTask [Any , Any ],
461+ ) -> None :
462+ """
463+ Mehtod is used to register tasks.
464+
465+ By default we register tasks in local task registry.
466+ But this behaviour can be changed in subclasses.
467+
468+ :param task_name: Name of a task.
469+ :param task: Decorated task.
470+ """
471+ self .local_task_registry [task_name ] = task
0 commit comments