Skip to content

Commit 613261d

Browse files
authored
add close connections (#181)
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 90edc0b commit 613261d

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

src/producer.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
use dashmap::DashMap;
2-
use futures::{future::BoxFuture, FutureExt};
3-
use rabbitmq_stream_protocol::{message::Message, ResponseCode, ResponseKind};
41
use std::future::Future;
52
use std::vec;
63
use std::{
@@ -11,10 +8,15 @@ use std::{
118
},
129
time::Duration,
1310
};
11+
12+
use dashmap::DashMap;
13+
use futures::{future::BoxFuture, FutureExt};
1414
use tokio::sync::mpsc::channel;
1515
use tokio::sync::{mpsc, Mutex};
1616
use tracing::{debug, error, trace};
1717

18+
use rabbitmq_stream_protocol::{message::Message, ResponseCode, ResponseKind};
19+
1820
use crate::client::ClientMessage;
1921
use crate::MetricsCollector;
2022
use crate::{client::MessageHandler, ClientOptions, RabbitMQStreamResult};
@@ -96,6 +98,7 @@ impl ProducerInternal {
9698
Ok(())
9799
}
98100
}
101+
99102
/// API for publising messages to RabbitMQ stream
100103
#[derive(Clone)]
101104
pub struct Producer<T>(Arc<ProducerInternal>, PhantomData<T>);
@@ -111,7 +114,9 @@ pub struct ProducerBuilder<T> {
111114

112115
#[derive(Clone)]
113116
pub struct NoDedup {}
117+
114118
pub struct Dedup {}
119+
115120
impl<T> ProducerBuilder<T> {
116121
pub async fn build(self, stream: &str) -> Result<Producer<T>, ProducerCreateError> {
117122
// Connect to the user specified node first, then look for the stream leader.
@@ -479,6 +484,7 @@ impl<T> Producer<T> {
479484
Ok(false) => {
480485
let response = self.0.client.delete_publisher(self.0.producer_id).await?;
481486
if response.is_ok() {
487+
self.0.client.close().await?;
482488
Ok(())
483489
} else {
484490
Err(ProducerCloseError::Close {

0 commit comments

Comments
 (0)