message.py 35 KB


  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014 - 2016 OpenMarket Ltd
  3. # Copyright 2017 - 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. import sys
  18. import six
  19. from six import iteritems, itervalues, string_types
  20. from canonicaljson import encode_canonical_json, json
  21. from twisted.internet import defer
  22. from twisted.internet.defer import succeed
  23. from twisted.python.failure import Failure
  24. from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
  25. from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
  26. from synapse.api.urls import ConsentURIBuilder
  27. from synapse.crypto.event_signing import add_hashes_and_signatures
  28. from synapse.events.utils import serialize_event
  29. from synapse.events.validator import EventValidator
  30. from synapse.replication.http.send_event import send_event_to_master
  31. from synapse.types import RoomAlias, RoomStreamToken, UserID
  32. from synapse.util.async import Limiter, ReadWriteLock
  33. from synapse.util.frozenutils import frozendict_json_encoder
  34. from synapse.util.logcontext import run_in_background
  35. from synapse.util.metrics import measure_func
  36. from synapse.util.stringutils import random_string
  37. from synapse.visibility import filter_events_for_client
  38. from ._base import BaseHandler
  39. logger = logging.getLogger(__name__)
  40. class PurgeStatus(object):
  41. """Object tracking the status of a purge request
  42. This class contains information on the progress of a purge request, for
  43. return by get_purge_status.
  44. Attributes:
  45. status (int): Tracks whether this request has completed. One of
  46. STATUS_{ACTIVE,COMPLETE,FAILED}
  47. """
  48. STATUS_ACTIVE = 0
  49. STATUS_COMPLETE = 1
  50. STATUS_FAILED = 2
  51. STATUS_TEXT = {
  52. STATUS_ACTIVE: "active",
  53. STATUS_COMPLETE: "complete",
  54. STATUS_FAILED: "failed",
  55. }
  56. def __init__(self):
  57. self.status = PurgeStatus.STATUS_ACTIVE
  58. def asdict(self):
  59. return {
  60. "status": PurgeStatus.STATUS_TEXT[self.status]
  61. }
  62. class MessageHandler(BaseHandler):
  63. def __init__(self, hs):
  64. super(MessageHandler, self).__init__(hs)
  65. self.hs = hs
  66. self.state = hs.get_state_handler()
  67. self.clock = hs.get_clock()
  68. self.pagination_lock = ReadWriteLock()
  69. self._purges_in_progress_by_room = set()
  70. # map from purge id to PurgeStatus
  71. self._purges_by_id = {}
  72. def start_purge_history(self, room_id, token,
  73. delete_local_events=False):
  74. """Start off a history purge on a room.
  75. Args:
  76. room_id (str): The room to purge from
  77. token (str): topological token to delete events before
  78. delete_local_events (bool): True to delete local events as well as
  79. remote ones
  80. Returns:
  81. str: unique ID for this purge transaction.
  82. """
  83. if room_id in self._purges_in_progress_by_room:
  84. raise SynapseError(
  85. 400,
  86. "History purge already in progress for %s" % (room_id, ),
  87. )
  88. purge_id = random_string(16)
  89. # we log the purge_id here so that it can be tied back to the
  90. # request id in the log lines.
  91. logger.info("[purge] starting purge_id %s", purge_id)
  92. self._purges_by_id[purge_id] = PurgeStatus()
  93. run_in_background(
  94. self._purge_history,
  95. purge_id, room_id, token, delete_local_events,
  96. )
  97. return purge_id
  98. @defer.inlineCallbacks
  99. def _purge_history(self, purge_id, room_id, token,
  100. delete_local_events):
  101. """Carry out a history purge on a room.
  102. Args:
  103. purge_id (str): The id for this purge
  104. room_id (str): The room to purge from
  105. token (str): topological token to delete events before
  106. delete_local_events (bool): True to delete local events as well as
  107. remote ones
  108. Returns:
  109. Deferred
  110. """
  111. self._purges_in_progress_by_room.add(room_id)
  112. try:
  113. with (yield self.pagination_lock.write(room_id)):
  114. yield self.store.purge_history(
  115. room_id, token, delete_local_events,
  116. )
  117. logger.info("[purge] complete")
  118. self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
  119. except Exception:
  120. logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
  121. self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
  122. finally:
  123. self._purges_in_progress_by_room.discard(room_id)
  124. # remove the purge from the list 24 hours after it completes
  125. def clear_purge():
  126. del self._purges_by_id[purge_id]
  127. self.hs.get_reactor().callLater(24 * 3600, clear_purge)
  128. def get_purge_status(self, purge_id):
  129. """Get the current status of an active purge
  130. Args:
  131. purge_id (str): purge_id returned by start_purge_history
  132. Returns:
  133. PurgeStatus|None
  134. """
  135. return self._purges_by_id.get(purge_id)
  136. @defer.inlineCallbacks
  137. def get_messages(self, requester, room_id=None, pagin_config=None,
  138. as_client_event=True, event_filter=None):
  139. """Get messages in a room.
  140. Args:
  141. requester (Requester): The user requesting messages.
  142. room_id (str): The room they want messages from.
  143. pagin_config (synapse.api.streams.PaginationConfig): The pagination
  144. config rules to apply, if any.
  145. as_client_event (bool): True to get events in client-server format.
  146. event_filter (Filter): Filter to apply to results or None
  147. Returns:
  148. dict: Pagination API results
  149. """
  150. user_id = requester.user.to_string()
  151. if pagin_config.from_token:
  152. room_token = pagin_config.from_token.room_key
  153. else:
  154. pagin_config.from_token = (
  155. yield self.hs.get_event_sources().get_current_token_for_room(
  156. room_id=room_id
  157. )
  158. )
  159. room_token = pagin_config.from_token.room_key
  160. room_token = RoomStreamToken.parse(room_token)
  161. pagin_config.from_token = pagin_config.from_token.copy_and_replace(
  162. "room_key", str(room_token)
  163. )
  164. source_config = pagin_config.get_source_config("room")
  165. with (yield self.pagination_lock.read(room_id)):
  166. membership, member_event_id = yield self._check_in_room_or_world_readable(
  167. room_id, user_id
  168. )
  169. if source_config.direction == 'b':
  170. # if we're going backwards, we might need to backfill. This
  171. # requires that we have a topo token.
  172. if room_token.topological:
  173. max_topo = room_token.topological
  174. else:
  175. max_topo = yield self.store.get_max_topological_token(
  176. room_id, room_token.stream
  177. )
  178. if membership == Membership.LEAVE:
  179. # If they have left the room then clamp the token to be before
  180. # they left the room, to save the effort of loading from the
  181. # database.
  182. leave_token = yield self.store.get_topological_token_for_event(
  183. member_event_id
  184. )
  185. leave_token = RoomStreamToken.parse(leave_token)
  186. if leave_token.topological < max_topo:
  187. source_config.from_key = str(leave_token)
  188. yield self.hs.get_handlers().federation_handler.maybe_backfill(
  189. room_id, max_topo
  190. )
  191. events, next_key = yield self.store.paginate_room_events(
  192. room_id=room_id,
  193. from_key=source_config.from_key,
  194. to_key=source_config.to_key,
  195. direction=source_config.direction,
  196. limit=source_config.limit,
  197. event_filter=event_filter,
  198. )
  199. next_token = pagin_config.from_token.copy_and_replace(
  200. "room_key", next_key
  201. )
  202. if not events:
  203. defer.returnValue({
  204. "chunk": [],
  205. "start": pagin_config.from_token.to_string(),
  206. "end": next_token.to_string(),
  207. })
  208. if event_filter:
  209. events = event_filter.filter(events)
  210. events = yield filter_events_for_client(
  211. self.store,
  212. user_id,
  213. events,
  214. is_peeking=(member_event_id is None),
  215. )
  216. time_now = self.clock.time_msec()
  217. chunk = {
  218. "chunk": [
  219. serialize_event(e, time_now, as_client_event)
  220. for e in events
  221. ],
  222. "start": pagin_config.from_token.to_string(),
  223. "end": next_token.to_string(),
  224. }
  225. defer.returnValue(chunk)
  226. @defer.inlineCallbacks
  227. def get_room_data(self, user_id=None, room_id=None,
  228. event_type=None, state_key="", is_guest=False):
  229. """ Get data from a room.
  230. Args:
  231. event : The room path event
  232. Returns:
  233. The path data content.
  234. Raises:
  235. SynapseError if something went wrong.
  236. """
  237. membership, membership_event_id = yield self._check_in_room_or_world_readable(
  238. room_id, user_id
  239. )
  240. if membership == Membership.JOIN:
  241. data = yield self.state_handler.get_current_state(
  242. room_id, event_type, state_key
  243. )
  244. elif membership == Membership.LEAVE:
  245. key = (event_type, state_key)
  246. room_state = yield self.store.get_state_for_events(
  247. [membership_event_id], [key]
  248. )
  249. data = room_state[membership_event_id].get(key)
  250. defer.returnValue(data)
  251. @defer.inlineCallbacks
  252. def _check_in_room_or_world_readable(self, room_id, user_id):
  253. try:
  254. # check_user_was_in_room will return the most recent membership
  255. # event for the user if:
  256. # * The user is a non-guest user, and was ever in the room
  257. # * The user is a guest user, and has joined the room
  258. # else it will throw.
  259. member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
  260. defer.returnValue((member_event.membership, member_event.event_id))
  261. return
  262. except AuthError:
  263. visibility = yield self.state_handler.get_current_state(
  264. room_id, EventTypes.RoomHistoryVisibility, ""
  265. )
  266. if (
  267. visibility and
  268. visibility.content["history_visibility"] == "world_readable"
  269. ):
  270. defer.returnValue((Membership.JOIN, None))
  271. return
  272. raise AuthError(
  273. 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
  274. )
  275. @defer.inlineCallbacks
  276. def get_state_events(self, user_id, room_id, is_guest=False):
  277. """Retrieve all state events for a given room. If the user is
  278. joined to the room then return the current state. If the user has
  279. left the room return the state events from when they left.
  280. Args:
  281. user_id(str): The user requesting state events.
  282. room_id(str): The room ID to get all state events from.
  283. Returns:
  284. A list of dicts representing state events. [{}, {}, {}]
  285. """
  286. membership, membership_event_id = yield self._check_in_room_or_world_readable(
  287. room_id, user_id
  288. )
  289. if membership == Membership.JOIN:
  290. room_state = yield self.state_handler.get_current_state(room_id)
  291. elif membership == Membership.LEAVE:
  292. room_state = yield self.store.get_state_for_events(
  293. [membership_event_id], None
  294. )
  295. room_state = room_state[membership_event_id]
  296. now = self.clock.time_msec()
  297. defer.returnValue(
  298. [serialize_event(c, now) for c in room_state.values()]
  299. )
  300. @defer.inlineCallbacks
  301. def get_joined_members(self, requester, room_id):
  302. """Get all the joined members in the room and their profile information.
  303. If the user has left the room return the state events from when they left.
  304. Args:
  305. requester(Requester): The user requesting state events.
  306. room_id(str): The room ID to get all state events from.
  307. Returns:
  308. A dict of user_id to profile info
  309. """
  310. user_id = requester.user.to_string()
  311. if not requester.app_service:
  312. # We check AS auth after fetching the room membership, as it
  313. # requires us to pull out all joined members anyway.
  314. membership, _ = yield self._check_in_room_or_world_readable(
  315. room_id, user_id
  316. )
  317. if membership != Membership.JOIN:
  318. raise NotImplementedError(
  319. "Getting joined members after leaving is not implemented"
  320. )
  321. users_with_profile = yield self.state.get_current_user_in_room(room_id)
  322. # If this is an AS, double check that they are allowed to see the members.
  323. # This can either be because the AS user is in the room or because there
  324. # is a user in the room that the AS is "interested in"
  325. if requester.app_service and user_id not in users_with_profile:
  326. for uid in users_with_profile:
  327. if requester.app_service.is_interested_in_user(uid):
  328. break
  329. else:
  330. # Loop fell through, AS has no interested users in room
  331. raise AuthError(403, "Appservice not in room")
  332. defer.returnValue({
  333. user_id: {
  334. "avatar_url": profile.avatar_url,
  335. "display_name": profile.display_name,
  336. }
  337. for user_id, profile in iteritems(users_with_profile)
  338. })
  339. class EventCreationHandler(object):
  340. def __init__(self, hs):
  341. self.hs = hs
  342. self.auth = hs.get_auth()
  343. self.store = hs.get_datastore()
  344. self.state = hs.get_state_handler()
  345. self.clock = hs.get_clock()
  346. self.validator = EventValidator()
  347. self.profile_handler = hs.get_profile_handler()
  348. self.event_builder_factory = hs.get_event_builder_factory()
  349. self.server_name = hs.hostname
  350. self.ratelimiter = hs.get_ratelimiter()
  351. self.notifier = hs.get_notifier()
  352. self.config = hs.config
  353. self.http_client = hs.get_simple_http_client()
  354. # This is only used to get at ratelimit function, and maybe_kick_guest_users
  355. self.base_handler = BaseHandler(hs)
  356. self.pusher_pool = hs.get_pusherpool()
  357. # We arbitrarily limit concurrent event creation for a room to 5.
  358. # This is to stop us from diverging history *too* much.
  359. self.limiter = Limiter(max_count=5)
  360. self.action_generator = hs.get_action_generator()
  361. self.spam_checker = hs.get_spam_checker()
  362. if self.config.block_events_without_consent_error is not None:
  363. self._consent_uri_builder = ConsentURIBuilder(self.config)
  364. @defer.inlineCallbacks
  365. def create_event(self, requester, event_dict, token_id=None, txn_id=None,
  366. prev_events_and_hashes=None):
  367. """
  368. Given a dict from a client, create a new event.
  369. Creates an FrozenEvent object, filling out auth_events, prev_events,
  370. etc.
  371. Adds display names to Join membership events.
  372. Args:
  373. requester
  374. event_dict (dict): An entire event
  375. token_id (str)
  376. txn_id (str)
  377. prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
  378. the forward extremities to use as the prev_events for the
  379. new event. For each event, a tuple of (event_id, hashes, depth)
  380. where *hashes* is a map from algorithm to hash.
  381. If None, they will be requested from the database.
  382. Returns:
  383. Tuple of created event (FrozenEvent), Context
  384. """
  385. builder = self.event_builder_factory.new(event_dict)
  386. self.validator.validate_new(builder)
  387. if builder.type == EventTypes.Member:
  388. membership = builder.content.get("membership", None)
  389. target = UserID.from_string(builder.state_key)
  390. if membership in {Membership.JOIN, Membership.INVITE}:
  391. # If event doesn't include a display name, add one.
  392. profile = self.profile_handler
  393. content = builder.content
  394. try:
  395. if "displayname" not in content:
  396. content["displayname"] = yield profile.get_displayname(target)
  397. if "avatar_url" not in content:
  398. content["avatar_url"] = yield profile.get_avatar_url(target)
  399. except Exception as e:
  400. logger.info(
  401. "Failed to get profile information for %r: %s",
  402. target, e
  403. )
  404. is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
  405. if not is_exempt:
  406. yield self.assert_accepted_privacy_policy(requester)
  407. if token_id is not None:
  408. builder.internal_metadata.token_id = token_id
  409. if txn_id is not None:
  410. builder.internal_metadata.txn_id = txn_id
  411. event, context = yield self.create_new_client_event(
  412. builder=builder,
  413. requester=requester,
  414. prev_events_and_hashes=prev_events_and_hashes,
  415. )
  416. defer.returnValue((event, context))
  417. def _is_exempt_from_privacy_policy(self, builder, requester):
  418. """"Determine if an event to be sent is exempt from having to consent
  419. to the privacy policy
  420. Args:
  421. builder (synapse.events.builder.EventBuilder): event being created
  422. requester (Requster): user requesting this event
  423. Returns:
  424. Deferred[bool]: true if the event can be sent without the user
  425. consenting
  426. """
  427. # the only thing the user can do is join the server notices room.
  428. if builder.type == EventTypes.Member:
  429. membership = builder.content.get("membership", None)
  430. if membership == Membership.JOIN:
  431. return self._is_server_notices_room(builder.room_id)
  432. elif membership == Membership.LEAVE:
  433. # the user is always allowed to leave (but not kick people)
  434. return builder.state_key == requester.user.to_string()
  435. return succeed(False)
  436. @defer.inlineCallbacks
  437. def _is_server_notices_room(self, room_id):
  438. if self.config.server_notices_mxid is None:
  439. defer.returnValue(False)
  440. user_ids = yield self.store.get_users_in_room(room_id)
  441. defer.returnValue(self.config.server_notices_mxid in user_ids)
  442. @defer.inlineCallbacks
  443. def assert_accepted_privacy_policy(self, requester):
  444. """Check if a user has accepted the privacy policy
  445. Called when the given user is about to do something that requires
  446. privacy consent. We see if the user is exempt and otherwise check that
  447. they have given consent. If they have not, a ConsentNotGiven error is
  448. raised.
  449. Args:
  450. requester (synapse.types.Requester):
  451. The user making the request
  452. Returns:
  453. Deferred[None]: returns normally if the user has consented or is
  454. exempt
  455. Raises:
  456. ConsentNotGivenError: if the user has not given consent yet
  457. """
  458. if self.config.block_events_without_consent_error is None:
  459. return
  460. # exempt AS users from needing consent
  461. if requester.app_service is not None:
  462. return
  463. user_id = requester.user.to_string()
  464. # exempt the system notices user
  465. if (
  466. self.config.server_notices_mxid is not None and
  467. user_id == self.config.server_notices_mxid
  468. ):
  469. return
  470. u = yield self.store.get_user_by_id(user_id)
  471. assert u is not None
  472. if u["appservice_id"] is not None:
  473. # users registered by an appservice are exempt
  474. return
  475. if u["consent_version"] == self.config.user_consent_version:
  476. return
  477. consent_uri = self._consent_uri_builder.build_user_consent_uri(
  478. requester.user.localpart,
  479. )
  480. msg = self.config.block_events_without_consent_error % {
  481. 'consent_uri': consent_uri,
  482. }
  483. raise ConsentNotGivenError(
  484. msg=msg,
  485. consent_uri=consent_uri,
  486. )
  487. @defer.inlineCallbacks
  488. def send_nonmember_event(self, requester, event, context, ratelimit=True):
  489. """
  490. Persists and notifies local clients and federation of an event.
  491. Args:
  492. event (FrozenEvent) the event to send.
  493. context (Context) the context of the event.
  494. ratelimit (bool): Whether to rate limit this send.
  495. is_guest (bool): Whether the sender is a guest.
  496. """
  497. if event.type == EventTypes.Member:
  498. raise SynapseError(
  499. 500,
  500. "Tried to send member event through non-member codepath"
  501. )
  502. user = UserID.from_string(event.sender)
  503. assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
  504. if event.is_state():
  505. prev_state = yield self.deduplicate_state_event(event, context)
  506. if prev_state is not None:
  507. defer.returnValue(prev_state)
  508. yield self.handle_new_client_event(
  509. requester=requester,
  510. event=event,
  511. context=context,
  512. ratelimit=ratelimit,
  513. )
  514. @defer.inlineCallbacks
  515. def deduplicate_state_event(self, event, context):
  516. """
  517. Checks whether event is in the latest resolved state in context.
  518. If so, returns the version of the event in context.
  519. Otherwise, returns None.
  520. """
  521. prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
  522. prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
  523. if not prev_event:
  524. return
  525. if prev_event and event.user_id == prev_event.user_id:
  526. prev_content = encode_canonical_json(prev_event.content)
  527. next_content = encode_canonical_json(event.content)
  528. if prev_content == next_content:
  529. defer.returnValue(prev_event)
  530. return
  531. @defer.inlineCallbacks
  532. def create_and_send_nonmember_event(
  533. self,
  534. requester,
  535. event_dict,
  536. ratelimit=True,
  537. txn_id=None
  538. ):
  539. """
  540. Creates an event, then sends it.
  541. See self.create_event and self.send_nonmember_event.
  542. """
  543. # We limit the number of concurrent event sends in a room so that we
  544. # don't fork the DAG too much. If we don't limit then we can end up in
  545. # a situation where event persistence can't keep up, causing
  546. # extremities to pile up, which in turn leads to state resolution
  547. # taking longer.
  548. with (yield self.limiter.queue(event_dict["room_id"])):
  549. event, context = yield self.create_event(
  550. requester,
  551. event_dict,
  552. token_id=requester.access_token_id,
  553. txn_id=txn_id
  554. )
  555. spam_error = self.spam_checker.check_event_for_spam(event)
  556. if spam_error:
  557. if not isinstance(spam_error, string_types):
  558. spam_error = "Spam is not permitted here"
  559. raise SynapseError(
  560. 403, spam_error, Codes.FORBIDDEN
  561. )
  562. yield self.send_nonmember_event(
  563. requester,
  564. event,
  565. context,
  566. ratelimit=ratelimit,
  567. )
  568. defer.returnValue(event)
  569. @measure_func("create_new_client_event")
  570. @defer.inlineCallbacks
  571. def create_new_client_event(self, builder, requester=None,
  572. prev_events_and_hashes=None):
  573. """Create a new event for a local client
  574. Args:
  575. builder (EventBuilder):
  576. requester (synapse.types.Requester|None):
  577. prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
  578. the forward extremities to use as the prev_events for the
  579. new event. For each event, a tuple of (event_id, hashes, depth)
  580. where *hashes* is a map from algorithm to hash.
  581. If None, they will be requested from the database.
  582. Returns:
  583. Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
  584. """
  585. if prev_events_and_hashes is not None:
  586. assert len(prev_events_and_hashes) <= 10, \
  587. "Attempting to create an event with %i prev_events" % (
  588. len(prev_events_and_hashes),
  589. )
  590. else:
  591. prev_events_and_hashes = \
  592. yield self.store.get_prev_events_for_room(builder.room_id)
  593. if prev_events_and_hashes:
  594. depth = max([d for _, _, d in prev_events_and_hashes]) + 1
  595. # we cap depth of generated events, to ensure that they are not
  596. # rejected by other servers (and so that they can be persisted in
  597. # the db)
  598. depth = min(depth, MAX_DEPTH)
  599. else:
  600. depth = 1
  601. prev_events = [
  602. (event_id, prev_hashes)
  603. for event_id, prev_hashes, _ in prev_events_and_hashes
  604. ]
  605. builder.prev_events = prev_events
  606. builder.depth = depth
  607. context = yield self.state.compute_event_context(builder)
  608. if requester:
  609. context.app_service = requester.app_service
  610. if builder.is_state():
  611. builder.prev_state = yield self.store.add_event_hashes(
  612. context.prev_state_events
  613. )
  614. yield self.auth.add_auth_events(builder, context)
  615. signing_key = self.hs.config.signing_key[0]
  616. add_hashes_and_signatures(
  617. builder, self.server_name, signing_key
  618. )
  619. event = builder.build()
  620. logger.debug(
  621. "Created event %s with state: %s",
  622. event.event_id, context.prev_state_ids,
  623. )
  624. defer.returnValue(
  625. (event, context,)
  626. )
  627. @measure_func("handle_new_client_event")
  628. @defer.inlineCallbacks
  629. def handle_new_client_event(
  630. self,
  631. requester,
  632. event,
  633. context,
  634. ratelimit=True,
  635. extra_users=[],
  636. ):
  637. """Processes a new event. This includes checking auth, persisting it,
  638. notifying users, sending to remote servers, etc.
  639. If called from a worker will hit out to the master process for final
  640. processing.
  641. Args:
  642. requester (Requester)
  643. event (FrozenEvent)
  644. context (EventContext)
  645. ratelimit (bool)
  646. extra_users (list(UserID)): Any extra users to notify about event
  647. """
  648. try:
  649. yield self.auth.check_from_context(event, context)
  650. except AuthError as err:
  651. logger.warn("Denying new event %r because %s", event, err)
  652. raise err
  653. # Ensure that we can round trip before trying to persist in db
  654. try:
  655. dump = frozendict_json_encoder.encode(event.content)
  656. json.loads(dump)
  657. except Exception:
  658. logger.exception("Failed to encode content: %r", event.content)
  659. raise
  660. yield self.action_generator.handle_push_actions_for_event(
  661. event, context
  662. )
  663. try:
  664. # If we're a worker we need to hit out to the master.
  665. if self.config.worker_app:
  666. yield send_event_to_master(
  667. self.hs.get_clock(),
  668. self.http_client,
  669. host=self.config.worker_replication_host,
  670. port=self.config.worker_replication_http_port,
  671. requester=requester,
  672. event=event,
  673. context=context,
  674. ratelimit=ratelimit,
  675. extra_users=extra_users,
  676. )
  677. return
  678. yield self.persist_and_notify_client_event(
  679. requester,
  680. event,
  681. context,
  682. ratelimit=ratelimit,
  683. extra_users=extra_users,
  684. )
  685. except: # noqa: E722, as we reraise the exception this is fine.
  686. # Ensure that we actually remove the entries in the push actions
  687. # staging area, if we calculated them.
  688. tp, value, tb = sys.exc_info()
  689. run_in_background(
  690. self.store.remove_push_actions_from_staging,
  691. event.event_id,
  692. )
  693. six.reraise(tp, value, tb)
  694. @defer.inlineCallbacks
  695. def persist_and_notify_client_event(
  696. self,
  697. requester,
  698. event,
  699. context,
  700. ratelimit=True,
  701. extra_users=[],
  702. ):
  703. """Called when we have fully built the event, have already
  704. calculated the push actions for the event, and checked auth.
  705. This should only be run on master.
  706. """
  707. assert not self.config.worker_app
  708. if ratelimit:
  709. yield self.base_handler.ratelimit(requester)
  710. yield self.base_handler.maybe_kick_guest_users(event, context)
  711. if event.type == EventTypes.CanonicalAlias:
  712. # Check the alias is acually valid (at this time at least)
  713. room_alias_str = event.content.get("alias", None)
  714. if room_alias_str:
  715. room_alias = RoomAlias.from_string(room_alias_str)
  716. directory_handler = self.hs.get_handlers().directory_handler
  717. mapping = yield directory_handler.get_association(room_alias)
  718. if mapping["room_id"] != event.room_id:
  719. raise SynapseError(
  720. 400,
  721. "Room alias %s does not point to the room" % (
  722. room_alias_str,
  723. )
  724. )
  725. federation_handler = self.hs.get_handlers().federation_handler
  726. if event.type == EventTypes.Member:
  727. if event.content["membership"] == Membership.INVITE:
  728. def is_inviter_member_event(e):
  729. return (
  730. e.type == EventTypes.Member and
  731. e.sender == event.sender
  732. )
  733. state_to_include_ids = [
  734. e_id
  735. for k, e_id in iteritems(context.current_state_ids)
  736. if k[0] in self.hs.config.room_invite_state_types
  737. or k == (EventTypes.Member, event.sender)
  738. ]
  739. state_to_include = yield self.store.get_events(state_to_include_ids)
  740. event.unsigned["invite_room_state"] = [
  741. {
  742. "type": e.type,
  743. "state_key": e.state_key,
  744. "content": e.content,
  745. "sender": e.sender,
  746. }
  747. for e in itervalues(state_to_include)
  748. ]
  749. invitee = UserID.from_string(event.state_key)
  750. if not self.hs.is_mine(invitee):
  751. # TODO: Can we add signature from remote server in a nicer
  752. # way? If we have been invited by a remote server, we need
  753. # to get them to sign the event.
  754. returned_invite = yield federation_handler.send_invite(
  755. invitee.domain,
  756. event,
  757. )
  758. event.unsigned.pop("room_state", None)
  759. # TODO: Make sure the signatures actually are correct.
  760. event.signatures.update(
  761. returned_invite.signatures
  762. )
  763. if event.type == EventTypes.Redaction:
  764. auth_events_ids = yield self.auth.compute_auth_events(
  765. event, context.prev_state_ids, for_verification=True,
  766. )
  767. auth_events = yield self.store.get_events(auth_events_ids)
  768. auth_events = {
  769. (e.type, e.state_key): e for e in auth_events.values()
  770. }
  771. if self.auth.check_redaction(event, auth_events=auth_events):
  772. original_event = yield self.store.get_event(
  773. event.redacts,
  774. check_redacted=False,
  775. get_prev_content=False,
  776. allow_rejected=False,
  777. allow_none=False
  778. )
  779. if event.user_id != original_event.user_id:
  780. raise AuthError(
  781. 403,
  782. "You don't have permission to redact events"
  783. )
  784. if event.type == EventTypes.Create and context.prev_state_ids:
  785. raise AuthError(
  786. 403,
  787. "Changing the room create event is forbidden",
  788. )
  789. (event_stream_id, max_stream_id) = yield self.store.persist_event(
  790. event, context=context
  791. )
  792. # this intentionally does not yield: we don't care about the result
  793. # and don't need to wait for it.
  794. run_in_background(
  795. self.pusher_pool.on_new_notifications,
  796. event_stream_id, max_stream_id
  797. )
  798. def _notify():
  799. try:
  800. self.notifier.on_new_room_event(
  801. event, event_stream_id, max_stream_id,
  802. extra_users=extra_users
  803. )
  804. except Exception:
  805. logger.exception("Error notifying about new room event")
  806. run_in_background(_notify)
  807. if event.type == EventTypes.Message:
  808. # We don't want to block sending messages on any presence code. This
  809. # matters as sometimes presence code can take a while.
  810. run_in_background(self._bump_active_time, requester.user)
  811. @defer.inlineCallbacks
  812. def _bump_active_time(self, user):
  813. try:
  814. presence = self.hs.get_presence_handler()
  815. yield presence.bump_presence_active_time(user)
  816. except Exception:
  817. logger.exception("Error bumping presence active time")