Skip to content

Commit d6b2242

Browse files
PIG208sm-sayedi
andcommitted
channel: Keep track of channel topics, and keep up-to-date with events
Co-authored-by: Sayed Mahmood Sayedi <smahmood.sayedi@gmail.com>
1 parent 1699aa5 commit d6b2242

File tree

5 files changed

+496
-0
lines changed

5 files changed

+496
-0
lines changed

lib/model/channel.dart

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
1+
import 'dart:async';
12
import 'dart:collection';
23

4+
import 'package:collection/collection.dart';
35
import 'package:flutter/foundation.dart';
46

57
import '../api/model/events.dart';
68
import '../api/model/initial_snapshot.dart';
79
import '../api/model/model.dart';
10+
import '../api/route/channels.dart';
811
import 'realm.dart';
912
import 'store.dart';
1013
import 'user.dart';
1114

15+
final _apiGetChannelTopics = getStreamTopics; // similar to _apiSendMessage in lib/model/message.dart
16+
1217
/// The portion of [PerAccountStore] for channels, topics, and stuff about them.
1318
///
1419
/// This type is useful for expressing the needs of other parts of the
@@ -78,6 +83,27 @@ mixin ChannelStore on UserStore {
7883
};
7984
}
8085

86+
/// Fetch topics in a channel from the server, only if they're not fetched yet.
87+
///
88+
/// The results from the last successful fetch
89+
/// can be retrieved with [getChannelTopics].
90+
Future<void> fetchTopics(int channelId);
91+
92+
/// Pairs of the known topics and its latest message ID, in the given channel.
93+
///
94+
/// Returns null if the data has never been fetched yet.
95+
/// To fetch it from the server, use [fetchTopics].
96+
///
97+
/// The result is guaranteed to be sorted by [GetStreamTopicsEntry.maxId]
98+
/// descending, and the topics are guaranteed to be distinct.
99+
///
100+
/// In some cases, the same maxId affected by message moves can be present in
101+
/// multiple [GetStreamTopicsEntry] entries. For this reason, the caller
102+
/// should not rely on [getChannelTopics] to determine which topic the message
103+
/// is in. Instead, refer to [PerAccountStore.messages].
104+
/// See [handleUpdateMessageEvent] on how this could happen.
105+
List<GetStreamTopicsEntry>? getChannelTopics(int channelId);
106+
81107
/// The visibility policy that the self-user has for the given topic.
82108
///
83109
/// This does not incorporate the user's channel-level policy,
@@ -288,6 +314,13 @@ mixin ProxyChannelStore on ChannelStore {
288314
@override
289315
Map<int, ChannelFolder> get channelFolders => channelStore.channelFolders;
290316

317+
@override
318+
Future<void> fetchTopics(int channelId) => channelStore.fetchTopics(channelId);
319+
320+
@override
321+
List<GetStreamTopicsEntry>? getChannelTopics(int channelId) =>
322+
channelStore.getChannelTopics(channelId);
323+
291324
@override
292325
UserTopicVisibilityPolicy topicVisibilityPolicy(int streamId, TopicName topic) =>
293326
channelStore.topicVisibilityPolicy(streamId, topic);
@@ -368,6 +401,37 @@ class ChannelStoreImpl extends HasUserStore with ChannelStore {
368401
@override
369402
final Map<int, ChannelFolder> channelFolders;
370403

404+
/// Maps indexed by channel IDs, of the known latest message IDs in each topic.
405+
///
406+
/// For example: `_latestMessageIdsByChannelTopic[channel.streamId][topic] = maxId`
407+
///
408+
/// In some cases, the same message IDs, when affected by message moves, can
409+
/// be present for mutliple channel-topic keys.
410+
/// See [handleUpdateMessageEvent] on how this could happen.
411+
final Map<int, Map<TopicName, int>> _latestMessageIdsByChannelTopic = {};
412+
413+
@override
414+
Future<void> fetchTopics(int channelId) async {
415+
if (_latestMessageIdsByChannelTopic[channelId] != null) return;
416+
417+
final result = await _apiGetChannelTopics(connection, streamId: channelId,
418+
allowEmptyTopicName: true);
419+
_latestMessageIdsByChannelTopic[channelId] = {
420+
for (final GetStreamTopicsEntry(:name, :maxId) in result.topics)
421+
name: maxId,
422+
};
423+
}
424+
425+
@override
426+
List<GetStreamTopicsEntry>? getChannelTopics(int channelId) {
427+
final latestMessageIdsByTopic = _latestMessageIdsByChannelTopic[channelId];
428+
if (latestMessageIdsByTopic == null) return null;
429+
return [
430+
for (final MapEntry(:key, :value) in latestMessageIdsByTopic.entries)
431+
GetStreamTopicsEntry(maxId: value, name: key),
432+
].sortedBy((value) => -value.maxId);
433+
}
434+
371435
@override
372436
Map<int, TopicKeyedMap<UserTopicVisibilityPolicy>> get debugTopicVisibility => topicVisibility;
373437

@@ -572,6 +636,65 @@ class ChannelStoreImpl extends HasUserStore with ChannelStore {
572636
forStream[event.topicName] = visibilityPolicy;
573637
}
574638
}
639+
640+
/// Handle a [MessageEvent], returning whether listeners should be notified.
641+
bool handleMessageEvent(MessageEvent event) {
642+
if (event.message is! StreamMessage) return false;
643+
final StreamMessage(:streamId, :topic) = event.message as StreamMessage;
644+
645+
final latestMessageIdsByTopic = _latestMessageIdsByChannelTopic[streamId];
646+
// If we don't already know about the list of topics of the channel this
647+
// message belongs to, we don't want to proceed and put one entry about the
648+
// topic of this message, otherwise [fetchTopics] and the callers of
649+
// [getChannelTopics] would assume that the channel only has this one topic
650+
// and would never fetch the complete list of topics for that matter.
651+
if (latestMessageIdsByTopic == null) return false;
652+
653+
// If this message is already the latest message in the topic because it was
654+
// received through fetch in fetch/event race, or it is a message sent even
655+
// before the latest message of the fetch, we don't do the update.
656+
final currentLatestMessageId = latestMessageIdsByTopic[topic];
657+
if (currentLatestMessageId != null && currentLatestMessageId >= event.message.id) {
658+
return false;
659+
}
660+
latestMessageIdsByTopic[topic] = event.message.id;
661+
return true;
662+
}
663+
664+
/// Handle an [UpdateMessageEvent], returning whether listeners should be
665+
/// notified.
666+
bool handleUpdateMessageEvent(UpdateMessageEvent event) {
667+
if (event.moveData == null) return false;
668+
final UpdateMessageMoveData(
669+
:origStreamId, :origTopic, :newStreamId, :newTopic, :propagateMode,
670+
) = event.moveData!;
671+
bool shouldNotify = false;
672+
673+
final origLatestMessageIdsByTopics = _latestMessageIdsByChannelTopic[origStreamId];
674+
// We only handle the case where all the messages of [origTopic] are
675+
// moved to [newTopic]; in that case we can remove [origTopic] safely.
676+
// But if only one messsage is moved (`PropagateMode.changeOne`) or a few
677+
// messages are moved (`PropagateMode.changeLater`), we cannot do anything
678+
// about [origTopic] here as we cannot determine the new `maxId` for it.
679+
// (This is the case where there could be multiple channel-topic keys with
680+
// the same `maxId`)
681+
if (propagateMode == PropagateMode.changeAll
682+
&& origLatestMessageIdsByTopics != null) {
683+
shouldNotify = origLatestMessageIdsByTopics.remove(origTopic) != null;
684+
}
685+
686+
final newLatestMessageIdsByTopics = _latestMessageIdsByChannelTopic[newStreamId];
687+
if (newLatestMessageIdsByTopics != null) {
688+
final movedMaxId = event.messageIds.max;
689+
if (!newLatestMessageIdsByTopics.containsKey(newTopic)
690+
|| newLatestMessageIdsByTopics[newTopic]! < movedMaxId) {
691+
newLatestMessageIdsByTopics[newTopic] = movedMaxId;
692+
shouldNotify = true;
693+
}
694+
}
695+
696+
return shouldNotify;
697+
}
575698
}
576699

