66from typing import Callable , Optional , TYPE_CHECKING
77
88from canopen .async_guard import ensure_not_async
9+ from canopen .utils import call_callbacks
910import canopen .network
1011
1112if TYPE_CHECKING :
@@ -121,22 +122,19 @@ def __init__(self, node_id: int):
121122 #: Timestamp of last heartbeat message
122123 self .timestamp : Optional [float ] = None
123124 self .state_update = threading .Condition ()
124- self .astate_update = asyncio .Condition ()
125125 self ._callbacks = []
126126
127127 # @callback # NOTE: called from another thread
128128 @ensure_not_async # NOTE: Safeguard for accidental async use
129129 def on_heartbeat (self , can_id , data , timestamp ):
130+ new_state , = struct .unpack_from ("B" , data )
131+ # Mask out toggle bit
132+ new_state &= 0x7F
133+ logger .debug ("Received heartbeat can-id %d, state is %d" , can_id , new_state )
134+
130135 # NOTE: Blocking lock
131136 with self .state_update :
132137 self .timestamp = timestamp
133- new_state , = struct .unpack_from ("B" , data )
134- # Mask out toggle bit
135- new_state &= 0x7F
136- logger .debug ("Received heartbeat can-id %d, state is %d" , can_id , new_state )
137- for callback in self ._callbacks :
138- # FIXME: Assert if callback is coroutine?
139- callback (new_state )
140138 if new_state == 0 :
141139 # Boot-up, will go to PRE-OPERATIONAL automatically
142140 self ._state = 127
@@ -145,25 +143,8 @@ def on_heartbeat(self, can_id, data, timestamp):
145143 self ._state_received = new_state
146144 self .state_update .notify_all ()
147145
148- # @callback
149- async def aon_heartbeat (self , can_id , data , timestamp ):
150- async with self .astate_update :
151- self .timestamp = timestamp
152- new_state , = struct .unpack_from ("B" , data )
153- # Mask out toggle bit
154- new_state &= 0x7F
155- logger .debug ("Received heartbeat can-id %d, state is %d" , can_id , new_state )
156- for callback in self ._callbacks :
157- res = callback (new_state )
158- if res is not None and asyncio .iscoroutine (res ):
159- await res
160- if new_state == 0 :
161- # Boot-up, will go to PRE-OPERATIONAL automatically
162- self ._state = 127
163- else :
164- self ._state = new_state
165- self ._state_received = new_state
166- self .astate_update .notify_all ()
146+ # Call all registered callbacks
147+ call_callbacks (self ._callbacks , self .network .loop , new_state )
167148
168149 def send_command (self , code : int ):
169150 """Send an NMT command code to the node.
@@ -190,13 +171,7 @@ def wait_for_heartbeat(self, timeout: float = 10):
190171
191172 async def await_for_heartbeat (self , timeout : float = 10 ):
192173 """Wait until a heartbeat message is received."""
193- async with self .astate_update :
194- self ._state_received = None
195- try :
196- await asyncio .wait_for (self .astate_update .wait (), timeout = timeout )
197- except asyncio .TimeoutError :
198- raise NmtError ("No boot-up or heartbeat received" )
199- return self .state
174+ return await asyncio .to_thread (self .wait_for_heartbeat , timeout )
200175
201176 @ensure_not_async # NOTE: Safeguard for accidental async use
202177 def wait_for_bootup (self , timeout : float = 10 ) -> None :
@@ -216,17 +191,7 @@ def wait_for_bootup(self, timeout: float = 10) -> None:
216191
217192 async def await_for_bootup (self , timeout : float = 10 ) -> None :
218193 """Wait until a boot-up message is received."""
219- async def _wait_for_bootup ():
220- while True :
221- async with self .astate_update :
222- self ._state_received = None
223- await self .astate_update .wait ()
224- if self ._state_received == 0 :
225- return
226- try :
227- await asyncio .wait_for (_wait_for_bootup (), timeout = timeout )
228- except asyncio .TimeoutError :
229- raise NmtError ("Timeout waiting for boot-up message" )
194+ return await asyncio .to_thread (self .wait_for_bootup , timeout )
230195
231196 def add_heartbeat_callback (self , callback : Callable [[int ], None ]):
232197 """Add function to be called on heartbeat reception.
0 commit comments