1515//! The Redecryptor (Rd) is a layer and long-running background task which
1616//! handles redecryption of events in case we couldn't decrypt them imediatelly.
1717//!
18- //! Rd listens to the OlmMachine for received room keys. If a new room key has
19- //! been received it attempts to find any UTDs in the [`EventCache`]. If Rd
20- //! decrypts any UTDs from the event cache it will replace the events in the
21- //! cache and send out new [`RoomEventCacheUpdates`] to any of its listeners.
18+ //! Rd listens to the OlmMachine for received room keys and new
19+ //! m.room_key.withheld events.
20+ //!
21+ //! If a new room key has been received it attempts to find any UTDs in the
22+ //! [`EventCache`]. If Rd decrypts any UTDs from the event cache it will replace
23+ //! the events in the cache and send out new [`RoomEventCacheUpdates`] to any of
24+ //! its listeners.
25+ //!
26+ //! If a new withheld info has been received it attempts to find any relevant
27+ //! events and updates the [`EncryptionInfo`] of an event.
2228//!
2329//! There's an additional gotcha, the [`OlmMachine`] might get recreated by
2430//! calls to [`BaseClient::regenerate_olm()`]. When this happens we will receive
@@ -70,7 +76,10 @@ use futures_util::{StreamExt, pin_mut};
7076#[ cfg( doc) ]
7177use matrix_sdk_base:: { BaseClient , crypto:: OlmMachine } ;
7278use matrix_sdk_base:: {
73- crypto:: { store:: types:: RoomKeyInfo , types:: events:: room:: encrypted:: EncryptedEvent } ,
79+ crypto:: {
80+ store:: types:: { RoomKeyInfo , RoomKeyWithheldInfo } ,
81+ types:: events:: room:: encrypted:: EncryptedEvent ,
82+ } ,
7483 deserialized_responses:: { DecryptedRoomEvent , TimelineEvent , TimelineEventKind } ,
7584 locks:: Mutex ,
7685} ;
@@ -155,7 +164,7 @@ impl EventCache {
155164 room_id : & RoomId ,
156165 session_id : SessionId < ' _ > ,
157166 ) -> Result < Vec < ( OwnedEventId , Raw < AnySyncTimelineEvent > ) > , EventCacheError > {
158- let map_timeline_event = |event : TimelineEvent | {
167+ let filter = |event : TimelineEvent | {
159168 let event_id = event. event_id ( ) ;
160169
161170 // Only pick out events that are UTDs, get just the Raw event as this is what
@@ -167,13 +176,31 @@ impl EventCache {
167176 event_id. zip ( event)
168177 } ;
169178
170- // Load the relevant events from the event cache store and attempt to redecrypt
171- // things.
172179 let store = self . inner . store . lock ( ) . await ?;
173180 let events =
174181 store. get_room_events ( room_id, Some ( "m.room.encrypted" ) , Some ( session_id) ) . await ?;
175182
176- Ok ( events. into_iter ( ) . filter_map ( map_timeline_event) . collect ( ) )
183+ Ok ( events. into_iter ( ) . filter_map ( filter) . collect ( ) )
184+ }
185+
186+ async fn get_decrypted_events (
187+ & self ,
188+ room_id : & RoomId ,
189+ session_id : SessionId < ' _ > ,
190+ ) -> Result < Vec < ( OwnedEventId , DecryptedRoomEvent ) > , EventCacheError > {
191+ let filter = |event : TimelineEvent | {
192+ let event_id = event. event_id ( ) ;
193+
194+ let event = as_variant ! ( event. kind, TimelineEventKind :: Decrypted ( event) => event) ;
195+ // Zip the event ID and event together so we don't have to pick out the event ID
196+ // again. We need the event ID to replace the event in the cache.
197+ event_id. zip ( event)
198+ } ;
199+
200+ let store = self . inner . store . lock ( ) . await ?;
201+ let events = store. get_room_events ( room_id, None , Some ( session_id) ) . await ?;
202+
203+ Ok ( events. into_iter ( ) . filter_map ( filter) . collect ( ) )
177204 }
178205
179206 /// Handle a chunk of events that we were previously unable to decrypt but
@@ -289,6 +316,38 @@ impl EventCache {
289316 Ok ( ( ) )
290317 }
291318
319+ async fn update_encryption_info (
320+ & self ,
321+ room_id : & RoomId ,
322+ session_id : SessionId < ' _ > ,
323+ ) -> Result < ( ) , EventCacheError > {
324+ let client = self . inner . client ( ) . ok ( ) . unwrap ( ) ;
325+ let room = client. get_room ( room_id) . unwrap ( ) ;
326+
327+ // Get all the relevant events.
328+ let events = self . get_decrypted_events ( room_id, session_id) . await ?;
329+
330+ // Let's attempt to update their encryption info.
331+ let mut updated_events = Vec :: with_capacity ( events. len ( ) ) ;
332+
333+ for ( event_id, mut event) in events {
334+ let new_encryption_info =
335+ room. get_encryption_info ( session_id, & event. encryption_info . sender ) . await ;
336+
337+ // Only create a replacement if the encryption info actually changed.
338+ if let Some ( new_encryption_info) = new_encryption_info
339+ && event. encryption_info != new_encryption_info
340+ {
341+ event. encryption_info = new_encryption_info;
342+ updated_events. push ( ( event_id, event) ) ;
343+ }
344+ }
345+
346+ self . on_resolved_utds ( room_id, updated_events) . await ?;
347+
348+ Ok ( ( ) )
349+ }
350+
292351 /// Explicitly request the redecryption of a set of events.
293352 ///
294353 /// TODO: Explain when and why this might be useful.
@@ -343,12 +402,17 @@ impl Redecryptor {
343402 /// the sending part of the stream has been dropped.
344403 async fn subscribe_to_room_key_stream (
345404 cache : & Weak < EventCacheInner > ,
346- ) -> Option < impl Stream < Item = Result < Vec < RoomKeyInfo > , BroadcastStreamRecvError > > > {
405+ ) -> Option < (
406+ impl Stream < Item = Result < Vec < RoomKeyInfo > , BroadcastStreamRecvError > > ,
407+ impl Stream < Item = Vec < RoomKeyWithheldInfo > > ,
408+ ) > {
347409 let event_cache = cache. upgrade ( ) ?;
348410 let client = event_cache. client ( ) . ok ( ) ?;
349411 let machine = client. olm_machine ( ) . await ;
350412
351- machine. as_ref ( ) . map ( |m| m. store ( ) . room_keys_received_stream ( ) )
413+ machine. as_ref ( ) . map ( |m| {
414+ ( m. store ( ) . room_keys_received_stream ( ) , m. store ( ) . room_keys_withheld_received_stream ( ) )
415+ } )
352416 }
353417
354418 fn upgrade_event_cache ( cache : & Weak < EventCacheInner > ) -> Option < EventCache > {
@@ -359,11 +423,14 @@ impl Redecryptor {
359423 cache : & Weak < EventCacheInner > ,
360424 decryption_request_stream : & mut Pin < & mut impl Stream < Item = DecryptionRetryRequest > > ,
361425 ) -> bool {
362- let Some ( room_key_stream) = Self :: subscribe_to_room_key_stream ( cache) . await else {
426+ let Some ( ( room_key_stream, withheld_stream) ) =
427+ Self :: subscribe_to_room_key_stream ( cache) . await
428+ else {
363429 return false ;
364430 } ;
365431
366432 pin_mut ! ( room_key_stream) ;
433+ pin_mut ! ( withheld_stream) ;
367434
368435 loop {
369436 tokio:: select! {
@@ -374,14 +441,23 @@ impl Redecryptor {
374441 break false ;
375442 } ;
376443
444+ trace!( ?request, "Received a redecryption request" ) ;
445+
377446 for session_id in request. utd_session_ids {
378447 let _ = cache
379448 . retry_decryption( & request. room_id, & session_id)
380449 . await
381450 . inspect_err( |e| warn!( "Error redecrypting after an explicit request was received {e:?}" ) ) ;
382451 }
383452
384- // TODO: Deal with encryption info updating as well.
453+ for session_id in request. refresh_info_session_ids {
454+ let _ = cache. update_encryption_info( & request. room_id, & session_id) . await . inspect_err( |e|
455+ warn!(
456+ room_id = %request. room_id,
457+ session_id = session_id,
458+ "Unable to update the encryption info {e:?}" ,
459+ ) ) ;
460+ }
385461 }
386462 // The room key stream from the OlmMachine. Needs to be recreated every time we
387463 // receive a `None` from the stream.
@@ -395,14 +471,23 @@ impl Redecryptor {
395471 break false ;
396472 } ;
397473
398- for key in room_keys {
474+ trace!( ?room_keys, "Received new room keys" ) ;
475+
476+ for key in & room_keys {
399477 let _ = cache
400478 . retry_decryption( & key. room_id, & key. session_id)
401479 . await
402480 . inspect_err( |e| warn!( "Error redecrypting {e:?}" ) ) ;
403481 }
404482
405- // TODO: Deal with encryption info updating as well.
483+ for key in room_keys {
484+ let _ = cache. update_encryption_info( & key. room_id, & key. session_id) . await . inspect_err( |e|
485+ warn!(
486+ room_id = %key. room_id,
487+ session_id = key. session_id,
488+ "Unable to update the encryption info {e:?}" ,
489+ ) ) ;
490+ }
406491 } ,
407492 Some ( Err ( _) ) => {
408493 // We missed some room keys, we need to report this in case a listener
@@ -424,6 +509,29 @@ impl Redecryptor {
424509 }
425510 }
426511 }
512+ withheld_info = withheld_stream. next( ) => {
513+ match withheld_info {
514+ Some ( infos) => {
515+ let Some ( cache) = Self :: upgrade_event_cache( cache) else {
516+ break false ;
517+ } ;
518+
519+ trace!( ?infos, "Received new withheld infos" ) ;
520+
521+ for RoomKeyWithheldInfo { room_id, session_id, .. } in & infos {
522+ let _ = cache. update_encryption_info( room_id, session_id) . await . inspect_err( |e|
523+ warn!(
524+ room_id = %room_id,
525+ session_id = session_id,
526+ "Unable to update the encryption info {e:?}" ,
527+ ) ) ;
528+ }
529+ }
530+ // The stream got closed, same as for the room key stream, we'll try to
531+ // recreate the streams.
532+ None => break true
533+ }
534+ }
427535 else => break false ,
428536 }
429537 }
0 commit comments