@@ -7,11 +7,18 @@ use crate::metric::{Metric, SortedTags};
77use datadog_protos:: metrics:: SketchPayload ;
88use tokio:: sync:: { mpsc, oneshot} ;
99use tracing:: { debug, error, warn} ;
10+ use ustr:: Ustr ;
1011
1112#[ derive( Debug ) ]
1213pub enum AggregatorCommand {
1314 InsertBatch ( Vec < Metric > ) ,
1415 Flush ( oneshot:: Sender < FlushResponse > ) ,
16+ GetEntryById {
17+ name : Ustr ,
18+ tags : Option < SortedTags > ,
19+ timestamp : i64 ,
20+ response_tx : oneshot:: Sender < Option < Metric > > ,
21+ } ,
1522 Shutdown ,
1623}
1724
@@ -45,6 +52,27 @@ impl AggregatorHandle {
4552 . map_err ( |e| format ! ( "Failed to receive flush response: {}" , e) )
4653 }
4754
55+ pub async fn get_entry_by_id (
56+ & self ,
57+ name : Ustr ,
58+ tags : Option < SortedTags > ,
59+ timestamp : i64 ,
60+ ) -> Result < Option < Metric > , String > {
61+ let ( response_tx, response_rx) = oneshot:: channel ( ) ;
62+ self . tx
63+ . send ( AggregatorCommand :: GetEntryById {
64+ name,
65+ tags,
66+ timestamp,
67+ response_tx,
68+ } )
69+ . map_err ( |e| format ! ( "Failed to send get_entry_by_id command: {}" , e) ) ?;
70+
71+ response_rx
72+ . await
73+ . map_err ( |e| format ! ( "Failed to receive get_entry_by_id response: {}" , e) )
74+ }
75+
4876 pub fn shutdown ( & self ) -> Result < ( ) , mpsc:: error:: SendError < AggregatorCommand > > {
4977 self . tx . send ( AggregatorCommand :: Shutdown )
5078 }
@@ -102,6 +130,19 @@ impl AggregatorService {
102130 }
103131 }
104132
133+ AggregatorCommand :: GetEntryById {
134+ name,
135+ tags,
136+ timestamp,
137+ response_tx,
138+ } => {
139+ let entry = self . aggregator . get_entry_by_id ( name, & tags, timestamp) ;
140+ let response = entry. cloned ( ) ;
141+ if let Err ( _) = response_tx. send ( response) {
142+ error ! ( "Failed to send get_entry_by_id response - receiver dropped" ) ;
143+ }
144+ }
145+
105146 AggregatorCommand :: Shutdown => {
106147 debug ! ( "Aggregator service shutting down" ) ;
107148 break ;
@@ -174,4 +215,44 @@ mod tests {
174215 handle. shutdown ( ) . expect ( "Failed to shutdown" ) ;
175216 service_task. await . expect ( "Service task failed" ) ;
176217 }
218+
219+ #[ tokio:: test]
220+ async fn test_aggregator_service_get_entry_by_id ( ) {
221+ use ustr:: ustr;
222+
223+ let ( service, handle) =
224+ AggregatorService :: new ( EMPTY_TAGS , 1000 ) . expect ( "Failed to create aggregator service" ) ;
225+
226+ let service_task = tokio:: spawn ( service. run ( ) ) ;
227+
228+ let metric = parse ( "test_metric:42|c|#env:prod" ) . expect ( "metric parse failed" ) ;
229+ let metric_name = metric. name ;
230+ let metric_tags = metric. tags . clone ( ) ;
231+ let metric_timestamp = metric. timestamp ;
232+
233+ handle
234+ . insert_batch ( vec ! [ metric. clone( ) ] )
235+ . expect ( "Failed to insert metric" ) ;
236+
237+ let result = handle
238+ . get_entry_by_id ( metric_name, metric_tags. clone ( ) , metric_timestamp)
239+ . await
240+ . expect ( "Failed to get entry" ) ;
241+
242+ assert ! ( result. is_some( ) ) ;
243+ let retrieved_metric = result. unwrap ( ) ;
244+ assert_eq ! ( retrieved_metric. name, metric_name) ;
245+ assert_eq ! ( retrieved_metric. timestamp, metric_timestamp) ;
246+
247+ let non_existent = handle
248+ . get_entry_by_id ( ustr ( "non_existent" ) , None , 0 )
249+ . await
250+ . expect ( "Failed to get entry" ) ;
251+
252+ assert ! ( non_existent. is_none( ) ) ;
253+
254+ // Shutdown the service
255+ handle. shutdown ( ) . expect ( "Failed to shutdown" ) ;
256+ service_task. await . expect ( "Service task failed" ) ;
257+ }
177258}
0 commit comments