From 59cec858310b89373ab85eeae4917dc5ece596e4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 29 Aug 2025 18:05:08 -0500 Subject: [PATCH 1/8] WIP: Non-working gap test --- tests/csapi/room_messages_test.go | 301 ++++++++++++++++++++++++++++++ 1 file changed, 301 insertions(+) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 373f3363..4dee1883 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -1,20 +1,27 @@ package csapi_tests import ( + "encoding/json" "fmt" "net/http" "net/url" "testing" + "time" "github.com/tidwall/gjson" "github.com/matrix-org/complement" "github.com/matrix-org/complement/b" "github.com/matrix-org/complement/client" + "github.com/matrix-org/complement/federation" "github.com/matrix-org/complement/helpers" "github.com/matrix-org/complement/match" "github.com/matrix-org/complement/must" "github.com/matrix-org/complement/runtime" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/gomatrixserverlib/fclient" + "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/matrix-org/util" ) // sytest: POST /rooms/:room_id/send/:event_type sends a message @@ -220,3 +227,297 @@ func TestRoomMessagesLazyLoadingLocalUser(t *testing.T) { }, }) } + +type MessageDraft struct { + Sender string + ShareInitially bool + Message string +} + +type EventInfo struct { + Message MessageDraft + PDU gomatrixserverlib.PDU +} + +func TestRoomMessagesGaps(t *testing.T) { + deployment := complement.Deploy(t, 1) + defer deployment.Destroy(t) + + alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{}) + + // Create a remote homeserver + srv := federation.NewServer(t, deployment, + federation.HandleKeyRequests(), + federation.HandleMakeSendJoinRequests(), + // The other server might try to send us some junk, just ignore it + federation.HandleTransactionRequests(nil, nil), + ) + cancel := srv.Listen() + defer cancel() + + roomVersion := alice.GetDefaultRoomVersion(t) + charlie := srv.UserID("charlie") + remoteRoom := srv.MustMakeRoom(t, roomVersion, federation.InitialRoomEvents(roomVersion, charlie)) + + messages := []MessageDraft{ + MessageDraft{charlie, true, "foo"}, + MessageDraft{charlie, true, "bar"}, + MessageDraft{charlie, true, "baz"}, + MessageDraft{charlie, false, "qux"}, + MessageDraft{charlie, true, "corge"}, + MessageDraft{charlie, true, "grault"}, + MessageDraft{charlie, true, "garply"}, + MessageDraft{charlie, true, "waldo"}, + MessageDraft{charlie, true, "fred"}, + } + + // Create some events + // Map from event_id to event info + eventIDs := make([]string, len(messages)) + eventMap := make(map[string]EventInfo) + for messageIndex, message := range messages { + federation_event := federation.Event{ + Sender: message.Sender, + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": message.Message, + }, + } + if messageIndex > 2 { + federation_event.PrevEvents = []string{ + eventIDs[messageIndex-1], + // Always connect it to some known part of the DAG (for the local server's sake + // later) + eventIDs[messageIndex-2], + } + } + + event := srv.MustCreateEvent(t, remoteRoom, federation_event) + eventIDs[messageIndex] = event.EventID() + eventMap[event.EventID()] = EventInfo{ + Message: message, + PDU: event, + } + remoteRoom.AddEvent(event) + } + + // Sanity check we sent all of the events in the room + if len(eventMap) != len(messages) { + t.Fatalf( + "expected the number of events (%d) to match the number of messages we expected to send (%d)", + len(messages), + len(eventMap), + ) + } + + // Make it easy to cross-reference the events being talked about in the logs + for eventIndex, eventID := range eventIDs { + message := eventMap[eventID].Message + event := eventMap[eventID].PDU + t.Logf("Message %d: %s-6s -> event_id=%s", eventIndex, message.Message, event.EventID()) + } + + // The other server is bound to ask about the missing events we reference in the + // prev_event_ids of others but we don't divulge that to them because we want the gaps + // to remain. + // + // We need to respond successfully (200 OK) so we remain on the good list of + // federation destinations. + srv.Mux().HandleFunc( + "/_matrix/federation/v1/get_missing_events/{roomID}", + srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { + t.Logf("Got /get_missing_events for %s", pathParams["roomID"]) + if pathParams["roomID"] != remoteRoom.RoomID { + t.Errorf("Received /get_missing_events for the wrong room: %s", remoteRoom.RoomID) + return util.JSONResponse{ + Code: 400, + JSON: "wrong room", + } + } + + return util.JSONResponse{ + Code: 200, + JSON: map[string]interface{}{ + "events": []string{}, + }, + } + }), + ).Methods("POST") + + // TODO + srv.Mux().HandleFunc( + "/_matrix/federation/v1/backfill/{roomID}", + srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { + t.Logf("Got /backfill for %s", pathParams["roomID"]) + if pathParams["roomID"] != remoteRoom.RoomID { + t.Errorf("Received /backfill for the wrong room: %s", remoteRoom.RoomID) + return util.JSONResponse{ + Code: 400, + JSON: "wrong room", + } + } + + pdusToShare := []json.RawMessage{} + for _, eventInfo := range eventMap { + if eventInfo.Message.ShareInitially { + pdusToShare = append(pdusToShare, eventInfo.PDU.JSON()) + } + } + + return util.JSONResponse{ + Code: 200, + JSON: map[string]interface{}{ + "origin": srv.ServerName(), + "origin_server_ts": time.Now().Unix(), + "pdus": pdusToShare, + }, + } + }), + ).Methods("GET") + + srv.Mux().HandleFunc( + "/_matrix/federation/v1/event/{eventID}", + srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { + t.Logf("Got /event for %s (%s)", pathParams["eventID"], eventMap[pathParams["eventID"]].Message.Message) + + eventInfo, ok := eventMap[pathParams["eventID"]] + if !ok || !eventInfo.Message.ShareInitially { + t.Errorf("Received /event for an unknown event: %s", pathParams["eventID"]) + return util.JSONResponse{ + Code: 400, + JSON: "unknown event", + } + } + + return util.JSONResponse{ + Code: 200, + JSON: map[string]interface{}{ + "origin": srv.ServerName(), + "origin_server_ts": time.Now().Unix(), + "pdus": []json.RawMessage{ + eventInfo.PDU.JSON(), + }, + }, + } + }), + ).Methods("GET") + + // Because state never changes in the room, we can just always respond the same + // + // Backfill will cause us to asked about `/state_ids` + roomStateForMessages := remoteRoom.AllCurrentState() + srv.Mux().HandleFunc( + "/_matrix/federation/v1/state_ids/{roomID}", + srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { + t.Logf("Got /state_ids for %s", pathParams["roomID"]) + if pathParams["roomID"] != remoteRoom.RoomID { + t.Errorf("Received /state_ids for the wrong room: %s", remoteRoom.RoomID) + return util.JSONResponse{ + Code: 400, + JSON: "wrong room", + } + } + + return util.JSONResponse{ + Code: 200, + JSON: struct { + AuthChainIDs []string `json:"auth_chain_ids"` + PDUIDs []string `json:"pdu_ids"` + }{ + AuthChainIDs: eventIDsFromEvents(remoteRoom.AuthChainForEvents(roomStateForMessages)), + PDUIDs: eventIDsFromEvents(roomStateForMessages), + }, + } + }), + ).Methods("GET") + // After asking for `/state_ids`, the homeserver might actually ask about the actual + // `state` for those event IDs + srv.Mux().HandleFunc( + "/_matrix/federation/v1/state/{roomID}", + srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { + t.Logf("Got /state for %s", pathParams["roomID"]) + if pathParams["roomID"] != remoteRoom.RoomID { + t.Errorf("Received /state for the wrong room: %s", remoteRoom.RoomID) + return util.JSONResponse{ + Code: 400, + JSON: "wrong room", + } + } + + return util.JSONResponse{ + Code: 200, + JSON: struct { + AuthChain gomatrixserverlib.EventJSONs `json:"auth_chain"` + PDUs gomatrixserverlib.EventJSONs `json:"pdus"` + }{ + AuthChain: gomatrixserverlib.NewEventJSONsFromEvents(remoteRoom.AuthChainForEvents(roomStateForMessages)), + PDUs: gomatrixserverlib.NewEventJSONsFromEvents(roomStateForMessages), + }, + } + }), + ).Methods("GET") + + // The local homeserver joins the room + alice.MustJoinRoom(t, remoteRoom.RoomID, []spec.ServerName{srv.ServerName()}) + + // Backfill the local server with *some* of the messages (leave some gaps) + // for _, eventID := range slices.Backward(eventIDs) { + // // message := eventMap[eventID].Message + // event := eventMap[eventID].PDU + // // if message.ShareInitially { + // srv.MustSendTransaction(t, deployment, deployment.GetFullyQualifiedHomeserverName(t, "hs1"), []json.RawMessage{event.JSON()}, nil) + // // } + // } + + messagesRes := alice.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", remoteRoom.RoomID, "messages"}, + client.WithContentType("application/json"), + client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + }), + ) + messagesResBody := client.ParseJSON(t, messagesRes) + t.Logf("asdf %s", messagesResBody) + + fetchUntilMessagesResponseHas(t, alice, remoteRoom.RoomID, func(ev gjson.Result) bool { + t.Logf("asdf %s %s", ev.Get("event_id").Str, ev.Get("content").Raw) + return ev.Get("event_id").Str == eventIDs[0] + }) +} + +func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, check func(gjson.Result) bool) { + t.Helper() + start := time.Now() + checkCounter := 0 + for { + if time.Since(start) > c.SyncUntilTimeout { + t.Fatalf("fetchUntilMessagesResponseHas timed out. Called check function %d times", checkCounter) + } + + messagesRes := c.MustDo(t, "GET", []string{"_matrix", "client", "v3", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + messsageResBody := client.ParseJSON(t, messagesRes) + wantKey := "chunk" + keyRes := gjson.GetBytes(messsageResBody, wantKey) + if !keyRes.Exists() { + t.Fatalf("missing key '%s'", wantKey) + } + if !keyRes.IsArray() { + t.Fatalf("key '%s' is not an array (was %s)", wantKey, keyRes.Type) + } + + events := keyRes.Array() + for _, ev := range events { + if check(ev) { + return + } + } + + checkCounter++ + // Add a slight delay so we don't hammer the messages endpoint + time.Sleep(500 * time.Millisecond) + } +} From fefc22f1ee79afba5ae012c3cc27712f3f2cdc48 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 29 Aug 2025 18:10:20 -0500 Subject: [PATCH 2/8] Rename `Message` -> `MessageDraft` --- tests/csapi/room_messages_test.go | 42 +++++++++++++++---------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 4dee1883..33369c95 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -235,8 +235,8 @@ type MessageDraft struct { } type EventInfo struct { - Message MessageDraft - PDU gomatrixserverlib.PDU + MessageDraft MessageDraft + PDU gomatrixserverlib.PDU } func TestRoomMessagesGaps(t *testing.T) { @@ -259,7 +259,7 @@ func TestRoomMessagesGaps(t *testing.T) { charlie := srv.UserID("charlie") remoteRoom := srv.MustMakeRoom(t, roomVersion, federation.InitialRoomEvents(roomVersion, charlie)) - messages := []MessageDraft{ + messageDrafts := []MessageDraft{ MessageDraft{charlie, true, "foo"}, MessageDraft{charlie, true, "bar"}, MessageDraft{charlie, true, "baz"}, @@ -273,49 +273,49 @@ func TestRoomMessagesGaps(t *testing.T) { // Create some events // Map from event_id to event info - eventIDs := make([]string, len(messages)) + eventIDs := make([]string, len(messageDrafts)) eventMap := make(map[string]EventInfo) - for messageIndex, message := range messages { + for messageDraftIndex, messageDraft := range messageDrafts { federation_event := federation.Event{ - Sender: message.Sender, + Sender: messageDraft.Sender, Type: "m.room.message", Content: map[string]interface{}{ "msgtype": "m.text", - "body": message.Message, + "body": messageDraft.Message, }, } - if messageIndex > 2 { + if messageDraftIndex > 2 { federation_event.PrevEvents = []string{ - eventIDs[messageIndex-1], + eventIDs[messageDraftIndex-1], // Always connect it to some known part of the DAG (for the local server's sake // later) - eventIDs[messageIndex-2], + eventIDs[messageDraftIndex-2], } } event := srv.MustCreateEvent(t, remoteRoom, federation_event) - eventIDs[messageIndex] = event.EventID() + eventIDs[messageDraftIndex] = event.EventID() eventMap[event.EventID()] = EventInfo{ - Message: message, - PDU: event, + MessageDraft: messageDraft, + PDU: event, } remoteRoom.AddEvent(event) } // Sanity check we sent all of the events in the room - if len(eventMap) != len(messages) { + if len(eventMap) != len(messageDrafts) { t.Fatalf( - "expected the number of events (%d) to match the number of messages we expected to send (%d)", - len(messages), + "expected the number of events (%d) to match the number of message drafts we expected to send (%d)", + len(messageDrafts), len(eventMap), ) } // Make it easy to cross-reference the events being talked about in the logs for eventIndex, eventID := range eventIDs { - message := eventMap[eventID].Message + messageDraft := eventMap[eventID].MessageDraft event := eventMap[eventID].PDU - t.Logf("Message %d: %s-6s -> event_id=%s", eventIndex, message.Message, event.EventID()) + t.Logf("Message %d: %s-6s -> event_id=%s", eventIndex, messageDraft.Message, event.EventID()) } // The other server is bound to ask about the missing events we reference in the @@ -360,7 +360,7 @@ func TestRoomMessagesGaps(t *testing.T) { pdusToShare := []json.RawMessage{} for _, eventInfo := range eventMap { - if eventInfo.Message.ShareInitially { + if eventInfo.MessageDraft.ShareInitially { pdusToShare = append(pdusToShare, eventInfo.PDU.JSON()) } } @@ -379,10 +379,10 @@ func TestRoomMessagesGaps(t *testing.T) { srv.Mux().HandleFunc( "/_matrix/federation/v1/event/{eventID}", srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { - t.Logf("Got /event for %s (%s)", pathParams["eventID"], eventMap[pathParams["eventID"]].Message.Message) + t.Logf("Got /event for %s (%s)", pathParams["eventID"], eventMap[pathParams["eventID"]].MessageDraft.Message) eventInfo, ok := eventMap[pathParams["eventID"]] - if !ok || !eventInfo.Message.ShareInitially { + if !ok || !eventInfo.MessageDraft.ShareInitially { t.Errorf("Received /event for an unknown event: %s", pathParams["eventID"]) return util.JSONResponse{ Code: 400, From 35f07618d0a92967ade0c9f563163892700cf7ff Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 29 Aug 2025 18:25:55 -0500 Subject: [PATCH 3/8] Make debug logs more clear --- tests/csapi/room_messages_test.go | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 33369c95..058634e7 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -315,7 +315,7 @@ func TestRoomMessagesGaps(t *testing.T) { for eventIndex, eventID := range eventIDs { messageDraft := eventMap[eventID].MessageDraft event := eventMap[eventID].PDU - t.Logf("Message %d: %s-6s -> event_id=%s", eventIndex, messageDraft.Message, event.EventID()) + t.Logf("Message %d: %-6s -> event_id=%s", eventIndex, messageDraft.Message, event.EventID()) } // The other server is bound to ask about the missing events we reference in the @@ -327,7 +327,7 @@ func TestRoomMessagesGaps(t *testing.T) { srv.Mux().HandleFunc( "/_matrix/federation/v1/get_missing_events/{roomID}", srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { - t.Logf("Got /get_missing_events for %s", pathParams["roomID"]) + t.Logf("Received /get_missing_events for %s", pathParams["roomID"]) if pathParams["roomID"] != remoteRoom.RoomID { t.Errorf("Received /get_missing_events for the wrong room: %s", remoteRoom.RoomID) return util.JSONResponse{ @@ -349,7 +349,7 @@ func TestRoomMessagesGaps(t *testing.T) { srv.Mux().HandleFunc( "/_matrix/federation/v1/backfill/{roomID}", srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { - t.Logf("Got /backfill for %s", pathParams["roomID"]) + t.Logf("Received /backfill for %s", pathParams["roomID"]) if pathParams["roomID"] != remoteRoom.RoomID { t.Errorf("Received /backfill for the wrong room: %s", remoteRoom.RoomID) return util.JSONResponse{ @@ -376,20 +376,29 @@ func TestRoomMessagesGaps(t *testing.T) { }), ).Methods("GET") + // Servers might ask about missing events via `/event` srv.Mux().HandleFunc( "/_matrix/federation/v1/event/{eventID}", srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { - t.Logf("Got /event for %s (%s)", pathParams["eventID"], eventMap[pathParams["eventID"]].MessageDraft.Message) - eventInfo, ok := eventMap[pathParams["eventID"]] - if !ok || !eventInfo.MessageDraft.ShareInitially { - t.Errorf("Received /event for an unknown event: %s", pathParams["eventID"]) + if !ok { + t.Errorf("Received /event for an unknown event (at-least not one of the messages): %s", pathParams["eventID"]) return util.JSONResponse{ Code: 400, JSON: "unknown event", } } + if !eventInfo.MessageDraft.ShareInitially { + t.Errorf("🙅 Received /event for an event we're not sharing: %s (%s)", pathParams["eventID"], eventInfo.MessageDraft.Message) + return util.JSONResponse{ + Code: 400, + JSON: "not sharing", + } + } + + t.Logf("Received /event for %s (%s)", pathParams["eventID"], eventInfo.MessageDraft.Message) + return util.JSONResponse{ Code: 200, JSON: map[string]interface{}{ @@ -403,14 +412,15 @@ func TestRoomMessagesGaps(t *testing.T) { }), ).Methods("GET") - // Because state never changes in the room, we can just always respond the same + // Because state never changes in the room, we can just always respond the same and + // assume they're not asking about state before all the state was sent. // // Backfill will cause us to asked about `/state_ids` roomStateForMessages := remoteRoom.AllCurrentState() srv.Mux().HandleFunc( "/_matrix/federation/v1/state_ids/{roomID}", srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { - t.Logf("Got /state_ids for %s", pathParams["roomID"]) + t.Logf("Received /state_ids for %s", pathParams["roomID"]) if pathParams["roomID"] != remoteRoom.RoomID { t.Errorf("Received /state_ids for the wrong room: %s", remoteRoom.RoomID) return util.JSONResponse{ @@ -436,7 +446,7 @@ func TestRoomMessagesGaps(t *testing.T) { srv.Mux().HandleFunc( "/_matrix/federation/v1/state/{roomID}", srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { - t.Logf("Got /state for %s", pathParams["roomID"]) + t.Logf("Received /state for %s", pathParams["roomID"]) if pathParams["roomID"] != remoteRoom.RoomID { t.Errorf("Received /state for the wrong room: %s", remoteRoom.RoomID) return util.JSONResponse{ From 60bb6c8cca252598a69cd19de00bbe453d3fcf9f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 1 Sep 2025 21:43:28 -0500 Subject: [PATCH 4/8] Non-working: Join/leave test to create gaps This works to create gaps but for some reason after backfilling, we still see some gaps and not all messages appear. --- tests/csapi/room_messages_test.go | 646 ++++++++++++++++++------------ 1 file changed, 385 insertions(+), 261 deletions(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 058634e7..0870572e 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -5,23 +5,20 @@ import ( "fmt" "net/http" "net/url" + "slices" + "strings" "testing" - "time" "github.com/tidwall/gjson" "github.com/matrix-org/complement" "github.com/matrix-org/complement/b" "github.com/matrix-org/complement/client" - "github.com/matrix-org/complement/federation" "github.com/matrix-org/complement/helpers" "github.com/matrix-org/complement/match" "github.com/matrix-org/complement/must" "github.com/matrix-org/complement/runtime" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/gomatrixserverlib/fclient" "github.com/matrix-org/gomatrixserverlib/spec" - "github.com/matrix-org/util" ) // sytest: POST /rooms/:room_id/send/:event_type sends a message @@ -229,305 +226,432 @@ func TestRoomMessagesLazyLoadingLocalUser(t *testing.T) { } type MessageDraft struct { - Sender string - ShareInitially bool - Message string + Sender *client.CSAPI + Message string } type EventInfo struct { MessageDraft MessageDraft - PDU gomatrixserverlib.PDU + EventID string } func TestRoomMessagesGaps(t *testing.T) { - deployment := complement.Deploy(t, 1) + deployment := complement.Deploy(t, 3) defer deployment.Destroy(t) - alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{}) - - // Create a remote homeserver - srv := federation.NewServer(t, deployment, - federation.HandleKeyRequests(), - federation.HandleMakeSendJoinRequests(), - // The other server might try to send us some junk, just ignore it - federation.HandleTransactionRequests(nil, nil), - ) - cancel := srv.Listen() - defer cancel() + // Sometimes we send more than 10 messages (the default in Synapse) and we want to + // include all of them in the response. + includeMoreTimelineFilter, _ := json.Marshal(map[string]interface{}{ + "room": map[string]interface{}{ + "timeline": map[string]interface{}{ + "limit": 100, + }, + }, + }) - roomVersion := alice.GetDefaultRoomVersion(t) - charlie := srv.UserID("charlie") - remoteRoom := srv.MustMakeRoom(t, roomVersion, federation.InitialRoomEvents(roomVersion, charlie)) + alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{ + LocalpartSuffix: "alice", + }) + bob := deployment.Register(t, "hs2", helpers.RegistrationOpts{ + LocalpartSuffix: "bob", + }) + charlie := deployment.Register(t, "hs3", helpers.RegistrationOpts{ + LocalpartSuffix: "charlie", + }) - messageDrafts := []MessageDraft{ - MessageDraft{charlie, true, "foo"}, - MessageDraft{charlie, true, "bar"}, - MessageDraft{charlie, true, "baz"}, - MessageDraft{charlie, false, "qux"}, - MessageDraft{charlie, true, "corge"}, - MessageDraft{charlie, true, "grault"}, - MessageDraft{charlie, true, "garply"}, - MessageDraft{charlie, true, "waldo"}, - MessageDraft{charlie, true, "fred"}, - } + // Start a sync loop + _, aliceSince := alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"}) + _, bobSince := bob.MustSync(t, client.SyncReq{TimeoutMillis: "0"}) + _, charlieSince := charlie.MustSync(t, client.SyncReq{TimeoutMillis: "0"}) - // Create some events + // Keep track of the order + eventIDs := make([]string, 0) // Map from event_id to event info - eventIDs := make([]string, len(messageDrafts)) eventMap := make(map[string]EventInfo) - for messageDraftIndex, messageDraft := range messageDrafts { - federation_event := federation.Event{ - Sender: messageDraft.Sender, - Type: "m.room.message", - Content: map[string]interface{}{ - "msgtype": "m.text", - "body": messageDraft.Message, - }, - } - if messageDraftIndex > 2 { - federation_event.PrevEvents = []string{ - eventIDs[messageDraftIndex-1], - // Always connect it to some known part of the DAG (for the local server's sake - // later) - eventIDs[messageDraftIndex-2], - } - } + // List of join events from charlie + charlieJoinEventIDs := make([]string, 0) - event := srv.MustCreateEvent(t, remoteRoom, federation_event) - eventIDs[messageDraftIndex] = event.EventID() - eventMap[event.EventID()] = EventInfo{ - MessageDraft: messageDraft, - PDU: event, - } - remoteRoom.AddEvent(event) - } + // Everyone joins the room + roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"}) + bob.MustJoinRoom(t, roomID, []spec.ServerName{ + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + }) + awaitPartialStateJoinCompletion(t, roomID, bob) + charlie.MustJoinRoom(t, roomID, []spec.ServerName{ + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + }) + charlieJoinEventID := getStateID(t, charlie, roomID, "m.room.member", charlie.UserID) + charlieJoinEventIDs = append(charlieJoinEventIDs, charlieJoinEventID) + t.Logf("Charlie initially joins the room: %s", charlieJoinEventID) + awaitPartialStateJoinCompletion(t, roomID, charlie) - // Sanity check we sent all of the events in the room - if len(eventMap) != len(messageDrafts) { - t.Fatalf( - "expected the number of events (%d) to match the number of message drafts we expected to send (%d)", - len(messageDrafts), - len(eventMap), - ) + messageDrafts := []MessageDraft{ + MessageDraft{alice, "I was just reading that commercial moon trips might start next year."}, + MessageDraft{bob, "Seriously? I'd sign up in a heartbeat. Imagine looking back at Earth."}, + MessageDraft{charlie, "Yeah, me too. It's the ultimate adventure. I've actually been looking into it..."}, + MessageDraft{alice, "Wait, Charlie, you're not actually considering it, are you? It must be incredibly dangerous."}, + MessageDraft{charlie, "Considering it? My launch is in ten minutes. Gotta go suit up."}, + MessageDraft{bob, "Wait, what? You're joking. Right, Charlie?"}, + } + newEventIDs := sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) + // Make sure all of the messages have federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + + // Charlie leaves the room + charlie.MustLeaveRoom(t, roomID) + t.Logf("Charlie leaving for the moon: %s", getStateID(t, charlie, roomID, "m.room.member", charlie.UserID)) + // Make sure the leave has federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncLeftFrom(charlie.UserID, roomID)) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, client.SyncLeftFrom(charlie.UserID, roomID)) + charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince}, client.SyncLeftFrom(charlie.UserID, roomID)) + + // Send some more messages which charlie won't get + messageDrafts = []MessageDraft{ + MessageDraft{alice, "Charlie...?"}, + MessageDraft{bob, "I think he was serious. His profile pic is now him in a spacesuit."}, + MessageDraft{alice, "Well. I guess he really left for the moon. Talk about a conversation killer."}, + } + newEventIDs = sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) + // Make sure all of the messages have federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + // Charlie isn't in the room right now so won't see anything yet + // charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + + // Charlie joins back after going to the moon (has a gap in history) + charlie.MustJoinRoom(t, roomID, []spec.ServerName{ + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + }) + charlieJoinEventID = getStateID(t, charlie, roomID, "m.room.member", charlie.UserID) + charlieJoinEventIDs = append(charlieJoinEventIDs, charlieJoinEventID) + t.Logf("Charlie join after coming back from the moon: %s", charlieJoinEventID) + awaitPartialStateJoinCompletion(t, roomID, charlie) + // Make sure the join has federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncJoinedTo(charlie.UserID, roomID)) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, client.SyncJoinedTo(charlie.UserID, roomID)) + charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince}, client.SyncJoinedTo(charlie.UserID, roomID)) + + messageDrafts = []MessageDraft{ + MessageDraft{bob, "Hey, has anyone heard from Charlie? It's been months."}, + MessageDraft{alice, "Not a peep. I still can't believe he actually did it."}, + MessageDraft{charlie, "Believe it."}, + MessageDraft{alice, "CHARLIE?! You're back! How was it?!"}, + MessageDraft{charlie, "Dusty. Quiet. The most beautiful thing I've ever seen. Earth is just... a blue marble."}, + MessageDraft{bob, "Welcome back, man! So, what's next? A well-deserved vacation on a beach?"}, + MessageDraft{charlie, "A beach? Nah. I've seen the next horizon."}, + MessageDraft{alice, "Oh no. I know that tone. What horizon?"}, + MessageDraft{charlie, "The red one. They need pilots for the new Mars colony. I leave in six weeks."}, + MessageDraft{bob, "You can't be serious. You just got back!"}, + MessageDraft{charlie, "Serious as a vacuum. Talk to you guys from the stars. Bob, Alice... try to keep Earth in one piece for me."}, + } + newEventIDs = sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) + // Make sure all of the messages have federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + + // Charlie leaves the room + charlie.MustLeaveRoom(t, roomID) + t.Logf("Charlie leaving to Mars: %s", getStateID(t, charlie, roomID, "m.room.member", charlie.UserID)) + // Make sure the leave has federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncLeftFrom(charlie.UserID, roomID)) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, client.SyncLeftFrom(charlie.UserID, roomID)) + charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince}, client.SyncLeftFrom(charlie.UserID, roomID)) + + // Send some more messages while charlie is gone + messageDrafts = []MessageDraft{ + MessageDraft{bob, "Okay, so with Charlie literally out of this world, who's watering his plants?"}, + MessageDraft{alice, "I have a key. I'm on it. Though I'm half-convinced his fern is planning a moon landing of its own."}, + MessageDraft{bob, "Hah! So, completely changing the subject, have you tried that new pizza place on 5th? The one with the weird hexagonal slices?"}, + MessageDraft{alice, "Hexagonza? Yeah! The 'Geometry Special' is actually amazing. Though eating it feels like a math test."}, + MessageDraft{bob, "Right? I kept trying to calculate the area. Totally worth the existential crisis though."}, + MessageDraft{alice, "We should go next week. My treat. We can finally have a conversation that doesn't involve orbital mechanics."}, + MessageDraft{bob, "Deal. But low-key, I'm still expecting Charlie to message us a photo of his pizza on Mars."}, + MessageDraft{alice, "With extra red dust."}, } + newEventIDs = sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) + // Make sure all of the messages have federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + // Charlie isn't in the room right now so won't see anything yet + // charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + + // Charlie joins back after going to mars (has a gap in history) + charlie.MustJoinRoom(t, roomID, []spec.ServerName{ + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + }) + charlieJoinEventID = getStateID(t, charlie, roomID, "m.room.member", charlie.UserID) + charlieJoinEventIDs = append(charlieJoinEventIDs, charlieJoinEventID) + t.Logf("Charlie join after coming back from Mars: %s", charlieJoinEventID) + awaitPartialStateJoinCompletion(t, roomID, charlie) + // Make sure the join has federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncJoinedTo(charlie.UserID, roomID)) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, client.SyncJoinedTo(charlie.UserID, roomID)) + charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince}, client.SyncJoinedTo(charlie.UserID, roomID)) // Make it easy to cross-reference the events being talked about in the logs for eventIndex, eventID := range eventIDs { - messageDraft := eventMap[eventID].MessageDraft - event := eventMap[eventID].PDU - t.Logf("Message %d: %-6s -> event_id=%s", eventIndex, messageDraft.Message, event.EventID()) + // messageDraft := eventMap[eventID].MessageDraft + t.Logf("Message %d -> event_id=%s", eventIndex, eventID) } - // The other server is bound to ask about the missing events we reference in the - // prev_event_ids of others but we don't divulge that to them because we want the gaps - // to remain. - // - // We need to respond successfully (200 OK) so we remain on the good list of - // federation destinations. - srv.Mux().HandleFunc( - "/_matrix/federation/v1/get_missing_events/{roomID}", - srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { - t.Logf("Received /get_missing_events for %s", pathParams["roomID"]) - if pathParams["roomID"] != remoteRoom.RoomID { - t.Errorf("Received /get_missing_events for the wrong room: %s", remoteRoom.RoomID) - return util.JSONResponse{ - Code: 400, - JSON: "wrong room", - } - } - - return util.JSONResponse{ - Code: 200, - JSON: map[string]interface{}{ - "events": []string{}, - }, - } + messagesRes := charlie.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, + client.WithContentType("application/json"), + client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + "backfill": []string{"false"}, }), - ).Methods("POST") - - // TODO - srv.Mux().HandleFunc( - "/_matrix/federation/v1/backfill/{roomID}", - srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { - t.Logf("Received /backfill for %s", pathParams["roomID"]) - if pathParams["roomID"] != remoteRoom.RoomID { - t.Errorf("Received /backfill for the wrong room: %s", remoteRoom.RoomID) - return util.JSONResponse{ - Code: 400, - JSON: "wrong room", - } - } + ) + messagesResBody := client.ParseJSON(t, messagesRes) + t.Logf("Before backfill (expecting gaps) %s", messagesResBody) - pdusToShare := []json.RawMessage{} - for _, eventInfo := range eventMap { - if eventInfo.MessageDraft.ShareInitially { - pdusToShare = append(pdusToShare, eventInfo.PDU.JSON()) - } - } + // We should see some gaps + gapsRes := gjson.GetBytes(messagesResBody, "gaps") + if !gapsRes.Exists() { + t.Fatalf("missing key '%s' in JSON response", "gaps") + } + if !gapsRes.IsArray() { + t.Fatalf("key '%s' is not an array (was %s)", "gaps", gapsRes.Type) + } + gaps := gapsRes.Array() + if len(gaps) != 3 { + t.Fatalf("expected 3 gaps (got %d) for each time after charlie joins back to the room - gaps: %s", + len(gaps), gaps, + ) + } + // Assert gaps are where we expect + for gapIndex, gap := range gaps { + if gaps[gapIndex].Get("event_id").Str != charlieJoinEventIDs[len(charlieJoinEventIDs)-1-gapIndex] { + t.Fatalf("expected gap %d event_id to be %s (got %s) - charlieJoinEventIDs: %s", + gapIndex, + charlieJoinEventIDs[len(charlieJoinEventIDs)-1-gapIndex], + gap.Get("event_id").Str, + charlieJoinEventIDs, + ) + } + } - return util.JSONResponse{ - Code: 200, - JSON: map[string]interface{}{ - "origin": srv.ServerName(), - "origin_server_ts": time.Now().Unix(), - "pdus": pdusToShare, - }, - } + // Fetch with `?backfill=true` to close the gaps + for _, gap := range gaps { + // TODO: Do a better job of retrying until we see the new event. Not every server + // implementation will necessarily backfill right away in the foreground of a + // `/messages` request. + charlie.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, + client.WithContentType("application/json"), + client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + "backfill": []string{"true"}, + "from": []string{gap.Get("prev_pagination_token").Str}, + }), + ) + } + + // Make another `/messages` request to ensure that we've backfilled the events now and + // we don't see any gaps + messagesRes = charlie.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, + client.WithContentType("application/json"), + client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + "backfill": []string{"false"}, }), - ).Methods("GET") - - // Servers might ask about missing events via `/event` - srv.Mux().HandleFunc( - "/_matrix/federation/v1/event/{eventID}", - srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { - eventInfo, ok := eventMap[pathParams["eventID"]] - if !ok { - t.Errorf("Received /event for an unknown event (at-least not one of the messages): %s", pathParams["eventID"]) - return util.JSONResponse{ - Code: 400, - JSON: "unknown event", - } - } + ) + messagesResBody = client.ParseJSON(t, messagesRes) + t.Logf("After backfill (expecting *no* gaps) %s", messagesResBody) + + // We shouldn't see any gaps anymore + gapsRes = gjson.GetBytes(messagesResBody, "gaps") + // The gaps array could be empty (or omitted entirely) + if gapsRes.Exists() { + gaps = gapsRes.Array() + if len(gaps) != 0 { + t.Logf("Gaps after backfill (unexpected): %s", gaps) + // t.Fatalf("expected no gaps (got %d) after we backfilled each one - gaps: %s", + // len(gaps), gaps, + // ) + } + } else { + // Omitted entirely is fine (no gaps) + } - if !eventInfo.MessageDraft.ShareInitially { - t.Errorf("🙅 Received /event for an event we're not sharing: %s (%s)", pathParams["eventID"], eventInfo.MessageDraft.Message) - return util.JSONResponse{ - Code: 400, - JSON: "not sharing", - } - } + // Assert timeline order + assertMessagesInTimelineInOrder(t, messagesResBody, eventIDs) +} - t.Logf("Received /event for %s (%s)", pathParams["eventID"], eventInfo.MessageDraft.Message) - - return util.JSONResponse{ - Code: 200, - JSON: map[string]interface{}{ - "origin": srv.ServerName(), - "origin_server_ts": time.Now().Unix(), - "pdus": []json.RawMessage{ - eventInfo.PDU.JSON(), - }, - }, - } - }), - ).Methods("GET") - - // Because state never changes in the room, we can just always respond the same and - // assume they're not asking about state before all the state was sent. - // - // Backfill will cause us to asked about `/state_ids` - roomStateForMessages := remoteRoom.AllCurrentState() - srv.Mux().HandleFunc( - "/_matrix/federation/v1/state_ids/{roomID}", - srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { - t.Logf("Received /state_ids for %s", pathParams["roomID"]) - if pathParams["roomID"] != remoteRoom.RoomID { - t.Errorf("Received /state_ids for the wrong room: %s", remoteRoom.RoomID) - return util.JSONResponse{ - Code: 400, - JSON: "wrong room", - } - } +func sendMessageDrafts( + t *testing.T, + roomID string, + messageDrafts []MessageDraft, +) []string { + t.Helper() - return util.JSONResponse{ - Code: 200, - JSON: struct { - AuthChainIDs []string `json:"auth_chain_ids"` - PDUIDs []string `json:"pdu_ids"` - }{ - AuthChainIDs: eventIDsFromEvents(remoteRoom.AuthChainForEvents(roomStateForMessages)), - PDUIDs: eventIDsFromEvents(roomStateForMessages), - }, - } - }), - ).Methods("GET") - // After asking for `/state_ids`, the homeserver might actually ask about the actual - // `state` for those event IDs - srv.Mux().HandleFunc( - "/_matrix/federation/v1/state/{roomID}", - srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse { - t.Logf("Received /state for %s", pathParams["roomID"]) - if pathParams["roomID"] != remoteRoom.RoomID { - t.Errorf("Received /state for the wrong room: %s", remoteRoom.RoomID) - return util.JSONResponse{ - Code: 400, - JSON: "wrong room", - } - } + eventIDs := make([]string, len(messageDrafts)) + for messageDraftIndex, messageDraft := range messageDrafts { + eventID := messageDraft.Sender.SendEventSynced(t, roomID, b.Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": messageDraft.Message, + }, + }) + eventIDs[messageDraftIndex] = eventID + } - return util.JSONResponse{ - Code: 200, - JSON: struct { - AuthChain gomatrixserverlib.EventJSONs `json:"auth_chain"` - PDUs gomatrixserverlib.EventJSONs `json:"pdus"` - }{ - AuthChain: gomatrixserverlib.NewEventJSONsFromEvents(remoteRoom.AuthChainForEvents(roomStateForMessages)), - PDUs: gomatrixserverlib.NewEventJSONsFromEvents(roomStateForMessages), - }, - } - }), - ).Methods("GET") + return eventIDs +} - // The local homeserver joins the room - alice.MustJoinRoom(t, remoteRoom.RoomID, []spec.ServerName{srv.ServerName()}) +// sendAndTrackMessages sends the given message drafts to the room, keeping track of the +// new events in the list of `eventIDs` and `eventMap`. Returns the list of new event +// IDs that were sent. +func sendAndTrackMessages( + t *testing.T, + roomID string, + messageDrafts []MessageDraft, + eventIDs *[]string, + eventMap *map[string]EventInfo, +) []string { + t.Helper() - // Backfill the local server with *some* of the messages (leave some gaps) - // for _, eventID := range slices.Backward(eventIDs) { - // // message := eventMap[eventID].Message - // event := eventMap[eventID].PDU - // // if message.ShareInitially { - // srv.MustSendTransaction(t, deployment, deployment.GetFullyQualifiedHomeserverName(t, "hs1"), []json.RawMessage{event.JSON()}, nil) - // // } - // } + newEventIDs := sendMessageDrafts(t, roomID, messageDrafts) - messagesRes := alice.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", remoteRoom.RoomID, "messages"}, - client.WithContentType("application/json"), - client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - }), - ) - messagesResBody := client.ParseJSON(t, messagesRes) - t.Logf("asdf %s", messagesResBody) + *eventIDs = append(*eventIDs, newEventIDs...) + for i, eventID := range newEventIDs { + (*eventMap)[eventID] = EventInfo{ + MessageDraft: messageDrafts[i], + EventID: eventID, + } + } - fetchUntilMessagesResponseHas(t, alice, remoteRoom.RoomID, func(ev gjson.Result) bool { - t.Logf("asdf %s %s", ev.Get("event_id").Str, ev.Get("content").Raw) - return ev.Get("event_id").Str == eventIDs[0] - }) + return newEventIDs +} + +func syncTimelineHasEventIDs(roomID string, eventIDs []string) []client.SyncCheckOpt { + syncChecks := make([]client.SyncCheckOpt, 0, len(eventIDs)) + for _, eventID := range eventIDs { + syncChecks = append(syncChecks, client.SyncTimelineHasEventID(roomID, eventID)) + } + return syncChecks } -func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, check func(gjson.Result) bool) { +// assertMessagesTimeline asserts all events are in the response in the given order. +// Other unrelated events can be in between. +// +// messagesResBody: from a `/messages?dir=b` request (these will be in reverse-chronological order) +// eventIDs: the list of event IDs in chronological order that we expect to see in the response +func assertMessagesInTimelineInOrder(t *testing.T, messagesResBody json.RawMessage, expectedEventIDs []string) { t.Helper() - start := time.Now() - checkCounter := 0 - for { - if time.Since(start) > c.SyncUntilTimeout { - t.Fatalf("fetchUntilMessagesResponseHas timed out. Called check function %d times", checkCounter) - } - messagesRes := c.MustDo(t, "GET", []string{"_matrix", "client", "v3", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) - messsageResBody := client.ParseJSON(t, messagesRes) - wantKey := "chunk" - keyRes := gjson.GetBytes(messsageResBody, wantKey) - if !keyRes.Exists() { - t.Fatalf("missing key '%s'", wantKey) + wantKey := "chunk" + keyRes := gjson.GetBytes(messagesResBody, wantKey) + if !keyRes.Exists() { + t.Fatalf("missing key '%s'", wantKey) + } + if !keyRes.IsArray() { + t.Fatalf("key '%s' is not an array (was %s)", wantKey, keyRes.Type) + } + + actualEvents := keyRes.Array() + // relevantActualEvents := make([]gjson.Result, 0, len(expectedEventIDs)) + relevantActualEventIDs := make([]string, 0, len(expectedEventIDs)) + for _, event := range actualEvents { + if slices.Contains(expectedEventIDs, event.Get("event_id").Str) { + // relevantActualEvents = append(relevantActualEvents, event) + relevantActualEventIDs = append(relevantActualEventIDs, event.Get("event_id").Str) + } + } + // Put them in chronological order to match the expected list + // slices.Reverse(relevantActualEvents) + slices.Reverse(relevantActualEventIDs) + + expectedLines := make([]string, len(expectedEventIDs)) + for i, expectedEventID := range expectedEventIDs { + isExpectedInActual := slices.Contains(relevantActualEventIDs, expectedEventID) + isMissingIndicatorString := " " + if !isExpectedInActual { + isMissingIndicatorString = "?" } - if !keyRes.IsArray() { - t.Fatalf("key '%s' is not an array (was %s)", wantKey, keyRes.Type) + + expectedLines[i] = fmt.Sprintf("%2d: %s %s", i, isMissingIndicatorString, expectedEventID) + } + expectedDiffString := strings.Join(expectedLines, "\n") + + actualLines := make([]string, len(relevantActualEventIDs)) + for actualEventIndex, actualEventID := range relevantActualEventIDs { + isActualInExpected := slices.Contains(expectedEventIDs, actualEventID) + isActualInExpectedIndicatorString := " " + if isActualInExpected { + isActualInExpectedIndicatorString = "+" } - events := keyRes.Array() - for _, ev := range events { - if check(ev) { - return + expectedIndex := slices.Index(expectedEventIDs, actualEventID) + expectedIndexString := "" + if actualEventIndex != expectedIndex { + expectedDirectionString := "⬆️" + if expectedIndex > actualEventIndex { + expectedDirectionString = "⬇️" } + + expectedIndexString = fmt.Sprintf(" (expected index %d %s)", expectedIndex, expectedDirectionString) + } + + actualLines[actualEventIndex] = fmt.Sprintf("%2d: %s %s%s", actualEventIndex, isActualInExpectedIndicatorString, actualEventID, expectedIndexString) + } + actualDiffString := strings.Join(actualLines, "\n") + + if len(relevantActualEventIDs) != len(expectedEventIDs) { + t.Fatalf("expected %d events in timeline (got %d)\nActual events ('+' = found expected items):\n%s\nExpected events ('?' = missing expected items):\n%s", + len(expectedEventIDs), len(relevantActualEventIDs), actualDiffString, expectedDiffString, + ) + } + + for i, eventID := range relevantActualEventIDs { + if eventID != expectedEventIDs[i] { + t.Fatalf("expected event ID %s (got %s) at index %d\nActual events ('+' = found expected items):\n%s\nExpected events ('?' = missing expected items):\n%s", + expectedEventIDs[i], eventID, i, actualDiffString, expectedDiffString, + ) } + } +} + +func getStateID(t *testing.T, c *client.CSAPI, roomID string, stateType string, stateKey string) string { + t.Helper() + + stateRes := c.MustDo(t, "GET", []string{"_matrix", "client", "v3", "rooms", roomID, "state"}) + stateResBody := client.ParseJSON(t, stateRes) + eventJSON := gjson.ParseBytes(stateResBody) + if !eventJSON.IsArray() { + t.Fatalf("expected array of state events but found %s", eventJSON.Type) + } - checkCounter++ - // Add a slight delay so we don't hammer the messages endpoint - time.Sleep(500 * time.Millisecond) + events := eventJSON.Array() + + for _, event := range events { + if event.Get("type").Str == stateType && event.Get("state_key").Str == stateKey { + return event.Get("event_id").Str + } } + + t.Fatalf("Unable to find state event for (%s, %s). Room state: %s", stateType, stateKey, events) + return "" +} + +// awaitPartialStateJoinCompletion waits until the joined room is no longer partial-stated +func awaitPartialStateJoinCompletion( + t *testing.T, room_id string, user *client.CSAPI, +) { + t.Helper() + + // Use a `/members` request to wait for the room to be un-partial stated. + // We avoid using `/sync`, as it only waits (or used to wait) for full state at + // particular events, rather than the whole room. + user.MustDo( + t, + "GET", + []string{"_matrix", "client", "v3", "rooms", room_id, "members"}, + ) + t.Logf("%s's partial state join to %s completed.", user.UserID, room_id) } From effb1cad832488979862d98e181ac52a0c0afede Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 1 Sep 2025 23:10:26 -0500 Subject: [PATCH 5/8] Sync after join --- tests/csapi/room_messages_test.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 0870572e..c161dea0 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -272,11 +272,20 @@ func TestRoomMessagesGaps(t *testing.T) { charlieJoinEventIDs := make([]string, 0) // Everyone joins the room + // + // Alice creates the room roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"}) + // Bob joins the room bob.MustJoinRoom(t, roomID, []spec.ServerName{ deployment.GetFullyQualifiedHomeserverName(t, "hs1"), }) awaitPartialStateJoinCompletion(t, roomID, bob) + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncJoinedTo(bob.UserID, roomID)) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, client.SyncJoinedTo(bob.UserID, roomID)) + // Charlie not joined yet + // charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince}, client.SyncJoinedTo(bob.UserID, roomID)) + + // Charlie joins the room charlie.MustJoinRoom(t, roomID, []spec.ServerName{ deployment.GetFullyQualifiedHomeserverName(t, "hs1"), }) @@ -284,6 +293,9 @@ func TestRoomMessagesGaps(t *testing.T) { charlieJoinEventIDs = append(charlieJoinEventIDs, charlieJoinEventID) t.Logf("Charlie initially joins the room: %s", charlieJoinEventID) awaitPartialStateJoinCompletion(t, roomID, charlie) + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncJoinedTo(charlie.UserID, roomID)) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, client.SyncJoinedTo(charlie.UserID, roomID)) + charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince}, client.SyncJoinedTo(charlie.UserID, roomID)) messageDrafts := []MessageDraft{ MessageDraft{alice, "I was just reading that commercial moon trips might start next year."}, @@ -445,7 +457,11 @@ func TestRoomMessagesGaps(t *testing.T) { "dir": []string{"b"}, "limit": []string{"100"}, "backfill": []string{"true"}, - "from": []string{gap.Get("prev_pagination_token").Str}, + // TODO: This works to get around current issues in Synapse around finding gaps to backfill + // but is kinda the wrong thing to use. + // "from": []string{gap.Get("next_pagination_token").Str}, + // This gives a perfect continuation point to fill in + "from": []string{gap.Get("prev_pagination_token").Str}, }), ) } From c25773ad67c1901174fc0aa8d47d39b6568b3c6e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 1 Sep 2025 23:18:37 -0500 Subject: [PATCH 6/8] Passing test --- tests/csapi/room_messages_test.go | 53 ++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index c161dea0..f2b35647 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -325,12 +325,17 @@ func TestRoomMessagesGaps(t *testing.T) { MessageDraft{bob, "I think he was serious. His profile pic is now him in a spacesuit."}, MessageDraft{alice, "Well. I guess he really left for the moon. Talk about a conversation killer."}, } - newEventIDs = sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) - // Make sure all of the messages have federated - aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) - bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) - // Charlie isn't in the room right now so won't see anything yet - // charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + // We have to send these one by one because we want to ensure that events are + // sequential in history and we need to make sure each homeserver knows about all of + // the events before we send the next one. + for _, messageDraft := range messageDrafts { + newEventIDs = sendAndTrackMessages(t, roomID, []MessageDraft{messageDraft}, &eventIDs, &eventMap) + // Make sure all of the messages have federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + // Charlie isn't in the room right now so won't see anything yet + // charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + } // Charlie joins back after going to the moon (has a gap in history) charlie.MustJoinRoom(t, roomID, []spec.ServerName{ @@ -358,11 +363,16 @@ func TestRoomMessagesGaps(t *testing.T) { MessageDraft{bob, "You can't be serious. You just got back!"}, MessageDraft{charlie, "Serious as a vacuum. Talk to you guys from the stars. Bob, Alice... try to keep Earth in one piece for me."}, } - newEventIDs = sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) - // Make sure all of the messages have federated - aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) - bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) - charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + // We have to send these one by one because we want to ensure that events are + // sequential in history and we need to make sure each homeserver knows about all of + // the events before we send the next one. + for _, messageDraft := range messageDrafts { + newEventIDs = sendAndTrackMessages(t, roomID, []MessageDraft{messageDraft}, &eventIDs, &eventMap) + // Make sure all of the messages have federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + } // Charlie leaves the room charlie.MustLeaveRoom(t, roomID) @@ -383,12 +393,17 @@ func TestRoomMessagesGaps(t *testing.T) { MessageDraft{bob, "Deal. But low-key, I'm still expecting Charlie to message us a photo of his pizza on Mars."}, MessageDraft{alice, "With extra red dust."}, } - newEventIDs = sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) - // Make sure all of the messages have federated - aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) - bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) - // Charlie isn't in the room right now so won't see anything yet - // charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + // We have to send these one by one because we want to ensure that events are + // sequential in history and we need to make sure each homeserver knows about all of + // the events before we send the next one. + for _, messageDraft := range messageDrafts { + newEventIDs = sendAndTrackMessages(t, roomID, []MessageDraft{messageDraft}, &eventIDs, &eventMap) + // Make sure all of the messages have federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + bobSince = bob.MustSyncUntil(t, client.SyncReq{Since: bobSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + // Charlie isn't in the room right now so won't see anything yet + // charlieSince = charlie.MustSyncUntil(t, client.SyncReq{Since: charlieSince, Filter: string(includeMoreTimelineFilter)}, syncTimelineHasEventIDs(roomID, newEventIDs)...) + } // Charlie joins back after going to mars (has a gap in history) charlie.MustJoinRoom(t, roomID, []spec.ServerName{ @@ -459,9 +474,9 @@ func TestRoomMessagesGaps(t *testing.T) { "backfill": []string{"true"}, // TODO: This works to get around current issues in Synapse around finding gaps to backfill // but is kinda the wrong thing to use. - // "from": []string{gap.Get("next_pagination_token").Str}, + "from": []string{gap.Get("next_pagination_token").Str}, // This gives a perfect continuation point to fill in - "from": []string{gap.Get("prev_pagination_token").Str}, + // "from": []string{gap.Get("prev_pagination_token").Str}, }), ) } From e4865b5c2bd635348168ae15e90e26f421e4091d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 2 Sep 2025 01:07:49 -0500 Subject: [PATCH 7/8] No need for `next_pagination_token` hack Fixed Synapse to look for backfill points more loosely because of approximate `depth` comparison --- tests/csapi/room_messages_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index f2b35647..c0b21e21 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -472,11 +472,8 @@ func TestRoomMessagesGaps(t *testing.T) { "dir": []string{"b"}, "limit": []string{"100"}, "backfill": []string{"true"}, - // TODO: This works to get around current issues in Synapse around finding gaps to backfill - // but is kinda the wrong thing to use. - "from": []string{gap.Get("next_pagination_token").Str}, // This gives a perfect continuation point to fill in - // "from": []string{gap.Get("prev_pagination_token").Str}, + "from": []string{gap.Get("prev_pagination_token").Str}, }), ) } From 61c028bdebdf4b8092662614f3796035577f2150 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 2 Sep 2025 01:09:59 -0500 Subject: [PATCH 8/8] Add note where to sleep when manually testing with real client --- tests/csapi/room_messages_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index c0b21e21..245f5d56 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -461,6 +461,9 @@ func TestRoomMessagesGaps(t *testing.T) { } } + // XXX: Sleep here if you're manually testing with a real-client + // time.Sleep(2 * time.Hour) + // Fetch with `?backfill=true` to close the gaps for _, gap := range gaps { // TODO: Do a better job of retrying until we see the new event. Not every server