Skip to content

Commit c607067

Browse files
author
Joel Collins
committed
Reimplemented tasks to use greenlets
1 parent 407121d commit c607067

File tree

4 files changed

+46
-119
lines changed

4 files changed

+46
-119
lines changed

examples/simple_thing.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
#!/usr/bin/env python
2+
from gevent import monkey
3+
4+
# Patch most system modules. Leave threads untouched so we can still use them normally if needed.
5+
print("Monkey patching with Gevenet")
6+
monkey.patch_all(thread=False)
7+
print("Monkey patching successful")
8+
19
import random
210
import math
311
import time
@@ -13,7 +21,7 @@
1321
from labthings.server.view import View
1422
from labthings.server.find import find_component
1523
from labthings.server import fields
16-
from labthings.core.tasks import taskify
24+
from labthings.core.tasks import taskify, update_task_data
1725

1826

1927
"""
@@ -22,6 +30,14 @@
2230
"""
2331

2432

33+
from gevent.monkey import get_original
34+
35+
get_ident = get_original("_thread", "get_ident")
36+
37+
print(f"ROOT IDENT")
38+
print(get_ident())
39+
40+
2541
class MyComponent:
2642
def __init__(self):
2743
self.x_range = range(-100, 100)
@@ -50,6 +66,7 @@ def average_data(self, n: int):
5066

5167
for i in range(n):
5268
summed_data = [summed_data[i] + el for i, el in enumerate(self.data)]
69+
update_task_data({"data": summed_data})
5370
time.sleep(0.25)
5471

5572
summed_data = [i / n for i in summed_data]

labthings/core/tasks/pool.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import threading
22
import logging
33
from functools import wraps
4+
from gevent import getcurrent
45

56
from .thread import TaskThread
67

@@ -114,7 +115,7 @@ def current_task():
114115
Returns:
115116
TaskThread -- Currently running Task thread.
116117
"""
117-
current_task_thread = threading.current_thread()
118+
current_task_thread = getcurrent()
118119
if not isinstance(current_task_thread, TaskThread):
119120
return None
120121
return current_task_thread

labthings/core/tasks/thread.py

Lines changed: 17 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,23 @@
1-
import ctypes
1+
from gevent import Greenlet
22
import datetime
33
import logging
44
import traceback
55
import uuid
66

7-
from gevent.monkey import get_original
8-
9-
# Guarantee that Task threads will always be proper system threads, regardless of Gevent patches
10-
Thread = get_original("threading", "Thread")
11-
Event = get_original("threading", "Event")
12-
Lock = get_original("threading", "Lock")
13-
147
_LOG = logging.getLogger(__name__)
158

169

1710
class ThreadTerminationError(SystemExit):
1811
"""Sibling of SystemExit, but specific to thread termination."""
1912

2013

21-
class TaskThread(Thread):
22-
def __init__(self, target=None, name=None, args=None, kwargs=None, daemon=True):
23-
Thread.__init__(
24-
self,
25-
group=None,
26-
target=target,
27-
name=name,
28-
args=args,
29-
kwargs=kwargs,
30-
daemon=daemon,
31-
)
14+
class TaskKillException(Exception):
15+
"""Sibling of SystemExit, but specific to thread termination."""
16+
17+
18+
class TaskThread(Greenlet):
19+
def __init__(self, target=None, args=None, kwargs=None):
20+
Greenlet.__init__(self)
3221
# Handle arguments
3322
if args is None:
3423
args = ()
@@ -56,10 +45,6 @@ def __init__(self, target=None, name=None, args=None, kwargs=None, daemon=True):
5645
self.progress: int = None # Percent progress of the task
5746
self.data = {} # Dictionary of custom data added during the task
5847

59-
# Stuff for handling termination
60-
self._running_lock = Lock() # Lock obtained while self._target is running
61-
self._killed = Event() # Event triggered when thread is manually terminated
62-
6348
@property
6449
def id(self):
6550
"""Return ID of current TaskThread"""
@@ -86,6 +71,9 @@ def update_data(self, data: dict):
8671
# Store data to be used before task finishes (eg for real-time plotting)
8772
self.data.update(data)
8873

74+
def _run(self):
75+
return self._thread_proc(self._target)(*self._args, **self._kwargs)
76+
8977
def _thread_proc(self, f):
9078
"""
9179
Wraps the target function to handle recording `status` and `return` to `state`.
@@ -110,95 +98,12 @@ def wrapped(*args, **kwargs):
11098

11199
return wrapped
112100

