Skip to content

Commit 50ea053

Browse files
authored
Build lh.probe_tip_presence_via_pickup() (#524)
1 parent bc469aa commit 50ea053

File tree

1 file changed

+77
-0
lines changed

1 file changed

+77
-0
lines changed

pylabrobot/liquid_handling/liquid_handler.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2334,3 +2334,80 @@ def assign_child_resource(
23342334
"Cannot assign child resource to liquid handler. Use "
23352335
"lh.deck.assign_child_resource() instead."
23362336
)
2337+
2338+
async def probe_tip_presence_via_pickup(
2339+
self, tip_spots: List[TipSpot], use_channels: Optional[List[int]] = None
2340+
) -> List[bool]:
2341+
"""Probe tip presence by attempting pickup on each TipSpot.
2342+
2343+
Args:
2344+
tip_spots: TipSpots to probe.
2345+
use_channels: Channels to use (must match tip_spots length).
2346+
2347+
Returns:
2348+
List[bool]: True if tip is present, False otherwise.
2349+
"""
2350+
2351+
if use_channels is None:
2352+
use_channels = list(range(len(tip_spots)))
2353+
2354+
if len(use_channels) > self.backend.num_channels:
2355+
raise ValueError(
2356+
"Liquid handler given more channels to use than exist: "
2357+
f"Given {len(use_channels)} channels to use but liquid handler "
2358+
f"only has {self.backend.num_channels}."
2359+
)
2360+
2361+
if len(use_channels) != len(tip_spots):
2362+
raise ValueError(
2363+
f"Length mismatch: received {len(use_channels)} channels for "
2364+
f"{len(tip_spots)} tip spots. One channel must be assigned per tip spot."
2365+
)
2366+
2367+
presence_flags = [True] * len(tip_spots)
2368+
z_height = tip_spots[0].get_absolute_location(z="top").z + 5
2369+
2370+
# Step 1: Cluster tip spots by x-coordinate
2371+
clusters_by_x: Dict[float, List[Tuple[TipSpot, int, int]]] = {}
2372+
for idx, tip_spot in enumerate(tip_spots):
2373+
assert tip_spot.location is not None, "TipSpot location must be at a location"
2374+
x = tip_spot.location.x
2375+
clusters_by_x.setdefault(x, []).append((tip_spot, use_channels[idx], idx))
2376+
2377+
sorted_clusters = [clusters_by_x[x] for x in sorted(clusters_by_x)]
2378+
2379+
# Step 2: Probe each cluster
2380+
for cluster in sorted_clusters:
2381+
tip_subset, channel_subset, index_subset = zip(*cluster)
2382+
2383+
try:
2384+
await self.pick_up_tips(
2385+
list(tip_subset),
2386+
use_channels=list(channel_subset),
2387+
minimum_traverse_height_at_beginning_of_a_command=z_height,
2388+
z_position_at_end_of_a_command=z_height,
2389+
)
2390+
except ChannelizedError as e:
2391+
for ch in e.errors:
2392+
if ch in channel_subset:
2393+
failed_local_idx = channel_subset.index(ch)
2394+
presence_flags[index_subset[failed_local_idx]] = False
2395+
else:
2396+
raise
2397+
2398+
# Step 3: Drop tips immediately after probing
2399+
if any(presence_flags[index] for index in index_subset):
2400+
spots = [ts for ts, _, i in cluster if presence_flags[i]]
2401+
use_channels = [uc for _, uc, i in cluster if presence_flags[i]]
2402+
try:
2403+
await self.drop_tips(
2404+
spots,
2405+
use_channels=use_channels,
2406+
# minimum_traverse_height_at_beginning_of_a_command=z_height,
2407+
z_position_at_end_of_a_command=z_height,
2408+
)
2409+
except Exception as e:
2410+
assert cluster[0][0].location is not None, "TipSpot location must be at a location"
2411+
print(f"Warning: drop_tips failed for cluster at x={cluster[0][0].location.x}: {e}")
2412+
2413+
return presence_flags

0 commit comments

Comments
 (0)