@@ -4,7 +4,7 @@ use cloudevents::message::{
44 BinaryDeserializer , BinarySerializer , Encoding , MessageAttributeValue , MessageDeserializer ,
55 Result , StructuredDeserializer , StructuredSerializer ,
66} ;
7- use cloudevents:: { message, Event } ;
7+ use cloudevents:: { message, Data , Event } ;
88use paho_mqtt:: { Message , PropertyCode } ;
99use std:: collections:: HashMap ;
1010use std:: convert:: TryFrom ;
@@ -16,7 +16,7 @@ pub struct ConsumerMessageDeserializer {
1616}
1717
1818impl ConsumerMessageDeserializer {
19- fn get_mqtt_headers ( message : & Message ) -> Result < HashMap < String , Vec < u8 > > > {
19+ fn get_mqtt_headers ( message : & Message ) -> HashMap < String , Vec < u8 > > {
2020 let mut hm = HashMap :: new ( ) ;
2121 let prop_iterator = message. properties ( ) . iter ( PropertyCode :: UserProperty ) ;
2222
@@ -25,12 +25,12 @@ impl ConsumerMessageDeserializer {
2525 hm. insert ( header. 0 . to_string ( ) , Vec :: from ( header. 1 ) ) ;
2626 }
2727
28- Ok ( hm )
28+ hm
2929 }
3030
3131 pub fn new ( message : & Message ) -> Result < ConsumerMessageDeserializer > {
3232 Ok ( ConsumerMessageDeserializer {
33- headers : Self :: get_mqtt_headers ( message) ? ,
33+ headers : Self :: get_mqtt_headers ( message) ,
3434 payload : Some ( message. payload ( ) ) . map ( |s| Vec :: from ( s) ) ,
3535 } )
3636 }
@@ -110,22 +110,27 @@ impl MessageDeserializer for ConsumerMessageDeserializer {
110110 fn encoding ( & self ) -> Encoding {
111111 match (
112112 self . headers
113- . get ( "content-type" )
113+ . get ( headers :: CONTENT_TYPE )
114114 . map ( |s| String :: from_utf8 ( s. to_vec ( ) ) . ok ( ) )
115115 . flatten ( )
116116 . map ( |s| s. starts_with ( headers:: CLOUDEVENTS_JSON_HEADER ) )
117117 . unwrap_or ( false ) ,
118- self . headers . get ( headers:: SPEC_VERSION_HEADER ) ,
118+ self . headers . get ( headers:: MQTT_VERSION_HEADER )
119+ . map ( |s| String :: from_utf8 ( s. to_vec ( ) ) . ok ( ) )
120+ . flatten ( )
121+ . map ( |s| s. eq ( headers:: MQTT_V5_BINARY ) )
122+ . unwrap_or ( false ) ,
119123 ) {
120- ( true , _ ) => Encoding :: STRUCTURED ,
121- ( _, Some ( _ ) ) => Encoding :: BINARY ,
122- _ => Encoding :: UNKNOWN ,
124+ ( true , true ) => Encoding :: STRUCTURED ,
125+ ( _, true ) => Encoding :: BINARY ,
126+ _ => Encoding :: STRUCTURED ,
123127 }
124128 }
125129}
126130
127- pub fn record_to_event ( msg : & Message , version : headers:: MqttVersion ) -> Result < Event > {
128- match version {
131+ pub fn record_to_event ( msg : & Message ) -> Result < Event > {
132+ MessageDeserializer :: into_event ( ConsumerMessageDeserializer :: new ( msg) ?)
133+ /* match version {
129134 headers::MqttVersion::V5 => {
130135 BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
131136 }
@@ -135,16 +140,16 @@ pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result<E
135140 headers::MqttVersion::V3_1_1 => {
136141 StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
137142 }
138- }
143+ }*/
139144}
140145
141146pub trait MessageExt {
142- fn to_event ( & self , version : headers :: MqttVersion ) -> Result < Event > ;
147+ fn to_event ( & self ) -> Result < Event > ;
143148}
144149
145150impl MessageExt for Message {
146- fn to_event ( & self , version : headers :: MqttVersion ) -> Result < Event > {
147- record_to_event ( self , version )
151+ fn to_event ( & self ) -> Result < Event > {
152+ record_to_event ( self )
148153 }
149154}
150155
@@ -155,7 +160,6 @@ mod tests {
155160
156161 use crate :: MessageBuilderExt ;
157162 use chrono:: Utc ;
158- use cloudevents:: event:: Data ;
159163 use cloudevents:: { EventBuilder , EventBuilderV10 } ;
160164 use paho_mqtt:: MessageBuilder ;
161165 use serde_json:: json;
@@ -170,10 +174,10 @@ mod tests {
170174 . time ( time)
171175 . source ( "http://localhost" )
172176 . data (
173- "application/json " ,
174- Data :: Binary ( String :: from ( "{ \" hello\" : \" world \" } " ) . into_bytes ( ) ) ,
177+ "application/octet-stream " ,
178+ Data :: Binary ( String :: from ( "hello rust " ) . into_bytes ( ) ) ,
175179 )
176- . extension ( "someint " , "10" )
180+ . extension ( "mqttversion " , headers :: MQTT_V5_BINARY )
177181 . build ( )
178182 . unwrap ( ) ;
179183
@@ -183,11 +187,12 @@ mod tests {
183187 . ty ( "example.test" )
184188 . time ( time)
185189 . source ( "http://localhost" )
186- . extension ( "someint" , "10" )
187- . data ( "application/json" , json ! ( { "hello" : "world" } ) )
190+ . extension ( "mqttversion" , headers:: MQTT_V5_BINARY )
191+ . data (
192+ "application/octet-stream" ,
193+ Data :: Binary ( String :: from ( "hello rust" ) . into_bytes ( ) ) )
188194 . build ( )
189195 . unwrap ( ) ,
190- headers:: MqttVersion :: V5 ,
191196 )
192197 . unwrap ( ) ;
193198
@@ -197,7 +202,7 @@ mod tests {
197202 . qos ( 1 )
198203 . finalize ( ) ;
199204
200- assert_eq ! ( msg. to_event( headers :: MqttVersion :: V5 ) . unwrap( ) , expected)
205+ assert_eq ! ( msg. to_event( ) . unwrap( ) , expected)
201206 }
202207
203208 #[ test]
@@ -209,7 +214,6 @@ mod tests {
209214 . ty ( "example.test" )
210215 . source ( "http://localhost" )
211216 . data ( "application/cloudevents+json" , j. clone ( ) )
212- . extension ( "someint" , "10" )
213217 . build ( )
214218 . unwrap ( ) ;
215219
@@ -218,7 +222,6 @@ mod tests {
218222 . ty ( "example.test" )
219223 . source ( "http://localhost" )
220224 . data ( "application/cloudevents+json" , j. clone ( ) )
221- . extension ( "someint" , "10" )
222225 . build ( )
223226 . unwrap ( ) ;
224227
@@ -232,7 +235,7 @@ mod tests {
232235 . finalize ( ) ;
233236
234237 assert_eq ! (
235- msg. to_event( headers :: MqttVersion :: V3_1_1 ) . unwrap( ) ,
238+ msg. to_event( ) . unwrap( ) ,
236239 expected
237240 )
238241 }
0 commit comments