11use futures_lite:: StreamExt ;
2- use tokio:: sync:: broadcast;
32
43use crate :: {
54 proto:: {
@@ -8,8 +7,7 @@ use crate::{
87 } ,
98 session:: { channels:: ChannelSenders , static_tokens:: StaticTokens , Error , SessionId } ,
109 store:: {
11- entry:: { EntryChannel , EntryOrigin } ,
12- traits:: Storage ,
10+ traits:: { EntryOrigin , EntryStorage , Storage , StoreEvent , SubscribeParams } ,
1311 Store ,
1412 } ,
1513 util:: stream:: CancelableReceiver ,
@@ -51,29 +49,36 @@ impl<S: Storage> DataSender<S> {
5149 }
5250 }
5351 pub async fn run ( mut self ) -> Result < ( ) , Error > {
54- let mut entry_stream = self . store . entries ( ) . subscribe ( self . session_id ) ;
52+ let mut entry_stream = futures_concurrency :: stream :: StreamGroup :: new ( ) ;
5553 loop {
5654 tokio:: select! {
5755 input = self . inbox. next( ) => {
5856 let Some ( input) = input else {
5957 break ;
6058 } ;
6159 let Input :: AoiIntersection ( intersection) = input;
62- self . store. entries( ) . watch_area(
63- self . session_id,
64- intersection. namespace,
65- intersection. intersection. area. clone( ) ,
66- ) ;
60+ let params = SubscribeParams :: default ( ) . ingest_only( ) . ignore_remote( self . session_id) ;
61+ // TODO: We could start at the progress id at the beginning of the session.
62+ let stream = self
63+ . store
64+ . entries( )
65+ . subscribe_area(
66+ intersection. namespace,
67+ intersection. intersection. area. clone( ) ,
68+ params,
69+ )
70+ . filter_map( |event| match event {
71+ StoreEvent :: Ingested ( _id, entry, _origin) => Some ( entry) ,
72+ // We get only Ingested events because we set ingest_only() param above.
73+ _ => unreachable!( "expected only Ingested event but got another event" ) ,
74+ } ) ;
75+ entry_stream. insert( stream) ;
6776 } ,
68- entry = entry_stream. recv ( ) => {
77+ entry = entry_stream. next ( ) , if !entry_stream . is_empty ( ) => {
6978 match entry {
70- Ok ( entry) => self . send_entry( entry) . await ?,
71- Err ( broadcast:: error:: RecvError :: Closed ) => break ,
72- Err ( broadcast:: error:: RecvError :: Lagged ( _count) ) => {
73- // TODO: Queue another reconciliation
74- }
79+ Some ( entry) => self . send_entry( entry) . await ?,
80+ None => break ,
7581 }
76-
7782 }
7883 }
7984 }
@@ -149,13 +154,9 @@ impl<S: Storage> DataReceiver<S> {
149154 message. dynamic_token ,
150155 )
151156 . await ?;
152- self . store . entries ( ) . ingest (
153- & authorised_entry,
154- EntryOrigin :: Remote {
155- session : self . session_id ,
156- channel : EntryChannel :: Data ,
157- } ,
158- ) ?;
157+ self . store
158+ . entries ( )
159+ . ingest_entry ( & authorised_entry, EntryOrigin :: Remote ( self . session_id ) ) ?;
159160 let ( entry, _token) = authorised_entry. into_parts ( ) ;
160161 // TODO: handle offset
161162 self . current_payload . set (
0 commit comments