Skip to content

Commit d3c1399

Browse files
committed
store: Make the worker futures in copy Send
1 parent 1d17d73 commit d3c1399

File tree

1 file changed

+7
-5
lines changed

1 file changed

+7
-5
lines changed

store/postgres/src/copy.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -868,14 +868,16 @@ impl CopyTableWorker {
868868
}
869869
}
870870

871+
type WorkerFuture = Pin<Box<dyn Future<Output = WorkerResult> + Send>>;
872+
871873
/// A helper to manage the workers that are copying data. Besides the actual
872874
/// workers it also keeps a worker that wakes us up periodically to give us
873875
/// a chance to create more workers if there are database connections
874876
/// available
875877
struct Workers {
876878
/// The list of workers that are currently running. This will always
877879
/// include a future that wakes us up periodically
878-
futures: Vec<Pin<Box<dyn Future<Output = WorkerResult>>>>,
880+
futures: Vec<WorkerFuture>,
879881
}
880882

881883
impl Workers {
@@ -885,7 +887,7 @@ impl Workers {
885887
}
886888
}
887889

888-
fn add(&mut self, worker: Pin<Box<dyn Future<Output = WorkerResult>>>) {
890+
fn add(&mut self, worker: WorkerFuture) {
889891
self.futures.push(worker);
890892
}
891893

@@ -908,7 +910,7 @@ impl Workers {
908910
result
909911
}
910912

911-
fn waker() -> Pin<Box<dyn Future<Output = WorkerResult>>> {
913+
fn waker() -> WorkerFuture {
912914
let sleep = tokio::time::sleep(ENV_VARS.store.batch_target_duration);
913915
Box::pin(sleep.map(|()| WorkerResult::Wake))
914916
}
@@ -1057,7 +1059,7 @@ impl Connection {
10571059
&mut self,
10581060
state: &mut CopyState,
10591061
progress: &Arc<CopyProgress>,
1060-
) -> Option<Pin<Box<dyn Future<Output = WorkerResult>>>> {
1062+
) -> Option<WorkerFuture> {
10611063
let Some(conn) = self.conn.take() else {
10621064
return None;
10631065
};
@@ -1079,7 +1081,7 @@ impl Connection {
10791081
&mut self,
10801082
state: &mut CopyState,
10811083
progress: &Arc<CopyProgress>,
1082-
) -> Option<Pin<Box<dyn Future<Output = WorkerResult>>>> {
1084+
) -> Option<WorkerFuture> {
10831085
// It's important that we get the connection before the table since
10841086
// we remove the table from the state and could drop it otherwise
10851087
let Some(conn) = self

0 commit comments

Comments
 (0)