# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains handlers for federation events."""

import itertools
import logging
import sys

import six
from six import iteritems
from six.moves import http_client

from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64

from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.api.errors import (
    AuthError,
    CodeMessageException,
    FederationDeniedError,
    FederationError,
    StoreError,
    SynapseError,
)
from synapse.crypto.event_signing import (
    add_hashes_and_signatures,
    compute_event_signature,
)
from synapse.events.utils import prune_event
from synapse.events.validator import EventValidator
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async import Linearizer
from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination

from ._base import BaseHandler

logger = logging.getLogger(__name__)
class FederationHandler(BaseHandler):
    """Handles events that originated from federation.

    Responsible for:
    a) handling received Pdus before handing them on as Events to the rest
    of the home server (including auth and state conflict resolution)
    b) converting events that were produced by local clients that may need
    to be sent to remote home servers.
    c) doing the necessary dances to invite remote users and join remote
    rooms.
    """

    def __init__(self, hs):
        super(FederationHandler, self).__init__(hs)

        self.hs = hs

        self.store = hs.get_datastore()
        self.replication_layer = hs.get_federation_client()
        self.state_handler = hs.get_state_handler()
        self.server_name = hs.hostname
        self.keyring = hs.get_keyring()
        self.action_generator = hs.get_action_generator()
        self.is_mine_id = hs.is_mine_id
        self.pusher_pool = hs.get_pusherpool()
        self.spam_checker = hs.get_spam_checker()
        self.event_creation_handler = hs.get_event_creation_handler()
        self._server_notices_mxid = hs.config.server_notices_mxid

        # When joining a room we need to queue any events for that room up
        self.room_queues = {}
        self._room_pdu_linearizer = Linearizer("fed_room_pdu")
    @defer.inlineCallbacks
    @log_function
    def on_receive_pdu(
        self, origin, pdu, get_missing=True, sent_to_us_directly=False,
    ):
        """ Process a PDU received via a federation /send/ transaction, or
        via backfill of missing prev_events

        Args:
            origin (str): server which initiated the /send/ transaction. Will
                be used to fetch missing events or state.
            pdu (FrozenEvent): received PDU
            get_missing (bool): True if we should fetch missing prev_events

        Returns (Deferred): completes with None
        """

        # We reprocess pdus when we have seen them only as outliers
        existing = yield self.store.get_event(
            pdu.event_id,
            allow_none=True,
            allow_rejected=True,
        )

        # FIXME: Currently we fetch an event again when we already have it
        # if it has been marked as an outlier.

        already_seen = (
            existing and (
                not existing.internal_metadata.is_outlier()
                or pdu.internal_metadata.is_outlier()
            )
        )
        if already_seen:
            logger.debug("Already seen pdu %s", pdu.event_id)
            return

        # do some initial sanity-checking of the event. In particular, make
        # sure it doesn't have hundreds of prev_events or auth_events, which
        # could cause a huge state resolution or cascade of event fetches.
        try:
            self._sanity_check_event(pdu)
        except SynapseError as err:
            raise FederationError(
                "ERROR",
                err.code,
                err.msg,
                affected=pdu.event_id,
            )

        # If we are currently in the process of joining this room, then we
        # queue up events for later processing.
        if pdu.room_id in self.room_queues:
            logger.info("Ignoring PDU %s for room %s from %s for now; join "
                        "in progress", pdu.event_id, pdu.room_id, origin)
            self.room_queues[pdu.room_id].append((pdu, origin))
            return

        # If we're no longer in the room just ditch the event entirely. This
        # is probably an old server that has come back and thinks we're still
        # in the room (or we've been rejoined to the room by a state reset).
        #
        # If we were never in the room then maybe our database got vaped and
        # we should check if we *are* in fact in the room. If we are then we
        # can magically rejoin the room.
        is_in_room = yield self.auth.check_host_in_room(
            pdu.room_id,
            self.server_name
        )
        if not is_in_room:
            was_in_room = yield self.store.was_host_joined(
                pdu.room_id, self.server_name,
            )
            if was_in_room:
                logger.info(
                    "Ignoring PDU %s for room %s from %s as we've left the room!",
                    pdu.event_id, pdu.room_id, origin,
                )
                defer.returnValue(None)

        state = None

        auth_chain = []

        # Get missing pdus if necessary.
        if not pdu.internal_metadata.is_outlier():
            # We only backfill backwards to the min depth.
            min_depth = yield self.get_min_depth_for_context(
                pdu.room_id
            )

            logger.debug(
                "_handle_new_pdu min_depth for %s: %d",
                pdu.room_id, min_depth
            )

            prevs = {e_id for e_id, _ in pdu.prev_events}
            seen = yield self.store.have_seen_events(prevs)

            if min_depth and pdu.depth < min_depth:
                # This is so that we don't notify the user about this
                # message, to work around the fact that some events will
                # reference really really old events we really don't want to
                # send to the clients.
                pdu.internal_metadata.outlier = True
            elif min_depth and pdu.depth > min_depth:
                if get_missing and prevs - seen:
                    # If we're missing stuff, ensure we only fetch stuff one
                    # at a time.
                    logger.info(
                        "Acquiring lock for room %r to fetch %d missing events: %r...",
                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
                    )
                    with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
                        logger.info(
                            "Acquired lock for room %r to fetch %d missing events",
                            pdu.room_id, len(prevs - seen),
                        )

                        yield self._get_missing_events_for_pdu(
                            origin, pdu, prevs, min_depth
                        )

                        # Update the set of things we've seen after trying to
                        # fetch the missing stuff
                        seen = yield self.store.have_seen_events(prevs)

                        if not prevs - seen:
                            logger.info(
                                "Found all missing prev events for %s", pdu.event_id
                            )
                elif prevs - seen:
                    logger.info(
                        "Not fetching %d missing events for room %r,event %s: %r...",
                        len(prevs - seen), pdu.room_id, pdu.event_id,
                        list(prevs - seen)[:5],
                    )

            if sent_to_us_directly and prevs - seen:
                # If they have sent it to us directly, and the server
                # isn't telling us about the auth events that it's
                # made a message referencing, we explode
                raise FederationError(
                    "ERROR",
                    403,
                    (
                        "Your server isn't divulging details about prev_events "
                        "referenced in this event."
                    ),
                    affected=pdu.event_id,
                )
            elif prevs - seen:
                # Calculate the state of the previous events, and
                # de-conflict them to find the current state.
                state_groups = []
                auth_chains = set()
                try:
                    # Get the state of the events we know about
                    ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
                    state_groups.append(ours)

                    # Ask the remote server for the states we don't
                    # know about
                    for p in prevs - seen:
                        state, got_auth_chain = (
                            yield self.replication_layer.get_state_for_room(
                                origin, pdu.room_id, p
                            )
                        )
                        auth_chains.update(got_auth_chain)
                        state_group = {(x.type, x.state_key): x.event_id for x in state}
                        state_groups.append(state_group)

                    # Resolve any conflicting state
                    def fetch(ev_ids):
                        return self.store.get_events(
                            ev_ids, get_prev_content=False, check_redacted=False
                        )

                    state_map = yield resolve_events_with_factory(
                        state_groups, {pdu.event_id: pdu}, fetch
                    )

                    state = (yield self.store.get_events(state_map.values())).values()
                    auth_chain = list(auth_chains)
                except Exception:
                    raise FederationError(
                        "ERROR",
                        403,
                        "We can't get valid state history.",
                        affected=pdu.event_id,
                    )

        yield self._process_received_pdu(
            origin,
            pdu,
            state=state,
            auth_chain=auth_chain,
        )
    @defer.inlineCallbacks
    def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
        """
        Args:
            origin (str): Origin of the pdu. Will be called to get the missing events
            pdu: received pdu
            prevs (set(str)): List of event ids which we are missing
            min_depth (int): Minimum depth of events to return.
        """
        # We recalculate seen, since it may have changed.
        seen = yield self.store.have_seen_events(prevs)

        if not prevs - seen:
            return

        latest = yield self.store.get_latest_event_ids_in_room(
            pdu.room_id
        )

        # We add the prev events that we have seen to the latest
        # list to ensure the remote server doesn't give them to us
        latest = set(latest)
        latest |= seen

        logger.info(
            "Missing %d events for room %r pdu %s: %r...",
            len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
        )

        # XXX: we set timeout to 10s to help workaround
        # https://github.com/matrix-org/synapse/issues/1733.
        # The reason is to avoid holding the linearizer lock
        # whilst processing inbound /send transactions, causing
        # FDs to stack up and block other inbound transactions
        # which empirically can currently take up to 30 minutes.
        #
        # N.B. this explicitly disables retry attempts.
        #
        # N.B. this also increases our chances of falling back to
        # fetching fresh state for the room if the missing event
        # can't be found, which slightly reduces our security.
        # it may also increase our DAG extremity count for the room,
        # causing additional state resolution? See #1760.
        # However, fetching state doesn't hold the linearizer lock
        # apparently.
        #
        # see https://github.com/matrix-org/synapse/pull/1744

        missing_events = yield self.replication_layer.get_missing_events(
            origin,
            pdu.room_id,
            earliest_events_ids=list(latest),
            latest_events=[pdu],
            limit=10,
            min_depth=min_depth,
            timeout=10000,
        )

        logger.info(
            "Got %d events: %r...",
            len(missing_events), [e.event_id for e in missing_events[:5]]
        )

        # We want to sort these by depth so we process them and
        # tell clients about them in order.
        missing_events.sort(key=lambda x: x.depth)

        for e in missing_events:
            logger.info("Handling found event %s", e.event_id)
            try:
                yield self.on_receive_pdu(
                    origin,
                    e,
                    get_missing=False
                )
            except FederationError as err:
                if err.code == 403:
                    logger.warn("Event %s failed history check.", e.event_id)
                else:
                    raise
    @log_function
    @defer.inlineCallbacks
    def _process_received_pdu(self, origin, pdu, state, auth_chain):
        """ Called when we have a new pdu. We need to do auth checks and put it
        through the StateHandler.
        """
        event = pdu

        logger.debug("Processing event: %s", event)

        # FIXME (erikj): Awful hack to make the case where we are not currently
        # in the room work
        # If state and auth_chain are None, then we don't need to do this check
        # as we already know we have enough state in the DB to handle this
        # event.
        if state and auth_chain and not event.internal_metadata.is_outlier():
            is_in_room = yield self.auth.check_host_in_room(
                event.room_id,
                self.server_name
            )
        else:
            is_in_room = True

        if not is_in_room:
            logger.info(
                "Got event for room we're not in: %r %r",
                event.room_id, event.event_id
            )

            try:
                event_stream_id, max_stream_id = yield self._persist_auth_tree(
                    origin, auth_chain, state, event
                )
            except AuthError as e:
                raise FederationError(
                    "ERROR",
                    e.code,
                    e.msg,
                    affected=event.event_id,
                )

        else:
            event_ids = set()
            if state:
                event_ids |= {e.event_id for e in state}
            if auth_chain:
                event_ids |= {e.event_id for e in auth_chain}

            seen_ids = yield self.store.have_seen_events(event_ids)

            if state and auth_chain is not None:
                # If we have any state or auth_chain given to us by the replication
                # layer, then we should handle them (if we haven't before.)

                event_infos = []

                for e in itertools.chain(auth_chain, state):
                    if e.event_id in seen_ids:
                        continue

                    e.internal_metadata.outlier = True
                    auth_ids = [e_id for e_id, _ in e.auth_events]
                    auth = {
                        (e.type, e.state_key): e for e in auth_chain
                        if e.event_id in auth_ids or e.type == EventTypes.Create
                    }
                    event_infos.append({
                        "event": e,
                        "auth_events": auth,
                    })
                    seen_ids.add(e.event_id)

                yield self._handle_new_events(origin, event_infos)

            try:
                context, event_stream_id, max_stream_id = yield self._handle_new_event(
                    origin,
                    event,
                    state=state,
                )
            except AuthError as e:
                raise FederationError(
                    "ERROR",
                    e.code,
                    e.msg,
                    affected=event.event_id,
                )

        room = yield self.store.get_room(event.room_id)

        if not room:
            try:
                yield self.store.store_room(
                    room_id=event.room_id,
                    room_creator_user_id="",
                    is_public=False,
                )
            except StoreError:
                logger.exception("Failed to store room.")

        extra_users = []
        if event.type == EventTypes.Member:
            target_user_id = event.state_key
            target_user = UserID.from_string(target_user_id)
            extra_users.append(target_user)

        self.notifier.on_new_room_event(
            event, event_stream_id, max_stream_id,
            extra_users=extra_users
        )

        if event.type == EventTypes.Member:
            if event.membership == Membership.JOIN:
                # Only fire user_joined_room if the user has actually
                # joined the room. Don't bother if the user is just
                # changing their profile info.
                newly_joined = True

                prev_state_id = context.prev_state_ids.get(
                    (event.type, event.state_key)
                )
                if prev_state_id:
                    prev_state = yield self.store.get_event(
                        prev_state_id, allow_none=True,
                    )
                    if prev_state and prev_state.membership == Membership.JOIN:
                        newly_joined = False

                if newly_joined:
                    user = UserID.from_string(event.state_key)
                    yield user_joined_room(self.distributor, user, event.room_id)
    @measure_func("_filter_events_for_server")
    @defer.inlineCallbacks
    def _filter_events_for_server(self, server_name, room_id, events):
        """Filter the given events for the given server, redacting those the
        server can't see.

        Assumes the server is currently in the room.

        Returns
            list[FrozenEvent]
        """
        # First let's check to see if all the events have a history visibility
        # of "shared" or "world_readable". If that's the case then we don't
        # need to check membership (as we know the server is in the room).
        event_to_state_ids = yield self.store.get_state_ids_for_events(
            frozenset(e.event_id for e in events),
            types=(
                (EventTypes.RoomHistoryVisibility, ""),
            )
        )

        visibility_ids = set()
        for sids in event_to_state_ids.itervalues():
            hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
            if hist:
                visibility_ids.add(hist)

        # If we failed to find any history visibility events then the default
        # is "shared" visibility.
        if not visibility_ids:
            defer.returnValue(events)

        event_map = yield self.store.get_events(visibility_ids)
        all_open = all(
            e.content.get("history_visibility") in (None, "shared", "world_readable")
            for e in event_map.itervalues()
        )

        if all_open:
            defer.returnValue(events)

        # Ok, so we're dealing with events that have non-trivial visibility
        # rules, so we need to also get the memberships of the room.

        event_to_state_ids = yield self.store.get_state_ids_for_events(
            frozenset(e.event_id for e in events),
            types=(
                (EventTypes.RoomHistoryVisibility, ""),
                (EventTypes.Member, None),
            )
        )

        # We only want to pull out member events that correspond to the
        # server's domain.

        def check_match(id):
            try:
                return server_name == get_domain_from_id(id)
            except Exception:
                return False

        # Parses mapping `event_id -> (type, state_key) -> state event_id`
        # to get all state ids that we're interested in.
        event_map = yield self.store.get_events([
            e_id
            for key_to_eid in list(event_to_state_ids.values())
            for key, e_id in key_to_eid.items()
            if key[0] != EventTypes.Member or check_match(key[1])
        ])

        event_to_state = {
            e_id: {
                key: event_map[inner_e_id]
                for key, inner_e_id in key_to_eid.iteritems()
                if inner_e_id in event_map
            }
            for e_id, key_to_eid in event_to_state_ids.iteritems()
        }

        erased_senders = yield self.store.are_users_erased(
            e.sender for e in events,
        )

        def redact_disallowed(event, state):
            # if the sender has been gdpr17ed, always return a redacted
            # copy of the event.
            if erased_senders[event.sender]:
                logger.info(
                    "Sender of %s has been erased, redacting",
                    event.event_id,
                )
                return prune_event(event)

            if not state:
                return event

            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
            if history:
                visibility = history.content.get("history_visibility", "shared")
                if visibility in ["invited", "joined"]:
                    # We now loop through all state events looking for
                    # membership states for the requesting server to determine
                    # if the server is either in the room or has been invited
                    # into the room.
                    for ev in state.itervalues():
                        if ev.type != EventTypes.Member:
                            continue
                        try:
                            domain = get_domain_from_id(ev.state_key)
                        except Exception:
                            continue

                        if domain != server_name:
                            continue

                        memtype = ev.membership
                        if memtype == Membership.JOIN:
                            return event
                        elif memtype == Membership.INVITE:
                            if visibility == "invited":
                                return event
                    else:
                        return prune_event(event)

            return event

        defer.returnValue([
            redact_disallowed(e, event_to_state[e.event_id])
            for e in events
        ])
    @log_function
    @defer.inlineCallbacks
    def backfill(self, dest, room_id, limit, extremities):
        """ Trigger a backfill request to `dest` for the given `room_id`

        This will attempt to get more events from the remote. If the other side
        has no new events to offer, this will return an empty list.

        As the events are received, we check their signatures, and also do some
        sanity-checking on them. If any of the backfilled events are invalid,
        this method throws a SynapseError.

        TODO: make this more useful to distinguish failures of the remote
        server from invalid events (there is probably no point in trying to
        re-fetch invalid events from every other HS in the room.)
        """
        if dest == self.server_name:
            raise SynapseError(400, "Can't backfill from self.")

        events = yield self.replication_layer.backfill(
            dest,
            room_id,
            limit=limit,
            extremities=extremities,
        )

        # ideally we'd sanity check the events here for excess prev_events etc,
        # but it's hard to reject events at this point without completely
        # breaking backfill in the same way that it is currently broken by
        # events whose signature we cannot verify (#3121).
        #
        # So for now we accept the events anyway. #3124 tracks this.
        #
        # for ev in events:
        #     self._sanity_check_event(ev)

        # Don't bother processing events we already have.
        seen_events = yield self.store.have_events_in_timeline(
            set(e.event_id for e in events)
        )

        events = [e for e in events if e.event_id not in seen_events]

        if not events:
            defer.returnValue([])

        event_map = {e.event_id: e for e in events}

        event_ids = set(e.event_id for e in events)

        edges = [
            ev.event_id
            for ev in events
            if set(e_id for e_id, _ in ev.prev_events) - event_ids
        ]

        logger.info(
            "backfill: Got %d events with %d edges",
            len(events), len(edges),
        )

        # For each edge get the current state.

        auth_events = {}
        state_events = {}
        events_to_state = {}
        for e_id in edges:
            state, auth = yield self.replication_layer.get_state_for_room(
                destination=dest,
                room_id=room_id,
                event_id=e_id
            )
            auth_events.update({a.event_id: a for a in auth})
            auth_events.update({s.event_id: s for s in state})
            state_events.update({s.event_id: s for s in state})
            events_to_state[e_id] = state

        required_auth = set(
            a_id
            for event in events + state_events.values() + auth_events.values()
            for a_id, _ in event.auth_events
        )
        auth_events.update({
            e_id: event_map[e_id] for e_id in required_auth if e_id in event_map
        })
        missing_auth = required_auth - set(auth_events)
        failed_to_fetch = set()

        # Try and fetch any missing auth events from both DB and remote servers.
        # We repeatedly do this until we stop finding new auth events.
        while missing_auth - failed_to_fetch:
            logger.info("Missing auth for backfill: %r", missing_auth)
            ret_events = yield self.store.get_events(missing_auth - failed_to_fetch)
            auth_events.update(ret_events)

            required_auth.update(
                a_id for event in ret_events.values() for a_id, _ in event.auth_events
            )
            missing_auth = required_auth - set(auth_events)

            if missing_auth - failed_to_fetch:
                logger.info(
                    "Fetching missing auth for backfill: %r",
                    missing_auth - failed_to_fetch
                )

                results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
                    [
                        logcontext.run_in_background(
                            self.replication_layer.get_pdu,
                            [dest],
                            event_id,
                            outlier=True,
                            timeout=10000,
                        )
                        for event_id in missing_auth - failed_to_fetch
                    ],
                    consumeErrors=True
                )).addErrback(unwrapFirstError)
                auth_events.update({a.event_id: a for a in results if a})
                required_auth.update(
                    a_id
                    for event in results if event
                    for a_id, _ in event.auth_events
                )
                missing_auth = required_auth - set(auth_events)

                failed_to_fetch = missing_auth - set(auth_events)

        seen_events = yield self.store.have_seen_events(
            set(auth_events.keys()) | set(state_events.keys())
        )

        ev_infos = []
        for a in auth_events.values():
            if a.event_id in seen_events:
                continue

            a.internal_metadata.outlier = True
            ev_infos.append({
                "event": a,
                "auth_events": {
                    (auth_events[a_id].type, auth_events[a_id].state_key):
                    auth_events[a_id]
                    for a_id, _ in a.auth_events
                    if a_id in auth_events
                }
            })

        for e_id in events_to_state:
            ev_infos.append({
                "event": event_map[e_id],
                "state": events_to_state[e_id],
                "auth_events": {
                    (auth_events[a_id].type, auth_events[a_id].state_key):
                    auth_events[a_id]
                    for a_id, _ in event_map[e_id].auth_events
                    if a_id in auth_events
                }
            })

        yield self._handle_new_events(
            dest, ev_infos,
            backfilled=True,
        )

        events.sort(key=lambda e: e.depth)

        for event in events:
            if event in events_to_state:
                continue

            # We store these one at a time since each event depends on the
            # previous to work out the state.
            # TODO: We can probably do something more clever here.
            yield self._handle_new_event(
                dest, event, backfilled=True,
            )

        defer.returnValue(events)
    @defer.inlineCallbacks
    def maybe_backfill(self, room_id, current_depth):
        """Checks the database to see if we should backfill before paginating,
        and if so do.
        """
        extremities = yield self.store.get_oldest_events_with_depth_in_room(
            room_id
        )

        if not extremities:
            logger.debug("Not backfilling as no extremities found.")
            return

        # Check if we reached a point where we should start backfilling.
        sorted_extremeties_tuple = sorted(
            extremities.items(),
            key=lambda e: -int(e[1])
        )
        max_depth = sorted_extremeties_tuple[0][1]

        # We don't want to specify too many extremities as it causes the backfill
        # request URI to be too long.
        extremities = dict(sorted_extremeties_tuple[:5])

        if current_depth > max_depth:
            logger.debug(
                "Not backfilling as we don't need to. %d < %d",
                max_depth, current_depth,
            )
            return

        # Now we need to decide which hosts to hit first.

        # First we try hosts that are already in the room
        # TODO: HEURISTIC ALERT.

        curr_state = yield self.state_handler.get_current_state(room_id)

        def get_domains_from_state(state):
            """Get joined domains from state

            Args:
                state (dict[tuple, FrozenEvent]): State map from type/state
                    key to event.

            Returns:
                list[tuple[str, int]]: Returns a list of servers with the
                lowest depth of their joins. Sorted by lowest depth first.
            """
            joined_users = [
                (state_key, int(event.depth))
                for (e_type, state_key), event in state.iteritems()
                if e_type == EventTypes.Member
                and event.membership == Membership.JOIN
            ]

            joined_domains = {}
            for u, d in joined_users:
                try:
                    dom = get_domain_from_id(u)
                    old_d = joined_domains.get(dom)
                    if old_d:
                        joined_domains[dom] = min(d, old_d)
                    else:
                        joined_domains[dom] = d
                except Exception:
                    pass

            return sorted(joined_domains.iteritems(), key=lambda d: d[1])

        curr_domains = get_domains_from_state(curr_state)

        likely_domains = [
            domain for domain, depth in curr_domains
            if domain != self.server_name
        ]

        @defer.inlineCallbacks
        def try_backfill(domains):
            # TODO: Should we try multiple of these at a time?
            for dom in domains:
                try:
                    yield self.backfill(
                        dom, room_id,
                        limit=100,
                        extremities=extremities,
                    )
                    # If this succeeded then we probably already have the
                    # appropriate stuff.
                    # TODO: We can probably do something more intelligent here.
                    defer.returnValue(True)
                except SynapseError as e:
                    logger.info(
                        "Failed to backfill from %s because %s",
                        dom, e,
                    )
                    continue
                except CodeMessageException as e:
                    if 400 <= e.code < 500:
                        raise

                    logger.info(
                        "Failed to backfill from %s because %s",
                        dom, e,
                    )
                    continue
                except NotRetryingDestination as e:
                    logger.info(e.message)
                    continue
                except FederationDeniedError as e:
                    logger.info(e)
                    continue
                except Exception as e:
                    logger.exception(
                        "Failed to backfill from %s because %s",
                        dom, e,
                    )
                    continue

            defer.returnValue(False)

        success = yield try_backfill(likely_domains)
        if success:
            defer.returnValue(True)

        # Huh, well *those* domains didn't work out. Let's try some domains
        # from the time.

        tried_domains = set(likely_domains)
        tried_domains.add(self.server_name)

        event_ids = list(extremities.iterkeys())

        logger.debug("calling resolve_state_groups in _maybe_backfill")
        resolve = logcontext.preserve_fn(
            self.state_handler.resolve_state_groups_for_events
        )
        states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
            [resolve(room_id, [e]) for e in event_ids],
            consumeErrors=True,
        ))

        # dict[str, dict[tuple, str]], a map from event_id to state map of
        # event_ids.
        states = dict(zip(event_ids, [s.state for s in states]))

        state_map = yield self.store.get_events(
            [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
            get_prev_content=False
        )
        states = {
            key: {
                k: state_map[e_id]
                for k, e_id in state_dict.iteritems()
                if e_id in state_map
            } for key, state_dict in states.iteritems()
        }

        for e_id, _ in sorted_extremeties_tuple:
            likely_domains = get_domains_from_state(states[e_id])

            success = yield try_backfill([
                dom for dom, _ in likely_domains
                if dom not in tried_domains
            ])
            if success:
                defer.returnValue(True)

            tried_domains.update(dom for dom, _ in likely_domains)

        defer.returnValue(False)
    def _sanity_check_event(self, ev):
        """
        Do some early sanity checks of a received event

        In particular, checks it doesn't have an excessive number of
        prev_events or auth_events, which could cause a huge state resolution
        or cascade of event fetches.

        Args:
            ev (synapse.events.EventBase): event to be checked

        Returns: None

        Raises:
            SynapseError if the event does not pass muster
        """
        if len(ev.prev_events) > 20:
            logger.warn("Rejecting event %s which has %i prev_events",
                        ev.event_id, len(ev.prev_events))
            raise SynapseError(
                http_client.BAD_REQUEST,
                "Too many prev_events",
            )

        if len(ev.auth_events) > 10:
            logger.warn("Rejecting event %s which has %i auth_events",
                        ev.event_id, len(ev.auth_events))
            raise SynapseError(
                http_client.BAD_REQUEST,
                "Too many auth_events",
            )
    @defer.inlineCallbacks
    def send_invite(self, target_host, event):
        """ Sends the invite to the remote server for signing.

        Invites must be signed by the invitee's server before distribution.
        """
        pdu = yield self.replication_layer.send_invite(
            destination=target_host,
            room_id=event.room_id,
            event_id=event.event_id,
            pdu=event
        )

        defer.returnValue(pdu)

    @defer.inlineCallbacks
    def on_event_auth(self, event_id):
        event = yield self.store.get_event(event_id)
        auth = yield self.store.get_auth_chain(
            [auth_id for auth_id, _ in event.auth_events],
            include_given=True
        )

        for event in auth:
            event.signatures.update(
                compute_event_signature(
                    event,
                    self.hs.hostname,
                    self.hs.config.signing_key[0]
                )
            )

        defer.returnValue([e for e in auth])
    @log_function
    @defer.inlineCallbacks
    def do_invite_join(self, target_hosts, room_id, joinee, content):
        """ Attempts to join the `joinee` to the room `room_id` via the
        server `target_host`.

        This first triggers a /make_join/ request that returns a partial
        event that we can fill out and sign. This is then sent to the
        remote server via /send_join/ which responds with the state at that
        event and the auth_chains.

        We suspend processing of any received events from this room until we
        have finished processing the join.
        """
        logger.debug("Joining %s to %s", joinee, room_id)

        origin, event = yield self._make_and_verify_event(
            target_hosts,
            room_id,
            joinee,
            "join",
            content,
        )

        # This shouldn't happen, because the RoomMemberHandler has a
        # linearizer lock which only allows one operation per user per room
        # at a time - so this is just paranoia.
        assert (room_id not in self.room_queues)

        self.room_queues[room_id] = []

        yield self.store.clean_room_for_join(room_id)

        handled_events = set()

        try:
            event = self._sign_event(event)
            # Try the host we successfully got a response to /make_join/
            # request first.
            try:
                target_hosts.remove(origin)
                target_hosts.insert(0, origin)
            except ValueError:
                pass
            ret = yield self.replication_layer.send_join(target_hosts, event)

            origin = ret["origin"]
            state = ret["state"]
            auth_chain = ret["auth_chain"]
            auth_chain.sort(key=lambda e: e.depth)

            handled_events.update([s.event_id for s in state])
            handled_events.update([a.event_id for a in auth_chain])
            handled_events.add(event.event_id)

            logger.debug("do_invite_join auth_chain: %s", auth_chain)
            logger.debug("do_invite_join state: %s", state)

            logger.debug("do_invite_join event: %s", event)

            try:
                yield self.store.store_room(
                    room_id=room_id,
                    room_creator_user_id="",
                    is_public=False
                )
            except Exception:
                # FIXME
                pass

            event_stream_id, max_stream_id = yield self._persist_auth_tree(
                origin, auth_chain, state, event
            )

            self.notifier.on_new_room_event(
                event, event_stream_id, max_stream_id,
                extra_users=[joinee]
            )

            logger.debug("Finished joining %s to %s", joinee, room_id)
        finally:
            room_queue = self.room_queues[room_id]
            del self.room_queues[room_id]

            # we don't need to wait for the queued events to be processed -
            # it's just a best-effort thing at this point. We do want to do
            # them roughly in order, though, otherwise we'll end up making
            # lots of requests for missing prev_events which we do actually
            # have. Hence we fire off the deferred, but don't wait for it.
            logcontext.run_in_background(self._handle_queued_pdus, room_queue)

        defer.returnValue(True)
    @defer.inlineCallbacks
    def _handle_queued_pdus(self, room_queue):
        """Process PDUs which got queued up while we were busy send_joining.

        Args:
            room_queue (list[FrozenEvent, str]): list of PDUs to be processed
                and the servers that sent them
        """
        for p, origin in room_queue:
            try:
                logger.info("Processing queued PDU %s which was received "
                            "while we were joining %s", p.event_id, p.room_id)
                yield self.on_receive_pdu(origin, p)
            except Exception as e:
                logger.warn(
                    "Error handling queued PDU %s from %s: %s",
                    p.event_id, origin, e)
    @defer.inlineCallbacks
    @log_function
    def on_make_join_request(self, room_id, user_id):
        """ We've received a /make_join/ request, so we create a partial
        join event for the room and return that. We do *not* persist or
        process it until the other server has signed it and sent it back.
        """
        event_content = {"membership": Membership.JOIN}

        builder = self.event_builder_factory.new({
            "type": EventTypes.Member,
            "content": event_content,
            "room_id": room_id,
            "sender": user_id,
            "state_key": user_id,
        })

        try:
            event, context = yield self.event_creation_handler.create_new_client_event(
                builder=builder,
            )
        except AuthError as e:
            logger.warn("Failed to create join %r because %s", event, e)
            raise e

        # The remote hasn't signed it yet, obviously. We'll do the full checks
        # when we get the event back in `on_send_join_request`
        yield self.auth.check_from_context(event, context, do_sig_check=False)

        defer.returnValue(event)
    @defer.inlineCallbacks
    @log_function
    def on_send_join_request(self, origin, pdu):
        """ We have received a join event for a room. Fully process it and
        respond with the current state and auth chains.
        """
        event = pdu

        logger.debug(
            "on_send_join_request: Got event: %s, signatures: %s",
            event.event_id,
            event.signatures,
        )

        event.internal_metadata.outlier = False
        # Send this event on behalf of the origin server.
        #
        # The reasons we have the destination server rather than the origin
        # server send it are slightly mysterious: the origin server should have
        # all the necessary state once it gets the response to the send_join,
        # so it could send the event itself if it wanted to. It may be that
        # doing it this way reduces failure modes, or avoids certain attacks
        # where a new server selectively tells a subset of the federation that
        # it has joined.
        #
        # The fact is that, as of the current writing, Synapse doesn't send out
        # the join event over federation after joining, and changing it now
        # would introduce the danger of backwards-compatibility problems.
        event.internal_metadata.send_on_behalf_of = origin

        context, event_stream_id, max_stream_id = yield self._handle_new_event(
            origin, event
        )

        logger.debug(
            "on_send_join_request: After _handle_new_event: %s, sigs: %s",
            event.event_id,
            event.signatures,
        )

        extra_users = []
        if event.type == EventTypes.Member:
            target_user_id = event.state_key
            target_user = UserID.from_string(target_user_id)
            extra_users.append(target_user)

        self.notifier.on_new_room_event(
            event, event_stream_id, max_stream_id, extra_users=extra_users
        )

        if event.type == EventTypes.Member:
            if event.content["membership"] == Membership.JOIN:
                user = UserID.from_string(event.state_key)
                yield user_joined_room(self.distributor, user, event.room_id)

        state_ids = list(context.prev_state_ids.values())
        auth_chain = yield self.store.get_auth_chain(state_ids)

        state = yield self.store.get_events(list(context.prev_state_ids.values()))

        defer.returnValue({
            "state": list(state.values()),
            "auth_chain": auth_chain,
        })
    @defer.inlineCallbacks
    def on_invite_request(self, origin, pdu):
        """ We've got an invite event. Process and persist it. Sign it.

        Respond with the now signed event.
        """
        event = pdu

        if event.state_key is None:
            raise SynapseError(400, "The invite event did not have a state key")

        is_blocked = yield self.store.is_room_blocked(event.room_id)
        if is_blocked:
            raise SynapseError(403, "This room has been blocked on this server")

        if self.hs.config.block_non_admin_invites:
            raise SynapseError(403, "This server does not accept room invites")

        if not self.spam_checker.user_may_invite(
            event.sender, event.state_key, event.room_id,
        ):
            raise SynapseError(
                403, "This user is not permitted to send invites to this server/user"
            )

        membership = event.content.get("membership")
        if event.type != EventTypes.Member or membership != Membership.INVITE:
            raise SynapseError(400, "The event was not an m.room.member invite event")

        sender_domain = get_domain_from_id(event.sender)
        if sender_domain != origin:
            raise SynapseError(400, "The invite event was not from the server sending it")

        if not self.is_mine_id(event.state_key):
            raise SynapseError(400, "The invite event must be for this server")

        # block any attempts to invite the server notices mxid
        if event.state_key == self._server_notices_mxid:
            raise SynapseError(
                http_client.FORBIDDEN,
                "Cannot invite this user",
            )

        event.internal_metadata.outlier = True
        event.internal_metadata.invite_from_remote = True

        event.signatures.update(
            compute_event_signature(
                event,
                self.hs.hostname,
                self.hs.config.signing_key[0]
            )
        )

        context = yield self.state_handler.compute_event_context(event)

        event_stream_id, max_stream_id = yield self.store.persist_event(
            event,
            context=context,
        )

        target_user = UserID.from_string(event.state_key)
        self.notifier.on_new_room_event(
            event, event_stream_id, max_stream_id,
            extra_users=[target_user],
        )

        defer.returnValue(event)
    @defer.inlineCallbacks
    def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
        origin, event = yield self._make_and_verify_event(
            target_hosts,
            room_id,
            user_id,
            "leave"
        )
        # Mark as outlier as we don't have any state for this event; we're not
        # even in the room.
        event.internal_metadata.outlier = True
        event = self._sign_event(event)

        # Try the host that we successfully called /make_leave/ on first for
        # the /send_leave/ request.
        try:
            target_hosts.remove(origin)
            target_hosts.insert(0, origin)
        except ValueError:
            pass

        yield self.replication_layer.send_leave(
            target_hosts,
            event
        )

        context = yield self.state_handler.compute_event_context(event)

        event_stream_id, max_stream_id = yield self.store.persist_event(
            event,
            context=context,
        )

        target_user = UserID.from_string(event.state_key)
        self.notifier.on_new_room_event(
            event, event_stream_id, max_stream_id,
            extra_users=[target_user],
        )

        defer.returnValue(event)
    @defer.inlineCallbacks
    def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
                               content={},):
        origin, pdu = yield self.replication_layer.make_membership_event(
            target_hosts,
            room_id,
            user_id,
            membership,
            content,
        )

        logger.debug("Got response to make_%s: %s", membership, pdu)

        event = pdu

        # We should assert some things.
        # FIXME: Do this in a nicer way
        assert(event.type == EventTypes.Member)
        assert(event.user_id == user_id)
        assert(event.state_key == user_id)
        assert(event.room_id == room_id)
        defer.returnValue((origin, event))

    def _sign_event(self, event):
        event.internal_metadata.outlier = False

        builder = self.event_builder_factory.new(
            unfreeze(event.get_pdu_json())
        )

        builder.event_id = self.event_builder_factory.create_event_id()
        builder.origin = self.hs.hostname

        if not hasattr(event, "signatures"):
            builder.signatures = {}

        add_hashes_and_signatures(
            builder,
            self.hs.hostname,
            self.hs.config.signing_key[0],
        )

        return builder.build()
    @defer.inlineCallbacks
    @log_function
    def on_make_leave_request(self, room_id, user_id):
        """ We've received a /make_leave/ request, so we create a partial
        leave event for the room and return that. We do *not* persist or
        process it until the other server has signed it and sent it back.
        """
        builder = self.event_builder_factory.new({
            "type": EventTypes.Member,
            "content": {"membership": Membership.LEAVE},
            "room_id": room_id,
            "sender": user_id,
            "state_key": user_id,
        })

        event, context = yield self.event_creation_handler.create_new_client_event(
            builder=builder,
        )

        try:
            # The remote hasn't signed it yet, obviously. We'll do the full checks
            # when we get the event back in `on_send_leave_request`
            yield self.auth.check_from_context(event, context, do_sig_check=False)
        except AuthError as e:
            logger.warn("Failed to create new leave %r because %s", event, e)
            raise e

        defer.returnValue(event)
    @defer.inlineCallbacks
    @log_function
    def on_send_leave_request(self, origin, pdu):
        """ We have received a leave event for a room. Fully process it."""
        event = pdu

        logger.debug(
            "on_send_leave_request: Got event: %s, signatures: %s",
            event.event_id,
            event.signatures,
        )

        event.internal_metadata.outlier = False

        context, event_stream_id, max_stream_id = yield self._handle_new_event(
            origin, event
        )

        logger.debug(
            "on_send_leave_request: After _handle_new_event: %s, sigs: %s",
            event.event_id,
            event.signatures,
        )

        extra_users = []
        if event.type == EventTypes.Member:
            target_user_id = event.state_key
            target_user = UserID.from_string(target_user_id)
            extra_users.append(target_user)

        self.notifier.on_new_room_event(
            event, event_stream_id, max_stream_id, extra_users=extra_users
        )

        defer.returnValue(None)
    @defer.inlineCallbacks
    def get_state_for_pdu(self, room_id, event_id):
        """Returns the state at the event. i.e. not including said event.
        """
        state_groups = yield self.store.get_state_groups(
            room_id, [event_id]
        )

        if state_groups:
            _, state = list(iteritems(state_groups)).pop()
            results = {
                (e.type, e.state_key): e for e in state
            }

            event = yield self.store.get_event(event_id)
            if event and event.is_state():
                # Get previous state
                if "replaces_state" in event.unsigned:
                    prev_id = event.unsigned["replaces_state"]
                    if prev_id != event.event_id:
                        prev_event = yield self.store.get_event(prev_id)
                        results[(event.type, event.state_key)] = prev_event
                else:
                    del results[(event.type, event.state_key)]

            res = list(results.values())
            for event in res:
                # We sign these again because there was a bug where we
                # incorrectly signed things the first time round
                if self.is_mine_id(event.event_id):
                    event.signatures.update(
                        compute_event_signature(
                            event,
                            self.hs.hostname,
                            self.hs.config.signing_key[0]
                        )
                    )

            defer.returnValue(res)
        else:
            defer.returnValue([])
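    # get_state_for_pdu returns full events; internally the state is keyed by
    # (type, state_key) before being flattened into a list, e.g.
    # (illustrative values):
    #
    #     {
    #         ("m.room.create", ""): <FrozenEvent ...>,
    #         ("m.room.member", "@alice:example.com"): <FrozenEvent ...>,
    #     }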
    @defer.inlineCallbacks
    def get_state_ids_for_pdu(self, room_id, event_id):
        """Returns the state at the event. i.e. not including said event.
        """
        state_groups = yield self.store.get_state_groups_ids(
            room_id, [event_id]
        )

        if state_groups:
            # dict views have no pop(); materialise the items so this works
            # on Python 3 as well as Python 2.
            _, state = list(state_groups.items()).pop()
            results = state

            event = yield self.store.get_event(event_id)
            if event and event.is_state():
                # Get previous state
                if "replaces_state" in event.unsigned:
                    prev_id = event.unsigned["replaces_state"]
                    if prev_id != event.event_id:
                        results[(event.type, event.state_key)] = prev_id
                else:
                    results.pop((event.type, event.state_key), None)

            defer.returnValue(list(results.values()))
        else:
            defer.returnValue([])
    @defer.inlineCallbacks
    @log_function
    def on_backfill_request(self, origin, room_id, pdu_list, limit):
        in_room = yield self.auth.check_host_in_room(room_id, origin)
        if not in_room:
            raise AuthError(403, "Host not in room.")

        events = yield self.store.get_backfill_events(
            room_id,
            pdu_list,
            limit
        )

        events = yield self._filter_events_for_server(origin, room_id, events)

        defer.returnValue(events)
    @defer.inlineCallbacks
    @log_function
    def get_persisted_pdu(self, origin, event_id):
        """Get an event from the database for the given server.

        Args:
            origin (str): hostname of server which is requesting the event; we
                will check that the server is allowed to see it.
            event_id (str): id of the event being requested

        Returns:
            Deferred[EventBase|None]: None if we know nothing about the event;
                otherwise the (possibly-redacted) event.

        Raises:
            AuthError if the server is not currently in the room
        """
        event = yield self.store.get_event(
            event_id,
            allow_none=True,
            allow_rejected=True,
        )

        if event:
            if self.is_mine_id(event.event_id):
                # FIXME: This is a temporary work around where we occasionally
                # return events slightly differently than when they were
                # originally signed
                event.signatures.update(
                    compute_event_signature(
                        event,
                        self.hs.hostname,
                        self.hs.config.signing_key[0]
                    )
                )

            in_room = yield self.auth.check_host_in_room(
                event.room_id,
                origin
            )
            if not in_room:
                raise AuthError(403, "Host not in room.")

            events = yield self._filter_events_for_server(
                origin, event.room_id, [event]
            )
            event = events[0]
            defer.returnValue(event)
        else:
            defer.returnValue(None)
    @log_function
    def get_min_depth_for_context(self, context):
        return self.store.get_min_depth(context)
    @defer.inlineCallbacks
    @log_function
    def _handle_new_event(self, origin, event, state=None, auth_events=None,
                          backfilled=False):
        context = yield self._prep_event(
            origin, event,
            state=state,
            auth_events=auth_events,
        )

        try:
            if not event.internal_metadata.is_outlier() and not backfilled:
                yield self.action_generator.handle_push_actions_for_event(
                    event, context
                )

            event_stream_id, max_stream_id = yield self.store.persist_event(
                event,
                context=context,
                backfilled=backfilled,
            )
        except:  # noqa: E722, as we reraise the exception this is fine.
            tp, value, tb = sys.exc_info()

            logcontext.run_in_background(
                self.store.remove_push_actions_from_staging,
                event.event_id,
            )

            six.reraise(tp, value, tb)

        if not backfilled:
            # this intentionally does not yield: we don't care about the result
            # and don't need to wait for it.
            logcontext.run_in_background(
                self.pusher_pool.on_new_notifications,
                event_stream_id, max_stream_id,
            )

        defer.returnValue((context, event_stream_id, max_stream_id))
    @defer.inlineCallbacks
    def _handle_new_events(self, origin, event_infos, backfilled=False):
        """Creates the appropriate contexts and persists events. The events
        should not depend on one another, e.g. this should be used to persist
        a bunch of outliers, but not a chunk of individual events that depend
        on each other for state calculations.
        """
        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
            [
                logcontext.run_in_background(
                    self._prep_event,
                    origin,
                    ev_info["event"],
                    state=ev_info.get("state"),
                    auth_events=ev_info.get("auth_events"),
                )
                for ev_info in event_infos
            ], consumeErrors=True,
        ))

        yield self.store.persist_events(
            [
                (ev_info["event"], context)
                # use the builtin zip rather than itertools.izip, which does
                # not exist on Python 3
                for ev_info, context in zip(event_infos, contexts)
            ],
            backfilled=backfilled,
        )
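    # Each entry in `event_infos` above is expected to be a dict of the form
    # (illustrative; only the keys actually read by _handle_new_events):
    #
    #     {
    #         "event": <FrozenEvent>,
    #         "state": [<FrozenEvent>, ...],                       # optional
    #         "auth_events": {(type, state_key): <FrozenEvent>},   # optional
    #     }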
    @defer.inlineCallbacks
    def _persist_auth_tree(self, origin, auth_events, state, event):
        """Checks the auth chain is valid (and passes auth checks) for the
        state and event. Then persists the auth chain and state atomically.
        Persists the event separately.

        Will attempt to fetch missing auth events.

        Args:
            origin (str): Where the events came from
            auth_events (list)
            state (list)
            event (Event)

        Returns:
            2-tuple of (event_stream_id, max_stream_id) from the persist_event
            call for `event`
        """
        events_to_context = {}
        for e in itertools.chain(auth_events, state):
            e.internal_metadata.outlier = True
            ctx = yield self.state_handler.compute_event_context(e)
            events_to_context[e.event_id] = ctx

        event_map = {
            e.event_id: e
            for e in itertools.chain(auth_events, state, [event])
        }

        create_event = None
        for e in auth_events:
            if (e.type, e.state_key) == (EventTypes.Create, ""):
                create_event = e
                break

        missing_auth_events = set()
        for e in itertools.chain(auth_events, state, [event]):
            for e_id, _ in e.auth_events:
                if e_id not in event_map:
                    missing_auth_events.add(e_id)

        for e_id in missing_auth_events:
            m_ev = yield self.replication_layer.get_pdu(
                [origin],
                e_id,
                outlier=True,
                timeout=10000,
            )
            if m_ev and m_ev.event_id == e_id:
                event_map[e_id] = m_ev
            else:
                logger.info("Failed to find auth event %r", e_id)

        for e in itertools.chain(auth_events, state, [event]):
            auth_for_e = {
                (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
                for e_id, _ in e.auth_events
                if e_id in event_map
            }
            if create_event:
                auth_for_e[(EventTypes.Create, "")] = create_event

            try:
                self.auth.check(e, auth_events=auth_for_e)
            except SynapseError as err:
                # we may get SynapseErrors here as well as AuthErrors. For
                # instance, there are a couple of (ancient) events in some
                # rooms whose senders do not have the correct sigil; these
                # cause SynapseErrors in auth.check. We don't want to give up
                # the attempt to federate altogether in such cases.
                logger.warn(
                    "Rejecting %s because %s",
                    e.event_id, err.msg
                )

                if e == event:
                    raise
                events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR

        yield self.store.persist_events(
            [
                (e, events_to_context[e.event_id])
                for e in itertools.chain(auth_events, state)
            ],
        )

        new_event_context = yield self.state_handler.compute_event_context(
            event, old_state=state
        )

        event_stream_id, max_stream_id = yield self.store.persist_event(
            event, new_event_context,
        )

        defer.returnValue((event_stream_id, max_stream_id))
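    # Note the persistence order in _persist_auth_tree: the auth chain and
    # state are stored first (as outliers, with any rejections recorded), and
    # only then is `event` itself persisted against a context computed from
    # that state.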
    @defer.inlineCallbacks
    def _prep_event(self, origin, event, state=None, auth_events=None):
        """
        Args:
            origin:
            event:
            state:
            auth_events:

        Returns:
            Deferred, which resolves to synapse.events.snapshot.EventContext
        """
        context = yield self.state_handler.compute_event_context(
            event, old_state=state,
        )

        if not auth_events:
            auth_events_ids = yield self.auth.compute_auth_events(
                event, context.prev_state_ids, for_verification=True,
            )
            auth_events = yield self.store.get_events(auth_events_ids)
            auth_events = {
                (e.type, e.state_key): e for e in auth_events.values()
            }

        # This is a hack to fix some old rooms where the initial join event
        # didn't reference the create event in its auth events.
        if event.type == EventTypes.Member and not event.auth_events:
            if len(event.prev_events) == 1 and event.depth < 5:
                c = yield self.store.get_event(
                    event.prev_events[0][0],
                    allow_none=True,
                )
                if c and c.type == EventTypes.Create:
                    auth_events[(c.type, c.state_key)] = c

        try:
            yield self.do_auth(
                origin, event, context, auth_events=auth_events
            )
        except AuthError as e:
            logger.warn(
                "Rejecting %s because %s",
                event.event_id, e.msg
            )

            context.rejected = RejectedReason.AUTH_ERROR

        if event.type == EventTypes.GuestAccess and not context.rejected:
            yield self.maybe_kick_guest_users(event)

        defer.returnValue(context)
    @defer.inlineCallbacks
    def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
                      missing):
        # Just go through and process each event in `remote_auth_chain`. We
        # don't want to fall into the trap of `missing` being wrong.
        for e in remote_auth_chain:
            try:
                yield self._handle_new_event(origin, e)
            except AuthError:
                pass

        # Now get the current auth_chain for the event.
        event = yield self.store.get_event(event_id)
        local_auth_chain = yield self.store.get_auth_chain(
            [auth_id for auth_id, _ in event.auth_events],
            include_given=True
        )

        # TODO: Check if we would now reject event_id. If so we need to tell
        # everyone.

        ret = yield self.construct_auth_difference(
            local_auth_chain, remote_auth_chain
        )

        for event in ret["auth_chain"]:
            event.signatures.update(
                compute_event_signature(
                    event,
                    self.hs.hostname,
                    self.hs.config.signing_key[0]
                )
            )

        logger.debug("on_query_auth returning: %s", ret)

        defer.returnValue(ret)
    @defer.inlineCallbacks
    def on_get_missing_events(self, origin, room_id, earliest_events,
                              latest_events, limit, min_depth):
        in_room = yield self.auth.check_host_in_room(
            room_id,
            origin
        )
        if not in_room:
            raise AuthError(403, "Host not in room.")

        limit = min(limit, 20)
        min_depth = max(min_depth, 0)

        missing_events = yield self.store.get_missing_events(
            room_id=room_id,
            earliest_events=earliest_events,
            latest_events=latest_events,
            limit=limit,
            min_depth=min_depth,
        )

        missing_events = yield self._filter_events_for_server(
            origin, room_id, missing_events,
        )

        defer.returnValue(missing_events)
    @defer.inlineCallbacks
    @log_function
    def do_auth(self, origin, event, context, auth_events):
        """
        Args:
            origin (str):
            event (synapse.events.FrozenEvent):
            context (synapse.events.snapshot.EventContext):
            auth_events (dict[(str, str)->str]):

        Returns:
            defer.Deferred[None]
        """
        # Check if we have all the auth events.
        current_state = set(e.event_id for e in auth_events.values())
        event_auth_events = set(e_id for e_id, _ in event.auth_events)

        if event.is_state():
            event_key = (event.type, event.state_key)
        else:
            event_key = None

        if event_auth_events - current_state:
            # TODO: can we use store.have_seen_events here instead?
            have_events = yield self.store.get_seen_events_with_rejections(
                event_auth_events - current_state
            )
        else:
            have_events = {}

        have_events.update({
            e.event_id: ""
            for e in auth_events.values()
        })

        seen_events = set(have_events.keys())

        missing_auth = event_auth_events - seen_events - current_state

        if missing_auth:
            logger.info("Missing auth: %s", missing_auth)
            # If we don't have all the auth events, we need to get them.
            try:
                remote_auth_chain = yield self.replication_layer.get_event_auth(
                    origin, event.room_id, event.event_id
                )

                seen_remotes = yield self.store.have_seen_events(
                    [e.event_id for e in remote_auth_chain]
                )

                for e in remote_auth_chain:
                    if e.event_id in seen_remotes:
                        continue

                    if e.event_id == event.event_id:
                        continue

                    try:
                        auth_ids = [e_id for e_id, _ in e.auth_events]
                        auth = {
                            (e.type, e.state_key): e for e in remote_auth_chain
                            if e.event_id in auth_ids or e.type == EventTypes.Create
                        }
                        e.internal_metadata.outlier = True

                        logger.debug(
                            "do_auth %s missing_auth: %s",
                            event.event_id, e.event_id
                        )
                        yield self._handle_new_event(
                            origin, e, auth_events=auth
                        )

                        if e.event_id in event_auth_events:
                            auth_events[(e.type, e.state_key)] = e
                    except AuthError:
                        pass

                have_events = yield self.store.get_seen_events_with_rejections(
                    [e_id for e_id, _ in event.auth_events]
                )
                seen_events = set(have_events.keys())
            except Exception:
                # FIXME:
                logger.exception("Failed to get auth chain")

        # FIXME: Assumes we have and stored all the state for all the
        # prev_events
        current_state = set(e.event_id for e in auth_events.values())
        different_auth = event_auth_events - current_state

        if different_auth and not event.internal_metadata.is_outlier():
            # Do auth conflict res.
            logger.info("Different auth: %s", different_auth)

            different_events = yield logcontext.make_deferred_yieldable(
                defer.gatherResults([
                    logcontext.run_in_background(
                        self.store.get_event,
                        d,
                        allow_none=True,
                        allow_rejected=False,
                    )
                    for d in different_auth
                    if d in have_events and not have_events[d]
                ], consumeErrors=True)
            ).addErrback(unwrapFirstError)

            if different_events:
                local_view = dict(auth_events)
                remote_view = dict(auth_events)
                remote_view.update({
                    (d.type, d.state_key): d for d in different_events if d
                })

                new_state = self.state_handler.resolve_events(
                    [list(local_view.values()), list(remote_view.values())],
                    event
                )

                auth_events.update(new_state)

                current_state = set(e.event_id for e in auth_events.values())
                different_auth = event_auth_events - current_state

                yield self._update_context_for_auth_events(
                    event, context, auth_events, event_key,
                )
        if different_auth and not event.internal_metadata.is_outlier():
            logger.info("Different auth after resolution: %s", different_auth)

            # Only do auth resolution if we have something new to say.
            # We can't prove an auth failure.
            do_resolution = False

            provable = [
                RejectedReason.NOT_ANCESTOR,
            ]

            for e_id in different_auth:
                if e_id in have_events:
                    if have_events[e_id] in provable:
                        do_resolution = True
                        break

            if do_resolution:
                # 1. Get what we think is the auth chain.
                auth_ids = yield self.auth.compute_auth_events(
                    event, context.prev_state_ids
                )
                local_auth_chain = yield self.store.get_auth_chain(
                    auth_ids, include_given=True
                )

                try:
                    # 2. Get remote difference.
                    result = yield self.replication_layer.query_auth(
                        origin,
                        event.room_id,
                        event.event_id,
                        local_auth_chain,
                    )

                    seen_remotes = yield self.store.have_seen_events(
                        [e.event_id for e in result["auth_chain"]]
                    )

                    # 3. Process any remote auth chain events we haven't seen.
                    for ev in result["auth_chain"]:
                        if ev.event_id in seen_remotes:
                            continue

                        if ev.event_id == event.event_id:
                            continue

                        try:
                            auth_ids = [e_id for e_id, _ in ev.auth_events]
                            auth = {
                                (e.type, e.state_key): e
                                for e in result["auth_chain"]
                                if e.event_id in auth_ids
                                or e.type == EventTypes.Create
                            }
                            ev.internal_metadata.outlier = True

                            logger.debug(
                                "do_auth %s different_auth: %s",
                                event.event_id, ev.event_id
                            )

                            yield self._handle_new_event(
                                origin, ev, auth_events=auth
                            )

                            if ev.event_id in event_auth_events:
                                auth_events[(ev.type, ev.state_key)] = ev
                        except AuthError:
                            pass

                except Exception:
                    # FIXME:
                    logger.exception("Failed to query auth chain")

                # 4. Look at rejects and their proofs.
                # TODO.

                yield self._update_context_for_auth_events(
                    event, context, auth_events, event_key,
                )

        try:
            self.auth.check(event, auth_events=auth_events)
        except AuthError as e:
            logger.warn("Failed auth resolution for %r because %s", event, e)
            raise e
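    # do_auth proceeds in three broad passes: fetch any auth events we are
    # missing from the remote, resolve the auth state when the remote's auth
    # events differ from ours, and finally run the standard auth checks
    # against the (possibly updated) auth_events, raising AuthError on
    # failure.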
    @defer.inlineCallbacks
    def _update_context_for_auth_events(self, event, context, auth_events,
                                        event_key):
        """Update the state_ids in an event context after auth event resolution,
        storing the changes as a new state group.

        Args:
            event (Event): The event we're handling the context for

            context (synapse.events.snapshot.EventContext): event context
                to be updated

            auth_events (dict[(str, str)->str]): Events to update in the event
                context.

            event_key ((str, str)): (type, state_key) for the current event.
                this will not be included in the current_state in the context.
        """
        state_updates = {
            k: a.event_id for k, a in iteritems(auth_events)
            if k != event_key
        }
        context.current_state_ids = dict(context.current_state_ids)
        context.current_state_ids.update(state_updates)

        if context.delta_ids is not None:
            context.delta_ids = dict(context.delta_ids)
            context.delta_ids.update(state_updates)

        context.prev_state_ids = dict(context.prev_state_ids)
        context.prev_state_ids.update({
            k: a.event_id for k, a in iteritems(auth_events)
        })

        context.state_group = yield self.store.store_state_group(
            event.event_id,
            event.room_id,
            prev_group=context.prev_group,
            delta_ids=context.delta_ids,
            current_state_ids=context.current_state_ids,
        )
    @defer.inlineCallbacks
    def construct_auth_difference(self, local_auth, remote_auth):
        """ Given a local and remote auth chain, find the differences. This
        assumes that we have already processed all events in remote_auth

        Params:
            local_auth (list)
            remote_auth (list)

        Returns:
            dict
        """
        logger.debug("construct_auth_difference Start!")

        # TODO: Make sure we are OK with local_auth or remote_auth having more
        # auth events in them than strictly necessary.

        def sort_fun(ev):
            return ev.depth, ev.event_id

        logger.debug("construct_auth_difference after sort_fun!")

        # We find the differences by starting at the "bottom" of each list
        # and iterating up on both lists. The lists are ordered by depth and
        # then event_id, we iterate up both lists until we find the event ids
        # don't match. Then we look at depth/event_id to see which side is
        # missing that event, and iterate only up that list. Repeat.

        remote_list = list(remote_auth)
        remote_list.sort(key=sort_fun)

        local_list = list(local_auth)
        local_list.sort(key=sort_fun)

        local_iter = iter(local_list)
        remote_iter = iter(remote_list)

        logger.debug("construct_auth_difference before get_next!")

        def get_next(it, opt=None):
            try:
                return next(it)
            except Exception:
                return opt

        current_local = get_next(local_iter)
        current_remote = get_next(remote_iter)

        logger.debug("construct_auth_difference before while")

        missing_remotes = []
        missing_locals = []
        while current_local or current_remote:
            if current_remote is None:
                missing_locals.append(current_local)
                current_local = get_next(local_iter)
                continue

            if current_local is None:
                missing_remotes.append(current_remote)
                current_remote = get_next(remote_iter)
                continue

            if current_local.event_id == current_remote.event_id:
                current_local = get_next(local_iter)
                current_remote = get_next(remote_iter)
                continue

            if current_local.depth < current_remote.depth:
                missing_locals.append(current_local)
                current_local = get_next(local_iter)
                continue

            if current_local.depth > current_remote.depth:
                missing_remotes.append(current_remote)
                current_remote = get_next(remote_iter)
                continue

            # They have the same depth, so we fall back to the event_id order
            if current_local.event_id < current_remote.event_id:
                missing_locals.append(current_local)
                current_local = get_next(local_iter)
                continue

            if current_local.event_id > current_remote.event_id:
                missing_remotes.append(current_remote)
                current_remote = get_next(remote_iter)
                continue

        logger.debug("construct_auth_difference after while")

        # missing locals should be sent to the server
        # We should find why we are missing remotes, as they will have been
        # rejected.

        # Remove events from missing_remotes if they are referencing a missing
        # remote. We only care about the "root" rejected ones.
        missing_remote_ids = [e.event_id for e in missing_remotes]
        base_remote_rejected = list(missing_remotes)
        for e in missing_remotes:
            for e_id, _ in e.auth_events:
                if e_id in missing_remote_ids:
                    try:
                        base_remote_rejected.remove(e)
                    except ValueError:
                        pass

        reason_map = {}

        for e in base_remote_rejected:
            reason = yield self.store.get_rejection_reason(e.event_id)
            if reason is None:
                # TODO: e is not in the current state, so we should
                # construct some proof of that.
                continue

            reason_map[e.event_id] = reason

            if reason == RejectedReason.AUTH_ERROR:
                pass
            elif reason == RejectedReason.REPLACED:
                # TODO: Get proof
                pass
            elif reason == RejectedReason.NOT_ANCESTOR:
                # TODO: Get proof.
                pass

        logger.debug("construct_auth_difference returning")

        defer.returnValue({
            "auth_chain": local_auth,
            "rejects": {
                e.event_id: {
                    "reason": reason_map[e.event_id],
                    "proof": None,
                }
                for e in base_remote_rejected
            },
            "missing": [e.event_id for e in missing_locals],
        })
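    # The dict returned by construct_auth_difference has the shape
    # (illustrative values):
    #
    #     {
    #         "auth_chain": [<FrozenEvent>, ...],   # our full local auth chain
    #         "rejects": {
    #             "$event_id": {"reason": "auth_error", "proof": None},
    #         },
    #         "missing": ["$event_id", ...],        # events the remote lacks
    #     }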
    @defer.inlineCallbacks
    @log_function
    def exchange_third_party_invite(
            self,
            sender_user_id,
            target_user_id,
            room_id,
            signed,
    ):
        third_party_invite = {
            "signed": signed,
        }

        event_dict = {
            "type": EventTypes.Member,
            "content": {
                "membership": Membership.INVITE,
                "third_party_invite": third_party_invite,
            },
            "room_id": room_id,
            "sender": sender_user_id,
            "state_key": target_user_id,
        }

        if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
            builder = self.event_builder_factory.new(event_dict)
            EventValidator().validate_new(builder)
            event, context = yield self.event_creation_handler.create_new_client_event(
                builder=builder
            )

            event, context = yield self.add_display_name_to_third_party_invite(
                event_dict, event, context
            )

            try:
                yield self.auth.check_from_context(event, context)
            except AuthError as e:
                logger.warn("Denying new third party invite %r because %s", event, e)
                raise e

            yield self._check_signature(event, context)
            member_handler = self.hs.get_room_member_handler()
            yield member_handler.send_membership_event(None, event, context)
        else:
            destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
            yield self.replication_layer.forward_third_party_invite(
                destinations,
                room_id,
                event_dict,
            )
    @defer.inlineCallbacks
    @log_function
    def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
        """Handle an exchange_third_party_invite request from a remote server

        The remote server will call this when it wants to turn a 3pid invite
        into a normal m.room.member invite.

        Returns:
            Deferred: resolves (to None)
        """
        builder = self.event_builder_factory.new(event_dict)

        event, context = yield self.event_creation_handler.create_new_client_event(
            builder=builder,
        )

        event, context = yield self.add_display_name_to_third_party_invite(
            event_dict, event, context
        )

        try:
            # yield here so that an AuthError actually propagates out of the
            # try block, as in exchange_third_party_invite above.
            yield self.auth.check_from_context(event, context)
        except AuthError as e:
            logger.warn("Denying third party invite %r because %s", event, e)
            raise e

        yield self._check_signature(event, context)

        # XXX we send the invite here, but send_membership_event also sends it,
        # so we end up making two requests. I think this is redundant.
        returned_invite = yield self.send_invite(origin, event)
        # TODO: Make sure the signatures actually are correct.
        event.signatures.update(returned_invite.signatures)

        member_handler = self.hs.get_room_member_handler()
        yield member_handler.send_membership_event(None, event, context)
    @defer.inlineCallbacks
    def add_display_name_to_third_party_invite(self, event_dict, event, context):
        key = (
            EventTypes.ThirdPartyInvite,
            event.content["third_party_invite"]["signed"]["token"]
        )
        original_invite = None
        original_invite_id = context.prev_state_ids.get(key)
        if original_invite_id:
            original_invite = yield self.store.get_event(
                original_invite_id, allow_none=True
            )
        if original_invite:
            display_name = original_invite.content["display_name"]
            event_dict["content"]["third_party_invite"]["display_name"] = display_name
        else:
            logger.info(
                "Could not find invite event for third_party_invite: %r",
                event_dict
            )
            # We don't discard here as this is not the appropriate place to do
            # auth checks. If we need the invite and don't have it then the
            # auth check code will explode appropriately.

        builder = self.event_builder_factory.new(event_dict)
        EventValidator().validate_new(builder)
        event, context = yield self.event_creation_handler.create_new_client_event(
            builder=builder,
        )
        defer.returnValue((event, context))
    @defer.inlineCallbacks
    def _check_signature(self, event, context):
        """
        Checks that the signature in the event is consistent with its invite.

        Args:
            event (Event): The m.room.member event to check
            context (EventContext):

        Raises:
            AuthError: if signature didn't match any keys, or key has been
                revoked,
            SynapseError: if a transient error meant a key couldn't be checked
                for revocation.
        """
        signed = event.content["third_party_invite"]["signed"]
        token = signed["token"]

        invite_event_id = context.prev_state_ids.get(
            (EventTypes.ThirdPartyInvite, token,)
        )

        invite_event = None
        if invite_event_id:
            invite_event = yield self.store.get_event(invite_event_id, allow_none=True)

        if not invite_event:
            raise AuthError(403, "Could not find invite")

        last_exception = None
        for public_key_object in self.hs.get_auth().get_public_keys(invite_event):
            try:
                for server, signature_block in signed["signatures"].items():
                    for key_name, encoded_signature in signature_block.items():
                        if not key_name.startswith("ed25519:"):
                            continue

                        public_key = public_key_object["public_key"]
                        verify_key = decode_verify_key_bytes(
                            key_name,
                            decode_base64(public_key)
                        )
                        verify_signed_json(signed, server, verify_key)

                        if "key_validity_url" in public_key_object:
                            yield self._check_key_revocation(
                                public_key,
                                public_key_object["key_validity_url"]
                            )
                        return
            except Exception as e:
                last_exception = e
        raise last_exception
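    # The `signed` block checked above is expected to look roughly like this
    # (illustrative identifiers):
    #
    #     {
    #         "mxid": "@alice:example.com",
    #         "token": "abc123",
    #         "signatures": {
    #             "identity.example.com": {
    #                 "ed25519:0": "<base64 signature>",
    #             },
    #         },
    #     }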
    @defer.inlineCallbacks
    def _check_key_revocation(self, public_key, url):
        """
        Checks whether public_key has been revoked.

        Args:
            public_key (str): base-64 encoded public key.
            url (str): Key revocation URL.

        Raises:
            AuthError: if the key has been revoked.
            SynapseError: if a transient error meant a key couldn't be checked
                for revocation.
        """
        try:
            response = yield self.hs.get_simple_http_client().get_json(
                url,
                {"public_key": public_key}
            )
        except Exception:
            raise SynapseError(
                502,
                "Third party certificate could not be checked"
            )
        if "valid" not in response or not response["valid"]:
            raise AuthError(403, "Third party certificate was invalid")
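    # The key validity check above expects a JSON response of the form
    # (illustrative):
    #
    #     {"valid": true}
    #
    # Anything else, or a transport failure, is treated as a revoked or
    # unverifiable key.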