99 "testing"
1010 "time"
1111
12+ "github.com/gorilla/mux"
1213 "github.com/matrix-org/complement"
1314 "github.com/matrix-org/gomatrixserverlib"
1415 "github.com/matrix-org/gomatrixserverlib/fclient"
@@ -18,10 +19,12 @@ import (
1819
1920 "github.com/matrix-org/complement/b"
2021 "github.com/matrix-org/complement/client"
22+ "github.com/matrix-org/complement/ct"
2123 "github.com/matrix-org/complement/federation"
2224 "github.com/matrix-org/complement/helpers"
2325 "github.com/matrix-org/complement/match"
2426 "github.com/matrix-org/complement/must"
27+ "github.com/matrix-org/complement/runtime"
2528)
2629
2730// TODO:
@@ -598,3 +601,316 @@ func TestOutboundFederationEventSizeGetMissingEvents(t *testing.T) {
598601 // Alice should receive the sent event, even though the "bad" event has a too large state key
599602 alice .MustSyncUntil (t , client.SyncReq {}, client .SyncTimelineHasEventID (room .RoomID , sentEvent .EventID ()))
600603}
604+
605+ // Test that if you respond to /state_ids, and fail some /event requests, we end up
606+ // with correctly persisted auth information for the event. This creates an _auth graph_ like so:
607+ //
608+ // A <- B <- C <- D <- E m.room.member,bob
609+ //
610+ // Complement needs the HS to hit /state_ids and /event for missing events so it does some work to manipulate this:
611+ // - it sends 100 unrelated state events. This ensures that any statistical analysis done on the number of missing events
612+ // in /state_ids means we will bias to using /event and not /state. The test needs /event.
613+ // - it sends an unrelated event to the HS with unknown prev_events.
614+ // - it returns an unrelated event for /get_missing_events.
615+ // - then /state_ids should be hit.
616+ //
617+ // When /state_ids is hit, we will include A,B,C,D,E in the response. This will be the first time the HS sees these events.
618+ // Because we've gamed the number of state events in the room, HSes _should_ hit /event for each event ID.
619+ // Now the actual test can begin:
620+ // - We fail the /event request for B.
621+ // - We ensure that we do not see C,D,E in the final room state.
622+ //
623+ // This is a regression test where a HS could have code which does the following:
624+ // - Sort events topologically (A,B,C,D,E)
625+ // - for each event, check you have the auth events and then auth it.
626+ // - If you don't have the auth events, drop it, else persist it (incl. whether it was rejected).
627+ //
628+ // This has a subtle bug IF "check you have the auth events" uses an in-memory event map AND dropping the event doesn't remove
629+ // the entry from that event map. If this happens: A is processed, B is missing, C is dropped due to missing B,
630+ // crucially D and E ARE PERSISTED because C exists in-memory.
631+ // This breaks the auth chain for the room, which matters when doing state resolution.
632+ func TestCorruptedAuthChain (t * testing.T ) {
633+ // Dendrite doesn't make exactly the same requests as it seems to fallback to /event_auth.
634+ // As this is intended for a synapse bugfix, we'll skip dendrite for now.
635+ runtime .SkipIf (t , runtime .Dendrite )
636+ deployment := complement .Deploy (t , 1 )
637+ defer deployment .Destroy (t )
638+
639+ srv := federation .NewServer (t , deployment ,
640+ federation .HandleKeyRequests (),
641+ federation .HandleMakeSendJoinRequests (),
642+ federation .HandleTransactionRequests (nil , nil ),
643+ federation .HandleInviteRequests (nil ),
644+ )
645+ // We expect to be pushed events that we don't care about responding to (not relevant to the test)
646+ srv .UnexpectedRequestsAreErrors = false
647+ cancel := srv .Listen ()
648+ defer cancel ()
649+
650+ alice := deployment .Register (t , "hs1" , helpers.RegistrationOpts {LocalpartSuffix : "alice" })
651+ // ensure the server under test remains in the room when alice rejoins
652+ sentinel := deployment .Register (t , "hs1" , helpers.RegistrationOpts {LocalpartSuffix : "sentinel" })
653+ roomID := alice .MustCreateRoom (t , map [string ]interface {}{
654+ "preset" : "public_chat" ,
655+ "room_version" : "10" ,
656+ })
657+ sentinel .MustJoinRoom (t , roomID , []spec.ServerName {"hs1" })
658+ // Pad out the room state
659+ for i := 0 ; i < 100 ; i ++ {
660+ if i % 2 == 0 {
661+ alice .MustLeaveRoom (t , roomID )
662+ } else {
663+ alice .MustJoinRoom (t , roomID , []spec.ServerName {"hs1" })
664+ }
665+ }
666+ bob := srv .UserID ("bob" )
667+ defaultImpl := federation.ServerRoomImplDefault {}
668+ var existingAuthChain []gomatrixserverlib.PDU
669+ srvRoom := srv .MustJoinRoom (t , deployment , spec .ServerName ("hs1" ), roomID , bob , federation .WithRoomOpts (federation .WithImpl (& federation.ServerRoomImplCustom {
670+ ServerRoomImplDefault : defaultImpl ,
671+ PopulateFromSendJoinResponseFn : func (def federation.ServerRoomImpl , room * federation.ServerRoom , joinEvent gomatrixserverlib.PDU , resp fclient.RespSendJoin ) {
672+ defaultImpl .PopulateFromSendJoinResponse (room , joinEvent , resp )
673+ existingAuthChain = resp .AuthEvents .TrustedEvents (joinEvent .Version (), false )
674+ },
675+ })))
676+ // we should have at least 100 events in the auth chain
677+ if len (existingAuthChain ) < 100 {
678+ ct .Fatalf (t , "not enough events in the auth chain, got %d want >100" , len (existingAuthChain ))
679+ }
680+ createEvent := srvRoom .CurrentState (spec .MRoomCreate , "" )
681+ plEvent := srvRoom .CurrentState (spec .MRoomPowerLevels , "" )
682+ jrEvent := srvRoom .CurrentState (spec .MRoomJoinRules , "" )
683+ bobOriginalJoinEvent := srvRoom .CurrentState (spec .MRoomMember , bob )
684+
685+ // Create A,B,C,D,E which will be profile changes for Bob (where each event is dependent on the next)
686+ eventA := srv .MustCreateEvent (t , srvRoom , federation.Event {
687+ Type : spec .MRoomMember ,
688+ Sender : bob ,
689+ StateKey : & bob ,
690+ Content : map [string ]interface {}{
691+ "membership" : "join" ,
692+ "displayname" : "A" ,
693+ },
694+ })
695+ eventB := srv .MustCreateEvent (t , srvRoom , federation.Event {
696+ Type : spec .MRoomMember ,
697+ Sender : bob ,
698+ StateKey : & bob ,
699+ Content : map [string ]interface {}{
700+ "membership" : "join" ,
701+ "displayname" : "B" ,
702+ },
703+ PrevEvents : []string {eventA .EventID ()},
704+ AuthEvents : []string {createEvent .EventID (), plEvent .EventID (), jrEvent .EventID (), eventA .EventID ()},
705+ })
706+ eventC := srv .MustCreateEvent (t , srvRoom , federation.Event {
707+ Type : spec .MRoomMember ,
708+ Sender : bob ,
709+ StateKey : & bob ,
710+ Content : map [string ]interface {}{
711+ "membership" : "join" ,
712+ "displayname" : "C" ,
713+ },
714+ PrevEvents : []string {eventB .EventID ()},
715+ AuthEvents : []string {createEvent .EventID (), plEvent .EventID (), jrEvent .EventID (), eventB .EventID ()},
716+ })
717+ eventD := srv .MustCreateEvent (t , srvRoom , federation.Event {
718+ Type : spec .MRoomMember ,
719+ Sender : bob ,
720+ StateKey : & bob ,
721+ Content : map [string ]interface {}{
722+ "membership" : "join" ,
723+ "displayname" : "D" ,
724+ },
725+ PrevEvents : []string {eventC .EventID ()},
726+ AuthEvents : []string {createEvent .EventID (), plEvent .EventID (), jrEvent .EventID (), eventC .EventID ()},
727+ })
728+ eventE := srv .MustCreateEvent (t , srvRoom , federation.Event {
729+ Type : spec .MRoomMember ,
730+ Sender : bob ,
731+ StateKey : & bob ,
732+ Content : map [string ]interface {}{
733+ "membership" : "join" ,
734+ "displayname" : "E" ,
735+ },
736+ PrevEvents : []string {eventD .EventID ()},
737+ AuthEvents : []string {createEvent .EventID (), plEvent .EventID (), jrEvent .EventID (), eventD .EventID ()},
738+ })
739+ // We include this in auth_events for subsequent events below.
740+ srvRoom .AddEvent (eventE )
741+
742+ // Create 3 unrelated events (one for /send, one for /gme, one for /state_ids snapshot)
743+ stateIDsEvent := srv .MustCreateEvent (t , srvRoom , federation.Event {
744+ Type : "m.room.message" ,
745+ Sender : bob ,
746+ Content : map [string ]interface {}{
747+ "msgtype" : "m.text" ,
748+ "body" : "for /state_ids" ,
749+ },
750+ PrevEvents : []string {eventE .EventID ()},
751+ })
752+ srvRoom .AddEvent (stateIDsEvent )
753+ gmeEvent := srv .MustCreateEvent (t , srvRoom , federation.Event {
754+ Type : "m.room.message" ,
755+ Sender : bob ,
756+ Content : map [string ]interface {}{
757+ "msgtype" : "m.text" ,
758+ "body" : "for /get_missing_events" ,
759+ },
760+ PrevEvents : []string {stateIDsEvent .EventID ()},
761+ })
762+ srvRoom .AddEvent (gmeEvent )
763+ sendTxnEvent := srv .MustCreateEvent (t , srvRoom , federation.Event {
764+ Type : "m.room.message" ,
765+ Sender : bob ,
766+ Content : map [string ]interface {}{
767+ "msgtype" : "m.text" ,
768+ "body" : "for /send" ,
769+ },
770+ PrevEvents : []string {gmeEvent .EventID ()},
771+ })
772+ srvRoom .AddEvent (sendTxnEvent )
773+
774+ // the possible events to return in /event. This omits B.
775+ allEventsToShare := []gomatrixserverlib.PDU {
776+ stateIDsEvent , gmeEvent , sendTxnEvent , eventA , eventC , eventD , eventE ,
777+ }
778+ t .Logf ("event A: %s" , eventA .EventID ())
779+ t .Logf ("event B: %s" , eventB .EventID ())
780+ t .Logf ("event C: %s" , eventC .EventID ())
781+ t .Logf ("event D: %s" , eventD .EventID ())
782+ t .Logf ("event E: %s" , eventE .EventID ())
783+ t .Logf ("event for /state_ids: %s" , stateIDsEvent .EventID ())
784+ t .Logf ("event for /get_missing_events: %s" , gmeEvent .EventID ())
785+ t .Logf ("event for /send: %s" , sendTxnEvent .EventID ())
786+
787+ // add handlers for them
788+ gmeWaiter := helpers .NewWaiter ()
789+ // We will send 'sendTxnEvent' via /send. The homeserver will see the event has unknown prev_events and hit /get_missing_events
790+ srv .Mux ().HandleFunc ("/_matrix/federation/v1/get_missing_events/{roomID}" , func (w http.ResponseWriter , req * http.Request ) {
791+ defer gmeWaiter .Finish ()
792+ body := must .ParseJSON (t , req .Body )
793+ t .Logf ("/get_missing_events req for room %s => %s" , mux .Vars (req )["roomID" ], body .Raw )
794+ must .Equal (t , body .Get ("latest_events" ).Array ()[0 ].String (), sendTxnEvent .EventID (), "unexpected event provided to /get_missing_events" )
795+ w .WriteHeader (200 )
796+ res := struct {
797+ Events []gomatrixserverlib.PDU `json:"events"`
798+ }{
799+ Events : []gomatrixserverlib.PDU {gmeEvent },
800+ }
801+ t .Logf ("/get_missing_events req for room %s responding with %s in room %s" , mux .Vars (req )["roomID" ], res .Events [0 ].EventID (), res .Events [0 ].RoomID ())
802+ var responseBytes []byte
803+ responseBytes , err := json .Marshal (& res )
804+ must .NotError (t , "failed to marshal response" , err )
805+ w .Write (responseBytes )
806+ })
807+ stateIDWaiter := helpers .NewWaiter ()
808+ // The homeserver won't be able to link up the events returned via /get_missing_events to what it previously knew, so it will
809+ // ask for a state snapshot via /state_ids.
810+ srv .Mux ().HandleFunc ("/_matrix/federation/v1/state_ids/{roomID}" , func (w http.ResponseWriter , req * http.Request ) {
811+ defer stateIDWaiter .Finish ()
812+ t .Logf ("/state_ids req for room %s => %s" , mux .Vars (req )["roomID" ], req .URL .Query ().Encode ())
813+ reqEventID := req .URL .Query ().Get ("event_id" )
814+ must .Equal (t , reqEventID , stateIDsEvent .EventID (), "unexpected event provided to /state_ids" )
815+ w .WriteHeader (200 )
816+
817+ var authChainIDs []string
818+ for _ , ev := range existingAuthChain {
819+ authChainIDs = append (authChainIDs , ev .EventID ())
820+ }
821+ // include A,B,C,D
822+ authChainIDs = append (authChainIDs , eventA .EventID (), eventB .EventID (), eventC .EventID (), eventD .EventID ())
823+ // the current state is the same as before but with E as the member event for bob
824+ var pduIDs []string
825+ for _ , ev := range srvRoom .AllCurrentState () {
826+ if ev .Type () == spec .MRoomMember && ev .StateKeyEquals (bob ) {
827+ continue
828+ }
829+ pduIDs = append (pduIDs , ev .EventID ())
830+ }
831+ pduIDs = append (pduIDs , eventE .EventID ())
832+ res := struct {
833+ AuthChainIDs []string `json:"auth_chain_ids"`
834+ PDUIDs []string `json:"pdu_ids"`
835+ }{
836+ AuthChainIDs : authChainIDs ,
837+ PDUIDs : pduIDs ,
838+ }
839+ var responseBytes []byte
840+ responseBytes , err := json .Marshal (& res )
841+ must .NotError (t , "failed to marshal response" , err )
842+ w .Write (responseBytes )
843+ })
844+ eventBWaiter := helpers .NewWaiter ()
845+ // /state_ids will return some unknown events which the homeserver will try to fetch via /event
846+ srv .Mux ().Handle ("/_matrix/federation/v1/event/{eventID}" , http .HandlerFunc (func (w http.ResponseWriter , req * http.Request ) {
847+ vars := mux .Vars (req )
848+ eventID := vars ["eventID" ]
849+ var event gomatrixserverlib.PDU
850+ // find the event
851+ for _ , ev := range allEventsToShare {
852+ if ev .EventID () == eventID {
853+ event = ev
854+ break
855+ }
856+ }
857+ // we should see a request for event B
858+ if eventID == eventB .EventID () {
859+ eventBWaiter .Finish ()
860+ }
861+
862+ if event == nil {
863+ t .Logf ("/event returning 404 for event %v" , eventID )
864+ w .WriteHeader (404 )
865+ w .Write ([]byte (fmt .Sprintf (`complement: failed to find event: %s` , eventID )))
866+ return
867+ }
868+
869+ txn := gomatrixserverlib.Transaction {
870+ Origin : spec .ServerName (srv .ServerName ()),
871+ OriginServerTS : spec .AsTimestamp (time .Now ()),
872+ PDUs : []json.RawMessage {
873+ event .JSON (),
874+ },
875+ }
876+ resp , err := json .Marshal (txn )
877+ if err != nil {
878+ w .WriteHeader (500 )
879+ w .Write ([]byte (fmt .Sprintf (`complement: failed to marshal JSON response: %s` , err )))
880+ return
881+ }
882+ w .WriteHeader (200 )
883+ w .Write (resp )
884+ }))
885+
886+ srv .MustSendTransaction (t , deployment , "hs1" , []json.RawMessage {sendTxnEvent .JSON ()}, nil )
887+
888+ // wait for the server to make the requests
889+ gmeWaiter .Wait (t , 5 * time .Second )
890+ stateIDWaiter .Wait (t , 5 * time .Second )
891+ eventBWaiter .Wait (t , 5 * time .Second )
892+
893+ // At this point all we know is that the server requested event B when doing /state_ids.
894+ // We don't know if sendTxnEvent has been fully processed / the room state has been updated.
895+ // If the server is functioning correctly, sendTxnEvent will never be delivered to the client
896+ // as the server will be unable to fetch room state for it. So send another event as a sentinel.
897+ // Wait until we see sendTxnEvent in the sync timeline before asserting that the room state is correct.
898+ sentinelEvent := srv .MustCreateEvent (t , srvRoom , federation.Event {
899+ Type : "m.room.message" ,
900+ Sender : bob ,
901+ Content : map [string ]interface {}{
902+ "msgtype" : "m.text" ,
903+ "body" : "finished" ,
904+ },
905+ PrevEvents : []string {bobOriginalJoinEvent .EventID ()},
906+ AuthEvents : []string {createEvent .EventID (), plEvent .EventID (), bobOriginalJoinEvent .EventID ()},
907+ })
908+ srv .MustSendTransaction (t , deployment , "hs1" , []json.RawMessage {sentinelEvent .JSON ()}, nil )
909+ alice .MustSyncUntil (t , client.SyncReq {}, client .SyncTimelineHasEventID (roomID , sentinelEvent .EventID ()))
910+
911+ // we should not see event E as the current state for bob.
912+ content := alice .MustGetStateEventContent (t , roomID , spec .MRoomMember , bob )
913+ t .Logf ("bob's membership content: %v" , content .Raw )
914+ // assert bob's member event was his initial join, not any of the others. Technically you can argue A should be valid.
915+ must .Equal (t , content .Get ("displayname" ).Str , "" , "Events C/D/E were processed when they should not have been as the server doesn't know B." )
916+ }
0 commit comments