123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355 |
- # Copyright 2021 The Matrix.org Foundation C.I.C.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import collections
- import itertools
- import logging
- from http import HTTPStatus
- from typing import (
- TYPE_CHECKING,
- Collection,
- Container,
- Dict,
- Iterable,
- List,
- Optional,
- Sequence,
- Set,
- Tuple,
- )
- from prometheus_client import Counter, Histogram
- from synapse import event_auth
- from synapse.api.constants import (
- EventContentFields,
- EventTypes,
- GuestAccess,
- Membership,
- RejectedReason,
- RoomEncryptionAlgorithms,
- )
- from synapse.api.errors import (
- AuthError,
- Codes,
- EventSizeError,
- FederationError,
- FederationPullAttemptBackoffError,
- HttpResponseException,
- RequestSendFailed,
- SynapseError,
- )
- from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions
- from synapse.event_auth import (
- auth_types_for_event,
- check_state_dependent_auth_rules,
- check_state_independent_auth_rules,
- validate_event_for_room_version,
- )
- from synapse.events import EventBase
- from synapse.events.snapshot import EventContext
- from synapse.federation.federation_client import InvalidResponseError, PulledPduInfo
- from synapse.logging.context import nested_logging_context
- from synapse.logging.opentracing import (
- SynapseTags,
- set_tag,
- start_active_span,
- tag_args,
- trace,
- )
- from synapse.metrics.background_process_metrics import run_as_background_process
- from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
- from synapse.replication.http.federation import (
- ReplicationFederationSendEventsRestServlet,
- )
- from synapse.state import StateResolutionStore
- from synapse.storage.databases.main.events import PartialStateConflictError
- from synapse.storage.databases.main.events_worker import EventRedactBehaviour
- from synapse.types import (
- PersistedEventPosition,
- RoomStreamToken,
- StateMap,
- UserID,
- get_domain_from_id,
- )
- from synapse.types.state import StateFilter
- from synapse.util.async_helpers import Linearizer, concurrently_execute
- from synapse.util.iterutils import batch_iter
- from synapse.util.retryutils import NotRetryingDestination
- from synapse.util.stringutils import shortstr
- if TYPE_CHECKING:
- from synapse.server import HomeServer
- logger = logging.getLogger(__name__)
- soft_failed_event_counter = Counter(
- "synapse_federation_soft_failed_events_total",
- "Events received over federation that we marked as soft_failed",
- )
- # Added to debug performance and track progress on optimizations
- backfill_processing_after_timer = Histogram(
- "synapse_federation_backfill_processing_after_time_seconds",
- "sec",
- [],
- buckets=(
- 0.1,
- 0.25,
- 0.5,
- 1.0,
- 2.5,
- 5.0,
- 7.5,
- 10.0,
- 15.0,
- 20.0,
- 25.0,
- 30.0,
- 40.0,
- 50.0,
- 60.0,
- 80.0,
- 100.0,
- 120.0,
- 150.0,
- 180.0,
- "+Inf",
- ),
- )
- class FederationEventHandler:
- """Handles events that originated from federation.
- Responsible for handing incoming events and passing them on to the rest
- of the homeserver (including auth and state conflict resolutions)
- """
- def __init__(self, hs: "HomeServer"):
- self._store = hs.get_datastores().main
- self._storage_controllers = hs.get_storage_controllers()
- self._state_storage_controller = self._storage_controllers.state
- self._state_handler = hs.get_state_handler()
- self._event_creation_handler = hs.get_event_creation_handler()
- self._event_auth_handler = hs.get_event_auth_handler()
- self._message_handler = hs.get_message_handler()
- self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
- self._state_resolution_handler = hs.get_state_resolution_handler()
- # avoid a circular dependency by deferring execution here
- self._get_room_member_handler = hs.get_room_member_handler
- self._federation_client = hs.get_federation_client()
- self._third_party_event_rules = hs.get_third_party_event_rules()
- self._notifier = hs.get_notifier()
- self._is_mine_id = hs.is_mine_id
- self._server_name = hs.hostname
- self._instance_name = hs.get_instance_name()
- self._config = hs.config
- self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages
- self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
- if hs.config.worker.worker_app:
- self._user_device_resync = (
- ReplicationUserDevicesResyncRestServlet.make_client(hs)
- )
- else:
- self._device_list_updater = hs.get_device_handler().device_list_updater
- # When joining a room we need to queue any events for that room up.
- # For each room, a list of (pdu, origin) tuples.
- # TODO: replace this with something more elegant, probably based around the
- # federation event staging area.
- self.room_queues: Dict[str, List[Tuple[EventBase, str]]] = {}
- self._room_pdu_linearizer = Linearizer("fed_room_pdu")
- async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
- """Process a PDU received via a federation /send/ transaction
- Args:
- origin: server which initiated the /send/ transaction. Will
- be used to fetch missing events or state.
- pdu: received PDU
- """
- # We should never see any outliers here.
- assert not pdu.internal_metadata.outlier
- room_id = pdu.room_id
- event_id = pdu.event_id
- # We reprocess pdus when we have seen them only as outliers
- existing = await self._store.get_event(
- event_id, allow_none=True, allow_rejected=True
- )
- # FIXME: Currently we fetch an event again when we already have it
- # if it has been marked as an outlier.
- if existing:
- if not existing.internal_metadata.is_outlier():
- logger.info(
- "Ignoring received event %s which we have already seen", event_id
- )
- return
- if pdu.internal_metadata.is_outlier():
- logger.info(
- "Ignoring received outlier %s which we already have as an outlier",
- event_id,
- )
- return
- logger.info("De-outliering event %s", event_id)
- # do some initial sanity-checking of the event. In particular, make
- # sure it doesn't have hundreds of prev_events or auth_events, which
- # could cause a huge state resolution or cascade of event fetches.
- try:
- self._sanity_check_event(pdu)
- except SynapseError as err:
- logger.warning("Received event failed sanity checks")
- raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
- # If we are currently in the process of joining this room, then we
- # queue up events for later processing.
- if room_id in self.room_queues:
- logger.info(
- "Queuing PDU from %s for now: join in progress",
- origin,
- )
- self.room_queues[room_id].append((pdu, origin))
- return
- # If we're not in the room just ditch the event entirely. This is
- # probably an old server that has come back and thinks we're still in
- # the room (or we've been rejoined to the room by a state reset).
- #
- # Note that if we were never in the room then we would have already
- # dropped the event, since we wouldn't know the room version.
- is_in_room = await self._event_auth_handler.is_host_in_room(
- room_id, self._server_name
- )
- if not is_in_room:
- logger.info(
- "Ignoring PDU from %s as we're not in the room",
- origin,
- )
- return None
- # Try to fetch any missing prev events to fill in gaps in the graph
- prevs = set(pdu.prev_event_ids())
- seen = await self._store.have_events_in_timeline(prevs)
- missing_prevs = prevs - seen
- if missing_prevs:
- # We only backfill backwards to the min depth.
- min_depth = await self._store.get_min_depth(pdu.room_id)
- logger.debug("min_depth: %d", min_depth)
- if min_depth is not None and pdu.depth > min_depth:
- # If we're missing stuff, ensure we only fetch stuff one
- # at a time.
- logger.info(
- "Acquiring room lock to fetch %d missing prev_events: %s",
- len(missing_prevs),
- shortstr(missing_prevs),
- )
- async with self._room_pdu_linearizer.queue(pdu.room_id):
- logger.info(
- "Acquired room lock to fetch %d missing prev_events",
- len(missing_prevs),
- )
- try:
- await self._get_missing_events_for_pdu(
- origin, pdu, prevs, min_depth
- )
- except Exception as e:
- raise Exception(
- "Error fetching missing prev_events for %s: %s"
- % (event_id, e)
- ) from e
- # Update the set of things we've seen after trying to
- # fetch the missing stuff
- seen = await self._store.have_events_in_timeline(prevs)
- missing_prevs = prevs - seen
- if not missing_prevs:
- logger.info("Found all missing prev_events")
- if missing_prevs:
- # since this event was pushed to us, it is possible for it to
- # become the only forward-extremity in the room, and we would then
- # trust its state to be the state for the whole room. This is very
- # bad. Further, if the event was pushed to us, there is no excuse
- # for us not to have all the prev_events. (XXX: apart from
- # min_depth?)
- #
- # We therefore reject any such events.
- logger.warning(
- "Rejecting: failed to fetch %d prev events: %s",
- len(missing_prevs),
- shortstr(missing_prevs),
- )
- raise FederationError(
- "ERROR",
- 403,
- (
- "Your server isn't divulging details about prev_events "
- "referenced in this event."
- ),
- affected=pdu.event_id,
- )
- try:
- context = await self._state_handler.compute_event_context(pdu)
- await self._process_received_pdu(origin, pdu, context)
- except PartialStateConflictError:
- # The room was un-partial stated while we were processing the PDU.
- # Try once more, with full state this time.
- logger.info(
- "Room %s was un-partial stated while processing the PDU, trying again.",
- room_id,
- )
- context = await self._state_handler.compute_event_context(pdu)
- await self._process_received_pdu(origin, pdu, context)
- async def on_send_membership_event(
- self, origin: str, event: EventBase
- ) -> Tuple[EventBase, EventContext]:
- """
- We have received a join/leave/knock event for a room via send_join/leave/knock.
- Verify that event and send it into the room on the remote homeserver's behalf.
- This is quite similar to on_receive_pdu, with the following principal
- differences:
- * only membership events are permitted (and only events with
- sender==state_key -- ie, no kicks or bans)
- * *We* send out the event on behalf of the remote server.
- * We enforce the membership restrictions of restricted rooms.
- * Rejected events result in an exception rather than being stored.
- There are also other differences, however it is not clear if these are by
- design or omission. In particular, we do not attempt to backfill any missing
- prev_events.
- Args:
- origin: The homeserver of the remote (joining/invited/knocking) user.
- event: The member event that has been signed by the remote homeserver.
- Returns:
- The event and context of the event after inserting it into the room graph.
- Raises:
- RuntimeError if any prev_events are missing
- SynapseError if the event is not accepted into the room
- PartialStateConflictError if the room was un-partial stated in between
- computing the state at the event and persisting it. The caller should
- retry exactly once in this case.
- """
- logger.debug(
- "on_send_membership_event: Got event: %s, signatures: %s",
- event.event_id,
- event.signatures,
- )
- if get_domain_from_id(event.sender) != origin:
- logger.info(
- "Got send_membership request for user %r from different origin %s",
- event.sender,
- origin,
- )
- raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
- if event.sender != event.state_key:
- raise SynapseError(400, "state_key and sender must match", Codes.BAD_JSON)
- assert not event.internal_metadata.outlier
- # Send this event on behalf of the other server.
- #
- # The remote server isn't a full participant in the room at this point, so
- # may not have an up-to-date list of the other homeservers participating in
- # the room, so we send it on their behalf.
- event.internal_metadata.send_on_behalf_of = origin
- context = await self._state_handler.compute_event_context(event)
- await self._check_event_auth(origin, event, context)
- if context.rejected:
- raise SynapseError(
- 403, f"{event.membership} event was rejected", Codes.FORBIDDEN
- )
- # for joins, we need to check the restrictions of restricted rooms
- if event.membership == Membership.JOIN:
- await self.check_join_restrictions(context, event)
- # for knock events, we run the third-party event rules. It's not entirely clear
- # why we don't do this for other sorts of membership events.
- if event.membership == Membership.KNOCK:
- event_allowed, _ = await self._third_party_event_rules.check_event_allowed(
- event, context
- )
- if not event_allowed:
- logger.info("Sending of knock %s forbidden by third-party rules", event)
- raise SynapseError(
- 403, "This event is not allowed in this context", Codes.FORBIDDEN
- )
- # all looks good, we can persist the event.
- # First, precalculate the joined hosts so that the federation sender doesn't
- # need to.
- await self._event_creation_handler.cache_joined_hosts_for_events(
- [(event, context)]
- )
- await self._check_for_soft_fail(event, context=context, origin=origin)
- await self._run_push_actions_and_persist_event(event, context)
- return event, context
- async def check_join_restrictions(
- self, context: EventContext, event: EventBase
- ) -> None:
- """Check that restrictions in restricted join rules are matched
- Called when we receive a join event via send_join.
- Raises an auth error if the restrictions are not matched.
- """
- prev_state_ids = await context.get_prev_state_ids()
- # Check if the user is already in the room or invited to the room.
- user_id = event.state_key
- prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
- prev_member_event = None
- if prev_member_event_id:
- prev_member_event = await self._store.get_event(prev_member_event_id)
- # Check if the member should be allowed access via membership in a space.
- await self._event_auth_handler.check_restricted_join_rules(
- prev_state_ids,
- event.room_version,
- user_id,
- prev_member_event,
- )
- @trace
- async def process_remote_join(
- self,
- origin: str,
- room_id: str,
- auth_events: List[EventBase],
- state: List[EventBase],
- event: EventBase,
- room_version: RoomVersion,
- partial_state: bool,
- ) -> int:
- """Persists the events returned by a send_join
- Checks the auth chain is valid (and passes auth checks) for the
- state and event. Then persists all of the events.
- Notifies about the persisted events where appropriate.
- Args:
- origin: Where the events came from
- room_id:
- auth_events
- state
- event
- room_version: The room version we expect this room to have, and
- will raise if it doesn't match the version in the create event.
- partial_state: True if the state omits non-critical membership events
- Returns:
- The stream ID after which all events have been persisted.
- Raises:
- SynapseError if the response is in some way invalid.
- PartialStateConflictError if the homeserver is already in the room and it
- has been un-partial stated.
- """
- create_event = None
- for e in state:
- if (e.type, e.state_key) == (EventTypes.Create, ""):
- create_event = e
- break
- if create_event is None:
- # If the state doesn't have a create event then the room is
- # invalid, and it would fail auth checks anyway.
- raise SynapseError(400, "No create event in state")
- room_version_id = create_event.content.get(
- "room_version", RoomVersions.V1.identifier
- )
- if room_version.identifier != room_version_id:
- raise SynapseError(400, "Room version mismatch")
- # persist the auth chain and state events.
- #
- # any invalid events here will be marked as rejected, and we'll carry on.
- #
- # any events whose auth events are missing (ie, not in the send_join response,
- # and not already in our db) will just be ignored. This is correct behaviour,
- # because the reason that auth_events are missing might be due to us being
- # unable to validate their signatures. The fact that we can't validate their
- # signatures right now doesn't mean that we will *never* be able to, so it
- # is premature to reject them.
- #
- await self._auth_and_persist_outliers(
- room_id, itertools.chain(auth_events, state)
- )
- # and now persist the join event itself.
- logger.info(
- "Peristing join-via-remote %s (partial_state: %s)", event, partial_state
- )
- with nested_logging_context(suffix=event.event_id):
- context = await self._state_handler.compute_event_context(
- event,
- state_ids_before_event={
- (e.type, e.state_key): e.event_id for e in state
- },
- partial_state=partial_state,
- )
- await self._check_event_auth(origin, event, context)
- if context.rejected:
- raise SynapseError(400, "Join event was rejected")
- # the remote server is responsible for sending our join event to the rest
- # of the federation. Indeed, attempting to do so will result in problems
- # when we try to look up the state before the join (to get the server list)
- # and discover that we do not have it.
- event.internal_metadata.proactively_send = False
- stream_id_after_persist = await self.persist_events_and_notify(
- room_id, [(event, context)]
- )
- # If we're joining the room again, check if there is new marker
- # state indicating that there is new history imported somewhere in
- # the DAG. Multiple markers can exist in the current state with
- # unique state_keys.
- #
- # Do this after the state from the remote join was persisted (via
- # `persist_events_and_notify`). Otherwise we can run into a
- # situation where the create event doesn't exist yet in the
- # `current_state_events`
- for e in state:
- await self._handle_marker_event(origin, e)
- return stream_id_after_persist
- async def update_state_for_partial_state_event(
- self, destination: str, event: EventBase
- ) -> None:
- """Recalculate the state at an event as part of a de-partial-stating process
- Args:
- destination: server to request full state from
- event: partial-state event to be de-partial-stated
- Raises:
- FederationPullAttemptBackoffError if we are are deliberately not attempting
- to pull the given event over federation because we've already done so
- recently and are backing off.
- FederationError if we fail to request state from the remote server.
- """
- logger.info("Updating state for %s", event.event_id)
- with nested_logging_context(suffix=event.event_id):
- # if we have all the event's prev_events, then we can work out the
- # state based on their states. Otherwise, we request it from the destination
- # server.
- #
- # This is the same operation as we do when we receive a regular event
- # over federation.
- context = await self._compute_event_context_with_maybe_missing_prevs(
- destination, event
- )
- if context.partial_state:
- # this can happen if some or all of the event's prev_events still have
- # partial state. We were careful to only pick events from the db without
- # partial-state prev events, so that implies that a prev event has
- # been persisted (with partial state) since we did the query.
- #
- # So, let's just ignore `event` for now; when we re-run the db query
- # we should instead get its partial-state prev event, which we will
- # de-partial-state, and then come back to event.
- logger.warning(
- "%s still has prev_events with partial state: can't de-partial-state it yet",
- event.event_id,
- )
- return
- # since the state at this event has changed, we should now re-evaluate
- # whether it should have been rejected. We must already have all of the
- # auth events (from last time we went round this path), so there is no
- # need to pass the origin.
- await self._check_event_auth(None, event, context)
- await self._store.update_state_for_partial_state_event(event, context)
- self._state_storage_controller.notify_event_un_partial_stated(
- event.event_id
- )
- # Notify that there's a new row in the un_partial_stated_events stream.
- self._notifier.notify_replication()
- @trace
- async def backfill(
- self, dest: str, room_id: str, limit: int, extremities: Collection[str]
- ) -> None:
- """Trigger a backfill request to `dest` for the given `room_id`
- This will attempt to get more events from the remote. If the other side
- has no new events to offer, this will return an empty list.
- As the events are received, we check their signatures, and also do some
- sanity-checking on them. If any of the backfilled events are invalid,
- this method throws a SynapseError.
- We might also raise an InvalidResponseError if the response from the remote
- server is just bogus.
- TODO: make this more useful to distinguish failures of the remote
- server from invalid events (there is probably no point in trying to
- re-fetch invalid events from every other HS in the room.)
- """
- if dest == self._server_name:
- raise SynapseError(400, "Can't backfill from self.")
- events = await self._federation_client.backfill(
- dest, room_id, limit=limit, extremities=extremities
- )
- if not events:
- return
- with backfill_processing_after_timer.time():
- # if there are any events in the wrong room, the remote server is buggy and
- # should not be trusted.
- for ev in events:
- if ev.room_id != room_id:
- raise InvalidResponseError(
- f"Remote server {dest} returned event {ev.event_id} which is in "
- f"room {ev.room_id}, when we were backfilling in {room_id}"
- )
- await self._process_pulled_events(
- dest,
- events,
- backfilled=True,
- )
- @trace
- async def _get_missing_events_for_pdu(
- self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
- ) -> None:
- """
- Args:
- origin: Origin of the pdu. Will be called to get the missing events
- pdu: received pdu
- prevs: List of event ids which we are missing
- min_depth: Minimum depth of events to return.
- """
- room_id = pdu.room_id
- event_id = pdu.event_id
- seen = await self._store.have_events_in_timeline(prevs)
- if not prevs - seen:
- return
- latest_list = await self._store.get_latest_event_ids_in_room(room_id)
- # We add the prev events that we have seen to the latest
- # list to ensure the remote server doesn't give them to us
- latest = set(latest_list)
- latest |= seen
- logger.info(
- "Requesting missing events between %s and %s",
- shortstr(latest),
- event_id,
- )
- # XXX: we set timeout to 10s to help workaround
- # https://github.com/matrix-org/synapse/issues/1733.
- # The reason is to avoid holding the linearizer lock
- # whilst processing inbound /send transactions, causing
- # FDs to stack up and block other inbound transactions
- # which empirically can currently take up to 30 minutes.
- #
- # N.B. this explicitly disables retry attempts.
- #
- # N.B. this also increases our chances of falling back to
- # fetching fresh state for the room if the missing event
- # can't be found, which slightly reduces our security.
- # it may also increase our DAG extremity count for the room,
- # causing additional state resolution? See #1760.
- # However, fetching state doesn't hold the linearizer lock
- # apparently.
- #
- # see https://github.com/matrix-org/synapse/pull/1744
- #
- # ----
- #
- # Update richvdh 2018/09/18: There are a number of problems with timing this
- # request out aggressively on the client side:
- #
- # - it plays badly with the server-side rate-limiter, which starts tarpitting you
- # if you send too many requests at once, so you end up with the server carefully
- # working through the backlog of your requests, which you have already timed
- # out.
- #
- # - for this request in particular, we now (as of
- # https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
- # server can't produce a plausible-looking set of prev_events - so we becone
- # much more likely to reject the event.
- #
- # - contrary to what it says above, we do *not* fall back to fetching fresh state
- # for the room if get_missing_events times out. Rather, we give up processing
- # the PDU whose prevs we are missing, which then makes it much more likely that
- # we'll end up back here for the *next* PDU in the list, which exacerbates the
- # problem.
- #
- # - the aggressive 10s timeout was introduced to deal with incoming federation
- # requests taking 8 hours to process. It's not entirely clear why that was going
- # on; certainly there were other issues causing traffic storms which are now
- # resolved, and I think in any case we may be more sensible about our locking
- # now. We're *certainly* more sensible about our logging.
- #
- # All that said: Let's try increasing the timeout to 60s and see what happens.
- try:
- missing_events = await self._federation_client.get_missing_events(
- origin,
- room_id,
- earliest_events_ids=list(latest),
- latest_events=[pdu],
- limit=10,
- min_depth=min_depth,
- timeout=60000,
- )
- except (RequestSendFailed, HttpResponseException, NotRetryingDestination) as e:
- # We failed to get the missing events, but since we need to handle
- # the case of `get_missing_events` not returning the necessary
- # events anyway, it is safe to simply log the error and continue.
- logger.warning("Failed to get prev_events: %s", e)
- return
- logger.info("Got %d prev_events", len(missing_events))
- await self._process_pulled_events(origin, missing_events, backfilled=False)
- @trace
- async def _process_pulled_events(
- self, origin: str, events: Collection[EventBase], backfilled: bool
- ) -> None:
- """Process a batch of events we have pulled from a remote server
- Pulls in any events required to auth the events, persists the received events,
- and notifies clients, if appropriate.
- Assumes the events have already had their signatures and hashes checked.
- Params:
- origin: The server we received these events from
- events: The received events.
- backfilled: True if this is part of a historical batch of events (inhibits
- notification to clients, and validation of device keys.)
- """
- set_tag(
- SynapseTags.FUNC_ARG_PREFIX + "event_ids",
- str([event.event_id for event in events]),
- )
- set_tag(
- SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
- str(len(events)),
- )
- set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
- logger.debug(
- "processing pulled backfilled=%s events=%s",
- backfilled,
- [
- "event_id=%s,depth=%d,body=%s,prevs=%s\n"
- % (
- event.event_id,
- event.depth,
- event.content.get("body", event.type),
- event.prev_event_ids(),
- )
- for event in events
- ],
- )
- # Check if we already any of these have these events.
- # Note: we currently make a lookup in the database directly here rather than
- # checking the event cache, due to:
- # https://github.com/matrix-org/synapse/issues/13476
- existing_events_map = await self._store._get_events_from_db(
- [event.event_id for event in events]
- )
- new_events = []
- for event in events:
- event_id = event.event_id
- # If we've already seen this event ID...
- if event_id in existing_events_map:
- existing_event = existing_events_map[event_id]
- # ...and the event itself was not previously stored as an outlier...
- if not existing_event.event.internal_metadata.is_outlier():
- # ...then there's no need to persist it. We have it already.
- logger.info(
- "_process_pulled_event: Ignoring received event %s which we "
- "have already seen",
- event.event_id,
- )
- continue
- # While we have seen this event before, it was stored as an outlier.
- # We'll now persist it as a non-outlier.
- logger.info("De-outliering event %s", event_id)
- # Continue on with the events that are new to us.
- new_events.append(event)
- # We want to sort these by depth so we process them and
- # tell clients about them in order.
- sorted_events = sorted(new_events, key=lambda x: x.depth)
- for ev in sorted_events:
- with nested_logging_context(ev.event_id):
- await self._process_pulled_event(origin, ev, backfilled=backfilled)
- @trace
- @tag_args
- async def _process_pulled_event(
- self, origin: str, event: EventBase, backfilled: bool
- ) -> None:
- """Process a single event that we have pulled from a remote server
- Pulls in any events required to auth the event, persists the received event,
- and notifies clients, if appropriate.
- Assumes the event has already had its signatures and hashes checked.
- This is somewhat equivalent to on_receive_pdu, but applies somewhat different
- logic in the case that we are missing prev_events (in particular, it just
- requests the state at that point, rather than triggering a get_missing_events) -
- so is appropriate when we have pulled the event from a remote server, rather
- than having it pushed to us.
- Params:
- origin: The server we received this event from
- events: The received event
- backfilled: True if this is part of a historical batch of events (inhibits
- notification to clients, and validation of device keys.)
- """
- logger.info("Processing pulled event %s", event)
- # This function should not be used to persist outliers (use something
- # else) because this does a bunch of operations that aren't necessary
- # (extra work; in particular, it makes sure we have all the prev_events
- # and resolves the state across those prev events). If you happen to run
- # into a situation where the event you're trying to process/backfill is
- # marked as an `outlier`, then you should update that spot to return an
- # `EventBase` copy that doesn't have `outlier` flag set.
- #
- # `EventBase` is used to represent both an event we have not yet
- # persisted, and one that we have persisted and now keep in the cache.
- # In an ideal world this method would only be called with the first type
- # of event, but it turns out that's not actually the case and for
- # example, you could get an event from cache that is marked as an
- # `outlier` (fix up that spot though).
- assert not event.internal_metadata.is_outlier(), (
- "Outlier event passed to _process_pulled_event. "
- "To persist an event as a non-outlier, make sure to pass in a copy without `event.internal_metadata.outlier = true`."
- )
- event_id = event.event_id
- try:
- self._sanity_check_event(event)
- except SynapseError as err:
- logger.warning("Event %s failed sanity check: %s", event_id, err)
- await self._store.record_event_failed_pull_attempt(
- event.room_id, event_id, str(err)
- )
- return
- try:
- try:
- context = await self._compute_event_context_with_maybe_missing_prevs(
- origin, event
- )
- await self._process_received_pdu(
- origin,
- event,
- context,
- backfilled=backfilled,
- )
- except PartialStateConflictError:
- # The room was un-partial stated while we were processing the event.
- # Try once more, with full state this time.
- context = await self._compute_event_context_with_maybe_missing_prevs(
- origin, event
- )
- # We ought to have full state now, barring some unlikely race where we left and
- # rejoned the room in the background.
- if context.partial_state:
- raise AssertionError(
- f"Event {event.event_id} still has a partial resolved state "
- f"after room {event.room_id} was un-partial stated"
- )
- await self._process_received_pdu(
- origin,
- event,
- context,
- backfilled=backfilled,
- )
- except FederationPullAttemptBackoffError as exc:
- # Log a warning about why we failed to process the event (the error message
- # for `FederationPullAttemptBackoffError` is pretty good)
- logger.warning("_process_pulled_event: %s", exc)
- # We do not record a failed pull attempt when we backoff fetching a missing
- # `prev_event` because not being able to fetch the `prev_events` just means
- # we won't be able to de-outlier the pulled event. But we can still use an
- # `outlier` in the state/auth chain for another event. So we shouldn't stop
- # a downstream event from trying to pull it.
- #
- # This avoids a cascade of backoff for all events in the DAG downstream from
- # one event backoff upstream.
- except FederationError as e:
- await self._store.record_event_failed_pull_attempt(
- event.room_id, event_id, str(e)
- )
- if e.code == 403:
- logger.warning("Pulled event %s failed history check.", event_id)
- else:
- raise
- @trace
- async def _compute_event_context_with_maybe_missing_prevs(
- self, dest: str, event: EventBase
- ) -> EventContext:
- """Build an EventContext structure for a non-outlier event whose prev_events may
- be missing.
- This is used when we have pulled a batch of events from a remote server, and may
- not have all the prev_events.
- To build an EventContext, we need to calculate the state before the event. If we
- already have all the prev_events for `event`, we can simply use the state after
- the prev_events to calculate the state before `event`.
- Otherwise, the missing prevs become new backwards extremities, and we fall back
- to asking the remote server for the state after each missing `prev_event`,
- and resolving across them.
- That's ok provided we then resolve the state against other bits of the DAG
- before using it - in other words, that the received event `event` is not going
- to become the only forwards_extremity in the room (which will ensure that you
- can't just take over a room by sending an event, withholding its prev_events,
- and declaring yourself to be an admin in the subsequent state request).
- In other words: we should only call this method if `event` has been *pulled*
- as part of a batch of missing prev events, or similar.
- Params:
- dest: the remote server to ask for state at the missing prevs. Typically,
- this will be the server we got `event` from.
- event: an event to check for missing prevs.
- Returns:
- The event context.
- Raises:
- FederationPullAttemptBackoffError if we are are deliberately not attempting
- to pull the given event over federation because we've already done so
- recently and are backing off.
- FederationError if we fail to get the state from the remote server after any
- missing `prev_event`s.
- """
- room_id = event.room_id
- event_id = event.event_id
- prevs = set(event.prev_event_ids())
- seen = await self._store.have_events_in_timeline(prevs)
- missing_prevs = prevs - seen
- # If we've already recently attempted to pull this missing event, don't
- # try it again so soon. Since we have to fetch all of the prev_events, we can
- # bail early here if we find any to ignore.
- prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
- room_id, missing_prevs
- )
- if len(prevs_to_ignore) > 0:
- raise FederationPullAttemptBackoffError(
- event_ids=prevs_to_ignore,
- message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
- )
- if not missing_prevs:
- return await self._state_handler.compute_event_context(event)
- logger.info(
- "Event %s is missing prev_events %s: calculating state for a "
- "backwards extremity",
- event_id,
- shortstr(missing_prevs),
- )
- # Calculate the state after each of the previous events, and
- # resolve them to find the correct state at the current event.
- try:
- # Determine whether we may be about to retrieve partial state
- # Events may be un-partial stated right after we compute the partial state
- # flag, but that's okay, as long as the flag errs on the conservative side.
- partial_state_flags = await self._store.get_partial_state_events(seen)
- partial_state = any(partial_state_flags.values())
- # Get the state of the events we know about
- ours = await self._state_storage_controller.get_state_groups_ids(
- room_id, seen, await_full_state=False
- )
- # state_maps is a list of mappings from (type, state_key) to event_id
- state_maps: List[StateMap[str]] = list(ours.values())
- # we don't need this any more, let's delete it.
- del ours
- # Ask the remote server for the states we don't
- # know about
- for p in missing_prevs:
- logger.info("Requesting state after missing prev_event %s", p)
- with nested_logging_context(p):
- # note that if any of the missing prevs share missing state or
- # auth events, the requests to fetch those events are deduped
- # by the get_pdu_cache in federation_client.
- remote_state_map = (
- await self._get_state_ids_after_missing_prev_event(
- dest, room_id, p
- )
- )
- state_maps.append(remote_state_map)
- room_version = await self._store.get_room_version_id(room_id)
- state_map = await self._state_resolution_handler.resolve_events_with_store(
- room_id,
- room_version,
- state_maps,
- event_map={event_id: event},
- state_res_store=StateResolutionStore(self._store),
- )
- except Exception as e:
- logger.warning(
- "Error attempting to resolve state at missing prev_events: %s", e
- )
- raise FederationError(
- "ERROR",
- 403,
- "We can't get valid state history.",
- affected=event_id,
- )
- return await self._state_handler.compute_event_context(
- event, state_ids_before_event=state_map, partial_state=partial_state
- )
- @trace
- @tag_args
- async def _get_state_ids_after_missing_prev_event(
- self,
- destination: str,
- room_id: str,
- event_id: str,
- ) -> StateMap[str]:
- """Requests all of the room state at a given event from a remote homeserver.
- Args:
- destination: The remote homeserver to query for the state.
- room_id: The id of the room we're interested in.
- event_id: The id of the event we want the state at.
- Returns:
- The event ids of the state *after* the given event.
- Raises:
- InvalidResponseError: if the remote homeserver's response contains fields
- of the wrong type.
- """
- # It would be better if we could query the difference from our known
- # state to the given `event_id` so the sending server doesn't have to
- # send as much and we don't have to process as many events. For example
- # in a room like #matrix:matrix.org, we get 200k events (77k state_events, 122k
- # auth_events) from this call.
- #
- # Tracked by https://github.com/matrix-org/synapse/issues/13618
- (
- state_event_ids,
- auth_event_ids,
- ) = await self._federation_client.get_room_state_ids(
- destination, room_id, event_id=event_id
- )
- logger.debug(
- "state_ids returned %i state events, %i auth events",
- len(state_event_ids),
- len(auth_event_ids),
- )
- # Start by checking events we already have in the DB
- desired_events = set(state_event_ids)
- desired_events.add(event_id)
- logger.debug("Fetching %i events from cache/store", len(desired_events))
- have_events = await self._store.have_seen_events(room_id, desired_events)
- missing_desired_event_ids = desired_events - have_events
- logger.debug(
- "We are missing %i events (got %i)",
- len(missing_desired_event_ids),
- len(have_events),
- )
- # We probably won't need most of the auth events, so let's just check which
- # we have for now, rather than thrashing the event cache with them all
- # unnecessarily.
- # TODO: we probably won't actually need all of the auth events, since we
- # already have a bunch of the state events. It would be nice if the
- # federation api gave us a way of finding out which we actually need.
- missing_auth_event_ids = set(auth_event_ids) - have_events
- missing_auth_event_ids.difference_update(
- await self._store.have_seen_events(room_id, missing_auth_event_ids)
- )
- logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))
- missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
- set_tag(
- SynapseTags.RESULT_PREFIX + "missing_auth_event_ids",
- str(missing_auth_event_ids),
- )
- set_tag(
- SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length",
- str(len(missing_auth_event_ids)),
- )
- set_tag(
- SynapseTags.RESULT_PREFIX + "missing_desired_event_ids",
- str(missing_desired_event_ids),
- )
- set_tag(
- SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length",
- str(len(missing_desired_event_ids)),
- )
- # Making an individual request for each of 1000s of events has a lot of
- # overhead. On the other hand, we don't really want to fetch all of the events
- # if we already have most of them.
- #
- # As an arbitrary heuristic, if we are missing more than 10% of the events, then
- # we fetch the whole state.
- #
- # TODO: might it be better to have an API which lets us do an aggregate event
- # request
- if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
- logger.debug("Requesting complete state from remote")
- await self._get_state_and_persist(destination, room_id, event_id)
- else:
- logger.debug("Fetching %i events from remote", len(missing_event_ids))
- await self._get_events_and_persist(
- destination=destination, room_id=room_id, event_ids=missing_event_ids
- )
- # We now need to fill out the state map, which involves fetching the
- # type and state key for each event ID in the state.
- state_map = {}
- event_metadata = await self._store.get_metadata_for_events(state_event_ids)
- for state_event_id, metadata in event_metadata.items():
- if metadata.room_id != room_id:
- # This is a bogus situation, but since we may only discover it a long time
- # after it happened, we try our best to carry on, by just omitting the
- # bad events from the returned state set.
- #
- # This can happen if a remote server claims that the state or
- # auth_events at an event in room A are actually events in room B
- logger.warning(
- "Remote server %s claims event %s in room %s is an auth/state "
- "event in room %s",
- destination,
- state_event_id,
- metadata.room_id,
- room_id,
- )
- continue
- if metadata.state_key is None:
- logger.warning(
- "Remote server gave us non-state event in state: %s", state_event_id
- )
- continue
- state_map[(metadata.event_type, metadata.state_key)] = state_event_id
- # if we couldn't get the prev event in question, that's a problem.
- remote_event = await self._store.get_event(
- event_id,
- allow_none=True,
- allow_rejected=True,
- redact_behaviour=EventRedactBehaviour.as_is,
- )
- if not remote_event:
- raise Exception("Unable to get missing prev_event %s" % (event_id,))
- # missing state at that event is a warning, not a blocker
- # XXX: this doesn't sound right? it means that we'll end up with incomplete
- # state.
- failed_to_fetch = desired_events - event_metadata.keys()
- # `event_id` could be missing from `event_metadata` because it's not necessarily
- # a state event. We've already checked that we've fetched it above.
- failed_to_fetch.discard(event_id)
- if failed_to_fetch:
- logger.warning(
- "Failed to fetch missing state events for %s %s",
- event_id,
- failed_to_fetch,
- )
- set_tag(
- SynapseTags.RESULT_PREFIX + "failed_to_fetch",
- str(failed_to_fetch),
- )
- set_tag(
- SynapseTags.RESULT_PREFIX + "failed_to_fetch.length",
- str(len(failed_to_fetch)),
- )
- if remote_event.is_state() and remote_event.rejected_reason is None:
- state_map[
- (remote_event.type, remote_event.state_key)
- ] = remote_event.event_id
- return state_map
- @trace
- @tag_args
- async def _get_state_and_persist(
- self, destination: str, room_id: str, event_id: str
- ) -> None:
- """Get the complete room state at a given event, and persist any new events
- as outliers"""
- room_version = await self._store.get_room_version(room_id)
- auth_events, state_events = await self._federation_client.get_room_state(
- destination, room_id, event_id=event_id, room_version=room_version
- )
- logger.info("/state returned %i events", len(auth_events) + len(state_events))
- await self._auth_and_persist_outliers(
- room_id, itertools.chain(auth_events, state_events)
- )
- # we also need the event itself.
- if not await self._store.have_seen_event(room_id, event_id):
- await self._get_events_and_persist(
- destination=destination, room_id=room_id, event_ids=(event_id,)
- )
- @trace
- async def _process_received_pdu(
- self,
- origin: str,
- event: EventBase,
- context: EventContext,
- backfilled: bool = False,
- ) -> None:
- """Called when we have a new non-outlier event.
- This is called when we have a new event to add to the room DAG. This can be
- due to:
- * events received directly via a /send request
- * events retrieved via get_missing_events after a /send request
- * events backfilled after a client request.
- It's not currently used for events received from incoming send_{join,knock,leave}
- requests (which go via on_send_membership_event), nor for joins created by a
- remote join dance (which go via process_remote_join).
- We need to do auth checks and put it through the StateHandler.
- Args:
- origin: server sending the event
- event: event to be persisted
- context: The `EventContext` to persist the event with.
- backfilled: True if this is part of a historical batch of events (inhibits
- notification to clients, and validation of device keys.)
- PartialStateConflictError: if the room was un-partial stated in between
- computing the state at the event and persisting it. The caller should
- recompute `context` and retry exactly once when this happens.
- """
- logger.debug("Processing event: %s", event)
- assert not event.internal_metadata.outlier
- try:
- await self._check_event_auth(origin, event, context)
- except AuthError as e:
- # This happens only if we couldn't find the auth events. We'll already have
- # logged a warning, so now we just convert to a FederationError.
- raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
- if not backfilled and not context.rejected:
- # For new (non-backfilled and non-outlier) events we check if the event
- # passes auth based on the current state. If it doesn't then we
- # "soft-fail" the event.
- await self._check_for_soft_fail(event, context=context, origin=origin)
- await self._run_push_actions_and_persist_event(event, context, backfilled)
- await self._handle_marker_event(origin, event)
- if backfilled or context.rejected:
- return
- await self._maybe_kick_guest_users(event)
- # For encrypted messages we check that we know about the sending device,
- # if we don't then we mark the device cache for that user as stale.
- if event.type == EventTypes.Encrypted:
- device_id = event.content.get("device_id")
- sender_key = event.content.get("sender_key")
- cached_devices = await self._store.get_cached_devices_for_user(event.sender)
- resync = False # Whether we should resync device lists.
- device = None
- if device_id is not None:
- device = cached_devices.get(device_id)
- if device is None:
- logger.info(
- "Received event from remote device not in our cache: %s %s",
- event.sender,
- device_id,
- )
- resync = True
- # We also check if the `sender_key` matches what we expect.
- if sender_key is not None:
- # Figure out what sender key we're expecting. If we know the
- # device and recognize the algorithm then we can work out the
- # exact key to expect. Otherwise check it matches any key we
- # have for that device.
- current_keys: Container[str] = []
- if device:
- keys = device.get("keys", {}).get("keys", {})
- if (
- event.content.get("algorithm")
- == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
- ):
- # For this algorithm we expect a curve25519 key.
- key_name = "curve25519:%s" % (device_id,)
- current_keys = [keys.get(key_name)]
- else:
- # We don't know understand the algorithm, so we just
- # check it matches a key for the device.
- current_keys = keys.values()
- elif device_id:
- # We don't have any keys for the device ID.
- pass
- else:
- # The event didn't include a device ID, so we just look for
- # keys across all devices.
- current_keys = [
- key
- for device in cached_devices.values()
- for key in device.get("keys", {}).get("keys", {}).values()
- ]
- # We now check that the sender key matches (one of) the expected
- # keys.
- if sender_key not in current_keys:
- logger.info(
- "Received event from remote device with unexpected sender key: %s %s: %s",
- event.sender,
- device_id or "<no device_id>",
- sender_key,
- )
- resync = True
- if resync:
- run_as_background_process(
- "resync_device_due_to_pdu",
- self._resync_device,
- event.sender,
- )
- async def _resync_device(self, sender: str) -> None:
- """We have detected that the device list for the given user may be out
- of sync, so we try and resync them.
- """
- try:
- await self._store.mark_remote_users_device_caches_as_stale((sender,))
- # Immediately attempt a resync in the background
- if self._config.worker.worker_app:
- await self._user_device_resync(user_id=sender)
- else:
- await self._device_list_updater.user_device_resync(sender)
- except Exception:
- logger.exception("Failed to resync device for %s", sender)
- @trace
- async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
- """Handles backfilling the insertion event when we receive a marker
- event that points to one.
- Args:
- origin: Origin of the event. Will be called to get the insertion event
- marker_event: The event to process
- """
- if marker_event.type != EventTypes.MSC2716_MARKER:
- # Not a marker event
- return
- if marker_event.rejected_reason is not None:
- # Rejected event
- return
- # Skip processing a marker event if the room version doesn't
- # support it or the event is not from the room creator.
- room_version = await self._store.get_room_version(marker_event.room_id)
- create_event = await self._store.get_create_event_for_room(marker_event.room_id)
- room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
- if not room_version.msc2716_historical and (
- not self._config.experimental.msc2716_enabled
- or marker_event.sender != room_creator
- ):
- return
- logger.debug("_handle_marker_event: received %s", marker_event)
- insertion_event_id = marker_event.content.get(
- EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
- )
- if insertion_event_id is None:
- # Nothing to retrieve then (invalid marker)
- return
- already_seen_insertion_event = await self._store.have_seen_event(
- marker_event.room_id, insertion_event_id
- )
- if already_seen_insertion_event:
- # No need to process a marker again if we have already seen the
- # insertion event that it was pointing to
- return
- logger.debug(
- "_handle_marker_event: backfilling insertion event %s", insertion_event_id
- )
- await self._get_events_and_persist(
- origin,
- marker_event.room_id,
- [insertion_event_id],
- )
- insertion_event = await self._store.get_event(
- insertion_event_id, allow_none=True
- )
- if insertion_event is None:
- logger.warning(
- "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
- origin,
- insertion_event_id,
- marker_event.event_id,
- )
- return
- logger.debug(
- "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
- insertion_event,
- marker_event,
- )
- await self._store.insert_insertion_extremity(
- insertion_event_id, marker_event.room_id
- )
- logger.debug(
- "_handle_marker_event: insertion extremity added for %s from marker event %s",
- insertion_event,
- marker_event,
- )
- async def backfill_event_id(
- self, destinations: List[str], room_id: str, event_id: str
- ) -> PulledPduInfo:
- """Backfill a single event and persist it as a non-outlier which means
- we also pull in all of the state and auth events necessary for it.
- Args:
- destination: The homeserver to pull the given event_id from.
- room_id: The room where the event is from.
- event_id: The event ID to backfill.
- Raises:
- FederationError if we are unable to find the event from the destination
- """
- logger.info("backfill_event_id: event_id=%s", event_id)
- room_version = await self._store.get_room_version(room_id)
- pulled_pdu_info = await self._federation_client.get_pdu(
- destinations,
- event_id,
- room_version,
- )
- if not pulled_pdu_info:
- raise FederationError(
- "ERROR",
- 404,
- f"Unable to find event_id={event_id} from remote servers to backfill.",
- affected=event_id,
- )
- # Persist the event we just fetched, including pulling all of the state
- # and auth events to de-outlier it. This also sets up the necessary
- # `state_groups` for the event.
- await self._process_pulled_events(
- pulled_pdu_info.pull_origin,
- [pulled_pdu_info.pdu],
- # Prevent notifications going to clients
- backfilled=True,
- )
- return pulled_pdu_info
- @trace
- @tag_args
- async def _get_events_and_persist(
- self, destination: str, room_id: str, event_ids: Collection[str]
- ) -> None:
- """Fetch the given events from a server, and persist them as outliers.
- This function *does not* recursively get missing auth events of the
- newly fetched events. Callers must include in the `event_ids` argument
- any missing events from the auth chain.
- Logs a warning if we can't find the given event.
- """
- room_version = await self._store.get_room_version(room_id)
- events: List[EventBase] = []
- async def get_event(event_id: str) -> None:
- with nested_logging_context(event_id):
- try:
- pulled_pdu_info = await self._federation_client.get_pdu(
- [destination],
- event_id,
- room_version,
- )
- if pulled_pdu_info is None:
- logger.warning(
- "Server %s didn't return event %s",
- destination,
- event_id,
- )
- return
- events.append(pulled_pdu_info.pdu)
- except Exception as e:
- logger.warning(
- "Error fetching missing state/auth event %s: %s %s",
- event_id,
- type(e),
- e,
- )
- await concurrently_execute(get_event, event_ids, 5)
- logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
- await self._auth_and_persist_outliers(room_id, events)
- @trace
- async def _auth_and_persist_outliers(
- self, room_id: str, events: Iterable[EventBase]
- ) -> None:
- """Persist a batch of outlier events fetched from remote servers.
- We first sort the events to make sure that we process each event's auth_events
- before the event itself.
- We then mark the events as outliers, persist them to the database, and, where
- appropriate (eg, an invite), awake the notifier.
- Params:
- room_id: the room that the events are meant to be in (though this has
- not yet been checked)
- events: the events that have been fetched
- """
- event_map = {event.event_id: event for event in events}
- event_ids = event_map.keys()
- set_tag(
- SynapseTags.FUNC_ARG_PREFIX + "event_ids",
- str(event_ids),
- )
- set_tag(
- SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
- str(len(event_ids)),
- )
- # filter out any events we have already seen. This might happen because
- # the events were eagerly pushed to us (eg, during a room join), or because
- # another thread has raced against us since we decided to request the event.
- #
- # This is just an optimisation, so it doesn't need to be watertight - the event
- # persister does another round of deduplication.
- seen_remotes = await self._store.have_seen_events(room_id, event_map.keys())
- for s in seen_remotes:
- event_map.pop(s, None)
- # XXX: it might be possible to kick this process off in parallel with fetching
- # the events.
- while event_map:
- # build a list of events whose auth events are not in the queue.
- roots = tuple(
- ev
- for ev in event_map.values()
- if not any(aid in event_map for aid in ev.auth_event_ids())
- )
- if not roots:
- # if *none* of the remaining events are ready, that means
- # we have a loop. This either means a bug in our logic, or that
- # somebody has managed to create a loop (which requires finding a
- # hash collision in room v2 and later).
- logger.warning(
- "Loop found in auth events while fetching missing state/auth "
- "events: %s",
- shortstr(event_map.keys()),
- )
- return
- logger.info(
- "Persisting %i of %i remaining outliers: %s",
- len(roots),
- len(event_map),
- shortstr(e.event_id for e in roots),
- )
- await self._auth_and_persist_outliers_inner(room_id, roots)
- for ev in roots:
- del event_map[ev.event_id]
- async def _auth_and_persist_outliers_inner(
- self, room_id: str, fetched_events: Collection[EventBase]
- ) -> None:
- """Helper for _auth_and_persist_outliers
- Persists a batch of events where we have (theoretically) already persisted all
- of their auth events.
- Marks the events as outliers, auths them, persists them to the database, and,
- where appropriate (eg, an invite), awakes the notifier.
- Params:
- origin: where the events came from
- room_id: the room that the events are meant to be in (though this has
- not yet been checked)
- fetched_events: the events to persist
- """
- # get all the auth events for all the events in this batch. By now, they should
- # have been persisted.
- auth_events = {
- aid for event in fetched_events for aid in event.auth_event_ids()
- }
- persisted_events = await self._store.get_events(
- auth_events,
- allow_rejected=True,
- )
- events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = []
- async def prep(event: EventBase) -> None:
- with nested_logging_context(suffix=event.event_id):
- auth = []
- for auth_event_id in event.auth_event_ids():
- ae = persisted_events.get(auth_event_id)
- if not ae:
- # the fact we can't find the auth event doesn't mean it doesn't
- # exist, which means it is premature to reject `event`. Instead we
- # just ignore it for now.
- logger.warning(
- "Dropping event %s, which relies on auth_event %s, which could not be found",
- event,
- auth_event_id,
- )
- return
- auth.append(ae)
- # we're not bothering about room state, so flag the event as an outlier.
- event.internal_metadata.outlier = True
- context = EventContext.for_outlier(self._storage_controllers)
- try:
- validate_event_for_room_version(event)
- await check_state_independent_auth_rules(self._store, event)
- check_state_dependent_auth_rules(event, auth)
- except AuthError as e:
- logger.warning("Rejecting %r because %s", event, e)
- context.rejected = RejectedReason.AUTH_ERROR
- except EventSizeError as e:
- if e.unpersistable:
- # This event is completely unpersistable.
- raise e
- # Otherwise, we are somewhat lenient and just persist the event
- # as rejected, for moderate compatibility with older Synapse
- # versions.
- logger.warning("While validating received event %r: %s", event, e)
- context.rejected = RejectedReason.OVERSIZED_EVENT
- events_and_contexts_to_persist.append((event, context))
- for event in fetched_events:
- await prep(event)
- await self.persist_events_and_notify(
- room_id,
- events_and_contexts_to_persist,
- # Mark these events backfilled as they're historic events that will
- # eventually be backfilled. For example, missing events we fetch
- # during backfill should be marked as backfilled as well.
- backfilled=True,
- )
- @trace
- async def _check_event_auth(
- self, origin: Optional[str], event: EventBase, context: EventContext
- ) -> None:
- """
- Checks whether an event should be rejected (for failing auth checks).
- Args:
- origin: The host the event originates from. This is used to fetch
- any missing auth events. It can be set to None, but only if we are
- sure that we already have all the auth events.
- event: The event itself.
- context:
- The event context.
- Raises:
- AuthError if we were unable to find copies of the event's auth events.
- (Most other failures just cause us to set `context.rejected`.)
- """
- # This method should only be used for non-outliers
- assert not event.internal_metadata.outlier
- # first of all, check that the event itself is valid.
- try:
- validate_event_for_room_version(event)
- except AuthError as e:
- logger.warning("While validating received event %r: %s", event, e)
- # TODO: use a different rejected reason here?
- context.rejected = RejectedReason.AUTH_ERROR
- return
- except EventSizeError as e:
- if e.unpersistable:
- # This event is completely unpersistable.
- raise e
- # Otherwise, we are somewhat lenient and just persist the event
- # as rejected, for moderate compatibility with older Synapse
- # versions.
- logger.warning("While validating received event %r: %s", event, e)
- context.rejected = RejectedReason.OVERSIZED_EVENT
- return
- # next, check that we have all of the event's auth events.
- #
- # Note that this can raise AuthError, which we want to propagate to the
- # caller rather than swallow with `context.rejected` (since we cannot be
- # certain that there is a permanent problem with the event).
- claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
- origin, event
- )
- set_tag(
- SynapseTags.RESULT_PREFIX + "claimed_auth_events",
- str([ev.event_id for ev in claimed_auth_events]),
- )
- set_tag(
- SynapseTags.RESULT_PREFIX + "claimed_auth_events.length",
- str(len(claimed_auth_events)),
- )
- # ... and check that the event passes auth at those auth events.
- # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
- # 4. Passes authorization rules based on the event’s auth events,
- # otherwise it is rejected.
- try:
- await check_state_independent_auth_rules(self._store, event)
- check_state_dependent_auth_rules(event, claimed_auth_events)
- except AuthError as e:
- logger.warning(
- "While checking auth of %r against auth_events: %s", event, e
- )
- context.rejected = RejectedReason.AUTH_ERROR
- return
- # now check the auth rules pass against the room state before the event
- # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
- # 5. Passes authorization rules based on the state before the event,
- # otherwise it is rejected.
- #
- # ... however, if we only have partial state for the room, then there is a good
- # chance that we'll be missing some of the state needed to auth the new event.
- # So, we state-resolve the auth events that we are given against the state that
- # we know about, which ensures things like bans are applied. (Note that we'll
- # already have checked we have all the auth events, in
- # _load_or_fetch_auth_events_for_event above)
- if context.partial_state:
- room_version = await self._store.get_room_version_id(event.room_id)
- local_state_id_map = await context.get_prev_state_ids()
- claimed_auth_events_id_map = {
- (ev.type, ev.state_key): ev.event_id for ev in claimed_auth_events
- }
- state_for_auth_id_map = (
- await self._state_resolution_handler.resolve_events_with_store(
- event.room_id,
- room_version,
- [local_state_id_map, claimed_auth_events_id_map],
- event_map=None,
- state_res_store=StateResolutionStore(self._store),
- )
- )
- else:
- event_types = event_auth.auth_types_for_event(event.room_version, event)
- state_for_auth_id_map = await context.get_prev_state_ids(
- StateFilter.from_types(event_types)
- )
- calculated_auth_event_ids = self._event_auth_handler.compute_auth_events(
- event, state_for_auth_id_map, for_verification=True
- )
- # if those are the same, we're done here.
- if collections.Counter(event.auth_event_ids()) == collections.Counter(
- calculated_auth_event_ids
- ):
- return
- # otherwise, re-run the auth checks based on what we calculated.
- calculated_auth_events = await self._store.get_events_as_list(
- calculated_auth_event_ids
- )
- # log the differences
- claimed_auth_event_map = {(e.type, e.state_key): e for e in claimed_auth_events}
- calculated_auth_event_map = {
- (e.type, e.state_key): e for e in calculated_auth_events
- }
- logger.info(
- "event's auth_events are different to our calculated auth_events. "
- "Claimed but not calculated: %s. Calculated but not claimed: %s",
- [
- ev
- for k, ev in claimed_auth_event_map.items()
- if k not in calculated_auth_event_map
- or calculated_auth_event_map[k].event_id != ev.event_id
- ],
- [
- ev
- for k, ev in calculated_auth_event_map.items()
- if k not in claimed_auth_event_map
- or claimed_auth_event_map[k].event_id != ev.event_id
- ],
- )
- try:
- check_state_dependent_auth_rules(event, calculated_auth_events)
- except AuthError as e:
- logger.warning(
- "While checking auth of %r against room state before the event: %s",
- event,
- e,
- )
- context.rejected = RejectedReason.AUTH_ERROR
- @trace
- async def _maybe_kick_guest_users(self, event: EventBase) -> None:
- if event.type != EventTypes.GuestAccess:
- return
- guest_access = event.content.get(EventContentFields.GUEST_ACCESS)
- if guest_access == GuestAccess.CAN_JOIN:
- return
- current_state = await self._storage_controllers.state.get_current_state(
- event.room_id
- )
- current_state_list = list(current_state.values())
- await self._get_room_member_handler().kick_guest_users(current_state_list)
- async def _check_for_soft_fail(
- self,
- event: EventBase,
- context: EventContext,
- origin: str,
- ) -> None:
- """Checks if we should soft fail the event; if so, marks the event as
- such.
- Does nothing for events in rooms with partial state, since we may not have an
- accurate membership event for the sender in the current state.
- Args:
- event
- context: The `EventContext` which we are about to persist the event with.
- origin: The host the event originates from.
- """
- if await self._store.is_partial_state_room(event.room_id):
- # We might not know the sender's membership in the current state, so don't
- # soft fail anything. Even if we do have a membership for the sender in the
- # current state, it may have been derived from state resolution between
- # partial and full state and may not be accurate.
- return
- extrem_ids_list = await self._store.get_latest_event_ids_in_room(event.room_id)
- extrem_ids = set(extrem_ids_list)
- prev_event_ids = set(event.prev_event_ids())
- if extrem_ids == prev_event_ids:
- # If they're the same then the current state is the same as the
- # state at the event, so no point rechecking auth for soft fail.
- return
- room_version = await self._store.get_room_version_id(event.room_id)
- room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
- # The event types we want to pull from the "current" state.
- auth_types = auth_types_for_event(room_version_obj, event)
- # Calculate the "current state".
- seen_event_ids = await self._store.have_events_in_timeline(prev_event_ids)
- has_missing_prevs = bool(prev_event_ids - seen_event_ids)
- if has_missing_prevs:
- # We don't have all the prev_events of this event, which means we have a
- # gap in the graph, and the new event is going to become a new backwards
- # extremity.
- #
- # In this case we want to be a little careful as we might have been
- # down for a while and have an incorrect view of the current state,
- # however we still want to do checks as gaps are easy to
- # maliciously manufacture.
- #
- # So we use a "current state" that is actually a state
- # resolution across the current forward extremities and the
- # given state at the event. This should correctly handle cases
- # like bans, especially with state res v2.
- state_sets_d = await self._state_storage_controller.get_state_groups_ids(
- event.room_id, extrem_ids
- )
- state_sets: List[StateMap[str]] = list(state_sets_d.values())
- state_ids = await context.get_prev_state_ids()
- state_sets.append(state_ids)
- current_state_ids = (
- await self._state_resolution_handler.resolve_events_with_store(
- event.room_id,
- room_version,
- state_sets,
- event_map=None,
- state_res_store=StateResolutionStore(self._store),
- )
- )
- else:
- current_state_ids = (
- await self._state_storage_controller.get_current_state_ids(
- event.room_id, StateFilter.from_types(auth_types)
- )
- )
- logger.debug(
- "Doing soft-fail check for %s: state %s",
- event.event_id,
- current_state_ids,
- )
- # Now check if event pass auth against said current state
- current_state_ids_list = [
- e for k, e in current_state_ids.items() if k in auth_types
- ]
- current_auth_events = await self._store.get_events_as_list(
- current_state_ids_list
- )
- try:
- check_state_dependent_auth_rules(event, current_auth_events)
- except AuthError as e:
- logger.warning(
- "Soft-failing %r (from %s) because %s",
- event,
- e,
- origin,
- extra={
- "room_id": event.room_id,
- "mxid": event.sender,
- "hs": origin,
- },
- )
- soft_failed_event_counter.inc()
- event.internal_metadata.soft_failed = True
- async def _load_or_fetch_auth_events_for_event(
- self, destination: Optional[str], event: EventBase
- ) -> Collection[EventBase]:
- """Fetch this event's auth_events, from database or remote
- Loads any of the auth_events that we already have from the database/cache. If
- there are any that are missing, calls /event_auth to get the complete auth
- chain for the event (and then attempts to load the auth_events again).
- If any of the auth_events cannot be found, raises an AuthError. This can happen
- for a number of reasons; eg: the events don't exist, or we were unable to talk
- to `destination`, or we couldn't validate the signature on the event (which
- in turn has multiple potential causes).
- Args:
- destination: where to send the /event_auth request. Typically the server
- that sent us `event` in the first place.
- If this is None, no attempt is made to load any missing auth events:
- rather, an AssertionError is raised if there are any missing events.
- event: the event whose auth_events we want
- Returns:
- all of the events listed in `event.auth_events_ids`, after deduplication
- Raises:
- AssertionError if some auth events were missing and no `destination` was
- supplied.
- AuthError if we were unable to fetch the auth_events for any reason.
- """
- event_auth_event_ids = set(event.auth_event_ids())
- event_auth_events = await self._store.get_events(
- event_auth_event_ids, allow_rejected=True
- )
- missing_auth_event_ids = event_auth_event_ids.difference(
- event_auth_events.keys()
- )
- if not missing_auth_event_ids:
- return event_auth_events.values()
- if destination is None:
- # this shouldn't happen: destination must be set unless we know we have already
- # persisted the auth events.
- raise AssertionError(
- "_load_or_fetch_auth_events_for_event() called with no destination for "
- "an event with missing auth_events"
- )
- logger.info(
- "Event %s refers to unknown auth events %s: fetching auth chain",
- event,
- missing_auth_event_ids,
- )
- try:
- await self._get_remote_auth_chain_for_event(
- destination, event.room_id, event.event_id
- )
- except Exception as e:
- logger.warning("Failed to get auth chain for %s: %s", event, e)
- # in this case, it's very likely we still won't have all the auth
- # events - but we pick that up below.
- # try to fetch the auth events we missed list time.
- extra_auth_events = await self._store.get_events(
- missing_auth_event_ids, allow_rejected=True
- )
- missing_auth_event_ids.difference_update(extra_auth_events.keys())
- event_auth_events.update(extra_auth_events)
- if not missing_auth_event_ids:
- return event_auth_events.values()
- # we still don't have all the auth events.
- logger.warning(
- "Missing auth events for %s: %s",
- event,
- shortstr(missing_auth_event_ids),
- )
- # the fact we can't find the auth event doesn't mean it doesn't
- # exist, which means it is premature to store `event` as rejected.
- # instead we raise an AuthError, which will make the caller ignore it.
- raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
- @trace
- @tag_args
- async def _get_remote_auth_chain_for_event(
- self, destination: str, room_id: str, event_id: str
- ) -> None:
- """If we are missing some of an event's auth events, attempt to request them
- Args:
- destination: where to fetch the auth tree from
- room_id: the room in which we are lacking auth events
- event_id: the event for which we are lacking auth events
- """
- try:
- remote_events = await self._federation_client.get_event_auth(
- destination, room_id, event_id
- )
- except RequestSendFailed as e1:
- # The other side isn't around or doesn't implement the
- # endpoint, so lets just bail out.
- logger.info("Failed to get event auth from remote: %s", e1)
- return
- logger.info("/event_auth returned %i events", len(remote_events))
- # `event` may be returned, but we should not yet process it.
- remote_auth_events = (e for e in remote_events if e.event_id != event_id)
- await self._auth_and_persist_outliers(room_id, remote_auth_events)
- @trace
- async def _run_push_actions_and_persist_event(
- self, event: EventBase, context: EventContext, backfilled: bool = False
- ) -> None:
- """Run the push actions for a received event, and persist it.
- Args:
- event: The event itself.
- context: The event context.
- backfilled: True if the event was backfilled.
- PartialStateConflictError: if attempting to persist a partial state event in
- a room that has been un-partial stated.
- """
- # this method should not be called on outliers (those code paths call
- # persist_events_and_notify directly.)
- assert not event.internal_metadata.outlier
- if not backfilled and not context.rejected:
- min_depth = await self._store.get_min_depth(event.room_id)
- if min_depth is None or min_depth > event.depth:
- # XXX richvdh 2021/10/07: I don't really understand what this
- # condition is doing. I think it's trying not to send pushes
- # for events that predate our join - but that's not really what
- # min_depth means, and anyway ancient events are a more general
- # problem.
- #
- # for now I'm just going to log about it.
- logger.info(
- "Skipping push actions for old event with depth %s < %s",
- event.depth,
- min_depth,
- )
- else:
- await self._bulk_push_rule_evaluator.action_for_events_by_user(
- [(event, context)]
- )
- try:
- await self.persist_events_and_notify(
- event.room_id, [(event, context)], backfilled=backfilled
- )
- except Exception:
- await self._store.remove_push_actions_from_staging(event.event_id)
- raise
- async def persist_events_and_notify(
- self,
- room_id: str,
- event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
- backfilled: bool = False,
- ) -> int:
- """Persists events and tells the notifier/pushers about them, if
- necessary.
- Args:
- room_id: The room ID of events being persisted.
- event_and_contexts: Sequence of events with their associated
- context that should be persisted. All events must belong to
- the same room.
- backfilled: Whether these events are a result of
- backfilling or not
- Returns:
- The stream ID after which all events have been persisted.
- Raises:
- PartialStateConflictError: if attempting to persist a partial state event in
- a room that has been un-partial stated.
- """
- if not event_and_contexts:
- return self._store.get_room_max_stream_ordering()
- instance = self._config.worker.events_shard_config.get_instance(room_id)
- if instance != self._instance_name:
- # Limit the number of events sent over replication. We choose 200
- # here as that is what we default to in `max_request_body_size(..)`
- result = {}
- try:
- for batch in batch_iter(event_and_contexts, 200):
- result = await self._send_events(
- instance_name=instance,
- store=self._store,
- room_id=room_id,
- event_and_contexts=batch,
- backfilled=backfilled,
- )
- except SynapseError as e:
- if e.code == HTTPStatus.CONFLICT:
- raise PartialStateConflictError()
- raise
- return result["max_stream_id"]
- else:
- assert self._storage_controllers.persistence
- # Note that this returns the events that were persisted, which may not be
- # the same as were passed in if some were deduplicated due to transaction IDs.
- (
- events,
- max_stream_token,
- ) = await self._storage_controllers.persistence.persist_events(
- event_and_contexts, backfilled=backfilled
- )
- # After persistence we always need to notify replication there may
- # be new data.
- self._notifier.notify_replication()
- if self._ephemeral_messages_enabled:
- for event in events:
- # If there's an expiry timestamp on the event, schedule its expiry.
- self._message_handler.maybe_schedule_expiry(event)
- if not backfilled: # Never notify for backfilled events
- with start_active_span("notify_persisted_events"):
- set_tag(
- SynapseTags.RESULT_PREFIX + "event_ids",
- str([ev.event_id for ev in events]),
- )
- set_tag(
- SynapseTags.RESULT_PREFIX + "event_ids.length",
- str(len(events)),
- )
- for event in events:
- await self._notify_persisted_event(event, max_stream_token)
- return max_stream_token.stream
- async def _notify_persisted_event(
- self, event: EventBase, max_stream_token: RoomStreamToken
- ) -> None:
- """Checks to see if notifier/pushers should be notified about the
- event or not.
- Args:
- event:
- max_stream_token: The max_stream_id returned by persist_events
- """
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- # We notify for memberships if its an invite for one of our
- # users
- if event.internal_metadata.is_outlier():
- if event.membership != Membership.INVITE:
- if not self._is_mine_id(target_user_id):
- return
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
- elif event.internal_metadata.is_outlier():
- return
- # the event has been persisted so it should have a stream ordering.
- assert event.internal_metadata.stream_ordering
- event_pos = PersistedEventPosition(
- self._instance_name, event.internal_metadata.stream_ordering
- )
- await self._notifier.on_new_room_events(
- [(event, event_pos)], max_stream_token, extra_users=extra_users
- )
- if event.type == EventTypes.Member and event.membership == Membership.JOIN:
- # TODO retrieve the previous state, and exclude join -> join transitions
- self._notifier.notify_user_joined_room(event.event_id, event.room_id)
- def _sanity_check_event(self, ev: EventBase) -> None:
- """
- Do some early sanity checks of a received event
- In particular, checks it doesn't have an excessive number of
- prev_events or auth_events, which could cause a huge state resolution
- or cascade of event fetches.
- Args:
- ev: event to be checked
- Raises:
- SynapseError if the event does not pass muster
- """
- if len(ev.prev_event_ids()) > 20:
- logger.warning(
- "Rejecting event %s which has %i prev_events",
- ev.event_id,
- len(ev.prev_event_ids()),
- )
- raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
- if len(ev.auth_event_ids()) > 10:
- logger.warning(
- "Rejecting event %s which has %i auth_events",
- ev.event_id,
- len(ev.auth_event_ids()),
- )
- raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
|