1+ use crate :: headers;
2+ use cloudevents:: event:: SpecVersion ;
3+ use cloudevents:: message:: { Result , BinarySerializer , BinaryDeserializer , MessageAttributeValue ,
4+ MessageDeserializer , Encoding , StructuredSerializer , StructuredDeserializer } ;
5+ use cloudevents:: { message, Event } ;
6+ use paho_mqtt:: { Message , PropertyCode } ;
7+ use std:: collections:: HashMap ;
8+ use std:: convert:: TryFrom ;
9+ use std:: str;
10+
11+ pub struct ConsumerMessageDeserializer {
12+ pub ( crate ) headers : HashMap < String , Vec < u8 > > ,
13+ pub ( crate ) payload : Option < Vec < u8 > > ,
14+ }
15+
16+ impl ConsumerMessageDeserializer {
17+ fn get_mqtt_headers ( message : & Message ) -> Result < HashMap < String , Vec < u8 > > > {
18+ let mut hm = HashMap :: new ( ) ;
19+ let prop_iterator = message. properties ( ) . iter ( PropertyCode :: UserProperty ) ;
20+
21+ for property in prop_iterator {
22+ let header = property. get_string_pair ( ) . unwrap ( ) ;
23+ hm. insert ( header. 0 . to_string ( ) , Vec :: from ( header. 1 ) ) ;
24+ }
25+
26+ Ok ( hm)
27+ }
28+
29+ pub fn new ( message : & Message ) -> Result < ConsumerMessageDeserializer > {
30+ Ok ( ConsumerMessageDeserializer {
31+ headers : Self :: get_mqtt_headers ( message) ?,
32+ payload : Some ( message. payload ( ) ) . map ( |s| Vec :: from ( s) ) ,
33+ } )
34+ }
35+ }
36+
37+ impl BinaryDeserializer for ConsumerMessageDeserializer {
38+ fn deserialize_binary < R : Sized , V : BinarySerializer < R > > ( mut self , mut visitor : V ) -> Result < R > {
39+ if self . encoding ( ) != Encoding :: BINARY {
40+ return Err ( message:: Error :: WrongEncoding { } )
41+ }
42+
43+ let spec_version = SpecVersion :: try_from (
44+ str:: from_utf8 ( & self . headers . remove ( headers:: SPEC_VERSION_HEADER ) . unwrap ( ) [ ..] )
45+ . map_err ( |e| cloudevents:: message:: Error :: Other {
46+ source : Box :: new ( e) ,
47+ } ) ?,
48+ ) ?;
49+
50+ visitor = visitor. set_spec_version ( spec_version. clone ( ) ) ?;
51+
52+ let attributes = spec_version. attribute_names ( ) ;
53+
54+ if let Some ( hv) = self . headers . remove ( headers:: CONTENT_TYPE ) {
55+ visitor = visitor. set_attribute (
56+ "datacontenttype" ,
57+ MessageAttributeValue :: String ( String :: from_utf8 ( hv) . map_err ( |e| {
58+ cloudevents:: message:: Error :: Other {
59+ source : Box :: new ( e) ,
60+ }
61+ } ) ?) ,
62+ ) ?
63+ }
64+
65+ for ( hn, hv) in self
66+ . headers
67+ . into_iter ( )
68+ . filter ( |( hn, _) | headers:: SPEC_VERSION_HEADER != * hn && hn. starts_with ( "ce_" ) )
69+ {
70+ let name = & hn[ "ce_" . len ( ) ..] ;
71+
72+ if attributes. contains ( & name) {
73+ visitor = visitor. set_attribute (
74+ name,
75+ MessageAttributeValue :: String ( String :: from_utf8 ( hv) . map_err ( |e| {
76+ cloudevents:: message:: Error :: Other {
77+ source : Box :: new ( e) ,
78+ }
79+ } ) ?) ,
80+ ) ?
81+ } else {
82+ visitor = visitor. set_extension (
83+ name,
84+ MessageAttributeValue :: String ( String :: from_utf8 ( hv) . map_err ( |e| {
85+ cloudevents:: message:: Error :: Other {
86+ source : Box :: new ( e) ,
87+ }
88+ } ) ?) ,
89+ ) ?
90+ }
91+ }
92+
93+ if self . payload != None {
94+ visitor. end_with_data ( self . payload . unwrap ( ) )
95+ } else {
96+ visitor. end ( )
97+ }
98+ }
99+ }
100+
101+ impl StructuredDeserializer for ConsumerMessageDeserializer {
102+ fn deserialize_structured < R : Sized , V : StructuredSerializer < R > > ( self , visitor : V ) -> Result < R > {
103+ visitor. set_structured_event ( self . payload . unwrap ( ) )
104+ }
105+ }
106+
107+ impl MessageDeserializer for ConsumerMessageDeserializer {
108+ fn encoding ( & self ) -> Encoding {
109+ match (
110+ self . headers
111+ . get ( "content-type" )
112+ . map ( |s| String :: from_utf8 ( s. to_vec ( ) ) . ok ( ) )
113+ . flatten ( )
114+ . map ( |s| s. starts_with ( headers:: CLOUDEVENTS_JSON_HEADER ) )
115+ . unwrap_or ( false ) ,
116+ self . headers . get ( headers:: SPEC_VERSION_HEADER ) ,
117+ ) {
118+ ( true , _) => Encoding :: STRUCTURED ,
119+ ( _, Some ( _) ) => Encoding :: BINARY ,
120+ _ => Encoding :: UNKNOWN ,
121+ }
122+ }
123+ }
124+
125+ pub fn record_to_event ( msg : & Message , version : headers:: MqttVersion ) -> Result < Event > {
126+ match version {
127+ headers:: MqttVersion :: V5 => BinaryDeserializer :: into_event ( ConsumerMessageDeserializer :: new ( msg) ?) ,
128+ headers:: MqttVersion :: V3_1 => StructuredDeserializer :: into_event ( ConsumerMessageDeserializer :: new ( msg) ?) ,
129+ headers:: MqttVersion :: V3_1_1 => StructuredDeserializer :: into_event ( ConsumerMessageDeserializer :: new ( msg) ?) ,
130+ }
131+ }
132+
133+ pub trait MessageExt {
134+ fn to_event ( & self , version : headers:: MqttVersion ) -> Result < Event > ;
135+ }
136+
137+ impl MessageExt for Message {
138+ fn to_event ( & self , version : headers:: MqttVersion ) -> Result < Event > {
139+ record_to_event ( self , version)
140+ }
141+ }
142+
143+ #[ cfg( test) ]
144+ mod tests {
145+ use super :: * ;
146+ use crate :: mqtt_producer_record:: MessageRecord ;
147+
148+ use chrono:: Utc ;
149+ use cloudevents:: { EventBuilder , EventBuilderV10 } ;
150+ use crate :: MessageBuilderExt ;
151+ use serde_json:: json;
152+ use cloudevents:: event:: Data ;
153+
154+ #[ test]
155+ fn test_binary_record ( ) {
156+ let time = Utc :: now ( ) ;
157+
158+ let expected = EventBuilderV10 :: new ( )
159+ . id ( "0001" )
160+ . ty ( "example.test" )
161+ . time ( time)
162+ . source ( "http://localhost" )
163+ . data ( "application/json" ,
164+ Data :: Binary ( String :: from ( "{\" hello\" :\" world\" }" ) . into_bytes ( ) ) )
165+ . extension ( "someint" , "10" )
166+ . build ( )
167+ . unwrap ( ) ;
168+
169+ let message_record = MessageRecord :: from_event (
170+ EventBuilderV10 :: new ( )
171+ . id ( "0001" )
172+ . ty ( "example.test" )
173+ . time ( time)
174+ . source ( "http://localhost" )
175+ . extension ( "someint" , "10" )
176+ . data ( "application/json" , json ! ( { "hello" : "world" } ) )
177+ . build ( )
178+ . unwrap ( ) ,
179+ headers:: MqttVersion :: V5 ,
180+ )
181+ . unwrap ( ) ;
182+
183+ let msg = MessageBuilder :: new ( )
184+ . topic ( "test" )
185+ . message_record ( & message_record)
186+ . qos ( 1 )
187+ . finalize ( ) ;
188+
189+ assert_eq ! ( msg. to_event( headers:: MqttVersion :: V5 ) . unwrap( ) , expected)
190+ }
191+
192+ #[ test]
193+ fn test_structured_record ( ) {
194+ let j = json ! ( { "hello" : "world" } ) ;
195+
196+ let expected = EventBuilderV10 :: new ( )
197+ . id ( "0001" )
198+ . ty ( "example.test" )
199+ . source ( "http://localhost" )
200+ . data ( "application/cloudevents+json" , j. clone ( ) )
201+ . extension ( "someint" , "10" )
202+ . build ( )
203+ . unwrap ( ) ;
204+
205+ let input = EventBuilderV10 :: new ( )
206+ . id ( "0001" )
207+ . ty ( "example.test" )
208+ . source ( "http://localhost" )
209+ . data ( "application/cloudevents+json" , j. clone ( ) )
210+ . extension ( "someint" , "10" )
211+ . build ( )
212+ . unwrap ( ) ;
213+
214+ let serialized_event =
215+ StructuredDeserializer :: deserialize_structured ( input, MessageRecord :: new ( ) ) . unwrap ( ) ;
216+
217+ let msg = MessageBuilder :: new ( )
218+ . topic ( "test" )
219+ . message_record ( & serialized_event)
220+ . qos ( 1 )
221+ . finalize ( ) ;
222+
223+ assert_eq ! ( msg. to_event( headers:: MqttVersion :: V3_1_1 ) . unwrap( ) , expected)
224+ }
225+ }
0 commit comments