|
| 1 | +import 'dart:async'; |
1 | 2 | import 'dart:collection'; |
2 | 3 |
|
| 4 | +import 'package:collection/collection.dart'; |
3 | 5 | import 'package:flutter/foundation.dart'; |
4 | 6 |
|
5 | 7 | import '../api/model/events.dart'; |
6 | 8 | import '../api/model/initial_snapshot.dart'; |
7 | 9 | import '../api/model/model.dart'; |
| 10 | +import '../api/route/channels.dart'; |
8 | 11 | import 'realm.dart'; |
9 | 12 | import 'store.dart'; |
10 | 13 | import 'user.dart'; |
11 | 14 |
|
| 15 | +// similar to _apiSendMessage in lib/model/message.dart |
| 16 | +final _apiGetChannelTopics = getStreamTopics; |
| 17 | + |
12 | 18 | /// The portion of [PerAccountStore] for channels, topics, and stuff about them. |
13 | 19 | /// |
14 | 20 | /// This type is useful for expressing the needs of other parts of the |
@@ -78,6 +84,29 @@ mixin ChannelStore on UserStore { |
78 | 84 | }; |
79 | 85 | } |
80 | 86 |
|
| 87 | + /// Fetch topics in a channel from the server, only if they're not fetched yet. |
| 88 | + /// |
| 89 | + /// The results from the last successful fetch |
| 90 | + /// can be retrieved with [getChannelTopics]. |
| 91 | + Future<void> fetchTopics(int channelId); |
| 92 | + |
| 93 | + /// The topics in the given channel, along with their latest message ID. |
| 94 | + /// |
| 95 | + /// Returns null if the data has not been fetched yet. |
| 96 | + /// To fetch it from the server, use [fetchTopics]. |
| 97 | + /// |
| 98 | + /// The result is sorted by [GetStreamTopicsEntry.maxId] descending, |
| 99 | + /// and the topics are distinct. |
| 100 | + /// |
| 101 | + /// Occasionally, [GetStreamTopicsEntry.maxId] will refer to a message |
| 102 | + /// that doesn't exist or is no longer in the topic. |
| 103 | + /// This happens when a topic's latest message is deleted or moved |
| 104 | + /// and we don't have enough information |
| 105 | + /// to replace [GetStreamTopicsEntry.maxId] accurately. |
| 106 | + /// (We don't keep a snapshot of all messages.) |
| 107 | + /// Use [PerAccountStore.messages] to check a message's topic accurately. |
| 108 | + List<GetStreamTopicsEntry>? getChannelTopics(int channelId); |
| 109 | + |
81 | 110 | /// The visibility policy that the self-user has for the given topic. |
82 | 111 | /// |
83 | 112 | /// This does not incorporate the user's channel-level policy, |
@@ -288,6 +317,13 @@ mixin ProxyChannelStore on ChannelStore { |
288 | 317 | @override |
289 | 318 | Map<int, ChannelFolder> get channelFolders => channelStore.channelFolders; |
290 | 319 |
|
| 320 | + @override |
| 321 | + Future<void> fetchTopics(int channelId) => channelStore.fetchTopics(channelId); |
| 322 | + |
| 323 | + @override |
| 324 | + List<GetStreamTopicsEntry>? getChannelTopics(int channelId) => |
| 325 | + channelStore.getChannelTopics(channelId); |
| 326 | + |
291 | 327 | @override |
292 | 328 | UserTopicVisibilityPolicy topicVisibilityPolicy(int streamId, TopicName topic) => |
293 | 329 | channelStore.topicVisibilityPolicy(streamId, topic); |
@@ -368,6 +404,39 @@ class ChannelStoreImpl extends HasUserStore with ChannelStore { |
368 | 404 | @override |
369 | 405 | final Map<int, ChannelFolder> channelFolders; |
370 | 406 |
|
| 407 | + /// Maps indexed by channel IDs, of the known latest message IDs in each topic. |
| 408 | + /// |
| 409 | + /// For example: `_latestMessageIdsByChannelTopic[channel.streamId][topic] = maxId` |
| 410 | + /// |
| 411 | + /// Occasionally, the latest message ID of a topic will refer to a message |
| 412 | + /// that doesn't exist or is no longer in the topic. |
| 413 | + /// This happens when the topic's latest message is deleted or moved |
| 414 | + /// and we don't have enough information to replace it accurately. |
| 415 | + /// (We don't keep a snapshot of all messages.) |
| 416 | + final Map<int, TopicKeyedMap<int>> _latestMessageIdsByChannelTopic = {}; |
| 417 | + |
| 418 | + @override |
| 419 | + Future<void> fetchTopics(int channelId) async { |
| 420 | + if (_latestMessageIdsByChannelTopic[channelId] != null) return; |
| 421 | + |
| 422 | + final result = await _apiGetChannelTopics(connection, streamId: channelId, |
| 423 | + allowEmptyTopicName: true); |
| 424 | + (_latestMessageIdsByChannelTopic[channelId] = makeTopicKeyedMap()).addAll({ |
| 425 | + for (final GetStreamTopicsEntry(:name, :maxId) in result.topics) |
| 426 | + name: maxId, |
| 427 | + }); |
| 428 | + } |
| 429 | + |
| 430 | + @override |
| 431 | + List<GetStreamTopicsEntry>? getChannelTopics(int channelId) { |
| 432 | + final latestMessageIdsByTopic = _latestMessageIdsByChannelTopic[channelId]; |
| 433 | + if (latestMessageIdsByTopic == null) return null; |
| 434 | + return [ |
| 435 | + for (final MapEntry(:key, :value) in latestMessageIdsByTopic.entries) |
| 436 | + GetStreamTopicsEntry(maxId: value, name: key), |
| 437 | + ].sortedBy((value) => -value.maxId); |
| 438 | + } |
| 439 | + |
371 | 440 | @override |
372 | 441 | Map<int, TopicKeyedMap<UserTopicVisibilityPolicy>> get debugTopicVisibility => topicVisibility; |
373 | 442 |
|
@@ -572,6 +641,65 @@ class ChannelStoreImpl extends HasUserStore with ChannelStore { |
572 | 641 | forStream[event.topicName] = visibilityPolicy; |
573 | 642 | } |
574 | 643 | } |
| 644 | + |
| 645 | + /// Handle a [MessageEvent], returning whether listeners should be notified. |
| 646 | + bool handleMessageEvent(MessageEvent event) { |
| 647 | + if (event.message is! StreamMessage) return false; |
| 648 | + final StreamMessage(streamId: channelId, :topic) = event.message as StreamMessage; |
| 649 | + |
| 650 | + final latestMessageIdsByTopic = _latestMessageIdsByChannelTopic[channelId]; |
| 651 | + if (latestMessageIdsByTopic == null) { |
| 652 | + // We're not tracking this channel's topics yet. |
| 653 | + // We start doing that when [fetchTopics] is called, |
| 654 | + // and we fill in all the topics at that time. |
| 655 | + return false; |
| 656 | + } |
| 657 | + |
| 658 | + final currentLatestMessageId = latestMessageIdsByTopic[topic]; |
| 659 | + if (currentLatestMessageId != null && currentLatestMessageId >= event.message.id) { |
| 660 | + // The event raced with a message fetch. |
| 661 | + return false; |
| 662 | + } |
| 663 | + latestMessageIdsByTopic[topic] = event.message.id; |
| 664 | + return true; |
| 665 | + } |
| 666 | + |
| 667 | + /// Handle an [UpdateMessageEvent], returning whether listeners should be |
| 668 | + /// notified. |
| 669 | + bool handleUpdateMessageEvent(UpdateMessageEvent event) { |
| 670 | + if (event.moveData == null) return false; |
| 671 | + final UpdateMessageMoveData( |
| 672 | + :origStreamId, :origTopic, :newStreamId, :newTopic, :propagateMode, |
| 673 | + ) = event.moveData!; |
| 674 | + bool shouldNotify = false; |
| 675 | + |
| 676 | + final origLatestMessageIdsByTopics = _latestMessageIdsByChannelTopic[origStreamId]; |
| 677 | + switch (propagateMode) { |
| 678 | + case PropagateMode.changeOne: |
| 679 | + case PropagateMode.changeLater: |
| 680 | + // We can't know the new `maxId` for the original topic. |
| 681 | + // Shrug; leave it unchanged. (See dartdoc of [getChannelTopics], |
| 682 | + // where we call out this possibility that `maxId` is incorrect. |
| 683 | + break; |
| 684 | + case PropagateMode.changeAll: |
| 685 | + if (origLatestMessageIdsByTopics != null) { |
| 686 | + origLatestMessageIdsByTopics.remove(origTopic); |
| 687 | + shouldNotify = true; |
| 688 | + } |
| 689 | + } |
| 690 | + |
| 691 | + final newLatestMessageIdsByTopics = _latestMessageIdsByChannelTopic[newStreamId]; |
| 692 | + if (newLatestMessageIdsByTopics != null) { |
| 693 | + final movedMaxId = event.messageIds.max; |
| 694 | + final currentMaxId = newLatestMessageIdsByTopics[newTopic]; |
| 695 | + if (currentMaxId == null || currentMaxId < movedMaxId) { |
| 696 | + newLatestMessageIdsByTopics[newTopic] = movedMaxId; |
| 697 | + shouldNotify = true; |
| 698 | + } |
| 699 | + } |
| 700 | + |
| 701 | + return shouldNotify; |
| 702 | + } |
575 | 703 | } |
576 | 704 |
|
577 | 705 | /// A [Map] with [TopicName] keys and [V] values. |
|
0 commit comments