Skip to content

Commit 034029d

Browse files
authored
STARBackend.{aspirate,disense} probe_liquid_height parameter (#712)
1 parent 86b1a58 commit 034029d

File tree

2 files changed

+158
-46
lines changed

2 files changed

+158
-46
lines changed

pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py

Lines changed: 153 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
)
5353
from pylabrobot.resources import (
5454
Carrier,
55+
Container,
5556
Coordinate,
5657
Plate,
5758
Resource,
@@ -1605,6 +1606,83 @@ class LLDMode(enum.Enum):
16051606
DUAL = 3
16061607
Z_TOUCH_OFF = 4
16071608

1609+
async def probe_liquid_heights(
1610+
self,
1611+
containers: List[Container],
1612+
use_channels: List[int],
1613+
tips: List[HamiltonTip],
1614+
resource_offsets: Optional[List[Coordinate]] = None,
1615+
move_to_z_safety_after: bool = True,
1616+
) -> List[float]:
1617+
"""Probe liquid heights for the specified channels.
1618+
1619+
Moves the channels to the x and y positions of the containers, then probes the liquid height
1620+
using the CLLD function.
1621+
1622+
Returns the liquid height in each well in mm with respect to the bottom of the container cavity.
1623+
Returns `None` for channels where the liquid height could not be determined.
1624+
"""
1625+
1626+
if any(not resource.supports_compute_height_volume_functions() for resource in containers):
1627+
raise ValueError(
1628+
"automatic_surface_following can only be used with containers that support height<->volume functions."
1629+
)
1630+
1631+
resource_offsets = resource_offsets or [Coordinate.zero()] * len(containers)
1632+
1633+
assert len(containers) == len(use_channels) == len(resource_offsets) == len(tips)
1634+
1635+
await self.move_all_channels_in_z_safety()
1636+
1637+
# Check if all channels are on the same x position, then move there
1638+
x_pos = [
1639+
resource.get_location_wrt(self.deck, x="c", y="c", z="b").x + offset.x
1640+
for resource, offset in zip(containers, resource_offsets)
1641+
]
1642+
if len(set(x_pos)) > 1:
1643+
raise NotImplementedError(
1644+
"automatic_surface_following is not supported for multiple x positions."
1645+
)
1646+
await self.move_channel_x(0, x_pos[0])
1647+
1648+
# move channels to above their y positions
1649+
y_pos = [
1650+
resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + offset.y
1651+
for resource, offset in zip(containers, resource_offsets)
1652+
]
1653+
await self.position_channels_in_y_direction(
1654+
{channel: y for channel, y in zip(use_channels, y_pos)}
1655+
)
1656+
1657+
# detect liquid heights
1658+
current_absolute_liquid_heights = await asyncio.gather(
1659+
*[
1660+
self.clld_probe_z_height_using_channel(
1661+
channel_idx=channel,
1662+
move_channels_to_save_pos_after=False,
1663+
lowest_immers_pos=container.get_absolute_location("c", "c", "cavity_bottom").z
1664+
+ tip.total_tip_length
1665+
- tip.fitting_depth,
1666+
start_pos_search=container.get_absolute_location("c", "c", "t").z
1667+
+ tip.total_tip_length
1668+
- tip.fitting_depth
1669+
+ 5,
1670+
)
1671+
for channel, container, tip in zip(use_channels, containers, tips)
1672+
]
1673+
)
1674+
1675+
relative_to_well = [
1676+
current_absolute_liquid_heights[i]
1677+
- resource.get_absolute_location("c", "c", "cavity_bottom").z
1678+
for i, resource in enumerate(containers)
1679+
]
1680+
1681+
if move_to_z_safety_after:
1682+
await self.move_all_channels_in_z_safety()
1683+
1684+
return relative_to_well
1685+
16081686
async def aspirate(
16091687
self,
16101688
ops: List[SingleChannelAspiration],
@@ -1643,11 +1721,13 @@ async def aspirate(
16431721
min_z_endpos: Optional[float] = None,
16441722
hamilton_liquid_classes: Optional[List[Optional[HamiltonLiquidClass]]] = None,
16451723
liquid_surfaces_no_lld: Optional[List[float]] = None,
1646-
# remove > 2026-01
1647-
immersion_depth_direction: Optional[List[int]] = None,
1724+
# PLR:
1725+
probe_liquid_height: bool = False,
1726+
# remove >2026-01
16481727
mix_volume: Optional[List[float]] = None,
16491728
mix_cycles: Optional[List[int]] = None,
16501729
mix_speed: Optional[List[float]] = None,
1730+
immersion_depth_direction: Optional[List[int]] = None,
16511731
):
16521732
"""Aspirate liquid from the specified channels.
16531733
@@ -1667,20 +1747,17 @@ async def aspirate(
16671747
pull_out_distance_transport_air: The distance to pull out when aspirating air, if LLD is
16681748
disabled.
16691749
second_section_height: The height to start the second section of aspiration.
1670-
second_section_ratio: The ratio of [the bottom of the container * 10000] / [the height top of the container].
1671-
minimum_height: The minimum height to move to, this is the end of aspiration. The channel
1672-
will move linearly from the liquid surface to this height over the course of the aspiration.
1750+
second_section_ratio:
1751+
minimum_height: The minimum height to move to, this is the end of aspiration. The channel will move linearly from the liquid surface to this height over the course of the aspiration.
16731752
immersion_depth: The z distance to move after detecting the liquid, can be into or away from the liquid surface.
16741753
surface_following_distance: The distance to follow the liquid surface.
16751754
transport_air_volume: The volume of air to aspirate after the liquid.
16761755
pre_wetting_volume: The volume of liquid to use for pre-wetting.
16771756
lld_mode: The liquid level detection mode to use.
16781757
gamma_lld_sensitivity: The sensitivity of the gamma LLD.
16791758
dp_lld_sensitivity: The sensitivity of the DP LLD.
1680-
aspirate_position_above_z_touch_off: If the LLD mode is Z_TOUCH_OFF, this is the height above
1681-
the bottom of the well (presumably) to aspirate from.
1682-
detection_height_difference_for_dual_lld: Difference between the gamma and DP LLD heights if
1683-
the LLD mode is DUAL.
1759+
aspirate_position_above_z_touch_off: If the LLD mode is Z_TOUCH_OFF, this is the height above the bottom of the well (presumably) to aspirate from.
1760+
detection_height_difference_for_dual_lld: Difference between the gamma and DP LLD heights if the LLD mode is DUAL.
16841761
swap_speed: Swap speed (on leaving liquid) [1mm/s]. Must be between 3 and 1600. Default 100.
16851762
settling_time: The time to wait after mix.
16861763
mix_position_from_liquid_surface: The height to aspirate from for mix (LLD or absolute terms).
@@ -1694,17 +1771,15 @@ async def aspirate(
16941771
z_drive_speed_during_2nd_section_search: Unknown.
16951772
cup_upper_edge: Unknown.
16961773
ratio_liquid_rise_to_tip_deep_in: Unknown.
1697-
immersion_depth_2nd_section: The depth to move into the liquid for the second section of
1698-
aspiration.
1774+
immersion_depth_2nd_section: The depth to move into the liquid for the second section of aspiration.
16991775
1700-
minimum_traverse_height_at_beginning_of_a_command: The minimum height to move to before
1701-
starting an aspiration.
1776+
minimum_traverse_height_at_beginning_of_a_command: The minimum height to move to before starting an aspiration.
17021777
min_z_endpos: The minimum height to move to, this is the end of aspiration.
17031778
1704-
hamilton_liquid_classes: Override the default liquid classes. See
1705-
pylabrobot/liquid_handling/liquid_classes/hamilton/STARBackend.py
1706-
liquid_surface_no_lld: Liquid surface at function without LLD [mm]. Must be between 0
1707-
and 360. Defaults to well bottom + liquid height. Should use absolute z.
1779+
hamilton_liquid_classes: Override the default liquid classes. See pylabrobot/liquid_handling/liquid_classes/hamilton/STARBackend.py
1780+
liquid_surface_no_lld: Liquid surface at function without LLD [mm]. Must be between 0 and 360. Defaults to well bottom + liquid height. Should use absolute z.
1781+
1782+
probe_liquid_height: PLR-specific parameter. If True, probe the liquid height using cLLD before aspirating to set the liquid_height of every operation instead of using the default 0. Liquid heights must not be set when using this function.
17081783
"""
17091784

17101785
# # # TODO: delete > 2026-01 # # #
@@ -1764,9 +1839,6 @@ async def aspirate(
17641839
op.resource.get_location_wrt(self.deck).z + op.offset.z + op.resource.material_z_thickness
17651840
for op in ops
17661841
]
1767-
liquid_surfaces_no_lld = liquid_surfaces_no_lld or [
1768-
wb + (op.liquid_height or 0) for wb, op in zip(well_bottoms, ops)
1769-
]
17701842
if lld_search_height is None:
17711843
lld_search_height = [
17721844
(
@@ -1859,6 +1931,28 @@ async def aspirate(
18591931
ratio_liquid_rise_to_tip_deep_in = _fill_in_defaults(ratio_liquid_rise_to_tip_deep_in, [0] * n)
18601932
immersion_depth_2nd_section = _fill_in_defaults(immersion_depth_2nd_section, [0.0] * n)
18611933

1934+
if probe_liquid_height:
1935+
if any(op.liquid_height is not None for op in ops):
1936+
raise ValueError("Cannot use probe_liquid_height when liquid heights are set.")
1937+
1938+
liquid_heights = await self.probe_liquid_heights(
1939+
containers=[op.resource for op in ops],
1940+
use_channels=use_channels,
1941+
tips=[cast(HamiltonTip, op.tip) for op in ops],
1942+
resource_offsets=[op.offset for op in ops],
1943+
move_to_z_safety_after=False,
1944+
)
1945+
1946+
# override minimum traversal height because we don't want to move channels up. we are already above the liquid.
1947+
minimum_traverse_height_at_beginning_of_a_command = 100
1948+
logger.info(f"Detected liquid heights: {liquid_heights}")
1949+
else:
1950+
liquid_heights = [op.liquid_height or 0 for op in ops]
1951+
1952+
liquid_surfaces_no_lld = liquid_surfaces_no_lld or [
1953+
wb + lh for wb, lh in zip(well_bottoms, liquid_heights)
1954+
]
1955+
18621956
try:
18631957
return await self.aspirate_pip(
18641958
aspiration_type=[0 for _ in range(n)],
@@ -1956,6 +2050,8 @@ async def dispense(
19562050
jet: Optional[List[bool]] = None,
19572051
blow_out: Optional[List[bool]] = None, # "empty" in the VENUS liquid editor
19582052
empty: Optional[List[bool]] = None, # truly "empty", does not exist in liquid editor, dm4
2053+
# PLR specific
2054+
probe_liquid_height: bool = False,
19592055
# remove in the future
19602056
immersion_depth_direction: Optional[List[int]] = None,
19612057
mix_volume: Optional[List[float]] = None,
@@ -2010,6 +2106,8 @@ async def dispense(
20102106
empty: Whether to use "empty" dispense mode for each dispense. Defaults to `False` for all.
20112107
Truly empty the tip, not available in the VENUS liquid editor, but is in the firmware
20122108
documentation. Dispense mode 4.
2109+
2110+
probe_liquid_height: PLR-specific parameter. If True, probe the liquid height using cLLD before aspirating to set the liquid_height of every operation instead of using the default 0. Liquid heights must not be set when using this function.
20132111
"""
20142112

20152113
n = len(ops)
@@ -2083,9 +2181,6 @@ async def dispense(
20832181
op.resource.get_location_wrt(self.deck).z + op.offset.z + op.resource.material_z_thickness
20842182
for op in ops
20852183
]
2086-
liquid_surfaces_no_lld = liquid_surface_no_lld or [
2087-
ls + (op.liquid_height or 0) for ls, op in zip(well_bottoms, ops)
2088-
]
20892184
if lld_search_height is None:
20902185
lld_search_height = [
20912186
(
@@ -2158,6 +2253,28 @@ async def dispense(
21582253
mix_surface_following_distance = _fill_in_defaults(mix_surface_following_distance, [0.0] * n)
21592254
limit_curve_index = _fill_in_defaults(limit_curve_index, [0] * n)
21602255

2256+
if probe_liquid_height:
2257+
if any(op.liquid_height is not None for op in ops):
2258+
raise ValueError("Cannot use probe_liquid_height when liquid heights are set.")
2259+
2260+
liquid_heights = await self.probe_liquid_heights(
2261+
containers=[op.resource for op in ops],
2262+
use_channels=use_channels,
2263+
tips=[cast(HamiltonTip, op.tip) for op in ops],
2264+
resource_offsets=[op.offset for op in ops],
2265+
move_to_z_safety_after=False,
2266+
)
2267+
2268+
# override minimum traversal height because we don't want to move channels up. we are already above the liquid.
2269+
minimum_traverse_height_at_beginning_of_a_command = 100
2270+
logger.info(f"Detected liquid heights: {liquid_heights}")
2271+
else:
2272+
liquid_heights = [op.liquid_height or 0 for op in ops]
2273+
2274+
liquid_surfaces_no_lld = liquid_surface_no_lld or [
2275+
wb + lh for wb, lh in zip(well_bottoms, liquid_heights)
2276+
]
2277+
21612278
try:
21622279
ret = await self.dispense_pip(
21632280
tip_pattern=channels_involved,
@@ -2171,7 +2288,7 @@ async def dispense(
21712288
second_section_height=[round(sh * 10) for sh in second_section_height],
21722289
second_section_ratio=[round(sr * 10) for sr in second_section_ratio],
21732290
minimum_height=[round(mh * 10) for mh in minimum_height],
2174-
immersion_depth=[round(id_ * 10) for id_ in immersion_depth], # [0, 0]
2291+
immersion_depth=[round(id_ * 10) for id_ in immersion_depth],
21752292
immersion_depth_direction=immersion_depth_direction,
21762293
surface_following_distance=[round(sfd * 10) for sfd in surface_following_distance],
21772294
dispense_speed=[round(fr * 10) for fr in flow_rates],
@@ -7880,17 +7997,15 @@ async def clld_probe_z_height_using_channel(
78807997
78817998
Args:
78827999
channel_idx: The index of the channel to use for probing. Backmost channel = 0.
7883-
lowest_immers_pos: The lowest immersion position in mm.
7884-
start_pos_lld_search: The start position for z-touch search in mm.
8000+
lowest_immers_pos: The lowest immersion position in mm. This is the position of the channel, NOT including the tip length (as C0 commands do). So you have to add the total_tip_length - fitting_depth.
8001+
start_pos_lld_search: The start position for z-touch search in mm. This is the position of the channel, NOT including the tip length (as C0 commands do). So you have to add the total_tip_length - fitting_depth.
78858002
channel_speed: The speed of channel movement in mm/sec.
78868003
channel_acceleration: The acceleration of the channel in mm/sec**2.
78878004
detection_edge: The edge steepness at capacitive LLD detection.
78888005
detection_drop: The offset after capacitive LLD edge detection.
7889-
post_detection_trajectory (0, 1): Movement of the channel up (1) or down (0) after
7890-
contacting the surface.
8006+
post_detection_trajectory (0, 1): Movement of the channel up (1) or down (0) after contacting the surface.
78918007
post_detection_dist: Distance to move into the trajectory after detection in mm.
7892-
move_channels_to_save_pos_after: Flag to move channels to a safe position after
7893-
operation.
8008+
move_channels_to_save_pos_after: Flag to move channels to a safe position after operation.
78948009
78958010
Returns:
78968011
The detected Z-height in mm.
@@ -7931,26 +8046,18 @@ async def clld_probe_z_height_using_channel(
79318046
+ f" and {STARBackend.z_drive_increment_to_mm(9_999)} mm, is {post_detection_dist} mm"
79328047
)
79338048

7934-
lowest_immers_pos_str = f"{lowest_immers_pos_increments:05}"
7935-
start_pos_search_str = f"{start_pos_search_increments:05}"
7936-
channel_speed_str = f"{channel_speed_increments:05}"
7937-
channel_acc_str = f"{channel_acceleration_thousand_increments:03}"
7938-
detection_edge_str = f"{detection_edge:04}"
7939-
detection_drop_str = f"{detection_drop:04}"
7940-
post_detection_dist_str = f"{post_detection_dist_increments:04}"
7941-
79428049
try:
79438050
await self.send_command(
79448051
module=STARBackend.channel_id(channel_idx),
79458052
command="ZL",
7946-
zh=lowest_immers_pos_str, # Lowest immersion position [increment]
7947-
zc=start_pos_search_str, # Start position of LLD search [increment]
7948-
zl=channel_speed_str, # Speed of channel movement
7949-
zr=channel_acc_str, # Acceleration [1000 increment/second^2]
7950-
gt=detection_edge_str, # Edge steepness at capacitive LLD detection
7951-
gl=detection_drop_str, # Offset after capacitive LLD edge detection
8053+
zh=f"{lowest_immers_pos_increments:05}", # Lowest immersion position [increment]
8054+
zc=f"{start_pos_search_increments:05}", # Start position of LLD search [increment]
8055+
zl=f"{channel_speed_increments:05}", # Speed of channel movement
8056+
zr=f"{channel_acceleration_thousand_increments:03}", # Acceleration [1000 increment/second^2]
8057+
gt=f"{detection_edge:04}", # Edge steepness at capacitive LLD detection
8058+
gl=f"{detection_drop:04}", # Offset after capacitive LLD edge detection
79528059
zj=post_detection_trajectory, # Movement of the channel after contacting surface
7953-
zi=post_detection_dist_str, # Distance to move up after detection [increment]
8060+
zi=f"{post_detection_dist_increments:04}", # Distance to move up after detection [increment]
79548061
)
79558062
except STARFirmwareError:
79568063
await self.move_all_channels_in_z_safety()
@@ -8273,7 +8380,7 @@ async def position_channels_in_y_direction(self, ys: Dict[int, float], make_spac
82738380
raise ValueError("Channel N would hit the front of the robot")
82748381

82758382
if not all(
8276-
int((channel_locations[i] - channel_locations[i + 1]) * 1000) >= 8_999 # float fixing
8383+
round((channel_locations[i] - channel_locations[i + 1]) * 1000) >= 8_990 # float fixing
82778384
for i in range(len(channel_locations) - 1)
82788385
):
82798386
raise ValueError("Channels must be at least 9mm apart and in descending order")

pylabrobot/resources/container.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ def serialize_state(self) -> Dict[str, Any]:
6969
def load_state(self, state: Dict[str, Any]):
7070
self.tracker.load_state(state)
7171

72+
def supports_compute_height_volume_functions(self) -> bool:
73+
return (
74+
self._compute_volume_from_height is not None and self._compute_height_from_volume is not None
75+
)
76+
7277
def compute_volume_from_height(self, height: float) -> float:
7378
"""Compute the volume of liquid in a container from the height of the liquid relative to the
7479
bottom of the container."""

0 commit comments

Comments
 (0)