diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 31e0d53a414..eceb7a2b8ad 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1795,7 +1795,9 @@ async def aspirate( immersion_depth_direction = immersion_depth_direction or [ 0 if (id_ >= 0) else 1 for id_ in immersion_depth ] - immersion_depth = [im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth)] + immersion_depth = [ + im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth) + ] surface_following_distance = _fill_in_defaults(surface_following_distance, [0.0] * n) flow_rates = [ op.flow_rate or (hlc.aspiration_flow_rate if hlc is not None else 100.0) @@ -2101,7 +2103,9 @@ async def dispense( immersion_depth_direction = immersion_depth_direction or [ 0 if (id_ >= 0) else 1 for id_ in immersion_depth ] - immersion_depth = [im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth)] + immersion_depth = [ + im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth) + ] surface_following_distance = _fill_in_defaults(surface_following_distance, [0.0] * n) flow_rates = [ op.flow_rate or (hlc.dispense_flow_rate if hlc is not None else 120.0) diff --git a/pylabrobot/liquid_handling/liquid_handler.py b/pylabrobot/liquid_handling/liquid_handler.py index ae2da828713..0f6f43d77c3 100644 --- a/pylabrobot/liquid_handling/liquid_handler.py +++ b/pylabrobot/liquid_handling/liquid_handler.py @@ -9,8 +9,10 @@ import logging import threading import warnings +from itertools import zip_longest from typing import ( Any, + AsyncIterator, Awaitable, Callable, Dict, @@ -1263,93 +1265,146 @@ async def dispense( async def transfer( self, - source: Well, - targets: List[Well], - source_vol: Optional[float] = None, - ratios: Optional[List[float]] = None, - target_vols: Optional[List[float]] = None, - aspiration_flow_rate: Optional[float] = None, - dispense_flow_rates: Optional[List[Optional[float]]] = None, - **backend_kwargs, + source_resources: Sequence[Container], + dest_resources: Sequence[Container], + vols: List[float], + tip_spots: Union[List[TipSpot], AsyncIterator[TipSpot]], + aspiration_kwargs: Optional[Dict[str, Any]] = None, + dispense_kwargs: Optional[Dict[str, Any]] = None, + tip_drop_method: Literal["return", "discard"] = "discard", ): - """Transfer liquid from one well to another. + """Transfer liquid from one set of resources to another. Each input resource matches to exactly one output resource. Examples: + Transfer liquid from one column to another column: + >>> await lh.transfer( + ... source_resources=plate1["A1":"H8"], + ... dest_resources=plate2["A1":"H8"], + ... vols=[50] * 8, + ... tip_spots=tip_rack["A1":"H1"], + ... ) + """ - Transfer 50 uL of liquid from the first well to the second well: - - >>> await lh.transfer(plate["A1"], plate["B1"], source_vol=50) - - Transfer 80 uL of liquid from the first well equally to the first column: + if not (len(source_resources) == len(dest_resources) == len(vols)): + raise ValueError( + "Number of source and destination resources must match, but got " + f"{len(source_resources)} source resources, {len(dest_resources)} destination resources, " + f"and {len(vols)} volumes." + ) - >>> await lh.transfer(plate["A1"], plate["A1:H1"], source_vol=80) + if isinstance(tip_spots, list) and len(tip_spots) < len(source_resources): + raise ValueError( + "Number of tip spots must be at least the number of channels, " + f"but got {len(tip_spots)} tip spots and {len(source_resources)} transfers." + ) + if hasattr(tip_spots, "__aiter__") and hasattr(tip_spots, "__anext__"): + tip_spots = [await tip_spots.__anext__() for _ in source_resources] # type: ignore + assert isinstance(tip_spots, list) + + for batch in range(0, len(source_resources), self.backend.num_channels): + batch_sources = source_resources[batch : batch + self.backend.num_channels] + batch_destinations = dest_resources[batch : batch + self.backend.num_channels] + batch_vols = vols[batch : batch + self.backend.num_channels] + batch_tip_spots = tip_spots[batch : batch + self.backend.num_channels] + + await self.pick_up_tips(batch_tip_spots) + + await self.aspirate( + resources=batch_sources, + vols=batch_vols, + **(aspiration_kwargs or {}), + ) - Transfer 60 uL of liquid from the first well in a 1:2 ratio to 2 other wells: + await self.dispense( + resources=batch_destinations, + vols=batch_vols, + **(dispense_kwargs or {}), + ) - >>> await lh.transfer(plate["A1"], plate["B1:C1"], source_vol=60, ratios=[2, 1]) + if tip_drop_method == "return": + await self.return_tips() + else: + await self.discard_tips() - Transfer arbitrary volumes to the first column: + async def distribute( + self, + operations: Dict[Container, List[Tuple[Union[Container, List[Container]], float]]], + tip_spots: Union[List[TipSpot], AsyncIterator[TipSpot]], + dead_volume: float = 10, + tip_drop_method: Literal["return", "discard"] = "discard", + aspiration_kwargs: Optional[Dict[str, Any]] = None, + dispense_kwargs: Optional[Dict[str, Any]] = None, + ): + """ + Distribute liquid from one resource to multiple resources. - >>> await lh.transfer(plate["A1"], plate["A1:H1"], target_vols=[3, 1, 4, 1, 5, 9, 6, 2]) + Examples: + Distribute liquid from one well to multiple wells: + >>> await lh.distribute({ + ... plate1["A1"]: [(plate2["A1"], 50), (plate2["A2"], 50)], + ... plate1["A2"]: [(plate2["B1"], 100), (plate2["B2"], 100), (plate2["B3"], 100)], + ... }) Args: - source: The source well. - targets: The target wells. - source_vol: The volume to transfer from the source well. - ratios: The ratios to use when transferring liquid to the target wells. If not specified, then - the volumes will be distributed equally. - target_vols: The volumes to transfer to the target wells. If specified, `source_vols` and - `ratios` must be `None`. - aspiration_flow_rate: The flow rate to use when aspirating, in ul/s. If `None`, the backend - default will be used. - dispense_flow_rates: The flow rates to use when dispensing, in ul/s. If `None`, the backend - default will be used. Either a single flow rate for all channels, or a list of flow rates, - one for each target well. - - Raises: - RuntimeError: If the setup has not been run. See :meth:`~LiquidHandler.setup`. + operations: A dictionary mapping source resources to a list of tuples, each containing a + destination resource and the volume to dispense to that resource. """ - self._log_command( - "transfer", - source=source, - targets=targets, - source_vol=source_vol, - ratios=ratios, - target_vols=target_vols, - aspiration_flow_rate=aspiration_flow_rate, - dispense_flow_rates=dispense_flow_rates, - ) - - if target_vols is not None: - if ratios is not None: - raise TypeError("Cannot specify ratios and target_vols at the same time") - if source_vol is not None: - raise TypeError("Cannot specify source_vol and target_vols at the same time") - else: - if source_vol is None: - raise TypeError("Must specify either source_vol or target_vols") - - if ratios is None: - ratios = [1] * len(targets) + if isinstance(tip_spots, list) and len(tip_spots) < len(operations): + raise ValueError( + "Number of tip spots must be at least the number of channels, " + f"but got {len(tip_spots)} tip spots and {len(operations)} distributions." + ) + if hasattr(tip_spots, "__aiter__") and hasattr(tip_spots, "__anext__"): + tip_spots = [await tip_spots.__anext__() for _ in operations] # type: ignore + assert isinstance(tip_spots, list) + + for source, dests in operations.items(): + for i, (dest, vol) in enumerate(dests): + if isinstance(dest, list): + if len(dest) > 1: + raise ValueError("Only one destination per dispense operation is supported.") + if len(dest) == 0: + raise ValueError("Destination list cannot be empty.") + operations[source][i] = (dest[0], vol) + + operations_list = list(operations.items()) + for batch in range(0, len(operations_list), self.backend.num_channels): + batch_operations = operations_list[batch : batch + self.backend.num_channels] + batch_tips = tip_spots[batch : batch + self.backend.num_channels] + batch_sources = [src for src, _ in batch_operations] + batch_destinations = [dest for _, dest in batch_operations] + batch_volumes = [sum(v for _, v in dests) + dead_volume for dests in batch_destinations] + use_channels = list(range(len(batch_operations))) + + await self.pick_up_tips(batch_tips) + + # Aspirate from all source resources + await self.aspirate( + resources=batch_sources, + vols=batch_volumes, + **(aspiration_kwargs or {}), + ) - target_vols = [source_vol * r / sum(ratios) for r in ratios] + for group in zip_longest(*batch_destinations): + dest, vols, channels = zip( + *( + (pair[0], pair[1], ch) + for pair, ch in zip_longest(group, use_channels) + if pair is not None + ) + ) + await self.dispense( + resources=list(dest), + vols=list(vols), + use_channels=list(channels), + **(dispense_kwargs or {}), + ) - await self.aspirate( - resources=[source], - vols=[sum(target_vols)], - flow_rates=[aspiration_flow_rate], - **backend_kwargs, - ) - dispense_flow_rates = dispense_flow_rates or [None] * len(targets) - for target, vol, dfr in zip(targets, target_vols, dispense_flow_rates): - await self.dispense( - resources=[target], - vols=[vol], - flow_rates=[dfr], - use_channels=[0], - **backend_kwargs, - ) + if tip_drop_method == "return": + await self.return_tips() + else: + await self.discard_tips() @contextlib.contextmanager def use_channels(self, channels: List[int]):