Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 82 additions & 45 deletions pylabrobot/plate_reading/clario_star_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
HAS_PYLIBFTDI = False
_FTDI_IMPORT_ERROR = e

from plr.pylabrobot.pylabrobot.resources import plate
from pylabrobot import utils
from pylabrobot.io.ftdi import FTDI
from pylabrobot.resources.plate import Plate
Expand Down Expand Up @@ -99,7 +100,10 @@ async def read_resp(self, timeout=20) -> bytes:

async def send(self, cmd: Union[bytearray, bytes], read_timeout=20):
"""Send a command to the plate reader and return the response."""


message_size = (len(cmd)+7).to_bytes(2, byteorder="big")
cmd = b"\x02" + message_size + b"\x0c" + cmd

checksum = (sum(cmd) & 0xFFFF).to_bytes(2, byteorder="big")
cmd = cmd + checksum + b"\x0d"

Expand All @@ -123,73 +127,120 @@ async def _wait_for_ready_and_return(self, ret, timeout=150):

command_status = await self.read_command_status()

if len(command_status) != 24:
logger.warning(
"unexpected response %s. I think a command status response is always 24 " "bytes",
command_status,
)
continue

if command_status != last_status:
logger.info("status changed %s", command_status.hex())
last_status = command_status
else:
continue

if command_status[2] != 0x18 or command_status[3] != 0x0C or command_status[4] != 0x01:
if command_status[4] != 0x01:
logger.warning(
"unexpected response %s. I think 18 0c 01 indicates a command status " "response",
"unexpected response %s. I think 01 indicates a command status " "response",
command_status,
)

if command_status[5] not in {
0x25,
0x05,
}: # 25 is busy, 05 is ready. probably.
}: # 25 is busy, 05 is ready. probably, 01 is error?
logger.warning("unexpected response %s.", command_status)

if command_status[5] == 0x05:
logger.debug("status is ready")
return ret

async def read_command_status(self):
status = await self.send(b"\x02\x00\x09\x0c\x80\x00")
status = await self.send(b"\x80\x00")
return status

async def initialize(self):
command_response = await self.send(b"\x02\x00\x0d\x0c\x01\x00\x00\x10\x02\x00")
command_response = await self.send(b"\x80\x00")
return await self._wait_for_ready_and_return(command_response)

async def request_eeprom_data(self):
eeprom_response = await self.send(b"\x02\x00\x0f\x0c\x05\x07\x00\x00\x00\x00\x00\x00")
eeprom_response = await self.send(b"\x05\x07\x00\x00\x00\x00\x00\x00")
return await self._wait_for_ready_and_return(eeprom_response)

async def open(self):
open_response = await self.send(b"\x02\x00\x0e\x0c\x03\x01\x00\x00\x00\x00\x00")
open_response = await self.send(b"\x03\x01\x00\x00\x00\x00\x00")
return await self._wait_for_ready_and_return(open_response)

async def close(self, plate: Optional[Plate] = None):
close_response = await self.send(b"\x02\x00\x0e\x0c\x03\x00\x00\x00\x00\x00\x00")
close_response = await self.send(b"\x03\x00\x00\x00\x00\x00\x00")
return await self._wait_for_ready_and_return(close_response)

async def _mp_and_focus_height_value(self):
mp_and_focus_height_value_response = await self.send(
b"\x02\x00\x0f\x0c\x05\17\x00\x00\x00\x00" + b"\x00\x00"
b"\x05\x17\x00\x00\x00\x00" + b"\x00\x00"
)
return await self._wait_for_ready_and_return(mp_and_focus_height_value_response)

async def _plate_bytes(self, plate: Plate):
"""
Returns a byte array representing the plate geometry. This is used to configure the plate
reader to read from the correct wells.

This does not yet account for rotation.
"""
logging_prefix = "_plate_bytes"
float_to_bytes = lambda f: round(f * 100).to_bytes(2, byteorder="big")

