1
0

test_password_providers.py 30 KB


  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Tests for the password_auth_provider interface"""
  15. from typing import Any, Type, Union
  16. from unittest.mock import Mock
  17. from twisted.internet import defer
  18. import synapse
  19. from synapse.handlers.auth import load_legacy_password_auth_providers
  20. from synapse.module_api import ModuleApi
  21. from synapse.rest.client import devices, login
  22. from synapse.types import JsonDict
  23. from tests import unittest
  24. from tests.server import FakeChannel
  25. from tests.unittest import override_config
  26. # (possibly experimental) login flows we expect to appear in the list after the normal
  27. # ones
  28. ADDITIONAL_LOGIN_FLOWS = [
  29. {"type": "m.login.application_service"},
  30. {"type": "uk.half-shot.msc2778.login.application_service"},
  31. ]
  32. # a mock instance which the dummy auth providers delegate to, so we can see what's going
  33. # on
  34. mock_password_provider = Mock()
  35. class LegacyPasswordOnlyAuthProvider:
  36. """A legacy password_provider which only implements `check_password`."""
  37. @staticmethod
  38. def parse_config(self):
  39. pass
  40. def __init__(self, config, account_handler):
  41. pass
  42. def check_password(self, *args):
  43. return mock_password_provider.check_password(*args)
  44. class LegacyCustomAuthProvider:
  45. """A legacy password_provider which implements a custom login type."""
  46. @staticmethod
  47. def parse_config(self):
  48. pass
  49. def __init__(self, config, account_handler):
  50. pass
  51. def get_supported_login_types(self):
  52. return {"test.login_type": ["test_field"]}
  53. def check_auth(self, *args):
  54. return mock_password_provider.check_auth(*args)
  55. class CustomAuthProvider:
  56. """A module which registers password_auth_provider callbacks for a custom login type."""
  57. @staticmethod
  58. def parse_config(self):
  59. pass
  60. def __init__(self, config, api: ModuleApi):
  61. api.register_password_auth_provider_callbacks(
  62. auth_checkers={("test.login_type", ("test_field",)): self.check_auth},
  63. )
  64. def check_auth(self, *args):
  65. return mock_password_provider.check_auth(*args)
  66. class LegacyPasswordCustomAuthProvider:
  67. """A password_provider which implements password login via `check_auth`, as well
  68. as a custom type."""
  69. @staticmethod
  70. def parse_config(self):
  71. pass
  72. def __init__(self, config, account_handler):
  73. pass
  74. def get_supported_login_types(self):
  75. return {"m.login.password": ["password"], "test.login_type": ["test_field"]}
  76. def check_auth(self, *args):
  77. return mock_password_provider.check_auth(*args)
  78. class PasswordCustomAuthProvider:
  79. """A module which registers password_auth_provider callbacks for a custom login type.
  80. as well as a password login"""
  81. @staticmethod
  82. def parse_config(self):
  83. pass
  84. def __init__(self, config, api: ModuleApi):
  85. api.register_password_auth_provider_callbacks(
  86. auth_checkers={
  87. ("test.login_type", ("test_field",)): self.check_auth,
  88. ("m.login.password", ("password",)): self.check_auth,
  89. },
  90. )
  91. pass
  92. def check_auth(self, *args):
  93. return mock_password_provider.check_auth(*args)
  94. def check_pass(self, *args):
  95. return mock_password_provider.check_password(*args)
  96. def legacy_providers_config(*providers: Type[Any]) -> dict:
  97. """Returns a config dict that will enable the given legacy password auth providers"""
  98. return {
  99. "password_providers": [
  100. {"module": "%s.%s" % (__name__, provider.__qualname__), "config": {}}
  101. for provider in providers
  102. ]
  103. }
  104. def providers_config(*providers: Type[Any]) -> dict:
  105. """Returns a config dict that will enable the given modules"""
  106. return {
  107. "modules": [
  108. {"module": "%s.%s" % (__name__, provider.__qualname__), "config": {}}
  109. for provider in providers
  110. ]
  111. }
  112. class PasswordAuthProviderTests(unittest.HomeserverTestCase):
  113. servlets = [
  114. synapse.rest.admin.register_servlets,
  115. login.register_servlets,
  116. devices.register_servlets,
  117. ]
  118. def setUp(self):
  119. # we use a global mock device, so make sure we are starting with a clean slate
  120. mock_password_provider.reset_mock()
  121. super().setUp()
  122. def make_homeserver(self, reactor, clock):
  123. hs = self.setup_test_homeserver()
  124. # Load the modules into the homeserver
  125. module_api = hs.get_module_api()
  126. for module, config in hs.config.modules.loaded_modules:
  127. module(config=config, api=module_api)
  128. load_legacy_password_auth_providers(hs)
  129. return hs
  130. @override_config(legacy_providers_config(LegacyPasswordOnlyAuthProvider))
  131. def test_password_only_auth_progiver_login_legacy(self):
  132. self.password_only_auth_provider_login_test_body()
  133. def password_only_auth_provider_login_test_body(self):
  134. # login flows should only have m.login.password
  135. flows = self._get_login_flows()
  136. self.assertEqual(flows, [{"type": "m.login.password"}] + ADDITIONAL_LOGIN_FLOWS)
  137. # check_password must return an awaitable
  138. mock_password_provider.check_password.return_value = defer.succeed(True)
  139. channel = self._send_password_login("u", "p")
  140. self.assertEqual(channel.code, 200, channel.result)
  141. self.assertEqual("@u:test", channel.json_body["user_id"])
  142. mock_password_provider.check_password.assert_called_once_with("@u:test", "p")
  143. mock_password_provider.reset_mock()
  144. # login with mxid should work too
  145. channel = self._send_password_login("@u:bz", "p")
  146. self.assertEqual(channel.code, 200, channel.result)
  147. self.assertEqual("@u:bz", channel.json_body["user_id"])
  148. mock_password_provider.check_password.assert_called_once_with("@u:bz", "p")
  149. mock_password_provider.reset_mock()
  150. # try a weird username / pass. Honestly it's unclear what we *expect* to happen
  151. # in these cases, but at least we can guard against the API changing
  152. # unexpectedly
  153. channel = self._send_password_login(" USER🙂NAME ", " pASS\U0001F622word ")
  154. self.assertEqual(channel.code, 200, channel.result)
  155. self.assertEqual("@ USER🙂NAME :test", channel.json_body["user_id"])
  156. mock_password_provider.check_password.assert_called_once_with(
  157. "@ USER🙂NAME :test", " pASS😢word "
  158. )
  159. @override_config(legacy_providers_config(LegacyPasswordOnlyAuthProvider))
  160. def test_password_only_auth_provider_ui_auth_legacy(self):
  161. self.password_only_auth_provider_ui_auth_test_body()
  162. def password_only_auth_provider_ui_auth_test_body(self):
  163. """UI Auth should delegate correctly to the password provider"""
  164. # create the user, otherwise access doesn't work
  165. module_api = self.hs.get_module_api()
  166. self.get_success(module_api.register_user("u"))
  167. # log in twice, to get two devices
  168. mock_password_provider.check_password.return_value = defer.succeed(True)
  169. tok1 = self.login("u", "p")
  170. self.login("u", "p", device_id="dev2")
  171. mock_password_provider.reset_mock()
  172. # have the auth provider deny the request to start with
  173. mock_password_provider.check_password.return_value = defer.succeed(False)
  174. # make the initial request which returns a 401
  175. session = self._start_delete_device_session(tok1, "dev2")
  176. mock_password_provider.check_password.assert_not_called()
  177. # Make another request providing the UI auth flow.
  178. channel = self._authed_delete_device(tok1, "dev2", session, "u", "p")
  179. self.assertEqual(channel.code, 401) # XXX why not a 403?
  180. self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
  181. mock_password_provider.check_password.assert_called_once_with("@u:test", "p")
  182. mock_password_provider.reset_mock()
  183. # Finally, check the request goes through when we allow it
  184. mock_password_provider.check_password.return_value = defer.succeed(True)
  185. channel = self._authed_delete_device(tok1, "dev2", session, "u", "p")
  186. self.assertEqual(channel.code, 200)
  187. mock_password_provider.check_password.assert_called_once_with("@u:test", "p")
  188. @override_config(legacy_providers_config(LegacyPasswordOnlyAuthProvider))
  189. def test_local_user_fallback_login_legacy(self):
  190. self.local_user_fallback_login_test_body()
  191. def local_user_fallback_login_test_body(self):
  192. """rejected login should fall back to local db"""
  193. self.register_user("localuser", "localpass")
  194. # check_password must return an awaitable
  195. mock_password_provider.check_password.return_value = defer.succeed(False)
  196. channel = self._send_password_login("u", "p")
  197. self.assertEqual(channel.code, 403, channel.result)
  198. channel = self._send_password_login("localuser", "localpass")
  199. self.assertEqual(channel.code, 200, channel.result)
  200. self.assertEqual("@localuser:test", channel.json_body["user_id"])
  201. @override_config(legacy_providers_config(LegacyPasswordOnlyAuthProvider))
  202. def test_local_user_fallback_ui_auth_legacy(self):
  203. self.local_user_fallback_ui_auth_test_body()
  204. def local_user_fallback_ui_auth_test_body(self):
  205. """rejected login should fall back to local db"""
  206. self.register_user("localuser", "localpass")
  207. # have the auth provider deny the request
  208. mock_password_provider.check_password.return_value = defer.succeed(False)
  209. # log in twice, to get two devices
  210. tok1 = self.login("localuser", "localpass")
  211. self.login("localuser", "localpass", device_id="dev2")
  212. mock_password_provider.check_password.reset_mock()
  213. # first delete should give a 401
  214. session = self._start_delete_device_session(tok1, "dev2")
  215. mock_password_provider.check_password.assert_not_called()
  216. # Wrong password
  217. channel = self._authed_delete_device(tok1, "dev2", session, "localuser", "xxx")
  218. self.assertEqual(channel.code, 401) # XXX why not a 403?
  219. self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
  220. mock_password_provider.check_password.assert_called_once_with(
  221. "@localuser:test", "xxx"
  222. )
  223. mock_password_provider.reset_mock()
  224. # Right password
  225. channel = self._authed_delete_device(
  226. tok1, "dev2", session, "localuser", "localpass"
  227. )
  228. self.assertEqual(channel.code, 200)
  229. mock_password_provider.check_password.assert_called_once_with(
  230. "@localuser:test", "localpass"
  231. )
  232. @override_config(
  233. {
  234. **legacy_providers_config(LegacyPasswordOnlyAuthProvider),
  235. "password_config": {"localdb_enabled": False},
  236. }
  237. )
  238. def test_no_local_user_fallback_login_legacy(self):
  239. self.no_local_user_fallback_login_test_body()
  240. def no_local_user_fallback_login_test_body(self):
  241. """localdb_enabled can block login with the local password"""
  242. self.register_user("localuser", "localpass")
  243. # check_password must return an awaitable
  244. mock_password_provider.check_password.return_value = defer.succeed(False)
  245. channel = self._send_password_login("localuser", "localpass")
  246. self.assertEqual(channel.code, 403)
  247. self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
  248. mock_password_provider.check_password.assert_called_once_with(
  249. "@localuser:test", "localpass"
  250. )
  251. @override_config(
  252. {
  253. **legacy_providers_config(LegacyPasswordOnlyAuthProvider),
  254. "password_config": {"localdb_enabled": False},
  255. }
  256. )
  257. def test_no_local_user_fallback_ui_auth_legacy(self):
  258. self.no_local_user_fallback_ui_auth_test_body()
  259. def no_local_user_fallback_ui_auth_test_body(self):
  260. """localdb_enabled can block ui auth with the local password"""
  261. self.register_user("localuser", "localpass")
  262. # allow login via the auth provider
  263. mock_password_provider.check_password.return_value = defer.succeed(True)
  264. # log in twice, to get two devices
  265. tok1 = self.login("localuser", "p")
  266. self.login("localuser", "p", device_id="dev2")
  267. mock_password_provider.check_password.reset_mock()
  268. # first delete should give a 401
  269. channel = self._delete_device(tok1, "dev2")
  270. self.assertEqual(channel.code, 401)
  271. # m.login.password UIA is permitted because the auth provider allows it,
  272. # even though the localdb does not.
  273. self.assertEqual(channel.json_body["flows"], [{"stages": ["m.login.password"]}])
  274. session = channel.json_body["session"]
  275. mock_password_provider.check_password.assert_not_called()
  276. # now try deleting with the local password
  277. mock_password_provider.check_password.return_value = defer.succeed(False)
  278. channel = self._authed_delete_device(
  279. tok1, "dev2", session, "localuser", "localpass"
  280. )
  281. self.assertEqual(channel.code, 401) # XXX why not a 403?
  282. self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
  283. mock_password_provider.check_password.assert_called_once_with(
  284. "@localuser:test", "localpass"
  285. )
  286. @override_config(
  287. {
  288. **legacy_providers_config(LegacyPasswordOnlyAuthProvider),
  289. "password_config": {"enabled": False},
  290. }
  291. )
  292. def test_password_auth_disabled_legacy(self):
  293. self.password_auth_disabled_test_body()
  294. def password_auth_disabled_test_body(self):
  295. """password auth doesn't work if it's disabled across the board"""
  296. # login flows should be empty
  297. flows = self._get_login_flows()
  298. self.assertEqual(flows, ADDITIONAL_LOGIN_FLOWS)
  299. # login shouldn't work and should be rejected with a 400 ("unknown login type")
  300. channel = self._send_password_login("u", "p")
  301. self.assertEqual(channel.code, 400, channel.result)
  302. mock_password_provider.check_password.assert_not_called()
  303. @override_config(legacy_providers_config(LegacyCustomAuthProvider))
  304. def test_custom_auth_provider_login_legacy(self):
  305. self.custom_auth_provider_login_test_body()
  306. @override_config(providers_config(CustomAuthProvider))
  307. def test_custom_auth_provider_login(self):
  308. self.custom_auth_provider_login_test_body()
  309. def custom_auth_provider_login_test_body(self):
  310. # login flows should have the custom flow and m.login.password, since we
  311. # haven't disabled local password lookup.
  312. # (password must come first, because reasons)
  313. flows = self._get_login_flows()
  314. self.assertEqual(
  315. flows,
  316. [{"type": "m.login.password"}, {"type": "test.login_type"}]
  317. + ADDITIONAL_LOGIN_FLOWS,
  318. )
  319. # login with missing param should be rejected
  320. channel = self._send_login("test.login_type", "u")
  321. self.assertEqual(channel.code, 400, channel.result)
  322. mock_password_provider.check_auth.assert_not_called()
  323. mock_password_provider.check_auth.return_value = defer.succeed(
  324. ("@user:bz", None)
  325. )
  326. channel = self._send_login("test.login_type", "u", test_field="y")
  327. self.assertEqual(channel.code, 200, channel.result)
  328. self.assertEqual("@user:bz", channel.json_body["user_id"])
  329. mock_password_provider.check_auth.assert_called_once_with(
  330. "u", "test.login_type", {"test_field": "y"}
  331. )
  332. mock_password_provider.reset_mock()
  333. # try a weird username. Again, it's unclear what we *expect* to happen
  334. # in these cases, but at least we can guard against the API changing
  335. # unexpectedly
  336. mock_password_provider.check_auth.return_value = defer.succeed(
  337. ("@ MALFORMED! :bz", None)
  338. )
  339. channel = self._send_login("test.login_type", " USER🙂NAME ", test_field=" abc ")
  340. self.assertEqual(channel.code, 200, channel.result)
  341. self.assertEqual("@ MALFORMED! :bz", channel.json_body["user_id"])
  342. mock_password_provider.check_auth.assert_called_once_with(
  343. " USER🙂NAME ", "test.login_type", {"test_field": " abc "}
  344. )
  345. @override_config(legacy_providers_config(LegacyCustomAuthProvider))
  346. def test_custom_auth_provider_ui_auth_legacy(self):
  347. self.custom_auth_provider_ui_auth_test_body()
  348. @override_config(providers_config(CustomAuthProvider))
  349. def test_custom_auth_provider_ui_auth(self):
  350. self.custom_auth_provider_ui_auth_test_body()
  351. def custom_auth_provider_ui_auth_test_body(self):
  352. # register the user and log in twice, to get two devices
  353. self.register_user("localuser", "localpass")
  354. tok1 = self.login("localuser", "localpass")
  355. self.login("localuser", "localpass", device_id="dev2")
  356. # make the initial request which returns a 401
  357. channel = self._delete_device(tok1, "dev2")
  358. self.assertEqual(channel.code, 401)
  359. # Ensure that flows are what is expected.
  360. self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"])
  361. self.assertIn({"stages": ["test.login_type"]}, channel.json_body["flows"])
  362. session = channel.json_body["session"]
  363. # missing param
  364. body = {
  365. "auth": {
  366. "type": "test.login_type",
  367. "identifier": {"type": "m.id.user", "user": "localuser"},
  368. "session": session,
  369. },
  370. }
  371. channel = self._delete_device(tok1, "dev2", body)
  372. self.assertEqual(channel.code, 400)
  373. # there's a perfectly good M_MISSING_PARAM errcode, but heaven forfend we should
  374. # use it...
  375. self.assertIn("Missing parameters", channel.json_body["error"])
  376. mock_password_provider.check_auth.assert_not_called()
  377. mock_password_provider.reset_mock()
  378. # right params, but authing as the wrong user
  379. mock_password_provider.check_auth.return_value = defer.succeed(
  380. ("@user:bz", None)
  381. )
  382. body["auth"]["test_field"] = "foo"
  383. channel = self._delete_device(tok1, "dev2", body)
  384. self.assertEqual(channel.code, 403)
  385. self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
  386. mock_password_provider.check_auth.assert_called_once_with(
  387. "localuser", "test.login_type", {"test_field": "foo"}
  388. )
  389. mock_password_provider.reset_mock()
  390. # and finally, succeed
  391. mock_password_provider.check_auth.return_value = defer.succeed(
  392. ("@localuser:test", None)
  393. )
  394. channel = self._delete_device(tok1, "dev2", body)
  395. self.assertEqual(channel.code, 200)
  396. mock_password_provider.check_auth.assert_called_once_with(
  397. "localuser", "test.login_type", {"test_field": "foo"}
  398. )
  399. @override_config(legacy_providers_config(LegacyCustomAuthProvider))
  400. def test_custom_auth_provider_callback_legacy(self):
  401. self.custom_auth_provider_callback_test_body()
  402. @override_config(providers_config(CustomAuthProvider))
  403. def test_custom_auth_provider_callback(self):
  404. self.custom_auth_provider_callback_test_body()
  405. def custom_auth_provider_callback_test_body(self):
  406. callback = Mock(return_value=defer.succeed(None))
  407. mock_password_provider.check_auth.return_value = defer.succeed(
  408. ("@user:bz", callback)
  409. )
  410. channel = self._send_login("test.login_type", "u", test_field="y")
  411. self.assertEqual(channel.code, 200, channel.result)
  412. self.assertEqual("@user:bz", channel.json_body["user_id"])
  413. mock_password_provider.check_auth.assert_called_once_with(
  414. "u", "test.login_type", {"test_field": "y"}
  415. )
  416. # check the args to the callback
  417. callback.assert_called_once()
  418. call_args, call_kwargs = callback.call_args
  419. # should be one positional arg
  420. self.assertEqual(len(call_args), 1)
  421. self.assertEqual(call_args[0]["user_id"], "@user:bz")
  422. for p in ["user_id", "access_token", "device_id", "home_server"]:
  423. self.assertIn(p, call_args[0])
  424. @override_config(
  425. {
  426. **legacy_providers_config(LegacyCustomAuthProvider),
  427. "password_config": {"enabled": False},
  428. }
  429. )
  430. def test_custom_auth_password_disabled_legacy(self):
  431. self.custom_auth_password_disabled_test_body()
  432. @override_config(
  433. {**providers_config(CustomAuthProvider), "password_config": {"enabled": False}}
  434. )
  435. def test_custom_auth_password_disabled(self):
  436. self.custom_auth_password_disabled_test_body()
  437. def custom_auth_password_disabled_test_body(self):
  438. """Test login with a custom auth provider where password login is disabled"""
  439. self.register_user("localuser", "localpass")
  440. flows = self._get_login_flows()
  441. self.assertEqual(flows, [{"type": "test.login_type"}] + ADDITIONAL_LOGIN_FLOWS)
  442. # login shouldn't work and should be rejected with a 400 ("unknown login type")
  443. channel = self._send_password_login("localuser", "localpass")
  444. self.assertEqual(channel.code, 400, channel.result)
  445. mock_password_provider.check_auth.assert_not_called()
  446. @override_config(
  447. {
  448. **legacy_providers_config(LegacyCustomAuthProvider),
  449. "password_config": {"enabled": False, "localdb_enabled": False},
  450. }
  451. )
  452. def test_custom_auth_password_disabled_localdb_enabled_legacy(self):
  453. self.custom_auth_password_disabled_localdb_enabled_test_body()
  454. @override_config(
  455. {
  456. **providers_config(CustomAuthProvider),
  457. "password_config": {"enabled": False, "localdb_enabled": False},
  458. }
  459. )
  460. def test_custom_auth_password_disabled_localdb_enabled(self):
  461. self.custom_auth_password_disabled_localdb_enabled_test_body()
  462. def custom_auth_password_disabled_localdb_enabled_test_body(self):
  463. """Check the localdb_enabled == enabled == False
  464. Regression test for https://github.com/matrix-org/synapse/issues/8914: check
  465. that setting *both* `localdb_enabled` *and* `password: enabled` to False doesn't
  466. cause an exception.
  467. """
  468. self.register_user("localuser", "localpass")
  469. flows = self._get_login_flows()
  470. self.assertEqual(flows, [{"type": "test.login_type"}] + ADDITIONAL_LOGIN_FLOWS)
  471. # login shouldn't work and should be rejected with a 400 ("unknown login type")
  472. channel = self._send_password_login("localuser", "localpass")
  473. self.assertEqual(channel.code, 400, channel.result)
  474. mock_password_provider.check_auth.assert_not_called()
  475. @override_config(
  476. {
  477. **legacy_providers_config(LegacyPasswordCustomAuthProvider),
  478. "password_config": {"enabled": False},
  479. }
  480. )
  481. def test_password_custom_auth_password_disabled_login_legacy(self):
  482. self.password_custom_auth_password_disabled_login_test_body()
  483. @override_config(
  484. {
  485. **providers_config(PasswordCustomAuthProvider),
  486. "password_config": {"enabled": False},
  487. }
  488. )
  489. def test_password_custom_auth_password_disabled_login(self):
  490. self.password_custom_auth_password_disabled_login_test_body()
  491. def password_custom_auth_password_disabled_login_test_body(self):
  492. """log in with a custom auth provider which implements password, but password
  493. login is disabled"""
  494. self.register_user("localuser", "localpass")
  495. flows = self._get_login_flows()
  496. self.assertEqual(flows, [{"type": "test.login_type"}] + ADDITIONAL_LOGIN_FLOWS)
  497. # login shouldn't work and should be rejected with a 400 ("unknown login type")
  498. channel = self._send_password_login("localuser", "localpass")
  499. self.assertEqual(channel.code, 400, channel.result)
  500. mock_password_provider.check_auth.assert_not_called()
  501. mock_password_provider.check_password.assert_not_called()
  502. @override_config(
  503. {
  504. **legacy_providers_config(LegacyPasswordCustomAuthProvider),
  505. "password_config": {"enabled": False},
  506. }
  507. )
  508. def test_password_custom_auth_password_disabled_ui_auth_legacy(self):
  509. self.password_custom_auth_password_disabled_ui_auth_test_body()
  510. @override_config(
  511. {
  512. **providers_config(PasswordCustomAuthProvider),
  513. "password_config": {"enabled": False},
  514. }
  515. )
  516. def test_password_custom_auth_password_disabled_ui_auth(self):
  517. self.password_custom_auth_password_disabled_ui_auth_test_body()
  518. def password_custom_auth_password_disabled_ui_auth_test_body(self):
  519. """UI Auth with a custom auth provider which implements password, but password
  520. login is disabled"""
  521. # register the user and log in twice via the test login type to get two devices,
  522. self.register_user("localuser", "localpass")
  523. mock_password_provider.check_auth.return_value = defer.succeed(
  524. ("@localuser:test", None)
  525. )
  526. channel = self._send_login("test.login_type", "localuser", test_field="")
  527. self.assertEqual(channel.code, 200, channel.result)
  528. tok1 = channel.json_body["access_token"]
  529. channel = self._send_login(
  530. "test.login_type", "localuser", test_field="", device_id="dev2"
  531. )
  532. self.assertEqual(channel.code, 200, channel.result)
  533. # make the initial request which returns a 401
  534. channel = self._delete_device(tok1, "dev2")
  535. self.assertEqual(channel.code, 401)
  536. # Ensure that flows are what is expected. In particular, "password" should *not*
  537. # be present.
  538. self.assertIn({"stages": ["test.login_type"]}, channel.json_body["flows"])
  539. session = channel.json_body["session"]
  540. mock_password_provider.reset_mock()
  541. # check that auth with password is rejected
  542. body = {
  543. "auth": {
  544. "type": "m.login.password",
  545. "identifier": {"type": "m.id.user", "user": "localuser"},
  546. "password": "localpass",
  547. "session": session,
  548. },
  549. }
  550. channel = self._delete_device(tok1, "dev2", body)
  551. self.assertEqual(channel.code, 400)
  552. self.assertEqual(
  553. "Password login has been disabled.", channel.json_body["error"]
  554. )
  555. mock_password_provider.check_auth.assert_not_called()
  556. mock_password_provider.check_password.assert_not_called()
  557. mock_password_provider.reset_mock()
  558. # successful auth
  559. body["auth"]["type"] = "test.login_type"
  560. body["auth"]["test_field"] = "x"
  561. channel = self._delete_device(tok1, "dev2", body)
  562. self.assertEqual(channel.code, 200)
  563. mock_password_provider.check_auth.assert_called_once_with(
  564. "localuser", "test.login_type", {"test_field": "x"}
  565. )
  566. mock_password_provider.check_password.assert_not_called()
  567. @override_config(
  568. {
  569. **legacy_providers_config(LegacyCustomAuthProvider),
  570. "password_config": {"localdb_enabled": False},
  571. }
  572. )
  573. def test_custom_auth_no_local_user_fallback_legacy(self):
  574. self.custom_auth_no_local_user_fallback_test_body()
  575. @override_config(
  576. {
  577. **providers_config(CustomAuthProvider),
  578. "password_config": {"localdb_enabled": False},
  579. }
  580. )
  581. def test_custom_auth_no_local_user_fallback(self):
  582. self.custom_auth_no_local_user_fallback_test_body()
  583. def custom_auth_no_local_user_fallback_test_body(self):
  584. """Test login with a custom auth provider where the local db is disabled"""
  585. self.register_user("localuser", "localpass")
  586. flows = self._get_login_flows()
  587. self.assertEqual(flows, [{"type": "test.login_type"}] + ADDITIONAL_LOGIN_FLOWS)
  588. # password login shouldn't work and should be rejected with a 400
  589. # ("unknown login type")
  590. channel = self._send_password_login("localuser", "localpass")
  591. self.assertEqual(channel.code, 400, channel.result)
  592. def _get_login_flows(self) -> JsonDict:
  593. channel = self.make_request("GET", "/_matrix/client/r0/login")
  594. self.assertEqual(channel.code, 200, channel.result)
  595. return channel.json_body["flows"]
  596. def _send_password_login(self, user: str, password: str) -> FakeChannel:
  597. return self._send_login(type="m.login.password", user=user, password=password)
  598. def _send_login(self, type, user, **params) -> FakeChannel:
  599. params.update({"identifier": {"type": "m.id.user", "user": user}, "type": type})
  600. channel = self.make_request("POST", "/_matrix/client/r0/login", params)
  601. return channel
  602. def _start_delete_device_session(self, access_token, device_id) -> str:
  603. """Make an initial delete device request, and return the UI Auth session ID"""
  604. channel = self._delete_device(access_token, device_id)
  605. self.assertEqual(channel.code, 401)
  606. # Ensure that flows are what is expected.
  607. self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"])
  608. return channel.json_body["session"]
  609. def _authed_delete_device(
  610. self,
  611. access_token: str,
  612. device_id: str,
  613. session: str,
  614. user_id: str,
  615. password: str,
  616. ) -> FakeChannel:
  617. """Make a delete device request, authenticating with the given uid/password"""
  618. return self._delete_device(
  619. access_token,
  620. device_id,
  621. {
  622. "auth": {
  623. "type": "m.login.password",
  624. "identifier": {"type": "m.id.user", "user": user_id},
  625. "password": password,
  626. "session": session,
  627. },
  628. },
  629. )
  630. def _delete_device(
  631. self,
  632. access_token: str,
  633. device: str,
  634. body: Union[JsonDict, bytes] = b"",
  635. ) -> FakeChannel:
  636. """Delete an individual device."""
  637. channel = self.make_request(
  638. "DELETE", "devices/" + device, body, access_token=access_token
  639. )
  640. return channel