Skip to content

Commit 23904c0

Browse files
committed
feat: Redecryptor start to send out redecryptor reports
1 parent 70cc4e5 commit 23904c0

File tree

2 files changed

+137
-22
lines changed

2 files changed

+137
-22
lines changed

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ impl EventCache {
208208
linked_chunk_update_sender.clone(),
209209
)));
210210

211+
#[cfg(feature = "e2e-encryption")]
212+
let redecryption_channels = redecryptor::RedecryptorChannels::new();
213+
211214
Self {
212215
inner: Arc::new(EventCacheInner {
213216
client,
@@ -221,6 +224,8 @@ impl EventCache {
221224
_thread_subscriber_task: thread_subscriber_task,
222225
#[cfg(feature = "experimental-search")]
223226
_search_indexing_task: search_indexing_task,
227+
#[cfg(feature = "e2e-encryption")]
228+
redecryption_channels,
224229
thread_subscriber_receiver,
225230
}),
226231
}
@@ -266,7 +271,18 @@ impl EventCache {
266271
));
267272

268273
#[cfg(feature = "e2e-encryption")]
269-
let redecryptor = redecryptor::Redecryptor::new(Arc::downgrade(&self.inner));
274+
let redecryptor = {
275+
let receiver = self
276+
.inner
277+
.redecryption_channels
278+
.decryption_request_receiver
279+
.lock()
280+
.take()
281+
.expect("We should have initialized the channel an subscribing should happen only once");
282+
283+
redecryptor::Redecryptor::new(Arc::downgrade(&self.inner), receiver)
284+
};
285+
270286

271287
Arc::new(EventCacheDropHandles {
272288
listen_updates_task,
@@ -853,6 +869,9 @@ struct EventCacheInner {
853869
/// This is helpful for tests to coordinate that a new thread subscription
854870
/// has been sent or not.
855871
thread_subscriber_receiver: Receiver<()>,
872+
873+
#[cfg(feature = "e2e-encryption")]
874+
redecryption_channels: redecryptor::RedecryptorChannels,
856875
}
857876

858877
type AutoShrinkChannelPayload = OwnedRoomId;

crates/matrix-sdk/src/event_cache/redecryptor.rs

Lines changed: 117 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -72,25 +72,75 @@ use matrix_sdk_base::{BaseClient, crypto::OlmMachine};
7272
use matrix_sdk_base::{
7373
crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent},
7474
deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind},
75+
locks::Mutex,
7576
};
77+
#[cfg(doc)]
78+
use matrix_sdk_common::deserialized_responses::EncryptionInfo;
7679
use matrix_sdk_common::executor::{JoinHandle, spawn};
7780
use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw};
78-
use tokio::sync::mpsc::UnboundedSender;
79-
use tokio_stream::wrappers::{UnboundedReceiverStream, errors::BroadcastStreamRecvError};
81+
use tokio::sync::{
82+
broadcast,
83+
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
84+
};
85+
use tokio_stream::wrappers::{
86+
BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError,
87+
};
8088
use tracing::{info, instrument, trace, warn};
8189

8290
use crate::event_cache::{
8391
EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate,
8492
};
8593

94+
type SessionId<'a> = &'a str;
95+
type OwnedSessionId = String;
96+
8697
/// The information sent across the channel to the long-running task requesting
8798
/// that the supplied set of sessions be retried.
99+
#[derive(Debug, Clone)]
88100
pub struct DecryptionRetryRequest {
89-
room_id: OwnedRoomId,
90-
session_ids: BTreeSet<String>,
101+
/// The room ID of the room the events belong to.
102+
pub room_id: OwnedRoomId,
103+
/// Events that are not decrypted.
104+
pub utd_session_ids: BTreeSet<OwnedSessionId>,
105+
/// Events that are decrypted but might need to have their
106+
/// [`EncryptionInfo`] refreshed.
107+
pub refresh_info_session_ids: BTreeSet<OwnedSessionId>,
91108
}
92109

93-
type SessionId<'a> = &'a str;
110+
/// A report coming from the redecryptor.
111+
#[derive(Debug, Clone)]
112+
pub enum RedecryptorReport {
113+
/// Events which we were able to decrypt.
114+
ResolvedUtds {
115+
/// The room ID of the room the events belong to.
116+
room_id: OwnedRoomId,
117+
/// The list of event IDs of the decrypted events.
118+
events: BTreeSet<OwnedEventId>,
119+
},
120+
/// The redecryptor might have missed some room keys so it might not have
121+
/// re-decrypted events that are now decryptable.
122+
Lagging,
123+
}
124+
125+
pub(super) struct RedecryptorChannels {
126+
utd_reporter: broadcast::Sender<RedecryptorReport>,
127+
pub(super) decryption_request_sender: UnboundedSender<DecryptionRetryRequest>,
128+
pub(super) decryption_request_receiver:
129+
Mutex<Option<UnboundedReceiver<DecryptionRetryRequest>>>,
130+
}
131+
132+
impl RedecryptorChannels {
133+
pub(super) fn new() -> Self {
134+
let (utd_reporter, _) = broadcast::channel(100);
135+
let (decryption_request_sender, decryption_request_receiver) = unbounded_channel();
136+
137+
Self {
138+
utd_reporter,
139+
decryption_request_sender,
140+
decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)),
141+
}
142+
}
143+
}
94144