577700
/// A [Map] with [TopicName] keys and [V] values.

lib/model/store.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -862,13 +862,15 @@ class PerAccountStore extends PerAccountStoreBase with
862862
unreads.handleMessageEvent(event);
863863
recentDmConversationsView.handleMessageEvent(event);
864864
recentSenders.handleMessage(event.message); // TODO(#824)
865+
if (_channels.handleMessageEvent(event)) notifyListeners();
865866
// When adding anything here (to handle [MessageEvent]),
866867
// it probably belongs in [reconcileMessages] too.
867868

868869
case UpdateMessageEvent():
869870
assert(debugLog("server event: update_message ${event.messageId}"));
870871
_messages.handleUpdateMessageEvent(event);
871872
unreads.handleUpdateMessageEvent(event);
873+
if (_channels.handleUpdateMessageEvent(event)) notifyListeners();
872874

873875
case DeleteMessageEvent():
874876
assert(debugLog("server event: delete_message ${event.messageIds}"));

test/api/route/route_checks.dart

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import 'package:checks/checks.dart';
2+
import 'package:zulip/api/model/model.dart';
3+
import 'package:zulip/api/route/channels.dart';
24
import 'package:zulip/api/route/messages.dart';
35
import 'package:zulip/api/route/realm.dart';
46
import 'package:zulip/api/route/saved_snippets.dart';
@@ -15,4 +17,9 @@ extension GetServerSettingsResultChecks on Subject<GetServerSettingsResult> {
1517
Subject<Uri> get realmUrl => has((e) => e.realmUrl, 'realmUrl');
1618
}
1719

20+
extension GetStreamTopicEntryChecks on Subject<GetStreamTopicsEntry> {
21+
Subject<int> get maxId => has((e) => e.maxId, 'maxId');
22+
Subject<TopicName> get name => has((e) => e.name, 'name');
23+
}
24+
1825
// TODO add similar extensions for other classes in api/route/*.dart

0 commit comments

Comments
 (0)