tasks_mirror.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. # -*- coding: utf-8 -*-
  2. """
  3. (c) 2018 - Copyright Red Hat Inc
  4. Authors:
  5. Pierre-Yves Chibon <pingou@pingoured.fr>
  6. """
  7. from __future__ import absolute_import, unicode_literals
  8. import base64
  9. import datetime
  10. import logging
  11. import os
  12. import stat
  13. import struct
  14. import six
  15. import werkzeug.utils
  16. from celery import Celery
  17. from cryptography import utils
  18. from cryptography.hazmat.backends import default_backend
  19. from cryptography.hazmat.primitives import serialization
  20. from cryptography.hazmat.primitives.asymmetric import rsa
  21. import pagure.lib.query
  22. from pagure.config import config as pagure_config
  23. from pagure.lib.tasks_utils import pagure_task
  24. from pagure.utils import ssh_urlpattern
  25. # logging.config.dictConfig(pagure_config.get('LOGGING') or {'version': 1})
  26. _log = logging.getLogger(__name__)
  27. if os.environ.get("PAGURE_BROKER_URL"): # pragma: no-cover
  28. broker_url = os.environ["PAGURE_BROKER_URL"]
  29. elif pagure_config.get("BROKER_URL"):
  30. broker_url = pagure_config["BROKER_URL"]
  31. elif pagure_config.get("REDIS_SOCKET"):
  32. broker_url = "redis+socket://%s?virtual_host=%d" % (
  33. pagure_config["REDIS_SOCKET"],
  34. pagure_config["REDIS_DB"],
  35. )
  36. elif "REDIS_HOST" in pagure_config and "REDIS_PORT" in pagure_config:
  37. broker_url = "redis://%s:%d/%d" % (
  38. pagure_config["REDIS_HOST"],
  39. pagure_config["REDIS_PORT"],
  40. pagure_config["REDIS_DB"],
  41. )
  42. conn = Celery("tasks_mirror", broker=broker_url, backend=broker_url)
  43. conn.conf.update(pagure_config["CELERY_CONFIG"])
  44. # Code from:
  45. # https://github.com/pyca/cryptography/blob/6b08aba7f1eb296461528328a3c9871fa7594fc4/src/cryptography/hazmat/primitives/serialization.py#L161
  46. # Taken from upstream cryptography since the version we have is too old
  47. # and doesn't have this code (yet)
  48. def _ssh_write_string(data):
  49. return struct.pack(">I", len(data)) + data
  50. def _ssh_write_mpint(value):
  51. data = utils.int_to_bytes(value)
  52. if six.indexbytes(data, 0) & 0x80:
  53. data = b"\x00" + data
  54. return _ssh_write_string(data)
  55. # Code from _openssh_public_key_bytes at:
  56. # https://github.com/pyca/cryptography/tree/6b08aba7f1eb296461528328a3c9871fa7594fc4/src/cryptography/hazmat/backends/openssl#L1616
  57. # Taken from upstream cryptography since the version we have is too old
  58. # and doesn't have this code (yet)
  59. def _serialize_public_ssh_key(key):
  60. if isinstance(key, rsa.RSAPublicKey):
  61. public_numbers = key.public_numbers()
  62. return b"ssh-rsa " + base64.b64encode(
  63. _ssh_write_string(b"ssh-rsa")
  64. + _ssh_write_mpint(public_numbers.e)
  65. + _ssh_write_mpint(public_numbers.n)
  66. )
  67. else:
  68. # Since we only write RSA keys, drop the other serializations
  69. return
  70. def _create_ssh_key(keyfile):
  71. """Create the public and private ssh keys.
  72. The specified file name will be the private key and the public one will
  73. be in a similar file name ending with a '.pub'.
  74. """
  75. private_key = rsa.generate_private_key(
  76. public_exponent=65537, key_size=4096, backend=default_backend()
  77. )
  78. private_pem = private_key.private_bytes(
  79. encoding=serialization.Encoding.PEM,
  80. format=serialization.PrivateFormat.TraditionalOpenSSL,
  81. encryption_algorithm=serialization.NoEncryption(),
  82. )
  83. with os.fdopen(
  84. os.open(keyfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600), "wb"
  85. ) as stream:
  86. stream.write(private_pem)
  87. public_key = private_key.public_key()
  88. public_pem = _serialize_public_ssh_key(public_key)
  89. if public_pem:
  90. with open(keyfile + ".pub", "wb") as stream:
  91. stream.write(public_pem)
  92. @conn.task(queue=pagure_config["MIRRORING_QUEUE"], bind=True)
  93. @pagure_task
  94. def setup_mirroring(self, session, username, namespace, name):
  95. """Setup the specified project for mirroring."""
  96. plugin = pagure.lib.plugins.get_plugin("Mirroring")
  97. plugin.db_object()
  98. project = pagure.lib.query._get_project(
  99. session, namespace=namespace, name=name, user=username
  100. )
  101. public_key_name = werkzeug.utils.secure_filename(project.fullname)
  102. ssh_folder = pagure_config["MIRROR_SSHKEYS_FOLDER"]
  103. if not os.path.exists(ssh_folder):
  104. os.makedirs(ssh_folder, mode=0o700)
  105. else:
  106. if os.path.islink(ssh_folder):
  107. raise pagure.exceptions.PagureException("SSH folder is a link")
  108. folder_stat = os.stat(ssh_folder)
  109. filemode = stat.S_IMODE(folder_stat.st_mode)
  110. if filemode != int("0700", 8):
  111. raise pagure.exceptions.PagureException(
  112. "SSH folder had invalid permissions"
  113. )
  114. if (
  115. folder_stat.st_uid != os.getuid()
  116. or folder_stat.st_gid != os.getgid()
  117. ):
  118. raise pagure.exceptions.PagureException(
  119. "SSH folder does not belong to the user or group running "
  120. "this task"
  121. )
  122. public_key_file = os.path.join(ssh_folder, "%s.pub" % public_key_name)
  123. _log.info("Public key of interest: %s", public_key_file)
  124. if os.path.exists(public_key_file):
  125. raise pagure.exceptions.PagureException("SSH key already exists")
  126. _log.info("Creating public key")
  127. _create_ssh_key(os.path.join(ssh_folder, public_key_name))
  128. with open(public_key_file) as stream:
  129. public_key = stream.read()
  130. if project.mirror_hook.public_key != public_key:
  131. _log.info("Updating information in the DB")
  132. project.mirror_hook.public_key = public_key
  133. session.add(project.mirror_hook)
  134. session.commit()
  135. @conn.task(queue=pagure_config["MIRRORING_QUEUE"], bind=True)
  136. @pagure_task
  137. def teardown_mirroring(self, session, username, namespace, name):
  138. """Stop the mirroring of the specified project."""
  139. plugin = pagure.lib.plugins.get_plugin("Mirroring")
  140. plugin.db_object()
  141. project = pagure.lib.query._get_project(
  142. session, namespace=namespace, name=name, user=username
  143. )
  144. ssh_folder = pagure_config["MIRROR_SSHKEYS_FOLDER"]
  145. public_key_name = werkzeug.utils.secure_filename(project.fullname)
  146. private_key_file = os.path.join(ssh_folder, public_key_name)
  147. public_key_file = os.path.join(ssh_folder, "%s.pub" % public_key_name)
  148. if os.path.exists(private_key_file):
  149. os.unlink(private_key_file)
  150. if os.path.exists(public_key_file):
  151. os.unlink(public_key_file)
  152. project.mirror_hook.public_key = None
  153. session.add(project.mirror_hook)
  154. session.commit()
  155. @conn.task(queue=pagure_config["MIRRORING_QUEUE"], bind=True)
  156. @pagure_task
  157. def mirror_project(self, session, username, namespace, name):
  158. """Does the actual mirroring of the specified project."""
  159. plugin = pagure.lib.plugins.get_plugin("Mirroring")
  160. plugin.db_object()
  161. project = pagure.lib.query._get_project(
  162. session, namespace=namespace, name=name, user=username
  163. )
  164. repofolder = pagure_config["GIT_FOLDER"]
  165. repopath = os.path.join(repofolder, project.path)
  166. if not os.path.exists(repopath):
  167. _log.warning("Git folder not found at: %s, bailing", repopath)
  168. return
  169. ssh_folder = pagure_config["MIRROR_SSHKEYS_FOLDER"]
  170. public_key_name = werkzeug.utils.secure_filename(project.fullname)
  171. private_key_file = os.path.join(ssh_folder, public_key_name)
  172. if not os.path.exists(private_key_file):
  173. _log.warning("No %s key found, bailing", private_key_file)
  174. project.mirror_hook.last_log = "Private key not found on disk, bailing"
  175. session.add(project.mirror_hook)
  176. session.commit()
  177. return
  178. # Add the utility script allowing this feature to work on old(er) git.
  179. here = os.path.join(os.path.dirname(os.path.abspath(__file__)))
  180. script_file = os.path.join(here, "ssh_script.sh")
  181. # Get the list of remotes
  182. remotes = [
  183. remote.strip()
  184. for remote in project.mirror_hook.target.split("\n")
  185. if project.mirror_hook
  186. and remote.strip()
  187. and ssh_urlpattern.match(remote.strip())
  188. ]
  189. # Push
  190. logs = []
  191. for remote in remotes:
  192. _log.info(
  193. "Pushing to remote %s using key: %s", remote, private_key_file
  194. )
  195. (stdout, stderr) = pagure.lib.git.read_git_lines(
  196. ["push", "--mirror", remote],
  197. abspath=repopath,
  198. error=True,
  199. env={"SSHKEY": private_key_file, "GIT_SSH": script_file},
  200. )
  201. log = "Output from the push (%s):\n stdout: %s\n stderr: %s" % (
  202. datetime.datetime.utcnow().isoformat(),
  203. stdout,
  204. stderr,
  205. )
  206. logs.append(log)
  207. if logs:
  208. project.mirror_hook.last_log = "\n".join(logs)
  209. session.add(project.mirror_hook)
  210. session.commit()
  211. _log.info("\n".join(logs))