Skip to content

Commit 9e90c99

Browse files
committed
Adds driver for PreciseFlex PF400. Adds TCP connection for io.
1 parent 4745282 commit 9e90c99

File tree

3 files changed

+393
-0
lines changed

3 files changed

+393
-0
lines changed

pylabrobot/arms/backend.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from abc import ABCMeta, abstractmethod
2+
3+
from pylabrobot.machines.backend import MachineBackend
4+
5+
6+
class ArmBackend(MachineBackend, metaclass=ABCMeta):
7+
"""Backend for a robotic arm"""
8+
9+
@abstractmethod
10+
async def move_to(self, position: tuple[float, float, float]):
11+
"""Move the arm to a specified position in 3D space."""
12+
13+
@abstractmethod
14+
async def get_position(self) -> tuple[float, float, float]:
15+
"""Get the current position of the arm in 3D space."""
16+
17+
@abstractmethod
18+
async def set_speed(self, speed: float):
19+
"""Set the speed of the arm's movement."""
20+
21+
@abstractmethod
22+
async def get_speed(self) -> float:
23+
"""Get the current speed of the arm's movement."""
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
from pylabrobot.arms.backend import ArmBackend
2+
from pylabrobot.io.tcp import TCP
3+
4+
class PreciseFlexError(Exception):
5+
def __init__(self, replycode: int, message: str):
6+
self.replycode = replycode
7+
super().__init__(f"PreciseFlexError {replycode}: {message}")
8+
9+
class PreciseFlexBackend(ArmBackend):
10+
"""Backend for the PreciseFlex robotic arm"""
11+
def __init__(self, host: str, port: int, timeout=20, profile=1) -> None:
12+
super().__init__()
13+
self.host = host
14+
self.port = port
15+
self.timeout = timeout
16+
self.profile = profile
17+
self.io = TCP(host=self.host, port=self.port)
18+
19+
async def setup(self):
20+
"""Initialize the PreciseFlex backend."""
21+
await self.io.setup()
22+
await self.power_on_robot()
23+
await self.attach()
24+
25+
async def stop(self):
26+
"""Stop the PreciseFlex backend."""
27+
await self.detach()
28+
await self.power_off_robot()
29+
await self.exit()
30+
await self.io.stop()
31+
32+
async def power_on_robot(self):
33+
"""Power on the robot."""
34+
await self.send_command(f"hp 1 {self.timeout}")
35+
36+
async def power_off_robot(self):
37+
"""Power off the robot."""
38+
await self.send_command("hp 0")
39+
40+
async def move_to(self, position: tuple[float, float, float]):
41+
x, y, z = position
42+
yaw, pitch, roll = 0.0, 0.0, 0.0 # Assuming no rotation for PF400
43+
await self.send_command(f"movec {x} {y} {z} {yaw} {pitch} {roll}")
44+
45+
async def get_position(self) -> tuple[float, float, float]:
46+
data = await self.send_command("wherec")
47+
x, y, z, yaw, pitch, roll, config = data.split(" ")
48+
return float(x), float(y), float(z)
49+
50+
async def set_speed(self, speed: float):
51+
"""Set the speed of the arm's movement."""
52+
if not (0 <= speed <= 100):
53+
raise ValueError("Speed must be between 0 and 100.")
54+
await self.send_command(f"speed {self.profile} {speed}")
55+
56+
async def get_speed(self) -> float:
57+
"""Get the current speed of the arm's movement."""
58+
data = await self.send_command(f"speed {self.profile}")
59+
return float(data)
60+
61+
def set_profile(self, profile: int):
62+
"""Set the motion profile index."""
63+
if not isinstance(profile, int) or profile < 0:
64+
raise ValueError("Profile index must be a non-negative integer.")
65+
self.profile = profile
66+
67+
def get_profile(self) -> int:
68+
"""Get the current motion profile index."""
69+
return self.profile
70+
71+
async def set_profile_values(self,
72+
speed: float,
73+
speed2: float,
74+
acceleration: float,
75+
deceleration: float,
76+
acceleration_ramp: float,
77+
deceleration_ramp: float,
78+
in_range: float,
79+
straight: bool):
80+
"""
81+
Set motion profile values for the specified profile index on the PreciseFlex robot.
82+
83+
Parameters:
84+
profile (int): Profile index to set values for.
85+
speed (float): Percentage of maximum speed. 100 = full speed. Values >100 may be accepted depending on system config.
86+
speed2 (float): Secondary speed setting, typically for Cartesian moves. Normally 0. Interpreted as a percentage.
87+
acceleration (float): Percentage of maximum acceleration. 100 = full accel.
88+
deceleration (float): Percentage of maximum deceleration. 100 = full decel.
89+
acceleration_ramp (float): Acceleration ramp time in seconds.
90+
deceleration_ramp (float): Deceleration ramp time in seconds.
91+
in_range (float): InRange value, from -1 to 100. -1 = allow blending, 0 = stop without checking, >0 = enforce position accuracy.
92+
straight (bool): If True, follow a straight-line path (-1). If False, follow a joint-based path (0).
93+
"""
94+
if not (0 <= speed):
95+
raise ValueError("Speed must be >= 0 (percent).")
96+
97+
if not (0 <= speed2):
98+
raise ValueError("Speed2 must be >= 0 (percent).")
99+
100+
if not (0 <= acceleration <= 100):
101+
raise ValueError("Acceleration must be between 0 and 100 (percent).")
102+
103+
if not (0 <= deceleration <= 100):
104+
raise ValueError("Deceleration must be between 0 and 100 (percent).")
105+
106+
if acceleration_ramp < 0:
107+
raise ValueError("Acceleration ramp must be >= 0 (seconds).")
108+
109+
if deceleration_ramp < 0:
110+
raise ValueError("Deceleration ramp must be >= 0 (seconds).")
111+
112+
if not (-1 <= in_range <= 100):
113+
raise ValueError("InRange must be between -1 and 100.")
114+
115+
straight_int = -1 if straight else 0
116+
await self.send_command(f"Profile {self.profile} {speed} {speed2} {acceleration} {deceleration} {acceleration_ramp} {deceleration_ramp} {in_range} {straight_int}")
117+
118+
async def free_mode(self, on: bool, axis: int = 0):
119+
"""
120+
Activates or deactivates free mode. The robot must be attached to enter free mode.
121+
122+
Parameters:
123+
on (bool): If True, enable free mode. If False, disable free mode for all axes.
124+
axis (int): Axis to apply free mode to. 0 = all axes or > 0 = Free just this axis. Ignored if 'on' parameter is False.
125+
"""
126+
if not on:
127+
axis = -1 # means turn off free mode for all axes
128+
await self.send_command(f"freemode {axis}")
129+
130+
async def get_profile_values(self) -> dict:
131+
"""
132+
Get the current motion profile values for the specified profile index on the PreciseFlex robot.
133+
134+
Returns:
135+
dict: A dictionary containing the profile values.
136+
"""
137+
data = await self.send_command(f"Profile {self.profile}")
138+
parts = data.split(" ")
139+
if len(parts) != 8:
140+
raise PreciseFlexError(-1, "Unexpected response format from device.")
141+
142+
return {
143+
"speed": float(parts[0]),
144+
"speed2": float(parts[1]),
145+
"acceleration": float(parts[2]),
146+
"deceleration": float(parts[3]),
147+
"acceleration_ramp": float(parts[4]),
148+
"deceleration_ramp": float(parts[5]),
149+
"in_range": float(parts[6]),
150+
"straight": parts[7] == "-1"
151+
}
152+
153+
async def halt(self):
154+
"""Halt the robot immediately."""
155+
await self.send_command("halt")
156+
157+
async def attach(self):
158+
"""Attach the robot."""
159+
await self.send_command("attach 1")
160+
161+
async def detach(self):
162+
"""Detach the robot."""
163+
await self.send_command("detach 1")
164+
165+
async def open_gripper(self):
166+
"""Open the gripper."""
167+
await self.send_command("gripper 1")
168+
169+
async def close_gripper(self):
170+
"""Close the gripper."""
171+
await self.send_command("gripper 0")
172+
173+
async def pick_plate(self,
174+
position_id: int,
175+
horizontal_compliance: bool = False,
176+
horizontal_compliance_torque: int = 0):
177+
"""Pick an item at the specified position ID.
178+
179+
Parameters:
180+
position_id (int): The ID of the position where the plate should be picked.
181+
horizontal_compliance (bool): enable horizontal compliance while closing the gripper to allow centering around the plate.
182+
horizontal_compliance_torque (int): The % of the original horizontal holding torque to be retained during compliance. If omitted, 0 is used.
183+
"""
184+
horizontal_compliance_int = 1 if horizontal_compliance else 0
185+
ret_code = await self.send_command(f"pickplate {position_id} {horizontal_compliance_int} {horizontal_compliance_torque}")
186+
if ret_code == "0":
187+
raise PreciseFlexError(-1, "the force-controlled gripper detected no plate present.")
188+
189+
async def place_plate(self,
190+
position_id: int,
191+
horizontal_compliance: bool = False,
192+
horizontal_compliance_torque: int = 0):
193+
"""Place an item at the specified position ID.
194+
195+
Parameters:
196+
position_id (int): The ID of the position where the plate should be placed.
197+
horizontal_compliance (bool): enable horizontal compliance during the move to place the plate, to allow centering in the fixture.
198+
horizontal_compliance_torque (int): The % of the original horizontal holding torque to be retained during compliance. If omitted, 0 is used.
199+
"""
200+
horizontal_compliance_int = 1 if horizontal_compliance else 0
201+
await self.send_command(f"placeplate {position_id} {horizontal_compliance_int} {horizontal_compliance_torque}")
202+
203+
async def teach_position(self, position_id: int, z_clearance: float = 50.0):
204+
""" Sets the plate location to the current robot position and configuration. The location is saved as Cartesian coordinates.
205+
206+
Parameters:
207+
position_id (int): The ID of the position to be taught.
208+
z_clearance (float): Optional. The Z Clearance value. If omitted, a value of 50 is used. Z clearance must be high enough to withdraw the gripper.
209+
"""
210+
await self.send_command(f"teachplate {position_id} {z_clearance}")
211+
212+
async def exit(self):
213+
"""Closes the communications link immediately."""
214+
await self.send_command("exit")
215+
216+
async def home(self):
217+
"""Homes robot."""
218+
await self.send_command("home")
219+
220+
async def send_command(self, command: str):
221+
await self.io.write(command.encode('utf-8') + b'\n')
222+
reply = await self.io.readline()
223+
return self._parse_reply_ensure_successful(reply)
224+
225+
def _parse_reply_ensure_successful(self, reply: bytes) -> str:
226+
"""Parse reply from Precise Flex.
227+
228+
Expected format: b'replycode data message\r\n'
229+
- replycode is an integer at the beginning
230+
- data is rest of the line (excluding CRLF)
231+
"""
232+
text = reply.decode().strip() # removes \r\n
233+
if not text:
234+
raise PreciseFlexError(-1, "Empty reply from device.")
235+
236+
parts = text.split(" ", 1)
237+
if len(parts) == 1:
238+
replycode = int(parts[0])
239+
data = ""
240+
else:
241+
replycode, data = int(parts[0]), parts[1]
242+
243+
if replycode != 0:
244+
# if error is reported, the data part generally contains the error message
245+
raise PreciseFlexError(replycode, data)
246+
247+
return data

