From 07441dfec5137eea07164807d0eb95ac7934ad52 Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Thu, 26 Jun 2025 13:44:30 -0500 Subject: [PATCH 01/13] first pass at OTB+ reader --- mne/io/__init__.pyi | 2 + mne/io/otb/__init__.py | 7 ++ mne/io/otb/otb.py | 251 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 260 insertions(+) create mode 100644 mne/io/otb/__init__.py create mode 100644 mne/io/otb/otb.py diff --git a/mne/io/__init__.pyi b/mne/io/__init__.pyi index a9c11415f15..71f35986d1b 100644 --- a/mne/io/__init__.pyi +++ b/mne/io/__init__.pyi @@ -43,6 +43,7 @@ __all__ = [ "read_raw_nihon", "read_raw_nirx", "read_raw_nsx", + "read_raw_otb", "read_raw_persyst", "read_raw_snirf", "show_fiff", @@ -87,5 +88,6 @@ from .nicolet import read_raw_nicolet from .nihon import read_raw_nihon from .nirx import read_raw_nirx from .nsx import read_raw_nsx +from .otb import read_raw_otb from .persyst import read_raw_persyst from .snirf import read_raw_snirf diff --git a/mne/io/otb/__init__.py b/mne/io/otb/__init__.py new file mode 100644 index 00000000000..dfdbfeedd99 --- /dev/null +++ b/mne/io/otb/__init__.py @@ -0,0 +1,7 @@ +"""OTB module for reading EMG data in OTB4 and OTB+ formats.""" + +# Authors: The MNE-Python contributors. +# License: BSD-3-Clause +# Copyright the MNE-Python contributors. + +from .otb import read_raw_otb diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py new file mode 100644 index 00000000000..ef3bf7fb023 --- /dev/null +++ b/mne/io/otb/otb.py @@ -0,0 +1,251 @@ +# Authors: The MNE-Python contributors. +# License: BSD-3-Clause +# Copyright the MNE-Python contributors. + +import tarfile +from collections import Counter +from datetime import datetime +from pathlib import Path + +import numpy as np +from defusedxml import ElementTree as ET + +from ..._fiff.meas_info import create_info +from ...utils import _check_fname, fill_doc, logger, verbose, warn +from ..base import BaseRaw + +OTB_PLUS_DTYPE = np.int16 # the file I have is int16; TBD if *all* OTB+ are like that +FORMAT_MAPPING = dict( + d="double", + f="single", + i="int", + h="short", +) +OTB_PLUS_FORMAT = FORMAT_MAPPING[OTB_PLUS_DTYPE().dtype.char] + + +def _parse_patient_xml(tree): + """Convert an ElementTree to a dict.""" + + def _parse_sex(sex): + # TODO English-centric; any value of "sex" not starting with "m" or "f" will get + # classed as "unknown". TBD if non-English-like values in the XML are possible + # (e.g. maybe the recording GUI is localized but the XML values are not?) + return dict(m=1, f=2)[sex.lower()[0]] if sex else 0 + + def _parse_date(dt): + return datetime.fromisoformat(dt).date() + + subj_info_mapping = ( + ("family_name", "last_name", str), + ("first_name", "first_name", str), + ("weight", "weight", float), + ("height", "height", float), + ("sex", "sex", _parse_sex), + ("birth_date", "birthday", _parse_date), + ) + subject_info = dict() + for source, target, func in subj_info_mapping: + if value := tree.find(source): + subject_info[target] = func(value.text) + return subject_info + + +@fill_doc +class RawOTB(BaseRaw): + """Raw object from an OTB file. + + Parameters + ---------- + fname : path-like + Path to the OTB file. + %(verbose)s + + See Also + -------- + mne.io.Raw : Documentation of attributes and methods. + """ + + @verbose + def __init__(self, fname, *, verbose=None): + # Adapted from the MATLAB code at: + # https://github.com/OTBioelettronica/OTB-Matlab/tree/main/MATLAB%20Open%20and%20Processing%20OTBFiles + # with permission to relicense as BSD-3 granted here: + # https://github.com/OTBioelettronica/OTB-Python/issues/2#issuecomment-2979135882 + fname = str(_check_fname(fname, "read", True, "fname")) + otb_version = "four" if fname.endswith(".otb4") else "plus" + logger.info(f"Loading {fname}") + + self.preload = True # lazy loading not supported + + highpass = list() + lowpass = list() + ch_names = list() + ch_types = list() + + # TODO verify these are the only non-data channel IDs + NON_DATA_CHS = ("Quaternion", "BufferChannel", "RampChannel") + + with tarfile.open(fname, "r") as fid: + fnames = fid.getnames() + # the .sig file is the binary channel data + sig_fname = [_fname for _fname in fnames if _fname.endswith(".sig")] + assert len(sig_fname) == 1 # TODO is this a valid assumption? + sig_fname = sig_fname[0] + data_size_bytes = fid.getmember(sig_fname).size + # the .xml file with the matching basename contains signal metadata + metadata_fname = str(Path(sig_fname).with_suffix(".xml")) + metadata = ET.fromstring(fid.extractfile(metadata_fname).read()) + # patient info + patient_info_xml = ET.fromstring(fid.extractfile("patient.xml").read()) + # structure of `metadata` is: + # Device + # └ Channels + # ├ Adapter + # | ├ Channel + # | ├ ... + # | └ Channel + # ├ ... + # └ Adapter + # ├ Channel + # ├ ... + # └ Channel + assert metadata.tag == "Device" + sfreq = float(metadata.attrib["SampleFrequency"]) + n_chan = int(metadata.attrib["DeviceTotalChannels"]) + bit_depth = int(metadata.attrib["ad_bits"]) + assert bit_depth == 16, f"expected 16-bit data, got {bit_depth}" # TODO verify + gains = np.full(n_chan, np.nan) + # check in advance where we'll need to append indices to uniquify ch_names + n_ch_by_type = Counter([ch.get("ID") for ch in metadata.iter("Channel")]) + dupl_ids = [k for k, v in n_ch_by_type.items() if v > 1] + # iterate over adapters & channels to extract gain, filters, names, etc + for adapter in metadata.iter("Adapter"): + ch_offset = int(adapter.get("ChannelStartIndex")) + adapter_gain = float(adapter.get("Gain")) + # we only care about lowpass/highpass on the data channels + # TODO verify these two are the only non-data adapter types + if adapter.get("ID") not in ("AdapterQuaternions", "AdapterControl"): + highpass.append(float(adapter.get("HighPassFilter"))) + lowpass.append(float(adapter.get("LowPassFilter"))) + + for ch in adapter.iter("Channel"): + # uniquify channel names by appending channel index (if needed) + ix = int(ch.get("Index")) + ch_id = ch.get("ID") + # TODO better to call these "emg_1" etc? should we left-zeropad ix? + ch_names.append(f"{ch_id}_{ix}" if ch_id in dupl_ids else ch_id) + # handle signal conversion (TODO verify original units) + unit_conversion = 1 if ch_id in NON_DATA_CHS else 1e6 + gains[ix + ch_offset] = ( + float(ch.get("Gain")) * adapter_gain * unit_conversion + ) + + # TODO verify ch_type for quats, buffer channel, and ramp channel + ch_types.append("misc" if ch_id in NON_DATA_CHS else "emg") + + # compute number of samples + n_samples, extra = divmod(data_size_bytes, (bit_depth / 8) * n_chan) + if extra != 0: + warn( + f"Number of bytes in file ({data_size_bytes}) not evenly divided by " + f"number of channels ({n_chan}). File may be corrupted or truncated." + ) + n_samples = int(n_samples) + + # check filter freqs + if len(highpass) > 1: + warn( + "More than one highpass frequency found in file; choosing highest " + f"({max(highpass)})" + ) + if len(lowpass) > 1: + warn( + "More than one lowpass frequency found in file; choosing lowest " + f"({min(lowpass)})" + ) + highpass = max(highpass) + lowpass = min(lowpass) + + # create info + info = create_info(ch_names=ch_names, ch_types=ch_types, sfreq=sfreq) + subject_info = _parse_patient_xml(patient_info_xml) + info.update(subject_info=subject_info) + with info._unlock(): + info["highpass"] = highpass + info["lowpass"] = lowpass + if meas_date := patient_info_xml.find("time"): + info["meas_date"] = datetime.fromisoformat(meas_date.text) + + # sanity check + if dur := patient_info_xml.find("duration"): + assert float(dur.text) == n_samples / sfreq + + # TODO other fields in patient_info_xml: + # protocol_code, place, pathology, commentsPatient, comments + + # TODO parse files markers_0.xml, markers_1.xml as annotations + + # populate raw_extras + raw_extras = dict( + n_samples=n_samples, + gains=gains, + dtype=OTB_PLUS_DTYPE, + data_size_bytes=data_size_bytes, + sig_fname=sig_fname, + bit_depth=bit_depth, + otb_version=otb_version, + ) + + super().__init__( + info, + preload=True, + last_samps=(n_samples - 1,), + filenames=[fname], + orig_format=OTB_PLUS_FORMAT, + # orig_units="mV", # TODO verify + raw_extras=[raw_extras], + verbose=verbose, + ) + + def _preload_data(self, preload): + """Load raw data from an OTB+ file.""" + _extras = self._raw_extras[0] + sig_fname = _extras["sig_fname"] + power_supply = 3.3 if _extras["otb_version"] == "plus" else None + bit_depth = _extras["bit_depth"] + gains = _extras["gains"] + + with tarfile.open(self.filenames[0], "r") as fid: + _data = np.frombuffer( + fid.extractfile(sig_fname).read(), + dtype=_extras["dtype"], + ) + self._data = _data * (1000 * power_supply / 2**bit_depth / gains)[:, np.newaxis] + + +@fill_doc +def read_raw_otb(fname, verbose=None) -> RawOTB: + """Reader for an OTB (.otb4/.otb+) recording. + + Parameters + ---------- + fname : path-like + Path to the OTB file. + %(verbose)s + + Returns + ------- + raw : instance of RawOTB + A Raw object containing OTB data. + See :class:`mne.io.Raw` for documentation of attributes and methods. + + See Also + -------- + mne.io.Raw : Documentation of attributes and methods of RawPersyst. + + Notes + ----- + ``preload=False`` is not supported for this file format. + """ + return RawOTB(fname, verbose=verbose) From 16fe42aa4fc280294d0726fe3833a80f45a39e51 Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Thu, 26 Jun 2025 14:41:35 -0500 Subject: [PATCH 02/13] dedup unit conversion; add question [ci skip] --- mne/io/otb/otb.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index ef3bf7fb023..6d5553156b9 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -135,11 +135,8 @@ def __init__(self, fname, *, verbose=None): ch_id = ch.get("ID") # TODO better to call these "emg_1" etc? should we left-zeropad ix? ch_names.append(f"{ch_id}_{ix}" if ch_id in dupl_ids else ch_id) - # handle signal conversion (TODO verify original units) - unit_conversion = 1 if ch_id in NON_DATA_CHS else 1e6 - gains[ix + ch_offset] = ( - float(ch.get("Gain")) * adapter_gain * unit_conversion - ) + # store gains + gains[ix + ch_offset] = float(ch.get("Gain")) * adapter_gain # TODO verify ch_type for quats, buffer channel, and ramp channel ch_types.append("misc" if ch_id in NON_DATA_CHS else "emg") @@ -221,6 +218,9 @@ def _preload_data(self, preload): fid.extractfile(sig_fname).read(), dtype=_extras["dtype"], ) + # TODO is the factor of 1000 (copied from the MATLAB code) to convert Volts + # milliVolts? If so we should remove it (in MNE we always store in SI units, so + # we should keep it in V) self._data = _data * (1000 * power_supply / 2**bit_depth / gains)[:, np.newaxis] From 0b4b92f24f770e7ee4df7f2117359166320f2c11 Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Tue, 1 Jul 2025 17:16:56 -0500 Subject: [PATCH 03/13] bunch of fixes --- mne/io/otb/otb.py | 172 +++++++++++++++++++++++++++++----------------- 1 file changed, 109 insertions(+), 63 deletions(-) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index 6d5553156b9..f05a9b2bd3e 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -4,7 +4,7 @@ import tarfile from collections import Counter -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path import numpy as np @@ -14,24 +14,14 @@ from ...utils import _check_fname, fill_doc, logger, verbose, warn from ..base import BaseRaw -OTB_PLUS_DTYPE = np.int16 # the file I have is int16; TBD if *all* OTB+ are like that -FORMAT_MAPPING = dict( - d="double", - f="single", - i="int", - h="short", -) -OTB_PLUS_FORMAT = FORMAT_MAPPING[OTB_PLUS_DTYPE().dtype.char] - def _parse_patient_xml(tree): """Convert an ElementTree to a dict.""" def _parse_sex(sex): - # TODO English-centric; any value of "sex" not starting with "m" or "f" will get - # classed as "unknown". TBD if non-English-like values in the XML are possible - # (e.g. maybe the recording GUI is localized but the XML values are not?) - return dict(m=1, f=2)[sex.lower()[0]] if sex else 0 + # TODO For devices that generate `.otb+` files, the recording GUI only has M or + # F options and choosing one is mandatory. For `.otb4` the field is optional. + return dict(m=1, f=2)[sex.lower()[0]] if sex else 0 # 0 means "unknown" def _parse_date(dt): return datetime.fromisoformat(dt).date() @@ -46,7 +36,8 @@ def _parse_date(dt): ) subject_info = dict() for source, target, func in subj_info_mapping: - if value := tree.find(source): + value = tree.find(source) + if value is not None: subject_info[target] = func(value.text) return subject_info @@ -73,7 +64,8 @@ def __init__(self, fname, *, verbose=None): # with permission to relicense as BSD-3 granted here: # https://github.com/OTBioelettronica/OTB-Python/issues/2#issuecomment-2979135882 fname = str(_check_fname(fname, "read", True, "fname")) - otb_version = "four" if fname.endswith(".otb4") else "plus" + if fname.endswith(".otb4"): + raise NotImplementedError(".otb4 format is not yet supported") logger.info(f"Loading {fname}") self.preload = True # lazy loading not supported @@ -83,14 +75,23 @@ def __init__(self, fname, *, verbose=None): ch_names = list() ch_types = list() - # TODO verify these are the only non-data channel IDs - NON_DATA_CHS = ("Quaternion", "BufferChannel", "RampChannel") + # TODO verify these are the only non-data channel IDs (other than "AUX*" which + # are handled separately via glob) + NON_DATA_CHS = ("Quaternion", "BufferChannel", "RampChannel", "LoadCellChannel") + POWER_SUPPLY = 3.3 # volts with tarfile.open(fname, "r") as fid: fnames = fid.getnames() # the .sig file is the binary channel data sig_fname = [_fname for _fname in fnames if _fname.endswith(".sig")] - assert len(sig_fname) == 1 # TODO is this a valid assumption? + if len(sig_fname) != 1: + raise NotImplementedError( + "multiple .sig files found in the OTB+ archive. Probably this " + "means that an acquisition was imported into another session. " + "This is not yet supported; please open an issue at " + "https://github.com/mne-tools/mne-emg/issues if you want us to add " + "such support." + ) sig_fname = sig_fname[0] data_size_bytes = fid.getmember(sig_fname).size # the .xml file with the matching basename contains signal metadata @@ -102,9 +103,9 @@ def __init__(self, fname, *, verbose=None): # Device # └ Channels # ├ Adapter - # | ├ Channel - # | ├ ... - # | └ Channel + # │ ├ Channel + # │ ├ ... + # │ └ Channel # ├ ... # └ Adapter # ├ Channel @@ -114,14 +115,36 @@ def __init__(self, fname, *, verbose=None): sfreq = float(metadata.attrib["SampleFrequency"]) n_chan = int(metadata.attrib["DeviceTotalChannels"]) bit_depth = int(metadata.attrib["ad_bits"]) - assert bit_depth == 16, f"expected 16-bit data, got {bit_depth}" # TODO verify + model = metadata.attrib["Name"] + + # TODO we may not need this? only relevant for Quattrocento device, and `n_chan` + # defined above should already be correct/sufficient + # if model := metadata.attrib.get("Model"): + # max_n_chan = int(model[-3:]) + if bit_depth == 16: + _dtype = np.int16 + elif bit_depth == 24: # EEG data recorded on OTB devices do this + # this is possible but will be a bit tricky, see: + # https://stackoverflow.com/a/34128171 + # https://stackoverflow.com/a/11967503 + raise NotImplementedError( + "OTB+ files with 24-bit data are not yet supported." + ) + else: + raise NotImplementedError( + f"expected 16- or 24-bit data, but file metadata says {bit_depth}-bit. " + "If this file can be successfully read with other software (i.e. it is " + "not corrupted), please open an issue at " + "https://github.com/mne-tools/mne-emg/issues so we can add support for " + "your use case." + ) gains = np.full(n_chan, np.nan) # check in advance where we'll need to append indices to uniquify ch_names n_ch_by_type = Counter([ch.get("ID") for ch in metadata.iter("Channel")]) dupl_ids = [k for k, v in n_ch_by_type.items() if v > 1] # iterate over adapters & channels to extract gain, filters, names, etc - for adapter in metadata.iter("Adapter"): - ch_offset = int(adapter.get("ChannelStartIndex")) + for adapter_ix, adapter in enumerate(metadata.iter("Adapter")): + adapter_ch_offset = int(adapter.get("ChannelStartIndex")) adapter_gain = float(adapter.get("Gain")) # we only care about lowpass/highpass on the data channels # TODO verify these two are the only non-data adapter types @@ -130,19 +153,34 @@ def __init__(self, fname, *, verbose=None): lowpass.append(float(adapter.get("LowPassFilter"))) for ch in adapter.iter("Channel"): - # uniquify channel names by appending channel index (if needed) ix = int(ch.get("Index")) ch_id = ch.get("ID") - # TODO better to call these "emg_1" etc? should we left-zeropad ix? + # # see if we can parse the adapter name to get row,col info + # pattern = re.compile( + # # connector type inter-elec dist grid rows grid cols + # r"(?:[a-zA-Z]+)(?:(?P\d+)MM)(?P\d{2})(?P\d{2})" + # ) + # if match := pattern.match(ch_id): + # col = ix % int(match["col"]) + # row = ix // int(match["row"]) + # ch_name = f"EMG_{adapter_ix}({row:02},{col:02})" + # elif ch_id + # else: + # ch_name = f"EMG_{ix + adapter_ch_offset:03}" + # ch_names.append(ch_name) ch_names.append(f"{ch_id}_{ix}" if ch_id in dupl_ids else ch_id) # store gains - gains[ix + ch_offset] = float(ch.get("Gain")) * adapter_gain - + gains[ix + adapter_ch_offset] = float(ch.get("Gain")) * adapter_gain # TODO verify ch_type for quats, buffer channel, and ramp channel - ch_types.append("misc" if ch_id in NON_DATA_CHS else "emg") + ch_types.append( + "misc" + if ch_id in NON_DATA_CHS or ch_id.lower().startswith("aux") + else "emg" + ) + assert np.isfinite(gains).all() # compute number of samples - n_samples, extra = divmod(data_size_bytes, (bit_depth / 8) * n_chan) + n_samples, extra = divmod(data_size_bytes, (bit_depth // 8) * n_chan) if extra != 0: warn( f"Number of bytes in file ({data_size_bytes}) not evenly divided by " @@ -150,57 +188,72 @@ def __init__(self, fname, *, verbose=None): ) n_samples = int(n_samples) - # check filter freqs + # check filter freqs. + # TODO filter freqs can vary by adapter, so in theory we might get different + # filters for different *data* channels (not just different between data and + # misc/aux/whatever). if len(highpass) > 1: warn( - "More than one highpass frequency found in file; choosing highest " - f"({max(highpass)})" + "More than one highpass frequency found in file; choosing lowest " + f"({min(highpass)})" ) if len(lowpass) > 1: warn( - "More than one lowpass frequency found in file; choosing lowest " - f"({min(lowpass)})" + "More than one lowpass frequency found in file; choosing highest " + f"({max(lowpass)})" ) - highpass = max(highpass) - lowpass = min(lowpass) + highpass = min(highpass) + lowpass = max(lowpass) # create info info = create_info(ch_names=ch_names, ch_types=ch_types, sfreq=sfreq) subject_info = _parse_patient_xml(patient_info_xml) - info.update(subject_info=subject_info) + device_info = dict(type="OTB", model=model) # TODO type, model, serial, site + site = patient_info_xml.find("place") + if site is not None: + device_info.update(site=site.text) + info.update(subject_info=subject_info, device_info=device_info) with info._unlock(): info["highpass"] = highpass info["lowpass"] = lowpass - if meas_date := patient_info_xml.find("time"): - info["meas_date"] = datetime.fromisoformat(meas_date.text) + for _ch in info["chs"]: + cal = 1 / 2**bit_depth / gains[ix + adapter_ch_offset] + _ch.update(cal=cal, range=POWER_SUPPLY) + meas_date = patient_info_xml.find("time") + if meas_date is not None: + info["meas_date"] = datetime.fromisoformat(meas_date.text).astimezone( + timezone.utc + ) # sanity check - if dur := patient_info_xml.find("duration"): - assert float(dur.text) == n_samples / sfreq + dur = patient_info_xml.find("duration") + if dur is not None: + np.testing.assert_almost_equal( + float(dur.text), n_samples / sfreq, decimal=3 + ) # TODO other fields in patient_info_xml: - # protocol_code, place, pathology, commentsPatient, comments + # protocol_code, pathology, commentsPatient, comments - # TODO parse files markers_0.xml, markers_1.xml as annotations + # TODO parse files markers_0.xml, markers_1.xml as annotations? # populate raw_extras - raw_extras = dict( - n_samples=n_samples, - gains=gains, - dtype=OTB_PLUS_DTYPE, - data_size_bytes=data_size_bytes, - sig_fname=sig_fname, - bit_depth=bit_depth, - otb_version=otb_version, + raw_extras = dict(dtype=_dtype, sig_fname=sig_fname) + FORMAT_MAPPING = dict( + d="double", + f="single", + i="int", + h="short", ) + orig_format = FORMAT_MAPPING[_dtype().dtype.char] super().__init__( info, preload=True, last_samps=(n_samples - 1,), filenames=[fname], - orig_format=OTB_PLUS_FORMAT, - # orig_units="mV", # TODO verify + orig_format=orig_format, + # orig_units="V", # TODO maybe not needed raw_extras=[raw_extras], verbose=verbose, ) @@ -209,19 +262,12 @@ def _preload_data(self, preload): """Load raw data from an OTB+ file.""" _extras = self._raw_extras[0] sig_fname = _extras["sig_fname"] - power_supply = 3.3 if _extras["otb_version"] == "plus" else None - bit_depth = _extras["bit_depth"] - gains = _extras["gains"] with tarfile.open(self.filenames[0], "r") as fid: - _data = np.frombuffer( + self._data = np.frombuffer( fid.extractfile(sig_fname).read(), dtype=_extras["dtype"], ) - # TODO is the factor of 1000 (copied from the MATLAB code) to convert Volts - # milliVolts? If so we should remove it (in MNE we always store in SI units, so - # we should keep it in V) - self._data = _data * (1000 * power_supply / 2**bit_depth / gains)[:, np.newaxis] @fill_doc From 6cb51a4016a8f3192153bb479f5b8ef25a861f44 Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Fri, 4 Jul 2025 12:58:09 -0500 Subject: [PATCH 04/13] fix preload [ci skip] --- mne/io/otb/otb.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index f05a9b2bd3e..37b4f794b6f 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -264,10 +264,21 @@ def _preload_data(self, preload): sig_fname = _extras["sig_fname"] with tarfile.open(self.filenames[0], "r") as fid: - self._data = np.frombuffer( - fid.extractfile(sig_fname).read(), - dtype=_extras["dtype"], + _data = ( + np.frombuffer( + fid.extractfile(sig_fname).read(), + dtype=_extras["dtype"], + ) + .reshape(-1, self.info["nchan"]) + .T ) + cals = np.array( + [ + _ch["cal"] * _ch["range"] * _ch.get("scale", 1.0) + for _ch in self.info["chs"] + ] + ) + self._data = _data * cals[:, np.newaxis] @fill_doc From 398330d30118500cbe2d9fd93cde0f2862a54e04 Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Thu, 7 Aug 2025 12:01:53 -0500 Subject: [PATCH 05/13] WIP adding support for otb4 [ci skip] --- mne/io/otb/otb.py | 160 ++++++++++++++++++++++++++-------------------- 1 file changed, 90 insertions(+), 70 deletions(-) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index 37b4f794b6f..08cf1935817 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -15,6 +15,10 @@ from ..base import BaseRaw +def _parse_date(dt): + return datetime.fromisoformat(dt).date() + + def _parse_patient_xml(tree): """Convert an ElementTree to a dict.""" @@ -23,9 +27,6 @@ def _parse_sex(sex): # F options and choosing one is mandatory. For `.otb4` the field is optional. return dict(m=1, f=2)[sex.lower()[0]] if sex else 0 # 0 means "unknown" - def _parse_date(dt): - return datetime.fromisoformat(dt).date() - subj_info_mapping = ( ("family_name", "last_name", str), ("first_name", "first_name", str), @@ -42,6 +43,26 @@ def _parse_date(dt): return subject_info +def _parse_otb_plus_metadata(metadata, extras_metadata): + assert metadata.tag == "Device" + sfreq = float(metadata.attrib["SampleFrequency"]) + n_chan = int(metadata.attrib["DeviceTotalChannels"]) + bit_depth = int(metadata.attrib["ad_bits"]) + model = metadata.attrib["Name"] + adc_range = 3.3 + return dict( + sfreq=sfreq, + n_chan=n_chan, + bit_depth=bit_depth, + model=model, + adc_range=adc_range, + ) + + +def _parse_otb_four_metadata(metadata, extras_metadata): + pass + + @fill_doc class RawOTB(BaseRaw): """Raw object from an OTB file. @@ -64,8 +85,7 @@ def __init__(self, fname, *, verbose=None): # with permission to relicense as BSD-3 granted here: # https://github.com/OTBioelettronica/OTB-Python/issues/2#issuecomment-2979135882 fname = str(_check_fname(fname, "read", True, "fname")) - if fname.endswith(".otb4"): - raise NotImplementedError(".otb4 format is not yet supported") + v4_format = fname.endswith(".otb4") logger.info(f"Loading {fname}") self.preload = True # lazy loading not supported @@ -75,52 +95,46 @@ def __init__(self, fname, *, verbose=None): ch_names = list() ch_types = list() - # TODO verify these are the only non-data channel IDs (other than "AUX*" which - # are handled separately via glob) + # these are the only non-data channel IDs (besides "AUX*", handled via glob) NON_DATA_CHS = ("Quaternion", "BufferChannel", "RampChannel", "LoadCellChannel") - POWER_SUPPLY = 3.3 # volts with tarfile.open(fname, "r") as fid: fnames = fid.getnames() - # the .sig file is the binary channel data - sig_fname = [_fname for _fname in fnames if _fname.endswith(".sig")] - if len(sig_fname) != 1: - raise NotImplementedError( - "multiple .sig files found in the OTB+ archive. Probably this " - "means that an acquisition was imported into another session. " - "This is not yet supported; please open an issue at " - "https://github.com/mne-tools/mne-emg/issues if you want us to add " - "such support." - ) - sig_fname = sig_fname[0] - data_size_bytes = fid.getmember(sig_fname).size - # the .xml file with the matching basename contains signal metadata - metadata_fname = str(Path(sig_fname).with_suffix(".xml")) - metadata = ET.fromstring(fid.extractfile(metadata_fname).read()) - # patient info - patient_info_xml = ET.fromstring(fid.extractfile("patient.xml").read()) - # structure of `metadata` is: - # Device - # └ Channels - # ├ Adapter - # │ ├ Channel - # │ ├ ... - # │ └ Channel - # ├ ... - # └ Adapter - # ├ Channel - # ├ ... - # └ Channel - assert metadata.tag == "Device" - sfreq = float(metadata.attrib["SampleFrequency"]) - n_chan = int(metadata.attrib["DeviceTotalChannels"]) - bit_depth = int(metadata.attrib["ad_bits"]) - model = metadata.attrib["Name"] - - # TODO we may not need this? only relevant for Quattrocento device, and `n_chan` - # defined above should already be correct/sufficient - # if model := metadata.attrib.get("Model"): - # max_n_chan = int(model[-3:]) + # the .sig file(s) are the binary channel data. + sig_fnames = [_fname for _fname in fnames if _fname.endswith(".sig")] + # TODO ↓↓↓↓↓↓↓↓ make compatible with multiple sig_fnames + data_size_bytes = fid.getmember(sig_fnames[0]).size + # triage the file format versions + if v4_format: + metadata_fname = "DeviceParameters.xml" + extras_fname = "Tracks_000.xml" + parse_func = _parse_otb_four_metadata + else: + # .otb4 format may legitimately have multiple .sig files, but + # .otb+ should not (if it's truly raw data) + if len(sig_fnames) > 1: + raise NotImplementedError( + "multiple .sig files found in the OTB+ archive. Probably this " + "means that an acquisition was imported into another session. " + "This is not yet supported; please open an issue at " + "https://github.com/mne-tools/mne-emg/issues if you want us to " + "add such support." + ) + # the .xml file with the matching basename contains signal metadata + metadata_fname = str(Path(sig_fnames[0]).with_suffix(".xml")) + extras_fname = "patient.xml" + parse_func = _parse_otb_plus_metadata + # parse the XML into a tree + metadata_tree = ET.fromstring(fid.extractfile(metadata_fname).read()) + extras_tree = ET.fromstring(fid.extractfile(extras_fname).read()) + # extract what we need from the tree + metadata = parse_func(metadata_tree, extras_tree) + sfreq = metadata["sfreq"] + n_chan = metadata["n_chan"] + bit_depth = metadata["bit_depth"] + model = metadata["model"] + adc_range = metadata["adc_range"] + if bit_depth == 16: _dtype = np.int16 elif bit_depth == 24: # EEG data recorded on OTB devices do this @@ -140,10 +154,10 @@ def __init__(self, fname, *, verbose=None): ) gains = np.full(n_chan, np.nan) # check in advance where we'll need to append indices to uniquify ch_names - n_ch_by_type = Counter([ch.get("ID") for ch in metadata.iter("Channel")]) + n_ch_by_type = Counter([ch.get("ID") for ch in metadata_tree.iter("Channel")]) dupl_ids = [k for k, v in n_ch_by_type.items() if v > 1] # iterate over adapters & channels to extract gain, filters, names, etc - for adapter_ix, adapter in enumerate(metadata.iter("Adapter")): + for adapter_ix, adapter in enumerate(metadata_tree.iter("Adapter")): adapter_ch_offset = int(adapter.get("ChannelStartIndex")) adapter_gain = float(adapter.get("Gain")) # we only care about lowpass/highpass on the data channels @@ -188,28 +202,28 @@ def __init__(self, fname, *, verbose=None): ) n_samples = int(n_samples) - # check filter freqs. - # TODO filter freqs can vary by adapter, so in theory we might get different + # check filter freqs. Can vary by adapter, so in theory we might get different # filters for different *data* channels (not just different between data and # misc/aux/whatever). if len(highpass) > 1: warn( "More than one highpass frequency found in file; choosing lowest " - f"({min(highpass)})" + f"({min(highpass)} Hz)" ) if len(lowpass) > 1: warn( "More than one lowpass frequency found in file; choosing highest " - f"({max(lowpass)})" + f"({max(lowpass)} Hz)" ) highpass = min(highpass) lowpass = max(lowpass) # create info info = create_info(ch_names=ch_names, ch_types=ch_types, sfreq=sfreq) - subject_info = _parse_patient_xml(patient_info_xml) - device_info = dict(type="OTB", model=model) # TODO type, model, serial, site - site = patient_info_xml.find("place") + subject_info = _parse_patient_xml(extras_tree) + device_info = dict(type="OTB", model=model) # other allowed keys: serial + meas_date = extras_tree.find("time") + site = extras_tree.find("place") if site is not None: device_info.update(site=site.text) info.update(subject_info=subject_info, device_info=device_info) @@ -218,27 +232,26 @@ def __init__(self, fname, *, verbose=None): info["lowpass"] = lowpass for _ch in info["chs"]: cal = 1 / 2**bit_depth / gains[ix + adapter_ch_offset] - _ch.update(cal=cal, range=POWER_SUPPLY) - meas_date = patient_info_xml.find("time") + _ch.update(cal=cal, range=adc_range) if meas_date is not None: info["meas_date"] = datetime.fromisoformat(meas_date.text).astimezone( timezone.utc ) # sanity check - dur = patient_info_xml.find("duration") + dur = extras_tree.find("duration") if dur is not None: np.testing.assert_almost_equal( float(dur.text), n_samples / sfreq, decimal=3 ) - # TODO other fields in patient_info_xml: + # TODO other fields in extras_tree: # protocol_code, pathology, commentsPatient, comments # TODO parse files markers_0.xml, markers_1.xml as annotations? # populate raw_extras - raw_extras = dict(dtype=_dtype, sig_fname=sig_fname) + raw_extras = dict(dtype=_dtype, sig_fnames=sig_fnames) FORMAT_MAPPING = dict( d="double", f="single", @@ -261,17 +274,24 @@ def __init__(self, fname, *, verbose=None): def _preload_data(self, preload): """Load raw data from an OTB+ file.""" _extras = self._raw_extras[0] - sig_fname = _extras["sig_fname"] + sig_fnames = _extras["sig_fnames"] with tarfile.open(self.filenames[0], "r") as fid: - _data = ( - np.frombuffer( - fid.extractfile(sig_fname).read(), - dtype=_extras["dtype"], + _data = list() + for sig_fname in sig_fnames: + _data.append( + np.frombuffer( + fid.extractfile(sig_fname).read(), + dtype=_extras["dtype"], + ) + .reshape(-1, self.info["nchan"]) + .T ) - .reshape(-1, self.info["nchan"]) - .T - ) + if len(_data) == 1: + _data = _data[0] + else: + _data = np.concatenate(_data, axis=0) + cals = np.array( [ _ch["cal"] * _ch["range"] * _ch.get("scale", 1.0) @@ -283,7 +303,7 @@ def _preload_data(self, preload): @fill_doc def read_raw_otb(fname, verbose=None) -> RawOTB: - """Reader for an OTB (.otb4/.otb+) recording. + """Reader for an OTB (.otb/.otb+/.otb4) recording. Parameters ---------- From 05a02224ab8ea3bfd0f9ceaacdc00202d803004e Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Thu, 7 Aug 2025 17:40:55 -0500 Subject: [PATCH 06/13] more compatibility tweaks [ci skip] --- mne/io/otb/otb.py | 149 ++++++++++++++++++++++++++-------------------- 1 file changed, 83 insertions(+), 66 deletions(-) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index 08cf1935817..583770548b8 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -14,6 +14,9 @@ from ...utils import _check_fname, fill_doc, logger, verbose, warn from ..base import BaseRaw +# these are the only non-data channel IDs (besides "AUX*", handled via glob) +_NON_DATA_CHS = ("Quaternion", "BufferChannel", "RampChannel", "LoadCellChannel") + def _parse_date(dt): return datetime.fromisoformat(dt).date() @@ -23,8 +26,8 @@ def _parse_patient_xml(tree): """Convert an ElementTree to a dict.""" def _parse_sex(sex): - # TODO For devices that generate `.otb+` files, the recording GUI only has M or - # F options and choosing one is mandatory. For `.otb4` the field is optional. + # For devices that generate `.otb+` files, the recording GUI only has M or F + # options and choosing one is mandatory. For `.otb4` the field is optional. return dict(m=1, f=2)[sex.lower()[0]] if sex else 0 # 0 means "unknown" subj_info_mapping = ( @@ -45,17 +48,71 @@ def _parse_sex(sex): def _parse_otb_plus_metadata(metadata, extras_metadata): assert metadata.tag == "Device" + # device-level metadata sfreq = float(metadata.attrib["SampleFrequency"]) n_chan = int(metadata.attrib["DeviceTotalChannels"]) bit_depth = int(metadata.attrib["ad_bits"]) - model = metadata.attrib["Name"] - adc_range = 3.3 + device_name = metadata.attrib["Name"] + adc_range = 3.3 # TODO is this V or mV ?? + # containers + gains = np.full(n_chan, np.nan) + ch_names = list() + ch_types = list() + highpass = list() + lowpass = list() + # check in advance where we'll need to append indices to uniquify ch_names + n_ch_by_type = Counter([ch.get("ID") for ch in metadata.iter("Channel")]) + dupl_ids = [k for k, v in n_ch_by_type.items() if v > 1] + # iterate over adapters & channels to extract gain, filters, names, etc + for adapter in metadata.iter("Adapter"): + adapter_id = adapter.get("ID") + adapter_gain = float(adapter.get("Gain")) + ch_offset = int(adapter.get("ChannelStartIndex")) + # we only really care about lowpass/highpass on the data channels + if adapter_id not in ("AdapterQuaternions", "AdapterControl"): + highpass.append(float(adapter.get("HighPassFilter"))) + lowpass.append(float(adapter.get("LowPassFilter"))) + for ch in adapter.iter("Channel"): + ix = int(ch.get("Index")) + ch_id = ch.get("ID") + # # see if we can parse the adapter name to get row,col info + # pattern = re.compile( + # # connector type inter-elec dist grid rows grid cols + # r"(?:[a-zA-Z]+)(?:(?P\d+)MM)(?P\d{2})(?P\d{2})" + # ) + # if match := pattern.match(ch_id): + # col = ix % int(match["col"]) + # row = ix // int(match["row"]) + # ch_name = f"EMG_{adapter_ix}({row:02},{col:02})" + # elif ch_id + # else: + # ch_name = f"EMG_{ix + adapter_ch_offset:03}" + # ch_names.append(ch_name) + ch_names.append(f"{ch_id}_{ix}" if ch_id in dupl_ids else ch_id) + # store gains + gain_ix = ix + ch_offset + gains[gain_ix] = float(ch.get("Gain")) * adapter_gain + # TODO verify ch_type for quats, buffer channel, and ramp channel + ch_types.append( + "misc" + if ch_id in _NON_DATA_CHS or ch_id.lower().startswith("aux") + else "emg" + ) + # parse subject info + subject_info = _parse_patient_xml(extras_metadata) + return dict( sfreq=sfreq, n_chan=n_chan, bit_depth=bit_depth, - model=model, + device_name=device_name, adc_range=adc_range, + subject_info=subject_info, + gains=gains, + ch_names=ch_names, + ch_types=ch_types, + highpass=highpass, + lowpass=lowpass, ) @@ -90,14 +147,6 @@ def __init__(self, fname, *, verbose=None): self.preload = True # lazy loading not supported - highpass = list() - lowpass = list() - ch_names = list() - ch_types = list() - - # these are the only non-data channel IDs (besides "AUX*", handled via glob) - NON_DATA_CHS = ("Quaternion", "BufferChannel", "RampChannel", "LoadCellChannel") - with tarfile.open(fname, "r") as fid: fnames = fid.getnames() # the .sig file(s) are the binary channel data. @@ -132,8 +181,14 @@ def __init__(self, fname, *, verbose=None): sfreq = metadata["sfreq"] n_chan = metadata["n_chan"] bit_depth = metadata["bit_depth"] - model = metadata["model"] + device_name = metadata["device_name"] adc_range = metadata["adc_range"] + subject_info = metadata["subject_info"] + ch_names = metadata["ch_names"] + ch_types = metadata["ch_types"] + gains = metadata["gains"] + highpass = metadata["highpass"] + lowpass = metadata["lowpass"] if bit_depth == 16: _dtype = np.int16 @@ -150,49 +205,8 @@ def __init__(self, fname, *, verbose=None): "If this file can be successfully read with other software (i.e. it is " "not corrupted), please open an issue at " "https://github.com/mne-tools/mne-emg/issues so we can add support for " - "your use case." + "your file." ) - gains = np.full(n_chan, np.nan) - # check in advance where we'll need to append indices to uniquify ch_names - n_ch_by_type = Counter([ch.get("ID") for ch in metadata_tree.iter("Channel")]) - dupl_ids = [k for k, v in n_ch_by_type.items() if v > 1] - # iterate over adapters & channels to extract gain, filters, names, etc - for adapter_ix, adapter in enumerate(metadata_tree.iter("Adapter")): - adapter_ch_offset = int(adapter.get("ChannelStartIndex")) - adapter_gain = float(adapter.get("Gain")) - # we only care about lowpass/highpass on the data channels - # TODO verify these two are the only non-data adapter types - if adapter.get("ID") not in ("AdapterQuaternions", "AdapterControl"): - highpass.append(float(adapter.get("HighPassFilter"))) - lowpass.append(float(adapter.get("LowPassFilter"))) - - for ch in adapter.iter("Channel"): - ix = int(ch.get("Index")) - ch_id = ch.get("ID") - # # see if we can parse the adapter name to get row,col info - # pattern = re.compile( - # # connector type inter-elec dist grid rows grid cols - # r"(?:[a-zA-Z]+)(?:(?P\d+)MM)(?P\d{2})(?P\d{2})" - # ) - # if match := pattern.match(ch_id): - # col = ix % int(match["col"]) - # row = ix // int(match["row"]) - # ch_name = f"EMG_{adapter_ix}({row:02},{col:02})" - # elif ch_id - # else: - # ch_name = f"EMG_{ix + adapter_ch_offset:03}" - # ch_names.append(ch_name) - ch_names.append(f"{ch_id}_{ix}" if ch_id in dupl_ids else ch_id) - # store gains - gains[ix + adapter_ch_offset] = float(ch.get("Gain")) * adapter_gain - # TODO verify ch_type for quats, buffer channel, and ramp channel - ch_types.append( - "misc" - if ch_id in NON_DATA_CHS or ch_id.lower().startswith("aux") - else "emg" - ) - assert np.isfinite(gains).all() - # compute number of samples n_samples, extra = divmod(data_size_bytes, (bit_depth // 8) * n_chan) if extra != 0: @@ -202,6 +216,9 @@ def __init__(self, fname, *, verbose=None): ) n_samples = int(n_samples) + # validate gains + assert np.isfinite(gains).all() + # check filter freqs. Can vary by adapter, so in theory we might get different # filters for different *data* channels (not just different between data and # misc/aux/whatever). @@ -220,8 +237,7 @@ def __init__(self, fname, *, verbose=None): # create info info = create_info(ch_names=ch_names, ch_types=ch_types, sfreq=sfreq) - subject_info = _parse_patient_xml(extras_tree) - device_info = dict(type="OTB", model=model) # other allowed keys: serial + device_info = dict(type="OTB", model=device_name) # other allowed keys: serial meas_date = extras_tree.find("time") site = extras_tree.find("place") if site is not None: @@ -230,8 +246,8 @@ def __init__(self, fname, *, verbose=None): with info._unlock(): info["highpass"] = highpass info["lowpass"] = lowpass - for _ch in info["chs"]: - cal = 1 / 2**bit_depth / gains[ix + adapter_ch_offset] + for ix, _ch in enumerate(info["chs"]): + cal = 1 / 2**bit_depth / gains[ix] _ch.update(cal=cal, range=adc_range) if meas_date is not None: info["meas_date"] = datetime.fromisoformat(meas_date.text).astimezone( @@ -245,7 +261,7 @@ def __init__(self, fname, *, verbose=None): float(dur.text), n_samples / sfreq, decimal=3 ) - # TODO other fields in extras_tree: + # TODO other fields in extras_tree for otb+ format: # protocol_code, pathology, commentsPatient, comments # TODO parse files markers_0.xml, markers_1.xml as annotations? @@ -266,7 +282,7 @@ def __init__(self, fname, *, verbose=None): last_samps=(n_samples - 1,), filenames=[fname], orig_format=orig_format, - # orig_units="V", # TODO maybe not needed + # orig_units=dict(...), # TODO needed? raw_extras=[raw_extras], verbose=verbose, ) @@ -292,11 +308,12 @@ def _preload_data(self, preload): else: _data = np.concatenate(_data, axis=0) + # TODO without this fudge factor, the scale of the signals seems way too high + # (sample data channels show a dynamic range of 0.2 - 3.3 V) + # the puzzling thing is that in the MATLAB code the fudge is 1e3 (not 1e-3) ?!? + fudge_factor = 1e-3 cals = np.array( - [ - _ch["cal"] * _ch["range"] * _ch.get("scale", 1.0) - for _ch in self.info["chs"] - ] + [_ch["cal"] * _ch["range"] * fudge_factor for _ch in self.info["chs"]] ) self._data = _data * cals[:, np.newaxis] From e4dc61fd1154cd662953ebe6847fb11f0a99f839 Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Fri, 8 Aug 2025 17:34:50 -0500 Subject: [PATCH 07/13] update adc_range unit; comments [ci skip] --- mne/io/otb/otb.py | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index 583770548b8..dc01cee2a1c 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -10,6 +10,7 @@ import numpy as np from defusedxml import ElementTree as ET +from ..._fiff.constants import FIFF from ..._fiff.meas_info import create_info from ...utils import _check_fname, fill_doc, logger, verbose, warn from ..base import BaseRaw @@ -53,7 +54,7 @@ def _parse_otb_plus_metadata(metadata, extras_metadata): n_chan = int(metadata.attrib["DeviceTotalChannels"]) bit_depth = int(metadata.attrib["ad_bits"]) device_name = metadata.attrib["Name"] - adc_range = 3.3 # TODO is this V or mV ?? + adc_range = 0.0033 # 3.3 mV (TODO VERIFY) # containers gains = np.full(n_chan, np.nan) ch_names = list() @@ -93,11 +94,19 @@ def _parse_otb_plus_metadata(metadata, extras_metadata): gain_ix = ix + ch_offset gains[gain_ix] = float(ch.get("Gain")) * adapter_gain # TODO verify ch_type for quats, buffer channel, and ramp channel - ch_types.append( - "misc" - if ch_id in _NON_DATA_CHS or ch_id.lower().startswith("aux") - else "emg" - ) + # ramp and controls channels definitely "MISC" + # quats should maybe be FIFF.FIFFV_QUAT_{N} (N from 0-6), but need to verify + # what quats should be, as there are only 4 quat channels. The FIFF quats: + # 0: obsolete + # 1-3: rotations + # 4-6: translations + if ch_id == "Quaternions": + ch_type = FIFF.FIFFV_QUAT_0 # TODO verify + elif ch_id in _NON_DATA_CHS or ch_id.lower().startswith("aux"): + ch_type = "misc" + else: + ch_type = "emg" + ch_types.append(ch_type) # parse subject info subject_info = _parse_patient_xml(extras_metadata) @@ -248,7 +257,13 @@ def __init__(self, fname, *, verbose=None): info["lowpass"] = lowpass for ix, _ch in enumerate(info["chs"]): cal = 1 / 2**bit_depth / gains[ix] - _ch.update(cal=cal, range=adc_range) + # TODO need different range for Quaternions? + _range = ( + adc_range + if _ch["kind"] in (FIFF.FIFFV_EMG_CH, FIFF.FIFFV_EEG_CH) + else 1.0 + ) + _ch.update(cal=cal, range=_range) if meas_date is not None: info["meas_date"] = datetime.fromisoformat(meas_date.text).astimezone( timezone.utc @@ -308,13 +323,7 @@ def _preload_data(self, preload): else: _data = np.concatenate(_data, axis=0) - # TODO without this fudge factor, the scale of the signals seems way too high - # (sample data channels show a dynamic range of 0.2 - 3.3 V) - # the puzzling thing is that in the MATLAB code the fudge is 1e3 (not 1e-3) ?!? - fudge_factor = 1e-3 - cals = np.array( - [_ch["cal"] * _ch["range"] * fudge_factor for _ch in self.info["chs"]] - ) + cals = np.array([_ch["cal"] * _ch["range"] for _ch in self.info["chs"]]) self._data = _data * cals[:, np.newaxis] From 494ac4b06f7e91b683ea4d654ece26f6abf73556 Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Tue, 12 Aug 2025 13:35:22 -0500 Subject: [PATCH 08/13] rough in otb4 reader [ci skip] --- mne/io/otb/otb.py | 207 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 180 insertions(+), 27 deletions(-) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index dc01cee2a1c..73059fbca32 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -15,8 +15,8 @@ from ...utils import _check_fname, fill_doc, logger, verbose, warn from ..base import BaseRaw -# these are the only non-data channel IDs (besides "AUX*", handled via glob) -_NON_DATA_CHS = ("Quaternion", "BufferChannel", "RampChannel", "LoadCellChannel") +# these will all get mapped to `misc`. Quaternion channels are handled separately. +_NON_DATA_CHS = ("buffer", "ramp", "loadcell", "aux") def _parse_date(dt): @@ -50,13 +50,14 @@ def _parse_sex(sex): def _parse_otb_plus_metadata(metadata, extras_metadata): assert metadata.tag == "Device" # device-level metadata - sfreq = float(metadata.attrib["SampleFrequency"]) - n_chan = int(metadata.attrib["DeviceTotalChannels"]) + adc_range = 0.0033 # 3.3 mV (TODO VERIFY) bit_depth = int(metadata.attrib["ad_bits"]) device_name = metadata.attrib["Name"] - adc_range = 0.0033 # 3.3 mV (TODO VERIFY) + n_chan = int(metadata.attrib["DeviceTotalChannels"]) + sfreq = float(metadata.attrib["SampleFrequency"]) # containers gains = np.full(n_chan, np.nan) + scalings = np.full(n_chan, np.nan) ch_names = list() ch_types = list() highpass = list() @@ -93,19 +94,22 @@ def _parse_otb_plus_metadata(metadata, extras_metadata): # store gains gain_ix = ix + ch_offset gains[gain_ix] = float(ch.get("Gain")) * adapter_gain - # TODO verify ch_type for quats, buffer channel, and ramp channel - # ramp and controls channels definitely "MISC" + # TODO verify ch_type for quats & buffer channel + # ramp and control channels definitely "MISC" # quats should maybe be FIFF.FIFFV_QUAT_{N} (N from 0-6), but need to verify # what quats should be, as there are only 4 quat channels. The FIFF quats: # 0: obsolete # 1-3: rotations # 4-6: translations - if ch_id == "Quaternions": - ch_type = FIFF.FIFFV_QUAT_0 # TODO verify - elif ch_id in _NON_DATA_CHS or ch_id.lower().startswith("aux"): + if ch_id.startswith("Quaternion"): + ch_type = "chpi" # TODO verify + scalings[gain_ix] = 1e-4 + elif any(ch_id.lower().startswith(_ch.lower()) for _ch in _NON_DATA_CHS): ch_type = "misc" + scalings[gain_ix] = 1.0 else: ch_type = "emg" + scalings[gain_ix] = 1.0 ch_types.append(ch_type) # parse subject info subject_info = _parse_patient_xml(extras_metadata) @@ -122,11 +126,151 @@ def _parse_otb_plus_metadata(metadata, extras_metadata): ch_types=ch_types, highpass=highpass, lowpass=lowpass, + units=None, + scalings=scalings, + signal_paths=None, ) def _parse_otb_four_metadata(metadata, extras_metadata): - pass + def get_str(node, tag): + return node.find(tag).text + + def get_int(node, tag): + return int(get_str(node, tag)) + + def get_float(node, tag, **replacements): + # filter freqs may be "Unknown", can't blindly parse as floats + val = get_str(node, tag) + return replacements.get(val, float(val)) + + assert metadata.tag == "DeviceParameters" + # device-level metadata + adc_range = float(metadata.find("ADC_Range").text) + bit_depth = int(metadata.find("AdBits").text) + n_chan = int(metadata.find("TotalChannelsInFile").text) + sfreq = float(metadata.find("SamplingFrequency").text) + # containers + gains = np.full(n_chan, np.nan) + ch_names = list() + ch_types = list() + highpass = list() + lowpass = list() + n_chans = list() + paths = list() + scalings = list() + units = list() + # stored per-adapter, but should be uniform + adc_ranges = set() + bit_depths = set() + device_names = set() + meas_dates = set() + sfreqs = set() + # adapter-level metadata + for adapter in extras_metadata.iter("TrackInfo"): + # expected to be same for all adapters + adc_ranges.add(get_float(adapter, "ADC_Range")) + bit_depths.add(get_int(adapter, "ADC_Nbits")) + device_names.add(get_str(adapter, "Device")) + sfreqs.add(get_int(adapter, "SamplingFrequency")) + # may be different for each adapter + adapter_id = get_str(adapter, "SubTitle") + adapter_gain = get_float(adapter, "Gain") + ch_offset = get_int(adapter, "AcquisitionChannel") + n_chans.append(get_int(adapter, "NumberOfChannels")) + paths.append(get_str(adapter, "SignalStreamPath")) + scalings.append(get_str(adapter, "UnitOfMeasurementFactor")) + units.append(get_str(adapter, "UnitOfMeasurement")) + # we only really care about lowpass/highpass on the data channels + if adapter_id not in ("Quaternion", "Buffer", "Ramp"): + highpass = get_float(adapter, "HighPassFilter", Unknown=None) + lowpass = get_float(adapter, "LowPassFilter", Unknown=None) + # meas_date + meas_date = adapter.find("StringsDescriptions").find("StartDate") + if meas_date is not None: + meas_dates.add(_parse_date(meas_date.text).astimezone(timezone.utc)) + # # range (TODO maybe not needed; might be just for mfg's GUI?) + # # FWIW in the example file: range for Buffer is 1-100, + # # Ramp and Control are -32767-32768, and + # # EMG chs are ±2.1237507098703645E-05 + # rmin = get_float(adapter, "RangeMin") + # rmax = get_float(adapter, "RangeMax") + # if rmin.is_integer() and rmax.is_integer(): + # rmin = int(rmin) + # rmax = int(rmax) + + # extract channel-specific info ↓ not a typo + for ch in adapter.get("Channels").iter("ChannelRapresentation"): + # channel names + ix = int(ch.find("Index").text) + ch_name = ch.find("Label").text + try: + _ = int(ch_name) + except ValueError: + pass + else: + ch_name = f"{adapter_id}_{ix}" + ch_names.append(ch_name) + # gains + gains[ix + ch_offset] = adapter_gain + # channel types + # TODO verify for quats & buffer channel + # ramp and control channels definitely "MISC" + # quats should maybe be FIFF.FIFFV_QUAT_{N} (N from 0-6), but need to verify + # what quats should be, as there are only 4 quat channels. The FIFF quats: + # 0: obsolete + # 1-3: rotations + # 4-6: translations + if adapter_id.startswith("Quaternion"): + ch_type = "chpi" # TODO verify + elif any(adapter_id.lower().startswith(_ch) for _ch in _NON_DATA_CHS): + ch_type = "misc" + else: + ch_type = "emg" + ch_types.append(ch_type) + + # validate the fields stored at adapter level, but that ought to be uniform: + def check_uniform(name, adapter_values, device_value=None): + if len(adapter_values) > 1: + raise RuntimeError( + f"multiple {name}s found ({', '.join(sorted(adapter_values))}), " + "this is not yet supported" + ) + adapter_value = adapter_values.pop() + if device_value is not None and device_value != adapter_value: + raise RuntimeError( + f"mismatch between device-level {name} ({device_value}) and " + f"adapter-level {name} ({adapter_value})" + ) + return adapter_value + + device_name = check_uniform("device name", device_names) + adc_range = check_uniform("analog-to-digital range", adc_ranges, adc_range) + bit_depth = check_uniform("bit depth", bit_depths, bit_depth) + sfreq = check_uniform("sampling frequency", sfreqs, sfreq) + + # verify number of channels in device metadata matches sum of adapters + assert sum(n_chans) == n_chan, ( + f"total channels ({n_chan}) doesn't match sum of channels for each adapter " + f"({sum(n_chans)})" + ) + + return dict( + adc_range=adc_range, + bit_depth=bit_depth, + ch_names=ch_names, + ch_types=ch_types, + device_name=device_name, + gains=gains, + highpass=highpass, + lowpass=lowpass, + n_chan=n_chan, + scalings=scalings, + sfreq=sfreq, + signal_paths=paths, + subject_info=dict(), + units=units, + ) @fill_doc @@ -160,8 +304,9 @@ def __init__(self, fname, *, verbose=None): fnames = fid.getnames() # the .sig file(s) are the binary channel data. sig_fnames = [_fname for _fname in fnames if _fname.endswith(".sig")] - # TODO ↓↓↓↓↓↓↓↓ make compatible with multiple sig_fnames - data_size_bytes = fid.getmember(sig_fnames[0]).size + # TODO ↓↓↓↓↓↓↓↓ this may be wrong for Novecento+ devices + # (MATLAB code appears to skip the first sig_fname) + data_size_bytes = sum(fid.getmember(_fname).size for _fname in sig_fnames) # triage the file format versions if v4_format: metadata_fname = "DeviceParameters.xml" @@ -187,17 +332,20 @@ def __init__(self, fname, *, verbose=None): extras_tree = ET.fromstring(fid.extractfile(extras_fname).read()) # extract what we need from the tree metadata = parse_func(metadata_tree, extras_tree) - sfreq = metadata["sfreq"] - n_chan = metadata["n_chan"] - bit_depth = metadata["bit_depth"] - device_name = metadata["device_name"] adc_range = metadata["adc_range"] - subject_info = metadata["subject_info"] + bit_depth = metadata["bit_depth"] ch_names = metadata["ch_names"] ch_types = metadata["ch_types"] + device_name = metadata["device_name"] gains = metadata["gains"] highpass = metadata["highpass"] lowpass = metadata["lowpass"] + n_chan = metadata["n_chan"] + scalings = metadata["scalings"] + sfreq = metadata["sfreq"] + signal_paths = metadata["signal_paths"] + subject_info = metadata["subject_info"] + # units = metadata["units"] # TODO needed for orig_units maybe if bit_depth == 16: _dtype = np.int16 @@ -206,7 +354,7 @@ def __init__(self, fname, *, verbose=None): # https://stackoverflow.com/a/34128171 # https://stackoverflow.com/a/11967503 raise NotImplementedError( - "OTB+ files with 24-bit data are not yet supported." + "OTB files with 24-bit data are not yet supported." ) else: raise NotImplementedError( @@ -256,8 +404,8 @@ def __init__(self, fname, *, verbose=None): info["highpass"] = highpass info["lowpass"] = lowpass for ix, _ch in enumerate(info["chs"]): - cal = 1 / 2**bit_depth / gains[ix] - # TODO need different range for Quaternions? + # divisor = 1.0 if _ch["kind"] == FIFF.FIFFV_MISC_CH else 2**bit_depth + cal = 1 / 2**bit_depth / gains[ix] * scalings[ix] _range = ( adc_range if _ch["kind"] in (FIFF.FIFFV_EMG_CH, FIFF.FIFFV_EEG_CH) @@ -269,7 +417,7 @@ def __init__(self, fname, *, verbose=None): timezone.utc ) - # sanity check + # verify duration from metadata matches n_samples/sfreq dur = extras_tree.find("duration") if dur is not None: np.testing.assert_almost_equal( @@ -282,7 +430,12 @@ def __init__(self, fname, *, verbose=None): # TODO parse files markers_0.xml, markers_1.xml as annotations? # populate raw_extras - raw_extras = dict(dtype=_dtype, sig_fnames=sig_fnames) + raw_extras = dict( + device_name=device_name, + dtype=_dtype, + sig_fnames=sig_fnames, + signal_paths=signal_paths, + ) FORMAT_MAPPING = dict( d="double", f="single", @@ -306,6 +459,9 @@ def _preload_data(self, preload): """Load raw data from an OTB+ file.""" _extras = self._raw_extras[0] sig_fnames = _extras["sig_fnames"] + # if device_name=="Novecento+" then we may need these: + # sig_paths = _extras["signal_paths"] + # device_name = _extras["device_name"] with tarfile.open(self.filenames[0], "r") as fid: _data = list() @@ -318,10 +474,7 @@ def _preload_data(self, preload): .reshape(-1, self.info["nchan"]) .T ) - if len(_data) == 1: - _data = _data[0] - else: - _data = np.concatenate(_data, axis=0) + _data = np.concatenate(_data, axis=0) # no-op if len(_data) == 1 cals = np.array([_ch["cal"] * _ch["range"] for _ch in self.info["chs"]]) self._data = _data * cals[:, np.newaxis] From 103e78b93e60919f1347875d54b4255ba138d1eb Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Thu, 14 Aug 2025 17:41:22 -0500 Subject: [PATCH 09/13] most things working for otb4 [ci skip] --- mne/io/otb/otb.py | 255 ++++++++++++++++++++++++++-------------------- 1 file changed, 142 insertions(+), 113 deletions(-) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index 73059fbca32..9fdbe097d63 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -10,7 +10,6 @@ import numpy as np from defusedxml import ElementTree as ET -from ..._fiff.constants import FIFF from ..._fiff.meas_info import create_info from ...utils import _check_fname, fill_doc, logger, verbose, warn from ..base import BaseRaw @@ -19,34 +18,6 @@ _NON_DATA_CHS = ("buffer", "ramp", "loadcell", "aux") -def _parse_date(dt): - return datetime.fromisoformat(dt).date() - - -def _parse_patient_xml(tree): - """Convert an ElementTree to a dict.""" - - def _parse_sex(sex): - # For devices that generate `.otb+` files, the recording GUI only has M or F - # options and choosing one is mandatory. For `.otb4` the field is optional. - return dict(m=1, f=2)[sex.lower()[0]] if sex else 0 # 0 means "unknown" - - subj_info_mapping = ( - ("family_name", "last_name", str), - ("first_name", "first_name", str), - ("weight", "weight", float), - ("height", "height", float), - ("sex", "sex", _parse_sex), - ("birth_date", "birthday", _parse_date), - ) - subject_info = dict() - for source, target, func in subj_info_mapping: - value = tree.find(source) - if value is not None: - subject_info[target] = func(value.text) - return subject_info - - def _parse_otb_plus_metadata(metadata, extras_metadata): assert metadata.tag == "Device" # device-level metadata @@ -56,6 +27,7 @@ def _parse_otb_plus_metadata(metadata, extras_metadata): n_chan = int(metadata.attrib["DeviceTotalChannels"]) sfreq = float(metadata.attrib["SampleFrequency"]) # containers + adc_ranges = np.full(n_chan, np.nan) gains = np.full(n_chan, np.nan) scalings = np.full(n_chan, np.nan) ch_names = list() @@ -103,32 +75,68 @@ def _parse_otb_plus_metadata(metadata, extras_metadata): # 4-6: translations if ch_id.startswith("Quaternion"): ch_type = "chpi" # TODO verify - scalings[gain_ix] = 1e-4 + scalings[gain_ix] = 1e-3 # TODO CHPI is usually 1e-4 + adc_ranges[gain_ix] = 1.0 elif any(ch_id.lower().startswith(_ch.lower()) for _ch in _NON_DATA_CHS): ch_type = "misc" scalings[gain_ix] = 1.0 + adc_ranges[gain_ix] = 1.0 else: ch_type = "emg" scalings[gain_ix] = 1.0 + adc_ranges[:] = adc_range ch_types.append(ch_type) + # parse subject info - subject_info = _parse_patient_xml(extras_metadata) + def get_str(node, tag): + val = node.find(tag) + if val is not None: + return val.text + + def parse_date(dt): + return datetime.fromisoformat(dt).date() + + def parse_sex(sex): + # For devices that generate `.otb+` files, the recording GUI only has M or F + # options and choosing one is mandatory. + return dict(m=1, f=2)[sex.lower()[0]] if sex else 0 # 0 means "unknown" + + subj_info_mapping = ( + ("family_name", "last_name", str), + ("first_name", "first_name", str), + ("weight", "weight", float), + ("height", "height", float), + ("sex", "sex", parse_sex), + ("birth_date", "birthday", parse_date), + ) + subject_info = dict() + for source, target, func in subj_info_mapping: + value = get_str(extras_metadata, source) + if value is not None: + subject_info[target] = func(value) + + meas_date = get_str(extras_metadata, "time") + duration = get_str(extras_metadata, "duration") + site = get_str(extras_metadata, "place") return dict( - sfreq=sfreq, - n_chan=n_chan, + adc_range=adc_ranges, bit_depth=bit_depth, - device_name=device_name, - adc_range=adc_range, - subject_info=subject_info, - gains=gains, ch_names=ch_names, ch_types=ch_types, + device_name=device_name, + duration=duration, + gains=gains, highpass=highpass, lowpass=lowpass, - units=None, + meas_date=meas_date, + n_chan=n_chan, scalings=scalings, + sfreq=sfreq, signal_paths=None, + site=site, + subject_info=subject_info, + units=None, ) @@ -142,16 +150,15 @@ def get_int(node, tag): def get_float(node, tag, **replacements): # filter freqs may be "Unknown", can't blindly parse as floats val = get_str(node, tag) - return replacements.get(val, float(val)) + return replacements[val] if val in replacements else float(val) assert metadata.tag == "DeviceParameters" # device-level metadata - adc_range = float(metadata.find("ADC_Range").text) - bit_depth = int(metadata.find("AdBits").text) - n_chan = int(metadata.find("TotalChannelsInFile").text) - sfreq = float(metadata.find("SamplingFrequency").text) + bit_depth = get_int(metadata, "AdBits") # TODO use `SampleSize * 8` instead? + sfreq = get_float(metadata, "SamplingFrequency") + device_gain = get_float(metadata, "Gain") # containers - gains = np.full(n_chan, np.nan) + gains = list() ch_names = list() ch_types = list() highpass = list() @@ -160,35 +167,42 @@ def get_float(node, tag, **replacements): paths = list() scalings = list() units = list() + adc_ranges = list() # stored per-adapter, but should be uniform - adc_ranges = set() bit_depths = set() device_names = set() + durations = set() meas_dates = set() sfreqs = set() # adapter-level metadata for adapter in extras_metadata.iter("TrackInfo"): + strings = adapter.find("StringsDescriptions") # expected to be same for all adapters - adc_ranges.add(get_float(adapter, "ADC_Range")) bit_depths.add(get_int(adapter, "ADC_Nbits")) device_names.add(get_str(adapter, "Device")) sfreqs.add(get_int(adapter, "SamplingFrequency")) + durations.add(get_float(adapter, "TimeDuration")) # may be different for each adapter + adapter_adc_range = get_float(adapter, "ADC_Range") adapter_id = get_str(adapter, "SubTitle") adapter_gain = get_float(adapter, "Gain") - ch_offset = get_int(adapter, "AcquisitionChannel") + adapter_scaling = 1.0 / get_float(adapter, "UnitOfMeasurementFactor") + # ch_offset = get_int(adapter, "AcquisitionChannel") n_chans.append(get_int(adapter, "NumberOfChannels")) paths.append(get_str(adapter, "SignalStreamPath")) - scalings.append(get_str(adapter, "UnitOfMeasurementFactor")) units.append(get_str(adapter, "UnitOfMeasurement")) # we only really care about lowpass/highpass on the data channels if adapter_id not in ("Quaternion", "Buffer", "Ramp"): - highpass = get_float(adapter, "HighPassFilter", Unknown=None) - lowpass = get_float(adapter, "LowPassFilter", Unknown=None) + hp = get_float(strings, "HighPassFilter", Unknown=None) + lp = get_float(strings, "LowPassFilter", Unknown=None) + if hp is not None: + highpass.append(hp) + if lp is not None: + lowpass.append(lp) # meas_date - meas_date = adapter.find("StringsDescriptions").find("StartDate") + meas_date = strings.find("StartDate") if meas_date is not None: - meas_dates.add(_parse_date(meas_date.text).astimezone(timezone.utc)) + meas_dates.add(meas_date.text) # # range (TODO maybe not needed; might be just for mfg's GUI?) # # FWIW in the example file: range for Buffer is 1-100, # # Ramp and Control are -32767-32768, and @@ -199,8 +213,8 @@ def get_float(node, tag, **replacements): # rmin = int(rmin) # rmax = int(rmax) - # extract channel-specific info ↓ not a typo - for ch in adapter.get("Channels").iter("ChannelRapresentation"): + # extract channel-specific info ↓ not a typo + for ch in adapter.find("Channels").iter("ChannelRapresentation"): # channel names ix = int(ch.find("Index").text) ch_name = ch.find("Label").text @@ -211,8 +225,10 @@ def get_float(node, tag, **replacements): else: ch_name = f"{adapter_id}_{ix}" ch_names.append(ch_name) - # gains - gains[ix + ch_offset] = adapter_gain + # signal properties + adc_ranges.append(adapter_adc_range) + gains.append(adapter_gain * device_gain) + scalings.append(adapter_scaling) # channel types # TODO verify for quats & buffer channel # ramp and control channels definitely "MISC" @@ -232,9 +248,9 @@ def get_float(node, tag, **replacements): # validate the fields stored at adapter level, but that ought to be uniform: def check_uniform(name, adapter_values, device_value=None): if len(adapter_values) > 1: + vals = sorted(map(str, adapter_values)) raise RuntimeError( - f"multiple {name}s found ({', '.join(sorted(adapter_values))}), " - "this is not yet supported" + f"multiple {name}s found ({', '.join(vals)}), this is not yet supported" ) adapter_value = adapter_values.pop() if device_value is not None and device_value != adapter_value: @@ -244,30 +260,30 @@ def check_uniform(name, adapter_values, device_value=None): ) return adapter_value - device_name = check_uniform("device name", device_names) - adc_range = check_uniform("analog-to-digital range", adc_ranges, adc_range) + # ADC_Nbits is listed as zero for non-data channels; ignore + bit_depths.discard(0) bit_depth = check_uniform("bit depth", bit_depths, bit_depth) + device_name = check_uniform("device name", device_names) + duration = check_uniform("duration", durations) sfreq = check_uniform("sampling frequency", sfreqs, sfreq) - - # verify number of channels in device metadata matches sum of adapters - assert sum(n_chans) == n_chan, ( - f"total channels ({n_chan}) doesn't match sum of channels for each adapter " - f"({sum(n_chans)})" - ) + meas_date = check_uniform("meas date", meas_dates) return dict( - adc_range=adc_range, + adc_range=adc_ranges, bit_depth=bit_depth, ch_names=ch_names, ch_types=ch_types, device_name=device_name, + duration=duration, gains=gains, highpass=highpass, lowpass=lowpass, - n_chan=n_chan, - scalings=scalings, + meas_date=meas_date, + n_chan=sum(n_chans), + scalings=np.array(scalings), sfreq=sfreq, signal_paths=paths, + site=None, subject_info=dict(), units=units, ) @@ -337,25 +353,35 @@ def __init__(self, fname, *, verbose=None): ch_names = metadata["ch_names"] ch_types = metadata["ch_types"] device_name = metadata["device_name"] + duration = metadata["duration"] gains = metadata["gains"] highpass = metadata["highpass"] lowpass = metadata["lowpass"] + meas_date = metadata["meas_date"] n_chan = metadata["n_chan"] scalings = metadata["scalings"] sfreq = metadata["sfreq"] signal_paths = metadata["signal_paths"] + site = metadata["site"] subject_info = metadata["subject_info"] # units = metadata["units"] # TODO needed for orig_units maybe + # bit_depth seems to be unreliable for some OTB4 files, so let's check: + if duration is not None: # None for OTB+ files + expected_n_samp = int(duration * sfreq * n_chan) + expected_bit_depth = int(8 * data_size_bytes / expected_n_samp) + if bit_depth != expected_bit_depth: + warn( + f"mismatch between file metadata `AdBits` ({bit_depth} bit) and " + "computed sample size based on reported duration, sampling " + f"frequency, and number of channels ({expected_bit_depth} bit). " + "Using the computed bit depth." + ) + bit_depth = expected_bit_depth if bit_depth == 16: _dtype = np.int16 - elif bit_depth == 24: # EEG data recorded on OTB devices do this - # this is possible but will be a bit tricky, see: - # https://stackoverflow.com/a/34128171 - # https://stackoverflow.com/a/11967503 - raise NotImplementedError( - "OTB files with 24-bit data are not yet supported." - ) + elif bit_depth == 24: # EEG data recorded on OTB devices may be like this + _dtype = np.uint8 # hack, see `_preload_data` method else: raise NotImplementedError( f"expected 16- or 24-bit data, but file metadata says {bit_depth}-bit. " @@ -371,58 +397,52 @@ def __init__(self, fname, *, verbose=None): f"Number of bytes in file ({data_size_bytes}) not evenly divided by " f"number of channels ({n_chan}). File may be corrupted or truncated." ) - n_samples = int(n_samples) - # validate gains + # validate assert np.isfinite(gains).all() + assert np.isfinite(adc_range).all() + assert np.isfinite(scalings).all() # check filter freqs. Can vary by adapter, so in theory we might get different # filters for different *data* channels (not just different between data and # misc/aux/whatever). - if len(highpass) > 1: - warn( - "More than one highpass frequency found in file; choosing lowest " - f"({min(highpass)} Hz)" - ) - if len(lowpass) > 1: - warn( - "More than one lowpass frequency found in file; choosing highest " - f"({max(lowpass)} Hz)" - ) - highpass = min(highpass) - lowpass = max(lowpass) + def check_filter_freqs(name, freqs): + if not len(freqs): + return None + else: + extreme = dict(highpass="lowest", lowpass="highest")[name] + func = dict(highpass=min, lowpass=max)[name] + extremum = func(freqs) + if len(freqs) > 1: + warn( + f"More than one {name} frequency found in file; choosing " + f"{extreme} ({extremum} Hz)" + ) + return extremum + + highpass = check_filter_freqs("highpass", highpass) + lowpass = check_filter_freqs("lowpass", lowpass) # create info info = create_info(ch_names=ch_names, ch_types=ch_types, sfreq=sfreq) device_info = dict(type="OTB", model=device_name) # other allowed keys: serial - meas_date = extras_tree.find("time") - site = extras_tree.find("place") if site is not None: - device_info.update(site=site.text) + device_info.update(site=site) info.update(subject_info=subject_info, device_info=device_info) with info._unlock(): info["highpass"] = highpass info["lowpass"] = lowpass for ix, _ch in enumerate(info["chs"]): - # divisor = 1.0 if _ch["kind"] == FIFF.FIFFV_MISC_CH else 2**bit_depth cal = 1 / 2**bit_depth / gains[ix] * scalings[ix] - _range = ( - adc_range - if _ch["kind"] in (FIFF.FIFFV_EMG_CH, FIFF.FIFFV_EEG_CH) - else 1.0 - ) - _ch.update(cal=cal, range=_range) + _ch.update(cal=cal, range=adc_range[ix]) if meas_date is not None: - info["meas_date"] = datetime.fromisoformat(meas_date.text).astimezone( + info["meas_date"] = datetime.fromisoformat(meas_date).astimezone( timezone.utc ) # verify duration from metadata matches n_samples/sfreq - dur = extras_tree.find("duration") - if dur is not None: - np.testing.assert_almost_equal( - float(dur.text), n_samples / sfreq, decimal=3 - ) + if duration is not None: + np.testing.assert_almost_equal(duration, n_samples / sfreq, decimal=3) # TODO other fields in extras_tree for otb+ format: # protocol_code, pathology, commentsPatient, comments @@ -441,6 +461,7 @@ def __init__(self, fname, *, verbose=None): f="single", i="int", h="short", + B="int", # hack, we read 24-bit as uint8 and upcast to 32-bit signed int ) orig_format = FORMAT_MAPPING[_dtype().dtype.char] @@ -466,14 +487,22 @@ def _preload_data(self, preload): with tarfile.open(self.filenames[0], "r") as fid: _data = list() for sig_fname in sig_fnames: - _data.append( - np.frombuffer( - fid.extractfile(sig_fname).read(), - dtype=_extras["dtype"], - ) - .reshape(-1, self.info["nchan"]) - .T + this_data = np.frombuffer( + fid.extractfile(sig_fname).read(), _extras["dtype"] ) + if _extras["dtype"] is np.uint8: # hack to handle 24-bit data + # adapted from wavio._wav2array © 2015-2022 Warren Weckesser (BSD-2) + a = np.empty((this_data.size // 3, 4), dtype=np.uint8) + a[..., :3] = this_data.reshape(-1, 3) + # we read in 24-bit data as unsigned ints, but assume that it was + # actually *signed* data. So we check the most significant bit: + msb = a[..., 2:3] >> 7 + # Where it was 1, the 24-bit number was negative, so the added byte + # should be 11111111; otherwise it should be 00000000. + a[..., 3:] = msb * np.iinfo(np.uint8).max + # now we upcast to signed 32-bit and remove the extra dimension + this_data = np.squeeze(a.view(np.int32), axis=-1) + _data.append(this_data.reshape(-1, self.info["nchan"]).T) _data = np.concatenate(_data, axis=0) # no-op if len(_data) == 1 cals = np.array([_ch["cal"] * _ch["range"] for _ch in self.info["chs"]]) From 04c4117d6b9932f168957add0b5d1ab284fc3fba Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Fri, 15 Aug 2025 17:28:09 -0500 Subject: [PATCH 10/13] various fixes [ci skip] --- mne/io/otb/otb.py | 103 +++++++++++++++++++++++++--------------------- 1 file changed, 57 insertions(+), 46 deletions(-) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index 9fdbe097d63..49725872238 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -15,7 +15,24 @@ from ..base import BaseRaw # these will all get mapped to `misc`. Quaternion channels are handled separately. -_NON_DATA_CHS = ("buffer", "ramp", "loadcell", "aux") +_CONTROL_CHS = ("buffer", "ramp") +_AUX_CHS = ("loadcell", "aux") + + +def _get_str(node, tag): + val = node.find(tag) + if val is not None: + return val.text + + +def _get_int(node, tag): + return int(_get_str(node, tag)) + + +def _get_float(node, tag, **replacements): + # filter freqs may be "Unknown", can't blindly parse as floats + val = _get_str(node, tag) + return replacements[val] if val in replacements else float(val) def _parse_otb_plus_metadata(metadata, extras_metadata): @@ -75,9 +92,13 @@ def _parse_otb_plus_metadata(metadata, extras_metadata): # 4-6: translations if ch_id.startswith("Quaternion"): ch_type = "chpi" # TODO verify - scalings[gain_ix] = 1e-3 # TODO CHPI is usually 1e-4 + scalings[gain_ix] = 1e-3 # CHPI is usually 1e-4; limbs move more + adc_ranges[gain_ix] = 1.0 + elif any(ch_id.lower().startswith(_ch.lower()) for _ch in _CONTROL_CHS): + ch_type = "stim" + scalings[gain_ix] = 1.0 adc_ranges[gain_ix] = 1.0 - elif any(ch_id.lower().startswith(_ch.lower()) for _ch in _NON_DATA_CHS): + elif any(ch_id.lower().startswith(_ch.lower()) for _ch in _AUX_CHS): ch_type = "misc" scalings[gain_ix] = 1.0 adc_ranges[gain_ix] = 1.0 @@ -88,11 +109,6 @@ def _parse_otb_plus_metadata(metadata, extras_metadata): ch_types.append(ch_type) # parse subject info - def get_str(node, tag): - val = node.find(tag) - if val is not None: - return val.text - def parse_date(dt): return datetime.fromisoformat(dt).date() @@ -111,13 +127,13 @@ def parse_sex(sex): ) subject_info = dict() for source, target, func in subj_info_mapping: - value = get_str(extras_metadata, source) + value = _get_str(extras_metadata, source) if value is not None: subject_info[target] = func(value) - meas_date = get_str(extras_metadata, "time") - duration = get_str(extras_metadata, "duration") - site = get_str(extras_metadata, "place") + meas_date = _get_str(extras_metadata, "time") + duration = _get_float(extras_metadata, "duration") + site = _get_str(extras_metadata, "place") return dict( adc_range=adc_ranges, @@ -141,22 +157,11 @@ def parse_sex(sex): def _parse_otb_four_metadata(metadata, extras_metadata): - def get_str(node, tag): - return node.find(tag).text - - def get_int(node, tag): - return int(get_str(node, tag)) - - def get_float(node, tag, **replacements): - # filter freqs may be "Unknown", can't blindly parse as floats - val = get_str(node, tag) - return replacements[val] if val in replacements else float(val) - assert metadata.tag == "DeviceParameters" # device-level metadata - bit_depth = get_int(metadata, "AdBits") # TODO use `SampleSize * 8` instead? - sfreq = get_float(metadata, "SamplingFrequency") - device_gain = get_float(metadata, "Gain") + bit_depth = _get_int(metadata, "AdBits") # TODO use `SampleSize * 8` instead? + sfreq = _get_float(metadata, "SamplingFrequency") + device_gain = _get_float(metadata, "Gain") # containers gains = list() ch_names = list() @@ -178,23 +183,26 @@ def get_float(node, tag, **replacements): for adapter in extras_metadata.iter("TrackInfo"): strings = adapter.find("StringsDescriptions") # expected to be same for all adapters - bit_depths.add(get_int(adapter, "ADC_Nbits")) - device_names.add(get_str(adapter, "Device")) - sfreqs.add(get_int(adapter, "SamplingFrequency")) - durations.add(get_float(adapter, "TimeDuration")) + bit_depths.add(_get_int(adapter, "ADC_Nbits")) + device_names.add(_get_str(adapter, "Device")) + sfreqs.add(_get_int(adapter, "SamplingFrequency")) + durations.add(_get_float(adapter, "TimeDuration")) # may be different for each adapter - adapter_adc_range = get_float(adapter, "ADC_Range") - adapter_id = get_str(adapter, "SubTitle") - adapter_gain = get_float(adapter, "Gain") - adapter_scaling = 1.0 / get_float(adapter, "UnitOfMeasurementFactor") - # ch_offset = get_int(adapter, "AcquisitionChannel") - n_chans.append(get_int(adapter, "NumberOfChannels")) - paths.append(get_str(adapter, "SignalStreamPath")) - units.append(get_str(adapter, "UnitOfMeasurement")) + adapter_adc_range = _get_float(adapter, "ADC_Range") + adapter_id = _get_str(adapter, "SubTitle") + adapter_gain = _get_float(adapter, "Gain") + if adapter_id.startswith("Quaternion"): + adapter_scaling = 1e-3 + else: + adapter_scaling = 1.0 / _get_float(adapter, "UnitOfMeasurementFactor") + # ch_offset = _get_int(adapter, "AcquisitionChannel") + n_chans.append(_get_int(adapter, "NumberOfChannels")) + paths.append(_get_str(adapter, "SignalStreamPath")) + units.append(_get_str(adapter, "UnitOfMeasurement")) # we only really care about lowpass/highpass on the data channels if adapter_id not in ("Quaternion", "Buffer", "Ramp"): - hp = get_float(strings, "HighPassFilter", Unknown=None) - lp = get_float(strings, "LowPassFilter", Unknown=None) + hp = _get_float(strings, "HighPassFilter", Unknown=None) + lp = _get_float(strings, "LowPassFilter", Unknown=None) if hp is not None: highpass.append(hp) if lp is not None: @@ -207,8 +215,8 @@ def get_float(node, tag, **replacements): # # FWIW in the example file: range for Buffer is 1-100, # # Ramp and Control are -32767-32768, and # # EMG chs are ±2.1237507098703645E-05 - # rmin = get_float(adapter, "RangeMin") - # rmax = get_float(adapter, "RangeMax") + # rmin = _get_float(adapter, "RangeMin") + # rmax = _get_float(adapter, "RangeMax") # if rmin.is_integer() and rmax.is_integer(): # rmin = int(rmin) # rmax = int(rmax) @@ -231,15 +239,18 @@ def get_float(node, tag, **replacements): scalings.append(adapter_scaling) # channel types # TODO verify for quats & buffer channel - # ramp and control channels definitely "MISC" + # ramp and control channels maybe "MISC", arguably "STIM"? # quats should maybe be FIFF.FIFFV_QUAT_{N} (N from 0-6), but need to verify # what quats should be, as there are only 4 quat channels. The FIFF quats: - # 0: obsolete + # 0: obsolete (?) # 1-3: rotations # 4-6: translations if adapter_id.startswith("Quaternion"): ch_type = "chpi" # TODO verify - elif any(adapter_id.lower().startswith(_ch) for _ch in _NON_DATA_CHS): + # adc_ranges[gain_ix] = 1.0 + elif any(adapter_id.lower().startswith(_ch) for _ch in _CONTROL_CHS): + ch_type = "stim" + elif any(adapter_id.lower().startswith(_ch) for _ch in _AUX_CHS): ch_type = "misc" else: ch_type = "emg" @@ -369,7 +380,7 @@ def __init__(self, fname, *, verbose=None): # bit_depth seems to be unreliable for some OTB4 files, so let's check: if duration is not None: # None for OTB+ files expected_n_samp = int(duration * sfreq * n_chan) - expected_bit_depth = int(8 * data_size_bytes / expected_n_samp) + expected_bit_depth = int(np.rint(8 * data_size_bytes / expected_n_samp)) if bit_depth != expected_bit_depth: warn( f"mismatch between file metadata `AdBits` ({bit_depth} bit) and " From d9ae19b8fae49df3a5f5a2358e3caf5d63156bef Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Tue, 19 Aug 2025 16:19:33 -0500 Subject: [PATCH 11/13] refactor/cleanup --- mne/io/otb/otb.py | 129 ++++++++++++++++++++++------------------------ 1 file changed, 61 insertions(+), 68 deletions(-) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index 49725872238..641daaafa48 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -19,30 +19,28 @@ _AUX_CHS = ("loadcell", "aux") -def _get_str(node, tag): - val = node.find(tag) - if val is not None: - return val.text - - -def _get_int(node, tag): - return int(_get_str(node, tag)) +def _get(node, attr, _type=str, **replacements): + val = node.get(attr) + # filter freqs may be "Unknown", can't blindly parse as floats + return replacements[val] if val in replacements else _type(val) -def _get_float(node, tag, **replacements): +def _find(node, tag, _type=str, **replacements): + val = node.find(tag) + if val is not None: + val = val.text # filter freqs may be "Unknown", can't blindly parse as floats - val = _get_str(node, tag) - return replacements[val] if val in replacements else float(val) + return replacements[val] if val in replacements else _type(val) def _parse_otb_plus_metadata(metadata, extras_metadata): assert metadata.tag == "Device" # device-level metadata adc_range = 0.0033 # 3.3 mV (TODO VERIFY) - bit_depth = int(metadata.attrib["ad_bits"]) - device_name = metadata.attrib["Name"] - n_chan = int(metadata.attrib["DeviceTotalChannels"]) - sfreq = float(metadata.attrib["SampleFrequency"]) + bit_depth = _get(metadata, "ad_bits", int) + device_name = _get(metadata, "Name") + n_chan = _get(metadata, "DeviceTotalChannels", int) + sfreq = _get(metadata, "SampleFrequency", float) # containers adc_ranges = np.full(n_chan, np.nan) gains = np.full(n_chan, np.nan) @@ -54,35 +52,26 @@ def _parse_otb_plus_metadata(metadata, extras_metadata): # check in advance where we'll need to append indices to uniquify ch_names n_ch_by_type = Counter([ch.get("ID") for ch in metadata.iter("Channel")]) dupl_ids = [k for k, v in n_ch_by_type.items() if v > 1] + ch_ix = {key: iter(range(val)) for key, val in n_ch_by_type.items()} # iterate over adapters & channels to extract gain, filters, names, etc for adapter in metadata.iter("Adapter"): - adapter_id = adapter.get("ID") - adapter_gain = float(adapter.get("Gain")) - ch_offset = int(adapter.get("ChannelStartIndex")) + adapter_id = _get(adapter, "ID") + adapter_gain = _get(adapter, "Gain", float) + ch_offset = _get(adapter, "ChannelStartIndex", int) # we only really care about lowpass/highpass on the data channels - if adapter_id not in ("AdapterQuaternions", "AdapterControl"): - highpass.append(float(adapter.get("HighPassFilter"))) - lowpass.append(float(adapter.get("LowPassFilter"))) + if not any(adapter_id.startswith(t) for t in ("Adapter", "Direct connection")): + highpass.append(_get(adapter, "HighPassFilter", float, Unknown=None)) + lowpass.append(_get(adapter, "LowPassFilter", float, Unknown=None)) for ch in adapter.iter("Channel"): - ix = int(ch.get("Index")) - ch_id = ch.get("ID") - # # see if we can parse the adapter name to get row,col info - # pattern = re.compile( - # # connector type inter-elec dist grid rows grid cols - # r"(?:[a-zA-Z]+)(?:(?P\d+)MM)(?P\d{2})(?P\d{2})" - # ) - # if match := pattern.match(ch_id): - # col = ix % int(match["col"]) - # row = ix // int(match["row"]) - # ch_name = f"EMG_{adapter_ix}({row:02},{col:02})" - # elif ch_id - # else: - # ch_name = f"EMG_{ix + adapter_ch_offset:03}" - # ch_names.append(ch_name) - ch_names.append(f"{ch_id}_{ix}" if ch_id in dupl_ids else ch_id) + ch_id = _get(ch, "ID") + if ch_id in dupl_ids: + ch_names.append(f"{ch_id}_{next(ch_ix[ch_id])}") + else: + ch_names.append(ch_id) # store gains + ix = _get(ch, "Index", int) gain_ix = ix + ch_offset - gains[gain_ix] = float(ch.get("Gain")) * adapter_gain + gains[gain_ix] = _get(ch, "Gain", float) * adapter_gain # TODO verify ch_type for quats & buffer channel # ramp and control channels definitely "MISC" # quats should maybe be FIFF.FIFFV_QUAT_{N} (N from 0-6), but need to verify @@ -127,13 +116,13 @@ def parse_sex(sex): ) subject_info = dict() for source, target, func in subj_info_mapping: - value = _get_str(extras_metadata, source) + value = _find(extras_metadata, source) if value is not None: subject_info[target] = func(value) - meas_date = _get_str(extras_metadata, "time") - duration = _get_float(extras_metadata, "duration") - site = _get_str(extras_metadata, "place") + meas_date = _find(extras_metadata, "time") + duration = _find(extras_metadata, "duration", float) + site = _find(extras_metadata, "place") return dict( adc_range=adc_ranges, @@ -159,9 +148,9 @@ def parse_sex(sex): def _parse_otb_four_metadata(metadata, extras_metadata): assert metadata.tag == "DeviceParameters" # device-level metadata - bit_depth = _get_int(metadata, "AdBits") # TODO use `SampleSize * 8` instead? - sfreq = _get_float(metadata, "SamplingFrequency") - device_gain = _get_float(metadata, "Gain") + bit_depth = _find(metadata, "AdBits", int) # TODO use `SampleSize * 8` instead? + sfreq = _find(metadata, "SamplingFrequency", float) + device_gain = _find(metadata, "Gain", float) # containers gains = list() ch_names = list() @@ -183,26 +172,27 @@ def _parse_otb_four_metadata(metadata, extras_metadata): for adapter in extras_metadata.iter("TrackInfo"): strings = adapter.find("StringsDescriptions") # expected to be same for all adapters - bit_depths.add(_get_int(adapter, "ADC_Nbits")) - device_names.add(_get_str(adapter, "Device")) - sfreqs.add(_get_int(adapter, "SamplingFrequency")) - durations.add(_get_float(adapter, "TimeDuration")) + bit_depths.add(_find(adapter, "ADC_Nbits", int)) + device_names.add(_find(adapter, "Device")) + sfreqs.add(_find(adapter, "SamplingFrequency", int)) + durations.add(_find(adapter, "TimeDuration", float)) # may be different for each adapter - adapter_adc_range = _get_float(adapter, "ADC_Range") - adapter_id = _get_str(adapter, "SubTitle") - adapter_gain = _get_float(adapter, "Gain") + adapter_adc_range = _find(adapter, "ADC_Range", float) + adapter_id = _find(adapter, "SubTitle") + adapter_gain = _find(adapter, "Gain", float) if adapter_id.startswith("Quaternion"): adapter_scaling = 1e-3 else: - adapter_scaling = 1.0 / _get_float(adapter, "UnitOfMeasurementFactor") - # ch_offset = _get_int(adapter, "AcquisitionChannel") - n_chans.append(_get_int(adapter, "NumberOfChannels")) - paths.append(_get_str(adapter, "SignalStreamPath")) - units.append(_get_str(adapter, "UnitOfMeasurement")) + adapter_scaling = 1.0 / _find(adapter, "UnitOfMeasurementFactor", float) + # ch_offset = _find(adapter, "AcquisitionChannel", int) + adapter_n_chans = _find(adapter, "NumberOfChannels", int) + n_chans.append(adapter_n_chans) + paths.append(_find(adapter, "SignalStreamPath")) + units.append(_find(adapter, "UnitOfMeasurement")) # we only really care about lowpass/highpass on the data channels if adapter_id not in ("Quaternion", "Buffer", "Ramp"): - hp = _get_float(strings, "HighPassFilter", Unknown=None) - lp = _get_float(strings, "LowPassFilter", Unknown=None) + hp = _find(strings, "HighPassFilter", float, Unknown=None) + lp = _find(strings, "LowPassFilter", float, Unknown=None) if hp is not None: highpass.append(hp) if lp is not None: @@ -215,8 +205,8 @@ def _parse_otb_four_metadata(metadata, extras_metadata): # # FWIW in the example file: range for Buffer is 1-100, # # Ramp and Control are -32767-32768, and # # EMG chs are ±2.1237507098703645E-05 - # rmin = _get_float(adapter, "RangeMin") - # rmax = _get_float(adapter, "RangeMax") + # rmin = _find(adapter, "RangeMin", float) + # rmax = _find(adapter, "RangeMax", float) # if rmin.is_integer() and rmax.is_integer(): # rmin = int(rmin) # rmax = int(rmax) @@ -224,14 +214,17 @@ def _parse_otb_four_metadata(metadata, extras_metadata): # extract channel-specific info ↓ not a typo for ch in adapter.find("Channels").iter("ChannelRapresentation"): # channel names - ix = int(ch.find("Index").text) - ch_name = ch.find("Label").text - try: - _ = int(ch_name) - except ValueError: - pass + if adapter_n_chans == 1: + ch_name = adapter_id else: - ch_name = f"{adapter_id}_{ix}" + ix = int(ch.find("Index").text) + ch_name = ch.find("Label").text + try: + _ = int(ch_name) + except ValueError: + pass + else: + ch_name = f"{adapter_id}_{ix}" ch_names.append(ch_name) # signal properties adc_ranges.append(adapter_adc_range) From ed2296095947d6d0f55ed709c4026ed3360538d4 Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Wed, 20 Aug 2025 09:19:18 -0500 Subject: [PATCH 12/13] first pass at annotations [ci skip] --- mne/io/otb/otb.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index 641daaafa48..fc68c95d40c 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -10,6 +10,7 @@ import numpy as np from defusedxml import ElementTree as ET +from ... import Annotations from ..._fiff.meas_info import create_info from ...utils import _check_fname, fill_doc, logger, verbose, warn from ..base import BaseRaw @@ -293,6 +294,30 @@ def check_uniform(name, adapter_values, device_value=None): ) +def _parse_annots(tree_list): + anns = set() # avoids duplicate annots + for tree in tree_list: + for marker in tree.iter("Marker"): + # TODO is it always "Milliseconds"? + ons = _find(marker, "Milliseconds", float) / 1e3 + # TODO will markers ever have duration? is duration + # encoded as onset/offset with 2 markers? + dur = 0.0 + # simplify descriptions + desc = _find(marker, "Description").strip() + if desc.startswith(sync := "Sync Pulse with code: "): + desc = int(desc.replace(sync, "")) + # add to containers + anns.add((ons, dur, desc)) + if anns: + onset, duration, description = zip(*sorted(anns)) + return Annotations( + onset=onset, + duration=duration, + description=description, + ) + + @fill_doc class RawOTB(BaseRaw): """Raw object from an OTB file. @@ -324,6 +349,8 @@ def __init__(self, fname, *, verbose=None): fnames = fid.getnames() # the .sig file(s) are the binary channel data. sig_fnames = [_fname for _fname in fnames if _fname.endswith(".sig")] + # the markers_NN.xml are the annotations + ann_fnames = [_fname for _fname in fnames if _fname.startswith("marker")] # TODO ↓↓↓↓↓↓↓↓ this may be wrong for Novecento+ devices # (MATLAB code appears to skip the first sig_fname) data_size_bytes = sum(fid.getmember(_fname).size for _fname in sig_fnames) @@ -350,8 +377,13 @@ def __init__(self, fname, *, verbose=None): # parse the XML into a tree metadata_tree = ET.fromstring(fid.extractfile(metadata_fname).read()) extras_tree = ET.fromstring(fid.extractfile(extras_fname).read()) + ann_trees = [ + ET.fromstring(fid.extractfile(ann_fname).read()) + for ann_fname in ann_fnames + ] # extract what we need from the tree metadata = parse_func(metadata_tree, extras_tree) + annots = _parse_annots(ann_trees) adc_range = metadata["adc_range"] bit_depth = metadata["bit_depth"] ch_names = metadata["ch_names"] @@ -479,6 +511,8 @@ def check_filter_freqs(name, freqs): raw_extras=[raw_extras], verbose=verbose, ) + if annots: + self.set_annotations(annots) def _preload_data(self, preload): """Load raw data from an OTB+ file.""" From da171e48e9e76c0fd114e4048e5fafc8ed8c2fbf Mon Sep 17 00:00:00 2001 From: Daniel McCloy Date: Fri, 29 Aug 2025 17:34:37 -0500 Subject: [PATCH 13/13] WIP using LUT for signal conversion factors [ci skip] --- mne/io/otb/otb.py | 108 ++++++++++++++++++++++++++++++---------------- 1 file changed, 70 insertions(+), 38 deletions(-) diff --git a/mne/io/otb/otb.py b/mne/io/otb/otb.py index fc68c95d40c..01cb05b7b32 100644 --- a/mne/io/otb/otb.py +++ b/mne/io/otb/otb.py @@ -3,7 +3,7 @@ # Copyright the MNE-Python contributors. import tarfile -from collections import Counter +from collections import Counter, namedtuple from datetime import datetime, timezone from pathlib import Path @@ -15,9 +15,42 @@ from ...utils import _check_fname, fill_doc, logger, verbose, warn from ..base import BaseRaw -# these will all get mapped to `misc`. Quaternion channels are handled separately. -_CONTROL_CHS = ("buffer", "ramp") -_AUX_CHS = ("loadcell", "aux") +_CONTROL_CHS = ("buffer", "ramp") # mapped to `stim` +_AUX_CHS = ("loadcell", "aux") # mapped to `misc` +# Quaternion channels are handled separately (and are mapped to `chpi`) + + +def _signal_conversion_factors(device, adapter): + # values extracted from + # https://github.com/OTBioelettronica/OTB-Matlab/blob/main/MATLAB%20Open%20and%20Processing%20OTBFiles/OpenOTBFiles/OpenOTBfilesConvFact.m + convfact = namedtuple("OTBAdapterProperties", ("adc_range", "bit_depth", "gain")) + if adapter == "AdapterControl": + return convfact(adc_range=1.0, bit_depth=1, gain=1.0) + if adapter == "AdapterQuaternions": + # NB: original MATLAB code sets bit_depth=1 here ↓ + return convfact(adc_range=1.0, bit_depth=16, gain=1.0) + elif adapter in ("Direct connection", "Direct connection to Syncstation Input"): + return convfact(adc_range=5.0, bit_depth=16, gain=0.5) + elif device in ("QUATTRO", "QUATTROCENTO"): + return convfact(adc_range=5.0, bit_depth=16, gain=150.0) + elif device == "DUE": + return convfact(adc_range=5.0, bit_depth=16, gain=200.0) + elif device in ("DUE+", "QUATTRO+"): + return convfact(adc_range=3.3, bit_depth=16, gain=202.0) + # elif device == "MUOVI": + # return convfact(adc_range=..., bit_depth=16, gain=1.0) + elif device == "SYNCSTATION": + if adapter in ("Due+", "Quattro+"): + return convfact(adc_range=3.3, bit_depth=16, gain=202.0) + # elif adapter == "Sessantaquattro": + # return convfact(adc_range=..., bit_depth=16, gain=150.0) + elif adapter == "AdapterLoadCell": + # NB: original MATLAB code sets gain=205.0 here ↓ + return convfact(adc_range=5.0, bit_depth=16, gain=0.5) + else: + return convfact(adc_range=4.8, bit_depth=16, gain=256.0) + else: + return convfact(adc_range=4.8, bit_depth=16, gain=256.0) def _get(node, attr, _type=str, **replacements): @@ -37,15 +70,14 @@ def _find(node, tag, _type=str, **replacements): def _parse_otb_plus_metadata(metadata, extras_metadata): assert metadata.tag == "Device" # device-level metadata - adc_range = 0.0033 # 3.3 mV (TODO VERIFY) - bit_depth = _get(metadata, "ad_bits", int) device_name = _get(metadata, "Name") + device_bit_depth = _get(metadata, "ad_bits", int) n_chan = _get(metadata, "DeviceTotalChannels", int) sfreq = _get(metadata, "SampleFrequency", float) # containers adc_ranges = np.full(n_chan, np.nan) + bit_depths = np.full(n_chan, np.nan) gains = np.full(n_chan, np.nan) - scalings = np.full(n_chan, np.nan) ch_names = list() ch_types = list() highpass = list() @@ -54,10 +86,23 @@ def _parse_otb_plus_metadata(metadata, extras_metadata): n_ch_by_type = Counter([ch.get("ID") for ch in metadata.iter("Channel")]) dupl_ids = [k for k, v in n_ch_by_type.items() if v > 1] ch_ix = {key: iter(range(val)) for key, val in n_ch_by_type.items()} + if "Quaternion" in ch_ix: + if (n_quat := n_ch_by_type["Quaternion"]) != 4: + raise ValueError( + f"Quaternions present, but there are {n_quat} of them (expected 4)" + ) + ch_ix["Quaternion"] = iter(list("wxyz")) # iterate over adapters & channels to extract gain, filters, names, etc for adapter in metadata.iter("Adapter"): adapter_id = _get(adapter, "ID") - adapter_gain = _get(adapter, "Gain", float) + adapter_adc_range, adapter_bit_depth, adapter_gain = _signal_conversion_factors( + device_name, adapter_id + ) + if (adapter_gain_xml := _get(adapter, "Gain", float)) not in (1, adapter_gain): + warn( + f"{device_name}, {adapter_id}, {adapter_gain_xml} from XML, " + f"{adapter_gain} from LUT" + ) ch_offset = _get(adapter, "ChannelStartIndex", int) # we only really care about lowpass/highpass on the data channels if not any(adapter_id.startswith(t) for t in ("Adapter", "Direct connection")): @@ -73,29 +118,25 @@ def _parse_otb_plus_metadata(metadata, extras_metadata): ix = _get(ch, "Index", int) gain_ix = ix + ch_offset gains[gain_ix] = _get(ch, "Gain", float) * adapter_gain - # TODO verify ch_type for quats & buffer channel - # ramp and control channels definitely "MISC" + adc_ranges[gain_ix] = adapter_adc_range + bit_depths[gain_ix] = adapter_bit_depth # quats should maybe be FIFF.FIFFV_QUAT_{N} (N from 0-6), but need to verify # what quats should be, as there are only 4 quat channels. The FIFF quats: # 0: obsolete # 1-3: rotations # 4-6: translations if ch_id.startswith("Quaternion"): - ch_type = "chpi" # TODO verify - scalings[gain_ix] = 1e-3 # CHPI is usually 1e-4; limbs move more - adc_ranges[gain_ix] = 1.0 + ch_type = "chpi" + bit_depths[gain_ix] = device_bit_depth # override TODO SAY WHY + # ramp and buffer elif any(ch_id.lower().startswith(_ch.lower()) for _ch in _CONTROL_CHS): ch_type = "stim" - scalings[gain_ix] = 1.0 - adc_ranges[gain_ix] = 1.0 + # loadcell and AUX elif any(ch_id.lower().startswith(_ch.lower()) for _ch in _AUX_CHS): ch_type = "misc" - scalings[gain_ix] = 1.0 - adc_ranges[gain_ix] = 1.0 else: ch_type = "emg" - scalings[gain_ix] = 1.0 - adc_ranges[:] = adc_range + assert bit_depths[gain_ix] == device_bit_depth ch_types.append(ch_type) # parse subject info @@ -127,7 +168,8 @@ def parse_sex(sex): return dict( adc_range=adc_ranges, - bit_depth=bit_depth, + bit_depth=device_bit_depth, + bit_depths=bit_depths, ch_names=ch_names, ch_types=ch_types, device_name=device_name, @@ -137,7 +179,6 @@ def parse_sex(sex): lowpass=lowpass, meas_date=meas_date, n_chan=n_chan, - scalings=scalings, sfreq=sfreq, signal_paths=None, site=site, @@ -153,6 +194,7 @@ def _parse_otb_four_metadata(metadata, extras_metadata): sfreq = _find(metadata, "SamplingFrequency", float) device_gain = _find(metadata, "Gain", float) # containers + bit_depths = list() gains = list() ch_names = list() ch_types = list() @@ -160,11 +202,9 @@ def _parse_otb_four_metadata(metadata, extras_metadata): lowpass = list() n_chans = list() paths = list() - scalings = list() units = list() adc_ranges = list() # stored per-adapter, but should be uniform - bit_depths = set() device_names = set() durations = set() meas_dates = set() @@ -173,18 +213,15 @@ def _parse_otb_four_metadata(metadata, extras_metadata): for adapter in extras_metadata.iter("TrackInfo"): strings = adapter.find("StringsDescriptions") # expected to be same for all adapters - bit_depths.add(_find(adapter, "ADC_Nbits", int)) device_names.add(_find(adapter, "Device")) sfreqs.add(_find(adapter, "SamplingFrequency", int)) durations.add(_find(adapter, "TimeDuration", float)) # may be different for each adapter adapter_adc_range = _find(adapter, "ADC_Range", float) + adapter_bit_depth = _find(adapter, "ADC_Nbits", int) adapter_id = _find(adapter, "SubTitle") adapter_gain = _find(adapter, "Gain", float) - if adapter_id.startswith("Quaternion"): - adapter_scaling = 1e-3 - else: - adapter_scaling = 1.0 / _find(adapter, "UnitOfMeasurementFactor", float) + # adapter_scaling = 1.0 / _find(adapter, "UnitOfMeasurementFactor", float) # ch_offset = _find(adapter, "AcquisitionChannel", int) adapter_n_chans = _find(adapter, "NumberOfChannels", int) n_chans.append(adapter_n_chans) @@ -229,8 +266,8 @@ def _parse_otb_four_metadata(metadata, extras_metadata): ch_names.append(ch_name) # signal properties adc_ranges.append(adapter_adc_range) + bit_depths.append(adapter_bit_depth) gains.append(adapter_gain * device_gain) - scalings.append(adapter_scaling) # channel types # TODO verify for quats & buffer channel # ramp and control channels maybe "MISC", arguably "STIM"? @@ -241,7 +278,6 @@ def _parse_otb_four_metadata(metadata, extras_metadata): # 4-6: translations if adapter_id.startswith("Quaternion"): ch_type = "chpi" # TODO verify - # adc_ranges[gain_ix] = 1.0 elif any(adapter_id.lower().startswith(_ch) for _ch in _CONTROL_CHS): ch_type = "stim" elif any(adapter_id.lower().startswith(_ch) for _ch in _AUX_CHS): @@ -265,9 +301,6 @@ def check_uniform(name, adapter_values, device_value=None): ) return adapter_value - # ADC_Nbits is listed as zero for non-data channels; ignore - bit_depths.discard(0) - bit_depth = check_uniform("bit depth", bit_depths, bit_depth) device_name = check_uniform("device name", device_names) duration = check_uniform("duration", durations) sfreq = check_uniform("sampling frequency", sfreqs, sfreq) @@ -276,6 +309,7 @@ def check_uniform(name, adapter_values, device_value=None): return dict( adc_range=adc_ranges, bit_depth=bit_depth, + bit_depths=bit_depths, ch_names=ch_names, ch_types=ch_types, device_name=device_name, @@ -285,7 +319,6 @@ def check_uniform(name, adapter_values, device_value=None): lowpass=lowpass, meas_date=meas_date, n_chan=sum(n_chans), - scalings=np.array(scalings), sfreq=sfreq, signal_paths=paths, site=None, @@ -386,6 +419,7 @@ def __init__(self, fname, *, verbose=None): annots = _parse_annots(ann_trees) adc_range = metadata["adc_range"] bit_depth = metadata["bit_depth"] + bit_depths = metadata["bit_depths"] ch_names = metadata["ch_names"] ch_types = metadata["ch_types"] device_name = metadata["device_name"] @@ -395,7 +429,6 @@ def __init__(self, fname, *, verbose=None): lowpass = metadata["lowpass"] meas_date = metadata["meas_date"] n_chan = metadata["n_chan"] - scalings = metadata["scalings"] sfreq = metadata["sfreq"] signal_paths = metadata["signal_paths"] site = metadata["site"] @@ -437,7 +470,7 @@ def __init__(self, fname, *, verbose=None): # validate assert np.isfinite(gains).all() assert np.isfinite(adc_range).all() - assert np.isfinite(scalings).all() + assert np.isfinite(bit_depths).all() # check filter freqs. Can vary by adapter, so in theory we might get different # filters for different *data* channels (not just different between data and @@ -469,7 +502,7 @@ def check_filter_freqs(name, freqs): info["highpass"] = highpass info["lowpass"] = lowpass for ix, _ch in enumerate(info["chs"]): - cal = 1 / 2**bit_depth / gains[ix] * scalings[ix] + cal = 1 / 2 ** bit_depths[ix] / gains[ix] _ch.update(cal=cal, range=adc_range[ix]) if meas_date is not None: info["meas_date"] = datetime.fromisoformat(meas_date).astimezone( @@ -542,7 +575,6 @@ def _preload_data(self, preload): this_data = np.squeeze(a.view(np.int32), axis=-1) _data.append(this_data.reshape(-1, self.info["nchan"]).T) _data = np.concatenate(_data, axis=0) # no-op if len(_data) == 1 - cals = np.array([_ch["cal"] * _ch["range"] for _ch in self.info["chs"]]) self._data = _data * cals[:, np.newaxis]