test_power_levels.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from http import HTTPStatus
  15. from twisted.test.proto_helpers import MemoryReactor
  16. from synapse.api.errors import Codes
  17. from synapse.events.utils import CANONICALJSON_MAX_INT, CANONICALJSON_MIN_INT
  18. from synapse.rest import admin
  19. from synapse.rest.client import login, room, sync
  20. from synapse.server import HomeServer
  21. from synapse.util import Clock
  22. from tests.unittest import HomeserverTestCase
  23. class PowerLevelsTestCase(HomeserverTestCase):
  24. """Tests that power levels are enforced in various situations"""
  25. servlets = [
  26. admin.register_servlets,
  27. room.register_servlets,
  28. login.register_servlets,
  29. sync.register_servlets,
  30. ]
  31. def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
  32. config = self.default_config()
  33. return self.setup_test_homeserver(config=config)
  34. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  35. # register a room admin, moderator and regular user
  36. self.admin_user_id = self.register_user("admin", "pass")
  37. self.admin_access_token = self.login("admin", "pass")
  38. self.mod_user_id = self.register_user("mod", "pass")
  39. self.mod_access_token = self.login("mod", "pass")
  40. self.user_user_id = self.register_user("user", "pass")
  41. self.user_access_token = self.login("user", "pass")
  42. # Create a room
  43. self.room_id = self.helper.create_room_as(
  44. self.admin_user_id, tok=self.admin_access_token
  45. )
  46. # Invite the other users
  47. self.helper.invite(
  48. room=self.room_id,
  49. src=self.admin_user_id,
  50. tok=self.admin_access_token,
  51. targ=self.mod_user_id,
  52. )
  53. self.helper.invite(
  54. room=self.room_id,
  55. src=self.admin_user_id,
  56. tok=self.admin_access_token,
  57. targ=self.user_user_id,
  58. )
  59. # Make the other users join the room
  60. self.helper.join(
  61. room=self.room_id, user=self.mod_user_id, tok=self.mod_access_token
  62. )
  63. self.helper.join(
  64. room=self.room_id, user=self.user_user_id, tok=self.user_access_token
  65. )
  66. # Mod the mod
  67. room_power_levels = self.helper.get_state(
  68. self.room_id,
  69. "m.room.power_levels",
  70. tok=self.admin_access_token,
  71. )
  72. # Update existing power levels with mod at PL50
  73. room_power_levels["users"].update({self.mod_user_id: 50})
  74. self.helper.send_state(
  75. self.room_id,
  76. "m.room.power_levels",
  77. room_power_levels,
  78. tok=self.admin_access_token,
  79. )
  80. def test_non_admins_cannot_enable_room_encryption(self) -> None:
  81. # have the mod try to enable room encryption
  82. self.helper.send_state(
  83. self.room_id,
  84. "m.room.encryption",
  85. {"algorithm": "m.megolm.v1.aes-sha2"},
  86. tok=self.mod_access_token,
  87. expect_code=403, # expect failure
  88. )
  89. # have the user try to enable room encryption
  90. self.helper.send_state(
  91. self.room_id,
  92. "m.room.encryption",
  93. {"algorithm": "m.megolm.v1.aes-sha2"},
  94. tok=self.user_access_token,
  95. expect_code=HTTPStatus.FORBIDDEN, # expect failure
  96. )
  97. def test_non_admins_cannot_send_server_acl(self) -> None:
  98. # have the mod try to send a server ACL
  99. self.helper.send_state(
  100. self.room_id,
  101. "m.room.server_acl",
  102. {
  103. "allow": ["*"],
  104. "allow_ip_literals": False,
  105. "deny": ["*.evil.com", "evil.com"],
  106. },
  107. tok=self.mod_access_token,
  108. expect_code=HTTPStatus.FORBIDDEN, # expect failure
  109. )
  110. # have the user try to send a server ACL
  111. self.helper.send_state(
  112. self.room_id,
  113. "m.room.server_acl",
  114. {
  115. "allow": ["*"],
  116. "allow_ip_literals": False,
  117. "deny": ["*.evil.com", "evil.com"],
  118. },
  119. tok=self.user_access_token,
  120. expect_code=HTTPStatus.FORBIDDEN, # expect failure
  121. )
  122. def test_non_admins_cannot_tombstone_room(self) -> None:
  123. # Create another room that will serve as our "upgraded room"
  124. self.upgraded_room_id = self.helper.create_room_as(
  125. self.admin_user_id, tok=self.admin_access_token
  126. )
  127. # have the mod try to send a tombstone event
  128. self.helper.send_state(
  129. self.room_id,
  130. "m.room.tombstone",
  131. {
  132. "body": "This room has been replaced",
  133. "replacement_room": self.upgraded_room_id,
  134. },
  135. tok=self.mod_access_token,
  136. expect_code=HTTPStatus.FORBIDDEN, # expect failure
  137. )
  138. # have the user try to send a tombstone event
  139. self.helper.send_state(
  140. self.room_id,
  141. "m.room.tombstone",
  142. {
  143. "body": "This room has been replaced",
  144. "replacement_room": self.upgraded_room_id,
  145. },
  146. tok=self.user_access_token,
  147. expect_code=403, # expect failure
  148. )
  149. def test_admins_can_enable_room_encryption(self) -> None:
  150. # have the admin try to enable room encryption
  151. self.helper.send_state(
  152. self.room_id,
  153. "m.room.encryption",
  154. {"algorithm": "m.megolm.v1.aes-sha2"},
  155. tok=self.admin_access_token,
  156. expect_code=HTTPStatus.OK, # expect success
  157. )
  158. def test_admins_can_send_server_acl(self) -> None:
  159. # have the admin try to send a server ACL
  160. self.helper.send_state(
  161. self.room_id,
  162. "m.room.server_acl",
  163. {
  164. "allow": ["*"],
  165. "allow_ip_literals": False,
  166. "deny": ["*.evil.com", "evil.com"],
  167. },
  168. tok=self.admin_access_token,
  169. expect_code=HTTPStatus.OK, # expect success
  170. )
  171. def test_admins_can_tombstone_room(self) -> None:
  172. # Create another room that will serve as our "upgraded room"
  173. self.upgraded_room_id = self.helper.create_room_as(
  174. self.admin_user_id, tok=self.admin_access_token
  175. )
  176. # have the admin try to send a tombstone event
  177. self.helper.send_state(
  178. self.room_id,
  179. "m.room.tombstone",
  180. {
  181. "body": "This room has been replaced",
  182. "replacement_room": self.upgraded_room_id,
  183. },
  184. tok=self.admin_access_token,
  185. expect_code=HTTPStatus.OK, # expect success
  186. )
  187. def test_cannot_set_string_power_levels(self) -> None:
  188. room_power_levels = self.helper.get_state(
  189. self.room_id,
  190. "m.room.power_levels",
  191. tok=self.admin_access_token,
  192. )
  193. # Update existing power levels with user at PL "0"
  194. room_power_levels["users"].update({self.user_user_id: "0"})
  195. body = self.helper.send_state(
  196. self.room_id,
  197. "m.room.power_levels",
  198. room_power_levels,
  199. tok=self.admin_access_token,
  200. expect_code=HTTPStatus.BAD_REQUEST, # expect failure
  201. )
  202. self.assertEqual(
  203. body["errcode"],
  204. Codes.BAD_JSON,
  205. body,
  206. )
  207. def test_cannot_set_unsafe_large_power_levels(self) -> None:
  208. room_power_levels = self.helper.get_state(
  209. self.room_id,
  210. "m.room.power_levels",
  211. tok=self.admin_access_token,
  212. )
  213. # Update existing power levels with user at PL above the max safe integer
  214. room_power_levels["users"].update(
  215. {self.user_user_id: CANONICALJSON_MAX_INT + 1}
  216. )
  217. body = self.helper.send_state(
  218. self.room_id,
  219. "m.room.power_levels",
  220. room_power_levels,
  221. tok=self.admin_access_token,
  222. expect_code=HTTPStatus.BAD_REQUEST, # expect failure
  223. )
  224. self.assertEqual(
  225. body["errcode"],
  226. Codes.BAD_JSON,
  227. body,
  228. )
  229. def test_cannot_set_unsafe_small_power_levels(self) -> None:
  230. room_power_levels = self.helper.get_state(
  231. self.room_id,
  232. "m.room.power_levels",
  233. tok=self.admin_access_token,
  234. )
  235. # Update existing power levels with user at PL below the minimum safe integer
  236. room_power_levels["users"].update(
  237. {self.user_user_id: CANONICALJSON_MIN_INT - 1}
  238. )
  239. body = self.helper.send_state(
  240. self.room_id,
  241. "m.room.power_levels",
  242. room_power_levels,
  243. tok=self.admin_access_token,
  244. expect_code=HTTPStatus.BAD_REQUEST, # expect failure
  245. )
  246. self.assertEqual(
  247. body["errcode"],
  248. Codes.BAD_JSON,
  249. body,
  250. )