@@ -2,12 +2,12 @@ use crate::{clients::QueueClient, prelude::*, PopReceipt};
22use azure_core:: {
33 collect_pinned_stream,
44 error:: { ErrorKind , ResultExt } ,
5- headers:: utc_date_from_rfc2822,
65 prelude:: * ,
76 Context , Response as AzureResponse ,
87} ;
98use azure_storage:: core:: { headers:: CommonStorageResponseHeaders , xml:: read_xml} ;
109use chrono:: { DateTime , Utc } ;
10+ use serde:: Deserialize ;
1111use std:: convert:: TryInto ;
1212
1313#[ derive( Debug , Clone ) ]
@@ -25,7 +25,6 @@ impl GetMessagesBuilder {
2525 queue_client,
2626 number_of_messages : None ,
2727 visibility_timeout : None ,
28-
2928 timeout : None ,
3029 context : Context :: new ( ) ,
3130 }
@@ -78,68 +77,105 @@ pub struct GetMessagesResponse {
7877 pub messages : Vec < Message > ,
7978}
8079
81- #[ derive( Debug , Clone ) ]
80+ #[ derive( Debug , Clone , Deserialize ) ]
8281pub struct Message {
83- pub pop_receipt : PopReceipt ,
82+ #[ serde( rename = "MessageId" ) ]
83+ message_id : String ,
84+ #[ serde( rename = "PopReceipt" ) ]
85+ pop_receipt : String ,
86+ #[ serde( rename = "InsertionTime" , deserialize_with = "deserialize_utc" ) ]
8487 pub insertion_time : DateTime < Utc > ,
88+ #[ serde( rename = "ExpirationTime" , deserialize_with = "deserialize_utc" ) ]
8589 pub expiration_time : DateTime < Utc > ,
90+ #[ serde( rename = "TimeNextVisible" , deserialize_with = "deserialize_utc" ) ]
8691 pub time_next_visible : DateTime < Utc > ,
92+ #[ serde( rename = "DequeueCount" ) ]
8793 pub dequeue_count : u64 ,
94+ #[ serde( rename = "MessageText" ) ]
8895 pub message_text : String ,
8996}
9097
91- impl From < Message > for PopReceipt {
92- fn from ( message : Message ) -> Self {
93- message . pop_receipt
98+ impl Message {
99+ pub fn pop_receipt ( & self ) -> PopReceipt {
100+ PopReceipt :: new ( self . message_id . clone ( ) , self . pop_receipt . clone ( ) )
94101 }
95102}
96103
97- #[ derive( Debug , Clone , Serialize , Deserialize ) ]
98- struct MessageInternal {
99- #[ serde( rename = "MessageId" ) ]
100- pub message_id : String ,
101- #[ serde( rename = "InsertionTime" ) ]
102- pub insertion_time : String ,
103- #[ serde( rename = "ExpirationTime" ) ]
104- pub expiration_time : String ,
105- #[ serde( rename = "PopReceipt" ) ]
106- pub pop_receipt : String ,
107- #[ serde( rename = "TimeNextVisible" ) ]
108- pub time_next_visible : String ,
109- #[ serde( rename = "DequeueCount" ) ]
110- pub dequeue_count : u64 ,
111- #[ serde( rename = "MessageText" ) ]
112- pub message_text : String ,
104+ impl From < Message > for PopReceipt {
105+ fn from ( message : Message ) -> Self {
106+ PopReceipt :: new ( message. message_id , message. pop_receipt )
107+ }
113108}
114109
115- #[ derive( Debug , Clone , Serialize , Deserialize ) ]
116- struct MessagesInternal {
110+ #[ derive( Debug , Clone , Deserialize ) ]
111+ struct MessageList {
117112 #[ serde( rename = "QueueMessage" ) ]
118- pub messages : Option < Vec < MessageInternal > > ,
113+ pub messages : Option < Vec < Message > > ,
119114}
120115
121116impl GetMessagesResponse {
117+ fn parse_messages ( body : & [ u8 ] ) -> azure_core:: Result < Vec < Message > > {
118+ let response: MessageList = read_xml ( body) . map_kind ( ErrorKind :: DataConversion ) ?;
119+ Ok ( response. messages . unwrap_or_default ( ) )
120+ }
121+
122122 async fn try_from ( response : AzureResponse ) -> azure_core:: Result < Self > {
123123 let ( _, headers, body) = response. deconstruct ( ) ;
124124 let body = collect_pinned_stream ( body) . await ?;
125125
126- let response: MessagesInternal = read_xml ( & body) . map_kind ( ErrorKind :: DataConversion ) ?;
127-
128- let mut messages = Vec :: new ( ) ;
129- for message in response. messages . unwrap_or_default ( ) . into_iter ( ) {
130- messages. push ( Message {
131- pop_receipt : PopReceipt :: new ( message. message_id , message. pop_receipt ) ,
132- insertion_time : utc_date_from_rfc2822 ( & message. insertion_time ) ?,
133- expiration_time : utc_date_from_rfc2822 ( & message. expiration_time ) ?,
134- time_next_visible : utc_date_from_rfc2822 ( & message. time_next_visible ) ?,
135- dequeue_count : message. dequeue_count ,
136- message_text : message. message_text ,
137- } )
138- }
126+ let messages = Self :: parse_messages ( & body) ?;
139127
140128 Ok ( GetMessagesResponse {
141129 common_storage_response_headers : ( & headers) . try_into ( ) ?,
142130 messages,
143131 } )
144132 }
145133}
134+
135+ fn deserialize_utc < ' de , D > ( deserializer : D ) -> std:: result:: Result < DateTime < Utc > , D :: Error >
136+ where
137+ D : serde:: Deserializer < ' de > ,
138+ {
139+ let s = String :: deserialize ( deserializer) ?;
140+ let date = DateTime :: parse_from_rfc2822 ( & s) . map_err ( serde:: de:: Error :: custom) ?;
141+ Ok ( DateTime :: from_utc ( date. naive_utc ( ) , Utc ) )
142+ }
143+
144+ #[ cfg( test) ]
145+ mod tests {
146+ use super :: * ;
147+
148+ #[ test]
149+ fn test_parse_messages ( ) -> azure_core:: Result < ( ) > {
150+ let message = b"\xef \xbb \xbf \
151+ <?xml version=\" 1.0\" encoding=\" utf-8\" ?>\
152+ <QueueMessagesList><QueueMessage>\
153+ <MessageId>00000000-0000-0000-0000-000000000000</MessageId>\
154+ <InsertionTime>Mon, 27 Jun 2022 13:38:48 GMT</InsertionTime>\
155+ <ExpirationTime>Mon, 04 Jul 2022 13:38:48 GMT</ExpirationTime>\
156+ <PopReceipt>REDACTED1</PopReceipt>\
157+ <TimeNextVisible>Mon, 27 Jun 2022 13:38:53 GMT</TimeNextVisible>\
158+ <DequeueCount>1</DequeueCount>\
159+ <MessageText>test1</MessageText>\
160+ </QueueMessage>\
161+ <QueueMessage>\
162+ <MessageId>11111111-1111-1111-1111-111111111111</MessageId>\
163+ <InsertionTime>Mon, 27 Jun 2022 13:38:48 GMT</InsertionTime>\
164+ <ExpirationTime>Mon, 04 Jul 2022 13:38:48 GMT</ExpirationTime>\
165+ <PopReceipt>REDACTED2</PopReceipt>\
166+ <TimeNextVisible>Mon, 27 Jun 2022 13:38:53 GMT</TimeNextVisible>\
167+ <DequeueCount>1</DequeueCount>\
168+ <MessageText>test2</MessageText>\
169+ </QueueMessage>\
170+ </QueueMessagesList>\
171+ ";
172+
173+ let messages = GetMessagesResponse :: parse_messages ( message) ?;
174+
175+ assert_eq ! ( messages. len( ) , 2 ) ;
176+ assert_eq ! ( messages[ 0 ] . message_text, "test1" ) ;
177+ assert_eq ! ( messages[ 1 ] . message_text, "test2" ) ;
178+
179+ Ok ( ( ) )
180+ }
181+ }
0 commit comments