@@ -18,6 +18,18 @@ enum {
1818 IDLE_FROM_RUNNING = 10 + static_cast <int >(NodeStatus::RUNNING)
1919};
2020
21+ struct Transition
22+ {
23+ // when serializing, we will remove the initial time and serialize only
24+ // 6 bytes, instead of 8
25+ uint64_t timestamp_usec;
26+ // if you have more than 64.000 nodes, you are doing something wrong :)
27+ uint16_t node_uid;
28+ // enough bits to contain NodeStatus
29+ uint8_t status;
30+
31+ uint8_t padding[5 ];
32+ };
2133
2234std::array<char ,16 > CreateRandomUUID ()
2335{
@@ -82,6 +94,10 @@ struct Groot2Publisher::PImpl
8294 std::chrono::system_clock::time_point last_heartbeat;
8395 std::chrono::milliseconds max_heartbeat_delay = std::chrono::milliseconds(5000 );
8496
97+ std::atomic_bool recording = false ;
98+ std::deque<Transition> transitions_buffer;
99+ std::chrono::microseconds recording_fist_time;
100+
85101 std::thread heartbeat_thread;
86102
87103 zmq::context_t context;
@@ -183,7 +199,7 @@ Groot2Publisher::~Groot2Publisher()
183199 }
184200}
185201
186- void Groot2Publisher::callback (Duration, const TreeNode& node,
202+ void Groot2Publisher::callback (Duration ts , const TreeNode& node,
187203 NodeStatus prev_status, NodeStatus new_status)
188204{
189205 std::unique_lock<std::mutex> lk (_p->status_mutex );
@@ -193,6 +209,20 @@ void Groot2Publisher::callback(Duration, const TreeNode& node,
193209 status = 10 + static_cast <char >(prev_status);
194210 }
195211 *(_p->status_buffermap .at (node.UID ())) = status;
212+
213+ if (_p->recording )
214+ {
215+ Transition trans;
216+ trans.node_uid = node.UID ();
217+ trans.status = static_cast <uint8_t >(new_status);
218+ auto timestamp = ts -_p->recording_fist_time ;
219+ trans.timestamp_usec =
220+ std::chrono::duration_cast<std::chrono::microseconds>(timestamp).count ();
221+ _p->transitions_buffer .push_back (trans);
222+ while (_p->transitions_buffer .size () > 1000 ) {
223+ _p->transitions_buffer .pop_front ();
224+ }
225+ }
196226}
197227
198228void Groot2Publisher::flush ()
@@ -239,6 +269,7 @@ void Groot2Publisher::serverLoop()
239269
240270 Monitor::ReplyHeader reply_header;
241271 reply_header.request = request_header;
272+ reply_header.request .protocol = Monitor::kProtocolID ;
242273 reply_header.tree_id = serialized_uuid;
243274
244275 zmq::multipart_t reply_msg;
@@ -383,6 +414,54 @@ void Groot2Publisher::serverLoop()
383414 }
384415 reply_msg.addstr ( json_out.dump () );
385416 } break ;
417+
418+ case Monitor::RequestType::TOGGLE_RECORDING:
419+ {
420+ if (requestMsg.size () != 2 ) {
421+ sendErrorReply (" must be 2 parts message" );
422+ continue ;
423+ }
424+
425+ auto const cmd = (requestMsg[1 ].to_string ());
426+ if (cmd == " start" )
427+ {
428+ _p->recording = true ;
429+ auto now = std::chrono::system_clock::now ();
430+
431+ _p->recording_fist_time = std::chrono::duration_cast<std::chrono::microseconds>
432+ (now.time_since_epoch ());
433+
434+ reply_msg.addstr (std::to_string (_p->recording_fist_time .count ()));
435+ std::unique_lock<std::mutex> lk (_p->status_mutex );
436+ _p->transitions_buffer .clear ();
437+ }
438+ else if (cmd == " stop" )
439+ {
440+ _p->recording = false ;
441+ }
442+ } break ;
443+
444+ case Monitor::RequestType::GET_TRANSITIONS:
445+ {
446+ thread_local std::string trans_buffer;
447+ trans_buffer.resize (9 * _p->transitions_buffer .size ());
448+
449+ std::unique_lock<std::mutex> lk (_p->status_mutex );
450+ size_t offset = 0 ;
451+ for (const auto & trans: _p->transitions_buffer )
452+ {
453+ std::memcpy (&trans_buffer[offset], &trans.timestamp_usec , 6 );
454+ offset += 6 ;
455+ std::memcpy (&trans_buffer[offset], &trans.node_uid , 2 );
456+ offset += 2 ;
457+ std::memcpy (&trans_buffer[offset], &trans.status , 1 );
458+ offset += 1 ;
459+ }
460+ _p->transitions_buffer .clear ();
461+ trans_buffer.resize (offset);
462+ reply_msg.addstr (trans_buffer);
463+ } break ;
464+
386465 default : {
387466 sendErrorReply (" Request not recognized" );
388467 continue ;
0 commit comments