@@ -26,12 +26,13 @@ use matrix_sdk::{
2626 crypto:: store:: types:: RoomKeyInfo ,
2727 deserialized_responses:: TimelineEventKind as SdkTimelineEventKind ,
2828 encryption:: backups:: BackupState ,
29+ event_cache:: { self , RedecryptorReport } ,
2930 event_handler:: EventHandlerHandle ,
3031 executor:: { JoinHandle , spawn} ,
3132} ;
3233use tokio:: sync:: {
3334 RwLock ,
34- mpsc:: { self , Receiver , Sender } ,
35+ mpsc:: { self , Receiver , Sender , UnboundedSender } ,
3536} ;
3637use tokio_stream:: { StreamExt as _, wrappers:: errors:: BroadcastStreamRecvError } ;
3738use tracing:: { Instrument as _, debug, error, field, info, info_span, warn} ;
@@ -69,6 +70,58 @@ impl Drop for CryptoDropHandles {
6970 }
7071}
7172
73+ async fn redecryption_report_task (
74+ stream : impl Stream < Item = Result < RedecryptorReport , BroadcastStreamRecvError > > ,
75+ timeline_controller : TimelineController ,
76+ sender : UnboundedSender < event_cache:: DecryptionRetryRequest > ,
77+ ) {
78+ pin_mut ! ( stream) ;
79+
80+ while let Some ( report) = stream. next ( ) . await {
81+ match report {
82+ Ok ( RedecryptorReport :: ResolvedUtds { events, .. } ) => {
83+ let state = timeline_controller. state . read ( ) . await ;
84+
85+ if let Some ( utd_hook) = & state. meta . unable_to_decrypt_hook {
86+ for event_id in events {
87+ utd_hook. on_late_decrypt ( & event_id) . await ;
88+ }
89+ }
90+ }
91+ Ok ( RedecryptorReport :: Lagging ) | Err ( _) => {
92+ // The room key stream lagged or the OlmMachine got regenerated. Let's tell the
93+ // redecryptor which events we have.
94+ let state = timeline_controller. state . read ( ) . await ;
95+
96+ let ( utds, decrypted) : ( BTreeSet < _ > , BTreeSet < _ > ) = state
97+ . items
98+ . iter ( )
99+ . filter_map ( |event| {
100+ event. as_event ( ) . and_then ( |e| {
101+ let session_id = e. encryption_info ( ) . and_then ( |info| info. session_id ( ) ) ;
102+ session_id. map ( |id| id. to_owned ( ) ) . zip ( Some ( e) )
103+ } )
104+ } )
105+ . partition_map ( |( session_id, event) | {
106+ if event. content . is_unable_to_decrypt ( ) {
107+ Either :: Left ( session_id)
108+ } else {
109+ Either :: Right ( session_id)
110+ }
111+ } ) ;
112+
113+ let message = event_cache:: DecryptionRetryRequest {
114+ room_id : timeline_controller. room ( ) . room_id ( ) . to_owned ( ) ,
115+ utd_session_ids : utds,
116+ refresh_info_session_ids : decrypted,
117+ } ;
118+
119+ let _ = sender. send ( message) ;
120+ }
121+ }
122+ }
123+ }
124+
72125/// The task that handles the room keys from backups.
73126async fn room_keys_from_backups_task < S > ( stream : S , timeline_controller : TimelineController )
74127where
0 commit comments