federation.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from twisted.internet import defer
  17. from synapse.events import event_type_from_format_version
  18. from synapse.events.snapshot import EventContext
  19. from synapse.http.servlet import parse_json_object_from_request
  20. from synapse.replication.http._base import ReplicationEndpoint
  21. from synapse.util.metrics import Measure
  22. logger = logging.getLogger(__name__)
  23. class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
  24. """Handles events newly received from federation, including persisting and
  25. notifying.
  26. The API looks like:
  27. POST /_synapse/replication/fed_send_events/:txn_id
  28. {
  29. "events": [{
  30. "event": { .. serialized event .. },
  31. "internal_metadata": { .. serialized internal_metadata .. },
  32. "rejected_reason": .., // The event.rejected_reason field
  33. "context": { .. serialized event context .. },
  34. }],
  35. "backfilled": false
  36. """
  37. NAME = "fed_send_events"
  38. PATH_ARGS = ()
  39. def __init__(self, hs):
  40. super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
  41. self.store = hs.get_datastore()
  42. self.clock = hs.get_clock()
  43. self.federation_handler = hs.get_handlers().federation_handler
  44. @staticmethod
  45. @defer.inlineCallbacks
  46. def _serialize_payload(store, event_and_contexts, backfilled):
  47. """
  48. Args:
  49. store
  50. event_and_contexts (list[tuple[FrozenEvent, EventContext]])
  51. backfilled (bool): Whether or not the events are the result of
  52. backfilling
  53. """
  54. event_payloads = []
  55. for event, context in event_and_contexts:
  56. serialized_context = yield context.serialize(event, store)
  57. event_payloads.append(
  58. {
  59. "event": event.get_pdu_json(),
  60. "event_format_version": event.format_version,
  61. "internal_metadata": event.internal_metadata.get_dict(),
  62. "rejected_reason": event.rejected_reason,
  63. "context": serialized_context,
  64. }
  65. )
  66. payload = {"events": event_payloads, "backfilled": backfilled}
  67. return payload
  68. @defer.inlineCallbacks
  69. def _handle_request(self, request):
  70. with Measure(self.clock, "repl_fed_send_events_parse"):
  71. content = parse_json_object_from_request(request)
  72. backfilled = content["backfilled"]
  73. event_payloads = content["events"]
  74. event_and_contexts = []
  75. for event_payload in event_payloads:
  76. event_dict = event_payload["event"]
  77. format_ver = event_payload["event_format_version"]
  78. internal_metadata = event_payload["internal_metadata"]
  79. rejected_reason = event_payload["rejected_reason"]
  80. EventType = event_type_from_format_version(format_ver)
  81. event = EventType(event_dict, internal_metadata, rejected_reason)
  82. context = yield EventContext.deserialize(
  83. self.store, event_payload["context"]
  84. )
  85. event_and_contexts.append((event, context))
  86. logger.info("Got %d events from federation", len(event_and_contexts))
  87. yield self.federation_handler.persist_events_and_notify(
  88. event_and_contexts, backfilled
  89. )
  90. return (200, {})
  91. class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
  92. """Handles EDUs newly received from federation, including persisting and
  93. notifying.
  94. Request format:
  95. POST /_synapse/replication/fed_send_edu/:edu_type/:txn_id
  96. {
  97. "origin": ...,
  98. "content: { ... }
  99. }
  100. """
  101. NAME = "fed_send_edu"
  102. PATH_ARGS = ("edu_type",)
  103. def __init__(self, hs):
  104. super(ReplicationFederationSendEduRestServlet, self).__init__(hs)
  105. self.store = hs.get_datastore()
  106. self.clock = hs.get_clock()
  107. self.registry = hs.get_federation_registry()
  108. @staticmethod
  109. def _serialize_payload(edu_type, origin, content):
  110. return {"origin": origin, "content": content}
  111. @defer.inlineCallbacks
  112. def _handle_request(self, request, edu_type):
  113. with Measure(self.clock, "repl_fed_send_edu_parse"):
  114. content = parse_json_object_from_request(request)
  115. origin = content["origin"]
  116. edu_content = content["content"]
  117. logger.info("Got %r edu from %s", edu_type, origin)
  118. result = yield self.registry.on_edu(edu_type, origin, edu_content)
  119. return (200, result)
  120. class ReplicationGetQueryRestServlet(ReplicationEndpoint):
  121. """Handle responding to queries from federation.
  122. Request format:
  123. POST /_synapse/replication/fed_query/:query_type
  124. {
  125. "args": { ... }
  126. }
  127. """
  128. NAME = "fed_query"
  129. PATH_ARGS = ("query_type",)
  130. # This is a query, so let's not bother caching
  131. CACHE = False
  132. def __init__(self, hs):
  133. super(ReplicationGetQueryRestServlet, self).__init__(hs)
  134. self.store = hs.get_datastore()
  135. self.clock = hs.get_clock()
  136. self.registry = hs.get_federation_registry()
  137. @staticmethod
  138. def _serialize_payload(query_type, args):
  139. """
  140. Args:
  141. query_type (str)
  142. args (dict): The arguments received for the given query type
  143. """
  144. return {"args": args}
  145. @defer.inlineCallbacks
  146. def _handle_request(self, request, query_type):
  147. with Measure(self.clock, "repl_fed_query_parse"):
  148. content = parse_json_object_from_request(request)
  149. args = content["args"]
  150. logger.info("Got %r query", query_type)
  151. result = yield self.registry.on_query(query_type, args)
  152. return (200, result)
  153. class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
  154. """Called to clean up any data in DB for a given room, ready for the
  155. server to join the room.
  156. Request format:
  157. POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
  158. {}
  159. """
  160. NAME = "fed_cleanup_room"
  161. PATH_ARGS = ("room_id",)
  162. def __init__(self, hs):
  163. super(ReplicationCleanRoomRestServlet, self).__init__(hs)
  164. self.store = hs.get_datastore()
  165. @staticmethod
  166. def _serialize_payload(room_id, args):
  167. """
  168. Args:
  169. room_id (str)
  170. """
  171. return {}
  172. @defer.inlineCallbacks
  173. def _handle_request(self, request, room_id):
  174. yield self.store.clean_room_for_join(room_id)
  175. return (200, {})
  176. def register_servlets(hs, http_server):
  177. ReplicationFederationSendEventsRestServlet(hs).register(http_server)
  178. ReplicationFederationSendEduRestServlet(hs).register(http_server)
  179. ReplicationGetQueryRestServlet(hs).register(http_server)
  180. ReplicationCleanRoomRestServlet(hs).register(http_server)