Skip to content

Commit e8c882f

Browse files
committed
feat: Redecryptor start to send out redecryptor reports
1 parent 82c7865 commit e8c882f

File tree

2 files changed

+145
-25
lines changed

2 files changed

+145
-25
lines changed

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,18 @@ use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument,
6969
use crate::{
7070
Client,
7171
client::WeakClient,
72-
event_cache::redecryptor::Redecryptor,
7372
send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
7473
};
7574

7675
mod deduplicator;
7776
mod pagination;
77+
#[cfg(feature = "e2e-encryption")]
7878
mod redecryptor;
7979
mod room;
8080

8181
pub use pagination::{RoomPagination, RoomPaginationStatus};
82+
#[cfg(feature = "e2e-encryption")]
83+
pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport};
8284
pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate};
8385

8486
/// An error observed in the [`EventCache`].
@@ -152,7 +154,8 @@ pub struct EventCacheDropHandles {
152154
auto_shrink_linked_chunk_task: JoinHandle<()>,
153155

154156
/// The task used to automatically redecrypt UTDs.
155-
redecryptor: Redecryptor,
157+
#[cfg(feature = "e2e-encryption")]
158+
_redecryptor: redecryptor::Redecryptor,
156159
}
157160

158161
impl fmt::Debug for EventCacheDropHandles {
@@ -205,6 +208,9 @@ impl EventCache {
205208
linked_chunk_update_sender.clone(),
206209
)));
207210

211+
#[cfg(feature = "e2e-encryption")]
212+
let redecryption_channels = redecryptor::RedecryptorChannels::new();
213+
208214
Self {
209215
inner: Arc::new(EventCacheInner {
210216
client,
@@ -218,6 +224,8 @@ impl EventCache {
218224
_thread_subscriber_task: thread_subscriber_task,
219225
#[cfg(feature = "experimental-search")]
220226
_search_indexing_task: search_indexing_task,
227+
#[cfg(feature = "e2e-encryption")]
228+
redecryption_channels,
221229
thread_subscriber_receiver,
222230
}),
223231
}
@@ -262,13 +270,25 @@ impl EventCache {
262270
auto_shrink_receiver,
263271
));
264272

265-
let redecryptor = Redecryptor::new(Arc::downgrade(&self.inner));
273+
#[cfg(feature = "e2e-encryption")]
274+
let redecryptor = {
275+
let receiver = self
276+
.inner
277+
.redecryption_channels
278+
.decryption_request_receiver
279+
.lock()
280+
.take()
281+
.expect("We should have initialized the channel an subscribing should happen only once");
282+
283+
redecryptor::Redecryptor::new(Arc::downgrade(&self.inner), receiver)
284+
};
266285

267286
Arc::new(EventCacheDropHandles {
268287
listen_updates_task,
269288
ignore_user_list_update_task,
270289
auto_shrink_linked_chunk_task,
271-
redecryptor,
290+
#[cfg(feature = "e2e-encryption")]
291+
_redecryptor: redecryptor,
272292
})
273293
});
274294

@@ -848,6 +868,9 @@ struct EventCacheInner {
848868
/// This is helpful for tests to coordinate that a new thread subscription
849869
/// has been sent or not.
850870
thread_subscriber_receiver: Receiver<()>,
871+
872+
#[cfg(feature = "e2e-encryption")]
873+
redecryption_channels: redecryptor::RedecryptorChannels,
851874
}
852875

853876
type AutoShrinkChannelPayload = OwnedRoomId;

crates/matrix-sdk/src/event_cache/redecryptor.rs

Lines changed: 118 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -72,25 +72,76 @@ use matrix_sdk_base::{BaseClient, crypto::OlmMachine};
7272
use matrix_sdk_base::{
7373
crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent},
7474
deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind},
75+
locks::Mutex,
7576
};
7677
use matrix_sdk_common::executor::spawn;
7778
use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw};
78-
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
79-
use tokio_stream::wrappers::{UnboundedReceiverStream, errors::BroadcastStreamRecvError};
79+
use tokio::{
80+
sync::{
81+
broadcast,
82+
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
83+
},
84+
task::JoinHandle,
85+
};
86+
use tokio_stream::wrappers::{
87+
BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError,
88+
};
8089
use tracing::{info, instrument, trace, warn};
8190

