55
66use cloudevents:: {
77 binding:: fe2o3_amqp:: EventMessage , message:: MessageDeserializer , Event , EventBuilder ,
8- EventBuilderV10 ,
8+ EventBuilderV10 , AttributesReader , event :: ExtensionValue ,
99} ;
1010use fe2o3_amqp:: { types:: messaging:: Message , Connection , Receiver , Sender , Session } ;
1111use serde_json:: { json, from_slice, from_str} ;
1212
1313type BoxError = Box < dyn std:: error:: Error > ;
1414type Result < T > = std:: result:: Result < T , BoxError > ;
1515
16- async fn send_event ( sender : & mut Sender , i : usize , value : serde_json:: Value ) -> Result < ( ) > {
16+ const EXAMPLE_TYPE : & str = "example.test" ;
17+ const EXAMPLE_SOURCE : & str = "localhost" ;
18+ const EXTENSION_NAME : & str = "ext-name" ;
19+ const EXTENSION_VALUE : & str = "AMQP" ;
20+
21+ async fn send_binary_event ( sender : & mut Sender , i : usize , value : serde_json:: Value ) -> Result < ( ) > {
22+ let event = EventBuilderV10 :: new ( )
23+ . id ( i. to_string ( ) )
24+ . ty ( EXAMPLE_TYPE )
25+ . source ( EXAMPLE_SOURCE )
26+ . extension ( EXTENSION_NAME , EXTENSION_VALUE )
27+ . data ( "application/json" , value)
28+ . build ( ) ?;
29+ let event_message = EventMessage :: from_binary_event ( event) ?;
30+ let message = Message :: from ( event_message) ;
31+ sender. send ( message) . await ?. accepted_or ( "not accepted" ) ?;
32+ Ok ( ( ) )
33+ }
34+
35+ async fn send_structured_event ( sender : & mut Sender , i : usize , value : serde_json:: Value ) -> Result < ( ) > {
1736 let event = EventBuilderV10 :: new ( )
1837 . id ( i. to_string ( ) )
1938 . ty ( "example.test" )
2039 . source ( "localhost" )
2140 . extension ( "ext-name" , "AMQP" )
2241 . data ( "application/json" , value)
2342 . build ( ) ?;
24- let event_message = EventMessage :: from_binary_event ( event) ?;
43+ let event_message = EventMessage :: from_structured_event ( event) ?;
2544 let message = Message :: from ( event_message) ;
2645 sender. send ( message) . await ?. accepted_or ( "not accepted" ) ?;
2746 Ok ( ( ) )
@@ -36,6 +55,15 @@ async fn recv_event(receiver: &mut Receiver) -> Result<Event> {
3655 Ok ( event)
3756}
3857
58+ fn convert_data_into_json_value ( data : & cloudevents:: Data ) -> Result < serde_json:: Value > {
59+ let value = match data {
60+ cloudevents:: Data :: Binary ( bytes) => from_slice ( bytes) ?,
61+ cloudevents:: Data :: String ( s) => from_str ( s) ?,
62+ cloudevents:: Data :: Json ( value) => value. clone ( ) ,
63+ } ;
64+ Ok ( value)
65+ }
66+
3967#[ tokio:: main]
4068async fn main ( ) {
4169 let mut connection =
@@ -49,15 +77,32 @@ async fn main() {
4977 . unwrap ( ) ;
5078
5179 let expected = json ! ( { "hello" : "world" } ) ;
52- send_event ( & mut sender, 1 , expected. clone ( ) ) . await . unwrap ( ) ;
80+
81+ // Binary content mode
82+ send_binary_event ( & mut sender, 1 , expected. clone ( ) ) . await . unwrap ( ) ;
5383 let event = recv_event ( & mut receiver) . await . unwrap ( ) ;
54- let data: serde_json:: Value = match event. data ( ) . unwrap ( ) {
55- cloudevents:: Data :: Binary ( bytes) => from_slice ( bytes) . unwrap ( ) ,
56- cloudevents:: Data :: String ( s) => from_str ( s) . unwrap ( ) ,
57- cloudevents:: Data :: Json ( value) => value. clone ( ) ,
58- } ;
84+ let value = convert_data_into_json_value ( event. data ( ) . unwrap ( ) ) . unwrap ( ) ;
85+ assert_eq ! ( event. id( ) , "1" ) ;
86+ assert_eq ! ( event. ty( ) , EXAMPLE_TYPE ) ;
87+ assert_eq ! ( event. source( ) , EXAMPLE_SOURCE ) ;
88+ match event. extension ( EXTENSION_NAME ) . unwrap ( ) {
89+ ExtensionValue :: String ( value) => assert_eq ! ( value, EXTENSION_VALUE ) ,
90+ _ => panic ! ( "Expect a String" ) ,
91+ }
92+ assert_eq ! ( value, expected) ;
5993
60- assert_eq ! ( data, expected) ;
94+ // Structured content mode
95+ send_structured_event ( & mut sender, 2 , expected. clone ( ) ) . await . unwrap ( ) ;
96+ let event = recv_event ( & mut receiver) . await . unwrap ( ) ;
97+ let value = convert_data_into_json_value ( event. data ( ) . unwrap ( ) ) . unwrap ( ) ;
98+ assert_eq ! ( event. id( ) , "2" ) ;
99+ assert_eq ! ( event. ty( ) , EXAMPLE_TYPE ) ;
100+ assert_eq ! ( event. source( ) , EXAMPLE_SOURCE ) ;
101+ match event. extension ( EXTENSION_NAME ) . unwrap ( ) {
102+ ExtensionValue :: String ( value) => assert_eq ! ( value, EXTENSION_VALUE ) ,
103+ _ => panic ! ( "Expect a String" ) ,
104+ }
105+ assert_eq ! ( value, expected) ;
61106
62107 sender. close ( ) . await . unwrap ( ) ;
63108 receiver. close ( ) . await . unwrap ( ) ;
0 commit comments