Skip to content

Commit 407121d

Browse files
committed
Added ClientEvent class
1 parent 532de8c commit 407121d

File tree

1 file changed

+61
-0
lines changed

1 file changed

+61
-0
lines changed

labthings/core/event.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
from gevent.hub import getcurrent
2+
import gevent
3+
import time
4+
5+
from gevent.monkey import get_original
6+
7+
# Guarantee that Task threads will always be proper system threads, regardless of Gevent patches
8+
Event = get_original("threading", "Event")
9+
10+
class ClientEvent(object):
11+
"""
12+
An event-signaller object with per-client setting and waiting.
13+
14+
A client can be any Greenlet or native Thread. This can be used, for example,
15+
to signal to clients that new data is available
16+
"""
17+
18+
def __init__(self):
19+
self.events = {}
20+
21+
def wait(self, timeout: int = 5):
22+
"""Wait for the next data frame (invoked from each client's thread)."""
23+
ident = id(getcurrent())
24+
if ident not in self.events:
25+
# this is a new client
26+
# add an entry for it in the self.events dict
27+
# each entry has two elements, a threading.Event() and a timestamp
28+
self.events[ident] = [Event(), time.time()]
29+
30+
# We have to reimplement event waiting here as we need native thread events to allow gevent context switching
31+
while not self.events[ident][0].is_set():
32+
gevent.time.sleep(0)
33+
return True
34+
35+
def set(self):
36+
"""Signal that a new frame is available."""
37+
now = time.time()
38+
remove = None
39+
for ident, event in self.events.items():
40+
if not event[0].is_set():
41+
# if this client's event is not set, then set it
42+
# also update the last set timestamp to now
43+
event[0].set()
44+
event[1] = now
45+
else:
46+
# if the client's event is already set, it means the client
47+
# did not process a previous frame
48+
# if the event stays set for more than 5 seconds, then assume
49+
# the client is gone and remove it
50+
if now - event[1] > 5:
51+
remove = ident
52+
if remove:
53+
del self.events[remove]
54+
55+
def clear(self):
56+
"""Clear frame event, once processed."""
57+
ident = id(getcurrent())
58+
if ident not in self.events:
59+
logging.error(f"Mismatched ident. Current: {ident}, available:")
60+
logging.error(self.events.keys())
61+
self.events[id(getcurrent())][0].clear()

0 commit comments

Comments
 (0)