Skip to content

Commit 6c0ea7e

Browse files
committed
optimized conn pool usage
1 parent a35af2e commit 6c0ea7e

File tree

4 files changed

+51
-29
lines changed

4 files changed

+51
-29
lines changed

aerospike-core/src/commands/single_command.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use std::sync::Arc;
16-
use std::time::Instant;
1716

1817
use crate::cluster::partition::Partition;
1918
use crate::cluster::{Cluster, Node};
@@ -23,6 +22,7 @@ use crate::net::Connection;
2322
use crate::policy::Policy;
2423
use crate::Key;
2524
use aerospike_rt::sleep;
25+
use aerospike_rt::time::Instant;
2626

2727
pub struct SingleCommand<'a> {
2828
cluster: Arc<Cluster>,
@@ -76,20 +76,13 @@ impl<'a> SingleCommand<'a> {
7676
loop {
7777
iterations += 1;
7878

79-
// too many retries
80-
if let Some(max_retries) = policy.max_retries() {
81-
if iterations > max_retries + 1 {
82-
bail!(ErrorKind::Connection(format!(
83-
"Timeout after {} tries",
84-
iterations
85-
)));
86-
}
87-
}
88-
8979
// Sleep before trying again, after the first iteration
9080
if iterations > 1 {
9181
if let Some(sleep_between_retries) = policy.sleep_between_retries() {
9282
sleep(sleep_between_retries).await;
83+
} else {
84+
// yield to free space for the runtime to execute other futures between runs because the loop would block the thread
85+
aerospike_rt::task::yield_now().await;
9386
}
9487
}
9588

aerospike-core/src/net/connection.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,9 @@
1313
// License for the specific language governing permissions and limitations under
1414
// the License.
1515

16-
use std::ops::Add;
17-
1816
use crate::commands::admin_command::AdminCommand;
1917
use crate::commands::buffer::Buffer;
20-
use crate::errors::Result;
18+
use crate::errors::{ErrorKind, Result};
2119
use crate::policy::ClientPolicy;
2220
#[cfg(all(any(feature = "rt-async-std"), not(feature = "rt-tokio")))]
2321
use aerospike_rt::async_std::net::Shutdown;
@@ -27,6 +25,7 @@ use aerospike_rt::net::TcpStream;
2725
use aerospike_rt::time::{Duration, Instant};
2826
#[cfg(all(any(feature = "rt-async-std"), not(feature = "rt-tokio")))]
2927
use futures::{AsyncReadExt, AsyncWriteExt};
28+
use std::ops::Add;
3029

3130
#[derive(Debug)]
3231
pub struct Connection {
@@ -46,12 +45,17 @@ pub struct Connection {
4645

4746
impl Connection {
4847
pub async fn new(addr: &str, policy: &ClientPolicy) -> Result<Self> {
49-
let stream = TcpStream::connect(addr).await?;
48+
let stream = aerospike_rt::timeout(Duration::from_secs(10), TcpStream::connect(addr)).await;
49+
if stream.is_err() {
50+
bail!(ErrorKind::Connection(
51+
"Could not open network connection".to_string()
52+
));
53+
}
5054
let mut conn = Connection {
5155
buffer: Buffer::new(policy.buffer_reclaim_threshold),
5256
bytes_read: 0,
5357
timeout: policy.timeout,
54-
conn: stream,
58+
conn: stream.unwrap()?,
5559
idle_timeout: policy.idle_timeout,
5660
idle_deadline: policy.idle_timeout.map(|timeout| Instant::now() + timeout),
5761
};
@@ -60,11 +64,11 @@ impl Connection {
6064
Ok(conn)
6165
}
6266

63-
pub fn close(&mut self) {
67+
pub async fn close(&mut self) {
6468
#[cfg(all(any(feature = "rt-async-std"), not(feature = "rt-tokio")))]
6569
let _s = self.conn.shutdown(Shutdown::Both);
6670
#[cfg(all(any(feature = "rt-tokio"), not(feature = "rt-async-std")))]
67-
let _s = self.conn.shutdown();
71+
let _s = self.conn.shutdown().await;
6872
}
6973

7074
pub async fn flush(&mut self) -> Result<()> {
@@ -77,7 +81,7 @@ impl Connection {
7781
self.buffer.resize_buffer(size)?;
7882
self.conn.read_exact(&mut self.buffer.data_buffer).await?;
7983
self.bytes_read += size;
80-
self.buffer.reset_offset()?;
84+
self.buffer.reset_offset();
8185
self.refresh();
8286
Ok(())
8387
}
@@ -103,7 +107,7 @@ impl Connection {
103107
fn refresh(&mut self) {
104108
self.idle_deadline = None;
105109
if let Some(idle_to) = self.idle_timeout {
106-
self.idle_deadline = Some(Instant::now().add(idle_to))
110+
self.idle_deadline = Some(Instant::now().add(idle_to));
107111
};
108112
}
109113

@@ -112,7 +116,7 @@ impl Connection {
112116
return match AdminCommand::authenticate(self, user, password).await {
113117
Ok(()) => Ok(()),
114118
Err(err) => {
115-
self.close();
119+
self.close().await;
116120
Err(err)
117121
}
118122
};

aerospike-core/src/net/connection_pool.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// License for the specific language governing permissions and limitations under
1414
// the License.
1515

16-
use std::collections::VecDeque;
1716
use std::ops::{Deref, DerefMut, Drop};
1817
use std::sync::atomic::{AtomicUsize, Ordering};
1918
use std::sync::Arc;
@@ -23,6 +22,8 @@ use crate::net::{Connection, Host};
2322
use crate::policy::ClientPolicy;
2423
use futures::executor::block_on;
2524
use futures::lock::Mutex;
25+
use std::collections::VecDeque;
26+
use std::time::Duration;
2627

2728
#[derive(Debug)]
2829
struct IdleConnection(Connection);
@@ -66,17 +67,39 @@ impl Queue {
6667
if let Some(IdleConnection(mut conn)) = internals.connections.pop_front() {
6768
if conn.is_idle() {
6869
internals.num_conns -= 1;
69-
conn.close();
70+
conn.close().await;
7071
continue;
7172
}
7273
connection = conn;
7374
break;
7475
}
76+
7577
if internals.num_conns >= self.0.capacity {
7678
bail!(ErrorKind::NoMoreConnections);
7779
}
78-
let conn = Connection::new(&self.0.host.address(), &self.0.policy).await?;
80+
7981
internals.num_conns += 1;
82+
83+
// Free the lock to prevent deadlocking
84+
drop(internals);
85+
86+
let conn = aerospike_rt::timeout(
87+
Duration::from_secs(5),
88+
Connection::new(&self.0.host.address(), &self.0.policy),
89+
)
90+
.await;
91+
92+
if conn.is_err() {
93+
let mut internals = self.0.internals.lock().await;
94+
internals.num_conns -= 1;
95+
drop(internals);
96+
bail!(ErrorKind::Connection(
97+
"Could not open network connection".to_string()
98+
));
99+
}
100+
101+
let conn = conn.unwrap()?;
102+
80103
connection = conn;
81104
break;
82105
}
@@ -92,7 +115,7 @@ impl Queue {
92115
if internals.num_conns < self.0.capacity {
93116
internals.connections.push_back(IdleConnection(conn));
94117
} else {
95-
conn.close();
118+
conn.close().await;
96119
internals.num_conns -= 1;
97120
}
98121
}
@@ -102,13 +125,13 @@ impl Queue {
102125
let mut internals = self.0.internals.lock().await;
103126
internals.num_conns -= 1;
104127
}
105-
conn.close();
128+
conn.close().await;
106129
}
107130

108131
pub async fn clear(&mut self) {
109132
let mut internals = self.0.internals.lock().await;
110133
for mut conn in internals.connections.drain(..) {
111-
conn.0.close();
134+
conn.0.close().await;
112135
}
113136
internals.num_conns = 0;
114137
}

aerospike-rt/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ compile_error!("Please select a runtime from ['rt-tokio', 'rt-async-std']");
55
compile_error!("Please select only one runtime");
66

77
#[cfg(all(any(feature = "rt-async-std"), not(feature = "rt-tokio")))]
8-
pub use async_std::{self, fs, io, net, sync::RwLock, task, task::sleep, task::spawn};
8+
pub use async_std::{
9+
self, fs, future::timeout, io, net, sync::RwLock, task, task::sleep, task::spawn,
10+
};
911
#[cfg(all(any(feature = "rt-tokio"), not(feature = "rt-async-std")))]
10-
pub use tokio::{self, fs, io, net, spawn, sync::RwLock, task, time, time::sleep};
12+
pub use tokio::{self, fs, io, net, spawn, sync::RwLock, task, time, time::sleep, time::timeout};
1113

1214
#[cfg(all(any(feature = "rt-async-std"), not(feature = "rt-tokio")))]
1315
pub use std::time;

0 commit comments

Comments
 (0)