message.py 35 KB


  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014 - 2016 OpenMarket Ltd
  3. # Copyright 2017 - 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. import simplejson
  18. import sys
  19. from canonicaljson import encode_canonical_json
  20. import six
  21. from six import string_types, itervalues, iteritems
  22. from twisted.internet import defer, reactor
  23. from twisted.internet.defer import succeed
  24. from twisted.python.failure import Failure
  25. from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
  26. from synapse.api.errors import (
  27. AuthError, Codes, SynapseError,
  28. ConsentNotGivenError,
  29. )
  30. from synapse.api.urls import ConsentURIBuilder
  31. from synapse.crypto.event_signing import add_hashes_and_signatures
  32. from synapse.events.utils import serialize_event
  33. from synapse.events.validator import EventValidator
  34. from synapse.types import (
  35. UserID, RoomAlias, RoomStreamToken,
  36. )
  37. from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
  38. from synapse.util.logcontext import run_in_background
  39. from synapse.util.metrics import measure_func
  40. from synapse.util.frozenutils import frozendict_json_encoder
  41. from synapse.util.stringutils import random_string
  42. from synapse.visibility import filter_events_for_client
  43. from synapse.replication.http.send_event import send_event_to_master
  44. from ._base import BaseHandler
  45. logger = logging.getLogger(__name__)
  46. class PurgeStatus(object):
  47. """Object tracking the status of a purge request
  48. This class contains information on the progress of a purge request, for
  49. return by get_purge_status.
  50. Attributes:
  51. status (int): Tracks whether this request has completed. One of
  52. STATUS_{ACTIVE,COMPLETE,FAILED}
  53. """
  54. STATUS_ACTIVE = 0
  55. STATUS_COMPLETE = 1
  56. STATUS_FAILED = 2
  57. STATUS_TEXT = {
  58. STATUS_ACTIVE: "active",
  59. STATUS_COMPLETE: "complete",
  60. STATUS_FAILED: "failed",
  61. }
  62. def __init__(self):
  63. self.status = PurgeStatus.STATUS_ACTIVE
  64. def asdict(self):
  65. return {
  66. "status": PurgeStatus.STATUS_TEXT[self.status]
  67. }
  68. class MessageHandler(BaseHandler):
  69. def __init__(self, hs):
  70. super(MessageHandler, self).__init__(hs)
  71. self.hs = hs
  72. self.state = hs.get_state_handler()
  73. self.clock = hs.get_clock()
  74. self.pagination_lock = ReadWriteLock()
  75. self._purges_in_progress_by_room = set()
  76. # map from purge id to PurgeStatus
  77. self._purges_by_id = {}
  78. def start_purge_history(self, room_id, token,
  79. delete_local_events=False):
  80. """Start off a history purge on a room.
  81. Args:
  82. room_id (str): The room to purge from
  83. token (str): topological token to delete events before
  84. delete_local_events (bool): True to delete local events as well as
  85. remote ones
  86. Returns:
  87. str: unique ID for this purge transaction.
  88. """
  89. if room_id in self._purges_in_progress_by_room:
  90. raise SynapseError(
  91. 400,
  92. "History purge already in progress for %s" % (room_id, ),
  93. )
  94. purge_id = random_string(16)
  95. # we log the purge_id here so that it can be tied back to the
  96. # request id in the log lines.
  97. logger.info("[purge] starting purge_id %s", purge_id)
  98. self._purges_by_id[purge_id] = PurgeStatus()
  99. run_in_background(
  100. self._purge_history,
  101. purge_id, room_id, token, delete_local_events,
  102. )
  103. return purge_id
  104. @defer.inlineCallbacks
  105. def _purge_history(self, purge_id, room_id, token,
  106. delete_local_events):
  107. """Carry out a history purge on a room.
  108. Args:
  109. purge_id (str): The id for this purge
  110. room_id (str): The room to purge from
  111. token (str): topological token to delete events before
  112. delete_local_events (bool): True to delete local events as well as
  113. remote ones
  114. Returns:
  115. Deferred
  116. """
  117. self._purges_in_progress_by_room.add(room_id)
  118. try:
  119. with (yield self.pagination_lock.write(room_id)):
  120. yield self.store.purge_history(
  121. room_id, token, delete_local_events,
  122. )
  123. logger.info("[purge] complete")
  124. self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
  125. except Exception:
  126. logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
  127. self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
  128. finally:
  129. self._purges_in_progress_by_room.discard(room_id)
  130. # remove the purge from the list 24 hours after it completes
  131. def clear_purge():
  132. del self._purges_by_id[purge_id]
  133. reactor.callLater(24 * 3600, clear_purge)
  134. def get_purge_status(self, purge_id):
  135. """Get the current status of an active purge
  136. Args:
  137. purge_id (str): purge_id returned by start_purge_history
  138. Returns:
  139. PurgeStatus|None
  140. """
  141. return self._purges_by_id.get(purge_id)
  142. @defer.inlineCallbacks
  143. def get_messages(self, requester, room_id=None, pagin_config=None,
  144. as_client_event=True, event_filter=None):
  145. """Get messages in a room.
  146. Args:
  147. requester (Requester): The user requesting messages.
  148. room_id (str): The room they want messages from.
  149. pagin_config (synapse.api.streams.PaginationConfig): The pagination
  150. config rules to apply, if any.
  151. as_client_event (bool): True to get events in client-server format.
  152. event_filter (Filter): Filter to apply to results or None
  153. Returns:
  154. dict: Pagination API results
  155. """
  156. user_id = requester.user.to_string()
  157. if pagin_config.from_token:
  158. room_token = pagin_config.from_token.room_key
  159. else:
  160. pagin_config.from_token = (
  161. yield self.hs.get_event_sources().get_current_token_for_room(
  162. room_id=room_id
  163. )
  164. )
  165. room_token = pagin_config.from_token.room_key
  166. room_token = RoomStreamToken.parse(room_token)
  167. pagin_config.from_token = pagin_config.from_token.copy_and_replace(
  168. "room_key", str(room_token)
  169. )
  170. source_config = pagin_config.get_source_config("room")
  171. with (yield self.pagination_lock.read(room_id)):
  172. membership, member_event_id = yield self._check_in_room_or_world_readable(
  173. room_id, user_id
  174. )
  175. if source_config.direction == 'b':
  176. # if we're going backwards, we might need to backfill. This
  177. # requires that we have a topo token.
  178. if room_token.topological:
  179. max_topo = room_token.topological
  180. else:
  181. max_topo = yield self.store.get_max_topological_token(
  182. room_id, room_token.stream
  183. )
  184. if membership == Membership.LEAVE:
  185. # If they have left the room then clamp the token to be before
  186. # they left the room, to save the effort of loading from the
  187. # database.
  188. leave_token = yield self.store.get_topological_token_for_event(
  189. member_event_id
  190. )
  191. leave_token = RoomStreamToken.parse(leave_token)
  192. if leave_token.topological < max_topo:
  193. source_config.from_key = str(leave_token)
  194. yield self.hs.get_handlers().federation_handler.maybe_backfill(
  195. room_id, max_topo
  196. )
  197. events, next_key = yield self.store.paginate_room_events(
  198. room_id=room_id,
  199. from_key=source_config.from_key,
  200. to_key=source_config.to_key,
  201. direction=source_config.direction,
  202. limit=source_config.limit,
  203. event_filter=event_filter,
  204. )
  205. next_token = pagin_config.from_token.copy_and_replace(
  206. "room_key", next_key
  207. )
  208. if not events:
  209. defer.returnValue({
  210. "chunk": [],
  211. "start": pagin_config.from_token.to_string(),
  212. "end": next_token.to_string(),
  213. })
  214. if event_filter:
  215. events = event_filter.filter(events)
  216. events = yield filter_events_for_client(
  217. self.store,
  218. user_id,
  219. events,
  220. is_peeking=(member_event_id is None),
  221. )
  222. time_now = self.clock.time_msec()
  223. chunk = {
  224. "chunk": [
  225. serialize_event(e, time_now, as_client_event)
  226. for e in events
  227. ],
  228. "start": pagin_config.from_token.to_string(),
  229. "end": next_token.to_string(),
  230. }
  231. defer.returnValue(chunk)
  232. @defer.inlineCallbacks
  233. def get_room_data(self, user_id=None, room_id=None,
  234. event_type=None, state_key="", is_guest=False):
  235. """ Get data from a room.
  236. Args:
  237. event : The room path event
  238. Returns:
  239. The path data content.
  240. Raises:
  241. SynapseError if something went wrong.
  242. """
  243. membership, membership_event_id = yield self._check_in_room_or_world_readable(
  244. room_id, user_id
  245. )
  246. if membership == Membership.JOIN:
  247. data = yield self.state_handler.get_current_state(
  248. room_id, event_type, state_key
  249. )
  250. elif membership == Membership.LEAVE:
  251. key = (event_type, state_key)
  252. room_state = yield self.store.get_state_for_events(
  253. [membership_event_id], [key]
  254. )
  255. data = room_state[membership_event_id].get(key)
  256. defer.returnValue(data)
  257. @defer.inlineCallbacks
  258. def _check_in_room_or_world_readable(self, room_id, user_id):
  259. try:
  260. # check_user_was_in_room will return the most recent membership
  261. # event for the user if:
  262. # * The user is a non-guest user, and was ever in the room
  263. # * The user is a guest user, and has joined the room
  264. # else it will throw.
  265. member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
  266. defer.returnValue((member_event.membership, member_event.event_id))
  267. return
  268. except AuthError:
  269. visibility = yield self.state_handler.get_current_state(
  270. room_id, EventTypes.RoomHistoryVisibility, ""
  271. )
  272. if (
  273. visibility and
  274. visibility.content["history_visibility"] == "world_readable"
  275. ):
  276. defer.returnValue((Membership.JOIN, None))
  277. return
  278. raise AuthError(
  279. 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
  280. )
  281. @defer.inlineCallbacks
  282. def get_state_events(self, user_id, room_id, is_guest=False):
  283. """Retrieve all state events for a given room. If the user is
  284. joined to the room then return the current state. If the user has
  285. left the room return the state events from when they left.
  286. Args:
  287. user_id(str): The user requesting state events.
  288. room_id(str): The room ID to get all state events from.
  289. Returns:
  290. A list of dicts representing state events. [{}, {}, {}]
  291. """
  292. membership, membership_event_id = yield self._check_in_room_or_world_readable(
  293. room_id, user_id
  294. )
  295. if membership == Membership.JOIN:
  296. room_state = yield self.state_handler.get_current_state(room_id)
  297. elif membership == Membership.LEAVE:
  298. room_state = yield self.store.get_state_for_events(
  299. [membership_event_id], None
  300. )
  301. room_state = room_state[membership_event_id]
  302. now = self.clock.time_msec()
  303. defer.returnValue(
  304. [serialize_event(c, now) for c in room_state.values()]
  305. )
  306. @defer.inlineCallbacks
  307. def get_joined_members(self, requester, room_id):
  308. """Get all the joined members in the room and their profile information.
  309. If the user has left the room return the state events from when they left.
  310. Args:
  311. requester(Requester): The user requesting state events.
  312. room_id(str): The room ID to get all state events from.
  313. Returns:
  314. A dict of user_id to profile info
  315. """
  316. user_id = requester.user.to_string()
  317. if not requester.app_service:
  318. # We check AS auth after fetching the room membership, as it
  319. # requires us to pull out all joined members anyway.
  320. membership, _ = yield self._check_in_room_or_world_readable(
  321. room_id, user_id
  322. )
  323. if membership != Membership.JOIN:
  324. raise NotImplementedError(
  325. "Getting joined members after leaving is not implemented"
  326. )
  327. users_with_profile = yield self.state.get_current_user_in_room(room_id)
  328. # If this is an AS, double check that they are allowed to see the members.
  329. # This can either be because the AS user is in the room or becuase there
  330. # is a user in the room that the AS is "interested in"
  331. if requester.app_service and user_id not in users_with_profile:
  332. for uid in users_with_profile:
  333. if requester.app_service.is_interested_in_user(uid):
  334. break
  335. else:
  336. # Loop fell through, AS has no interested users in room
  337. raise AuthError(403, "Appservice not in room")
  338. defer.returnValue({
  339. user_id: {
  340. "avatar_url": profile.avatar_url,
  341. "display_name": profile.display_name,
  342. }
  343. for user_id, profile in iteritems(users_with_profile)
  344. })
  345. class EventCreationHandler(object):
  346. def __init__(self, hs):
  347. self.hs = hs
  348. self.auth = hs.get_auth()
  349. self.store = hs.get_datastore()
  350. self.state = hs.get_state_handler()
  351. self.clock = hs.get_clock()
  352. self.validator = EventValidator()
  353. self.profile_handler = hs.get_profile_handler()
  354. self.event_builder_factory = hs.get_event_builder_factory()
  355. self.server_name = hs.hostname
  356. self.ratelimiter = hs.get_ratelimiter()
  357. self.notifier = hs.get_notifier()
  358. self.config = hs.config
  359. self.http_client = hs.get_simple_http_client()
  360. # This is only used to get at ratelimit function, and maybe_kick_guest_users
  361. self.base_handler = BaseHandler(hs)
  362. self.pusher_pool = hs.get_pusherpool()
  363. # We arbitrarily limit concurrent event creation for a room to 5.
  364. # This is to stop us from diverging history *too* much.
  365. self.limiter = Limiter(max_count=5)
  366. self.action_generator = hs.get_action_generator()
  367. self.spam_checker = hs.get_spam_checker()
  368. if self.config.block_events_without_consent_error is not None:
  369. self._consent_uri_builder = ConsentURIBuilder(self.config)
  370. @defer.inlineCallbacks
  371. def create_event(self, requester, event_dict, token_id=None, txn_id=None,
  372. prev_events_and_hashes=None):
  373. """
  374. Given a dict from a client, create a new event.
  375. Creates an FrozenEvent object, filling out auth_events, prev_events,
  376. etc.
  377. Adds display names to Join membership events.
  378. Args:
  379. requester
  380. event_dict (dict): An entire event
  381. token_id (str)
  382. txn_id (str)
  383. prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
  384. the forward extremities to use as the prev_events for the
  385. new event. For each event, a tuple of (event_id, hashes, depth)
  386. where *hashes* is a map from algorithm to hash.
  387. If None, they will be requested from the database.
  388. Returns:
  389. Tuple of created event (FrozenEvent), Context
  390. """
  391. builder = self.event_builder_factory.new(event_dict)
  392. self.validator.validate_new(builder)
  393. if builder.type == EventTypes.Member:
  394. membership = builder.content.get("membership", None)
  395. target = UserID.from_string(builder.state_key)
  396. if membership in {Membership.JOIN, Membership.INVITE}:
  397. # If event doesn't include a display name, add one.
  398. profile = self.profile_handler
  399. content = builder.content
  400. try:
  401. if "displayname" not in content:
  402. content["displayname"] = yield profile.get_displayname(target)
  403. if "avatar_url" not in content:
  404. content["avatar_url"] = yield profile.get_avatar_url(target)
  405. except Exception as e:
  406. logger.info(
  407. "Failed to get profile information for %r: %s",
  408. target, e
  409. )
  410. is_exempt = yield self._is_exempt_from_privacy_policy(builder)
  411. if not is_exempt:
  412. yield self.assert_accepted_privacy_policy(requester)
  413. if token_id is not None:
  414. builder.internal_metadata.token_id = token_id
  415. if txn_id is not None:
  416. builder.internal_metadata.txn_id = txn_id
  417. event, context = yield self.create_new_client_event(
  418. builder=builder,
  419. requester=requester,
  420. prev_events_and_hashes=prev_events_and_hashes,
  421. )
  422. defer.returnValue((event, context))
  423. def _is_exempt_from_privacy_policy(self, builder):
  424. """"Determine if an event to be sent is exempt from having to consent
  425. to the privacy policy
  426. Args:
  427. builder (synapse.events.builder.EventBuilder): event being created
  428. Returns:
  429. Deferred[bool]: true if the event can be sent without the user
  430. consenting
  431. """
  432. # the only thing the user can do is join the server notices room.
  433. if builder.type == EventTypes.Member:
  434. membership = builder.content.get("membership", None)
  435. if membership == Membership.JOIN:
  436. return self._is_server_notices_room(builder.room_id)
  437. return succeed(False)
  438. @defer.inlineCallbacks
  439. def _is_server_notices_room(self, room_id):
  440. if self.config.server_notices_mxid is None:
  441. defer.returnValue(False)
  442. user_ids = yield self.store.get_users_in_room(room_id)
  443. defer.returnValue(self.config.server_notices_mxid in user_ids)
  444. @defer.inlineCallbacks
  445. def assert_accepted_privacy_policy(self, requester):
  446. """Check if a user has accepted the privacy policy
  447. Called when the given user is about to do something that requires
  448. privacy consent. We see if the user is exempt and otherwise check that
  449. they have given consent. If they have not, a ConsentNotGiven error is
  450. raised.
  451. Args:
  452. requester (synapse.types.Requester):
  453. The user making the request
  454. Returns:
  455. Deferred[None]: returns normally if the user has consented or is
  456. exempt
  457. Raises:
  458. ConsentNotGivenError: if the user has not given consent yet
  459. """
  460. if self.config.block_events_without_consent_error is None:
  461. return
  462. # exempt AS users from needing consent
  463. if requester.app_service is not None:
  464. return
  465. user_id = requester.user.to_string()
  466. # exempt the system notices user
  467. if (
  468. self.config.server_notices_mxid is not None and
  469. user_id == self.config.server_notices_mxid
  470. ):
  471. return
  472. u = yield self.store.get_user_by_id(user_id)
  473. assert u is not None
  474. if u["appservice_id"] is not None:
  475. # users registered by an appservice are exempt
  476. return
  477. if u["consent_version"] == self.config.user_consent_version:
  478. return
  479. consent_uri = self._consent_uri_builder.build_user_consent_uri(
  480. requester.user.localpart,
  481. )
  482. msg = self.config.block_events_without_consent_error % {
  483. 'consent_uri': consent_uri,
  484. }
  485. raise ConsentNotGivenError(
  486. msg=msg,
  487. consent_uri=consent_uri,
  488. )
  489. @defer.inlineCallbacks
  490. def send_nonmember_event(self, requester, event, context, ratelimit=True):
  491. """
  492. Persists and notifies local clients and federation of an event.
  493. Args:
  494. event (FrozenEvent) the event to send.
  495. context (Context) the context of the event.
  496. ratelimit (bool): Whether to rate limit this send.
  497. is_guest (bool): Whether the sender is a guest.
  498. """
  499. if event.type == EventTypes.Member:
  500. raise SynapseError(
  501. 500,
  502. "Tried to send member event through non-member codepath"
  503. )
  504. user = UserID.from_string(event.sender)
  505. assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
  506. if event.is_state():
  507. prev_state = yield self.deduplicate_state_event(event, context)
  508. if prev_state is not None:
  509. defer.returnValue(prev_state)
  510. yield self.handle_new_client_event(
  511. requester=requester,
  512. event=event,
  513. context=context,
  514. ratelimit=ratelimit,
  515. )
  516. @defer.inlineCallbacks
  517. def deduplicate_state_event(self, event, context):
  518. """
  519. Checks whether event is in the latest resolved state in context.
  520. If so, returns the version of the event in context.
  521. Otherwise, returns None.
  522. """
  523. prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
  524. prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
  525. if not prev_event:
  526. return
  527. if prev_event and event.user_id == prev_event.user_id:
  528. prev_content = encode_canonical_json(prev_event.content)
  529. next_content = encode_canonical_json(event.content)
  530. if prev_content == next_content:
  531. defer.returnValue(prev_event)
  532. return
  533. @defer.inlineCallbacks
  534. def create_and_send_nonmember_event(
  535. self,
  536. requester,
  537. event_dict,
  538. ratelimit=True,
  539. txn_id=None
  540. ):
  541. """
  542. Creates an event, then sends it.
  543. See self.create_event and self.send_nonmember_event.
  544. """
  545. # We limit the number of concurrent event sends in a room so that we
  546. # don't fork the DAG too much. If we don't limit then we can end up in
  547. # a situation where event persistence can't keep up, causing
  548. # extremities to pile up, which in turn leads to state resolution
  549. # taking longer.
  550. with (yield self.limiter.queue(event_dict["room_id"])):
  551. event, context = yield self.create_event(
  552. requester,
  553. event_dict,
  554. token_id=requester.access_token_id,
  555. txn_id=txn_id
  556. )
  557. spam_error = self.spam_checker.check_event_for_spam(event)
  558. if spam_error:
  559. if not isinstance(spam_error, string_types):
  560. spam_error = "Spam is not permitted here"
  561. raise SynapseError(
  562. 403, spam_error, Codes.FORBIDDEN
  563. )
  564. yield self.send_nonmember_event(
  565. requester,
  566. event,
  567. context,
  568. ratelimit=ratelimit,
  569. )
  570. defer.returnValue(event)
  571. @measure_func("create_new_client_event")
  572. @defer.inlineCallbacks
  573. def create_new_client_event(self, builder, requester=None,
  574. prev_events_and_hashes=None):
  575. """Create a new event for a local client
  576. Args:
  577. builder (EventBuilder):
  578. requester (synapse.types.Requester|None):
  579. prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
  580. the forward extremities to use as the prev_events for the
  581. new event. For each event, a tuple of (event_id, hashes, depth)
  582. where *hashes* is a map from algorithm to hash.
  583. If None, they will be requested from the database.
  584. Returns:
  585. Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
  586. """
  587. if prev_events_and_hashes is not None:
  588. assert len(prev_events_and_hashes) <= 10, \
  589. "Attempting to create an event with %i prev_events" % (
  590. len(prev_events_and_hashes),
  591. )
  592. else:
  593. prev_events_and_hashes = \
  594. yield self.store.get_prev_events_for_room(builder.room_id)
  595. if prev_events_and_hashes:
  596. depth = max([d for _, _, d in prev_events_and_hashes]) + 1
  597. # we cap depth of generated events, to ensure that they are not
  598. # rejected by other servers (and so that they can be persisted in
  599. # the db)
  600. depth = min(depth, MAX_DEPTH)
  601. else:
  602. depth = 1
  603. prev_events = [
  604. (event_id, prev_hashes)
  605. for event_id, prev_hashes, _ in prev_events_and_hashes
  606. ]
  607. builder.prev_events = prev_events
  608. builder.depth = depth
  609. context = yield self.state.compute_event_context(builder)
  610. if requester:
  611. context.app_service = requester.app_service
  612. if builder.is_state():
  613. builder.prev_state = yield self.store.add_event_hashes(
  614. context.prev_state_events
  615. )
  616. yield self.auth.add_auth_events(builder, context)
  617. signing_key = self.hs.config.signing_key[0]
  618. add_hashes_and_signatures(
  619. builder, self.server_name, signing_key
  620. )
  621. event = builder.build()
  622. logger.debug(
  623. "Created event %s with state: %s",
  624. event.event_id, context.prev_state_ids,
  625. )
  626. defer.returnValue(
  627. (event, context,)
  628. )
  629. @measure_func("handle_new_client_event")
  630. @defer.inlineCallbacks
  631. def handle_new_client_event(
  632. self,
  633. requester,
  634. event,
  635. context,
  636. ratelimit=True,
  637. extra_users=[],
  638. ):
  639. """Processes a new event. This includes checking auth, persisting it,
  640. notifying users, sending to remote servers, etc.
  641. If called from a worker will hit out to the master process for final
  642. processing.
  643. Args:
  644. requester (Requester)
  645. event (FrozenEvent)
  646. context (EventContext)
  647. ratelimit (bool)
  648. extra_users (list(UserID)): Any extra users to notify about event
  649. """
  650. try:
  651. yield self.auth.check_from_context(event, context)
  652. except AuthError as err:
  653. logger.warn("Denying new event %r because %s", event, err)
  654. raise err
  655. # Ensure that we can round trip before trying to persist in db
  656. try:
  657. dump = frozendict_json_encoder.encode(event.content)
  658. simplejson.loads(dump)
  659. except Exception:
  660. logger.exception("Failed to encode content: %r", event.content)
  661. raise
  662. yield self.action_generator.handle_push_actions_for_event(
  663. event, context
  664. )
  665. try:
  666. # If we're a worker we need to hit out to the master.
  667. if self.config.worker_app:
  668. yield send_event_to_master(
  669. self.http_client,
  670. host=self.config.worker_replication_host,
  671. port=self.config.worker_replication_http_port,
  672. requester=requester,
  673. event=event,
  674. context=context,
  675. ratelimit=ratelimit,
  676. extra_users=extra_users,
  677. )
  678. return
  679. yield self.persist_and_notify_client_event(
  680. requester,
  681. event,
  682. context,
  683. ratelimit=ratelimit,
  684. extra_users=extra_users,
  685. )
  686. except: # noqa: E722, as we reraise the exception this is fine.
  687. # Ensure that we actually remove the entries in the push actions
  688. # staging area, if we calculated them.
  689. tp, value, tb = sys.exc_info()
  690. run_in_background(
  691. self.store.remove_push_actions_from_staging,
  692. event.event_id,
  693. )
  694. six.reraise(tp, value, tb)
  695. @defer.inlineCallbacks
  696. def persist_and_notify_client_event(
  697. self,
  698. requester,
  699. event,
  700. context,
  701. ratelimit=True,
  702. extra_users=[],
  703. ):
  704. """Called when we have fully built the event, have already
  705. calculated the push actions for the event, and checked auth.
  706. This should only be run on master.
  707. """
  708. assert not self.config.worker_app
  709. if ratelimit:
  710. yield self.base_handler.ratelimit(requester)
  711. yield self.base_handler.maybe_kick_guest_users(event, context)
  712. if event.type == EventTypes.CanonicalAlias:
  713. # Check the alias is acually valid (at this time at least)
  714. room_alias_str = event.content.get("alias", None)
  715. if room_alias_str:
  716. room_alias = RoomAlias.from_string(room_alias_str)
  717. directory_handler = self.hs.get_handlers().directory_handler
  718. mapping = yield directory_handler.get_association(room_alias)
  719. if mapping["room_id"] != event.room_id:
  720. raise SynapseError(
  721. 400,
  722. "Room alias %s does not point to the room" % (
  723. room_alias_str,
  724. )
  725. )
  726. federation_handler = self.hs.get_handlers().federation_handler
  727. if event.type == EventTypes.Member:
  728. if event.content["membership"] == Membership.INVITE:
  729. def is_inviter_member_event(e):
  730. return (
  731. e.type == EventTypes.Member and
  732. e.sender == event.sender
  733. )
  734. state_to_include_ids = [
  735. e_id
  736. for k, e_id in iteritems(context.current_state_ids)
  737. if k[0] in self.hs.config.room_invite_state_types
  738. or k == (EventTypes.Member, event.sender)
  739. ]
  740. state_to_include = yield self.store.get_events(state_to_include_ids)
  741. event.unsigned["invite_room_state"] = [
  742. {
  743. "type": e.type,
  744. "state_key": e.state_key,
  745. "content": e.content,
  746. "sender": e.sender,
  747. }
  748. for e in itervalues(state_to_include)
  749. ]
  750. invitee = UserID.from_string(event.state_key)
  751. if not self.hs.is_mine(invitee):
  752. # TODO: Can we add signature from remote server in a nicer
  753. # way? If we have been invited by a remote server, we need
  754. # to get them to sign the event.
  755. returned_invite = yield federation_handler.send_invite(
  756. invitee.domain,
  757. event,
  758. )
  759. event.unsigned.pop("room_state", None)
  760. # TODO: Make sure the signatures actually are correct.
  761. event.signatures.update(
  762. returned_invite.signatures
  763. )
  764. if event.type == EventTypes.Redaction:
  765. auth_events_ids = yield self.auth.compute_auth_events(
  766. event, context.prev_state_ids, for_verification=True,
  767. )
  768. auth_events = yield self.store.get_events(auth_events_ids)
  769. auth_events = {
  770. (e.type, e.state_key): e for e in auth_events.values()
  771. }
  772. if self.auth.check_redaction(event, auth_events=auth_events):
  773. original_event = yield self.store.get_event(
  774. event.redacts,
  775. check_redacted=False,
  776. get_prev_content=False,
  777. allow_rejected=False,
  778. allow_none=False
  779. )
  780. if event.user_id != original_event.user_id:
  781. raise AuthError(
  782. 403,
  783. "You don't have permission to redact events"
  784. )
  785. if event.type == EventTypes.Create and context.prev_state_ids:
  786. raise AuthError(
  787. 403,
  788. "Changing the room create event is forbidden",
  789. )
  790. (event_stream_id, max_stream_id) = yield self.store.persist_event(
  791. event, context=context
  792. )
  793. # this intentionally does not yield: we don't care about the result
  794. # and don't need to wait for it.
  795. run_in_background(
  796. self.pusher_pool.on_new_notifications,
  797. event_stream_id, max_stream_id
  798. )
  799. @defer.inlineCallbacks
  800. def _notify():
  801. yield run_on_reactor()
  802. try:
  803. self.notifier.on_new_room_event(
  804. event, event_stream_id, max_stream_id,
  805. extra_users=extra_users
  806. )
  807. except Exception:
  808. logger.exception("Error notifying about new room event")
  809. run_in_background(_notify)
  810. if event.type == EventTypes.Message:
  811. # We don't want to block sending messages on any presence code. This
  812. # matters as sometimes presence code can take a while.
  813. run_in_background(self._bump_active_time, requester.user)
  814. @defer.inlineCallbacks
  815. def _bump_active_time(self, user):
  816. try:
  817. presence = self.hs.get_presence_handler()
  818. yield presence.bump_presence_active_time(user)
  819. except Exception:
  820. logger.exception("Error bumping presence active time")