113-
def run(self):
114-
"""Overrides default threading.Thread run() method"""
115-
logging.debug((self._args, self._kwargs))
116-
try:
117-
with self._running_lock:
118-
if self._killed.is_set():
119-
raise ThreadTerminationError()
120-
if self._target:
121-
self._thread_proc(self._target)(*self._args, **self._kwargs)
122-
finally:
123-
# Avoid a refcycle if the thread is running a function with
124-
# an argument that has a member that points to the thread.
125-
del self._target, self._args, self._kwargs
126-
127-
def wait(self):
128-
"""Start waiting for the task to finish before returning"""
129-
print("Joining thread {}".format(self))
130-
self.join()
131-
return self._return_value
132-
133-
def async_raise(self, exc_type):
134-
"""Raise an exception in this thread."""
135-
# Should only be called on a started thread, so raise otherwise.
136-
if self.ident is None:
137-
raise RuntimeError(
138-
"Cannot halt a thread that hasn't started. "
139-
"No valid running thread identifier."
140-
)
141-
142-
# If the thread has died we don't want to raise an exception so log.
143-
if not self.is_alive():
144-
_LOG.debug(
145-
"Not raising %s because thread %s (%s) is not alive",
146-
exc_type,
147-
self.name,
148-
self.ident,
149-
)
150-
return
151-
152-
result = ctypes.pythonapi.PyThreadState_SetAsyncExc(
153-
ctypes.c_long(self.ident), ctypes.py_object(exc_type)
154-
)
155-
if result == 0 and self.is_alive():
156-
# Don't raise an exception an error unnecessarily if the thread is dead.
157-
raise ValueError("Thread ID was invalid.", self.ident)
158-
elif result > 1:
159-
# Something bad happened, call with a NULL exception to undo.
160-
ctypes.pythonapi.PyThreadState_SetAsyncExc(self.ident, None)
161-
raise RuntimeError(
162-
"Error: PyThreadState_SetAsyncExc %s %s (%s) %s"
163-
% (exc_type, self.name, self.ident, result)
164-
)
165-
166-
def _is_thread_proc_running(self):
167-
"""
168-
Test if thread funtion (_thread_proc) is running,
169-
by attemtping to acquire the lock _thread_proc acquires at runtime.
170-
Returns:
171-
bool: If _thread_proc is currently running
172-
"""
173-
could_acquire = self._running_lock.acquire(False) # skipcq: PYL-E1111
174-
if could_acquire:
175-
self._running_lock.release()
176-
return False
177-
return True
178-
179-
def terminate(self):
180-
"""
181-
Raise ThreadTerminatedException in the context of the given thread,
182-
which should cause the thread to exit silently.
183-
"""
184-
_LOG.warning(f"Terminating thread {self}")
185-
self._killed.set()
186-
if not self.is_alive():
187-
logging.debug("Cannot kill thread that is no longer running.")
188-
return
189-
if not self._is_thread_proc_running():
190-
logging.debug(
191-
"Thread's _thread_proc function is no longer running, "
192-
"will not kill; letting thread exit gracefully."
193-
)
194-
return
195-
self.async_raise(ThreadTerminationError)
196-
197-
# Wait for the thread for finish closing. If the threaded function has cleanup code in a try-except,
198-
# this pause allows it to finish running before the main process can continue.
199-
while self._is_thread_proc_running():
200-
pass
201-
101+
def kill(self, exception=TaskKillException, block=True, timeout=None):
102+
# Kill the greenlet
103+
Greenlet.kill(self, exception=exception, block=block, timeout=timeout)
202104
# Set state to terminated
203105
self._status = "terminated"
204106
self.progress = None
107+
108+
def terminate(self):
109+
return self.kill()

labthings/server/views/tasks.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@ def get(self, task_id):
3131
3232
Includes progress and intermediate data.
3333
"""
34+
task_dict = tasks.dictionary()
3435

35-
task = tasks.dictionary().get(task_id)
36-
if not task:
36+
if not task_id in task_dict:
3737
return abort(404) # 404 Not Found
3838

39+
task = task_dict.get(task_id)
40+
3941
return task
4042

4143
@marshal_with(TaskSchema())
@@ -45,11 +47,13 @@ def delete(self, task_id):
4547
4648
If the task is finished, deletes its entry.
4749
"""
50+
task_dict = tasks.dictionary()
4851

49-
task = tasks.dictionary().get(task_id)
50-
if not task:
52+
if not task_id in task_dict:
5153
return abort(404) # 404 Not Found
5254

53-
task.terminate()
55+
task = task_dict.get(task_id)
56+
57+
task.kill(block=True, timeout=3)
5458

5559
return task

0 commit comments

Comments
 (0)