@@ -875,7 +875,11 @@ impl KVStore for TestStore {
875875 let secondary_namespace = secondary_namespace. to_string ( ) ;
876876 let key = key. to_string ( ) ;
877877 let inner = Arc :: clone ( & self . inner ) ;
878- Box :: pin ( async move { inner. read_internal ( & primary_namespace, & secondary_namespace, & key) } )
878+ let fut =
879+ Box :: pin (
880+ async move { inner. read_internal ( & primary_namespace, & secondary_namespace, & key) } ,
881+ ) ;
882+ Box :: pin ( async move { TestStoreFuture :: new ( Box :: pin ( fut) ) . await } )
879883 }
880884 fn write (
881885 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
@@ -884,9 +888,10 @@ impl KVStore for TestStore {
884888 let secondary_namespace = secondary_namespace. to_string ( ) ;
885889 let key = key. to_string ( ) ;
886890 let inner = Arc :: clone ( & self . inner ) ;
887- Box :: pin ( async move {
891+ let fut = Box :: pin ( async move {
888892 inner. write_internal ( & primary_namespace, & secondary_namespace, & key, buf)
889- } )
893+ } ) ;
894+ Box :: pin ( async move { TestStoreFuture :: new ( Box :: pin ( fut) ) . await } )
890895 }
891896 fn remove (
892897 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
@@ -895,17 +900,20 @@ impl KVStore for TestStore {
895900 let secondary_namespace = secondary_namespace. to_string ( ) ;
896901 let key = key. to_string ( ) ;
897902 let inner = Arc :: clone ( & self . inner ) ;
898- Box :: pin ( async move {
903+ let fut = Box :: pin ( async move {
899904 inner. remove_internal ( & primary_namespace, & secondary_namespace, & key, lazy)
900- } )
905+ } ) ;
906+ Box :: pin ( async move { TestStoreFuture :: new ( Box :: pin ( fut) ) . await } )
901907 }
902908 fn list (
903909 & self , primary_namespace : & str , secondary_namespace : & str ,
904910 ) -> Pin < Box < dyn Future < Output = Result < Vec < String > , io:: Error > > + ' static + Send > > {
905911 let primary_namespace = primary_namespace. to_string ( ) ;
906912 let secondary_namespace = secondary_namespace. to_string ( ) ;
907913 let inner = Arc :: clone ( & self . inner ) ;
908- Box :: pin ( async move { inner. list_internal ( & primary_namespace, & secondary_namespace) } )
914+ let fut =
915+ Box :: pin ( async move { inner. list_internal ( & primary_namespace, & secondary_namespace) } ) ;
916+ Box :: pin ( async move { TestStoreFuture :: new ( Box :: pin ( fut) ) . await } )
909917 }
910918}
911919
@@ -933,6 +941,37 @@ impl KVStoreSync for TestStore {
933941 }
934942}
935943
944+ // A `Future` wrapper that returns `Pending` once before actually polling the (likely sync) future.
945+ pub ( crate ) struct TestStoreFuture < F : Future < Output = Result < R , io:: Error > > + Unpin , R > {
946+ future : Mutex < Option < F > > ,
947+ first_poll : AtomicBool ,
948+ }
949+
950+ impl < F : Future < Output = Result < R , io:: Error > > + Unpin , R > TestStoreFuture < F , R > {
951+ fn new ( fut : F ) -> Self {
952+ let future = Mutex :: new ( Some ( fut) ) ;
953+ let first_poll = AtomicBool :: new ( true ) ;
954+ Self { future, first_poll }
955+ }
956+ }
957+
958+ impl < F : Future < Output = Result < R , io:: Error > > + Unpin , R > Future for TestStoreFuture < F , R > {
959+ type Output = Result < R , io:: Error > ;
960+ fn poll (
961+ self : Pin < & mut Self > , cx : & mut core:: task:: Context < ' _ > ,
962+ ) -> core:: task:: Poll < Self :: Output > {
963+ if self . first_poll . swap ( false , Ordering :: Relaxed ) {
964+ core:: task:: Poll :: Pending
965+ } else {
966+ if let Some ( mut fut) = self . future . lock ( ) . unwrap ( ) . take ( ) {
967+ Pin :: new ( & mut fut) . poll ( cx)
968+ } else {
969+ unreachable ! ( "We should never poll more than twice" ) ;
970+ }
971+ }
972+ }
973+ }
974+
936975struct TestStoreInner {
937976 persisted_bytes : Mutex < HashMap < String , HashMap < String , Vec < u8 > > > > ,
938977 read_only : bool ,
0 commit comments