@@ -8,18 +8,18 @@ use cloudevents::{
88 EventBuilderV10 ,
99} ;
1010use fe2o3_amqp:: { types:: messaging:: Message , Connection , Receiver , Sender , Session } ;
11- use serde_json:: json;
11+ use serde_json:: { json, from_slice , from_str } ;
1212
1313type BoxError = Box < dyn std:: error:: Error > ;
1414type Result < T > = std:: result:: Result < T , BoxError > ;
1515
16- async fn send_event ( sender : & mut Sender , i : usize ) -> Result < ( ) > {
16+ async fn send_event ( sender : & mut Sender , i : usize , value : serde_json :: Value ) -> Result < ( ) > {
1717 let event = EventBuilderV10 :: new ( )
1818 . id ( i. to_string ( ) )
1919 . ty ( "example.test" )
2020 . source ( "localhost" )
2121 . extension ( "ext-name" , "AMQP" )
22- . data ( "application/json" , json ! ( { "hello" : "world" } ) )
22+ . data ( "application/json" , value )
2323 . build ( ) ?;
2424 let event_message = EventMessage :: from_binary_event ( event) ?;
2525 let message = Message :: from ( event_message) ;
@@ -28,9 +28,7 @@ async fn send_event(sender: &mut Sender, i: usize) -> Result<()> {
2828}
2929
3030async fn recv_event ( receiver : & mut Receiver ) -> Result < Event > {
31- use fe2o3_amqp:: types:: primitives:: Value ;
32-
33- let delivery = receiver. recv :: < Value > ( ) . await ?;
31+ let delivery = receiver. recv ( ) . await ?;
3432 receiver. accept ( & delivery) . await ?;
3533
3634 let event_message = EventMessage :: from ( delivery. into_message ( ) ) ;
@@ -50,9 +48,16 @@ async fn main() {
5048 . await
5149 . unwrap ( ) ;
5250
53- send_event ( & mut sender, 1 ) . await . unwrap ( ) ;
51+ let expected = json ! ( { "hello" : "world" } ) ;
52+ send_event ( & mut sender, 1 , expected. clone ( ) ) . await . unwrap ( ) ;
5453 let event = recv_event ( & mut receiver) . await . unwrap ( ) ;
55- println ! ( "{:?}" , event) ;
54+ let data: serde_json:: Value = match event. data ( ) . unwrap ( ) {
55+ cloudevents:: Data :: Binary ( bytes) => from_slice ( bytes) . unwrap ( ) ,
56+ cloudevents:: Data :: String ( s) => from_str ( s) . unwrap ( ) ,
57+ cloudevents:: Data :: Json ( value) => value. clone ( ) ,
58+ } ;
59+
60+ assert_eq ! ( data, expected) ;
5661
5762 sender. close ( ) . await . unwrap ( ) ;
5863 receiver. close ( ) . await . unwrap ( ) ;
0 commit comments