from struct import pack
from typing import List, NamedTuple, Tuple, Union

import flatbuffers
import numpy as np

import streaming_data_types.fbschemas.area_detector_ad00.Attribute as ADArAttribute
from streaming_data_types.fbschemas.area_detector_ad00 import ad00_ADArray
from streaming_data_types.fbschemas.area_detector_ad00.DType import DType
from streaming_data_types.utils import check_schema_identifier

FILE_IDENTIFIER = b"ad00"


class Attribute:
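    """
    A named attribute (e.g. detector metadata) that can be attached to the array data.
    """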
    def __init__(
        self,
        name: str,
        description: str,
        source: str,
        data: Union[np.ndarray, str, int, float],
    ):
        self.name = name
        self.description = description
        self.source = source
        self.data = data

    def __eq__(self, other):
        data_is_equal = type(self.data) == type(other.data)  # noqa: E721
        if type(self.data) is np.ndarray:
            data_is_equal = data_is_equal and np.array_equal(self.data, other.data)
        else:
            data_is_equal = data_is_equal and self.data == other.data
        return (
            self.name == other.name
            and self.description == other.description
            and self.source == other.source
            and data_is_equal
        )


def serialise_ad00(
    source_name: str,
    unique_id: int,
    timestamp_ns: int,
    data: Union[np.ndarray, str],
    attributes: List[Attribute] = [],
) -> bytes:
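    """
    Serialises an area detector array, with optional attributes, into an ad00 FlatBuffers message.
    """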
    builder = flatbuffers.Builder(1024)
    builder.ForceDefaults(True)

    # Map numpy dtypes to the corresponding flatbuffer DType values
    type_map = {
        np.dtype("uint8"): DType.uint8,
        np.dtype("int8"): DType.int8,
        np.dtype("uint16"): DType.uint16,
        np.dtype("int16"): DType.int16,
        np.dtype("uint32"): DType.uint32,
        np.dtype("int32"): DType.int32,
        np.dtype("uint64"): DType.uint64,
        np.dtype("int64"): DType.int64,
        np.dtype("float32"): DType.float32,
        np.dtype("float64"): DType.float64,
    }

    if type(data) is str:
        # Strings are transported as their UTF-8 bytes and tagged as c_string
        data = np.frombuffer(data.encode(), np.uint8)
        data_type = DType.c_string
    else:
        data_type = type_map[data.dtype]

    # Build dims
    dims_offset = builder.CreateNumpyVector(np.asarray(data.shape))

    # Build data
    data_offset = builder.CreateNumpyVector(data.flatten().view(np.uint8))

    source_name_offset = builder.CreateString(source_name)

    temp_attributes = []
    # Build each attribute: the payload is sent as raw bytes alongside a DType tag
    for item in attributes:
        if type(item.data) is np.ndarray:
            attr_data_type = type_map[item.data.dtype]
            attr_data = item.data
        elif type(item.data) is str:
            attr_data_type = DType.c_string
            attr_data = np.frombuffer(item.data.encode(), np.uint8)
        elif type(item.data) is int:
            attr_data_type = DType.int64
            attr_data = np.frombuffer(pack("q", item.data), np.uint8)
        elif type(item.data) is float:
            attr_data_type = DType.float64
            attr_data = np.frombuffer(pack("d", item.data), np.uint8)
        else:
            raise NotImplementedError(
                f"Attribute data of type {type(item.data)} is not supported"
            )
        attr_name_offset = builder.CreateString(item.name)
        attr_desc_offset = builder.CreateString(item.description)
        attr_src_offset = builder.CreateString(item.source)
        attr_data_offset = builder.CreateNumpyVector(attr_data.flatten().view(np.uint8))
        ADArAttribute.AttributeStart(builder)
        ADArAttribute.AttributeAddName(builder, attr_name_offset)
        ADArAttribute.AttributeAddDescription(builder, attr_desc_offset)
        ADArAttribute.AttributeAddSource(builder, attr_src_offset)
        ADArAttribute.AttributeAddDataType(builder, attr_data_type)
        ADArAttribute.AttributeAddData(builder, attr_data_offset)
        attr_offset = ADArAttribute.AttributeEnd(builder)
        temp_attributes.append(attr_offset)

    ad00_ADArray.ad00_ADArrayStartAttributesVector(builder, len(attributes))
    for item in reversed(temp_attributes):
        builder.PrependUOffsetTRelative(item)
    attributes_offset = builder.EndVector()

    # Build the actual buffer
    ad00_ADArray.ad00_ADArrayStart(builder)
    ad00_ADArray.ad00_ADArrayAddSourceName(builder, source_name_offset)
    ad00_ADArray.ad00_ADArrayAddDataType(builder, data_type)
    ad00_ADArray.ad00_ADArrayAddDimensions(builder, dims_offset)
    ad00_ADArray.ad00_ADArrayAddId(builder, unique_id)
    ad00_ADArray.ad00_ADArrayAddData(builder, data_offset)
    ad00_ADArray.ad00_ADArrayAddTimestamp(builder, timestamp_ns)
    ad00_ADArray.ad00_ADArrayAddAttributes(builder, attributes_offset)
    array_message = ad00_ADArray.ad00_ADArrayEnd(builder)

    builder.Finish(array_message, file_identifier=FILE_IDENTIFIER)
    return bytes(builder.Output())


ADArray = NamedTuple(
    "ADArray",
    (
        ("source_name", str),
        ("unique_id", int),
        ("timestamp_ns", int),
        ("dimensions", Tuple[int, ...]),
        ("data", Union[np.ndarray, str]),
        ("attributes", List[Attribute]),
    ),
)


def get_payload_data(fb_arr) -> np.ndarray:
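    """
    Extracts the data and reshapes it to the dimensions stored in the message.
    """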
    return get_data(fb_arr).reshape(fb_arr.DimensionsAsNumpy())


def get_data(fb_arr) -> np.ndarray:
    """
    Converts the data array into the correct type.
    """
    raw_data = fb_arr.DataAsNumpy()
    type_map = {
        DType.uint8: np.uint8,
        DType.int8: np.int8,
        DType.uint16: np.uint16,
        DType.int16: np.int16,
        DType.uint32: np.uint32,
        DType.int32: np.int32,
        DType.uint64: np.uint64,
        DType.int64: np.int64,
        DType.float32: np.float32,
        DType.float64: np.float64,
    }
    return raw_data.view(type_map[fb_arr.DataType()])


def deserialise_ad00(buffer: Union[bytearray, bytes]) -> ADArray:
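    """
    Deserialises an ad00 FlatBuffers message into an ADArray named tuple.
    """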
    check_schema_identifier(buffer, FILE_IDENTIFIER)

    ad_array = ad00_ADArray.ad00_ADArray.GetRootAsad00_ADArray(buffer, 0)
    unique_id = ad_array.Id()
    if ad_array.DataType() == DType.c_string:
        data = ad_array.DataAsNumpy().tobytes().decode()
    else:
        data = get_payload_data(ad_array)

    attributes_list = []
    for i in range(ad_array.AttributesLength()):
        attribute_ptr = ad_array.Attributes(i)
        if attribute_ptr.DataType() == DType.c_string:
            attr_data = attribute_ptr.DataAsNumpy().tobytes().decode()
        else:
            attr_data = get_data(attribute_ptr)
        temp_attribute = Attribute(
            name=attribute_ptr.Name().decode(),
            description=attribute_ptr.Description().decode(),
            source=attribute_ptr.Source().decode(),
            data=attr_data,
        )
        # Collapse single-element numeric attributes back into Python scalars
        if type(temp_attribute.data) is np.ndarray and len(temp_attribute.data) == 1:
            if np.issubdtype(temp_attribute.data.dtype, np.floating):
                temp_attribute.data = float(temp_attribute.data[0])
            elif np.issubdtype(temp_attribute.data.dtype, np.integer):
                temp_attribute.data = int(temp_attribute.data[0])
        attributes_list.append(temp_attribute)

    return ADArray(
        source_name=ad_array.SourceName().decode(),
        unique_id=unique_id,
        timestamp_ns=ad_array.Timestamp(),
        dimensions=tuple(ad_array.DimensionsAsNumpy()),
        data=data,
        attributes=attributes_list,
    )
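

# Illustrative round-trip sketch only (not part of the library API); the source name,
# id, timestamp and attribute values below are arbitrary examples.
if __name__ == "__main__":
    frame = np.arange(12, dtype=np.uint16).reshape(3, 4)
    message = serialise_ad00(
        source_name="example_detector",
        unique_id=1,
        timestamp_ns=1_000_000_000,
        data=frame,
        attributes=[
            Attribute(
                name="exposure_time",
                description="Exposure time in seconds",
                source="example",
                data=0.05,
            )
        ],
    )
    decoded = deserialise_ad00(message)
    assert decoded.source_name == "example_detector"
    assert np.array_equal(decoded.data, frame)
    assert decoded.attributes[0].data == 0.05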