federation_event.py

# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import logging
from http import HTTPStatus
from typing import (
    TYPE_CHECKING,
    Collection,
    Container,
    Dict,
    Iterable,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
)

from prometheus_client import Counter

from synapse.api.constants import (
    EventContentFields,
    EventTypes,
    GuestAccess,
    Membership,
    RejectedReason,
    RoomEncryptionAlgorithms,
)
from synapse.api.errors import (
    AuthError,
    Codes,
    FederationError,
    HttpResponseException,
    RequestSendFailed,
    SynapseError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions
from synapse.event_auth import (
    auth_types_for_event,
    check_auth_rules_for_event,
    validate_event_for_room_version,
)
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.logging.context import nested_logging_context, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
    ReplicationFederationSendEventsRestServlet,
)
from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
    PersistedEventPosition,
    RoomStreamToken,
    StateMap,
    UserID,
    get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

soft_failed_event_counter = Counter(
    "synapse_federation_soft_failed_events_total",
    "Events received over federation that we marked as soft_failed",
)
class FederationEventHandler:
    """Handles events that originated from federation.

    Responsible for handling incoming events and passing them on to the rest
    of the homeserver (including auth and state conflict resolutions)
    """

    def __init__(self, hs: "HomeServer"):
        self._store = hs.get_datastores().main
        self._storage = hs.get_storage()
        self._state_store = self._storage.state

        self._state_handler = hs.get_state_handler()
        self._event_creation_handler = hs.get_event_creation_handler()
        self._event_auth_handler = hs.get_event_auth_handler()
        self._message_handler = hs.get_message_handler()
        self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
        self._state_resolution_handler = hs.get_state_resolution_handler()
        # avoid a circular dependency by deferring execution here
        self._get_room_member_handler = hs.get_room_member_handler

        self._federation_client = hs.get_federation_client()
        self._third_party_event_rules = hs.get_third_party_event_rules()
        self._notifier = hs.get_notifier()

        self._is_mine_id = hs.is_mine_id
        self._server_name = hs.hostname
        self._instance_name = hs.get_instance_name()

        self._config = hs.config
        self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages

        self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)

        if hs.config.worker.worker_app:
            self._user_device_resync = (
                ReplicationUserDevicesResyncRestServlet.make_client(hs)
            )
        else:
            self._device_list_updater = hs.get_device_handler().device_list_updater

        # When joining a room we need to queue any events for that room up.
        # For each room, a list of (pdu, origin) tuples.
        # TODO: replace this with something more elegant, probably based around the
        #   federation event staging area.
        self.room_queues: Dict[str, List[Tuple[EventBase, str]]] = {}
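        # Used to ensure we only fetch a room's missing prev_events for one inbound
        # PDU at a time (see on_receive_pdu / _get_missing_events_for_pdu).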
        self._room_pdu_linearizer = Linearizer("fed_room_pdu")
    async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
        """Process a PDU received via a federation /send/ transaction

        Args:
            origin: server which initiated the /send/ transaction. Will
                be used to fetch missing events or state.
            pdu: received PDU
        """

        # We should never see any outliers here.
        assert not pdu.internal_metadata.outlier

        room_id = pdu.room_id
        event_id = pdu.event_id

        # We reprocess pdus when we have seen them only as outliers
        existing = await self._store.get_event(
            event_id, allow_none=True, allow_rejected=True
        )

        # FIXME: Currently we fetch an event again when we already have it
        # if it has been marked as an outlier.
        if existing:
            if not existing.internal_metadata.is_outlier():
                logger.info(
                    "Ignoring received event %s which we have already seen", event_id
                )
                return
            if pdu.internal_metadata.is_outlier():
                logger.info(
                    "Ignoring received outlier %s which we already have as an outlier",
                    event_id,
                )
                return
            logger.info("De-outliering event %s", event_id)

        # do some initial sanity-checking of the event. In particular, make
        # sure it doesn't have hundreds of prev_events or auth_events, which
        # could cause a huge state resolution or cascade of event fetches.
        try:
            self._sanity_check_event(pdu)
        except SynapseError as err:
            logger.warning("Received event failed sanity checks")
            raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)

        # If we are currently in the process of joining this room, then we
        # queue up events for later processing.
        if room_id in self.room_queues:
            logger.info(
                "Queuing PDU from %s for now: join in progress",
                origin,
            )
            self.room_queues[room_id].append((pdu, origin))
            return

        # If we're not in the room just ditch the event entirely. This is
        # probably an old server that has come back and thinks we're still in
        # the room (or we've been rejoined to the room by a state reset).
        #
        # Note that if we were never in the room then we would have already
        # dropped the event, since we wouldn't know the room version.
        is_in_room = await self._event_auth_handler.check_host_in_room(
            room_id, self._server_name
        )
        if not is_in_room:
            logger.info(
                "Ignoring PDU from %s as we're not in the room",
                origin,
            )
            return None

        # Try to fetch any missing prev events to fill in gaps in the graph
        prevs = set(pdu.prev_event_ids())
        seen = await self._store.have_events_in_timeline(prevs)
        missing_prevs = prevs - seen

        if missing_prevs:
            # We only backfill backwards to the min depth.
            min_depth = await self._store.get_min_depth(pdu.room_id)
            logger.debug("min_depth: %d", min_depth)

            if min_depth is not None and pdu.depth > min_depth:
                # If we're missing stuff, ensure we only fetch stuff one
                # at a time.
                logger.info(
                    "Acquiring room lock to fetch %d missing prev_events: %s",
                    len(missing_prevs),
                    shortstr(missing_prevs),
                )
                async with self._room_pdu_linearizer.queue(pdu.room_id):
                    logger.info(
                        "Acquired room lock to fetch %d missing prev_events",
                        len(missing_prevs),
                    )

                    try:
                        await self._get_missing_events_for_pdu(
                            origin, pdu, prevs, min_depth
                        )
                    except Exception as e:
                        raise Exception(
                            "Error fetching missing prev_events for %s: %s"
                            % (event_id, e)
                        ) from e

                # Update the set of things we've seen after trying to
                # fetch the missing stuff
                seen = await self._store.have_events_in_timeline(prevs)
                missing_prevs = prevs - seen

                if not missing_prevs:
                    logger.info("Found all missing prev_events")

            if missing_prevs:
                # since this event was pushed to us, it is possible for it to
                # become the only forward-extremity in the room, and we would then
                # trust its state to be the state for the whole room. This is very
                # bad. Further, if the event was pushed to us, there is no excuse
                # for us not to have all the prev_events. (XXX: apart from
                # min_depth?)
                #
                # We therefore reject any such events.
                logger.warning(
                    "Rejecting: failed to fetch %d prev events: %s",
                    len(missing_prevs),
                    shortstr(missing_prevs),
                )
                raise FederationError(
                    "ERROR",
                    403,
                    (
                        "Your server isn't divulging details about prev_events "
                        "referenced in this event."
                    ),
                    affected=pdu.event_id,
                )

        await self._process_received_pdu(origin, pdu, state=None)
    async def on_send_membership_event(
        self, origin: str, event: EventBase
    ) -> Tuple[EventBase, EventContext]:
        """
        We have received a join/leave/knock event for a room via send_join/leave/knock.

        Verify that event and send it into the room on the remote homeserver's behalf.

        This is quite similar to on_receive_pdu, with the following principal
        differences:
          * only membership events are permitted (and only events with
            sender==state_key -- ie, no kicks or bans)
          * *We* send out the event on behalf of the remote server.
          * We enforce the membership restrictions of restricted rooms.
          * Rejected events result in an exception rather than being stored.

        There are also other differences, however it is not clear if these are by
        design or omission. In particular, we do not attempt to backfill any missing
        prev_events.

        Args:
            origin: The homeserver of the remote (joining/invited/knocking) user.
            event: The member event that has been signed by the remote homeserver.

        Returns:
            The event and context of the event after inserting it into the room graph.

        Raises:
            SynapseError if the event is not accepted into the room
        """
        logger.debug(
            "on_send_membership_event: Got event: %s, signatures: %s",
            event.event_id,
            event.signatures,
        )

        if get_domain_from_id(event.sender) != origin:
            logger.info(
                "Got send_membership request for user %r from different origin %s",
                event.sender,
                origin,
            )
            raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)

        if event.sender != event.state_key:
            raise SynapseError(400, "state_key and sender must match", Codes.BAD_JSON)

        assert not event.internal_metadata.outlier

        # Send this event on behalf of the other server.
        #
        # The remote server isn't a full participant in the room at this point, so
        # may not have an up-to-date list of the other homeservers participating in
        # the room, so we send it on their behalf.
        event.internal_metadata.send_on_behalf_of = origin

        context = await self._state_handler.compute_event_context(event)
        context = await self._check_event_auth(origin, event, context)
        if context.rejected:
            raise SynapseError(
                403, f"{event.membership} event was rejected", Codes.FORBIDDEN
            )

        # for joins, we need to check the restrictions of restricted rooms
        if event.membership == Membership.JOIN:
            await self.check_join_restrictions(context, event)

        # for knock events, we run the third-party event rules. It's not entirely clear
        # why we don't do this for other sorts of membership events.
        if event.membership == Membership.KNOCK:
            event_allowed, _ = await self._third_party_event_rules.check_event_allowed(
                event, context
            )
            if not event_allowed:
                logger.info("Sending of knock %s forbidden by third-party rules", event)
                raise SynapseError(
                    403, "This event is not allowed in this context", Codes.FORBIDDEN
                )

        # all looks good, we can persist the event.

        # First, precalculate the joined hosts so that the federation sender doesn't
        # need to.
        await self._event_creation_handler.cache_joined_hosts_for_event(event, context)

        await self._check_for_soft_fail(event, None, origin=origin)
        await self._run_push_actions_and_persist_event(event, context)
        return event, context
    async def check_join_restrictions(
        self, context: EventContext, event: EventBase
    ) -> None:
        """Check that restrictions in restricted join rules are matched

        Called when we receive a join event via send_join.

        Raises an auth error if the restrictions are not matched.
        """
        prev_state_ids = await context.get_prev_state_ids()

        # Check if the user is already in the room or invited to the room.
        user_id = event.state_key
        prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
        prev_member_event = None
        if prev_member_event_id:
            prev_member_event = await self._store.get_event(prev_member_event_id)

        # Check if the member should be allowed access via membership in a space.
        await self._event_auth_handler.check_restricted_join_rules(
            prev_state_ids,
            event.room_version,
            user_id,
            prev_member_event,
        )
    async def process_remote_join(
        self,
        origin: str,
        room_id: str,
        auth_events: List[EventBase],
        state: List[EventBase],
        event: EventBase,
        room_version: RoomVersion,
        partial_state: bool,
    ) -> int:
        """Persists the events returned by a send_join

        Checks the auth chain is valid (and passes auth checks) for the
        state and event. Then persists all of the events.
        Notifies about the persisted events where appropriate.

        Args:
            origin: Where the events came from
            room_id:
            auth_events
            state
            event
            room_version: The room version we expect this room to have, and
                will raise if it doesn't match the version in the create event.
            partial_state: True if the state omits non-critical membership events

        Returns:
            The stream ID after which all events have been persisted.

        Raises:
            SynapseError if the response is in some way invalid.
        """
        create_event = None
        for e in state:
            if (e.type, e.state_key) == (EventTypes.Create, ""):
                create_event = e
                break

        if create_event is None:
            # If the state doesn't have a create event then the room is
            # invalid, and it would fail auth checks anyway.
            raise SynapseError(400, "No create event in state")

        room_version_id = create_event.content.get(
            "room_version", RoomVersions.V1.identifier
        )

        if room_version.identifier != room_version_id:
            raise SynapseError(400, "Room version mismatch")

        # persist the auth chain and state events.
        #
        # any invalid events here will be marked as rejected, and we'll carry on.
        #
        # any events whose auth events are missing (ie, not in the send_join response,
        # and not already in our db) will just be ignored. This is correct behaviour,
        # because the reason that auth_events are missing might be due to us being
        # unable to validate their signatures. The fact that we can't validate their
        # signatures right now doesn't mean that we will *never* be able to, so it
        # is premature to reject them.
        #
        await self._auth_and_persist_outliers(
            room_id, itertools.chain(auth_events, state)
        )

        # and now persist the join event itself.
        logger.info(
            "Persisting join-via-remote %s (partial_state: %s)", event, partial_state
        )
        with nested_logging_context(suffix=event.event_id):
            context = await self._state_handler.compute_event_context(
                event,
                old_state=state,
                partial_state=partial_state,
            )

            context = await self._check_event_auth(origin, event, context)
            if context.rejected:
                raise SynapseError(400, "Join event was rejected")

            # the remote server is responsible for sending our join event to the rest
            # of the federation. Indeed, attempting to do so will result in problems
            # when we try to look up the state before the join (to get the server list)
            # and discover that we do not have it.
            event.internal_metadata.proactively_send = False

            return await self.persist_events_and_notify(room_id, [(event, context)])
    async def update_state_for_partial_state_event(
        self, destination: str, event: EventBase
    ) -> None:
        """Recalculate the state at an event as part of a de-partial-stating process

        Args:
            destination: server to request full state from
            event: partial-state event to be de-partial-stated
        """
        logger.info("Updating state for %s", event.event_id)
        with nested_logging_context(suffix=event.event_id):
            # if we have all the event's prev_events, then we can work out the
            # state based on their states. Otherwise, we request it from the destination
            # server.
            #
            # This is the same operation as we do when we receive a regular event
            # over federation.
            state = await self._resolve_state_at_missing_prevs(destination, event)

            # build a new state group for it if need be
            context = await self._state_handler.compute_event_context(
                event,
                old_state=state,
            )
            if context.partial_state:
                # this can happen if some or all of the event's prev_events still have
                # partial state - ie, an event has an earlier stream_ordering than one
                # or more of its prev_events, so we de-partial-state it before its
                # prev_events.
                #
                # TODO(faster_joins): we probably need to be more intelligent, and
                #    exclude partial-state prev_events from consideration
                logger.warning(
                    "%s still has partial state: can't de-partial-state it yet",
                    event.event_id,
                )
                return
            await self._store.update_state_for_partial_state_event(event, context)
            self._state_store.notify_event_un_partial_stated(event.event_id)
    async def backfill(
        self, dest: str, room_id: str, limit: int, extremities: Collection[str]
    ) -> None:
        """Trigger a backfill request to `dest` for the given `room_id`

        This will attempt to get more events from the remote. If the other side
        has no new events to offer, this will do nothing.

        As the events are received, we check their signatures, and also do some
        sanity-checking on them. If any of the backfilled events are invalid,
        this method throws a SynapseError.

        We might also raise an InvalidResponseError if the response from the remote
        server is just bogus.

        TODO: make this more useful to distinguish failures of the remote
        server from invalid events (there is probably no point in trying to
        re-fetch invalid events from every other HS in the room.)
        """
        if dest == self._server_name:
            raise SynapseError(400, "Can't backfill from self.")

        events = await self._federation_client.backfill(
            dest, room_id, limit=limit, extremities=extremities
        )

        if not events:
            return

        # if there are any events in the wrong room, the remote server is buggy and
        # should not be trusted.
        for ev in events:
            if ev.room_id != room_id:
                raise InvalidResponseError(
                    f"Remote server {dest} returned event {ev.event_id} which is in "
                    f"room {ev.room_id}, when we were backfilling in {room_id}"
                )

        await self._process_pulled_events(
            dest,
            events,
            backfilled=True,
        )
    async def _get_missing_events_for_pdu(
        self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
    ) -> None:
        """
        Args:
            origin: Origin of the pdu. Will be called to get the missing events
            pdu: received pdu
            prevs: List of event ids which we are missing
            min_depth: Minimum depth of events to return.
        """
        room_id = pdu.room_id
        event_id = pdu.event_id

        seen = await self._store.have_events_in_timeline(prevs)

        if not prevs - seen:
            return

        latest_list = await self._store.get_latest_event_ids_in_room(room_id)

        # We add the prev events that we have seen to the latest
        # list to ensure the remote server doesn't give them to us
        latest = set(latest_list)
        latest |= seen

        logger.info(
            "Requesting missing events between %s and %s",
            shortstr(latest),
            event_id,
        )

        # XXX: we set timeout to 10s to help workaround
        # https://github.com/matrix-org/synapse/issues/1733.
        # The reason is to avoid holding the linearizer lock
        # whilst processing inbound /send transactions, causing
        # FDs to stack up and block other inbound transactions
        # which empirically can currently take up to 30 minutes.
        #
        # N.B. this explicitly disables retry attempts.
        #
        # N.B. this also increases our chances of falling back to
        # fetching fresh state for the room if the missing event
        # can't be found, which slightly reduces our security.
        # it may also increase our DAG extremity count for the room,
        # causing additional state resolution? See #1760.
        # However, fetching state doesn't hold the linearizer lock
        # apparently.
        #
        # see https://github.com/matrix-org/synapse/pull/1744
        #
        # ----
        #
        # Update richvdh 2018/09/18: There are a number of problems with timing this
        # request out aggressively on the client side:
        #
        # - it plays badly with the server-side rate-limiter, which starts tarpitting you
        #   if you send too many requests at once, so you end up with the server carefully
        #   working through the backlog of your requests, which you have already timed
        #   out.
        #
        # - for this request in particular, we now (as of
        #   https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
        #   server can't produce a plausible-looking set of prev_events - so we become
        #   much more likely to reject the event.
        #
        # - contrary to what it says above, we do *not* fall back to fetching fresh state
        #   for the room if get_missing_events times out. Rather, we give up processing
        #   the PDU whose prevs we are missing, which then makes it much more likely that
        #   we'll end up back here for the *next* PDU in the list, which exacerbates the
        #   problem.
        #
        # - the aggressive 10s timeout was introduced to deal with incoming federation
        #   requests taking 8 hours to process. It's not entirely clear why that was going
        #   on; certainly there were other issues causing traffic storms which are now
        #   resolved, and I think in any case we may be more sensible about our locking
        #   now. We're *certainly* more sensible about our logging.
        #
        # All that said: Let's try increasing the timeout to 60s and see what happens.

        try:
            missing_events = await self._federation_client.get_missing_events(
                origin,
                room_id,
                earliest_events_ids=list(latest),
                latest_events=[pdu],
                limit=10,
                min_depth=min_depth,
                timeout=60000,
            )
        except (RequestSendFailed, HttpResponseException, NotRetryingDestination) as e:
            # We failed to get the missing events, but since we need to handle
            # the case of `get_missing_events` not returning the necessary
            # events anyway, it is safe to simply log the error and continue.
            logger.warning("Failed to get prev_events: %s", e)
            return

        logger.info("Got %d prev_events", len(missing_events))
        await self._process_pulled_events(origin, missing_events, backfilled=False)
    async def _process_pulled_events(
        self, origin: str, events: Iterable[EventBase], backfilled: bool
    ) -> None:
        """Process a batch of events we have pulled from a remote server

        Pulls in any events required to auth the events, persists the received events,
        and notifies clients, if appropriate.

        Assumes the events have already had their signatures and hashes checked.

        Params:
            origin: The server we received these events from
            events: The received events.
            backfilled: True if this is part of a historical batch of events (inhibits
                notification to clients, and validation of device keys.)
        """
        logger.debug(
            "processing pulled backfilled=%s events=%s",
            backfilled,
            [
                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
                % (
                    event.event_id,
                    event.depth,
                    event.content.get("body", event.type),
                    event.prev_event_ids(),
                )
                for event in events
            ],
        )

        # We want to sort these by depth so we process them and
        # tell clients about them in order.
        sorted_events = sorted(events, key=lambda x: x.depth)
        for ev in sorted_events:
            with nested_logging_context(ev.event_id):
                await self._process_pulled_event(origin, ev, backfilled=backfilled)
    async def _process_pulled_event(
        self, origin: str, event: EventBase, backfilled: bool
    ) -> None:
        """Process a single event that we have pulled from a remote server

        Pulls in any events required to auth the event, persists the received event,
        and notifies clients, if appropriate.

        Assumes the event has already had its signatures and hashes checked.

        This is somewhat equivalent to on_receive_pdu, but applies somewhat different
        logic in the case that we are missing prev_events (in particular, it just
        requests the state at that point, rather than triggering a get_missing_events) -
        so is appropriate when we have pulled the event from a remote server, rather
        than having it pushed to us.

        Params:
            origin: The server we received this event from
            event: The received event
            backfilled: True if this is part of a historical batch of events (inhibits
                notification to clients, and validation of device keys.)
        """
        logger.info("Processing pulled event %s", event)

        # these should not be outliers.
        assert (
            not event.internal_metadata.is_outlier()
        ), "pulled event unexpectedly flagged as outlier"

        event_id = event.event_id

        existing = await self._store.get_event(
            event_id, allow_none=True, allow_rejected=True
        )
        if existing:
            if not existing.internal_metadata.is_outlier():
                logger.info(
                    "Ignoring received event %s which we have already seen",
                    event_id,
                )
                return
            logger.info("De-outliering event %s", event_id)

        try:
            self._sanity_check_event(event)
        except SynapseError as err:
            logger.warning("Event %s failed sanity check: %s", event_id, err)
            return

        try:
            state = await self._resolve_state_at_missing_prevs(origin, event)
            # TODO(faster_joins): make sure that _resolve_state_at_missing_prevs does
            #   not return partial state
            await self._process_received_pdu(
                origin, event, state=state, backfilled=backfilled
            )
        except FederationError as e:
            if e.code == 403:
                logger.warning("Pulled event %s failed history check.", event_id)
            else:
                raise
    async def _resolve_state_at_missing_prevs(
        self, dest: str, event: EventBase
    ) -> Optional[Iterable[EventBase]]:
        """Calculate the state at an event with missing prev_events.

        This is used when we have pulled a batch of events from a remote server, and
        still don't have all the prev_events.

        If we already have all the prev_events for `event`, this method does nothing.

        Otherwise, the missing prevs become new backwards extremities, and we fall back
        to asking the remote server for the state after each missing `prev_event`,
        and resolving across them.

        That's ok provided we then resolve the state against other bits of the DAG
        before using it - in other words, that the received event `event` is not going
        to become the only forwards_extremity in the room (which will ensure that you
        can't just take over a room by sending an event, withholding its prev_events,
        and declaring yourself to be an admin in the subsequent state request).

        In other words: we should only call this method if `event` has been *pulled*
        as part of a batch of missing prev events, or similar.

        Params:
            dest: the remote server to ask for state at the missing prevs. Typically,
                this will be the server we got `event` from.
            event: an event to check for missing prevs.

        Returns:
            if we already had all the prev events, `None`. Otherwise, returns a list of
            the events in the state at `event`.
        """
        room_id = event.room_id
        event_id = event.event_id

        prevs = set(event.prev_event_ids())
        seen = await self._store.have_events_in_timeline(prevs)
        missing_prevs = prevs - seen

        if not missing_prevs:
            return None

        logger.info(
            "Event %s is missing prev_events %s: calculating state for a "
            "backwards extremity",
            event_id,
            shortstr(missing_prevs),
        )
        # Calculate the state after each of the previous events, and
        # resolve them to find the correct state at the current event.
        event_map = {event_id: event}
        try:
            # Get the state of the events we know about
            ours = await self._state_store.get_state_groups_ids(room_id, seen)

            # state_maps is a list of mappings from (type, state_key) to event_id
            state_maps: List[StateMap[str]] = list(ours.values())

            # we don't need this any more, let's delete it.
            del ours

            # Ask the remote server for the states we don't
            # know about
            for p in missing_prevs:
                logger.info("Requesting state after missing prev_event %s", p)

                with nested_logging_context(p):
                    # note that if any of the missing prevs share missing state or
                    # auth events, the requests to fetch those events are deduped
                    # by the get_pdu_cache in federation_client.
                    remote_state = await self._get_state_after_missing_prev_event(
                        dest, room_id, p
                    )

                    remote_state_map = {
                        (x.type, x.state_key): x.event_id for x in remote_state
                    }
                    state_maps.append(remote_state_map)

                    for x in remote_state:
                        event_map[x.event_id] = x

            room_version = await self._store.get_room_version_id(room_id)
            state_map = await self._state_resolution_handler.resolve_events_with_store(
                room_id,
                room_version,
                state_maps,
                event_map,
                state_res_store=StateResolutionStore(self._store),
            )

            # We need to give _process_received_pdu the actual state events
            # rather than event ids, so generate that now.

            # First though we need to fetch all the events that are in
            # state_map, so we can build up the state below.
            evs = await self._store.get_events(
                list(state_map.values()),
                get_prev_content=False,
                redact_behaviour=EventRedactBehaviour.as_is,
            )
            event_map.update(evs)

            state = [event_map[e] for e in state_map.values()]
        except Exception:
            logger.warning(
                "Error attempting to resolve state at missing prev_events",
                exc_info=True,
            )
            raise FederationError(
                "ERROR",
                403,
                "We can't get valid state history.",
                affected=event_id,
            )
        return state
    async def _get_state_after_missing_prev_event(
        self,
        destination: str,
        room_id: str,
        event_id: str,
    ) -> List[EventBase]:
        """Requests all of the room state at a given event from a remote homeserver.

        Args:
            destination: The remote homeserver to query for the state.
            room_id: The id of the room we're interested in.
            event_id: The id of the event we want the state at.

        Returns:
            A list of events in the state, including the event itself
        """
        (
            state_event_ids,
            auth_event_ids,
        ) = await self._federation_client.get_room_state_ids(
            destination, room_id, event_id=event_id
        )

        logger.debug(
            "state_ids returned %i state events, %i auth events",
            len(state_event_ids),
            len(auth_event_ids),
        )

        # start by just trying to fetch the events from the store
        desired_events = set(state_event_ids)
        desired_events.add(event_id)
        logger.debug("Fetching %i events from cache/store", len(desired_events))
        fetched_events = await self._store.get_events(
            desired_events, allow_rejected=True
        )

        missing_desired_events = desired_events - fetched_events.keys()
        logger.debug(
            "We are missing %i events (got %i)",
            len(missing_desired_events),
            len(fetched_events),
        )

        # We probably won't need most of the auth events, so let's just check which
        # we have for now, rather than thrashing the event cache with them all
        # unnecessarily.
        # TODO: we probably won't actually need all of the auth events, since we
        #   already have a bunch of the state events. It would be nice if the
        #   federation api gave us a way of finding out which we actually need.
        missing_auth_events = set(auth_event_ids) - fetched_events.keys()
        missing_auth_events.difference_update(
            await self._store.have_seen_events(room_id, missing_auth_events)
        )
        logger.debug("We are also missing %i auth events", len(missing_auth_events))

        missing_events = missing_desired_events | missing_auth_events

        # Making an individual request for each of 1000s of events has a lot of
        # overhead. On the other hand, we don't really want to fetch all of the events
        # if we already have most of them.
        #
        # As an arbitrary heuristic, if we are missing more than 10% of the events, then
        # we fetch the whole state.
        #
        # TODO: might it be better to have an API which lets us do an aggregate event
        #   request
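        # (the check below is written as a multiplication to stay in integer
        # arithmetic: `missing * 10 >= total` is the same as `missing / total >= 10%`)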
        if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
            logger.debug("Requesting complete state from remote")
            await self._get_state_and_persist(destination, room_id, event_id)
        else:
            logger.debug("Fetching %i events from remote", len(missing_events))
            await self._get_events_and_persist(
                destination=destination, room_id=room_id, event_ids=missing_events
            )

        # we need to make sure we re-load from the database to get the rejected
        # state correct.
        fetched_events.update(
            await self._store.get_events(missing_desired_events, allow_rejected=True)
        )

        # check for events which were in the wrong room.
        #
        # this can happen if a remote server claims that the state or
        # auth_events at an event in room A are actually events in room B
        bad_events = [
            (event_id, event.room_id)
            for event_id, event in fetched_events.items()
            if event.room_id != room_id
        ]

        for bad_event_id, bad_room_id in bad_events:
            # This is a bogus situation, but since we may only discover it a long time
            # after it happened, we try our best to carry on, by just omitting the
            # bad events from the returned state set.
            logger.warning(
                "Remote server %s claims event %s in room %s is an auth/state "
                "event in room %s",
                destination,
                bad_event_id,
                bad_room_id,
                room_id,
            )

            del fetched_events[bad_event_id]

        # if we couldn't get the prev event in question, that's a problem.
        remote_event = fetched_events.get(event_id)
        if not remote_event:
            raise Exception("Unable to get missing prev_event %s" % (event_id,))

        # missing state at that event is a warning, not a blocker
        # XXX: this doesn't sound right? it means that we'll end up with incomplete
        #   state.
        failed_to_fetch = desired_events - fetched_events.keys()
        if failed_to_fetch:
            logger.warning(
                "Failed to fetch missing state events for %s %s",
                event_id,
                failed_to_fetch,
            )

        remote_state = [
            fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events
        ]

        if remote_event.is_state() and remote_event.rejected_reason is None:
            remote_state.append(remote_event)

        return remote_state
    async def _get_state_and_persist(
        self, destination: str, room_id: str, event_id: str
    ) -> None:
        """Get the complete room state at a given event, and persist any new events
        as outliers"""
        room_version = await self._store.get_room_version(room_id)
        auth_events, state_events = await self._federation_client.get_room_state(
            destination, room_id, event_id=event_id, room_version=room_version
        )
        logger.info("/state returned %i events", len(auth_events) + len(state_events))

        await self._auth_and_persist_outliers(
            room_id, itertools.chain(auth_events, state_events)
        )

        # we also need the event itself.
        if not await self._store.have_seen_event(room_id, event_id):
            await self._get_events_and_persist(
                destination=destination, room_id=room_id, event_ids=(event_id,)
            )
    async def _process_received_pdu(
        self,
        origin: str,
        event: EventBase,
        state: Optional[Iterable[EventBase]],
        backfilled: bool = False,
    ) -> None:
        """Called when we have a new non-outlier event.

        This is called when we have a new event to add to the room DAG. This can be
        due to:
           * events received directly via a /send request
           * events retrieved via get_missing_events after a /send request
           * events backfilled after a client request.

        It's not currently used for events received from incoming send_{join,knock,leave}
        requests (which go via on_send_membership_event), nor for joins created by a
        remote join dance (which go via process_remote_join).

        We need to do auth checks and put it through the StateHandler.

        Args:
            origin: server sending the event

            event: event to be persisted

            state: Normally None, but if we are handling a gap in the graph
                (ie, we are missing one or more prev_events), the resolved state at the
                event

            backfilled: True if this is part of a historical batch of events (inhibits
                notification to clients, and validation of device keys.)
        """
        logger.debug("Processing event: %s", event)
        assert not event.internal_metadata.outlier

        try:
            context = await self._state_handler.compute_event_context(
                event, old_state=state
            )
            context = await self._check_event_auth(
                origin,
                event,
                context,
            )
        except AuthError as e:
            # FIXME richvdh 2021/10/07 I don't think this is reachable. Let's log it
            #   for now
            logger.exception("Unexpected AuthError from _check_event_auth")
            raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)

        if not backfilled and not context.rejected:
            # For new (non-backfilled and non-outlier) events we check if the event
            # passes auth based on the current state. If it doesn't then we
            # "soft-fail" the event.
            await self._check_for_soft_fail(event, state, origin=origin)

        await self._run_push_actions_and_persist_event(event, context, backfilled)
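        # If this is an MSC2716 marker event, backfill the insertion event it points
        # at (a no-op for ordinary events).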
        await self._handle_marker_event(origin, event)

        if backfilled or context.rejected:
            return

        await self._maybe_kick_guest_users(event)

        # For encrypted messages we check that we know about the sending device,
        # if we don't then we mark the device cache for that user as stale.
        if event.type == EventTypes.Encrypted:
            device_id = event.content.get("device_id")
            sender_key = event.content.get("sender_key")

            cached_devices = await self._store.get_cached_devices_for_user(
                event.sender
            )

            resync = False  # Whether we should resync device lists.

            device = None
            if device_id is not None:
                device = cached_devices.get(device_id)
                if device is None:
                    logger.info(
                        "Received event from remote device not in our cache: %s %s",
                        event.sender,
                        device_id,
                    )
                    resync = True

            # We also check if the `sender_key` matches what we expect.
            if sender_key is not None:
                # Figure out what sender key we're expecting. If we know the
                # device and recognize the algorithm then we can work out the
                # exact key to expect. Otherwise check it matches any key we
                # have for that device.
                current_keys: Container[str] = []

                if device:
                    keys = device.get("keys", {}).get("keys", {})

                    if (
                        event.content.get("algorithm")
                        == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
                    ):
                        # For this algorithm we expect a curve25519 key.
                        key_name = "curve25519:%s" % (device_id,)
                        current_keys = [keys.get(key_name)]
                    else:
                        # We don't understand the algorithm, so we just
                        # check it matches a key for the device.
                        current_keys = keys.values()
                elif device_id:
                    # We don't have any keys for the device ID.
                    pass
                else:
                    # The event didn't include a device ID, so we just look for
                    # keys across all devices.
                    current_keys = [
                        key
                        for device in cached_devices.values()
                        for key in device.get("keys", {}).get("keys", {}).values()
                    ]

                # We now check that the sender key matches (one of) the expected
                # keys.
                if sender_key not in current_keys:
                    logger.info(
                        "Received event from remote device with unexpected sender key: %s %s: %s",
                        event.sender,
                        device_id or "<no device_id>",
                        sender_key,
                    )
                    resync = True

            if resync:
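                # Mark the remote user's device cache as stale and kick off a resync
                # in the background; we don't block processing of this event on it.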
                run_as_background_process(
                    "resync_device_due_to_pdu",
                    self._resync_device,
                    event.sender,
                )
    async def _resync_device(self, sender: str) -> None:
        """We have detected that the device list for the given user may be out
        of sync, so we try and resync them.
        """

        try:
            await self._store.mark_remote_user_device_cache_as_stale(sender)

            # Immediately attempt a resync in the background
            if self._config.worker.worker_app:
                await self._user_device_resync(user_id=sender)
            else:
                await self._device_list_updater.user_device_resync(sender)
        except Exception:
            logger.exception("Failed to resync device for %s", sender)
    async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
        """Handles backfilling the insertion event when we receive a marker
        event that points to one.

        Args:
            origin: Origin of the event. Will be called to get the insertion event
            marker_event: The event to process
        """

        if marker_event.type != EventTypes.MSC2716_MARKER:
            # Not a marker event
            return

        if marker_event.rejected_reason is not None:
            # Rejected event
            return

        # Skip processing a marker event if the room version doesn't
        # support it or the event is not from the room creator.
        room_version = await self._store.get_room_version(marker_event.room_id)
        create_event = await self._store.get_create_event_for_room(
            marker_event.room_id
        )
        room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
        if not room_version.msc2716_historical and (
            not self._config.experimental.msc2716_enabled
            or marker_event.sender != room_creator
        ):
            return

        logger.debug("_handle_marker_event: received %s", marker_event)

        insertion_event_id = marker_event.content.get(
            EventContentFields.MSC2716_MARKER_INSERTION
        )

        if insertion_event_id is None:
            # Nothing to retrieve then (invalid marker)
            return

        logger.debug(
            "_handle_marker_event: backfilling insertion event %s", insertion_event_id
        )

        await self._get_events_and_persist(
            origin,
            marker_event.room_id,
            [insertion_event_id],
        )

        insertion_event = await self._store.get_event(
            insertion_event_id, allow_none=True
        )
        if insertion_event is None:
            logger.warning(
                "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
                origin,
                insertion_event_id,
                marker_event.event_id,
            )
            return

        logger.debug(
            "_handle_marker_event: successfully backfilled insertion event %s from marker event %s",
            insertion_event,
            marker_event,
        )

        await self._store.insert_insertion_extremity(
            insertion_event_id, marker_event.room_id
        )

        logger.debug(
            "_handle_marker_event: insertion extremity added for %s from marker event %s",
            insertion_event,
            marker_event,
        )
    async def _get_events_and_persist(
        self, destination: str, room_id: str, event_ids: Collection[str]
    ) -> None:
        """Fetch the given events from a server, and persist them as outliers.

        This function *does not* recursively get missing auth events of the
        newly fetched events. Callers must include in the `event_ids` argument
        any missing events from the auth chain.

        Logs a warning if we can't find the given event.
        """

        room_version = await self._store.get_room_version(room_id)

        events: List[EventBase] = []

        async def get_event(event_id: str) -> None:
            with nested_logging_context(event_id):
                try:
                    event = await self._federation_client.get_pdu(
                        [destination],
                        event_id,
                        room_version,
                    )
                    if event is None:
                        logger.warning(
                            "Server %s didn't return event %s",
                            destination,
                            event_id,
                        )
                        return
                    events.append(event)

                except Exception as e:
                    logger.warning(
                        "Error fetching missing state/auth event %s: %s %s",
                        event_id,
                        type(e),
                        e,
                    )
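        # Fetch the events from the remote, limiting ourselves to five concurrent
        # requests at a time.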
        await concurrently_execute(get_event, event_ids, 5)
        logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
        await self._auth_and_persist_outliers(room_id, events)
    async def _auth_and_persist_outliers(
        self, room_id: str, events: Iterable[EventBase]
    ) -> None:
        """Persist a batch of outlier events fetched from remote servers.

        We first sort the events to make sure that we process each event's auth_events
        before the event itself.

        We then mark the events as outliers, persist them to the database, and, where
        appropriate (eg, an invite), awake the notifier.

        Params:
            room_id: the room that the events are meant to be in (though this has
                not yet been checked)
            events: the events that have been fetched
        """
        event_map = {event.event_id: event for event in events}

        # filter out any events we have already seen. This might happen because
        # the events were eagerly pushed to us (eg, during a room join), or because
        # another thread has raced against us since we decided to request the event.
        #
        # This is just an optimisation, so it doesn't need to be watertight - the event
        # persister does another round of deduplication.
        seen_remotes = await self._store.have_seen_events(room_id, event_map.keys())
        for s in seen_remotes:
            event_map.pop(s, None)

        # XXX: it might be possible to kick this process off in parallel with fetching
        # the events.
        while event_map:
            # build a list of events whose auth events are not in the queue.
            roots = tuple(
                ev
                for ev in event_map.values()
                if not any(aid in event_map for aid in ev.auth_event_ids())
            )

            if not roots:
                # if *none* of the remaining events are ready, that means
                # we have a loop. This either means a bug in our logic, or that
                # somebody has managed to create a loop (which requires finding a
                # hash collision in room v2 and later).
                logger.warning(
                    "Loop found in auth events while fetching missing state/auth "
                    "events: %s",
                    shortstr(event_map.keys()),
                )
                return

            logger.info(
                "Persisting %i of %i remaining outliers: %s",
                len(roots),
                len(event_map),
                shortstr(e.event_id for e in roots),
            )

            await self._auth_and_persist_outliers_inner(room_id, roots)

            for ev in roots:
                del event_map[ev.event_id]
    async def _auth_and_persist_outliers_inner(
        self, room_id: str, fetched_events: Collection[EventBase]
    ) -> None:
        """Helper for _auth_and_persist_outliers

        Persists a batch of events where we have (theoretically) already persisted all
        of their auth events.

        Marks the events as outliers, auths them, persists them to the database, and,
        where appropriate (eg, an invite), wakes the notifier.

        Params:
            room_id: the room that the events are meant to be in (though this has
                not yet been checked)
            fetched_events: the events to persist
        """
        # get all the auth events for all the events in this batch. By now, they should
        # have been persisted.
        auth_events = {
            aid for event in fetched_events for aid in event.auth_event_ids()
        }
        persisted_events = await self._store.get_events(
            auth_events,
            allow_rejected=True,
        )

        room_version = await self._store.get_room_version_id(room_id)
        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]

        def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
            with nested_logging_context(suffix=event.event_id):
                auth = []
                for auth_event_id in event.auth_event_ids():
                    ae = persisted_events.get(auth_event_id)
                    if not ae:
                        # the fact we can't find the auth event doesn't mean it doesn't
                        # exist, which means it is premature to reject `event`. Instead we
                        # just ignore it for now.
                        logger.warning(
                            "Dropping event %s, which relies on auth_event %s, which could not be found",
                            event,
                            auth_event_id,
                        )
                        return None
                    auth.append(ae)

                # we're not bothering about room state, so flag the event as an outlier.
                event.internal_metadata.outlier = True

                context = EventContext.for_outlier(self._storage)
                try:
                    validate_event_for_room_version(room_version_obj, event)
                    check_auth_rules_for_event(room_version_obj, event, auth)
                except AuthError as e:
                    logger.warning("Rejecting %r because %s", event, e)
                    context.rejected = RejectedReason.AUTH_ERROR

            return event, context
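        # run `prep` over every fetched event, dropping any that it rejected
        # outright (i.e. returned None because an auth event could not be found).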
        events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
        await self.persist_events_and_notify(
            room_id,
            tuple(events_to_persist),
            # Mark these events backfilled as they're historic events that will
            # eventually be backfilled. For example, missing events we fetch
            # during backfill should be marked as backfilled as well.
            backfilled=True,
        )
    async def _check_event_auth(
        self,
        origin: str,
        event: EventBase,
        context: EventContext,
    ) -> EventContext:
        """
        Checks whether an event should be rejected (for failing auth checks).

        Args:
            origin: The host the event originates from.
            event: The event itself.
            context:
                The event context.

        Returns:
            The updated context object.

        Raises:
            AuthError if we were unable to find copies of the event's auth events.
                (Most other failures just cause us to set `context.rejected`.)
        """
        # This method should only be used for non-outliers
        assert not event.internal_metadata.outlier

        # first of all, check that the event itself is valid.
        room_version = await self._store.get_room_version_id(event.room_id)
        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]

        try:
            validate_event_for_room_version(room_version_obj, event)
        except AuthError as e:
            logger.warning("While validating received event %r: %s", event, e)
            # TODO: use a different rejected reason here?
            context.rejected = RejectedReason.AUTH_ERROR
            return context

        # next, check that we have all of the event's auth events.
        #
        # Note that this can raise AuthError, which we want to propagate to the
        # caller rather than swallow with `context.rejected` (since we cannot be
        # certain that there is a permanent problem with the event).
        claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
            origin, event
        )

        # ... and check that the event passes auth at those auth events.
        try:
            check_auth_rules_for_event(room_version_obj, event, claimed_auth_events)
        except AuthError as e:
            logger.warning(
                "While checking auth of %r against auth_events: %s", event, e
            )
            context.rejected = RejectedReason.AUTH_ERROR
            return context

        # now check auth against what we think the auth events *should* be.
        prev_state_ids = await context.get_prev_state_ids()
        auth_events_ids = self._event_auth_handler.compute_auth_events(
            event, prev_state_ids, for_verification=True
        )
        auth_events_x = await self._store.get_events(auth_events_ids)
        calculated_auth_event_map = {
            (e.type, e.state_key): e for e in auth_events_x.values()
        }

        try:
            updated_auth_events = await self._update_auth_events_for_auth(
                event,
                calculated_auth_event_map=calculated_auth_event_map,
            )
        except Exception:
            # We don't really mind if the above fails, so let's not fail
            # processing if it does. However, it really shouldn't fail so
            # let's still log as an exception since we'll still want to fix
            # any bugs.
            logger.exception(
                "Failed to double check auth events for %s with remote. "
                "Ignoring failure and continuing processing of event.",
                event.event_id,
            )
            updated_auth_events = None

        if updated_auth_events:
            context = await self._update_context_for_auth_events(
                event, context, updated_auth_events
            )
            auth_events_for_auth = updated_auth_events
        else:
            auth_events_for_auth = calculated_auth_event_map

        try:
            check_auth_rules_for_event(
                room_version_obj, event, auth_events_for_auth.values()
            )
        except AuthError as e:
            logger.warning("Failed auth resolution for %r because %s", event, e)
            context.rejected = RejectedReason.AUTH_ERROR

        return context
    async def _maybe_kick_guest_users(self, event: EventBase) -> None:
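        # If a m.room.guest_access event has just revoked guest access, kick any
        # guest users currently in the room.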
        if event.type != EventTypes.GuestAccess:
            return

        guest_access = event.content.get(EventContentFields.GUEST_ACCESS)
        if guest_access == GuestAccess.CAN_JOIN:
            return

        current_state_map = await self._state_handler.get_current_state(event.room_id)
        current_state = list(current_state_map.values())
        await self._get_room_member_handler().kick_guest_users(current_state)
    async def _check_for_soft_fail(
        self,
        event: EventBase,
        state: Optional[Iterable[EventBase]],
        origin: str,
    ) -> None:
        """Checks if we should soft fail the event; if so, marks the event as
        such.

        Args:
            event
            state: The state at the event if we don't have all the event's prev events
            origin: The host the event originates from.
        """
        extrem_ids_list = await self._store.get_latest_event_ids_in_room(event.room_id)
        extrem_ids = set(extrem_ids_list)
        prev_event_ids = set(event.prev_event_ids())

        if extrem_ids == prev_event_ids:
            # If they're the same then the current state is the same as the
            # state at the event, so no point rechecking auth for soft fail.
            return

        room_version = await self._store.get_room_version_id(event.room_id)
        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]

        # Calculate the "current state".
        if state is not None:
            # If we're explicitly given the state then we won't have all the
            # prev events, and so we have a gap in the graph. In this case
            # we want to be a little careful as we might have been down for
            # a while and have an incorrect view of the current state,
            # however we still want to do checks as gaps are easy to
            # maliciously manufacture.
            #
            # So we use a "current state" that is actually a state
            # resolution across the current forward extremities and the
            # given state at the event. This should correctly handle cases
            # like bans, especially with state res v2.
            state_sets_d = await self._state_store.get_state_groups(
                event.room_id, extrem_ids
            )
            state_sets: List[Iterable[EventBase]] = list(state_sets_d.values())
            state_sets.append(state)
            current_states = await self._state_handler.resolve_events(
                room_version, state_sets, event
            )
            current_state_ids: StateMap[str] = {
                k: e.event_id for k, e in current_states.items()
            }
        else:
            current_state_ids = await self._state_handler.get_current_state_ids(
                event.room_id, latest_event_ids=extrem_ids
            )

        logger.debug(
            "Doing soft-fail check for %s: state %s",
            event.event_id,
            current_state_ids,
        )

        # Now check whether the event passes auth against said current state
        auth_types = auth_types_for_event(room_version_obj, event)
        current_state_ids_list = [
            e for k, e in current_state_ids.items() if k in auth_types
        ]
        current_auth_events = await self._store.get_events_as_list(
            current_state_ids_list
        )
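        # If the event fails auth against the current state, mark it as soft-failed.
        # Soft-failed events are still persisted, but (roughly speaking) they are
        # not sent on to clients and are not used as forward extremities for new
        # events.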
        try:
            check_auth_rules_for_event(room_version_obj, event, current_auth_events)
        except AuthError as e:
            logger.warning(
                "Soft-failing %r (from %s) because %s",
                event,
                origin,
                e,
                extra={
                    "room_id": event.room_id,
                    "mxid": event.sender,
                    "hs": origin,
                },
            )
            soft_failed_event_counter.inc()
            event.internal_metadata.soft_failed = True
    async def _update_auth_events_for_auth(
        self,
        event: EventBase,
        calculated_auth_event_map: StateMap[EventBase],
    ) -> Optional[StateMap[EventBase]]:
        """Helper for _check_event_auth. See there for docs.

        Checks whether a given event has the expected auth events. If it
        doesn't then we talk to the remote server to compare state to see if
        we can come to a consensus (e.g. if one server missed some valid
        state).

        This attempts to resolve any potential divergence of state between
        servers, but is not essential and so failures should not block further
        processing of the event.

        Args:
            event:
            calculated_auth_event_map:
                Our calculated auth_events based on the state of the room
                at the event's position in the DAG.

        Returns:
            updated auth event map, or None if no changes are needed.
        """
        assert not event.internal_metadata.outlier

        # check for events which are in the event's claimed auth_events, but not
        # in our calculated event map.
        event_auth_events = set(event.auth_event_ids())
        different_auth = event_auth_events.difference(
            e.event_id for e in calculated_auth_event_map.values()
        )

        if not different_auth:
            return None

        logger.info(
            "auth_events refers to events which are not in our calculated auth "
            "chain: %s",
            different_auth,
        )

        # XXX: currently this checks for redactions but I'm not convinced that is
        # necessary?
        different_events = await self._store.get_events_as_list(different_auth)

        # double-check they're all in the same room - we should already have checked
        # this but it doesn't hurt to check again.
        for d in different_events:
            assert (
                d.room_id == event.room_id
            ), f"Event {event.event_id} refers to auth_event {d.event_id} which is in a different room"

        # now we state-resolve between our own idea of the auth events, and the remote's
        # idea of them.
        local_state = calculated_auth_event_map.values()
        remote_auth_events = dict(calculated_auth_event_map)
        remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
        remote_state = remote_auth_events.values()

        room_version = await self._store.get_room_version_id(event.room_id)
        new_state = await self._state_handler.resolve_events(
            room_version, (local_state, remote_state), event
        )
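        # keep only the entries where the resolved state differs from our own
        # calculated auth events; if there are none, our view already agrees.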
        different_state = {
            (d.type, d.state_key): d
            for d in new_state.values()
            if calculated_auth_event_map.get((d.type, d.state_key)) != d
        }
        if not different_state:
            logger.info("State res returned no new state")
            return None

        logger.info(
            "After state res: updating auth_events with new state %s",
            different_state.values(),
        )

        # take a copy of calculated_auth_event_map before we modify it.
        auth_events = dict(calculated_auth_event_map)
        auth_events.update(different_state)
        return auth_events
    async def _load_or_fetch_auth_events_for_event(
        self, destination: str, event: EventBase
    ) -> Collection[EventBase]:
        """Fetch this event's auth_events, from database or remote

        Loads any of the auth_events that we already have from the database/cache. If
        there are any that are missing, calls /event_auth to get the complete auth
        chain for the event (and then attempts to load the auth_events again).

        If any of the auth_events cannot be found, raises an AuthError. This can happen
        for a number of reasons; eg: the events don't exist, or we were unable to talk
        to `destination`, or we couldn't validate the signature on the event (which
        in turn has multiple potential causes).

        Args:
            destination: where to send the /event_auth request. Typically the server
                that sent us `event` in the first place.
            event: the event whose auth_events we want

        Returns:
            all of the events listed in `event.auth_events_ids`, after deduplication

        Raises:
            AuthError if we were unable to fetch the auth_events for any reason.
        """
        event_auth_event_ids = set(event.auth_event_ids())
        event_auth_events = await self._store.get_events(
            event_auth_event_ids, allow_rejected=True
        )
        missing_auth_event_ids = event_auth_event_ids.difference(
            event_auth_events.keys()
        )
        if not missing_auth_event_ids:
            return event_auth_events.values()

        logger.info(
            "Event %s refers to unknown auth events %s: fetching auth chain",
            event,
            missing_auth_event_ids,
        )
        try:
            await self._get_remote_auth_chain_for_event(
                destination, event.room_id, event.event_id
            )
        except Exception as e:
            logger.warning("Failed to get auth chain for %s: %s", event, e)
            # in this case, it's very likely we still won't have all the auth
            # events - but we pick that up below.

        # try to fetch the auth events we missed last time.
        extra_auth_events = await self._store.get_events(
            missing_auth_event_ids, allow_rejected=True
        )
        missing_auth_event_ids.difference_update(extra_auth_events.keys())
        event_auth_events.update(extra_auth_events)
        if not missing_auth_event_ids:
            return event_auth_events.values()

        # we still don't have all the auth events.
        logger.warning(
            "Missing auth events for %s: %s",
            event,
            shortstr(missing_auth_event_ids),
        )
        # the fact we can't find the auth event doesn't mean it doesn't
        # exist, which means it is premature to store `event` as rejected.
        # instead we raise an AuthError, which will make the caller ignore it.
        raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
    async def _get_remote_auth_chain_for_event(
        self, destination: str, room_id: str, event_id: str
    ) -> None:
        """If we are missing some of an event's auth events, attempt to request them

        Args:
            destination: where to fetch the auth tree from
            room_id: the room in which we are lacking auth events
            event_id: the event for which we are lacking auth events
        """
        try:
            remote_events = await self._federation_client.get_event_auth(
                destination, room_id, event_id
            )
        except RequestSendFailed as e1:
            # The other side isn't around or doesn't implement the
            # endpoint, so let's just bail out.
            logger.info("Failed to get event auth from remote: %s", e1)
            return

        logger.info("/event_auth returned %i events", len(remote_events))

        # `event` may be returned, but we should not yet process it.
        remote_auth_events = (e for e in remote_events if e.event_id != event_id)

        await self._auth_and_persist_outliers(room_id, remote_auth_events)
    async def _update_context_for_auth_events(
        self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
    ) -> EventContext:
        """Update the state_ids in an event context after auth event resolution,
        storing the changes as a new state group.

        Args:
            event: The event we're handling the context for
            context: initial event context
            auth_events: Events to update in the event context.

        Returns:
            new event context
        """
        # exclude the state key of the new event from the current_state in the context.
        if event.is_state():
            event_key: Optional[Tuple[str, str]] = (event.type, event.state_key)
        else:
            event_key = None
        state_updates = {
            k: a.event_id for k, a in auth_events.items() if k != event_key
        }

        current_state_ids = await context.get_current_state_ids()
        current_state_ids = dict(current_state_ids)  # type: ignore
        current_state_ids.update(state_updates)

        prev_state_ids = await context.get_prev_state_ids()
        prev_state_ids = dict(prev_state_ids)
        prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})

        # create a new state group as a delta from the existing one.
        prev_group = context.state_group
        state_group = await self._state_store.store_state_group(
            event.event_id,
            event.room_id,
            prev_group=prev_group,
            delta_ids=state_updates,
            current_state_ids=current_state_ids,
        )

        return EventContext.with_state(
            storage=self._storage,
            state_group=state_group,
            state_group_before_event=context.state_group_before_event,
            state_delta_due_to_event=state_updates,
            prev_group=prev_group,
            delta_ids=state_updates,
            partial_state=context.partial_state,
        )
    async def _run_push_actions_and_persist_event(
        self, event: EventBase, context: EventContext, backfilled: bool = False
    ) -> None:
        """Run the push actions for a received event, and persist it.

        Args:
            event: The event itself.
            context: The event context.
            backfilled: True if the event was backfilled.
        """
        # this method should not be called on outliers (those code paths call
        # persist_events_and_notify directly.)
        assert not event.internal_metadata.outlier

        if not backfilled and not context.rejected:
            min_depth = await self._store.get_min_depth(event.room_id)
            if min_depth is None or min_depth > event.depth:
                # XXX richvdh 2021/10/07: I don't really understand what this
                # condition is doing. I think it's trying not to send pushes
                # for events that predate our join - but that's not really what
                # min_depth means, and anyway ancient events are a more general
                # problem.
                #
                # for now I'm just going to log about it.
                logger.info(
                    "Skipping push actions for old event with depth %s < %s",
                    event.depth,
                    min_depth,
                )
            else:
                await self._bulk_push_rule_evaluator.action_for_event_by_user(
                    event, context
                )

        try:
            await self.persist_events_and_notify(
                event.room_id, [(event, context)], backfilled=backfilled
            )
        except Exception:
            run_in_background(
                self._store.remove_push_actions_from_staging, event.event_id
            )
            raise
    async def persist_events_and_notify(
        self,
        room_id: str,
        event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
        backfilled: bool = False,
    ) -> int:
        """Persists events and tells the notifier/pushers about them, if
        necessary.

        Args:
            room_id: The room ID of events being persisted.
            event_and_contexts: Sequence of events with their associated
                context that should be persisted. All events must belong to
                the same room.
            backfilled: Whether these events are a result of
                backfilling or not

        Returns:
            The stream ID after which all events have been persisted.
        """
        if not event_and_contexts:
            return self._store.get_room_max_stream_ordering()
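        # Events for a given room are persisted by a single event-persister
        # worker; if that isn't us, forward the events over replication instead
        # of persisting them locally.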
        instance = self._config.worker.events_shard_config.get_instance(room_id)
        if instance != self._instance_name:
            # Limit the number of events sent over replication. We choose 200
            # here as that is what we default to in `max_request_body_size(..)`
            for batch in batch_iter(event_and_contexts, 200):
                result = await self._send_events(
                    instance_name=instance,
                    store=self._store,
                    room_id=room_id,
                    event_and_contexts=batch,
                    backfilled=backfilled,
                )
            return result["max_stream_id"]
        else:
            assert self._storage.persistence

            # Note that this returns the events that were persisted, which may not be
            # the same as were passed in if some were deduplicated due to transaction IDs.
            events, max_stream_token = await self._storage.persistence.persist_events(
                event_and_contexts, backfilled=backfilled
            )

            if self._ephemeral_messages_enabled:
                for event in events:
                    # If there's an expiry timestamp on the event, schedule its expiry.
                    self._message_handler.maybe_schedule_expiry(event)

            if not backfilled:  # Never notify for backfilled events
                for event in events:
                    await self._notify_persisted_event(event, max_stream_token)

            return max_stream_token.stream
    async def _notify_persisted_event(
        self, event: EventBase, max_stream_token: RoomStreamToken
    ) -> None:
        """Checks to see if notifier/pushers should be notified about the
        event or not.

        Args:
            event:
            max_stream_token: The max_stream_id returned by persist_events
        """
        extra_users = []
        if event.type == EventTypes.Member:
            target_user_id = event.state_key

            # We notify for memberships if it's an invite for one of our
            # users
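            # (more precisely: for outlier membership events, we skip the
            # notification unless the event is an invite or targets one of
            # our own users.)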
            if event.internal_metadata.is_outlier():
                if event.membership != Membership.INVITE:
                    if not self._is_mine_id(target_user_id):
                        return

            target_user = UserID.from_string(target_user_id)
            extra_users.append(target_user)
        elif event.internal_metadata.is_outlier():
            return

        # the event has been persisted so it should have a stream ordering.
        assert event.internal_metadata.stream_ordering

        event_pos = PersistedEventPosition(
            self._instance_name, event.internal_metadata.stream_ordering
        )
        await self._notifier.on_new_room_event(
            event, event_pos, max_stream_token, extra_users=extra_users
        )
    def _sanity_check_event(self, ev: EventBase) -> None:
        """
        Do some early sanity checks of a received event

        In particular, checks it doesn't have an excessive number of
        prev_events or auth_events, which could cause a huge state resolution
        or cascade of event fetches.

        Args:
            ev: event to be checked

        Raises:
            SynapseError if the event does not pass muster
        """
        if len(ev.prev_event_ids()) > 20:
            logger.warning(
                "Rejecting event %s which has %i prev_events",
                ev.event_id,
                len(ev.prev_event_ids()),
            )
            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")

        if len(ev.auth_event_ids()) > 10:
            logger.warning(
                "Rejecting event %s which has %i auth_events",
                ev.event_id,
                len(ev.auth_event_ids()),
            )
            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")