Skip to content

Commit f70dbc5

Browse files
committed
fixup redecryptor
1 parent e4bf600 commit f70dbc5

File tree

1 file changed

+40
-26
lines changed

1 file changed

+40
-26
lines changed

crates/matrix-sdk/src/event_cache/redecryptor.rs

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use matrix_sdk_common::executor::spawn;
2828
use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw};
2929
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
3030
use tokio_stream::wrappers::{UnboundedReceiverStream, errors::BroadcastStreamRecvError};
31-
use tracing::{instrument, trace, warn};
31+
use tracing::{info, instrument, trace, warn};
3232

3333
use crate::event_cache::{
3434
EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate,
@@ -172,6 +172,7 @@ impl Drop for Redecryptor {
172172
impl Redecryptor {
173173
pub(super) fn new(cache: Weak<EventCacheInner>) -> Self {
174174
let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel();
175+
175176
let task = spawn(async {
176177
let request_redecryption_stream = UnboundedReceiverStream::new(receiver);
177178

@@ -198,7 +199,11 @@ impl Redecryptor {
198199
machine.as_ref().map(|m| m.store().room_keys_received_stream())
199200
}
200201

201-
async fn listen_loop(
202+
fn upgrade_event_cache(cache: &Weak<EventCacheInner>) -> Option<EventCache> {
203+
cache.upgrade().map(|inner| EventCache { inner })
204+
}
205+
206+
async fn redecryption_loop(
202207
cache: &Weak<EventCacheInner>,
203208
decryption_request_stream: &mut Pin<&mut impl Stream<Item = DecryptionRetryRequest>>,
204209
) -> bool {
@@ -208,41 +213,40 @@ impl Redecryptor {
208213

209214
pin_mut!(room_key_stream);
210215

211-
// TODO: Listen to notifications that the Olm machine got recreated, this means
212-
// that our room key stream is effectively dead, we need to exit this
213-
// function with a `true` return value.
214216
loop {
215217
tokio::select! {
216218
Some(request) = decryption_request_stream.next() => {
217-
let Some(event_cache) = cache.upgrade() else {
219+
let Some(cache) = Self::upgrade_event_cache(cache) else {
218220
break false;
219221
};
220222

221-
let cache = EventCache { inner: event_cache };
222-
223223
for session_id in request.session_ids {
224224
let _ = cache
225225
.retry_decryption(&request.room_id, &session_id)
226226
.await
227227
.inspect_err(|e| warn!("Error redecrypting {e:?}"));
228228
}
229229
}
230-
Some(room_keys) = room_key_stream.next() => {
231-
if let Ok(room_keys) = room_keys {
232-
let Some(event_cache) = cache.upgrade() else {
233-
break false;
234-
};
235-
236-
let cache = EventCache { inner: event_cache };
237-
238-
for key in room_keys {
239-
let _ = cache
240-
.retry_decryption(&key.room_id, &key.session_id)
241-
.await
242-
.inspect_err(|e| warn!("Error redecrypting {e:?}"));
230+
room_keys = room_key_stream.next() => {
231+
match room_keys {
232+
Some(Ok(room_keys)) => {
233+
let Some(cache) = Self::upgrade_event_cache(cache) else {
234+
break false;
235+
};
236+
237+
for key in room_keys {
238+
let _ = cache
239+
.retry_decryption(&key.room_id, &key.session_id)
240+
.await
241+
.inspect_err(|e| warn!("Error redecrypting {e:?}"));
242+
}
243+
},
244+
Some(Err(_)) => {
245+
todo!("Handle lagging here, how?")
246+
},
247+
None => {
248+
break true
243249
}
244-
} else {
245-
todo!("Decrypt all events?");
246250
}
247251
}
248252
else => break false,
@@ -256,9 +260,11 @@ impl Redecryptor {
256260
) {
257261
pin_mut!(decryption_request_stream);
258262

259-
// TODO: We need to relisten to this stream if it dies due to the cross-process
260-
// lock reloading the Olm machine.
261-
while Self::listen_loop(&cache, &mut decryption_request_stream).await {}
263+
while Self::redecryption_loop(&cache, &mut decryption_request_stream).await {
264+
info!("Regenerating the re-decryption streams")
265+
}
266+
267+
info!("Shutting down the event cache redecryptor");
262268
}
263269
}
264270

@@ -390,6 +396,14 @@ mod tests {
390396
let event = event_receiver.await.expect("Alice should have sent the event by now");
391397
let room_key = room_key.await;
392398

399+
// We regenerate the Olm machine to check if the room key stream is recreated to
400+
// correctly.
401+
bob.inner
402+
.base_client
403+
.regenerate_olm(None)
404+
.await
405+
.expect("We should be able to regenerate the Olm machine");
406+
393407
// Let us forward the event to Bob.
394408
matrix_mock_server
395409
.mock_sync()

0 commit comments

Comments
 (0)