@@ -5,90 +5,59 @@ use cloudevents::message::{
55 Result , StructuredDeserializer , StructuredSerializer ,
66} ;
77use cloudevents:: { message, Event } ;
8- use paho_mqtt:: { Message , PropertyCode } ;
9- use std:: collections:: HashMap ;
8+ use paho_mqtt:: { Message , Properties , PropertyCode } ;
109use std:: convert:: TryFrom ;
11- use std:: str;
1210
13- pub struct ConsumerMessageDeserializer {
14- pub ( crate ) headers : HashMap < String , Vec < u8 > > ,
11+ pub struct ConsumerMessageDeserializer < ' a > {
12+ pub ( crate ) headers : & ' a Properties ,
1513 pub ( crate ) payload : Option < Vec < u8 > > ,
1614}
1715
18- impl ConsumerMessageDeserializer {
19- fn get_mqtt_headers ( message : & Message ) -> Result < HashMap < String , Vec < u8 > > > {
20- let mut hm = HashMap :: new ( ) ;
21- let prop_iterator = message. properties ( ) . iter ( PropertyCode :: UserProperty ) ;
22-
23- for property in prop_iterator {
24- let header = property. get_string_pair ( ) . unwrap ( ) ;
25- hm. insert ( header. 0 . to_string ( ) , Vec :: from ( header. 1 ) ) ;
26- }
27-
28- Ok ( hm)
16+ impl < ' a > ConsumerMessageDeserializer < ' a > {
17+ fn get_mqtt_headers ( message : & Message ) -> & Properties {
18+ message. properties ( )
2919 }
3020
3121 pub fn new ( message : & Message ) -> Result < ConsumerMessageDeserializer > {
3222 Ok ( ConsumerMessageDeserializer {
33- headers : Self :: get_mqtt_headers ( message) ? ,
23+ headers : Self :: get_mqtt_headers ( message) ,
3424 payload : Some ( message. payload ( ) ) . map ( |s| Vec :: from ( s) ) ,
3525 } )
3626 }
3727}
3828
39- impl BinaryDeserializer for ConsumerMessageDeserializer {
40- fn deserialize_binary < R : Sized , V : BinarySerializer < R > > ( mut self , mut visitor : V ) -> Result < R > {
29+ impl < ' a > BinaryDeserializer for ConsumerMessageDeserializer < ' a > {
30+ fn deserialize_binary < R : Sized , V : BinarySerializer < R > > ( self , mut visitor : V ) -> Result < R > {
4131 if self . encoding ( ) != Encoding :: BINARY {
4232 return Err ( message:: Error :: WrongEncoding { } ) ;
4333 }
4434
4535 let spec_version = SpecVersion :: try_from (
46- str :: from_utf8 ( & self . headers . remove ( headers :: SPEC_VERSION_HEADER ) . unwrap ( ) [ .. ] )
47- . map_err ( |e| cloudevents :: message :: Error :: Other {
48- source : Box :: new ( e ) ,
49- } ) ? ,
36+ self . headers
37+ . find_user_property ( headers :: SPEC_VERSION_HEADER )
38+ . unwrap ( )
39+ . as_str ( ) ,
5040 ) ?;
5141
5242 visitor = visitor. set_spec_version ( spec_version. clone ( ) ) ?;
5343
5444 let attributes = spec_version. attribute_names ( ) ;
5545
56- if let Some ( hv) = self . headers . remove ( headers:: CONTENT_TYPE ) {
57- visitor = visitor. set_attribute (
58- "datacontenttype" ,
59- MessageAttributeValue :: String ( String :: from_utf8 ( hv) . map_err ( |e| {
60- cloudevents:: message:: Error :: Other {
61- source : Box :: new ( e) ,
62- }
63- } ) ?) ,
64- ) ?
46+ if let Some ( hv) = self . headers . find_user_property ( headers:: CONTENT_TYPE ) {
47+ visitor = visitor. set_attribute ( "datacontenttype" , MessageAttributeValue :: String ( hv) ) ?
6548 }
6649
6750 for ( hn, hv) in self
6851 . headers
69- . into_iter ( )
52+ . user_iter ( )
7053 . filter ( |( hn, _) | headers:: SPEC_VERSION_HEADER != * hn && hn. starts_with ( "ce_" ) )
7154 {
7255 let name = & hn[ "ce_" . len ( ) ..] ;
7356
7457 if attributes. contains ( & name) {
75- visitor = visitor. set_attribute (
76- name,
77- MessageAttributeValue :: String ( String :: from_utf8 ( hv) . map_err ( |e| {
78- cloudevents:: message:: Error :: Other {
79- source : Box :: new ( e) ,
80- }
81- } ) ?) ,
82- ) ?
58+ visitor = visitor. set_attribute ( name, MessageAttributeValue :: String ( hv) ) ?
8359 } else {
84- visitor = visitor. set_extension (
85- name,
86- MessageAttributeValue :: String ( String :: from_utf8 ( hv) . map_err ( |e| {
87- cloudevents:: message:: Error :: Other {
88- source : Box :: new ( e) ,
89- }
90- } ) ?) ,
91- ) ?
60+ visitor = visitor. set_extension ( name, MessageAttributeValue :: String ( hv) ) ?
9261 }
9362 }
9463
@@ -100,51 +69,32 @@ impl BinaryDeserializer for ConsumerMessageDeserializer {
10069 }
10170}
10271
103- impl StructuredDeserializer for ConsumerMessageDeserializer {
72+ impl < ' a > StructuredDeserializer for ConsumerMessageDeserializer < ' a > {
10473 fn deserialize_structured < R : Sized , V : StructuredSerializer < R > > ( self , visitor : V ) -> Result < R > {
10574 visitor. set_structured_event ( self . payload . unwrap ( ) )
10675 }
10776}
10877
109- impl MessageDeserializer for ConsumerMessageDeserializer {
78+ impl < ' a > MessageDeserializer for ConsumerMessageDeserializer < ' a > {
11079 fn encoding ( & self ) -> Encoding {
111- match (
112- self . headers
113- . get ( "content-type" )
114- . map ( |s| String :: from_utf8 ( s. to_vec ( ) ) . ok ( ) )
115- . flatten ( )
116- . map ( |s| s. starts_with ( headers:: CLOUDEVENTS_JSON_HEADER ) )
117- . unwrap_or ( false ) ,
118- self . headers . get ( headers:: SPEC_VERSION_HEADER ) ,
119- ) {
120- ( true , _) => Encoding :: STRUCTURED ,
121- ( _, Some ( _) ) => Encoding :: BINARY ,
122- _ => Encoding :: UNKNOWN ,
80+ match self . headers . iter ( PropertyCode :: UserProperty ) . count ( ) == 0 {
81+ true => Encoding :: STRUCTURED ,
82+ false => Encoding :: BINARY ,
12383 }
12484 }
12585}
12686
127- pub fn record_to_event ( msg : & Message , version : headers:: MqttVersion ) -> Result < Event > {
128- match version {
129- headers:: MqttVersion :: V5 => {
130- BinaryDeserializer :: into_event ( ConsumerMessageDeserializer :: new ( msg) ?)
131- }
132- headers:: MqttVersion :: V3_1 => {
133- StructuredDeserializer :: into_event ( ConsumerMessageDeserializer :: new ( msg) ?)
134- }
135- headers:: MqttVersion :: V3_1_1 => {
136- StructuredDeserializer :: into_event ( ConsumerMessageDeserializer :: new ( msg) ?)
137- }
138- }
87+ pub fn record_to_event ( msg : & Message ) -> Result < Event > {
88+ MessageDeserializer :: into_event ( ConsumerMessageDeserializer :: new ( msg) ?)
13989}
14090
14191pub trait MessageExt {
142- fn to_event ( & self , version : headers :: MqttVersion ) -> Result < Event > ;
92+ fn to_event ( & self ) -> Result < Event > ;
14393}
14494
14595impl MessageExt for Message {
146- fn to_event ( & self , version : headers :: MqttVersion ) -> Result < Event > {
147- record_to_event ( self , version )
96+ fn to_event ( & self ) -> Result < Event > {
97+ record_to_event ( self )
14898 }
14999}
150100
0 commit comments