Skip to content

Commit b1a4800

Browse files
BioShake backend (#711)
1 parent 86033dd commit b1a4800

File tree

2 files changed

+252
-0
lines changed

2 files changed

+252
-0
lines changed

pylabrobot/heating_shaking/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""A hybrid between pylabrobot.shaking and pylabrobot.temperature_controlling"""
22

33
from pylabrobot.heating_shaking.backend import HeaterShakerBackend
4+
from pylabrobot.heating_shaking.bioshake_backend import BioShake
45
from pylabrobot.heating_shaking.chatterbox import HeaterShakerChatterboxBackend
56
from pylabrobot.heating_shaking.hamilton_backend import (
67
HamiltonHeaterShakerBackend,
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
import asyncio
2+
3+
from pylabrobot.heating_shaking.backend import HeaterShakerBackend
4+
from pylabrobot.io.serial import Serial
5+
from pylabrobot.machines.backend import MachineBackend
6+
7+
try:
8+
import serial
9+
10+
HAS_SERIAL = True
11+
except ImportError as e:
12+
HAS_SERIAL = False
13+
_SERIAL_IMPORT_ERROR = e
14+
15+
16+
class BioShake(HeaterShakerBackend):
17+
def __init__(self, port: str, timeout: int = 60):
18+
if not HAS_SERIAL:
19+
raise RuntimeError(
20+
f"pyserial is required for the BioShake module backend. Import error: {_SERIAL_IMPORT_ERROR}"
21+
)
22+
23+
self.setup_finished = False
24+
self.port = port
25+
self.timeout = timeout
26+
self.io = Serial(
27+
port=self.port,
28+
baudrate=9600,
29+
bytesize=serial.EIGHTBITS,
30+
parity=serial.PARITY_NONE,
31+
stopbits=serial.STOPBITS_ONE,
32+
write_timeout=10,
33+
timeout=self.timeout,
34+
)
35+
36+
async def _send_command(self, cmd: str, delay: float = 0.5, timeout: float = 2):
37+
try:
38+
# Flush serial buffers for a clean start
39+
await self.io.reset_input_buffer()
40+
await self.io.reset_output_buffer()
41+
42+
# Send the command
43+
await self.io.write((cmd + "\r").encode("ascii"))
44+
await asyncio.sleep(delay)
45+
46+
# Read and decode the response with a timeout
47+
try:
48+
response = await asyncio.wait_for(self.io.readline(), timeout=timeout)
49+
50+
except asyncio.TimeoutError:
51+
raise RuntimeError(f"Timed out waiting for response to '{cmd}'")
52+
53+
decoded = response.decode("ascii", errors="ignore").strip()
54+
55+
# Parsing the response from the BioShake
56+
57+
# No response at all
58+
if not decoded:
59+
raise RuntimeError(f"No response for '{cmd}'")
60+
61+
# Device-specific errors
62+
if decoded.startswith("e"):
63+
raise RuntimeError(f"Device returned error for '{cmd}': '{decoded}'")
64+
65+
if decoded.startswith("u ->"):
66+
raise NotImplementedError(f"'{cmd}' not supported: '{decoded}'")
67+
68+
# Standard OK
69+
if decoded.lower().startswith("ok"):
70+
return None
71+
72+
# All other valid responses (e.g. temperature and remaining time)
73+
return decoded
74+
75+
except Exception as e:
76+
raise RuntimeError(f"Unexpected error while sending '{cmd}': {type(e).__name__}: {e}") from e
77+
78+
async def setup(self, skip_home: bool = False):
79+
await MachineBackend.setup(self)
80+
await self.io.setup()
81+
if not skip_home:
82+
# Reset first before homing it to ensure the device is ready for run
83+
await self.reset()
84+
# Additional seconds until next command can be send after reset
85+
await asyncio.sleep(4)
86+
# Now home the device
87+
await self.home()
88+
89+
async def stop(self):
90+
await MachineBackend.stop(self)
91+
await self.io.stop()
92+
93+
async def reset(self):
94+
# Reset the BioShake if stuck in "e" state
95+
# Flush serial buffers for a clean start
96+
await self.io.reset_input_buffer()
97+
await self.io.reset_output_buffer()
98+
99+
# Send the command
100+
await self.io.write(("resetDevice\r").encode("ascii"))
101+
102+
start = asyncio.get_event_loop().time()
103+
max_seconds = 30 # How long a reset typically last
104+
105+
while True:
106+
# Break the loop if process takes longer than 30 seconds
107+
if asyncio.get_event_loop().time() - start > max_seconds:
108+
raise TimeoutError("Reset did not complete in time")
109+
110+
try:
111+
# Wait for each line with a timeout
112+
response = await asyncio.wait_for(self.io.readline(), timeout=2)
113+
decoded = response.decode("ascii", errors="ignore").strip()
114+
await asyncio.sleep(0.1)
115+
116+
if len(decoded) > 0:
117+
# Stop when the final message arrives
118+
if "Initialization complete" in decoded:
119+
break
120+
121+
except asyncio.TimeoutError:
122+
# Keep polling if nothing arrives within timeout
123+
continue
124+
125+
async def home(self):
126+
# Initialize the BioShake into home position
127+
await self._send_command(cmd="shakeGoHome", delay=5)
128+
129+
async def shake(self, speed: float, acceleration: int = 0):
130+
# Check if speed is an integer
131+
if isinstance(speed, float):
132+
if not speed.is_integer():
133+
raise ValueError(f"Speed must be a whole number, not {speed}")
134+
speed = int(speed)
135+
if not isinstance(speed, int):
136+
raise TypeError(
137+
f"Speed must be an integer or a whole number float, not {type(speed).__name__}"
138+
)
139+
140+
# Get the min and max speed of the device to assert speed
141+
min_speed = int(float(await self._send_command(cmd="getShakeMinRpm", delay=0.2)))
142+
max_speed = int(float(await self._send_command(cmd="getShakeMaxRpm", delay=0.2)))
143+
144+
assert (
145+
min_speed <= speed <= max_speed
146+
), f"Speed {speed} RPM is out of range. Allowed range is {min_speed}{max_speed} RPM"
147+
148+
# Set the speed of the shaker
149+
set_speed_cmd = f"setShakeTargetSpeed{speed}"
150+
await self._send_command(cmd=set_speed_cmd)
151+
152+
# Check if accel is an integer
153+
if isinstance(acceleration, float):
154+
if not acceleration.is_integer(): # type: ignore[attr-defined] # mypy is retarded
155+
raise ValueError(f"Acceleration must be a whole number, not {acceleration}")
156+
acceleration = int(acceleration)
157+
if not isinstance(acceleration, int):
158+
raise TypeError(
159+
f"Acceleration must be an integer or a whole number float, not {type(acceleration).__name__}"
160+
)
161+
162+
# Get the min and max acceleration of the device to check bounds
163+
min_accel = int(float(await self._send_command(cmd="getShakeAccelerationMin", delay=0.2)))
164+
max_accel = int(float(await self._send_command(cmd="getShakeAccelerationMax", delay=0.2)))
165+
166+
assert (
167+
min_accel <= acceleration <= max_accel
168+
), f"Acceleration {acceleration} seconds is out of range. Allowed range is {min_accel}-{max_accel} seconds"
169+
170+
# Set the acceleration of the shaker
171+
set_accel_cmd = f"setShakeAcceleration{acceleration}"
172+
await self._send_command(cmd=set_accel_cmd, delay=0.2)
173+
174+
# Send the command to start shaking, either with or without duration
175+
176+
await self._send_command(cmd="shakeOn", delay=0.2)
177+
178+
async def stop_shaking(self, deceleration: int = 0):
179+
# Check if decel is an integer
180+
if isinstance(deceleration, float):
181+
if not deceleration.is_integer(): # type: ignore[attr-defined] # mypy is retarded
182+
raise ValueError(f"Deceleration must be a whole number, not {deceleration}")
183+
deceleration = int(deceleration)
184+
if not isinstance(deceleration, int):
185+
raise TypeError(
186+
f"Deceleration must be an integer or a whole number float, not {type(deceleration).__name__}"
187+
)
188+
189+
# Get the min and max decel of the device to asset decel
190+
min_decel = int(float(await self._send_command(cmd="getShakeAccelerationMin", delay=0.2)))
191+
max_decel = int(float(await self._send_command(cmd="getShakeAccelerationMax", delay=0.2)))
192+
193+
assert (
194+
min_decel <= deceleration <= max_decel
195+
), f"Deceleration {deceleration} seconds is out of range. Allowed range is {min_decel}-{max_decel} seconds"
196+
197+
# Set the deceleration of the shaker
198+
set_decel_cmd = f"setShakeAcceleration{deceleration}"
199+
await self._send_command(cmd=set_decel_cmd, delay=0.2)
200+
201+
# stop shaking
202+
await self._send_command(cmd="shakeOff", delay=0.2)
203+
204+
@property
205+
def supports_locking(self) -> bool:
206+
return True
207+
208+
async def lock_plate(self):
209+
await self._send_command(cmd="setElmLockPos", delay=0.3)
210+
211+
async def unlock_plate(self):
212+
await self._send_command(cmd="setElmUnlockPos", delay=0.3)
213+
214+
@property
215+
def supports_active_cooling(self) -> bool:
216+
return True
217+
218+
async def set_temperature(self, temperature: float):
219+
# Get the min and max set points of the device to assert temperature
220+
min_temp = int(float(await self._send_command(cmd="getTempMin", delay=0.2)))
221+
max_temp = int(float(await self._send_command(cmd="getTempMax", delay=0.2)))
222+
223+
assert (
224+
min_temp <= temperature <= max_temp
225+
), f"Temperature {temperature} C is out of range. Allowed range is {min_temp}{max_temp} C."
226+
227+
temperature = temperature * 10
228+
229+
# Check if temperature is an integer
230+
if isinstance(temperature, float):
231+
if not temperature.is_integer():
232+
raise ValueError(f"Temperature must be a whole number, not {temperature} (1/10 C)")
233+
temperature = int(temperature)
234+
if not isinstance(temperature, int):
235+
raise TypeError(
236+
f"Temperature must be an integer or a whole number float, not {type(temperature).__name__} (1/10 C)"
237+
)
238+
239+
set_temp_cmd = f"setTempTarget{temperature}"
240+
await self._send_command(cmd=set_temp_cmd, delay=0.2)
241+
242+
# Start temperature control
243+
await self._send_command(cmd="tempOn", delay=0.2)
244+
245+
async def get_current_temperature(self) -> float:
246+
response = await self._send_command(cmd="getTempActual", delay=0.2)
247+
return float(response)
248+
249+
async def deactivate(self):
250+
# Stop temperature control
251+
await self._send_command(cmd="tempOff", delay=0.2)

0 commit comments

Comments
 (0)