11import time
22from typing import List , Optional
3+ from dataclasses import dataclass
34
4- from caproto import ReadNotifyResponse
5+ from caproto import ReadNotifyResponse , timestamp_to_epics
56from caproto .threading .client import PV
67from caproto .threading .client import Context as CAContext
78
89from forwarder .application_logger import get_logger
910from forwarder .metrics import Counter , Summary , sanitise_metric_name
1011from forwarder .metrics .statistics_reporter import StatisticsReporter
1112from forwarder .update_handlers .serialiser_tracker import SerialiserTracker
13+ from forwarder .update_handlers .un00_serialiser import un00_CASerialiser
1214
1315
1416class CAUpdateHandler :
@@ -34,6 +36,7 @@ def __init__(
3436 self ._processing_errors_metric = processing_errors_metric
3537 self ._processing_latency_metric = None
3638 self ._receive_latency_metric = None
39+ self ._last_update = 0
3740 if self ._statistics_reporter :
3841 try :
3942 self ._processing_latency_metric = Summary (
@@ -66,19 +69,59 @@ def __init__(
6669 ctrl_sub .add_callback (self ._unit_callback )
6770
6871 def _unit_callback (self , sub , response : ReadNotifyResponse ):
72+ # sometimes caproto gives us a unit callback before a monitor callback.
73+ # in this case, to avoid just dropping the unit update, approximate
74+ # by using the current time.
75+ fallback_timestamp = time .time ()
76+
77+ self ._logger .debug ("CA Unit callback called for %s" , self ._pv_name )
78+
6979 old_unit = self ._current_unit
7080 try :
71- self ._current_unit = response .metadata .units .decode ("utf-8" )
81+ new_unit = response .metadata .units .decode ("utf-8" )
82+ if new_unit is not None :
83+ # we get a unit callback with blank units if the value has updated but the EGU field
84+ # has not.
85+ self ._current_unit = new_unit
7286 except AttributeError :
73- return
74- if old_unit is not None and old_unit != self ._current_unit :
75- self ._logger .error (
87+ self ._current_unit = None
88+
89+ if old_unit != self ._current_unit :
90+ self ._logger .info (
7691 f'Display unit of (ca) PV with name "{ self ._pv_name } " changed from "{ old_unit } " to "{ self ._current_unit } ".'
7792 )
78- if self ._processing_errors_metric :
79- self ._processing_errors_metric .inc ()
93+ for serialiser_tracker in self .serialiser_tracker_list :
94+ # Only let the unit serialiser deal with this update - as it has no value the other
95+ # serialisers will fall over.
96+ if isinstance (serialiser_tracker .serialiser , un00_CASerialiser ):
97+
98+ # The next bit is pretty hacky. We are mocking the ReadNotifyResponse
99+ # as by default its metadata is immutable/read-only, but we need to append the
100+ # timestamp here.
101+ @dataclass
102+ class StupidMetaData :
103+ timestamp : float
104+ units : str
105+
106+ @dataclass
107+ class StupidResponse :
108+ metadata : StupidMetaData
109+
110+
111+ update_time = self ._last_update if self ._last_update > 0 else fallback_timestamp
112+ self ._logger .debug (f"about to publish update. units: { self ._current_unit } , timestamp: { update_time } " )
113+ meta = StupidMetaData (timestamp = update_time , units = self ._current_unit )
114+ response = StupidResponse (metadata = meta )
115+ serialiser_tracker .process_ca_message (response ) # type: ignore
116+
80117
81118 def _monitor_callback (self , sub , response : ReadNotifyResponse ):
119+ self ._logger .debug ("CA Monitor callback called for %s" , self ._pv_name )
120+ try :
121+ self ._last_update = response .metadata .timestamp
122+ except Exception :
123+ self ._logger .warning ("Error getting timestamp for %s" , sub .pv .name )
124+
82125 if self ._receive_latency_metric :
83126 try :
84127 response_timestamp = response .metadata .timestamp .seconds + (
0 commit comments