membership.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. # Copyright 2018 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from typing import TYPE_CHECKING, List, Optional, Tuple
  16. from twisted.web.server import Request
  17. from synapse.http.server import HttpServer
  18. from synapse.http.site import SynapseRequest
  19. from synapse.replication.http._base import ReplicationEndpoint
  20. from synapse.types import JsonDict, Requester, UserID
  21. from synapse.util.distributor import user_left_room
  22. if TYPE_CHECKING:
  23. from synapse.server import HomeServer
  24. logger = logging.getLogger(__name__)
  25. class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
  26. """Does a remote join for the given user to the given room
  27. Request format:
  28. POST /_synapse/replication/remote_join/:room_id/:user_id
  29. {
  30. "requester": ...,
  31. "remote_room_hosts": [...],
  32. "content": { ... }
  33. }
  34. """
  35. NAME = "remote_join"
  36. PATH_ARGS = ("room_id", "user_id")
  37. def __init__(self, hs: "HomeServer"):
  38. super().__init__(hs)
  39. self.federation_handler = hs.get_federation_handler()
  40. self.store = hs.get_datastores().main
  41. self.clock = hs.get_clock()
  42. @staticmethod
  43. async def _serialize_payload( # type: ignore[override]
  44. requester: Requester,
  45. room_id: str,
  46. user_id: str,
  47. remote_room_hosts: List[str],
  48. content: JsonDict,
  49. ) -> JsonDict:
  50. """
  51. Args:
  52. requester: The user making the request according to the access token
  53. room_id: The ID of the room.
  54. user_id: The ID of the user.
  55. remote_room_hosts: Servers to try and join via
  56. content: The event content to use for the join event
  57. Returns:
  58. A dict representing the payload of the request.
  59. """
  60. return {
  61. "requester": requester.serialize(),
  62. "remote_room_hosts": remote_room_hosts,
  63. "content": content,
  64. }
  65. async def _handle_request( # type: ignore[override]
  66. self, request: SynapseRequest, content: JsonDict, room_id: str, user_id: str
  67. ) -> Tuple[int, JsonDict]:
  68. remote_room_hosts = content["remote_room_hosts"]
  69. event_content = content["content"]
  70. requester = Requester.deserialize(self.store, content["requester"])
  71. request.requester = requester
  72. logger.info("remote_join: %s into room: %s", user_id, room_id)
  73. event_id, stream_id = await self.federation_handler.do_invite_join(
  74. remote_room_hosts, room_id, user_id, event_content
  75. )
  76. return 200, {"event_id": event_id, "stream_id": stream_id}
  77. class ReplicationRemoteKnockRestServlet(ReplicationEndpoint):
  78. """Perform a remote knock for the given user on the given room
  79. Request format:
  80. POST /_synapse/replication/remote_knock/:room_id/:user_id
  81. {
  82. "requester": ...,
  83. "remote_room_hosts": [...],
  84. "content": { ... }
  85. }
  86. """
  87. NAME = "remote_knock"
  88. PATH_ARGS = ("room_id", "user_id")
  89. def __init__(self, hs: "HomeServer"):
  90. super().__init__(hs)
  91. self.federation_handler = hs.get_federation_handler()
  92. self.store = hs.get_datastores().main
  93. self.clock = hs.get_clock()
  94. @staticmethod
  95. async def _serialize_payload( # type: ignore[override]
  96. requester: Requester,
  97. room_id: str,
  98. user_id: str,
  99. remote_room_hosts: List[str],
  100. content: JsonDict,
  101. ) -> JsonDict:
  102. """
  103. Args:
  104. requester: The user making the request, according to the access token.
  105. room_id: The ID of the room to knock on.
  106. user_id: The ID of the knocking user.
  107. remote_room_hosts: Servers to try and send the knock via.
  108. content: The event content to use for the knock event.
  109. """
  110. return {
  111. "requester": requester.serialize(),
  112. "remote_room_hosts": remote_room_hosts,
  113. "content": content,
  114. }
  115. async def _handle_request( # type: ignore[override]
  116. self,
  117. request: SynapseRequest,
  118. content: JsonDict,
  119. room_id: str,
  120. user_id: str,
  121. ) -> Tuple[int, JsonDict]:
  122. remote_room_hosts = content["remote_room_hosts"]
  123. event_content = content["content"]
  124. requester = Requester.deserialize(self.store, content["requester"])
  125. request.requester = requester
  126. logger.debug("remote_knock: %s on room: %s", user_id, room_id)
  127. event_id, stream_id = await self.federation_handler.do_knock(
  128. remote_room_hosts, room_id, user_id, event_content
  129. )
  130. return 200, {"event_id": event_id, "stream_id": stream_id}
  131. class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
  132. """Rejects an out-of-band invite we have received from a remote server
  133. Request format:
  134. POST /_synapse/replication/remote_reject_invite/:event_id
  135. {
  136. "txn_id": ...,
  137. "requester": ...,
  138. "content": { ... }
  139. }
  140. """
  141. NAME = "remote_reject_invite"
  142. PATH_ARGS = ("invite_event_id",)
  143. def __init__(self, hs: "HomeServer"):
  144. super().__init__(hs)
  145. self.store = hs.get_datastores().main
  146. self.clock = hs.get_clock()
  147. self.member_handler = hs.get_room_member_handler()
  148. @staticmethod
  149. async def _serialize_payload( # type: ignore[override]
  150. invite_event_id: str,
  151. txn_id: Optional[str],
  152. requester: Requester,
  153. content: JsonDict,
  154. ) -> JsonDict:
  155. """
  156. Args:
  157. invite_event_id: The ID of the invite to be rejected.
  158. txn_id: Optional transaction ID supplied by the client
  159. requester: User making the rejection request, according to the access token
  160. content: Additional content to include in the rejection event.
  161. Normally an empty dict.
  162. Returns:
  163. A dict representing the payload of the request.
  164. """
  165. return {
  166. "txn_id": txn_id,
  167. "requester": requester.serialize(),
  168. "content": content,
  169. }
  170. async def _handle_request( # type: ignore[override]
  171. self, request: SynapseRequest, content: JsonDict, invite_event_id: str
  172. ) -> Tuple[int, JsonDict]:
  173. txn_id = content["txn_id"]
  174. event_content = content["content"]
  175. requester = Requester.deserialize(self.store, content["requester"])
  176. request.requester = requester
  177. # hopefully we're now on the master, so this won't recurse!
  178. event_id, stream_id = await self.member_handler.remote_reject_invite(
  179. invite_event_id,
  180. txn_id,
  181. requester,
  182. event_content,
  183. )
  184. return 200, {"event_id": event_id, "stream_id": stream_id}
  185. class ReplicationRemoteRescindKnockRestServlet(ReplicationEndpoint):
  186. """Rescinds a local knock made on a remote room
  187. Request format:
  188. POST /_synapse/replication/remote_rescind_knock/:event_id
  189. {
  190. "txn_id": ...,
  191. "requester": ...,
  192. "content": { ... }
  193. }
  194. """
  195. NAME = "remote_rescind_knock"
  196. PATH_ARGS = ("knock_event_id",)
  197. def __init__(self, hs: "HomeServer"):
  198. super().__init__(hs)
  199. self.store = hs.get_datastores().main
  200. self.clock = hs.get_clock()
  201. self.member_handler = hs.get_room_member_handler()
  202. @staticmethod
  203. async def _serialize_payload( # type: ignore[override]
  204. knock_event_id: str,
  205. txn_id: Optional[str],
  206. requester: Requester,
  207. content: JsonDict,
  208. ) -> JsonDict:
  209. """
  210. Args:
  211. knock_event_id: The ID of the knock to be rescinded.
  212. txn_id: An optional transaction ID supplied by the client.
  213. requester: The user making the rescind request, according to the access token.
  214. content: The content to include in the rescind event.
  215. """
  216. return {
  217. "txn_id": txn_id,
  218. "requester": requester.serialize(),
  219. "content": content,
  220. }
  221. async def _handle_request( # type: ignore[override]
  222. self,
  223. request: SynapseRequest,
  224. content: JsonDict,
  225. knock_event_id: str,
  226. ) -> Tuple[int, JsonDict]:
  227. txn_id = content["txn_id"]
  228. event_content = content["content"]
  229. requester = Requester.deserialize(self.store, content["requester"])
  230. request.requester = requester
  231. # hopefully we're now on the master, so this won't recurse!
  232. event_id, stream_id = await self.member_handler.remote_rescind_knock(
  233. knock_event_id,
  234. txn_id,
  235. requester,
  236. event_content,
  237. )
  238. return 200, {"event_id": event_id, "stream_id": stream_id}
  239. class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
  240. """Notifies that a user has joined or left the room
  241. Request format:
  242. POST /_synapse/replication/membership_change/:room_id/:user_id/:change
  243. {}
  244. """
  245. NAME = "membership_change"
  246. PATH_ARGS = ("room_id", "user_id", "change")
  247. CACHE = False # No point caching as should return instantly.
  248. def __init__(self, hs: "HomeServer"):
  249. super().__init__(hs)
  250. self.registeration_handler = hs.get_registration_handler()
  251. self.store = hs.get_datastores().main
  252. self.clock = hs.get_clock()
  253. self.distributor = hs.get_distributor()
  254. @staticmethod
  255. async def _serialize_payload( # type: ignore[override]
  256. room_id: str, user_id: str, change: str
  257. ) -> JsonDict:
  258. """
  259. Args:
  260. room_id: The ID of the room.
  261. user_id: The ID of the user.
  262. change: "left"
  263. Returns:
  264. A dict representing the payload of the request.
  265. """
  266. assert change == "left"
  267. return {}
  268. async def _handle_request( # type: ignore[override]
  269. self,
  270. request: Request,
  271. content: JsonDict,
  272. room_id: str,
  273. user_id: str,
  274. change: str,
  275. ) -> Tuple[int, JsonDict]:
  276. logger.info("user membership change: %s in %s", user_id, room_id)
  277. user = UserID.from_string(user_id)
  278. if change == "left":
  279. user_left_room(self.distributor, user, room_id)
  280. else:
  281. raise Exception("Unrecognized change: %r", change)
  282. return 200, {}
  283. def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
  284. ReplicationRemoteJoinRestServlet(hs).register(http_server)
  285. ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
  286. ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)