| 1 | +""" |
| 2 | +Module to handle and manage OSI scenarios. |
| 3 | +""" |
| 4 | +from collections import deque |
| 5 | +import time |
| 6 | +import lzma |
| 7 | +import struct |
| 8 | + |
| 9 | +from osi3.osi_sensorview_pb2 import SensorView |
| 10 | +from osi3.osi_groundtruth_pb2 import GroundTruth |
| 11 | +from osi3.osi_sensordata_pb2 import SensorData |
| 12 | +import warnings |
| 13 | +warnings.simplefilter('default') |
| 14 | + |
| 15 | +SEPARATOR = b'$$__$$' |
| 16 | +SEPARATOR_LENGTH = len(SEPARATOR) |
| 17 | +BUFFER_SIZE = 1000000 |
| 18 | + |
| 19 | + |
| 20 | +def get_size_from_file_stream(file_object): |
| 21 | + """ |
| 22 | + Return a file size from a file stream given in parameters |
| 23 | + """ |
| 24 | + current_position = file_object.tell() |
| 25 | + file_object.seek(0, 2) |
| 26 | + size = file_object.tell() |
| 27 | + file_object.seek(current_position) |
| 28 | + return size |
| 29 | + |
| 30 | + |
| 31 | +MESSAGES_TYPE = { |
| 32 | + "SensorView": SensorView, |
| 33 | + "GroundTruth": GroundTruth, |
| 34 | + "SensorData": SensorData |
| 35 | +} |
| 36 | + |
| 37 | + |
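# Two on-disk trace layouts are handled below:
#   * "separated" traces, in which serialized messages are delimited by SEPARATOR
#   * plain *.osi traces, in which every message is preceded by a little-endian
#     uint32 holding its length
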
class OSITrace:
    """This class wraps OSI data. It can import and decode OSI scenarios."""

    def __init__(self, path=None, type_name="SensorView"):
        self.scenario_file = None
        self.message_offsets = None
        self.type_name = type_name
        self.format_type = None
        self.timestep_count = 0
        self.retrieved_scenario_size = 0

        if path is not None and type_name is not None:
            self.from_file(path, type_name)

    # Open and read the trace file

    def from_file(self, path, type_name="SensorView", max_index=-1, format_type=None):
        """Import a scenario from a file"""

        if path.lower().endswith(('.lzma', '.xz')):
            self.scenario_file = lzma.open(path, "rb")
        else:
            self.scenario_file = open(path, "rb")

        self.type_name = type_name
        self.format_type = format_type

        if self.format_type == 'separated':
            # warnings.warn("The separated trace files will be completely removed in the near future. Please convert them to *.osi files with the converter in the main OSI repository.", PendingDeprecationWarning)
            self.timestep_count = self.retrieve_message_offsets(max_index)
        else:
            self.timestep_count = self.retrieve_message()

    def retrieve_message_offsets(self, max_index):
        """
        Retrieve the offsets of all the messages of the scenario and store them
        in the `message_offsets` attribute of the object

        It returns the number of discovered timesteps
        """
        scenario_size = get_size_from_file_stream(self.scenario_file)

        if max_index == -1:
            max_index = float('inf')

        buffer_deque = deque(maxlen=2)

        self.message_offsets = [0]
        eof = False

        self.scenario_file.seek(0)

        # Scan the file in BUFFER_SIZE chunks and record, for every separator
        # found, the offset of the message that follows it.
        while not eof and len(self.message_offsets) <= max_index:
            found = -1  # SEP offset in buffer
            buffer_deque.clear()

            # Read chunks until the separator shows up in the sliding
            # two-chunk buffer or the end of the file is reached.
            while found == -1 and not eof:
                new_read = self.scenario_file.read(BUFFER_SIZE)
                buffer_deque.append(new_read)
                buffer = b"".join(buffer_deque)
                found = buffer.find(SEPARATOR)
                eof = len(new_read) != BUFFER_SIZE

            buffer_offset = self.scenario_file.tell() - len(buffer)
            message_offset = found + buffer_offset + SEPARATOR_LENGTH
            self.message_offsets.append(message_offset)

            self.scenario_file.seek(message_offset)

            # At EOF the remaining buffer may still contain several separators.
            while eof and found != -1:
                buffer = buffer[found + SEPARATOR_LENGTH:]
                found = buffer.find(SEPARATOR)

                buffer_offset = scenario_size - len(buffer)

                message_offset = found + buffer_offset + SEPARATOR_LENGTH

                if message_offset >= scenario_size:
                    break
                self.message_offsets.append(message_offset)

        if eof:
            self.retrieved_scenario_size = scenario_size
        else:
            self.retrieved_scenario_size = self.message_offsets[-1]
            self.message_offsets.pop()

        return len(self.message_offsets)

    def retrieve_message(self):
        """
        Retrieve the offsets of all the messages of a length-prefixed *.osi
        trace and store them in the `message_offsets` attribute of the object

        It returns the number of discovered timesteps
        """
        scenario_size = get_size_from_file_stream(self.scenario_file)
        buffer_deque = deque(maxlen=2)

        self.message_offsets = [0]
        eof = False

        # TODO Implement buffering for the scenarios
        self.scenario_file.seek(0)
        serialized_message = self.scenario_file.read()
        INT_LENGTH = len(struct.pack("<L", 0))
        message_length = 0

        i = 0
        while i < len(serialized_message):
            message = MESSAGES_TYPE[self.type_name]()
            message_length = struct.unpack("<L", serialized_message[i:INT_LENGTH+i])[0]
            message.ParseFromString(serialized_message[i+INT_LENGTH:i+INT_LENGTH+message_length])
            i += message_length + INT_LENGTH
            # Store the byte offset at which the next message starts
            self.message_offsets.append(i)

        if eof:
            self.retrieved_scenario_size = scenario_size
        else:
            self.retrieved_scenario_size = self.message_offsets[-1]
            self.message_offsets.pop()

        return len(self.message_offsets)

    def get_message_by_index(self, index):
        """
        Get a single message by its index.
        """
        return next(self.get_messages_in_index_range(index, index+1))

    def get_messages(self):
        """
        Yield all messages of the scenario.
        """
        return self.get_messages_in_index_range(0, len(self.message_offsets))

    def get_messages_in_index_range(self, begin, end):
        """
        Yield messages with indexes from begin (included) to end (excluded).
        """

        if self.format_type == "separated":
            self.scenario_file.seek(self.message_offsets[begin])
            abs_first_offset = self.message_offsets[begin]
            abs_last_offset = self.message_offsets[end] \
                if end < len(self.message_offsets) \
                else self.retrieved_scenario_size

            rel_message_offsets = [
                abs_message_offset - abs_first_offset
                for abs_message_offset in self.message_offsets[begin:end]
            ]

            message_sequence_len = abs_last_offset - \
                abs_first_offset - SEPARATOR_LENGTH
            serialized_messages_extract = self.scenario_file.read(
                message_sequence_len)

            for rel_index, rel_message_offset in enumerate(rel_message_offsets):
                rel_begin = rel_message_offset
                rel_end = rel_message_offsets[rel_index + 1] - SEPARATOR_LENGTH \
                    if rel_index + 1 < len(rel_message_offsets) \
                    else message_sequence_len
                message = MESSAGES_TYPE[self.type_name]()
                serialized_message = serialized_messages_extract[rel_begin:rel_end]
                message.ParseFromString(serialized_message)
                yield message

        elif self.format_type is None:
            # Length-prefixed *.osi trace: read only the bytes that belong to
            # the requested index range and parse them message by message.
            INT_LENGTH = len(struct.pack("<L", 0))
            abs_first_offset = self.message_offsets[begin]
            abs_last_offset = self.message_offsets[end] \
                if end < len(self.message_offsets) \
                else self.retrieved_scenario_size

            self.scenario_file.seek(abs_first_offset)
            serialized_messages_extract = self.scenario_file.read(
                abs_last_offset - abs_first_offset)

            i = 0
            while i < len(serialized_messages_extract):
                message = MESSAGES_TYPE[self.type_name]()
                message_length = struct.unpack("<L", serialized_messages_extract[i:INT_LENGTH+i])[0]
                message.ParseFromString(serialized_messages_extract[i+INT_LENGTH:i+INT_LENGTH+message_length])
                i += message_length + INT_LENGTH
                yield message

        else:
            self.scenario_file.close()
            raise Exception(f"The defined format {self.format_type} does not exist.")

        self.scenario_file.close()

    def make_readable(self, name, interval=None, index=None):
        """
        Write the scenario (or a part of it) to a human-readable text file.
        """
        self.scenario_file.seek(0)
        serialized_message = self.scenario_file.read()
        message_length = len(serialized_message)

        if message_length > 1000000000:
            # Throw a warning if trace file is bigger than 1GB
            gb_size_input = round(message_length/1000000000, 2)
            # 3.307692308 is the assumed expansion factor from binary to text
            gb_size_output = round(3.307692308*message_length/1000000000, 2)
            warnings.warn(f"The trace file you are trying to make readable has the size {gb_size_input}GB. This will generate a readable file with the size {gb_size_output}GB. Make sure you have enough disc space and memory to read the file with your text editor.", ResourceWarning)

        with open(name, 'a') as f:

            if interval is None and index is None:
                for i in self.get_messages():
                    f.write(str(i))

            if interval is not None and index is None:
                if isinstance(interval, tuple) and len(interval) == 2 and interval[0] < interval[1]:
                    for i in self.get_messages_in_index_range(interval[0], interval[1]):
                        f.write(str(i))
                else:
                    raise Exception("Argument 'interval' needs to be a tuple of length 2! The first number must be smaller than the second.")

            if interval is None and index is not None:
                if isinstance(index, int):
                    f.write(str(self.get_message_by_index(index)))
                else:
                    raise Exception("Argument 'index' needs to be of type 'int'")

            if interval is not None and index is not None:
                raise Exception("Arguments 'index' and 'interval' cannot both be set")
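
# A minimal usage sketch (not part of the module API): it assumes a
# length-prefixed SensorView trace at the hypothetical path "example_trace.osi".
if __name__ == "__main__":
    trace = OSITrace("example_trace.osi", type_name="SensorView")
    print(f"Loaded {trace.timestep_count} timesteps")
    for sensor_view in trace.get_messages():
        # Each yielded object is a decoded osi3 SensorView message
        print(sensor_view.timestamp)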