Skip to content

Commit 17e855c

Browse files
authored
fix: meta-semaphore: re-connect when no event received (#18690)
Refactor the permit implementation to properly handle acquirer closure and connection errors. The permit now uses oneshot channels to detect when the underlying permit entry is removed from meta-service, replacing the previous `BoxFuture`, because a future that is not polled does not consume incoming events and therefore blocks the sending end of the channel. If the meta-service does not send an event for `permit_ttl * 1.5`, the connection will be re-established. Add an `AcquirerClosed` error type for client-side close errors.
1 parent 84bce6a commit 17e855c

File tree

13 files changed

+715
-53
lines changed

13 files changed

+715
-53
lines changed

src/meta/binaries/metabench/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,14 +412,14 @@ async fn benchmark_semaphore(
412412

413413
let permit_str = format!("({sem_key}, id={id})");
414414

415-
let mut sem = Semaphore::new(client.clone(), &sem_key, param.capacity).await;
415+
let mut sem = Semaphore::new(client.clone(), &sem_key, param.capacity, param.ttl()).await;
416416
if param.time_based {
417417
sem.set_time_based_seq(None);
418418
} else {
419419
sem.set_storage_based_seq();
420420
}
421421

422-
let permit_res = sem.acquire(&id, param.ttl()).await;
422+
let permit_res = sem.acquire(&id).await;
423423

424424
print_sem_res(i, format!("sem-acquired: {permit_str}",), &permit_res);
425425

src/meta/semaphore/src/acquirer/permit.rs

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@
1414

1515
use std::future::Future;
1616

17-
use futures::future::BoxFuture;
17+
use databend_common_base::runtime::spawn_named;
1818
use futures::FutureExt;
1919
use log::debug;
2020
use log::info;
21+
use log::warn;
2122
use tokio::sync::mpsc;
2223
use tokio::sync::oneshot;
2324

2425
use crate::acquirer::Acquirer;
2526
use crate::acquirer::SharedAcquirerStat;
26-
use crate::errors::ConnectionClosed;
2727
use crate::queue::PermitEvent;
2828
use crate::storage::PermitEntry;
2929
use crate::storage::PermitKey;
@@ -43,7 +43,15 @@ use crate::storage::PermitKey;
4343
pub struct Permit {
4444
pub acquirer_name: String,
4545
pub stat: SharedAcquirerStat,
46-
pub fu: BoxFuture<'static, Result<(), ConnectionClosed>>,
46+
47+
// Hold it so that the subscriber keeps running.
48+
pub(crate) _subscriber_cancel_tx: oneshot::Sender<()>,
49+
50+
// Hold it so that the lease-extending task keeps running.
51+
pub(crate) _leaser_cancel_tx: oneshot::Sender<()>,
52+
53+
/// Gets ready if the [`PermitEntry`] is removed from meta-service.
54+
pub(crate) is_removed_rx: oneshot::Receiver<()>,
4755
}
4856

4957
impl std::fmt::Debug for Permit {
@@ -68,13 +76,16 @@ impl Drop for Permit {
6876
}
6977

7078
impl Future for Permit {
71-
type Output = Result<(), ConnectionClosed>;
79+
type Output = ();
7280

7381
fn poll(
7482
mut self: std::pin::Pin<&mut Self>,
7583
cx: &mut std::task::Context<'_>,
7684
) -> std::task::Poll<Self::Output> {
77-
self.fu.poll_unpin(cx)
85+
match self.is_removed_rx.poll_unpin(cx) {
86+
std::task::Poll::Ready(_) => std::task::Poll::Ready(()),
87+
std::task::Poll::Pending => std::task::Poll::Pending,
88+
}
7889
}
7990
}
8091

@@ -89,18 +100,26 @@ impl Permit {
89100
permit_entry: PermitEntry,
90101
leaser_cancel_tx: oneshot::Sender<()>,
91102
) -> Self {
103+
let (is_removed_tx, is_removed_rx) = oneshot::channel::<()>();
104+
105+
// There must be a standalone task that consumes incoming events so that it won't block the permit_event_rx sender
106+
let acquirer_name = acquirer.name.clone();
92107
let fu = Self::watch_for_remove(
93108
acquirer.permit_event_rx,
109+
acquirer_name.clone(),
94110
permit_key,
95111
permit_entry,
96-
acquirer.subscriber_cancel_tx,
97-
leaser_cancel_tx,
112+
is_removed_tx,
98113
);
99114

115+
spawn_named(fu, format!("{}-WatchRemove", acquirer_name.clone()));
116+
100117
Permit {
101-
acquirer_name: acquirer.name,
118+
acquirer_name,
102119
stat: acquirer.stat,
103-
fu: Box::pin(fu),
120+
_subscriber_cancel_tx: acquirer.subscriber_cancel_tx,
121+
_leaser_cancel_tx: leaser_cancel_tx,
122+
is_removed_rx,
104123
}
105124
}
106125

@@ -120,30 +139,26 @@ impl Permit {
120139
/// in such case, the permit may still be valid.
121140
pub(crate) async fn watch_for_remove(
122141
mut permit_event_rx: mpsc::Receiver<PermitEvent>,
142+
acquirer_name: String,
123143
permit_key: PermitKey,
124144
permit_entry: PermitEntry,
125-
_subscriber_cancel_tx: oneshot::Sender<()>,
126-
_leaser_cancel_tx: oneshot::Sender<()>,
127-
) -> Result<(), ConnectionClosed> {
128-
let ctx = format!("Semaphore-Acquired: {}->{}", permit_key, permit_entry);
145+
is_removed_tx: oneshot::Sender<()>,
146+
) {
147+
let ctx = format!("{}: {}->{}", acquirer_name, permit_key, permit_entry);
129148

130149
while let Some(sem_event) = permit_event_rx.recv().await {
131-
debug!("semaphore event: {} received by: {}", sem_event, ctx);
150+
debug!("{}: received semaphore event: {}", ctx, sem_event);
132151

133152
match sem_event {
134153
PermitEvent::Acquired((_seq, _entry)) => {}
135154
PermitEvent::Removed((seq, _)) => {
136155
if seq == permit_key.seq {
137-
debug!(
138-
"semaphore PermitEntry is removed: {}->{}",
139-
permit_key, permit_entry.id
140-
);
141-
return Ok(());
156+
warn!("{}: PermitEntry is removed", ctx);
157+
is_removed_tx.send(()).ok();
158+
return;
142159
}
143160
}
144161
}
145162
}
146-
147-
Err(ConnectionClosed::new_str("semaphore event stream closed").context(&ctx))
148163
}
149164
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt;
16+
17+
/// Error indicating that a semaphore acquirer or permit has been closed.
18+
///
19+
/// This occurs when the acquirer instance becomes unavailable due to connection loss,
20+
/// explicit closure, permit expiration, or meta-service unavailability.
21+
#[derive(thiserror::Error, Debug)]
22+
pub struct AcquirerClosed {
23+
reason: String,
24+
when: Vec<String>,
25+
}
26+
27+
impl AcquirerClosed {
28+
/// Creates a new acquirer closed error with the specified reason.
29+
pub fn new(reason: impl ToString) -> Self {
30+
AcquirerClosed {
31+
reason: reason.to_string(),
32+
when: vec![],
33+
}
34+
}
35+
36+
/// Adds contextual information about when or where the error occurred.
37+
///
38+
/// Context is displayed in the order it was added and enables method chaining.
39+
pub fn context(mut self, context: impl ToString) -> Self {
40+
self.when.push(context.to_string());
41+
self
42+
}
43+
}
44+
45+
impl fmt::Display for AcquirerClosed {
46+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
47+
write!(
48+
f,
49+
"distributed-Semaphore Acquirer or Permit is closed: {}",
50+
self.reason
51+
)?;
52+
53+
if self.when.is_empty() {
54+
return Ok(());
55+
}
56+
57+
write!(f, "; when: (")?;
58+
59+
for (i, when) in self.when.iter().enumerate() {
60+
if i > 0 {
61+
write!(f, "; ")?;
62+
}
63+
write!(f, "{}", when)?;
64+
}
65+
66+
write!(f, ")")
67+
}
68+
}
69+
70+
#[cfg(test)]
71+
mod tests {
72+
use super::*;
73+
74+
#[test]
75+
fn test_basic_error_creation() {
76+
let err = AcquirerClosed::new("Connection timeout");
77+
assert_eq!(
78+
err.to_string(),
79+
"distributed-Semaphore Acquirer or Permit is closed: Connection timeout"
80+
);
81+
}
82+
83+
#[test]
84+
fn test_error_with_context_chain() {
85+
let err = AcquirerClosed::new("Network unreachable")
86+
.context("while acquiring read permit")
87+
.context("during database query");
88+
assert_eq!(
89+
err.to_string(),
90+
"distributed-Semaphore Acquirer or Permit is closed: Network unreachable; when: (while acquiring read permit; during database query)"
91+
);
92+
}
93+
94+
#[test]
95+
fn test_error_without_context() {
96+
let err = AcquirerClosed::new("Simple error");
97+
assert_eq!(
98+
err.to_string(),
99+
"distributed-Semaphore Acquirer or Permit is closed: Simple error"
100+
);
101+
}
102+
}

src/meta/semaphore/src/errors/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,15 @@
1313
// limitations under the License.
1414

1515
mod acquire_error;
16+
mod acquirer_closed;
1617
mod connection_closed;
1718
mod early_removed;
1819
mod either;
20+
mod processer_error;
1921

2022
pub use acquire_error::AcquireError;
23+
pub use acquirer_closed::AcquirerClosed;
2124
pub use connection_closed::ConnectionClosed;
2225
pub use early_removed::EarlyRemoved;
2326
pub use either::Either;
27+
pub use processer_error::ProcessorError;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use tonic::Status;
16+
17+
use crate::errors::AcquirerClosed;
18+
use crate::errors::ConnectionClosed;
19+
20+
/// Errors during semaphore permit processing.
21+
#[derive(thiserror::Error, Debug)]
22+
pub enum ProcessorError {
23+
/// Connection to meta-service was lost.
24+
#[error("ProcessorError: {0}")]
25+
ConnectionClosed(#[from] ConnectionClosed),
26+
27+
/// Acquirer or Permit was dropped and there is no receiving end to receive the event.
28+
#[error("ProcessorError: the semaphore Acquirer or Permit is dropped {0}")]
29+
AcquirerClosed(#[from] AcquirerClosed),
30+
}
31+
32+
impl From<Status> for ProcessorError {
33+
fn from(status: Status) -> Self {
34+
ProcessorError::ConnectionClosed(status.into())
35+
}
36+
}

src/meta/semaphore/src/meta_event_subscriber/processor.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use tokio::sync::mpsc;
2121

2222
#[cfg(doc)]
2323
use crate::acquirer::Acquirer;
24+
use crate::errors::AcquirerClosed;
2425
use crate::errors::ConnectionClosed;
26+
use crate::errors::ProcessorError;
2527
use crate::queue::PermitEvent;
2628
use crate::queue::SemaphoreQueue;
2729
use crate::PermitEntry;
@@ -34,18 +36,21 @@ pub(crate) struct Processor {
3436
pub(crate) queue: SemaphoreQueue,
3537

3638
/// The channel to send the [`PermitEvent`] to the [`Acquirer`].
37-
pub(crate) tx: mpsc::Sender<PermitEvent>,
39+
///
40+
/// Once the [`Acquirer`] has acquired a [`Permit`], this channel is no longer used,
41+
/// because the receiving end may not be polled and sending to it may block forever.
42+
pub(crate) tx_to_acquirer: mpsc::Sender<PermitEvent>,
3843

3944
/// Contains descriptive information about the context of this watcher.
4045
pub(crate) watcher_name: String,
4146
}
4247

4348
impl Processor {
4449
/// Create a new [`Processor`] instance with given permits capacity and the channel to send the [`PermitEvent`] to the [`Acquirer`].
45-
pub(crate) fn new(capacity: u64, tx: mpsc::Sender<PermitEvent>) -> Self {
50+
pub(crate) fn new(capacity: u64, tx_to_acquirer: mpsc::Sender<PermitEvent>) -> Self {
4651
Self {
4752
queue: SemaphoreQueue::new(capacity),
48-
tx,
53+
tx_to_acquirer,
4954
watcher_name: "".to_string(),
5055
}
5156
}
@@ -60,22 +65,24 @@ impl Processor {
6065
pub(crate) async fn process_watch_response(
6166
&mut self,
6267
watch_response: WatchResponse,
63-
) -> Result<(), ConnectionClosed> {
68+
) -> Result<(), ProcessorError> {
6469
let Some((key, prev, current)) =
6570
Self::decode_watch_response(watch_response, &self.watcher_name)?
6671
else {
6772
return Ok(());
6873
};
6974

70-
self.process_kv_change(key, prev, current).await
75+
self.process_kv_change(key, prev, current).await?;
76+
77+
Ok(())
7178
}
7279

7380
async fn process_kv_change(
7481
&mut self,
7582
sem_key: PermitKey,
7683
prev: Option<SeqV<PermitEntry>>,
7784
current: Option<SeqV<PermitEntry>>,
78-
) -> Result<(), ConnectionClosed> {
85+
) -> Result<(), AcquirerClosed> {
7986
log::debug!(
8087
"{} processing kv change: {}: {:?} -> {:?}",
8188
self.watcher_name,
@@ -108,8 +115,8 @@ impl Processor {
108115
for event in state_changes {
109116
log::debug!("{} sending event: {}", self.watcher_name, event);
110117

111-
self.tx.send(event).await.map_err(|e| {
112-
ConnectionClosed::new_str(format!("Semaphore-Watcher fail to send {}", e.0))
118+
self.tx_to_acquirer.send(event).await.map_err(|e| {
119+
AcquirerClosed::new(format!("Semaphore-Watcher fail to send {}", e.0))
113120
.context(&self.watcher_name)
114121
})?;
115122
}

0 commit comments

Comments
 (0)