Skip to content

Commit a62120b

Browse files
authored
Merge pull request #276 from WenyXu/feat/produce_result
feat: add `ProduceResult` with encoded request size metadata
2 parents 1076b3c + a12992f commit a62120b

File tree

9 files changed

+141
-30
lines changed

9 files changed

+141
-30
lines changed

src/client/controller.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ impl ControllerClient {
7171
let response = broker
7272
.request(request)
7373
.await
74-
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
74+
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
75+
.response;
7576

7677
maybe_throttle(response.throttle_time_ms)?;
7778

@@ -122,7 +123,8 @@ impl ControllerClient {
122123
let response = broker
123124
.request(request)
124125
.await
125-
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
126+
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
127+
.response;
126128

127129
maybe_throttle(response.throttle_time_ms)?;
128130

src/client/partition.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use crate::{
22
backoff::{Backoff, BackoffConfig, ErrorOrThrottle},
3-
client::error::{Error, RequestContext, Result},
3+
client::{
4+
error::{Error, RequestContext, Result},
5+
producer::ProduceResult,
6+
},
47
connection::{
58
BrokerCache, BrokerCacheGeneration, BrokerConnection, BrokerConnector, MessengerTransport,
69
MetadataLookupMode,
@@ -15,6 +18,7 @@ use crate::{
1518
ListOffsetsRequest, ListOffsetsRequestPartition, ListOffsetsRequestTopic,
1619
ListOffsetsResponse, ListOffsetsResponsePartition, NORMAL_CONSUMER, ProduceRequest,
1720
ProduceRequestPartitionData, ProduceRequestTopicData, ProduceResponse,
21+
ResponseBodyWithMetadata,
1822
},
1923
primitives::*,
2024
record::{Record as ProtocolRecord, *},
@@ -195,10 +199,10 @@ impl PartitionClient {
195199
&self,
196200
records: Vec<Record>,
197201
compression: Compression,
198-
) -> Result<Vec<i64>> {
202+
) -> Result<ProduceResult> {
199203
// skip request entirely if `records` is empty
200204
if records.is_empty() {
201-
return Ok(vec![]);
205+
return Ok(ProduceResult::default());
202206
}
203207

204208
let n = records.len() as i64;
@@ -214,13 +218,20 @@ impl PartitionClient {
214218
.get()
215219
.await
216220
.map_err(|e| ErrorOrThrottle::Error((e, None)))?;
217-
let response = broker
221+
let ResponseBodyWithMetadata {
222+
response,
223+
encoded_request_size,
224+
} = broker
218225
.request(&request)
219226
.await
220227
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
221228
maybe_throttle(response.throttle_time_ms)?;
222229
process_produce_response(self.partition, &self.topic, n, response)
223230
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
231+
.map(|offsets| ProduceResult {
232+
offsets,
233+
encoded_request_size,
234+
})
224235
},
225236
)
226237
.await
@@ -256,7 +267,8 @@ impl PartitionClient {
256267
let response = broker
257268
.request(&request)
258269
.await
259-
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
270+
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
271+
.response;
260272
maybe_throttle(response.throttle_time_ms)?;
261273
process_fetch_response(self.partition, &self.topic, response, offset)
262274
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
@@ -292,7 +304,8 @@ impl PartitionClient {
292304
let response = broker
293305
.request(&request)
294306
.await
295-
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
307+
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
308+
.response;
296309
maybe_throttle(response.throttle_time_ms)?;
297310
process_list_offsets_response(self.partition, &self.topic, response)
298311
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
@@ -325,7 +338,8 @@ impl PartitionClient {
325338
let response = broker
326339
.request(&request)
327340
.await
328-
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
341+
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
342+
.response;
329343
maybe_throttle(Some(response.throttle_time_ms))?;
330344
process_delete_records_response(&self.topic, self.partition, response)
331345
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))

src/client/producer.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,17 @@ impl BatchProducerBuilder {
300300
}
301301
}
302302

303+
/// The result of the [produce](ProducerClient::produce) call.
304+
#[derive(Debug, Default)]
305+
#[non_exhaustive]
306+
pub struct ProduceResult {
307+
/// The offsets of the produced records.
308+
pub offsets: Vec<i64>,
309+
310+
/// The size of the request encoded in bytes.
311+
pub encoded_request_size: usize,
312+
}
313+
303314
/// The [`ProducerClient`] provides an abstraction over a Kafka client than can
304315
/// produce a record.
305316
///
@@ -316,15 +327,15 @@ pub trait ProducerClient: std::fmt::Debug + Send + Sync {
316327
&self,
317328
records: Vec<Record>,
318329
compression: Compression,
319-
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>>;
330+
) -> BoxFuture<'_, Result<ProduceResult, ClientError>>;
320331
}
321332

322333
impl ProducerClient for PartitionClient {
323334
fn produce(
324335
&self,
325336
records: Vec<Record>,
326337
compression: Compression,
327-
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
338+
) -> BoxFuture<'_, Result<ProduceResult, ClientError>> {
328339
Box::pin(self.produce(records, compression))
329340
}
330341
}
@@ -703,7 +714,7 @@ mod tests {
703714
&self,
704715
records: Vec<Record>,
705716
_compression: Compression,
706-
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
717+
) -> BoxFuture<'_, Result<ProduceResult, ClientError>> {
707718
Box::pin(async move {
708719
tokio::time::sleep(self.delay).await;
709720

@@ -727,7 +738,12 @@ mod tests {
727738
.map(|x| (x + offset_base) as i64)
728739
.collect();
729740
batch_sizes.push(records.len());
730-
Ok(offsets)
741+
let record_size = records.iter().map(|r| r.approximate_size()).sum::<usize>();
742+
Ok(ProduceResult {
743+
offsets,
744+
// Uses the approximate size of the records to estimate the size of the request.
745+
encoded_request_size: record_size,
746+
})
731747
})
732748
}
733749
}

src/client/producer/batch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ where
158158
async move {
159159
let res = match client.produce(batch, compression).await {
160160
Ok(status) => Ok(Arc::new(AggregatedStatus {
161-
aggregated_status: status,
161+
aggregated_status: status.offsets,
162162
status_deagg,
163163
})),
164164
Err(e) => {

src/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ impl RequestHandler for MessengerTransport {
392392
&self,
393393
request_params: &MetadataRequest,
394394
) -> Result<MetadataResponse, RequestError> {
395-
self.request(request_params).await
395+
self.request(request_params).await.map(|r| r.response)
396396
}
397397
}
398398

src/messenger.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ use tokio::{
2727
};
2828
use tracing::{debug, info, warn};
2929

30-
use crate::protocol::{messages::ApiVersionsRequest, traits::ReadType};
30+
use crate::protocol::{
31+
messages::{ApiVersionsRequest, ResponseBodyWithMetadata},
32+
traits::ReadType,
33+
};
3134
use crate::{
3235
backoff::ErrorOrThrottle,
3336
protocol::{
@@ -314,7 +317,10 @@ where
314317
self.version_ranges = ranges;
315318
}
316319

317-
pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, RequestError>
320+
pub async fn request<R>(
321+
&self,
322+
msg: R,
323+
) -> Result<ResponseBodyWithMetadata<R::ResponseBody>, RequestError>
318324
where
319325
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
320326
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
@@ -327,7 +333,7 @@ where
327333
&self,
328334
msg: R,
329335
version_ranges: &HashMap<ApiKey, ApiVersionRange>,
330-
) -> Result<R::ResponseBody, RequestError>
336+
) -> Result<ResponseBodyWithMetadata<R::ResponseBody>, RequestError>
331337
where
332338
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
333339
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
@@ -371,6 +377,7 @@ where
371377
.write_versioned(&mut buf, header_version)
372378
.expect("Writing header to buffer should always work");
373379
msg.write_versioned(&mut buf, body_api_version)?;
380+
let encoded_size = buf.len();
374381

375382
let (tx, rx) = channel();
376383

@@ -412,7 +419,10 @@ where
412419
});
413420
}
414421

