send_event.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer
  16. from synapse.api.errors import (
  17. SynapseError, MatrixCodeMessageException, CodeMessageException,
  18. )
  19. from synapse.events import FrozenEvent
  20. from synapse.events.snapshot import EventContext
  21. from synapse.http.servlet import RestServlet, parse_json_object_from_request
  22. from synapse.util.async import sleep
  23. from synapse.util.caches.response_cache import ResponseCache
  24. from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
  25. from synapse.util.metrics import Measure
  26. from synapse.types import Requester, UserID
  27. import logging
  28. import re
  29. logger = logging.getLogger(__name__)
  30. @defer.inlineCallbacks
  31. def send_event_to_master(client, host, port, requester, event, context,
  32. ratelimit, extra_users):
  33. """Send event to be handled on the master
  34. Args:
  35. client (SimpleHttpClient)
  36. host (str): host of master
  37. port (int): port on master listening for HTTP replication
  38. requester (Requester)
  39. event (FrozenEvent)
  40. context (EventContext)
  41. ratelimit (bool)
  42. extra_users (list(UserID)): Any extra users to notify about event
  43. """
  44. uri = "http://%s:%s/_synapse/replication/send_event/%s" % (
  45. host, port, event.event_id,
  46. )
  47. payload = {
  48. "event": event.get_pdu_json(),
  49. "internal_metadata": event.internal_metadata.get_dict(),
  50. "rejected_reason": event.rejected_reason,
  51. "context": context.serialize(event),
  52. "requester": requester.serialize(),
  53. "ratelimit": ratelimit,
  54. "extra_users": [u.to_string() for u in extra_users],
  55. }
  56. try:
  57. # We keep retrying the same request for timeouts. This is so that we
  58. # have a good idea that the request has either succeeded or failed on
  59. # the master, and so whether we should clean up or not.
  60. while True:
  61. try:
  62. result = yield client.put_json(uri, payload)
  63. break
  64. except CodeMessageException as e:
  65. if e.code != 504:
  66. raise
  67. logger.warn("send_event request timed out")
  68. # If we timed out we probably don't need to worry about backing
  69. # off too much, but lets just wait a little anyway.
  70. yield sleep(1)
  71. except MatrixCodeMessageException as e:
  72. # We convert to SynapseError as we know that it was a SynapseError
  73. # on the master process that we should send to the client. (And
  74. # importantly, not stack traces everywhere)
  75. raise SynapseError(e.code, e.msg, e.errcode)
  76. defer.returnValue(result)
  77. class ReplicationSendEventRestServlet(RestServlet):
  78. """Handles events newly created on workers, including persisting and
  79. notifying.
  80. The API looks like:
  81. POST /_synapse/replication/send_event/:event_id
  82. {
  83. "event": { .. serialized event .. },
  84. "internal_metadata": { .. serialized internal_metadata .. },
  85. "rejected_reason": .., // The event.rejected_reason field
  86. "context": { .. serialized event context .. },
  87. "requester": { .. serialized requester .. },
  88. "ratelimit": true,
  89. "extra_users": [],
  90. }
  91. """
  92. PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")]
  93. def __init__(self, hs):
  94. super(ReplicationSendEventRestServlet, self).__init__()
  95. self.event_creation_handler = hs.get_event_creation_handler()
  96. self.store = hs.get_datastore()
  97. self.clock = hs.get_clock()
  98. # The responses are tiny, so we may as well cache them for a while
  99. self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
  100. def on_PUT(self, request, event_id):
  101. result = self.response_cache.get(event_id)
  102. if not result:
  103. result = self.response_cache.set(
  104. event_id,
  105. self._handle_request(request)
  106. )
  107. else:
  108. logger.warn("Returning cached response")
  109. return make_deferred_yieldable(result)
  110. @preserve_fn
  111. @defer.inlineCallbacks
  112. def _handle_request(self, request):
  113. with Measure(self.clock, "repl_send_event_parse"):
  114. content = parse_json_object_from_request(request)
  115. event_dict = content["event"]
  116. internal_metadata = content["internal_metadata"]
  117. rejected_reason = content["rejected_reason"]
  118. event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
  119. requester = Requester.deserialize(self.store, content["requester"])
  120. context = yield EventContext.deserialize(self.store, content["context"])
  121. ratelimit = content["ratelimit"]
  122. extra_users = [UserID.from_string(u) for u in content["extra_users"]]
  123. if requester.user:
  124. request.authenticated_entity = requester.user.to_string()
  125. logger.info(
  126. "Got event to send with ID: %s into room: %s",
  127. event.event_id, event.room_id,
  128. )
  129. yield self.event_creation_handler.persist_and_notify_client_event(
  130. requester, event, context,
  131. ratelimit=ratelimit,
  132. extra_users=extra_users,
  133. )
  134. defer.returnValue((200, {}))
  135. def register_servlets(hs, http_server):
  136. ReplicationSendEventRestServlet(hs).register(http_server)