11use super :: headers;
2- use paho_mqtt:: { Properties , Property , PropertyCode , MessageBuilder } ;
3- use cloudevents:: message:: { BinaryDeserializer , BinarySerializer , MessageAttributeValue , Result ,
4- StructuredDeserializer , StructuredSerializer , Error } ;
5- use cloudevents:: Event ;
62use cloudevents:: event:: SpecVersion ;
3+ use cloudevents:: message:: {
4+ BinaryDeserializer , BinarySerializer , Error , MessageAttributeValue , Result ,
5+ StructuredDeserializer , StructuredSerializer ,
6+ } ;
7+ use cloudevents:: Event ;
8+ use paho_mqtt:: { MessageBuilder , Properties , Property , PropertyCode } ;
79use std:: option:: Option :: Some ;
810
911pub struct MessageRecord {
@@ -22,68 +24,76 @@ impl MessageRecord {
2224
2325 pub fn from_event ( event : Event , version : headers:: MqttVersion ) -> Result < Self > {
2426 match version {
25- headers:: MqttVersion :: V5 => BinaryDeserializer :: deserialize_binary ( event, MessageRecord :: new ( ) ) ,
26- headers:: MqttVersion :: V3_1 => StructuredDeserializer :: deserialize_structured ( event, MessageRecord :: new ( ) ) ,
27- headers:: MqttVersion :: V3_1_1 => StructuredDeserializer :: deserialize_structured ( event, MessageRecord :: new ( ) ) ,
27+ headers:: MqttVersion :: V5 => {
28+ BinaryDeserializer :: deserialize_binary ( event, MessageRecord :: new ( ) )
29+ }
30+ headers:: MqttVersion :: V3_1 => {
31+ StructuredDeserializer :: deserialize_structured ( event, MessageRecord :: new ( ) )
32+ }
33+ headers:: MqttVersion :: V3_1_1 => {
34+ StructuredDeserializer :: deserialize_structured ( event, MessageRecord :: new ( ) )
35+ }
2836 }
2937 }
3038}
3139
3240impl BinarySerializer < MessageRecord > for MessageRecord {
3341 fn set_spec_version ( mut self , spec_version : SpecVersion ) -> Result < Self > {
34- match Property :: new_string_pair ( PropertyCode :: UserProperty , headers:: SPEC_VERSION_HEADER ,
35- spec_version. as_str ( ) ) {
36- Ok ( property) => {
37- match self . headers . push ( property) {
38- Err ( e) => Err ( Error :: Other {
39- source : Box :: new ( e)
40- } ) ,
41- _ => Ok ( self )
42- }
42+ match Property :: new_string_pair (
43+ PropertyCode :: UserProperty ,
44+ headers:: SPEC_VERSION_HEADER ,
45+ spec_version. as_str ( ) ,
46+ ) {
47+ Ok ( property) => match self . headers . push ( property) {
48+ Err ( e) => Err ( Error :: Other {
49+ source : Box :: new ( e) ,
50+ } ) ,
51+ _ => Ok ( self ) ,
4352 } ,
4453 _ => Err ( Error :: UnrecognizedAttributeName {
45- name : headers:: SPEC_VERSION_HEADER . to_string ( )
46- } )
54+ name : headers:: SPEC_VERSION_HEADER . to_string ( ) ,
55+ } ) ,
4756 }
4857 }
4958
5059 fn set_attribute ( mut self , name : & str , value : MessageAttributeValue ) -> Result < Self > {
51- match Property :: new_string_pair ( PropertyCode :: UserProperty , & headers:: ATTRIBUTES_TO_MQTT_HEADERS
52- . get ( name)
53- . ok_or ( cloudevents:: message:: Error :: UnrecognizedAttributeName {
54- name : String :: from ( name) ,
55- } ) ?
56- . clone ( ) [ ..] ,
57- & value. to_string ( ) [ ..] ) {
58- Ok ( property) => {
59- match self . headers . push ( property) {
60- Err ( e) => Err ( Error :: Other {
61- source : Box :: new ( e)
62- } ) ,
63- _ => Ok ( self )
64- }
60+ match Property :: new_string_pair (
61+ PropertyCode :: UserProperty ,
62+ & headers:: ATTRIBUTES_TO_MQTT_HEADERS
63+ . get ( name)
64+ . ok_or ( cloudevents:: message:: Error :: UnrecognizedAttributeName {
65+ name : String :: from ( name) ,
66+ } ) ?
67+ . clone ( ) [ ..] ,
68+ & value. to_string ( ) [ ..] ,
69+ ) {
70+ Ok ( property) => match self . headers . push ( property) {
71+ Err ( e) => Err ( Error :: Other {
72+ source : Box :: new ( e) ,
73+ } ) ,
74+ _ => Ok ( self ) ,
6575 } ,
6676 _ => Err ( Error :: UnrecognizedAttributeName {
67- name : headers:: SPEC_VERSION_HEADER . to_string ( )
68- } )
77+ name : headers:: SPEC_VERSION_HEADER . to_string ( ) ,
78+ } ) ,
6979 }
7080 }
7181
7282 fn set_extension ( mut self , name : & str , value : MessageAttributeValue ) -> Result < Self > {
73- match Property :: new_string_pair ( PropertyCode :: UserProperty ,
74- & attribute_name_to_header ! ( name ) [ .. ] ,
75- & value . to_string ( ) [ ..] ) {
76- Ok ( property ) => {
77- match self . headers . push ( property ) {
78- Err ( e ) => Err ( Error :: Other {
79- source : Box :: new ( e )
80- } ) ,
81- _ => Ok ( self )
82- }
83+ match Property :: new_string_pair (
84+ PropertyCode :: UserProperty ,
85+ & attribute_name_to_header ! ( name ) [ ..] ,
86+ & value . to_string ( ) [ .. ] ,
87+ ) {
88+ Ok ( property ) => match self . headers . push ( property ) {
89+ Err ( e ) => Err ( Error :: Other {
90+ source : Box :: new ( e ) ,
91+ } ) ,
92+ _ => Ok ( self ) ,
8393 } ,
8494 _ => Err ( Error :: UnrecognizedAttributeName {
85- name : headers:: SPEC_VERSION_HEADER . to_string ( )
86- } )
95+ name : headers:: SPEC_VERSION_HEADER . to_string ( ) ,
96+ } ) ,
8797 }
8898 }
8999
@@ -100,14 +110,15 @@ impl BinarySerializer<MessageRecord> for MessageRecord {
100110
101111impl StructuredSerializer < MessageRecord > for MessageRecord {
102112 fn set_structured_event ( mut self , bytes : Vec < u8 > ) -> Result < MessageRecord > {
103- match Property :: new_string_pair ( PropertyCode :: UserProperty ,
104- headers:: CONTENT_TYPE , headers:: CLOUDEVENTS_JSON_HEADER ) {
105- Ok ( property) => {
106- match self . headers . push ( property) {
107- _ => ( )
108- }
113+ match Property :: new_string_pair (
114+ PropertyCode :: UserProperty ,
115+ headers:: CONTENT_TYPE ,
116+ headers:: CLOUDEVENTS_JSON_HEADER ,
117+ ) {
118+ Ok ( property) => match self . headers . push ( property) {
119+ _ => ( ) ,
109120 } ,
110- _ => ( )
121+ _ => ( ) ,
111122 }
112123 self . payload = Some ( bytes) ;
113124
@@ -116,16 +127,11 @@ impl StructuredSerializer<MessageRecord> for MessageRecord {
116127}
117128
118129pub trait MessageBuilderExt {
119- fn message_record (
120- self ,
121- message_record : & MessageRecord ,
122- ) -> MessageBuilder ;
130+ fn message_record ( self , message_record : & MessageRecord ) -> MessageBuilder ;
123131}
124132
125133impl MessageBuilderExt for MessageBuilder {
126- fn message_record ( mut self ,
127- message_record : & MessageRecord
128- ) -> MessageBuilder {
134+ fn message_record ( mut self , message_record : & MessageRecord ) -> MessageBuilder {
129135 self = self . properties ( message_record. headers . clone ( ) ) ;
130136
131137 if let Some ( s) = message_record. payload . as_ref ( ) {
@@ -134,4 +140,4 @@ impl MessageBuilderExt for MessageBuilder {
134140
135141 self
136142 }
137- }
143+ }
0 commit comments