415-
Ok(body)
422+
Ok(ResponseBodyWithMetadata {
423+
response: body,
424+
encoded_request_size: encoded_size,
425+
})
416426
}
417427

418428
async fn send_message(&self, msg: Vec<u8>) -> Result<(), RequestError> {
@@ -468,7 +478,7 @@ where
468478
.request_with_version_ranges(&body, &version_ranges)
469479
.await
470480
{
471-
Ok(response) => {
481+
Ok(ResponseBodyWithMetadata { response, .. }) => {
472482
if let Err(ErrorOrThrottle::Throttle(throttle)) =
473483
maybe_throttle::<SyncVersionsError>(response.throttle_time_ms)
474484
{
@@ -560,7 +570,7 @@ where
560570
auth_bytes: Vec<u8>,
561571
) -> Result<SaslAuthenticateResponse, SaslError> {
562572
let req = SaslAuthenticateRequest::new(auth_bytes);
563-
let resp = self.request(req).await?;
573+
let resp = self.request(req).await?.response;
564574
if let Some(err) = resp.error_code {
565575
if let Some(s) = resp.error_message.0 {
566576
debug!("Sasl auth error message: {s}");
@@ -573,7 +583,7 @@ where
573583

574584
async fn sasl_handshake(&self, mechanism: &str) -> Result<SaslHandshakeResponse, SaslError> {
575585
let req = SaslHandshakeRequest::new(mechanism);
576-
let resp = self.request(req).await?;
586+
let resp = self.request(req).await?.response;
577587
if let Some(err) = resp.error_code {
578588
return Err(SaslError::ApiError(err));
579589
}
@@ -1184,7 +1194,8 @@ mod tests {
11841194
tagged_fields: Some(TaggedFields::default()),
11851195
})
11861196
.await
1187-
.unwrap();
1197+
.unwrap()
1198+
.response;
11881199
assert_eq!(actual, resp);
11891200
}
11901201

src/protocol/messages/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,16 @@ impl<W: Write, T: WriteVersionedType<W>> WriteVersionedType<W> for &T {
8686
}
8787
}
8888

89+
/// A response body with metadata about the request & response.
90+
#[derive(Debug)]
91+
pub struct ResponseBodyWithMetadata<R> {
92+
/// The response body.
93+
pub response: R,
94+
95+
/// The size of the request encoded in bytes.
96+
pub encoded_request_size: usize,
97+
}
98+
8999
/// Specifies a request body.
90100
pub trait RequestBody {
91101
/// The response type that will follow when issuing this request.

0 commit comments

Comments
 (0)