test_password_providers.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Tests for the password_auth_provider interface"""
  15. from typing import Any, Type, Union
  16. from unittest.mock import Mock
  17. from twisted.internet import defer
  18. import synapse
  19. from synapse.handlers.auth import load_legacy_password_auth_providers
  20. from synapse.module_api import ModuleApi
  21. from synapse.rest.client import devices, login
  22. from synapse.types import JsonDict
  23. from tests import unittest
  24. from tests.server import FakeChannel
  25. from tests.unittest import override_config
  26. # (possibly experimental) login flows we expect to appear in the list after the normal
  27. # ones
  28. ADDITIONAL_LOGIN_FLOWS = [{"type": "uk.half-shot.msc2778.login.application_service"}]
  29. # a mock instance which the dummy auth providers delegate to, so we can see what's going
  30. # on
  31. mock_password_provider = Mock()
  32. class LegacyPasswordOnlyAuthProvider:
  33. """A legacy password_provider which only implements `check_password`."""
  34. @staticmethod
  35. def parse_config(self):
  36. pass
  37. def __init__(self, config, account_handler):
  38. pass
  39. def check_password(self, *args):
  40. return mock_password_provider.check_password(*args)
  41. class LegacyCustomAuthProvider:
  42. """A legacy password_provider which implements a custom login type."""
  43. @staticmethod
  44. def parse_config(self):
  45. pass
  46. def __init__(self, config, account_handler):
  47. pass
  48. def get_supported_login_types(self):
  49. return {"test.login_type": ["test_field"]}
  50. def check_auth(self, *args):
  51. return mock_password_provider.check_auth(*args)
  52. class CustomAuthProvider:
  53. """A module which registers password_auth_provider callbacks for a custom login type."""
  54. @staticmethod
  55. def parse_config(self):
  56. pass
  57. def __init__(self, config, api: ModuleApi):
  58. api.register_password_auth_provider_callbacks(
  59. auth_checkers={("test.login_type", ("test_field",)): self.check_auth},
  60. )
  61. def check_auth(self, *args):
  62. return mock_password_provider.check_auth(*args)
  63. class LegacyPasswordCustomAuthProvider:
  64. """A password_provider which implements password login via `check_auth`, as well
  65. as a custom type."""
  66. @staticmethod
  67. def parse_config(self):
  68. pass
  69. def __init__(self, config, account_handler):
  70. pass
  71. def get_supported_login_types(self):
  72. return {"m.login.password": ["password"], "test.login_type": ["test_field"]}
  73. def check_auth(self, *args):
  74. return mock_password_provider.check_auth(*args)
  75. class PasswordCustomAuthProvider:
  76. """A module which registers password_auth_provider callbacks for a custom login type.
  77. as well as a password login"""
  78. @staticmethod
  79. def parse_config(self):
  80. pass
  81. def __init__(self, config, api: ModuleApi):
  82. api.register_password_auth_provider_callbacks(
  83. auth_checkers={
  84. ("test.login_type", ("test_field",)): self.check_auth,
  85. ("m.login.password", ("password",)): self.check_auth,
  86. },
  87. )
  88. pass
  89. def check_auth(self, *args):
  90. return mock_password_provider.check_auth(*args)
  91. def check_pass(self, *args):
  92. return mock_password_provider.check_password(*args)
  93. def legacy_providers_config(*providers: Type[Any]) -> dict:
  94. """Returns a config dict that will enable the given legacy password auth providers"""
  95. return {
  96. "password_providers": [
  97. {"module": "%s.%s" % (__name__, provider.__qualname__), "config": {}}
  98. for provider in providers
  99. ]
  100. }
  101. def providers_config(*providers: Type[Any]) -> dict:
  102. """Returns a config dict that will enable the given modules"""
  103. return {
  104. "modules": [
  105. {"module": "%s.%s" % (__name__, provider.__qualname__), "config": {}}
  106. for provider in providers
  107. ]
  108. }
  109. class PasswordAuthProviderTests(unittest.HomeserverTestCase):
  110. servlets = [
  111. synapse.rest.admin.register_servlets,
  112. login.register_servlets,
  113. devices.register_servlets,
  114. ]
  115. def setUp(self):
  116. # we use a global mock device, so make sure we are starting with a clean slate
  117. mock_password_provider.reset_mock()
  118. super().setUp()
  119. def make_homeserver(self, reactor, clock):
  120. hs = self.setup_test_homeserver()
  121. # Load the modules into the homeserver
  122. module_api = hs.get_module_api()
  123. for module, config in hs.config.modules.loaded_modules:
  124. module(config=config, api=module_api)
  125. load_legacy_password_auth_providers(hs)
  126. return hs
  127. @override_config(legacy_providers_config(LegacyPasswordOnlyAuthProvider))
  128. def test_password_only_auth_progiver_login_legacy(self):
  129. self.password_only_auth_provider_login_test_body()
  130. def password_only_auth_provider_login_test_body(self):
  131. # login flows should only have m.login.password
  132. flows = self._get_login_flows()
  133. self.assertEqual(flows, [{"type": "m.login.password"}] + ADDITIONAL_LOGIN_FLOWS)
  134. # check_password must return an awaitable
  135. mock_password_provider.check_password.return_value = defer.succeed(True)
  136. channel = self._send_password_login("u", "p")
  137. self.assertEqual(channel.code, 200, channel.result)
  138. self.assertEqual("@u:test", channel.json_body["user_id"])
  139. mock_password_provider.check_password.assert_called_once_with("@u:test", "p")
  140. mock_password_provider.reset_mock()
  141. # login with mxid should work too
  142. channel = self._send_password_login("@u:bz", "p")
  143. self.assertEqual(channel.code, 200, channel.result)
  144. self.assertEqual("@u:bz", channel.json_body["user_id"])
  145. mock_password_provider.check_password.assert_called_once_with("@u:bz", "p")
  146. mock_password_provider.reset_mock()
  147. # try a weird username / pass. Honestly it's unclear what we *expect* to happen
  148. # in these cases, but at least we can guard against the API changing
  149. # unexpectedly
  150. channel = self._send_password_login(" USER🙂NAME ", " pASS\U0001F622word ")
  151. self.assertEqual(channel.code, 200, channel.result)
  152. self.assertEqual("@ USER🙂NAME :test", channel.json_body["user_id"])
  153. mock_password_provider.check_password.assert_called_once_with(
  154. "@ USER🙂NAME :test", " pASS😢word "
  155. )
  156. @override_config(legacy_providers_config(LegacyPasswordOnlyAuthProvider))
  157. def test_password_only_auth_provider_ui_auth_legacy(self):
  158. self.password_only_auth_provider_ui_auth_test_body()
  159. def password_only_auth_provider_ui_auth_test_body(self):
  160. """UI Auth should delegate correctly to the password provider"""
  161. # create the user, otherwise access doesn't work
  162. module_api = self.hs.get_module_api()
  163. self.get_success(module_api.register_user("u"))
  164. # log in twice, to get two devices
  165. mock_password_provider.check_password.return_value = defer.succeed(True)
  166. tok1 = self.login("u", "p")
  167. self.login("u", "p", device_id="dev2")
  168. mock_password_provider.reset_mock()
  169. # have the auth provider deny the request to start with
  170. mock_password_provider.check_password.return_value = defer.succeed(False)
  171. # make the initial request which returns a 401
  172. session = self._start_delete_device_session(tok1, "dev2")
  173. mock_password_provider.check_password.assert_not_called()
  174. # Make another request providing the UI auth flow.
  175. channel = self._authed_delete_device(tok1, "dev2", session, "u", "p")
  176. self.assertEqual(channel.code, 401) # XXX why not a 403?
  177. self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
  178. mock_password_provider.check_password.assert_called_once_with("@u:test", "p")
  179. mock_password_provider.reset_mock()
  180. # Finally, check the request goes through when we allow it
  181. mock_password_provider.check_password.return_value = defer.succeed(True)
  182. channel = self._authed_delete_device(tok1, "dev2", session, "u", "p")
  183. self.assertEqual(channel.code, 200)
  184. mock_password_provider.check_password.assert_called_once_with("@u:test", "p")
  185. @override_config(legacy_providers_config(LegacyPasswordOnlyAuthProvider))
  186. def test_local_user_fallback_login_legacy(self):
  187. self.local_user_fallback_login_test_body()
  188. def local_user_fallback_login_test_body(self):
  189. """rejected login should fall back to local db"""
  190. self.register_user("localuser", "localpass")
  191. # check_password must return an awaitable
  192. mock_password_provider.check_password.return_value = defer.succeed(False)
  193. channel = self._send_password_login("u", "p")
  194. self.assertEqual(channel.code, 403, channel.result)
  195. channel = self._send_password_login("localuser", "localpass")
  196. self.assertEqual(channel.code, 200, channel.result)
  197. self.assertEqual("@localuser:test", channel.json_body["user_id"])
  198. @override_config(legacy_providers_config(LegacyPasswordOnlyAuthProvider))
  199. def test_local_user_fallback_ui_auth_legacy(self):
  200. self.local_user_fallback_ui_auth_test_body()
  201. def local_user_fallback_ui_auth_test_body(self):
  202. """rejected login should fall back to local db"""
  203. self.register_user("localuser", "localpass")
  204. # have the auth provider deny the request
  205. mock_password_provider.check_password.return_value = defer.succeed(False)
  206. # log in twice, to get two devices
  207. tok1 = self.login("localuser", "localpass")
  208. self.login("localuser", "localpass", device_id="dev2")
  209. mock_password_provider.check_password.reset_mock()
  210. # first delete should give a 401
  211. session = self._start_delete_device_session(tok1, "dev2")
  212. mock_password_provider.check_password.assert_not_called()
  213. # Wrong password
  214. channel = self._authed_delete_device(tok1, "dev2", session, "localuser", "xxx")
  215. self.assertEqual(channel.code, 401) # XXX why not a 403?
  216. self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
  217. mock_password_provider.check_password.assert_called_once_with(
  218. "@localuser:test", "xxx"
  219. )
  220. mock_password_provider.reset_mock()
  221. # Right password
  222. channel = self._authed_delete_device(
  223. tok1, "dev2", session, "localuser", "localpass"
  224. )
  225. self.assertEqual(channel.code, 200)
  226. mock_password_provider.check_password.assert_called_once_with(
  227. "@localuser:test", "localpass"
  228. )
  229. @override_config(
  230. {
  231. **legacy_providers_config(LegacyPasswordOnlyAuthProvider),
  232. "password_config": {"localdb_enabled": False},
  233. }
  234. )
  235. def test_no_local_user_fallback_login_legacy(self):
  236. self.no_local_user_fallback_login_test_body()
  237. def no_local_user_fallback_login_test_body(self):
  238. """localdb_enabled can block login with the local password"""
  239. self.register_user("localuser", "localpass")
  240. # check_password must return an awaitable
  241. mock_password_provider.check_password.return_value = defer.succeed(False)
  242. channel = self._send_password_login("localuser", "localpass")
  243. self.assertEqual(channel.code, 403)
  244. self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
  245. mock_password_provider.check_password.assert_called_once_with(
  246. "@localuser:test", "localpass"
  247. )
  248. @override_config(
  249. {
  250. **legacy_providers_config(LegacyPasswordOnlyAuthProvider),
  251. "password_config": {"localdb_enabled": False},
  252. }
  253. )
  254. def test_no_local_user_fallback_ui_auth_legacy(self):
  255. self.no_local_user_fallback_ui_auth_test_body()
  256. def no_local_user_fallback_ui_auth_test_body(self):
  257. """localdb_enabled can block ui auth with the local password"""
  258. self.register_user("localuser", "localpass")
  259. # allow login via the auth provider
  260. mock_password_provider.check_password.return_value = defer.succeed(True)
  261. # log in twice, to get two devices
  262. tok1 = self.login("localuser", "p")
  263. self.login("localuser", "p", device_id="dev2")
  264. mock_password_provider.check_password.reset_mock()
  265. # first delete should give a 401
  266. channel = self._delete_device(tok1, "dev2")
  267. self.assertEqual(channel.code, 401)
  268. # m.login.password UIA is permitted because the auth provider allows it,
  269. # even though the localdb does not.
  270. self.assertEqual(channel.json_body["flows"], [{"stages": ["m.login.password"]}])
  271. session = channel.json_body["session"]
  272. mock_password_provider.check_password.assert_not_called()
  273. # now try deleting with the local password
  274. mock_password_provider.check_password.return_value = defer.succeed(False)
  275. channel = self._authed_delete_device(
  276. tok1, "dev2", session, "localuser", "localpass"
  277. )
  278. self.assertEqual(channel.code, 401) # XXX why not a 403?
  279. self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
  280. mock_password_provider.check_password.assert_called_once_with(
  281. "@localuser:test", "localpass"
  282. )
  283. @override_config(
  284. {
  285. **legacy_providers_config(LegacyPasswordOnlyAuthProvider),
  286. "password_config": {"enabled": False},
  287. }
  288. )
  289. def test_password_auth_disabled_legacy(self):
  290. self.password_auth_disabled_test_body()
  291. def password_auth_disabled_test_body(self):
  292. """password auth doesn't work if it's disabled across the board"""
  293. # login flows should be empty
  294. flows = self._get_login_flows()
  295. self.assertEqual(flows, ADDITIONAL_LOGIN_FLOWS)
  296. # login shouldn't work and should be rejected with a 400 ("unknown login type")
  297. channel = self._send_password_login("u", "p")
  298. self.assertEqual(channel.code, 400, channel.result)
  299. mock_password_provider.check_password.assert_not_called()
  300. @override_config(legacy_providers_config(LegacyCustomAuthProvider))
  301. def test_custom_auth_provider_login_legacy(self):
  302. self.custom_auth_provider_login_test_body()
  303. @override_config(providers_config(CustomAuthProvider))
  304. def test_custom_auth_provider_login(self):
  305. self.custom_auth_provider_login_test_body()
  306. def custom_auth_provider_login_test_body(self):
  307. # login flows should have the custom flow and m.login.password, since we
  308. # haven't disabled local password lookup.
  309. # (password must come first, because reasons)
  310. flows = self._get_login_flows()
  311. self.assertEqual(
  312. flows,
  313. [{"type": "m.login.password"}, {"type": "test.login_type"}]
  314. + ADDITIONAL_LOGIN_FLOWS,
  315. )
  316. # login with missing param should be rejected
  317. channel = self._send_login("test.login_type", "u")
  318. self.assertEqual(channel.code, 400, channel.result)
  319. mock_password_provider.check_auth.assert_not_called()
  320. mock_password_provider.check_auth.return_value = defer.succeed(
  321. ("@user:bz", None)
  322. )
  323. channel = self._send_login("test.login_type", "u", test_field="y")
  324. self.assertEqual(channel.code, 200, channel.result)
  325. self.assertEqual("@user:bz", channel.json_body["user_id"])
  326. mock_password_provider.check_auth.assert_called_once_with(
  327. "u", "test.login_type", {"test_field": "y"}
  328. )
  329. mock_password_provider.reset_mock()
  330. # try a weird username. Again, it's unclear what we *expect* to happen
  331. # in these cases, but at least we can guard against the API changing
  332. # unexpectedly
  333. mock_password_provider.check_auth.return_value = defer.succeed(
  334. ("@ MALFORMED! :bz", None)
  335. )
  336. channel = self._send_login("test.login_type", " USER🙂NAME ", test_field=" abc ")
  337. self.assertEqual(channel.code, 200, channel.result)
  338. self.assertEqual("@ MALFORMED! :bz", channel.json_body["user_id"])
  339. mock_password_provider.check_auth.assert_called_once_with(
  340. " USER🙂NAME ", "test.login_type", {"test_field": " abc "}
  341. )
  342. @override_config(legacy_providers_config(LegacyCustomAuthProvider))
  343. def test_custom_auth_provider_ui_auth_legacy(self):
  344. self.custom_auth_provider_ui_auth_test_body()
  345. @override_config(providers_config(CustomAuthProvider))
  346. def test_custom_auth_provider_ui_auth(self):
  347. self.custom_auth_provider_ui_auth_test_body()
  348. def custom_auth_provider_ui_auth_test_body(self):
  349. # register the user and log in twice, to get two devices
  350. self.register_user("localuser", "localpass")
  351. tok1 = self.login("localuser", "localpass")
  352. self.login("localuser", "localpass", device_id="dev2")
  353. # make the initial request which returns a 401
  354. channel = self._delete_device(tok1, "dev2")
  355. self.assertEqual(channel.code, 401)
  356. # Ensure that flows are what is expected.
  357. self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"])
  358. self.assertIn({"stages": ["test.login_type"]}, channel.json_body["flows"])
  359. session = channel.json_body["session"]
  360. # missing param
  361. body = {
  362. "auth": {
  363. "type": "test.login_type",
  364. "identifier": {"type": "m.id.user", "user": "localuser"},
  365. "session": session,
  366. },
  367. }
  368. channel = self._delete_device(tok1, "dev2", body)
  369. self.assertEqual(channel.code, 400)
  370. # there's a perfectly good M_MISSING_PARAM errcode, but heaven forfend we should
  371. # use it...
  372. self.assertIn("Missing parameters", channel.json_body["error"])
  373. mock_password_provider.check_auth.assert_not_called()
  374. mock_password_provider.reset_mock()
  375. # right params, but authing as the wrong user
  376. mock_password_provider.check_auth.return_value = defer.succeed(
  377. ("@user:bz", None)
  378. )
  379. body["auth"]["test_field"] = "foo"
  380. channel = self._delete_device(tok1, "dev2", body)
  381. self.assertEqual(channel.code, 403)
  382. self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
  383. mock_password_provider.check_auth.assert_called_once_with(
  384. "localuser", "test.login_type", {"test_field": "foo"}
  385. )
  386. mock_password_provider.reset_mock()
  387. # and finally, succeed
  388. mock_password_provider.check_auth.return_value = defer.succeed(
  389. ("@localuser:test", None)
  390. )
  391. channel = self._delete_device(tok1, "dev2", body)
  392. self.assertEqual(channel.code, 200)
  393. mock_password_provider.check_auth.assert_called_once_with(
  394. "localuser", "test.login_type", {"test_field": "foo"}
  395. )
  396. @override_config(legacy_providers_config(LegacyCustomAuthProvider))
  397. def test_custom_auth_provider_callback_legacy(self):
  398. self.custom_auth_provider_callback_test_body()
  399. @override_config(providers_config(CustomAuthProvider))
  400. def test_custom_auth_provider_callback(self):
  401. self.custom_auth_provider_callback_test_body()
  402. def custom_auth_provider_callback_test_body(self):
  403. callback = Mock(return_value=defer.succeed(None))
  404. mock_password_provider.check_auth.return_value = defer.succeed(
  405. ("@user:bz", callback)
  406. )
  407. channel = self._send_login("test.login_type", "u", test_field="y")
  408. self.assertEqual(channel.code, 200, channel.result)
  409. self.assertEqual("@user:bz", channel.json_body["user_id"])
  410. mock_password_provider.check_auth.assert_called_once_with(
  411. "u", "test.login_type", {"test_field": "y"}
  412. )
  413. # check the args to the callback
  414. callback.assert_called_once()
  415. call_args, call_kwargs = callback.call_args
  416. # should be one positional arg
  417. self.assertEqual(len(call_args), 1)
  418. self.assertEqual(call_args[0]["user_id"], "@user:bz")
  419. for p in ["user_id", "access_token", "device_id", "home_server"]:
  420. self.assertIn(p, call_args[0])
  421. @override_config(
  422. {
  423. **legacy_providers_config(LegacyCustomAuthProvider),
  424. "password_config": {"enabled": False},
  425. }
  426. )
  427. def test_custom_auth_password_disabled_legacy(self):
  428. self.custom_auth_password_disabled_test_body()
  429. @override_config(
  430. {**providers_config(CustomAuthProvider), "password_config": {"enabled": False}}
  431. )
  432. def test_custom_auth_password_disabled(self):
  433. self.custom_auth_password_disabled_test_body()
  434. def custom_auth_password_disabled_test_body(self):
  435. """Test login with a custom auth provider where password login is disabled"""
  436. self.register_user("localuser", "localpass")
  437. flows = self._get_login_flows()
  438. self.assertEqual(flows, [{"type": "test.login_type"}] + ADDITIONAL_LOGIN_FLOWS)
  439. # login shouldn't work and should be rejected with a 400 ("unknown login type")
  440. channel = self._send_password_login("localuser", "localpass")
  441. self.assertEqual(channel.code, 400, channel.result)
  442. mock_password_provider.check_auth.assert_not_called()
  443. @override_config(
  444. {
  445. **legacy_providers_config(LegacyCustomAuthProvider),
  446. "password_config": {"enabled": False, "localdb_enabled": False},
  447. }
  448. )
  449. def test_custom_auth_password_disabled_localdb_enabled_legacy(self):
  450. self.custom_auth_password_disabled_localdb_enabled_test_body()
  451. @override_config(
  452. {
  453. **providers_config(CustomAuthProvider),
  454. "password_config": {"enabled": False, "localdb_enabled": False},
  455. }
  456. )
  457. def test_custom_auth_password_disabled_localdb_enabled(self):
  458. self.custom_auth_password_disabled_localdb_enabled_test_body()
  459. def custom_auth_password_disabled_localdb_enabled_test_body(self):
  460. """Check the localdb_enabled == enabled == False
  461. Regression test for https://github.com/matrix-org/synapse/issues/8914: check
  462. that setting *both* `localdb_enabled` *and* `password: enabled` to False doesn't
  463. cause an exception.
  464. """
  465. self.register_user("localuser", "localpass")
  466. flows = self._get_login_flows()
  467. self.assertEqual(flows, [{"type": "test.login_type"}] + ADDITIONAL_LOGIN_FLOWS)
  468. # login shouldn't work and should be rejected with a 400 ("unknown login type")
  469. channel = self._send_password_login("localuser", "localpass")
  470. self.assertEqual(channel.code, 400, channel.result)
  471. mock_password_provider.check_auth.assert_not_called()
  472. @override_config(
  473. {
  474. **legacy_providers_config(LegacyPasswordCustomAuthProvider),
  475. "password_config": {"enabled": False},
  476. }
  477. )
  478. def test_password_custom_auth_password_disabled_login_legacy(self):
  479. self.password_custom_auth_password_disabled_login_test_body()
  480. @override_config(
  481. {
  482. **providers_config(PasswordCustomAuthProvider),
  483. "password_config": {"enabled": False},
  484. }
  485. )
  486. def test_password_custom_auth_password_disabled_login(self):
  487. self.password_custom_auth_password_disabled_login_test_body()
  488. def password_custom_auth_password_disabled_login_test_body(self):
  489. """log in with a custom auth provider which implements password, but password
  490. login is disabled"""
  491. self.register_user("localuser", "localpass")
  492. flows = self._get_login_flows()
  493. self.assertEqual(flows, [{"type": "test.login_type"}] + ADDITIONAL_LOGIN_FLOWS)
  494. # login shouldn't work and should be rejected with a 400 ("unknown login type")
  495. channel = self._send_password_login("localuser", "localpass")
  496. self.assertEqual(channel.code, 400, channel.result)
  497. mock_password_provider.check_auth.assert_not_called()
  498. mock_password_provider.check_password.assert_not_called()
  499. @override_config(
  500. {
  501. **legacy_providers_config(LegacyPasswordCustomAuthProvider),
  502. "password_config": {"enabled": False},
  503. }
  504. )
  505. def test_password_custom_auth_password_disabled_ui_auth_legacy(self):
  506. self.password_custom_auth_password_disabled_ui_auth_test_body()
  507. @override_config(
  508. {
  509. **providers_config(PasswordCustomAuthProvider),
  510. "password_config": {"enabled": False},
  511. }
  512. )
  513. def test_password_custom_auth_password_disabled_ui_auth(self):
  514. self.password_custom_auth_password_disabled_ui_auth_test_body()
  515. def password_custom_auth_password_disabled_ui_auth_test_body(self):
  516. """UI Auth with a custom auth provider which implements password, but password
  517. login is disabled"""
  518. # register the user and log in twice via the test login type to get two devices,
  519. self.register_user("localuser", "localpass")
  520. mock_password_provider.check_auth.return_value = defer.succeed(
  521. ("@localuser:test", None)
  522. )
  523. channel = self._send_login("test.login_type", "localuser", test_field="")
  524. self.assertEqual(channel.code, 200, channel.result)
  525. tok1 = channel.json_body["access_token"]
  526. channel = self._send_login(
  527. "test.login_type", "localuser", test_field="", device_id="dev2"
  528. )
  529. self.assertEqual(channel.code, 200, channel.result)
  530. # make the initial request which returns a 401
  531. channel = self._delete_device(tok1, "dev2")
  532. self.assertEqual(channel.code, 401)
  533. # Ensure that flows are what is expected. In particular, "password" should *not*
  534. # be present.
  535. self.assertIn({"stages": ["test.login_type"]}, channel.json_body["flows"])
  536. session = channel.json_body["session"]
  537. mock_password_provider.reset_mock()
  538. # check that auth with password is rejected
  539. body = {
  540. "auth": {
  541. "type": "m.login.password",
  542. "identifier": {"type": "m.id.user", "user": "localuser"},
  543. "password": "localpass",
  544. "session": session,
  545. },
  546. }
  547. channel = self._delete_device(tok1, "dev2", body)
  548. self.assertEqual(channel.code, 400)
  549. self.assertEqual(
  550. "Password login has been disabled.", channel.json_body["error"]
  551. )
  552. mock_password_provider.check_auth.assert_not_called()
  553. mock_password_provider.check_password.assert_not_called()
  554. mock_password_provider.reset_mock()
  555. # successful auth
  556. body["auth"]["type"] = "test.login_type"
  557. body["auth"]["test_field"] = "x"
  558. channel = self._delete_device(tok1, "dev2", body)
  559. self.assertEqual(channel.code, 200)
  560. mock_password_provider.check_auth.assert_called_once_with(
  561. "localuser", "test.login_type", {"test_field": "x"}
  562. )
  563. mock_password_provider.check_password.assert_not_called()
  564. @override_config(
  565. {
  566. **legacy_providers_config(LegacyCustomAuthProvider),
  567. "password_config": {"localdb_enabled": False},
  568. }
  569. )
  570. def test_custom_auth_no_local_user_fallback_legacy(self):
  571. self.custom_auth_no_local_user_fallback_test_body()
  572. @override_config(
  573. {
  574. **providers_config(CustomAuthProvider),
  575. "password_config": {"localdb_enabled": False},
  576. }
  577. )
  578. def test_custom_auth_no_local_user_fallback(self):
  579. self.custom_auth_no_local_user_fallback_test_body()
  580. def custom_auth_no_local_user_fallback_test_body(self):
  581. """Test login with a custom auth provider where the local db is disabled"""
  582. self.register_user("localuser", "localpass")
  583. flows = self._get_login_flows()
  584. self.assertEqual(flows, [{"type": "test.login_type"}] + ADDITIONAL_LOGIN_FLOWS)
  585. # password login shouldn't work and should be rejected with a 400
  586. # ("unknown login type")
  587. channel = self._send_password_login("localuser", "localpass")
  588. self.assertEqual(channel.code, 400, channel.result)
  589. def _get_login_flows(self) -> JsonDict:
  590. channel = self.make_request("GET", "/_matrix/client/r0/login")
  591. self.assertEqual(channel.code, 200, channel.result)
  592. return channel.json_body["flows"]
  593. def _send_password_login(self, user: str, password: str) -> FakeChannel:
  594. return self._send_login(type="m.login.password", user=user, password=password)
  595. def _send_login(self, type, user, **params) -> FakeChannel:
  596. params.update({"identifier": {"type": "m.id.user", "user": user}, "type": type})
  597. channel = self.make_request("POST", "/_matrix/client/r0/login", params)
  598. return channel
  599. def _start_delete_device_session(self, access_token, device_id) -> str:
  600. """Make an initial delete device request, and return the UI Auth session ID"""
  601. channel = self._delete_device(access_token, device_id)
  602. self.assertEqual(channel.code, 401)
  603. # Ensure that flows are what is expected.
  604. self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"])
  605. return channel.json_body["session"]
  606. def _authed_delete_device(
  607. self,
  608. access_token: str,
  609. device_id: str,
  610. session: str,
  611. user_id: str,
  612. password: str,
  613. ) -> FakeChannel:
  614. """Make a delete device request, authenticating with the given uid/password"""
  615. return self._delete_device(
  616. access_token,
  617. device_id,
  618. {
  619. "auth": {
  620. "type": "m.login.password",
  621. "identifier": {"type": "m.id.user", "user": user_id},
  622. "password": password,
  623. "session": session,
  624. },
  625. },
  626. )
  627. def _delete_device(
  628. self,
  629. access_token: str,
  630. device: str,
  631. body: Union[JsonDict, bytes] = b"",
  632. ) -> FakeChannel:
  633. """Delete an individual device."""
  634. channel = self.make_request(
  635. "DELETE", "devices/" + device, body, access_token=access_token
  636. )
  637. return channel