certs.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. #***************************************************************************
  4. # _ _ ____ _
  5. # Project ___| | | | _ \| |
  6. # / __| | | | |_) | |
  7. # | (__| |_| | _ <| |___
  8. # \___|\___/|_| \_\_____|
  9. #
  10. # Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
  11. #
  12. # This software is licensed as described in the file COPYING, which
  13. # you should have received as part of this distribution. The terms
  14. # are also available at https://curl.se/docs/copyright.html.
  15. #
  16. # You may opt to use, copy, modify, merge, publish, distribute and/or sell
  17. # copies of the Software, and permit persons to whom the Software is
  18. # furnished to do so, under the terms of the COPYING file.
  19. #
  20. # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
  21. # KIND, either express or implied.
  22. #
  23. # SPDX-License-Identifier: curl
  24. #
  25. ###########################################################################
  26. #
  27. import ipaddress
  28. import os
  29. import re
  30. from datetime import timedelta, datetime, timezone
  31. from typing import List, Any, Optional
  32. from cryptography import x509
  33. from cryptography.hazmat.backends import default_backend
  34. from cryptography.hazmat.primitives import hashes
  35. from cryptography.hazmat.primitives.asymmetric import ec, rsa
  36. from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
  37. from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
  38. from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption, load_pem_private_key
  39. from cryptography.x509 import ExtendedKeyUsageOID, NameOID
  40. EC_SUPPORTED = {}
  41. EC_SUPPORTED.update([(curve.name.upper(), curve) for curve in [
  42. ec.SECP192R1,
  43. ec.SECP224R1,
  44. ec.SECP256R1,
  45. ec.SECP384R1,
  46. ]])
  47. def _private_key(key_type):
  48. if isinstance(key_type, str):
  49. key_type = key_type.upper()
  50. m = re.match(r'^(RSA)?(\d+)$', key_type)
  51. if m:
  52. key_type = int(m.group(2))
  53. if isinstance(key_type, int):
  54. return rsa.generate_private_key(
  55. public_exponent=65537,
  56. key_size=key_type,
  57. backend=default_backend()
  58. )
  59. if not isinstance(key_type, ec.EllipticCurve) and key_type in EC_SUPPORTED:
  60. key_type = EC_SUPPORTED[key_type]
  61. return ec.generate_private_key(
  62. curve=key_type,
  63. backend=default_backend()
  64. )
  65. class CertificateSpec:
  66. def __init__(self, name: Optional[str] = None,
  67. domains: Optional[List[str]] = None,
  68. email: Optional[str] = None,
  69. key_type: Optional[str] = None,
  70. single_file: bool = False,
  71. valid_from: timedelta = timedelta(days=-1),
  72. valid_to: timedelta = timedelta(days=89),
  73. client: bool = False,
  74. check_valid: bool = True,
  75. sub_specs: Optional[List['CertificateSpec']] = None):
  76. self._name = name
  77. self.domains = domains
  78. self.client = client
  79. self.email = email
  80. self.key_type = key_type
  81. self.single_file = single_file
  82. self.valid_from = valid_from
  83. self.valid_to = valid_to
  84. self.sub_specs = sub_specs
  85. self.check_valid = check_valid
  86. @property
  87. def name(self) -> Optional[str]:
  88. if self._name:
  89. return self._name
  90. elif self.domains:
  91. return self.domains[0]
  92. return None
  93. @property
  94. def type(self) -> Optional[str]:
  95. if self.domains and len(self.domains):
  96. return "server"
  97. elif self.client:
  98. return "client"
  99. elif self.name:
  100. return "ca"
  101. return None
  102. class Credentials:
  103. def __init__(self,
  104. name: str,
  105. cert: Any,
  106. pkey: Any,
  107. issuer: Optional['Credentials'] = None):
  108. self._name = name
  109. self._cert = cert
  110. self._pkey = pkey
  111. self._issuer = issuer
  112. self._cert_file = None
  113. self._pkey_file = None
  114. self._store = None
  115. self._combined_file = None
  116. @property
  117. def name(self) -> str:
  118. return self._name
  119. @property
  120. def subject(self) -> x509.Name:
  121. return self._cert.subject
  122. @property
  123. def key_type(self):
  124. if isinstance(self._pkey, RSAPrivateKey):
  125. return f"rsa{self._pkey.key_size}"
  126. elif isinstance(self._pkey, EllipticCurvePrivateKey):
  127. return f"{self._pkey.curve.name}"
  128. else:
  129. raise Exception(f"unknown key type: {self._pkey}")
  130. @property
  131. def private_key(self) -> Any:
  132. return self._pkey
  133. @property
  134. def certificate(self) -> Any:
  135. return self._cert
  136. @property
  137. def cert_pem(self) -> bytes:
  138. return self._cert.public_bytes(Encoding.PEM)
  139. @property
  140. def pkey_pem(self) -> bytes:
  141. return self._pkey.private_bytes(
  142. Encoding.PEM,
  143. PrivateFormat.TraditionalOpenSSL if self.key_type.startswith('rsa') else PrivateFormat.PKCS8,
  144. NoEncryption())
  145. @property
  146. def issuer(self) -> Optional['Credentials']:
  147. return self._issuer
  148. def set_store(self, store: 'CertStore'):
  149. self._store = store
  150. def set_files(self, cert_file: str, pkey_file: Optional[str] = None,
  151. combined_file: Optional[str] = None):
  152. self._cert_file = cert_file
  153. self._pkey_file = pkey_file
  154. self._combined_file = combined_file
  155. @property
  156. def cert_file(self) -> str:
  157. return self._cert_file
  158. @property
  159. def pkey_file(self) -> Optional[str]:
  160. return self._pkey_file
  161. @property
  162. def combined_file(self) -> Optional[str]:
  163. return self._combined_file
  164. def get_first(self, name) -> Optional['Credentials']:
  165. creds = self._store.get_credentials_for_name(name) if self._store else []
  166. return creds[0] if len(creds) else None
  167. def get_credentials_for_name(self, name) -> List['Credentials']:
  168. return self._store.get_credentials_for_name(name) if self._store else []
  169. def issue_certs(self, specs: List[CertificateSpec],
  170. chain: Optional[List['Credentials']] = None) -> List['Credentials']:
  171. return [self.issue_cert(spec=spec, chain=chain) for spec in specs]
  172. def issue_cert(self, spec: CertificateSpec,
  173. chain: Optional[List['Credentials']] = None) -> 'Credentials':
  174. key_type = spec.key_type if spec.key_type else self.key_type
  175. creds = None
  176. if self._store:
  177. creds = self._store.load_credentials(
  178. name=spec.name, key_type=key_type, single_file=spec.single_file,
  179. issuer=self, check_valid=spec.check_valid)
  180. if creds is None:
  181. creds = TestCA.create_credentials(spec=spec, issuer=self, key_type=key_type,
  182. valid_from=spec.valid_from, valid_to=spec.valid_to)
  183. if self._store:
  184. self._store.save(creds, single_file=spec.single_file)
  185. if spec.type == "ca":
  186. self._store.save_chain(creds, "ca", with_root=True)
  187. if spec.sub_specs:
  188. if self._store:
  189. sub_store = CertStore(fpath=os.path.join(self._store.path, creds.name))
  190. creds.set_store(sub_store)
  191. subchain = chain.copy() if chain else []
  192. subchain.append(self)
  193. creds.issue_certs(spec.sub_specs, chain=subchain)
  194. return creds
  195. class CertStore:
  196. def __init__(self, fpath: str):
  197. self._store_dir = fpath
  198. if not os.path.exists(self._store_dir):
  199. os.makedirs(self._store_dir)
  200. self._creds_by_name = {}
  201. @property
  202. def path(self) -> str:
  203. return self._store_dir
  204. def save(self, creds: Credentials, name: Optional[str] = None,
  205. chain: Optional[List[Credentials]] = None,
  206. single_file: bool = False) -> None:
  207. name = name if name is not None else creds.name
  208. cert_file = self.get_cert_file(name=name, key_type=creds.key_type)
  209. pkey_file = self.get_pkey_file(name=name, key_type=creds.key_type)
  210. comb_file = self.get_combined_file(name=name, key_type=creds.key_type)
  211. if single_file:
  212. pkey_file = None
  213. with open(cert_file, "wb") as fd:
  214. fd.write(creds.cert_pem)
  215. if chain:
  216. for c in chain:
  217. fd.write(c.cert_pem)
  218. if pkey_file is None:
  219. fd.write(creds.pkey_pem)
  220. if pkey_file is not None:
  221. with open(pkey_file, "wb") as fd:
  222. fd.write(creds.pkey_pem)
  223. with open(comb_file, "wb") as fd:
  224. fd.write(creds.cert_pem)
  225. if chain:
  226. for c in chain:
  227. fd.write(c.cert_pem)
  228. fd.write(creds.pkey_pem)
  229. creds.set_files(cert_file, pkey_file, comb_file)
  230. self._add_credentials(name, creds)
  231. def save_chain(self, creds: Credentials, infix: str, with_root=False):
  232. name = creds.name
  233. chain = [creds]
  234. while creds.issuer is not None:
  235. creds = creds.issuer
  236. chain.append(creds)
  237. if not with_root and len(chain) > 1:
  238. chain = chain[:-1]
  239. chain_file = os.path.join(self._store_dir, f'{name}-{infix}.pem')
  240. with open(chain_file, "wb") as fd:
  241. for c in chain:
  242. fd.write(c.cert_pem)
  243. def _add_credentials(self, name: str, creds: Credentials):
  244. if name not in self._creds_by_name:
  245. self._creds_by_name[name] = []
  246. self._creds_by_name[name].append(creds)
  247. def get_credentials_for_name(self, name) -> List[Credentials]:
  248. return self._creds_by_name[name] if name in self._creds_by_name else []
  249. def get_cert_file(self, name: str, key_type=None) -> str:
  250. key_infix = ".{0}".format(key_type) if key_type is not None else ""
  251. return os.path.join(self._store_dir, f'{name}{key_infix}.cert.pem')
  252. def get_pkey_file(self, name: str, key_type=None) -> str:
  253. key_infix = ".{0}".format(key_type) if key_type is not None else ""
  254. return os.path.join(self._store_dir, f'{name}{key_infix}.pkey.pem')
  255. def get_combined_file(self, name: str, key_type=None) -> str:
  256. return os.path.join(self._store_dir, f'{name}.pem')
  257. def load_pem_cert(self, fpath: str) -> x509.Certificate:
  258. with open(fpath) as fd:
  259. return x509.load_pem_x509_certificate("".join(fd.readlines()).encode())
  260. def load_pem_pkey(self, fpath: str):
  261. with open(fpath) as fd:
  262. return load_pem_private_key("".join(fd.readlines()).encode(), password=None)
  263. def load_credentials(self, name: str, key_type=None,
  264. single_file: bool = False,
  265. issuer: Optional[Credentials] = None,
  266. check_valid: bool = False):
  267. cert_file = self.get_cert_file(name=name, key_type=key_type)
  268. pkey_file = cert_file if single_file else self.get_pkey_file(name=name, key_type=key_type)
  269. comb_file = self.get_combined_file(name=name, key_type=key_type)
  270. if os.path.isfile(cert_file) and os.path.isfile(pkey_file):
  271. cert = self.load_pem_cert(cert_file)
  272. pkey = self.load_pem_pkey(pkey_file)
  273. try:
  274. now = datetime.now(tz=timezone.utc)
  275. if check_valid and \
  276. ((cert.not_valid_after_utc < now) or
  277. (cert.not_valid_before_utc > now)):
  278. return None
  279. except AttributeError: # older python
  280. now = datetime.now()
  281. if check_valid and \
  282. ((cert.not_valid_after < now) or
  283. (cert.not_valid_before > now)):
  284. return None
  285. creds = Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
  286. creds.set_store(self)
  287. creds.set_files(cert_file, pkey_file, comb_file)
  288. self._add_credentials(name, creds)
  289. return creds
  290. return None
  291. class TestCA:
  292. @classmethod
  293. def create_root(cls, name: str, store_dir: str, key_type: str = "rsa2048") -> Credentials:
  294. store = CertStore(fpath=store_dir)
  295. creds = store.load_credentials(name="ca", key_type=key_type, issuer=None)
  296. if creds is None:
  297. creds = TestCA._make_ca_credentials(name=name, key_type=key_type)
  298. store.save(creds, name="ca")
  299. creds.set_store(store)
  300. return creds
  301. @staticmethod
  302. def create_credentials(spec: CertificateSpec, issuer: Credentials, key_type: Any,
  303. valid_from: timedelta = timedelta(days=-1),
  304. valid_to: timedelta = timedelta(days=89),
  305. ) -> Credentials:
  306. """
  307. Create a certificate signed by this CA for the given domains.
  308. :returns: the certificate and private key PEM file paths
  309. """
  310. if spec.domains and len(spec.domains):
  311. creds = TestCA._make_server_credentials(name=spec.name, domains=spec.domains,
  312. issuer=issuer, valid_from=valid_from,
  313. valid_to=valid_to, key_type=key_type)
  314. elif spec.client:
  315. creds = TestCA._make_client_credentials(name=spec.name, issuer=issuer,
  316. email=spec.email, valid_from=valid_from,
  317. valid_to=valid_to, key_type=key_type)
  318. elif spec.name:
  319. creds = TestCA._make_ca_credentials(name=spec.name, issuer=issuer,
  320. valid_from=valid_from, valid_to=valid_to,
  321. key_type=key_type)
  322. else:
  323. raise Exception(f"unrecognized certificate specification: {spec}")
  324. return creds
  325. @staticmethod
  326. def _make_x509_name(org_name: Optional[str] = None, common_name: Optional[str] = None, parent: x509.Name = None) -> x509.Name:
  327. name_pieces = []
  328. if org_name:
  329. oid = NameOID.ORGANIZATIONAL_UNIT_NAME if parent else NameOID.ORGANIZATION_NAME
  330. name_pieces.append(x509.NameAttribute(oid, org_name))
  331. elif common_name:
  332. name_pieces.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name))
  333. if parent:
  334. name_pieces.extend(list(parent))
  335. return x509.Name(name_pieces)
  336. @staticmethod
  337. def _make_csr(
  338. subject: x509.Name,
  339. pkey: Any,
  340. issuer_subject: Optional[Credentials],
  341. valid_from_delta: Optional[timedelta] = None,
  342. valid_until_delta: Optional[timedelta] = None
  343. ):
  344. pubkey = pkey.public_key()
  345. issuer_subject = issuer_subject if issuer_subject is not None else subject
  346. valid_from = datetime.now()
  347. if valid_until_delta is not None:
  348. valid_from += valid_from_delta
  349. valid_until = datetime.now()
  350. if valid_until_delta is not None:
  351. valid_until += valid_until_delta
  352. return (
  353. x509.CertificateBuilder()
  354. .subject_name(subject)
  355. .issuer_name(issuer_subject)
  356. .public_key(pubkey)
  357. .not_valid_before(valid_from)
  358. .not_valid_after(valid_until)
  359. .serial_number(x509.random_serial_number())
  360. .add_extension(
  361. x509.SubjectKeyIdentifier.from_public_key(pubkey),
  362. critical=False,
  363. )
  364. )
  365. @staticmethod
  366. def _add_ca_usages(csr: Any) -> Any:
  367. return csr.add_extension(
  368. x509.BasicConstraints(ca=True, path_length=9),
  369. critical=True,
  370. ).add_extension(
  371. x509.KeyUsage(
  372. digital_signature=True,
  373. content_commitment=False,
  374. key_encipherment=False,
  375. data_encipherment=False,
  376. key_agreement=False,
  377. key_cert_sign=True,
  378. crl_sign=True,
  379. encipher_only=False,
  380. decipher_only=False),
  381. critical=True
  382. ).add_extension(
  383. x509.ExtendedKeyUsage([
  384. ExtendedKeyUsageOID.CLIENT_AUTH,
  385. ExtendedKeyUsageOID.SERVER_AUTH,
  386. ExtendedKeyUsageOID.CODE_SIGNING,
  387. ]),
  388. critical=True
  389. )
  390. @staticmethod
  391. def _add_leaf_usages(csr: Any, domains: List[str], issuer: Credentials) -> Any:
  392. names = []
  393. for name in domains:
  394. try:
  395. names.append(x509.IPAddress(ipaddress.ip_address(name)))
  396. # TODO: specify specific exceptions here
  397. except: # noqa: E722
  398. names.append(x509.DNSName(name))
  399. return csr.add_extension(
  400. x509.BasicConstraints(ca=False, path_length=None),
  401. critical=True,
  402. ).add_extension(
  403. x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
  404. issuer.certificate.extensions.get_extension_for_class(
  405. x509.SubjectKeyIdentifier).value),
  406. critical=False
  407. ).add_extension(
  408. x509.SubjectAlternativeName(names), critical=True,
  409. ).add_extension(
  410. x509.ExtendedKeyUsage([
  411. ExtendedKeyUsageOID.SERVER_AUTH,
  412. ]),
  413. critical=False
  414. )
  415. @staticmethod
  416. def _add_client_usages(csr: Any, issuer: Credentials, rfc82name: Optional[str] = None) -> Any:
  417. cert = csr.add_extension(
  418. x509.BasicConstraints(ca=False, path_length=None),
  419. critical=True,
  420. ).add_extension(
  421. x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
  422. issuer.certificate.extensions.get_extension_for_class(
  423. x509.SubjectKeyIdentifier).value),
  424. critical=False
  425. )
  426. if rfc82name:
  427. cert.add_extension(
  428. x509.SubjectAlternativeName([x509.RFC822Name(rfc82name)]),
  429. critical=True,
  430. )
  431. cert.add_extension(
  432. x509.ExtendedKeyUsage([
  433. ExtendedKeyUsageOID.CLIENT_AUTH,
  434. ]),
  435. critical=True
  436. )
  437. return cert
  438. @staticmethod
  439. def _make_ca_credentials(name, key_type: Any,
  440. issuer: Optional[Credentials] = None,
  441. valid_from: timedelta = timedelta(days=-1),
  442. valid_to: timedelta = timedelta(days=89),
  443. ) -> Credentials:
  444. pkey = _private_key(key_type=key_type)
  445. if issuer is not None:
  446. issuer_subject = issuer.certificate.subject
  447. issuer_key = issuer.private_key
  448. else:
  449. issuer_subject = None
  450. issuer_key = pkey
  451. subject = TestCA._make_x509_name(org_name=name, parent=issuer.subject if issuer else None)
  452. csr = TestCA._make_csr(subject=subject,
  453. issuer_subject=issuer_subject, pkey=pkey,
  454. valid_from_delta=valid_from, valid_until_delta=valid_to)
  455. csr = TestCA._add_ca_usages(csr)
  456. cert = csr.sign(private_key=issuer_key,
  457. algorithm=hashes.SHA256(),
  458. backend=default_backend())
  459. return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
  460. @staticmethod
  461. def _make_server_credentials(name: str, domains: List[str], issuer: Credentials,
  462. key_type: Any,
  463. valid_from: timedelta = timedelta(days=-1),
  464. valid_to: timedelta = timedelta(days=89),
  465. ) -> Credentials:
  466. pkey = _private_key(key_type=key_type)
  467. subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject)
  468. csr = TestCA._make_csr(subject=subject,
  469. issuer_subject=issuer.certificate.subject, pkey=pkey,
  470. valid_from_delta=valid_from, valid_until_delta=valid_to)
  471. csr = TestCA._add_leaf_usages(csr, domains=domains, issuer=issuer)
  472. cert = csr.sign(private_key=issuer.private_key,
  473. algorithm=hashes.SHA256(),
  474. backend=default_backend())
  475. return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
  476. @staticmethod
  477. def _make_client_credentials(name: str,
  478. issuer: Credentials, email: Optional[str],
  479. key_type: Any,
  480. valid_from: timedelta = timedelta(days=-1),
  481. valid_to: timedelta = timedelta(days=89),
  482. ) -> Credentials:
  483. pkey = _private_key(key_type=key_type)
  484. subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject)
  485. csr = TestCA._make_csr(subject=subject,
  486. issuer_subject=issuer.certificate.subject, pkey=pkey,
  487. valid_from_delta=valid_from, valid_until_delta=valid_to)
  488. csr = TestCA._add_client_usages(csr, issuer=issuer, rfc82name=email)
  489. cert = csr.sign(private_key=issuer.private_key,
  490. algorithm=hashes.SHA256(),
  491. backend=default_backend())
  492. return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)