send_event.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from twisted.internet import defer
  17. from synapse.events import event_type_from_format_version
  18. from synapse.events.snapshot import EventContext
  19. from synapse.http.servlet import parse_json_object_from_request
  20. from synapse.replication.http._base import ReplicationEndpoint
  21. from synapse.types import Requester, UserID
  22. from synapse.util.metrics import Measure
  23. logger = logging.getLogger(__name__)
  24. class ReplicationSendEventRestServlet(ReplicationEndpoint):
  25. """Handles events newly created on workers, including persisting and
  26. notifying.
  27. The API looks like:
  28. POST /_synapse/replication/send_event/:event_id/:txn_id
  29. {
  30. "event": { .. serialized event .. },
  31. "internal_metadata": { .. serialized internal_metadata .. },
  32. "rejected_reason": .., // The event.rejected_reason field
  33. "context": { .. serialized event context .. },
  34. "requester": { .. serialized requester .. },
  35. "ratelimit": true,
  36. "extra_users": [],
  37. }
  38. """
  39. NAME = "send_event"
  40. PATH_ARGS = ("event_id",)
  41. def __init__(self, hs):
  42. super(ReplicationSendEventRestServlet, self).__init__(hs)
  43. self.event_creation_handler = hs.get_event_creation_handler()
  44. self.store = hs.get_datastore()
  45. self.clock = hs.get_clock()
  46. @staticmethod
  47. @defer.inlineCallbacks
  48. def _serialize_payload(
  49. event_id, store, event, context, requester, ratelimit, extra_users
  50. ):
  51. """
  52. Args:
  53. event_id (str)
  54. store (DataStore)
  55. requester (Requester)
  56. event (FrozenEvent)
  57. context (EventContext)
  58. ratelimit (bool)
  59. extra_users (list(UserID)): Any extra users to notify about event
  60. """
  61. serialized_context = yield context.serialize(event, store)
  62. payload = {
  63. "event": event.get_pdu_json(),
  64. "event_format_version": event.format_version,
  65. "internal_metadata": event.internal_metadata.get_dict(),
  66. "rejected_reason": event.rejected_reason,
  67. "context": serialized_context,
  68. "requester": requester.serialize(),
  69. "ratelimit": ratelimit,
  70. "extra_users": [u.to_string() for u in extra_users],
  71. }
  72. return payload
  73. @defer.inlineCallbacks
  74. def _handle_request(self, request, event_id):
  75. with Measure(self.clock, "repl_send_event_parse"):
  76. content = parse_json_object_from_request(request)
  77. event_dict = content["event"]
  78. format_ver = content["event_format_version"]
  79. internal_metadata = content["internal_metadata"]
  80. rejected_reason = content["rejected_reason"]
  81. EventType = event_type_from_format_version(format_ver)
  82. event = EventType(event_dict, internal_metadata, rejected_reason)
  83. requester = Requester.deserialize(self.store, content["requester"])
  84. context = yield EventContext.deserialize(self.store, content["context"])
  85. ratelimit = content["ratelimit"]
  86. extra_users = [UserID.from_string(u) for u in content["extra_users"]]
  87. if requester.user:
  88. request.authenticated_entity = requester.user.to_string()
  89. logger.info(
  90. "Got event to send with ID: %s into room: %s", event.event_id, event.room_id
  91. )
  92. yield self.event_creation_handler.persist_and_notify_client_event(
  93. requester, event, context, ratelimit=ratelimit, extra_users=extra_users
  94. )
  95. return (200, {})
  96. def register_servlets(hs, http_server):
  97. ReplicationSendEventRestServlet(hs).register(http_server)