File tree Expand file tree Collapse file tree 2 files changed +50
-0
lines changed
styleguide_example/blog_examples Expand file tree Collapse file tree 2 files changed +50
-0
lines changed Original file line number Diff line number Diff line change 1+ import logging
2+ from functools import wraps
3+
4+ from celery import shared_task
5+
6+ from styleguide_example .tasks import celery_app
7+
8+ inspect = celery_app .control .inspect
9+
10+ logger = logging .getLogger (__name__ )
11+
12+
13+ def non_concurrent_task (_func = None , * args , ** kwargs ):
14+ def wrapper (func ):
15+ @wraps (func )
16+ def inner (_bound_self , * _func_args , ** _func_kwargs ):
17+ running_task_count = 0
18+
19+ queues = inspect ().active ()
20+
21+ if queues is None :
22+ queues = {}
23+
24+ for running_tasks in queues .values ():
25+ for task in running_tasks :
26+ if task ["name" ] == _bound_self .name :
27+ running_task_count += 1
28+
29+ if running_task_count > 1 :
30+ logger .warning (f"[non_concurrent_task] Task { _bound_self .name } is already running" )
31+ return
32+
33+ return func (* _func_args , ** _func_kwargs )
34+
35+ return shared_task (bind = True , * args , ** kwargs )(inner )
36+
37+ if _func is None :
38+ return wrapper
39+
40+ return wrapper (_func )
41+
42+
43+ @non_concurrent_task
44+ def test_non_concurrent_task ():
45+ logger .info ("A non-concurrent task is running" )
46+ import time
47+ time .sleep (10 )
48+ logger .info ("A non-concurrent task finished" )
Original file line number Diff line number Diff line change 1+ # ruff: noqa
2+ from styleguide_example .blog_examples .celery_non_concurrent .tasks import test_non_concurrent_task
You can’t perform that action at this time.
0 commit comments