Skip to content

Commit 6aba8c1

Browse files
authored
new forwarder config schema (#98)
* add fc00 schema for forwarder configuration * format
1 parent 5a48070 commit 6aba8c1

File tree

9 files changed

+369
-2
lines changed

9 files changed

+369
-2
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ https://github.com/ess-dmsc/streaming-data-types
2222
| tdct | Timestamps |
2323
| ep00 | EPICS connection info (deprecated in favour of ep01) |
2424
| ep01 | EPICS connection info |
25-
| rf5k | Forwarder configuration update |
25+
| rf5k | Forwarder configuration update (deprecated in favour of fc00) |
26+
| fc00 | Forwarder configuration update |
2627
| answ | File-writer command response |
2728
| wrdn | File-writer finished writing |
2829
| NDAr | **Deprecated** |
@@ -31,7 +32,7 @@ https://github.com/ess-dmsc/streaming-data-types
3132
| senv | **Deprecated** |
3233
| json | Generic JSON data |
3334
| se00 | Arrays with optional timestamps, for example waveform data. Replaces _senv_. |
34-
| da00 | Scipp-like data arrays, for histograms, etc. |
35+
| da00 | Scipp-like data arrays, for histograms, etc. |
3536

3637
### hs00 and hs01
3738
Schema for histogram data. It is one of the more complicated to use schemas.

streaming_data_types/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
deserialise_rf5k,
2020
serialise_rf5k,
2121
)
22+
from streaming_data_types.forwarder_config_update_fc00 import (
23+
deserialise_fc00,
24+
serialise_fc00,
25+
)
2226
from streaming_data_types.histogram_hs00 import deserialise_hs00, serialise_hs00
2327
from streaming_data_types.histogram_hs01 import deserialise_hs01, serialise_hs01
2428
from streaming_data_types.json_json import deserialise_json, serialise_json
@@ -52,6 +56,7 @@
5256
"ep01": serialise_ep01,
5357
"tdct": serialise_tdct,
5458
"rf5k": serialise_rf5k,
59+
"fc00": serialise_fc00,
5560
"answ": serialise_answ,
5661
"wrdn": serialise_wrdn,
5762
"NDAr": serialise_ndar,
@@ -81,6 +86,7 @@
8186
"ep01": deserialise_ep01,
8287
"tdct": deserialise_tdct,
8388
"rf5k": deserialise_rf5k,
89+
"fc00": deserialise_fc00,
8490
"answ": deserialise_answ,
8591
"wrdn": deserialise_wrdn,
8692
"NDAr": deserialise_ndar,
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# automatically generated by the FlatBuffers compiler, do not modify
2+
3+
# namespace:
4+
5+
class Protocol(object):
6+
PVA = 0
7+
CA = 1
8+
FAKE = 2
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# automatically generated by the FlatBuffers compiler, do not modify
2+
3+
# namespace:
4+
5+
import flatbuffers
6+
from flatbuffers.compat import import_numpy
7+
np = import_numpy()
8+
9+
class Stream(object):
10+
__slots__ = ['_tab']
11+
12+
@classmethod
13+
def GetRootAs(cls, buf, offset=0):
14+
n = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset)
15+
x = Stream()
16+
x.Init(buf, n + offset)
17+
return x
18+
19+
@classmethod
20+
def GetRootAsStream(cls, buf, offset=0):
21+
"""This method is deprecated. Please switch to GetRootAs."""
22+
return cls.GetRootAs(buf, offset)
23+
@classmethod
24+
def StreamBufferHasIdentifier(cls, buf, offset, size_prefixed=False):
25+
return flatbuffers.util.BufferHasIdentifier(buf, offset, b"\x66\x63\x30\x30", size_prefixed=size_prefixed)
26+
27+
# Stream
28+
def Init(self, buf, pos):
29+
self._tab = flatbuffers.table.Table(buf, pos)
30+
31+
# Stream
32+
def Channel(self):
33+
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(4))
34+
if o != 0:
35+
return self._tab.String(o + self._tab.Pos)
36+
return None
37+
38+
# Stream
39+
def Schema(self):
40+
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(6))
41+
if o != 0:
42+
return self._tab.String(o + self._tab.Pos)
43+
return None
44+
45+
# Stream
46+
def Topic(self):
47+
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(8))
48+
if o != 0:
49+
return self._tab.String(o + self._tab.Pos)
50+
return None
51+
52+
# Stream
53+
def Protocol(self):
54+
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(10))
55+
if o != 0:
56+
return self._tab.Get(flatbuffers.number_types.Uint16Flags, o + self._tab.Pos)
57+
return 0
58+
59+
# Stream
60+
def Periodic(self):
61+
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(12))
62+
if o != 0:
63+
return self._tab.Get(flatbuffers.number_types.Int32Flags, o + self._tab.Pos)
64+
return 0
65+
66+
def StreamStart(builder): builder.StartObject(5)
67+
def Start(builder):
68+
return StreamStart(builder)
69+
def StreamAddChannel(builder, channel): builder.PrependUOffsetTRelativeSlot(0, flatbuffers.number_types.UOffsetTFlags.py_type(channel), 0)
70+
def AddChannel(builder, channel):
71+
return StreamAddChannel(builder, channel)
72+
def StreamAddSchema(builder, schema): builder.PrependUOffsetTRelativeSlot(1, flatbuffers.number_types.UOffsetTFlags.py_type(schema), 0)
73+
def AddSchema(builder, schema):
74+
return StreamAddSchema(builder, schema)
75+
def StreamAddTopic(builder, topic): builder.PrependUOffsetTRelativeSlot(2, flatbuffers.number_types.UOffsetTFlags.py_type(topic), 0)
76+
def AddTopic(builder, topic):
77+
return StreamAddTopic(builder, topic)
78+
def StreamAddProtocol(builder, protocol): builder.PrependUint16Slot(3, protocol, 0)
79+
def AddProtocol(builder, protocol):
80+
return StreamAddProtocol(builder, protocol)
81+
def StreamAddPeriodic(builder, periodic): builder.PrependInt32Slot(4, periodic, 0)
82+
def AddPeriodic(builder, periodic):
83+
return StreamAddPeriodic(builder, periodic)
84+
def StreamEnd(builder): return builder.EndObject()
85+
def End(builder):
86+
return StreamEnd(builder)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# automatically generated by the FlatBuffers compiler, do not modify
2+
3+
# namespace:
4+
5+
class UpdateType(object):
6+
ADD = 0
7+
REMOVE = 1
8+
REMOVEALL = 2
9+
REPLACE = 3

