Commit 9f5d07b

Merge pull request #6 from ISISComputingGroup/un00_units_support
Un00 units support
2 parents 71975c1 + 5da3cd1 commit 9f5d07b

File tree: 9 files changed, +299 -15 lines changed

changes.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -7,6 +7,7 @@
 * Refactor statistics reporter to support dynamically added metrics
 * Add latency and per-PV graphite metrics
 * Add containerfile
+* add support for streaming PV units (`un00` schema)
 
 ## v2.1.0
 
```

forwarder/update_handlers/ca_update_handler.py

Lines changed: 50 additions & 7 deletions
```diff
@@ -1,14 +1,16 @@
 import time
 from typing import List, Optional
+from dataclasses import dataclass
 
-from caproto import ReadNotifyResponse
+from caproto import ReadNotifyResponse, timestamp_to_epics
 from caproto.threading.client import PV
 from caproto.threading.client import Context as CAContext
 
 from forwarder.application_logger import get_logger
 from forwarder.metrics import Counter, Summary, sanitise_metric_name
 from forwarder.metrics.statistics_reporter import StatisticsReporter
 from forwarder.update_handlers.serialiser_tracker import SerialiserTracker
+from forwarder.update_handlers.un00_serialiser import un00_CASerialiser
 
 
 class CAUpdateHandler:
@@ -34,6 +36,7 @@ def __init__(
         self._processing_errors_metric = processing_errors_metric
         self._processing_latency_metric = None
         self._receive_latency_metric = None
+        self._last_update = 0
         if self._statistics_reporter:
             try:
                 self._processing_latency_metric = Summary(
@@ -72,19 +75,59 @@ def __init__(
         ctrl_sub.add_callback(self._unit_callback)
 
     def _unit_callback(self, sub, response: ReadNotifyResponse):
+        # sometimes caproto gives us a unit callback before a monitor callback.
+        # in this case, to avoid just dropping the unit update, approximate
+        # by using the current time.
+        fallback_timestamp = time.time()
+
+        self._logger.debug("CA Unit callback called for %s", self._pv_name)
+
         old_unit = self._current_unit
         try:
-            self._current_unit = response.metadata.units.decode("utf-8")
+            new_unit = response.metadata.units.decode("utf-8")
+            if new_unit is not None:
+                # we get a unit callback with blank units if the value has updated but the EGU field
+                # has not.
+                self._current_unit = new_unit
         except AttributeError:
-            return
-        if old_unit is not None and old_unit != self._current_unit:
-            self._logger.error(
+            self._current_unit = None
+
+        if old_unit != self._current_unit:
+            self._logger.info(
                 f'Display unit of (ca) PV with name "{self._pv_name}" changed from "{old_unit}" to "{self._current_unit}".'
             )
-            if self._processing_errors_metric:
-                self._processing_errors_metric.inc()
+            for serialiser_tracker in self.serialiser_tracker_list:
+                # Only let the unit serialiser deal with this update - as it has no value the other
+                # serialisers will fall over.
+                if isinstance(serialiser_tracker.serialiser, un00_CASerialiser):
+
+                    # The next bit is pretty hacky. We are mocking the ReadNotifyResponse
+                    # as by default its metadata is immutable/read-only, but we need to append the
+                    # timestamp here.
+                    @dataclass
+                    class StupidMetaData:
+                        timestamp: float
+                        units: str
+
+                    @dataclass
+                    class StupidResponse:
+                        metadata: StupidMetaData
+
+
+                    update_time = self._last_update if self._last_update > 0 else fallback_timestamp
+                    self._logger.debug(f"about to publish update. units: {self._current_unit}, timestamp: {update_time}")
+                    meta = StupidMetaData(timestamp=update_time, units=self._current_unit)
+                    response = StupidResponse(metadata=meta)
+                    serialiser_tracker.process_ca_message(response)  # type: ignore
+
 
     def _monitor_callback(self, sub, response: ReadNotifyResponse):
+        self._logger.debug("CA Monitor callback called for %s", self._pv_name)
+        try:
+            self._last_update = response.metadata.timestamp
+        except Exception:
+            self._logger.warning("Error getting timestamp for %s", sub.pv.name)
+
         if self._receive_latency_metric:
             try:
                 response_timestamp = response.metadata.timestamp.seconds + (
```
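For context on the `StupidResponse` workaround above: the un00 CA serialiser only reads `update.metadata.timestamp` and `update.metadata.units`, so any object exposing those two attributes will do. Below is a minimal standalone sketch of the same pattern, including the fallback-to-current-time behaviour; the names `FakeMeta`, `FakeResponse` and `build_unit_update` are illustrative and not part of the commit.

```python
# Minimal sketch of the stand-in-response pattern used in _unit_callback.
# Only .metadata.timestamp and .metadata.units are needed, because that is
# all the un00 CA serialiser reads. Names here are illustrative.
import time
from dataclasses import dataclass


@dataclass
class FakeMeta:
    timestamp: float  # seconds since epoch, as caproto reports it
    units: str


@dataclass
class FakeResponse:
    metadata: FakeMeta


def build_unit_update(last_update: float, units: str) -> FakeResponse:
    # Mirror the fallback in _unit_callback: if no monitor callback has been
    # seen yet (last_update == 0), approximate with the current time.
    timestamp = last_update if last_update > 0 else time.time()
    return FakeResponse(metadata=FakeMeta(timestamp=timestamp, units=units))
```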

forwarder/update_handlers/pva_update_handler.py

Lines changed: 14 additions & 5 deletions
```diff
@@ -8,6 +8,7 @@
 from forwarder.metrics import Counter, Summary, sanitise_metric_name
 from forwarder.metrics.statistics_reporter import StatisticsReporter
 from forwarder.update_handlers.serialiser_tracker import SerialiserTracker
+from forwarder.update_handlers.un00_serialiser import un00_PVASerialiser
 
 
 class PVAUpdateHandler:
@@ -54,7 +55,7 @@ def __init__(
         except Exception as e:
             self._logger.warning(f"Could not initialise metric for {pv_name}: {e}")
 
-        request = context.makeRequest("field()")
+        request = PVAContext.makeRequest("field()")
         self._sub = context.monitor(
             pv_name,
             self._monitor_callback,
@@ -63,17 +64,19 @@
         )
 
     def _monitor_callback(self, response: Union[Value, Exception]):
+        self._logger.debug("PVA monitor callback called for %s", self._pv_name)
         old_unit = self._unit
         try:
             self._unit = response.display.units  # type: ignore
         except AttributeError:
             pass
-        if old_unit is not None and old_unit != self._unit:
-            self._logger.error(
+
+        units_changed = old_unit != self._unit
+        if units_changed:
+            self._logger.info(
                 f'Display unit of (pva) PV with name "{self._pv_name}" changed from "{old_unit}" to "{self._unit}".'
             )
-            if self._processing_errors_metric:
-                self._processing_errors_metric.inc()
+
         if self._receive_latency_metric and isinstance(response, Value):
             try:
                 response_timestamp = response.timeStamp.secondsPastEpoch + (
@@ -86,9 +89,15 @@ def _monitor_callback(self, response: Union[Value, Exception]):
             if self._processing_latency_metric:
                 with self._processing_latency_metric.time():
                     for serialiser_tracker in self.serialiser_tracker_list:
+                        if isinstance(serialiser_tracker.serialiser, un00_PVASerialiser) and not units_changed:
+                            # If units haven't changed, don't publish a unit update
+                            continue
                         serialiser_tracker.process_pva_message(response)
             else:
                 for serialiser_tracker in self.serialiser_tracker_list:
+                    if isinstance(serialiser_tracker.serialiser, un00_PVASerialiser) and not units_changed:
+                        # If units haven't changed, don't publish a unit update
+                        continue
                     serialiser_tracker.process_pva_message(response)
         except (RuntimeError, ValueError) as e:
             self._logger.error(
```
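The new `units_changed` gate means the un00 serialiser is skipped for ordinary value updates and only fed updates where the display unit differs from the cached one. Assuming the cached unit starts as `None` (as in the existing handler), the first update that carries units also counts as a change. A tiny standalone sketch of that comparison, not the commit's code:

```python
# Sketch of the units_changed gate: un00 messages are published only when the
# display unit differs from the cached value, which includes the very first
# update (cached unit None vs. a real unit string).
def units_changed(cached_unit, new_unit) -> bool:
    return cached_unit != new_unit


assert units_changed(None, "mm")      # first update with units -> publish un00
assert not units_changed("mm", "mm")  # value-only update -> skip un00
assert units_changed("mm", "K")       # unit genuinely changed -> publish un00
```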

forwarder/update_handlers/schema_serialiser_factory.py

Lines changed: 4 additions & 0 deletions
```diff
@@ -28,6 +28,7 @@
     tdct_CASerialiser,
     tdct_PVASerialiser,
 )
+from forwarder.update_handlers.un00_serialiser import un00_PVASerialiser, un00_CASerialiser
 
 
 class SerialiserFactory:
@@ -39,6 +40,7 @@ class SerialiserFactory:
             "f144": f144_CASerialiser,
             "no_op": no_op_CASerialiser,
             "tdct": tdct_CASerialiser,
+            "un00": un00_CASerialiser,
         },
         EpicsProtocol.FAKE: {
             "al00": al00_PVASerialiser,
@@ -49,6 +51,7 @@
             "nttable_se00": nttable_se00_PVASerialiser,
             "nttable_senv": nttable_senv_PVASerialiser,
             "tdct": tdct_PVASerialiser,
+            "un00": un00_PVASerialiser,
         },
         EpicsProtocol.PVA: {
             "al00": al00_PVASerialiser,
@@ -59,6 +62,7 @@
             "nttable_se00": nttable_se00_PVASerialiser,
             "nttable_senv": nttable_senv_PVASerialiser,
             "tdct": tdct_PVASerialiser,
+            "un00": un00_PVASerialiser,
         },
     }
 
```
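The factory is keyed first by protocol and then by schema string, so the new `"un00"` entries are reached the same way as the existing schemas. A sketch of the lookup, assuming the `create_serialiser(protocol, schema, pv_name)` call shape used in `serialiser_tracker.py` below; the `EpicsProtocol` import path is an assumption and may differ in the real code:

```python
# Sketch: resolving the new "un00" schema entry through the factory.
# The EpicsProtocol import path is an assumption; create_serialiser's argument
# order is taken from its use in create_serialiser_list below.
from forwarder.common import EpicsProtocol  # assumed location of the enum
from forwarder.update_handlers.schema_serialiser_factory import SerialiserFactory

ca_units_serialiser = SerialiserFactory.create_serialiser(
    EpicsProtocol.CA, "un00", "SOME:PV:NAME"
)
# -> an un00_CASerialiser for SOME:PV:NAME; EpicsProtocol.PVA would yield un00_PVASerialiser.
```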

forwarder/update_handlers/serialiser_tracker.py

Lines changed: 13 additions & 3 deletions
```diff
@@ -99,18 +99,18 @@ def set_new_message(self, message: bytes, timestamp_ns: Union[int, float]):
         message_datetime = datetime.fromtimestamp(timestamp_ns / 1e9, tz=timezone.utc)
         if message_datetime < self._last_timestamp:
             self._logger.error(
-                f"Rejecting update on {self._pv_name} as its timestamp is older than the previous message timestamp from that PV ({message_datetime} vs {self._last_timestamp})."
+                f"Rejecting update on {self._pv_name} as its timestamp({message_datetime}) is older than the previous message timestamp from that PV ({message_datetime} vs {self._last_timestamp})."
             )
             return
         current_datetime = datetime.now(tz=timezone.utc)
         if message_datetime < current_datetime - LOWER_AGE_LIMIT:
             self._logger.error(
-                f"Rejecting update on {self._pv_name} as its timestamp is older than allowed ({LOWER_AGE_LIMIT})."
+                f"Rejecting update on {self._pv_name} as its timestamp({message_datetime}) is older than allowed ({LOWER_AGE_LIMIT})."
             )
             return
         if message_datetime > current_datetime + UPPER_AGE_LIMIT:
             self._logger.error(
-                f"Rejecting update on {self._pv_name} as its timestamp is from further into the future than allowed ({UPPER_AGE_LIMIT})."
+                f"Rejecting update on {self._pv_name} as its timestamp({message_datetime}) is from further into the future than allowed ({UPPER_AGE_LIMIT})."
             )
             return
         self._last_timestamp = message_datetime
@@ -182,4 +182,14 @@ def create_serialiser_list(
                 periodic_update_ms,
             )
         )
+    # Units serialiser
+    return_list.append(
+        SerialiserTracker(
+            SerialiserFactory.create_serialiser(protocol, "un00", pv_name),
+            producer,
+            pv_name,
+            output_topic,
+            periodic_update_ms,
+        )
+    )
     return return_list
```
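The reworded log lines all belong to the same age validation in `set_new_message`: an update is rejected if it is older than the last accepted message for that PV, older than `LOWER_AGE_LIMIT`, or more than `UPPER_AGE_LIMIT` in the future. A condensed sketch of that check; the actual limits live elsewhere in the file and are not shown in this diff, so the values below are placeholders:

```python
# Condensed sketch of the three age checks whose messages were reworded above.
# LOWER_AGE_LIMIT / UPPER_AGE_LIMIT are module-level timedeltas in the real
# file; the values below are placeholders, not the forwarder's settings.
from datetime import datetime, timedelta, timezone

LOWER_AGE_LIMIT = timedelta(days=365)    # placeholder
UPPER_AGE_LIMIT = timedelta(minutes=30)  # placeholder


def should_forward(message_datetime: datetime, last_timestamp: datetime) -> bool:
    now = datetime.now(tz=timezone.utc)
    if message_datetime < last_timestamp:
        return False  # older than the previously forwarded message
    if message_datetime < now - LOWER_AGE_LIMIT:
        return False  # too far in the past
    if message_datetime > now + UPPER_AGE_LIMIT:
        return False  # too far in the future
    return True
```

The second hunk is separate from this check: `create_serialiser_list` now unconditionally appends a tracker holding the un00 serialiser, which is how the update handlers above end up with a units serialiser in their `serialiser_tracker_list`.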

forwarder/update_handlers/un00_serialiser.py

Lines changed: 72 additions & 0 deletions
```diff
@@ -0,0 +1,72 @@
+from typing import Optional, Tuple, Union
+import p4p
+from caproto import Message as CA_Message
+
+from forwarder.application_logger import get_logger
+from streaming_data_types import serialise_un00
+
+from forwarder.kafka.kafka_helpers import seconds_to_nanoseconds
+from forwarder.update_handlers.schema_serialisers import CASerialiser, PVASerialiser
+
+logger = get_logger()
+
+def _serialise(
+    source_name: str,
+    timestamp_ns: int | None,
+    units: str | None,
+) -> Tuple[bytes, int]:
+    return (
+        serialise_un00(source_name, timestamp_ns, units),
+        timestamp_ns,
+    )
+
+
+class un00_CASerialiser(CASerialiser):
+    def __init__(self, source_name: str):
+        self._source_name = source_name
+        self._message: Optional[str] = None
+        self._units: Optional[str] = None
+
+    def serialise(
+        self, update: CA_Message, **unused
+    ) -> Union[Tuple[bytes, int], Tuple[None, None]]:
+        metadata = update.metadata
+        try:
+            timestamp = seconds_to_nanoseconds(metadata.timestamp)
+        except AttributeError:
+            logger.warning("No timestamp available for %s", self._source_name)
+            timestamp = None
+        try:
+            units = metadata.units
+        except AttributeError:
+            logger.warning("No units available for %s", self._source_name)
+            return [None, None]
+        logger.debug(f"Source name: {self._source_name}, timestamp: {timestamp}, units: {units}")
+        return _serialise(self._source_name, timestamp, units)
+
+    def conn_serialise(self, pv: str, state: str) -> Tuple[None, None]:
+        return None, None
+
+
+class un00_PVASerialiser(PVASerialiser):
+    def __init__(self, source_name: str):
+        self._source_name = source_name
+        self._message: Optional[str] = None
+        self._units: Optional[str] = None
+
+    def serialise(
+        self, update: Union[p4p.Value, RuntimeError]
+    ) -> Union[Tuple[bytes, int], Tuple[None, None]]:
+        if isinstance(update, RuntimeError):
+            return None, None
+        timestamp = (
+            update.timeStamp.secondsPastEpoch * 1_000_000_000
+        ) + update.timeStamp.nanoseconds
+
+        try:
+            self._units = update.display.units
+        except AttributeError:
+            logger.warning("No units available for %s", self._source_name)
+            self._units = None
+
+        return _serialise(self._source_name, timestamp, self._units)
```
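For reference, the payload produced here is the un00 FlatBuffers schema from `streaming_data_types`; `serialise_un00` is the call used in `_serialise` above. A minimal round-trip sketch, assuming the package also exposes the matching `deserialise_un00` helper for this schema (the PV name, timestamp and unit values below are made up):

```python
# Round-trip sketch for the un00 payload. serialise_un00 is the call used in
# _serialise above; deserialise_un00 is assumed to be the matching helper
# exported by streaming_data_types for the un00 schema.
from streaming_data_types import deserialise_un00, serialise_un00

payload = serialise_un00("SOME:PV:NAME", 1_700_000_000_000_000_000, "mm")
decoded = deserialise_un00(payload)
# decoded carries back the source name, timestamp (ns) and unit string that
# were serialised; exact attribute names depend on the streaming_data_types version.
```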

tests/test_helpers/ca_fakes.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -45,6 +45,7 @@ def get_pvs(
         return [FakePV(pv_name, self.subscription) for pv_name in pv_names]
 
     def call_monitor_callback_with_fake_pv_update(self, pv_update: ReadNotifyResponse):
+        # This actually calls both the monitor and unit callbacks.
         for c in self.subscription.callback:
             c(self.subscription, pv_update)
 
```
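The added comment is worth a concrete illustration: the fake subscription keeps every callback registered against it and fires each one per update, which is why a single fake PV update now reaches both the monitor and the unit callback. A standalone sketch of that behaviour; the class and callback names here are illustrative, not the test helper's real internals:

```python
# Sketch of the behaviour the new comment documents: every callback registered
# on the fake subscription runs for each fake update, so both the monitor and
# the unit callback see it. Names are illustrative.
class FakeSubscription:
    def __init__(self):
        self.callback = []  # registered callbacks, called in order

    def add_callback(self, cb):
        self.callback.append(cb)


subscription = FakeSubscription()
calls = []
subscription.add_callback(lambda sub, update: calls.append("monitor"))
subscription.add_callback(lambda sub, update: calls.append("unit"))

for c in subscription.callback:  # same loop as call_monitor_callback_with_fake_pv_update
    c(subscription, "fake_update")

assert calls == ["monitor", "unit"]
```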
