membership.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from typing import TYPE_CHECKING, List, Optional, Tuple
  17. from twisted.web.http import Request
  18. from synapse.http.servlet import parse_json_object_from_request
  19. from synapse.replication.http._base import ReplicationEndpoint
  20. from synapse.types import JsonDict, Requester, UserID
  21. from synapse.util.distributor import user_left_room
  22. if TYPE_CHECKING:
  23. from synapse.server import HomeServer
  24. logger = logging.getLogger(__name__)
  25. class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
  26. """Does a remote join for the given user to the given room
  27. Request format:
  28. POST /_synapse/replication/remote_join/:room_id/:user_id
  29. {
  30. "requester": ...,
  31. "remote_room_hosts": [...],
  32. "content": { ... }
  33. }
  34. """
  35. NAME = "remote_join"
  36. PATH_ARGS = ("room_id", "user_id")
  37. def __init__(self, hs):
  38. super().__init__(hs)
  39. self.federation_handler = hs.get_federation_handler()
  40. self.store = hs.get_datastore()
  41. self.clock = hs.get_clock()
  42. @staticmethod
  43. async def _serialize_payload( # type: ignore
  44. requester: Requester,
  45. room_id: str,
  46. user_id: str,
  47. remote_room_hosts: List[str],
  48. content: JsonDict,
  49. ) -> JsonDict:
  50. """
  51. Args:
  52. requester: The user making the request according to the access token
  53. room_id: The ID of the room.
  54. user_id: The ID of the user.
  55. remote_room_hosts: Servers to try and join via
  56. content: The event content to use for the join event
  57. Returns:
  58. A dict representing the payload of the request.
  59. """
  60. return {
  61. "requester": requester.serialize(),
  62. "remote_room_hosts": remote_room_hosts,
  63. "content": content,
  64. }
  65. async def _handle_request( # type: ignore
  66. self, request: Request, room_id: str, user_id: str
  67. ) -> Tuple[int, JsonDict]:
  68. content = parse_json_object_from_request(request)
  69. remote_room_hosts = content["remote_room_hosts"]
  70. event_content = content["content"]
  71. requester = Requester.deserialize(self.store, content["requester"])
  72. request.requester = requester
  73. logger.info("remote_join: %s into room: %s", user_id, room_id)
  74. event_id, stream_id = await self.federation_handler.do_invite_join(
  75. remote_room_hosts, room_id, user_id, event_content
  76. )
  77. return 200, {"event_id": event_id, "stream_id": stream_id}
  78. class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
  79. """Rejects an out-of-band invite we have received from a remote server
  80. Request format:
  81. POST /_synapse/replication/remote_reject_invite/:event_id
  82. {
  83. "txn_id": ...,
  84. "requester": ...,
  85. "content": { ... }
  86. }
  87. """
  88. NAME = "remote_reject_invite"
  89. PATH_ARGS = ("invite_event_id",)
  90. def __init__(self, hs: "HomeServer"):
  91. super().__init__(hs)
  92. self.store = hs.get_datastore()
  93. self.clock = hs.get_clock()
  94. self.member_handler = hs.get_room_member_handler()
  95. @staticmethod
  96. async def _serialize_payload( # type: ignore
  97. invite_event_id: str,
  98. txn_id: Optional[str],
  99. requester: Requester,
  100. content: JsonDict,
  101. ) -> JsonDict:
  102. """
  103. Args:
  104. invite_event_id: The ID of the invite to be rejected.
  105. txn_id: Optional transaction ID supplied by the client
  106. requester: User making the rejection request, according to the access token
  107. content: Additional content to include in the rejection event.
  108. Normally an empty dict.
  109. Returns:
  110. A dict representing the payload of the request.
  111. """
  112. return {
  113. "txn_id": txn_id,
  114. "requester": requester.serialize(),
  115. "content": content,
  116. }
  117. async def _handle_request( # type: ignore
  118. self, request: Request, invite_event_id: str
  119. ) -> Tuple[int, JsonDict]:
  120. content = parse_json_object_from_request(request)
  121. txn_id = content["txn_id"]
  122. event_content = content["content"]
  123. requester = Requester.deserialize(self.store, content["requester"])
  124. request.requester = requester
  125. # hopefully we're now on the master, so this won't recurse!
  126. event_id, stream_id = await self.member_handler.remote_reject_invite(
  127. invite_event_id,
  128. txn_id,
  129. requester,
  130. event_content,
  131. )
  132. return 200, {"event_id": event_id, "stream_id": stream_id}
  133. class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
  134. """Notifies that a user has joined or left the room
  135. Request format:
  136. POST /_synapse/replication/membership_change/:room_id/:user_id/:change
  137. {}
  138. """
  139. NAME = "membership_change"
  140. PATH_ARGS = ("room_id", "user_id", "change")
  141. CACHE = False # No point caching as should return instantly.
  142. def __init__(self, hs):
  143. super().__init__(hs)
  144. self.registeration_handler = hs.get_registration_handler()
  145. self.store = hs.get_datastore()
  146. self.clock = hs.get_clock()
  147. self.distributor = hs.get_distributor()
  148. @staticmethod
  149. async def _serialize_payload( # type: ignore
  150. room_id: str, user_id: str, change: str
  151. ) -> JsonDict:
  152. """
  153. Args:
  154. room_id: The ID of the room.
  155. user_id: The ID of the user.
  156. change: "left"
  157. Returns:
  158. A dict representing the payload of the request.
  159. """
  160. assert change == "left"
  161. return {}
  162. def _handle_request( # type: ignore
  163. self, request: Request, room_id: str, user_id: str, change: str
  164. ) -> Tuple[int, JsonDict]:
  165. logger.info("user membership change: %s in %s", user_id, room_id)
  166. user = UserID.from_string(user_id)
  167. if change == "left":
  168. user_left_room(self.distributor, user, room_id)
  169. else:
  170. raise Exception("Unrecognized change: %r", change)
  171. return 200, {}
  172. def register_servlets(hs, http_server):
  173. ReplicationRemoteJoinRestServlet(hs).register(http_server)
  174. ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
  175. ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)