plate_length = plate.get_absolute_size_x()
plate_length_bytes = float_to_bytes(plate_length)
logger.debug("%s: Plate length: %s", logging_prefix, plate_length)
plate_width = plate.get_absolute_size_y()
plate_width_bytes = float_to_bytes(plate_width)
logger.debug("%s: Plate width: %s", logging_prefix, plate_width)
plate_x1 = (plate.get_well(0).location.x + plate.get_well(0).center().x)
logger.debug("%s: Well 0 location x: %s", logging_prefix, plate.get_well(0).location.x)
logger.debug("%s: Well 0 center x: %s", logging_prefix, plate.get_well(0).center().x)
logger.debug("%s: Distance from left edge to middle column 1: %s", logging_prefix, plate_x1)
plate_x1_bytes = float_to_bytes(plate_x1)
plate_y1 = (plate_width - (plate.get_well(0).location.y + plate.get_well(0).center().y))
logger.debug("%s: Well 0 location y: %s", logging_prefix, plate.get_well(0).location.y)
logger.debug("%s: Well 0 center y: %s", logging_prefix, plate.get_well(0).center().y)
logger.debug("%s: Distance from top edge to middle row 1: %s", logging_prefix, plate_y1)
plate_y1_bytes = float_to_bytes(plate_y1)
plate_xn = plate_length - plate_x1
logger.debug("%s: Distance from middle column 1 to right edge: %s", logging_prefix, plate_xn)
plate_xn_bytes = float_to_bytes(plate_xn)
plate_yn = plate_width - plate_y1
logger.debug("%s: Distance from middle row 1 to bottom edge: %s", logging_prefix, plate_yn)
plate_yn_bytes = float_to_bytes(plate_yn)
plate_cols = plate.num_items_x
logger.debug("%s: Number of columns: %s", logging_prefix, plate_cols)
plate_cols_byte = plate_cols.to_bytes(1, byteorder="big")
plate_rows = plate.num_items_y
logger.debug("%s: Number of rows: %s", logging_prefix, plate_rows)
plate_rows_byte = plate_rows.to_bytes(1, byteorder="big")
# wells to read, for now we assume all wells
wells = ([1] * plate.num_items) + ([0] * (384 - plate.num_items))
wells_bytes = sum(b << i for i, b in enumerate(wells[::-1])).to_bytes(48, 'big')
plate_encodings = (plate_length_bytes +
plate_width_bytes +
plate_x1_bytes +
plate_y1_bytes +
plate_xn_bytes +
plate_yn_bytes +
plate_cols_byte +
plate_rows_byte +
wells_bytes
)
logger.debug("%s: full plate encodings: %s", logging_prefix, plate_encodings.hex())
return plate_encodings

async def _run_luminescence(self, focal_height: float):
async def _run_luminescence(self, focal_height: float, plate: Plate):
"""Run a plate reader luminescence run."""

assert 0 <= focal_height <= 25, "focal height must be between 0 and 25 mm"

focal_height_data = int(focal_height * 100).to_bytes(2, byteorder="big")

plate_bytes = await self._plate_bytes(plate)

run_response = await self.send(
b"\x02\x00\x86\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56"
b"\x1d\x06\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x02\x01\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27"
b"\x04" + plate_bytes +
b"\x02\x01\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27"
b"\x0f\x27\x0f\x01" + focal_height_data + b"\x00\x00\x01\x00\x00\x0e\x10\x00\x01\x00\x01\x00"
b"\x01\x00\x01\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01"
b"\x00\x00\x00\x01\x00\x64\x00\x20\x00\x00"
Expand All @@ -213,15 +264,14 @@ async def _run_luminescence(self, focal_height: float):
):
return run_response

async def _run_absorbance(self, wavelength: float):
async def _run_absorbance(self, wavelength: float, plate: Plate):
"""Run a plate reader absorbance run."""
wavelength_data = int(wavelength * 10).to_bytes(2, byteorder="big")
plate_bytes = await self._plate_bytes(plate)

absorbance_command = (
b"\x02\x00\x7c\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56\x1d\x06"
b"\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x82\x02\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27\x0f\x27"
b"\x04" + plate_bytes +
b"\x02\x02\x00\x00\x00\x20\x04\x00\x1e\x27\x0f\x27"
b"\x0f\x19\x01" + wavelength_data + b"\x00\x00\x00\x64\x00\x00\x00\x00\x00\x00\x00\x64\x00"
b"\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x16\x00\x01\x00\x00"
)
Expand Down Expand Up @@ -259,7 +309,7 @@ async def read_luminescence(self, plate: Plate, focal_height: float = 13) -> Lis
"""Read luminescence values from the plate reader."""
await self._mp_and_focus_height_value()

await self._run_luminescence(focal_height=focal_height)
await self._run_luminescence(focal_height=focal_height, plate=plate)

await self._read_order_values()

Expand Down Expand Up @@ -291,19 +341,22 @@ async def read_absorbance(
report: Literal["OD", "transmittance"] = "OD",
) -> List[List[float]]:
"""Read absorbance values from the device.

NOTE: Currently only supports endpoint measurements.

Args:
wavelength: wavelength to read absorbance at, in nanometers.
report: whether to report absorbance as optical depth (OD) or transmittance. Transmittance is
used interchangeably with "transmission" in the CLARIOStar software and documentation.


Returns:
A 2d array of absorbance values, as transmission percentage (values between 0 and 100).
"""

await self._mp_and_focus_height_value()

await self._run_absorbance(wavelength=wavelength)
await self._run_absorbance(wavelength=wavelength, plate=plate)

await self._read_order_values()

Expand Down Expand Up @@ -355,19 +408,3 @@ async def read_fluorescence(
focal_height: float,
) -> List[List[float]]:
raise NotImplementedError("Not implemented yet")


# Deprecated alias with warning # TODO: remove mid May 2025 (giving people 1 month to update)
# https://github.com/PyLabRobot/pylabrobot/issues/466


class CLARIOStar:
def __init__(self, *args, **kwargs):
raise RuntimeError("`CLARIOStar` is deprecated. Please use `CLARIOStarBackend` instead.")


class CLARIOStarBackend:
def __init__(self, *args, **kwargs):
raise RuntimeError(
"`CLARIOStarBackend` (capital 'S') is deprecated. Please use `CLARIOstarBackend` instead."
)
Loading