123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- # Copyright 2022 The Matrix.org Foundation C.I.C.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import json
- from typing import Any, ContextManager, Dict, List, Optional, Tuple
- from unittest.mock import Mock, patch
- from urllib.parse import parse_qs
- import attr
- from twisted.web.http_headers import Headers
- from twisted.web.iweb import IResponse
- from synapse.server import HomeServer
- from synapse.util import Clock
- from synapse.util.stringutils import random_string
- from tests.test_utils import FakeResponse
@attr.s(auto_attribs=True, frozen=True, slots=True)
class FakeAuthorizationGrant:
    """Immutable record of one authorization request made to ``FakeOidcServer``.

    Instances are stored by the fake server first against the authorization
    code, and then against the access token once the code has been exchanged.
    """

    # Claims about the end user; merged into the ID token and served as-is by
    # the userinfo endpoint.
    userinfo: dict
    # Client the grant was issued to; used as the ``aud`` claim of signed tokens.
    client_id: str
    # Redirect URI supplied when the authorization was started.
    redirect_uri: str
    # Scope string of the grant; echoed back in the token response.
    scope: str
    # Nonce to copy into the ID token, if any.
    nonce: Optional[str]
    # Session ID to copy into ID/logout tokens, if any.
    sid: Optional[str]
class FakeOidcServer:
    """A fake OpenID Connect Provider.

    The server keeps everything in memory: pending authorization codes, the
    sessions created once a code has been exchanged, and an ES256 key pair used
    to sign ID tokens and logout tokens. HTTP traffic is simulated by
    ``request``, which dispatches on method and URI to the handler mocks below.
    """

    # All methods here are mocks, so we can track when they are called, and override
    # their values
    request: Mock
    get_jwks_handler: Mock
    get_metadata_handler: Mock
    get_userinfo_handler: Mock
    post_token_handler: Mock

    # Value used for the next ``sid`` handed out by ``start_authorization``.
    sid_counter: int = 0

    def __init__(self, clock: Clock, issuer: str):
        """Create the fake provider.

        Args:
            clock: Clock used for the ``iat``/``nbf``/``exp`` claims of signed tokens.
            issuer: Issuer identifier. Endpoint URLs are built by appending a
                path directly to this string, without inserting a separator.
        """
        from authlib.jose import ECKey, KeySet

        self._clock = clock
        self.issuer = issuer

        # Wrap the real implementations in mocks so that tests can assert on
        # calls and swap out individual endpoints.
        self.request = Mock(side_effect=self._request)
        self.get_jwks_handler = Mock(side_effect=self._get_jwks_handler)
        self.get_metadata_handler = Mock(side_effect=self._get_metadata_handler)
        self.get_userinfo_handler = Mock(side_effect=self._get_userinfo_handler)
        self.post_token_handler = Mock(side_effect=self._post_token_handler)

        # A code -> grant mapping
        self._authorization_grants: Dict[str, FakeAuthorizationGrant] = {}
        # An access token -> grant mapping
        self._sessions: Dict[str, FakeAuthorizationGrant] = {}

        # We generate here an ECDSA key with the P-256 curve (ES256 algorithm) used for
        # signing JWTs. ECDSA keys are really quick to generate compared to RSA.
        self._key = ECKey.generate_key(crv="P-256", is_private=True)
        # Only the public half of the key is published in the JWKS.
        self._jwks = KeySet([ECKey.import_key(self._key.as_pem(is_private=False))])

        # Extra claims applied on top of every generated ID token; see
        # ``id_token_override``.
        self._id_token_overrides: Dict[str, Any] = {}

    def reset_mocks(self) -> None:
        """Reset the call history of the request mock and of every handler mock."""
        self.request.reset_mock()
        self.get_jwks_handler.reset_mock()
        self.get_metadata_handler.reset_mock()
        self.get_userinfo_handler.reset_mock()
        self.post_token_handler.reset_mock()

    def patch_homeserver(self, hs: HomeServer) -> ContextManager[Mock]:
        """Patch the ``HomeServer`` HTTP client to handle requests through the ``FakeOidcServer``.

        This patch should be used whenever the HS is expected to perform request to the
        OIDC provider, e.g.::

            fake_oidc_server = self.helper.fake_oidc_server()
            with fake_oidc_server.patch_homeserver(hs):
                self.make_request("GET", "/_matrix/client/r0/login/sso/redirect")
        """
        return patch.object(hs.get_proxied_http_client(), "request", self.request)

    @property
    def authorization_endpoint(self) -> str:
        """URL of the authorization endpoint."""
        return self.issuer + "authorize"

    @property
    def token_endpoint(self) -> str:
        """URL of the token endpoint."""
        return self.issuer + "token"

    @property
    def userinfo_endpoint(self) -> str:
        """URL of the userinfo endpoint."""
        return self.issuer + "userinfo"

    @property
    def metadata_endpoint(self) -> str:
        """URL of the OIDC discovery document."""
        return self.issuer + ".well-known/openid-configuration"

    @property
    def jwks_uri(self) -> str:
        """URL of the JWKS document."""
        return self.issuer + "jwks"

    def get_metadata(self) -> dict:
        """Build the provider metadata served at ``metadata_endpoint``."""
        return {
            "issuer": self.issuer,
            "authorization_endpoint": self.authorization_endpoint,
            "token_endpoint": self.token_endpoint,
            "jwks_uri": self.jwks_uri,
            "userinfo_endpoint": self.userinfo_endpoint,
            "response_types_supported": ["code"],
            "subject_types_supported": ["public"],
            "id_token_signing_alg_values_supported": ["ES256"],
        }

    def get_jwks(self) -> dict:
        """Return the public key set as a JWKS dictionary."""
        return self._jwks.as_dict()

    def get_userinfo(self, access_token: str) -> Optional[dict]:
        """Given an access token, get the userinfo of the associated session.

        Returns:
            The userinfo of the grant behind ``access_token``, or None if no
            session exists for that token.
        """
        session = self._sessions.get(access_token, None)
        if session is None:
            return None
        return session.userinfo

    def _sign(self, payload: dict) -> str:
        """Serialize ``payload`` as a compact JWS signed with the server key.

        The protected header advertises ES256 and the ``kid`` of the first key
        of the published JWKS.
        """
        from authlib.jose import JsonWebSignature

        jws = JsonWebSignature()
        kid = self.get_jwks()["keys"][0]["kid"]
        protected = {"alg": "ES256", "kid": kid}
        json_payload = json.dumps(payload)
        return jws.serialize_compact(protected, json_payload, self._key).decode("utf-8")

    def generate_id_token(self, grant: FakeAuthorizationGrant) -> str:
        """Create a signed ID token for ``grant``.

        The claims are the grant's userinfo plus ``iss``, ``aud``, ``iat``,
        ``nbf`` and ``exp`` (600 seconds after now), the grant's ``nonce`` and
        ``sid`` when set, and finally any active ID token overrides.
        """
        now = int(self._clock.time())
        id_token = {
            **grant.userinfo,
            "iss": self.issuer,
            "aud": grant.client_id,
            "iat": now,
            "nbf": now,
            "exp": now + 600,
        }

        if grant.nonce is not None:
            id_token["nonce"] = grant.nonce

        if grant.sid is not None:
            id_token["sid"] = grant.sid

        # Applied last so that overrides win over every generated claim.
        id_token.update(self._id_token_overrides)

        return self._sign(id_token)

    def generate_logout_token(self, grant: FakeAuthorizationGrant) -> str:
        """Create a signed back-channel logout token for ``grant``.

        The token carries ``iss``, ``aud``, ``iat``, a random ``jti`` and the
        back-channel logout event, plus the grant's ``sid`` when set and the
        ``sub`` from its userinfo when present.
        """
        now = int(self._clock.time())
        logout_token: Dict[str, Any] = {
            "iss": self.issuer,
            "aud": grant.client_id,
            "iat": now,
            "jti": random_string(10),
            "events": {
                "http://schemas.openid.net/event/backchannel-logout": {},
            },
        }

        if grant.sid is not None:
            logout_token["sid"] = grant.sid

        if "sub" in grant.userinfo:
            logout_token["sub"] = grant.userinfo["sub"]

        return self._sign(logout_token)

    def id_token_override(self, overrides: dict) -> ContextManager[dict]:
        """Temporarily patch the ID token generated by the token endpoint."""
        return patch.object(self, "_id_token_overrides", overrides)

    def start_authorization(
        self,
        client_id: str,
        scope: str,
        redirect_uri: str,
        userinfo: dict,
        nonce: Optional[str] = None,
        with_sid: bool = False,
    ) -> Tuple[str, FakeAuthorizationGrant]:
        """Start an authorization request, and get back the code to use on the authorization endpoint.

        Args:
            client_id: Client the grant is issued to.
            scope: Scope string recorded on the grant.
            redirect_uri: Redirect URI recorded on the grant.
            userinfo: Claims describing the user.
            nonce: Optional nonce to embed in the ID token.
            with_sid: If True, allocate a session ID from ``sid_counter``.

        Returns:
            A tuple of the random authorization code and the stored grant.
        """
        code = random_string(10)
        sid = None
        if with_sid:
            sid = str(self.sid_counter)
            self.sid_counter += 1

        grant = FakeAuthorizationGrant(
            userinfo=userinfo,
            scope=scope,
            redirect_uri=redirect_uri,
            nonce=nonce,
            client_id=client_id,
            sid=sid,
        )
        self._authorization_grants[code] = grant

        return code, grant

    @staticmethod
    def _scope_has(scope: str, value: str) -> bool:
        """Tell whether ``value`` is one of the whitespace-separated values of ``scope``."""
        return value in scope.split()

    def exchange_code(self, code: str) -> Optional[Dict[str, Any]]:
        """Exchange an authorization code for a token response.

        Args:
            code: A code previously returned by ``start_authorization``.

        Returns:
            The token response, including an ``id_token`` when the grant's
            scope contains the ``openid`` value, or None if the code is unknown.
        """
        # Popping removes the code, so exchanging the same code twice fails.
        grant = self._authorization_grants.pop(code, None)
        if grant is None:
            return None

        access_token = random_string(10)
        self._sessions[access_token] = grant

        token: Dict[str, Any] = {
            "token_type": "Bearer",
            "access_token": access_token,
            "expires_in": 3600,
            "scope": grant.scope,
        }

        # Compare against whole scope values: a plain substring test would also
        # match scopes such as "openidx".
        if self._scope_has(grant.scope, "openid"):
            token["id_token"] = self.generate_id_token(grant)

        return token

    def buggy_endpoint(
        self,
        *,
        jwks: bool = False,
        metadata: bool = False,
        token: bool = False,
        userinfo: bool = False,
    ) -> ContextManager[Dict[str, Mock]]:
        """A context which makes a set of endpoints return a 500 error.

        Args:
            jwks: If True, makes the JWKS endpoint return a 500 error.
            metadata: If True, makes the OIDC Discovery endpoint return a 500 error.
            token: If True, makes the token endpoint return a 500 error.
            userinfo: If True, makes the userinfo endpoint return a 500 error.
        """
        buggy = FakeResponse(code=500, body=b"Internal server error")

        patches = {}
        if jwks:
            patches["get_jwks_handler"] = Mock(return_value=buggy)
        if metadata:
            patches["get_metadata_handler"] = Mock(return_value=buggy)
        if token:
            patches["post_token_handler"] = Mock(return_value=buggy)
        if userinfo:
            patches["get_userinfo_handler"] = Mock(return_value=buggy)

        # NOTE(review): `patch.multiple` appears to reject an empty keyword set,
        # so callers presumably must enable at least one flag - confirm.
        return patch.multiple(self, **patches)

    async def _request(
        self,
        method: str,
        uri: str,
        data: Optional[bytes] = None,
        headers: Optional[Headers] = None,
    ) -> IResponse:
        """The override of the SimpleHttpClient#request() method

        Dispatches POSTs on the token endpoint and GETs on the JWKS, metadata
        and userinfo endpoints to the matching handler mock. A POST without a
        url-encoded form body gets a 400; anything unrecognised gets a 404.
        """
        access_token: Optional[str] = None

        if headers is None:
            headers = Headers()

        # Try to find the access token in the headers if any
        auth_headers = headers.getRawHeaders(b"Authorization")
        if auth_headers:
            parts = auth_headers[0].split(b" ")
            if parts[0] == b"Bearer" and len(parts) == 2:
                access_token = parts[1].decode("ascii")

        if method == "POST":
            # If the method is POST, assume it has an url-encoded body
            if data is None or headers.getRawHeaders(b"Content-Type") != [
                b"application/x-www-form-urlencoded"
            ]:
                return FakeResponse.json(code=400, payload={"error": "invalid_request"})

            params = parse_qs(data.decode("utf-8"))

            if uri == self.token_endpoint:
                # Even though this endpoint should be protected, this does not check
                # for client authentication. We're not checking it for simplicity,
                # and because client authentication is tested in other standalone tests.
                return self.post_token_handler(params)

        elif method == "GET":
            if uri == self.jwks_uri:
                return self.get_jwks_handler()
            elif uri == self.metadata_endpoint:
                return self.get_metadata_handler()
            elif uri == self.userinfo_endpoint:
                return self.get_userinfo_handler(access_token=access_token)

        return FakeResponse(code=404, body=b"404 not found")

    # Request handlers
    def _get_jwks_handler(self) -> IResponse:
        """Handles requests to the JWKS URI."""
        return FakeResponse.json(payload=self.get_jwks())

    def _get_metadata_handler(self) -> IResponse:
        """Handles requests to the OIDC well-known document."""
        return FakeResponse.json(payload=self.get_metadata())

    def _get_userinfo_handler(self, access_token: Optional[str]) -> IResponse:
        """Handles requests to the userinfo endpoint.

        Responds with a 401 when no access token was supplied or when the
        token does not match a session.
        """
        if access_token is None:
            return FakeResponse(code=401)
        user_info = self.get_userinfo(access_token)
        if user_info is None:
            return FakeResponse(code=401)

        return FakeResponse.json(payload=user_info)

    def _post_token_handler(self, params: Dict[str, List[str]]) -> IResponse:
        """Handles requests to the token endpoint.

        Responds with ``invalid_request`` unless exactly one ``code`` parameter
        is present, and with ``invalid_grant`` when that code is unknown.
        """
        code = params.get("code", [])
        if len(code) != 1:
            return FakeResponse.json(code=400, payload={"error": "invalid_request"})

        token_response = self.exchange_code(code=code[0])
        if token_response is None:
            return FakeResponse.json(code=400, payload={"error": "invalid_grant"})

        return FakeResponse.json(payload=token_response)
|