95145
impl EventCache {
96146
/// Retrieve a set of events that we weren't able to decrypt.
@@ -148,7 +198,7 @@ impl EventCache {
148198
let (room_cache, _drop_handles) = self.for_room(room_id).await?;
149199
let mut state = room_cache.inner.state.write().await;
150200

151-
let event_ids: Vec<_> = events.iter().map(|(event_id, _)| event_id).collect();
201+
let event_ids: BTreeSet<_> = events.iter().cloned().map(|(event_id, _)| event_id).collect();
152202

153203
trace!(?event_ids, "Replacing successfully re-decrypted events");
154204

@@ -174,6 +224,14 @@ impl EventCache {
174224
origin: EventsOrigin::Cache,
175225
});
176226

227+
// We report that we resolved some UTDs, this is mainly for listeners that don't
228+
// care about the actual events, just about the fact that UTDs got
229+
// resolved. Not sure if we'll have more such listeners but the UTD hook
230+
// is one such thing.
231+
let report =
232+
RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids };
233+
let _ = self.inner.redecryption_channels.utd_reporter.send(report);
234+
177235
Ok(())
178236
}
179237

@@ -230,10 +288,28 @@ impl EventCache {
230288

231289
Ok(())
232290
}
291+
292+
/// Explicitly request the redecryption of a set of events.
293+
///
294+
/// TODO: Explain when and why this might be useful.
295+
pub fn request_decryption(&self, request: DecryptionRetryRequest) {
296+
let _ =
297+
self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err(
298+
|_| warn!("Requesting a decryption while the redecryption task has been shut down"),
299+
);
300+
}
301+
302+
/// Subscribe to reports that the redecryptor generates.
303+
///
304+
/// TODO: Explain when the redecryptor might send such reports.
305+
pub fn subscrube_to_decryption_reports(
306+
&self,
307+
) -> impl Stream<Item = Result<RedecryptorReport, BroadcastStreamRecvError>> {
308+
BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe())
309+
}
233310
}
234311

235312
pub(crate) struct Redecryptor {
236-
request_decryption_sender: UnboundedSender<DecryptionRetryRequest>,
237313
task: JoinHandle<()>,
238314
}
239315

@@ -248,23 +324,17 @@ impl Redecryptor {
248324
///
249325
/// This creates a task that listens to various streams and attempts to
250326
/// redecrypt UTDs that can be found inside the [`EventCache`].
251-
pub(super) fn new(cache: Weak<EventCacheInner>) -> Self {
252-
let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel();
253-
327+
pub(super) fn new(
328+
cache: Weak<EventCacheInner>,
329+
receiver: UnboundedReceiver<DecryptionRetryRequest>,
330+
) -> Self {
254331
let task = spawn(async {
255332
let request_redecryption_stream = UnboundedReceiverStream::new(receiver);
256333

257334
Self::listen_for_room_keys_task(cache, request_redecryption_stream).await;
258335
});
259336

260-
Self { task, request_decryption_sender }
261-
}
262-
263-
#[allow(dead_code)]
264-
pub(super) fn request_decryption(&self, request: DecryptionRetryRequest) {
265-
let _ = self.request_decryption_sender.send(request).inspect_err(|_| {
266-
warn!("Requesting a decryption while the redecryption task has been shut down")
267-
});
337+
Self { task }
268338
}
269339

270340
/// (Re)-subscribe to the room key stream from the [`OlmMachine`].
@@ -304,18 +374,23 @@ impl Redecryptor {
304374
break false;
305375
};
306376

307-
for session_id in request.session_ids {
377+
for session_id in request.utd_session_ids {
308378
let _ = cache
309379
.retry_decryption(&request.room_id, &session_id)
310380
.await
311381
.inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}"));
312382
}
383+
384+
// TODO: Deal with encryption info updating as well.
313385
}
314386
// The room key stream from the OlmMachine. Needs to be recreated every time we
315387
// receive a `None` from the stream.
316388
room_keys = room_key_stream.next() => {
317389
match room_keys {
318390
Some(Ok(room_keys)) => {
391+
// Alright, some room keys were received and persisted in our store,
392+
// let's attempt to redecrypt events that were encrypted using these
393+
// room keys.
319394
let Some(cache) = Self::upgrade_event_cache(cache) else {
320395
break false;
321396
};
@@ -326,9 +401,21 @@ impl Redecryptor {
326401
.await
327402
.inspect_err(|e| warn!("Error redecrypting {e:?}"));
328403
}
404+
405+
// TODO: Deal with encryption info updating as well.
329406
},
330407
Some(Err(_)) => {
331-
todo!("Handle lagging here, how?")
408+
// We missed some room keys, we need to report this in case a listener
409+
// has and idea which UTDs we should attempt to redecrypt.
410+
//
411+
// This would most likely be the timeline. The timeline might attempt
412+
// to redecrypt all UTDs it is showing to the user.
413+
let Some(cache) = Self::upgrade_event_cache(cache) else {
414+
break false;
415+
};
416+
417+
let message = RedecryptorReport::Lagging;
418+
let _ = cache.inner.redecryption_channels.utd_reporter.send(message);
332419
},
333420
// The stream got closed, this could mean that our OlmMachine got
334421
// regenerated, let's return true and try to recreate the stream.
@@ -352,7 +439,16 @@ impl Redecryptor {
352439
pin_mut!(decryption_request_stream);
353440

354441
while Self::redecryption_loop(&cache, &mut decryption_request_stream).await {
355-
info!("Regenerating the re-decryption streams")
442+
info!("Regenerating the re-decryption streams");
443+
444+
let Some(cache) = Self::upgrade_event_cache(&cache) else {
445+
break;
446+
};
447+
448+
// Report that the stream got recreated so listeners can attempt to redecrypt
449+
// any UTDs they might be seeing.
450+
let message = RedecryptorReport::Lagging;
451+
let _ = cache.inner.redecryption_channels.utd_reporter.send(message);
356452
}
357453

358454
info!("Shutting down the event cache redecryptor");

0 commit comments

Comments
 (0)