8291
use crate::event_cache::{
8392
EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate,
8493
};
8594

95+
type SessionId<'a> = &'a str;
96+
type OwnedSessionId = String;
97+
8698
/// The information sent across the channel to the long-running task requesting
8799
/// that the supplied set of sessions be retried.
100+
#[derive(Debug, Clone)]
88101
pub struct DecryptionRetryRequest {
89-
room_id: OwnedRoomId,
90-
session_ids: BTreeSet<String>,
102+
/// The room ID of the room the events belong to.
103+
pub room_id: OwnedRoomId,
104+
/// Events that are not decrypted.
105+
pub utd_session_ids: BTreeSet<OwnedSessionId>,
106+
/// Events that are decrypted but might need to have their
107+
/// [`EncryptionInfo`] refreshed.
108+
pub refresh_info_session_ids: BTreeSet<OwnedSessionId>,
91109
}
92110

93-
type SessionId<'a> = &'a str;
111+
/// A report coming from the redecryptor.
112+
#[derive(Debug, Clone)]
113+
pub enum RedecryptorReport {
114+
/// Events which we were able to decrypt.
115+
ResolvedUtds {
116+
/// The room ID of the room the events belong to.
117+
room_id: OwnedRoomId,
118+
/// The list of event IDs of the decrypted events.
119+
events: BTreeSet<OwnedEventId>,
120+
},
121+
/// The redecryptor might have missed some room keys so it might not have
122+
/// re-decrypted events that are now decryptable.
123+
Lagging,
124+
}
125+
126+
pub(super) struct RedecryptorChannels {
127+
utd_reporter: broadcast::Sender<RedecryptorReport>,
128+
pub(super) decryption_request_sender: UnboundedSender<DecryptionRetryRequest>,
129+
pub(super) decryption_request_receiver:
130+
Mutex<Option<UnboundedReceiver<DecryptionRetryRequest>>>,
131+
}
132+
133+
impl RedecryptorChannels {
134+
pub(super) fn new() -> Self {
135+
let (utd_reporter, _) = broadcast::channel(100);
136+
let (decryption_request_sender, decryption_request_receiver) = unbounded_channel();
137+
138+
Self {
139+
utd_reporter,
140+
decryption_request_sender,
141+
decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)),
142+
}
143+
}
144+
}
94145

95146
impl EventCache {
96147
/// Retrieve a set of events that we weren't able to decrypt.
@@ -154,7 +205,7 @@ impl EventCache {
154205
let (room_cache, _) = self.for_room(room_id).await?;
155206
let mut state = room_cache.inner.state.write().await;
156207

157-
let event_ids: Vec<_> = events.iter().map(|(event_id, _)| event_id).collect();
208+
let event_ids: BTreeSet<_> = events.iter().cloned().map(|(event_id, _)| event_id).collect();
158209

159210
trace!(?event_ids, "Replacing successfully re-decrypted events");
160211

@@ -177,6 +228,14 @@ impl EventCache {
177228
origin: EventsOrigin::Cache,
178229
});
179230

231+
// We report that we resolved some UTDs, this is mainly for listeners that don't
232+
// care about the actual events, just about the fact that UTDs got
233+
// resolved. Not sure if we'll have more such listeners but the UTD hook
234+
// is one such thing.
235+
let report =
236+
RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids };
237+
let _ = self.inner.redecryption_channels.utd_reporter.send(report.into());
238+
180239
Ok(())
181240
}
182241

@@ -233,10 +292,28 @@ impl EventCache {
233292

234293
Ok(())
235294
}
295+
296+
/// Explicitly request the redecryption of a set of events.
297+
///
298+
/// TODO: Explain when and why this might be useful.
299+
pub fn request_decryption(&self, request: DecryptionRetryRequest) {
300+
let _ =
301+
self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err(
302+
|_| warn!("Requesting a decryption while the redecryption task has been shut down"),
303+
);
304+
}
305+
306+
/// Subscribe to reports that the redecryptor generates.
307+
///
308+
/// TODO: Explain when the redecryptor might send such reports.
309+
pub fn subscrube_to_decryption_reports(
310+
&self,
311+
) -> impl Stream<Item = Result<RedecryptorReport, BroadcastStreamRecvError>> {
312+
BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe())
313+
}
236314
}
237315

