diff --git a/pylabrobot/plate_reading/clario_star_backend.py b/pylabrobot/plate_reading/clario_star_backend.py index e67f6280f29..6674c010668 100644 --- a/pylabrobot/plate_reading/clario_star_backend.py +++ b/pylabrobot/plate_reading/clario_star_backend.py @@ -14,6 +14,7 @@ HAS_PYLIBFTDI = False _FTDI_IMPORT_ERROR = e +from plr.pylabrobot.pylabrobot.resources import plate from pylabrobot import utils from pylabrobot.io.ftdi import FTDI from pylabrobot.resources.plate import Plate @@ -99,7 +100,10 @@ async def read_resp(self, timeout=20) -> bytes: async def send(self, cmd: Union[bytearray, bytes], read_timeout=20): """Send a command to the plate reader and return the response.""" - + + message_size = (len(cmd)+7).to_bytes(2, byteorder="big") + cmd = b"\x02" + message_size + b"\x0c" + cmd + checksum = (sum(cmd) & 0xFFFF).to_bytes(2, byteorder="big") cmd = cmd + checksum + b"\x0d" @@ -123,29 +127,22 @@ async def _wait_for_ready_and_return(self, ret, timeout=150): command_status = await self.read_command_status() - if len(command_status) != 24: - logger.warning( - "unexpected response %s. I think a command status response is always 24 " "bytes", - command_status, - ) - continue - if command_status != last_status: logger.info("status changed %s", command_status.hex()) last_status = command_status else: continue - if command_status[2] != 0x18 or command_status[3] != 0x0C or command_status[4] != 0x01: + if command_status[4] != 0x01: logger.warning( - "unexpected response %s. I think 18 0c 01 indicates a command status " "response", + "unexpected response %s. I think 01 indicates a command status " "response", command_status, ) if command_status[5] not in { 0x25, 0x05, - }: # 25 is busy, 05 is ready. probably. + }: # 25 is busy, 05 is ready. probably, 01 is error? logger.warning("unexpected response %s.", command_status) if command_status[5] == 0x05: @@ -153,43 +150,97 @@ async def _wait_for_ready_and_return(self, ret, timeout=150): return ret async def read_command_status(self): - status = await self.send(b"\x02\x00\x09\x0c\x80\x00") + status = await self.send(b"\x80\x00") return status async def initialize(self): - command_response = await self.send(b"\x02\x00\x0d\x0c\x01\x00\x00\x10\x02\x00") + command_response = await self.send(b"\x80\x00") return await self._wait_for_ready_and_return(command_response) async def request_eeprom_data(self): - eeprom_response = await self.send(b"\x02\x00\x0f\x0c\x05\x07\x00\x00\x00\x00\x00\x00") + eeprom_response = await self.send(b"\x05\x07\x00\x00\x00\x00\x00\x00") return await self._wait_for_ready_and_return(eeprom_response) async def open(self): - open_response = await self.send(b"\x02\x00\x0e\x0c\x03\x01\x00\x00\x00\x00\x00") + open_response = await self.send(b"\x03\x01\x00\x00\x00\x00\x00") return await self._wait_for_ready_and_return(open_response) async def close(self, plate: Optional[Plate] = None): - close_response = await self.send(b"\x02\x00\x0e\x0c\x03\x00\x00\x00\x00\x00\x00") + close_response = await self.send(b"\x03\x00\x00\x00\x00\x00\x00") return await self._wait_for_ready_and_return(close_response) async def _mp_and_focus_height_value(self): mp_and_focus_height_value_response = await self.send( - b"\x02\x00\x0f\x0c\x05\17\x00\x00\x00\x00" + b"\x00\x00" + b"\x05\x17\x00\x00\x00\x00" + b"\x00\x00" ) return await self._wait_for_ready_and_return(mp_and_focus_height_value_response) + + async def _plate_bytes(self, plate: Plate): + """ + Returns a byte array representing the plate geometry. This is used to configure the plate + reader to read from the correct wells. + + This does not yet account for rotation. + """ + logging_prefix = "_plate_bytes" + float_to_bytes = lambda f: round(f * 100).to_bytes(2, byteorder="big") + + plate_length = plate.get_absolute_size_x() + plate_length_bytes = float_to_bytes(plate_length) + logger.debug("%s: Plate length: %s", logging_prefix, plate_length) + plate_width = plate.get_absolute_size_y() + plate_width_bytes = float_to_bytes(plate_width) + logger.debug("%s: Plate width: %s", logging_prefix, plate_width) + plate_x1 = (plate.get_well(0).location.x + plate.get_well(0).center().x) + logger.debug("%s: Well 0 location x: %s", logging_prefix, plate.get_well(0).location.x) + logger.debug("%s: Well 0 center x: %s", logging_prefix, plate.get_well(0).center().x) + logger.debug("%s: Distance from left edge to middle column 1: %s", logging_prefix, plate_x1) + plate_x1_bytes = float_to_bytes(plate_x1) + plate_y1 = (plate_width - (plate.get_well(0).location.y + plate.get_well(0).center().y)) + logger.debug("%s: Well 0 location y: %s", logging_prefix, plate.get_well(0).location.y) + logger.debug("%s: Well 0 center y: %s", logging_prefix, plate.get_well(0).center().y) + logger.debug("%s: Distance from top edge to middle row 1: %s", logging_prefix, plate_y1) + plate_y1_bytes = float_to_bytes(plate_y1) + plate_xn = plate_length - plate_x1 + logger.debug("%s: Distance from middle column 1 to right edge: %s", logging_prefix, plate_xn) + plate_xn_bytes = float_to_bytes(plate_xn) + plate_yn = plate_width - plate_y1 + logger.debug("%s: Distance from middle row 1 to bottom edge: %s", logging_prefix, plate_yn) + plate_yn_bytes = float_to_bytes(plate_yn) + plate_cols = plate.num_items_x + logger.debug("%s: Number of columns: %s", logging_prefix, plate_cols) + plate_cols_byte = plate_cols.to_bytes(1, byteorder="big") + plate_rows = plate.num_items_y + logger.debug("%s: Number of rows: %s", logging_prefix, plate_rows) + plate_rows_byte = plate_rows.to_bytes(1, byteorder="big") + # wells to read, for now we assume all wells + wells = ([1] * plate.num_items) + ([0] * (384 - plate.num_items)) + wells_bytes = sum(b << i for i, b in enumerate(wells[::-1])).to_bytes(48, 'big') + plate_encodings = (plate_length_bytes + + plate_width_bytes + + plate_x1_bytes + + plate_y1_bytes + + plate_xn_bytes + + plate_yn_bytes + + plate_cols_byte + + plate_rows_byte + + wells_bytes + ) + logger.debug("%s: full plate encodings: %s", logging_prefix, plate_encodings.hex()) + return plate_encodings - async def _run_luminescence(self, focal_height: float): + async def _run_luminescence(self, focal_height: float, plate: Plate): """Run a plate reader luminescence run.""" assert 0 <= focal_height <= 25, "focal height must be between 0 and 25 mm" focal_height_data = int(focal_height * 100).to_bytes(2, byteorder="big") + + plate_bytes = await self._plate_bytes(plate) run_response = await self.send( - b"\x02\x00\x86\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56" - b"\x1d\x06\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x02\x01\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27" + b"\x04" + plate_bytes + + b"\x02\x01\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27" b"\x0f\x27\x0f\x01" + focal_height_data + b"\x00\x00\x01\x00\x00\x0e\x10\x00\x01\x00\x01\x00" b"\x01\x00\x01\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01" b"\x00\x00\x00\x01\x00\x64\x00\x20\x00\x00" @@ -213,15 +264,14 @@ async def _run_luminescence(self, focal_height: float): ): return run_response - async def _run_absorbance(self, wavelength: float): + async def _run_absorbance(self, wavelength: float, plate: Plate): """Run a plate reader absorbance run.""" wavelength_data = int(wavelength * 10).to_bytes(2, byteorder="big") + plate_bytes = await self._plate_bytes(plate) absorbance_command = ( - b"\x02\x00\x7c\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56\x1d\x06" - b"\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x82\x02\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27\x0f\x27" + b"\x04" + plate_bytes + + b"\x02\x02\x00\x00\x00\x20\x04\x00\x1e\x27\x0f\x27" b"\x0f\x19\x01" + wavelength_data + b"\x00\x00\x00\x64\x00\x00\x00\x00\x00\x00\x00\x64\x00" b"\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x16\x00\x01\x00\x00" ) @@ -259,7 +309,7 @@ async def read_luminescence(self, plate: Plate, focal_height: float = 13) -> Lis """Read luminescence values from the plate reader.""" await self._mp_and_focus_height_value() - await self._run_luminescence(focal_height=focal_height) + await self._run_luminescence(focal_height=focal_height, plate=plate) await self._read_order_values() @@ -291,11 +341,14 @@ async def read_absorbance( report: Literal["OD", "transmittance"] = "OD", ) -> List[List[float]]: """Read absorbance values from the device. + + NOTE: Currently only supports endpoint measurements. Args: wavelength: wavelength to read absorbance at, in nanometers. report: whether to report absorbance as optical depth (OD) or transmittance. Transmittance is used interchangeably with "transmission" in the CLARIOStar software and documentation. + Returns: A 2d array of absorbance values, as transmission percentage (values between 0 and 100). @@ -303,7 +356,7 @@ async def read_absorbance( await self._mp_and_focus_height_value() - await self._run_absorbance(wavelength=wavelength) + await self._run_absorbance(wavelength=wavelength, plate=plate) await self._read_order_values() @@ -355,19 +408,3 @@ async def read_fluorescence( focal_height: float, ) -> List[List[float]]: raise NotImplementedError("Not implemented yet") - - -# Deprecated alias with warning # TODO: remove mid May 2025 (giving people 1 month to update) -# https://github.com/PyLabRobot/pylabrobot/issues/466 - - -class CLARIOStar: - def __init__(self, *args, **kwargs): - raise RuntimeError("`CLARIOStar` is deprecated. Please use `CLARIOStarBackend` instead.") - - -class CLARIOStarBackend: - def __init__(self, *args, **kwargs): - raise RuntimeError( - "`CLARIOStarBackend` (capital 'S') is deprecated. Please use `CLARIOstarBackend` instead." - )