certs.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. #***************************************************************************
  4. # _ _ ____ _
  5. # Project ___| | | | _ \| |
  6. # / __| | | | |_) | |
  7. # | (__| |_| | _ <| |___
  8. # \___|\___/|_| \_\_____|
  9. #
  10. # Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
  11. #
  12. # This software is licensed as described in the file COPYING, which
  13. # you should have received as part of this distribution. The terms
  14. # are also available at https://curl.se/docs/copyright.html.
  15. #
  16. # You may opt to use, copy, modify, merge, publish, distribute and/or sell
  17. # copies of the Software, and permit persons to whom the Software is
  18. # furnished to do so, under the terms of the COPYING file.
  19. #
  20. # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
  21. # KIND, either express or implied.
  22. #
  23. # SPDX-License-Identifier: curl
  24. #
  25. ###########################################################################
  26. #
  27. import os
  28. import re
  29. from datetime import timedelta, datetime
  30. from typing import List, Any, Optional
  31. from cryptography import x509
  32. from cryptography.hazmat.backends import default_backend
  33. from cryptography.hazmat.primitives import hashes
  34. from cryptography.hazmat.primitives.asymmetric import ec, rsa
  35. from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
  36. from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
  37. from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption, load_pem_private_key
  38. from cryptography.x509 import ExtendedKeyUsageOID, NameOID
  39. EC_SUPPORTED = {}
  40. EC_SUPPORTED.update([(curve.name.upper(), curve) for curve in [
  41. ec.SECP192R1,
  42. ec.SECP224R1,
  43. ec.SECP256R1,
  44. ec.SECP384R1,
  45. ]])
  46. def _private_key(key_type):
  47. if isinstance(key_type, str):
  48. key_type = key_type.upper()
  49. m = re.match(r'^(RSA)?(\d+)$', key_type)
  50. if m:
  51. key_type = int(m.group(2))
  52. if isinstance(key_type, int):
  53. return rsa.generate_private_key(
  54. public_exponent=65537,
  55. key_size=key_type,
  56. backend=default_backend()
  57. )
  58. if not isinstance(key_type, ec.EllipticCurve) and key_type in EC_SUPPORTED:
  59. key_type = EC_SUPPORTED[key_type]
  60. return ec.generate_private_key(
  61. curve=key_type,
  62. backend=default_backend()
  63. )
  64. class CertificateSpec:
  65. def __init__(self, name: Optional[str] = None,
  66. domains: Optional[List[str]] = None,
  67. email: Optional[str] = None,
  68. key_type: Optional[str] = None,
  69. single_file: bool = False,
  70. valid_from: timedelta = timedelta(days=-1),
  71. valid_to: timedelta = timedelta(days=89),
  72. client: bool = False,
  73. sub_specs: Optional[List['CertificateSpec']] = None):
  74. self._name = name
  75. self.domains = domains
  76. self.client = client
  77. self.email = email
  78. self.key_type = key_type
  79. self.single_file = single_file
  80. self.valid_from = valid_from
  81. self.valid_to = valid_to
  82. self.sub_specs = sub_specs
  83. @property
  84. def name(self) -> Optional[str]:
  85. if self._name:
  86. return self._name
  87. elif self.domains:
  88. return self.domains[0]
  89. return None
  90. @property
  91. def type(self) -> Optional[str]:
  92. if self.domains and len(self.domains):
  93. return "server"
  94. elif self.client:
  95. return "client"
  96. elif self.name:
  97. return "ca"
  98. return None
  99. class Credentials:
  100. def __init__(self,
  101. name: str,
  102. cert: Any,
  103. pkey: Any,
  104. issuer: Optional['Credentials'] = None):
  105. self._name = name
  106. self._cert = cert
  107. self._pkey = pkey
  108. self._issuer = issuer
  109. self._cert_file = None
  110. self._pkey_file = None
  111. self._store = None
  112. @property
  113. def name(self) -> str:
  114. return self._name
  115. @property
  116. def subject(self) -> x509.Name:
  117. return self._cert.subject
  118. @property
  119. def key_type(self):
  120. if isinstance(self._pkey, RSAPrivateKey):
  121. return f"rsa{self._pkey.key_size}"
  122. elif isinstance(self._pkey, EllipticCurvePrivateKey):
  123. return f"{self._pkey.curve.name}"
  124. else:
  125. raise Exception(f"unknown key type: {self._pkey}")
  126. @property
  127. def private_key(self) -> Any:
  128. return self._pkey
  129. @property
  130. def certificate(self) -> Any:
  131. return self._cert
  132. @property
  133. def cert_pem(self) -> bytes:
  134. return self._cert.public_bytes(Encoding.PEM)
  135. @property
  136. def pkey_pem(self) -> bytes:
  137. return self._pkey.private_bytes(
  138. Encoding.PEM,
  139. PrivateFormat.TraditionalOpenSSL if self.key_type.startswith('rsa') else PrivateFormat.PKCS8,
  140. NoEncryption())
  141. @property
  142. def issuer(self) -> Optional['Credentials']:
  143. return self._issuer
  144. def set_store(self, store: 'CertStore'):
  145. self._store = store
  146. def set_files(self, cert_file: str, pkey_file: Optional[str] = None,
  147. combined_file: Optional[str] = None):
  148. self._cert_file = cert_file
  149. self._pkey_file = pkey_file
  150. self._combined_file = combined_file
  151. @property
  152. def cert_file(self) -> str:
  153. return self._cert_file
  154. @property
  155. def pkey_file(self) -> Optional[str]:
  156. return self._pkey_file
  157. @property
  158. def combined_file(self) -> Optional[str]:
  159. return self._combined_file
  160. def get_first(self, name) -> Optional['Credentials']:
  161. creds = self._store.get_credentials_for_name(name) if self._store else []
  162. return creds[0] if len(creds) else None
  163. def get_credentials_for_name(self, name) -> List['Credentials']:
  164. return self._store.get_credentials_for_name(name) if self._store else []
  165. def issue_certs(self, specs: List[CertificateSpec],
  166. chain: Optional[List['Credentials']] = None) -> List['Credentials']:
  167. return [self.issue_cert(spec=spec, chain=chain) for spec in specs]
  168. def issue_cert(self, spec: CertificateSpec,
  169. chain: Optional[List['Credentials']] = None) -> 'Credentials':
  170. key_type = spec.key_type if spec.key_type else self.key_type
  171. creds = None
  172. if self._store:
  173. creds = self._store.load_credentials(
  174. name=spec.name, key_type=key_type, single_file=spec.single_file, issuer=self)
  175. if creds is None:
  176. creds = TestCA.create_credentials(spec=spec, issuer=self, key_type=key_type,
  177. valid_from=spec.valid_from, valid_to=spec.valid_to)
  178. if self._store:
  179. self._store.save(creds, single_file=spec.single_file)
  180. if spec.type == "ca":
  181. self._store.save_chain(creds, "ca", with_root=True)
  182. if spec.sub_specs:
  183. if self._store:
  184. sub_store = CertStore(fpath=os.path.join(self._store.path, creds.name))
  185. creds.set_store(sub_store)
  186. subchain = chain.copy() if chain else []
  187. subchain.append(self)
  188. creds.issue_certs(spec.sub_specs, chain=subchain)
  189. return creds
  190. class CertStore:
  191. def __init__(self, fpath: str):
  192. self._store_dir = fpath
  193. if not os.path.exists(self._store_dir):
  194. os.makedirs(self._store_dir)
  195. self._creds_by_name = {}
  196. @property
  197. def path(self) -> str:
  198. return self._store_dir
  199. def save(self, creds: Credentials, name: Optional[str] = None,
  200. chain: Optional[List[Credentials]] = None,
  201. single_file: bool = False) -> None:
  202. name = name if name is not None else creds.name
  203. cert_file = self.get_cert_file(name=name, key_type=creds.key_type)
  204. pkey_file = self.get_pkey_file(name=name, key_type=creds.key_type)
  205. comb_file = self.get_combined_file(name=name, key_type=creds.key_type)
  206. if single_file:
  207. pkey_file = None
  208. with open(cert_file, "wb") as fd:
  209. fd.write(creds.cert_pem)
  210. if chain:
  211. for c in chain:
  212. fd.write(c.cert_pem)
  213. if pkey_file is None:
  214. fd.write(creds.pkey_pem)
  215. if pkey_file is not None:
  216. with open(pkey_file, "wb") as fd:
  217. fd.write(creds.pkey_pem)
  218. with open(comb_file, "wb") as fd:
  219. fd.write(creds.cert_pem)
  220. if chain:
  221. for c in chain:
  222. fd.write(c.cert_pem)
  223. fd.write(creds.pkey_pem)
  224. creds.set_files(cert_file, pkey_file, comb_file)
  225. self._add_credentials(name, creds)
  226. def save_chain(self, creds: Credentials, infix: str, with_root=False):
  227. name = creds.name
  228. chain = [creds]
  229. while creds.issuer is not None:
  230. creds = creds.issuer
  231. chain.append(creds)
  232. if not with_root and len(chain) > 1:
  233. chain = chain[:-1]
  234. chain_file = os.path.join(self._store_dir, f'{name}-{infix}.pem')
  235. with open(chain_file, "wb") as fd:
  236. for c in chain:
  237. fd.write(c.cert_pem)
  238. def _add_credentials(self, name: str, creds: Credentials):
  239. if name not in self._creds_by_name:
  240. self._creds_by_name[name] = []
  241. self._creds_by_name[name].append(creds)
  242. def get_credentials_for_name(self, name) -> List[Credentials]:
  243. return self._creds_by_name[name] if name in self._creds_by_name else []
  244. def get_cert_file(self, name: str, key_type=None) -> str:
  245. key_infix = ".{0}".format(key_type) if key_type is not None else ""
  246. return os.path.join(self._store_dir, f'{name}{key_infix}.cert.pem')
  247. def get_pkey_file(self, name: str, key_type=None) -> str:
  248. key_infix = ".{0}".format(key_type) if key_type is not None else ""
  249. return os.path.join(self._store_dir, f'{name}{key_infix}.pkey.pem')
  250. def get_combined_file(self, name: str, key_type=None) -> str:
  251. return os.path.join(self._store_dir, f'{name}.pem')
  252. def load_pem_cert(self, fpath: str) -> x509.Certificate:
  253. with open(fpath) as fd:
  254. return x509.load_pem_x509_certificate("".join(fd.readlines()).encode())
  255. def load_pem_pkey(self, fpath: str):
  256. with open(fpath) as fd:
  257. return load_pem_private_key("".join(fd.readlines()).encode(), password=None)
  258. def load_credentials(self, name: str, key_type=None,
  259. single_file: bool = False,
  260. issuer: Optional[Credentials] = None):
  261. cert_file = self.get_cert_file(name=name, key_type=key_type)
  262. pkey_file = cert_file if single_file else self.get_pkey_file(name=name, key_type=key_type)
  263. comb_file = self.get_combined_file(name=name, key_type=key_type)
  264. if os.path.isfile(cert_file) and os.path.isfile(pkey_file):
  265. cert = self.load_pem_cert(cert_file)
  266. pkey = self.load_pem_pkey(pkey_file)
  267. creds = Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
  268. creds.set_store(self)
  269. creds.set_files(cert_file, pkey_file, comb_file)
  270. self._add_credentials(name, creds)
  271. return creds
  272. return None
  273. class TestCA:
  274. @classmethod
  275. def create_root(cls, name: str, store_dir: str, key_type: str = "rsa2048") -> Credentials:
  276. store = CertStore(fpath=store_dir)
  277. creds = store.load_credentials(name="ca", key_type=key_type, issuer=None)
  278. if creds is None:
  279. creds = TestCA._make_ca_credentials(name=name, key_type=key_type)
  280. store.save(creds, name="ca")
  281. creds.set_store(store)
  282. return creds
  283. @staticmethod
  284. def create_credentials(spec: CertificateSpec, issuer: Credentials, key_type: Any,
  285. valid_from: timedelta = timedelta(days=-1),
  286. valid_to: timedelta = timedelta(days=89),
  287. ) -> Credentials:
  288. """Create a certificate signed by this CA for the given domains.
  289. :returns: the certificate and private key PEM file paths
  290. """
  291. if spec.domains and len(spec.domains):
  292. creds = TestCA._make_server_credentials(name=spec.name, domains=spec.domains,
  293. issuer=issuer, valid_from=valid_from,
  294. valid_to=valid_to, key_type=key_type)
  295. elif spec.client:
  296. creds = TestCA._make_client_credentials(name=spec.name, issuer=issuer,
  297. email=spec.email, valid_from=valid_from,
  298. valid_to=valid_to, key_type=key_type)
  299. elif spec.name:
  300. creds = TestCA._make_ca_credentials(name=spec.name, issuer=issuer,
  301. valid_from=valid_from, valid_to=valid_to,
  302. key_type=key_type)
  303. else:
  304. raise Exception(f"unrecognized certificate specification: {spec}")
  305. return creds
  306. @staticmethod
  307. def _make_x509_name(org_name: str = None, common_name: str = None, parent: x509.Name = None) -> x509.Name:
  308. name_pieces = []
  309. if org_name:
  310. oid = NameOID.ORGANIZATIONAL_UNIT_NAME if parent else NameOID.ORGANIZATION_NAME
  311. name_pieces.append(x509.NameAttribute(oid, org_name))
  312. elif common_name:
  313. name_pieces.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name))
  314. if parent:
  315. name_pieces.extend([rdn for rdn in parent])
  316. return x509.Name(name_pieces)
  317. @staticmethod
  318. def _make_csr(
  319. subject: x509.Name,
  320. pkey: Any,
  321. issuer_subject: Optional[Credentials],
  322. valid_from_delta: timedelta = None,
  323. valid_until_delta: timedelta = None
  324. ):
  325. pubkey = pkey.public_key()
  326. issuer_subject = issuer_subject if issuer_subject is not None else subject
  327. valid_from = datetime.now()
  328. if valid_until_delta is not None:
  329. valid_from += valid_from_delta
  330. valid_until = datetime.now()
  331. if valid_until_delta is not None:
  332. valid_until += valid_until_delta
  333. return (
  334. x509.CertificateBuilder()
  335. .subject_name(subject)
  336. .issuer_name(issuer_subject)
  337. .public_key(pubkey)
  338. .not_valid_before(valid_from)
  339. .not_valid_after(valid_until)
  340. .serial_number(x509.random_serial_number())
  341. .add_extension(
  342. x509.SubjectKeyIdentifier.from_public_key(pubkey),
  343. critical=False,
  344. )
  345. )
  346. @staticmethod
  347. def _add_ca_usages(csr: Any) -> Any:
  348. return csr.add_extension(
  349. x509.BasicConstraints(ca=True, path_length=9),
  350. critical=True,
  351. ).add_extension(
  352. x509.KeyUsage(
  353. digital_signature=True,
  354. content_commitment=False,
  355. key_encipherment=False,
  356. data_encipherment=False,
  357. key_agreement=False,
  358. key_cert_sign=True,
  359. crl_sign=True,
  360. encipher_only=False,
  361. decipher_only=False),
  362. critical=True
  363. ).add_extension(
  364. x509.ExtendedKeyUsage([
  365. ExtendedKeyUsageOID.CLIENT_AUTH,
  366. ExtendedKeyUsageOID.SERVER_AUTH,
  367. ExtendedKeyUsageOID.CODE_SIGNING,
  368. ]),
  369. critical=True
  370. )
  371. @staticmethod
  372. def _add_leaf_usages(csr: Any, domains: List[str], issuer: Credentials) -> Any:
  373. return csr.add_extension(
  374. x509.BasicConstraints(ca=False, path_length=None),
  375. critical=True,
  376. ).add_extension(
  377. x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
  378. issuer.certificate.extensions.get_extension_for_class(
  379. x509.SubjectKeyIdentifier).value),
  380. critical=False
  381. ).add_extension(
  382. x509.SubjectAlternativeName([x509.DNSName(domain) for domain in domains]),
  383. critical=True,
  384. ).add_extension(
  385. x509.ExtendedKeyUsage([
  386. ExtendedKeyUsageOID.SERVER_AUTH,
  387. ]),
  388. critical=False
  389. )
  390. @staticmethod
  391. def _add_client_usages(csr: Any, issuer: Credentials, rfc82name: str = None) -> Any:
  392. cert = csr.add_extension(
  393. x509.BasicConstraints(ca=False, path_length=None),
  394. critical=True,
  395. ).add_extension(
  396. x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
  397. issuer.certificate.extensions.get_extension_for_class(
  398. x509.SubjectKeyIdentifier).value),
  399. critical=False
  400. )
  401. if rfc82name:
  402. cert.add_extension(
  403. x509.SubjectAlternativeName([x509.RFC822Name(rfc82name)]),
  404. critical=True,
  405. )
  406. cert.add_extension(
  407. x509.ExtendedKeyUsage([
  408. ExtendedKeyUsageOID.CLIENT_AUTH,
  409. ]),
  410. critical=True
  411. )
  412. return cert
  413. @staticmethod
  414. def _make_ca_credentials(name, key_type: Any,
  415. issuer: Credentials = None,
  416. valid_from: timedelta = timedelta(days=-1),
  417. valid_to: timedelta = timedelta(days=89),
  418. ) -> Credentials:
  419. pkey = _private_key(key_type=key_type)
  420. if issuer is not None:
  421. issuer_subject = issuer.certificate.subject
  422. issuer_key = issuer.private_key
  423. else:
  424. issuer_subject = None
  425. issuer_key = pkey
  426. subject = TestCA._make_x509_name(org_name=name, parent=issuer.subject if issuer else None)
  427. csr = TestCA._make_csr(subject=subject,
  428. issuer_subject=issuer_subject, pkey=pkey,
  429. valid_from_delta=valid_from, valid_until_delta=valid_to)
  430. csr = TestCA._add_ca_usages(csr)
  431. cert = csr.sign(private_key=issuer_key,
  432. algorithm=hashes.SHA256(),
  433. backend=default_backend())
  434. return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
  435. @staticmethod
  436. def _make_server_credentials(name: str, domains: List[str], issuer: Credentials,
  437. key_type: Any,
  438. valid_from: timedelta = timedelta(days=-1),
  439. valid_to: timedelta = timedelta(days=89),
  440. ) -> Credentials:
  441. name = name
  442. pkey = _private_key(key_type=key_type)
  443. subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject)
  444. csr = TestCA._make_csr(subject=subject,
  445. issuer_subject=issuer.certificate.subject, pkey=pkey,
  446. valid_from_delta=valid_from, valid_until_delta=valid_to)
  447. csr = TestCA._add_leaf_usages(csr, domains=domains, issuer=issuer)
  448. cert = csr.sign(private_key=issuer.private_key,
  449. algorithm=hashes.SHA256(),
  450. backend=default_backend())
  451. return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
  452. @staticmethod
  453. def _make_client_credentials(name: str,
  454. issuer: Credentials, email: Optional[str],
  455. key_type: Any,
  456. valid_from: timedelta = timedelta(days=-1),
  457. valid_to: timedelta = timedelta(days=89),
  458. ) -> Credentials:
  459. pkey = _private_key(key_type=key_type)
  460. subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject)
  461. csr = TestCA._make_csr(subject=subject,
  462. issuer_subject=issuer.certificate.subject, pkey=pkey,
  463. valid_from_delta=valid_from, valid_until_delta=valid_to)
  464. csr = TestCA._add_client_usages(csr, issuer=issuer, rfc82name=email)
  465. cert = csr.sign(private_key=issuer.private_key,
  466. algorithm=hashes.SHA256(),
  467. backend=default_backend())
  468. return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)