streaming_data_types/fbschemas/forwarder_config_update_fc00/__init__.py

Whitespace-only changes.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# automatically generated by the FlatBuffers compiler, do not modify
2+
3+
# namespace:
4+
5+
import flatbuffers
6+
from flatbuffers.compat import import_numpy
7+
np = import_numpy()
8+
9+
class fc00_ConfigUpdate(object):
10+
__slots__ = ['_tab']
11+
12+
@classmethod
13+
def GetRootAs(cls, buf, offset=0):
14+
n = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset)
15+
x = fc00_ConfigUpdate()
16+
x.Init(buf, n + offset)
17+
return x
18+
19+
@classmethod
20+
def GetRootAsfc00_ConfigUpdate(cls, buf, offset=0):
21+
"""This method is deprecated. Please switch to GetRootAs."""
22+
return cls.GetRootAs(buf, offset)
23+
@classmethod
24+
def fc00_ConfigUpdateBufferHasIdentifier(cls, buf, offset, size_prefixed=False):
25+
return flatbuffers.util.BufferHasIdentifier(buf, offset, b"\x66\x63\x30\x30", size_prefixed=size_prefixed)
26+
27+
# fc00_ConfigUpdate
28+
def Init(self, buf, pos):
29+
self._tab = flatbuffers.table.Table(buf, pos)
30+
31+
# fc00_ConfigUpdate
32+
def ConfigChange(self):
33+
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(4))
34+
if o != 0:
35+
return self._tab.Get(flatbuffers.number_types.Uint16Flags, o + self._tab.Pos)
36+
return 0
37+
38+
# fc00_ConfigUpdate
39+
def Streams(self, j):
40+
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(6))
41+
if o != 0:
42+
x = self._tab.Vector(o)
43+
x += flatbuffers.number_types.UOffsetTFlags.py_type(j) * 4
44+
x = self._tab.Indirect(x)
45+
from .Stream import Stream
46+
obj = Stream()
47+
obj.Init(self._tab.Bytes, x)
48+
return obj
49+
return None
50+
51+
# fc00_ConfigUpdate
52+
def StreamsLength(self):
53+
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(6))
54+
if o != 0:
55+
return self._tab.VectorLen(o)
56+
return 0
57+
58+
# fc00_ConfigUpdate
59+
def StreamsIsNone(self):
60+
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(6))
61+
return o == 0
62+
63+
def fc00_ConfigUpdateStart(builder): builder.StartObject(2)
64+
def Start(builder):
65+
return fc00_ConfigUpdateStart(builder)
66+
def fc00_ConfigUpdateAddConfigChange(builder, configChange): builder.PrependUint16Slot(0, configChange, 0)
67+
def AddConfigChange(builder, configChange):
68+
return fc00_ConfigUpdateAddConfigChange(builder, configChange)
69+
def fc00_ConfigUpdateAddStreams(builder, streams): builder.PrependUOffsetTRelativeSlot(1, flatbuffers.number_types.UOffsetTFlags.py_type(streams), 0)
70+
def AddStreams(builder, streams):
71+
return fc00_ConfigUpdateAddStreams(builder, streams)
72+
def fc00_ConfigUpdateStartStreamsVector(builder, numElems): return builder.StartVector(4, numElems, 4)
73+
def StartStreamsVector(builder, numElems):
74+
return fc00_ConfigUpdateStartStreamsVector(builder, numElems)
75+
def fc00_ConfigUpdateEnd(builder): return builder.EndObject()
76+
def End(builder):
77+
return fc00_ConfigUpdateEnd(builder)
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
from collections import namedtuple
2+
from typing import List, Union
3+
4+
import flatbuffers
5+
from flatbuffers.packer import struct as flatbuffer_struct
6+
7+
from streaming_data_types.fbschemas.forwarder_config_update_fc00 import (
8+
Protocol,
9+
Stream,
10+
UpdateType,
11+
fc00_ConfigUpdate,
12+
)
13+
from streaming_data_types.utils import check_schema_identifier
14+
15+
FILE_IDENTIFIER = b"fc00"
16+
17+
ConfigurationUpdate = namedtuple("ConfigurationUpdate", ("config_change", "streams"))
18+
19+
StreamInfo = namedtuple(
20+
"StreamInfo", ("channel", "schema", "topic", "protocol", "periodic")
21+
)
22+
23+
24+
def deserialise_fc00(buffer: Union[bytearray, bytes]) -> ConfigurationUpdate:
25+
"""
26+
Deserialise FlatBuffer fc00.
27+
28+
:param buffer: The FlatBuffers buffer.
29+
:return: The deserialised data.
30+
"""
31+
check_schema_identifier(buffer, FILE_IDENTIFIER)
32+
33+
config_message = fc00_ConfigUpdate.fc00_ConfigUpdate.GetRootAsfc00_ConfigUpdate(
34+
buffer, 0
35+
)
36+
37+
streams = []
38+
try:
39+
for i in range(config_message.StreamsLength()):
40+
stream_message = config_message.Streams(i)
41+
streams.append(
42+
StreamInfo(
43+
stream_message.Channel().decode("utf-8")
44+
if stream_message.Channel()
45+
else "",
46+
stream_message.Schema().decode("utf-8")
47+
if stream_message.Schema()
48+
else "",
49+
stream_message.Topic().decode("utf-8")
50+
if stream_message.Topic()
51+
else "",
52+
stream_message.Protocol(),
53+
int(stream_message.Periodic().decode("utf-8"))
54+
if stream_message.Periodic()
55+
else 0,
56+
)
57+
)
58+
except flatbuffer_struct.error:
59+
pass # No streams in buffer
60+
61+
return ConfigurationUpdate(config_message.ConfigChange(), streams)
62+
63+
64+
def serialise_stream(
65+
builder: flatbuffers.Builder,
66+
protocol: Protocol,
67+
channel_offset: int,
68+
schema_offset: int,
69+
topic_offset: int,
70+
) -> int:
71+
Stream.StreamStart(builder)
72+
Stream.StreamAddProtocol(builder, protocol)
73+
Stream.StreamAddTopic(builder, topic_offset)
74+
Stream.StreamAddSchema(builder, schema_offset)
75+
Stream.StreamAddChannel(builder, channel_offset)
76+
return Stream.StreamEnd(builder)
77+
78+
79+
def serialise_fc00(config_change: UpdateType, streams: List[StreamInfo]) -> bytes:
80+
"""
81+
Serialise config update message as an fc00 FlatBuffers message.
82+
83+
:param config_change:
84+
:param streams: channel, schema and output topic configurations
85+
:return:
86+
"""
87+
builder = flatbuffers.Builder(1024)
88+
builder.ForceDefaults(True)
89+
90+
if streams:
91+
# We have to use multiple loops/list comprehensions here because we cannot create strings after we have
92+
# called StreamStart and cannot create streams after we have called StartVector
93+
stream_field_offsets = [
94+
(
95+
builder.CreateString(stream.channel),
96+
builder.CreateString(stream.schema),
97+
builder.CreateString(stream.topic),
98+
)
99+
for stream in streams
100+
]
101+
stream_offsets = [
102+
serialise_stream(builder, stream.protocol, *stream_fields)
103+
for stream, stream_fields in zip(streams, stream_field_offsets)
104+
]
105+
106+
fc00_ConfigUpdate.fc00_ConfigUpdateStartStreamsVector(builder, len(streams))
107+
for stream_offset in stream_offsets:
108+
builder.PrependUOffsetTRelative(stream_offset)
109+
streams_offset = builder.EndVector()
110+
111+
# Build the actual buffer
112+
fc00_ConfigUpdate.fc00_ConfigUpdateStart(builder)
113+
if streams:
114+
fc00_ConfigUpdate.fc00_ConfigUpdateAddStreams(builder, streams_offset)
115+
fc00_ConfigUpdate.fc00_ConfigUpdateAddConfigChange(builder, config_change)
116+
data = fc00_ConfigUpdate.fc00_ConfigUpdateEnd(builder)
117+
118+
builder.Finish(data, file_identifier=FILE_IDENTIFIER)
119+
return bytes(builder.Output())

0 commit comments

Comments
 (0)