238316
pub(crate) struct Redecryptor {
239-
request_decryption_sender: UnboundedSender<DecryptionRetryRequest>,
240317
task: JoinHandle<()>,
241318
}
242319

@@ -251,23 +328,17 @@ impl Redecryptor {
251328
///
252329
/// This creates a task that listens to various streams and attempts to
253330
/// redecrypt UTDs that can be found inside the [`EventCache`].
254-
pub(super) fn new(cache: Weak<EventCacheInner>) -> Self {
255-
let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel();
256-
331+
pub(super) fn new(
332+
cache: Weak<EventCacheInner>,
333+
receiver: UnboundedReceiver<DecryptionRetryRequest>,
334+
) -> Self {
257335
let task = spawn(async {
258336
let request_redecryption_stream = UnboundedReceiverStream::new(receiver);
259337

260338
Self::listen_for_room_keys_task(cache, request_redecryption_stream).await;
261339
});
262340

263-
Self { task, request_decryption_sender }
264-
}
265-
266-
#[allow(dead_code)]
267-
pub(super) fn request_decryption(&self, request: DecryptionRetryRequest) {
268-
let _ = self.request_decryption_sender.send(request).inspect_err(|_| {
269-
warn!("Requesting a decryption while the redecryption task has been shut down")
270-
});
341+
Self { task }
271342
}
272343

273344
/// (Re)-subscribe to the room key stream from the [`OlmMachine`].
@@ -307,18 +378,23 @@ impl Redecryptor {
307378
break false;
308379
};
309380

310-
for session_id in request.session_ids {
381+
for session_id in request.utd_session_ids {
311382
let _ = cache
312383
.retry_decryption(&request.room_id, &session_id)
313384
.await
314385
.inspect_err(|e| warn!("Error redecrypting {e:?}"));
315386
}
387+
388+
// TODO: Deal with encryption info updating as well.
316389
}
317390
// The room key stream from the OlmMachine. Needs to be recreated every time we
318391
// receive a `None` from the stream.
319392
room_keys = room_key_stream.next() => {
320393
match room_keys {
321394
Some(Ok(room_keys)) => {
395+
// Alright, some room keys were received and persisted in our store,
396+
// let's attempt to redecrypt events that were encrypted using these
397+
// room keys.
322398
let Some(cache) = Self::upgrade_event_cache(cache) else {
323399
break false;
324400
};
@@ -329,9 +405,21 @@ impl Redecryptor {
329405
.await
330406
.inspect_err(|e| warn!("Error redecrypting {e:?}"));
331407
}
408+
409+
// TODO: Deal with encryption info updating as well.
332410
},
333411
Some(Err(_)) => {
334-
todo!("Handle lagging here, how?")
412+
// We missed some room keys, we need to report this in case a listener
413+
// has and idea which UTDs we should attempt to redecrypt.
414+
//
415+
// This would most likely be the timeline. The timeline might attempt
416+
// to redecrypt all UTDs it is showing to the user.
417+
let Some(cache) = Self::upgrade_event_cache(cache) else {
418+
break false;
419+
};
420+
421+
let message = RedecryptorReport::Lagging;
422+
let _ = cache.inner.redecryption_channels.utd_reporter.send(message);
335423
},
336424
// The stream got closed, this could mean that our OlmMachine got
337425
// regenerated, let's return true and try to recreate the stream.
@@ -355,7 +443,16 @@ impl Redecryptor {
355443
pin_mut!(decryption_request_stream);
356444

357445
while Self::redecryption_loop(&cache, &mut decryption_request_stream).await {
358-
info!("Regenerating the re-decryption streams")
446+
info!("Regenerating the re-decryption streams");
447+
448+
let Some(cache) = Self::upgrade_event_cache(&cache) else {
449+
break;
450+
};
451+
452+
// Report that the stream got recreated so listeners can attempt to redecrypt
453+
// any UTDs they might be seeing.
454+
let message = RedecryptorReport::Lagging;
455+
let _ = cache.inner.redecryption_channels.utd_reporter.send(message);
359456
}
360457

361458
info!("Shutting down the event cache redecryptor");

0 commit comments

Comments
 (0)