pylabrobot/io/tcp.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import asyncio
2+
import logging
3+
from dataclasses import dataclass
4+
from io import IOBase
5+
from typing import Optional
6+
7+
from pylabrobot.io.capture import CaptureReader, Command, capturer, get_capture_or_validation_active
8+
from pylabrobot.io.errors import ValidationError
9+
from pylabrobot.io.validation_utils import LOG_LEVEL_IO, align_sequences
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
@dataclass
15+
class TCPCommand(Command):
16+
data: str
17+
18+
def __init__(self, device_id: str, action: str, data: str, module: str = "tcp"):
19+
super().__init__(module=module, device_id=device_id, action=action)
20+
self.data = data
21+
22+
23+
class TCP(IOBase):
24+
def __init__(self, host: str, port: int = 5000):
25+
self._host = host
26+
self._port = port
27+
self._reader: Optional[asyncio.StreamReader] = None
28+
self._writer: Optional[asyncio.StreamWriter] = None
29+
30+
if get_capture_or_validation_active():
31+
raise RuntimeError("Cannot create a new TCP object while capture or validation is active")
32+
33+
async def setup(self):
34+
self._reader, self._writer = await asyncio.open_connection(self._host, self._port)
35+
36+
async def stop(self):
37+
if self._writer is not None:
38+
self._writer.close()
39+
await self._writer.wait_closed()
40+
self._reader = None
41+
self._writer = None
42+
43+
async def write(self, data: bytes):
44+
assert self._writer is not None, "forgot to call setup?"
45+
self._writer.write(data + b"\n")
46+
await self._writer.drain()
47+
logger.log(LOG_LEVEL_IO, "[%s:%d] write %s", self._host, self._port, data)
48+
capturer.record(
49+
TCPCommand(device_id=f"{self._host}:{self._port}", action="write", data=data.decode("unicode_escape"))
50+
)
51+
52+
async def read(self, num_bytes: int = 128) -> bytes:
53+
assert self._reader is not None, "forgot to call setup?"
54+
data = await self._reader.read(num_bytes)
55+
logger.log(LOG_LEVEL_IO, "[%s:%d] read %s", self._host, self._port, data)
56+
capturer.record(
57+
TCPCommand(device_id=f"{self._host}:{self._port}", action="read", data=data.decode("unicode_escape"))
58+
)
59+
return data
60+
61+
async def readline(self) -> bytes:
62+
assert self._reader is not None, "forgot to call setup?"
63+
data = await self._reader.readline()
64+
logger.log(LOG_LEVEL_IO, "[%s:%d] readline %s", self._host, self._port, data)
65+
capturer.record(
66+
TCPCommand(device_id=f"{self._host}:{self._port}", action="readline", data=data.decode("unicode_escape"))
67+
)
68+
return data
69+
70+
def serialize(self):
71+
return {
72+
"host": self._host,
73+
"port": self._port,
74+
}
75+
76+
@classmethod
77+
def deserialize(cls, data: dict) -> "TCP":
78+
return cls(
79+
host=data["host"],
80+
port=data["port"],
81+
)
82+
83+
84+
class TCPValidator(TCP):
85+
def __init__(self, cr: CaptureReader, host: str, port: int = 5000):
86+
super().__init__(host, port)
87+
self.cr = cr
88+
89+
async def setup(self):
90+
pass
91+
92+
async def write(self, data: bytes):
93+
next_command = TCPCommand(**self.cr.next_command())
94+
if not (
95+
next_command.module == "tcp"
96+
and next_command.device_id == f"{self._host}:{self._port}"
97+
and next_command.action == "write"
98+
):
99+
raise ValidationError(f"Next line is {next_command}, expected TCP write")
100+
if next_command.data != data.decode("unicode_escape"):
101+
align_sequences(expected=next_command.data, actual=data.decode("unicode_escape"))
102+
raise ValidationError("Data mismatch: difference was written to stdout.")
103+
104+
async def read(self, num_bytes: int = 128) -> bytes:
105+
next_command = TCPCommand(**self.cr.next_command())
106+
if not (
107+
next_command.module == "tcp"
108+
and next_command.device_id == f"{self._host}:{self._port}"
109+
and next_command.action == "read"
110+
and len(next_command.data) == num_bytes
111+
):
112+
raise ValidationError(f"Next line is {next_command}, expected TCP read {num_bytes}")
113+
return next_command.data.encode()
114+
115+
async def readline(self) -> bytes:
116+
next_command = TCPCommand(**self.cr.next_command())
117+
if not (
118+
next_command.module == "tcp"
119+
and next_command.device_id == f"{self._host}:{self._port}"
120+
and next_command.action == "readline"
121+
):
122+
raise ValidationError(f"Next line is {next_command}, expected TCP readline")
123+
return next_command.data.encode()

0 commit comments

Comments
 (0)