tasks_services.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. # -*- coding: utf-8 -*-
  2. """
  3. (c) 2018 - Copyright Red Hat Inc
  4. Authors:
  5. Pierre-Yves Chibon <pingou@pingoured.fr>
  6. """
  7. from __future__ import absolute_import, unicode_literals
  8. import datetime
  9. import hashlib
  10. import hmac
  11. import json
  12. import os
  13. import os.path
  14. import time
  15. import uuid
  16. import requests
  17. import six
  18. from celery import Celery
  19. from celery.signals import after_setup_task_logger
  20. from celery.utils.log import get_task_logger
  21. from kitchen.text.converters import to_bytes
  22. from sqlalchemy.exc import SQLAlchemyError
  23. import pagure.lib.query
  24. from pagure.config import config as pagure_config
  25. from pagure.lib.lib_ci import trigger_jenkins_build
  26. from pagure.lib.tasks_utils import pagure_task
  27. from pagure.mail_logging import format_callstack
  28. from pagure.utils import set_up_logging, split_project_fullname
  29. # logging.config.dictConfig(pagure_config.get('LOGGING') or {'version': 1})
  30. _log = get_task_logger(__name__)
  31. _i = 0
  32. if os.environ.get("PAGURE_BROKER_URL"): # pragma: no cover
  33. broker_url = os.environ["PAGURE_BROKER_URL"]
  34. elif pagure_config.get("BROKER_URL"):
  35. broker_url = pagure_config["BROKER_URL"]
  36. elif pagure_config.get("REDIS_SOCKET"):
  37. broker_url = "redis+socket://%s?virtual_host=%d" % (
  38. pagure_config["REDIS_SOCKET"],
  39. pagure_config["REDIS_DB"],
  40. )
  41. elif "REDIS_HOST" in pagure_config and "REDIS_PORT" in pagure_config:
  42. broker_url = "redis://%s:%d/%d" % (
  43. pagure_config["REDIS_HOST"],
  44. pagure_config["REDIS_PORT"],
  45. pagure_config["REDIS_DB"],
  46. )
  47. conn = Celery("tasks", broker=broker_url, backend=broker_url)
  48. conn.conf.update(pagure_config["CELERY_CONFIG"])
  49. @after_setup_task_logger.connect
  50. def augment_celery_log(**kwargs):
  51. set_up_logging(force=True)
  52. def call_web_hooks(project, topic, msg, urls):
  53. """Sends the web-hook notification."""
  54. _log.info("Processing project: %s - topic: %s", project.fullname, topic)
  55. _log.debug("msg: %s", msg)
  56. # Send web-hooks notification
  57. global _i
  58. _i += 1
  59. year = datetime.datetime.utcnow().year
  60. if isinstance(topic, six.text_type):
  61. topic = to_bytes(topic, encoding="utf8", nonstring="passthru")
  62. msg["pagure_instance"] = pagure_config["APP_URL"]
  63. msg["project_fullname"] = project.fullname
  64. msg = dict(
  65. topic=topic.decode("utf-8"),
  66. msg=msg,
  67. timestamp=int(time.time()),
  68. msg_id="%s-%s" % (year, uuid.uuid4()),
  69. i=_i,
  70. )
  71. content = json.dumps(msg, sort_keys=True)
  72. hashhex = hmac.new(
  73. project.hook_token.encode("utf-8"),
  74. content.encode("utf-8"),
  75. hashlib.sha1,
  76. ).hexdigest()
  77. hashhex256 = hmac.new(
  78. project.hook_token.encode("utf-8"),
  79. content.encode("utf-8"),
  80. hashlib.sha256,
  81. ).hexdigest()
  82. headers = {
  83. "X-Pagure": pagure_config["APP_URL"],
  84. "X-Pagure-project": project.fullname,
  85. "X-Pagure-Signature": hashhex,
  86. "X-Pagure-Signature-256": hashhex256,
  87. "X-Pagure-Topic": topic,
  88. "Content-Type": "application/json",
  89. }
  90. for url in sorted(urls):
  91. url = url.strip()
  92. _log.info("Calling url %s" % url)
  93. try:
  94. req = requests.post(url, headers=headers, data=content, timeout=60)
  95. if not req:
  96. _log.info(
  97. "An error occured while querying: %s - "
  98. "Error code: %s" % (url, req.status_code)
  99. )
  100. except (requests.exceptions.RequestException, Exception) as err:
  101. _log.info(
  102. "An error occured while querying: %s - Error: %s" % (url, err)
  103. )
  104. @conn.task(queue=pagure_config.get("WEBHOOK_CELERY_QUEUE", None), bind=True)
  105. @pagure_task
  106. def webhook_notification(
  107. self, session, topic, msg, namespace=None, name=None, user=None
  108. ):
  109. """Send webhook notifications about an event on that project.
  110. :arg session: SQLAlchemy session object
  111. :type session: sqlalchemy.orm.session.Session
  112. :arg topic: the topic for the notification
  113. :type topic: str
  114. :arg msg: the message to send via web-hook
  115. :type msg: str
  116. :kwarg namespace: the namespace of the project
  117. :type namespace: None or str
  118. :kwarg name: the name of the project
  119. :type name: None or str
  120. :kwarg user: the user of the project, only set if the project is a fork
  121. :type user: None or str
  122. """
  123. project = pagure.lib.query._get_project(
  124. session, namespace=namespace, name=name, user=user
  125. )
  126. if not project:
  127. session.close()
  128. raise RuntimeError(
  129. "Project: %s/%s from user: %s not found in the DB"
  130. % (namespace, name, user)
  131. )
  132. urls = project.settings.get("Web-hooks")
  133. if not urls:
  134. _log.info("No URLs set: %s" % urls)
  135. return
  136. urls = urls.split("\n")
  137. _log.info("Got the project and urls, going to the webhooks")
  138. call_web_hooks(project, topic, msg, urls)
  139. @conn.task(queue=pagure_config.get("LOGCOM_CELERY_QUEUE", None), bind=True)
  140. @pagure_task
  141. def log_commit_send_notifications(
  142. self,
  143. session,
  144. name,
  145. commits,
  146. abspath,
  147. branch,
  148. default_branch,
  149. namespace=None,
  150. username=None,
  151. ):
  152. """Send webhook notifications about an event on that project.
  153. :arg session: SQLAlchemy session object
  154. :type session: sqlalchemy.orm.session.Session
  155. :arg topic: the topic for the notification
  156. :type topic: str
  157. :arg msg: the message to send via web-hook
  158. :type msg: str
  159. :kwarg namespace: the namespace of the project
  160. :type namespace: None or str
  161. :kwarg name: the name of the project
  162. :type name: None or str
  163. :kwarg user: the user of the project, only set if the project is a fork
  164. :type user: None or str
  165. """
  166. _log.info(
  167. "Looking for project: %s%s of %s",
  168. "%s/" % namespace if namespace else "",
  169. name,
  170. username,
  171. )
  172. project = pagure.lib.query._get_project(
  173. session, name, user=username, namespace=namespace
  174. )
  175. if not project:
  176. _log.info("No project found")
  177. return
  178. _log.info("Found project: %s", project.fullname)
  179. _log.info("Processing %s commits in %s", len(commits), abspath)
  180. # Only log commits when the branch is the default branch
  181. log_all = pagure_config.get("LOG_ALL_COMMITS", False)
  182. if log_all or branch == default_branch:
  183. pagure.lib.git.log_commits_to_db(session, project, commits, abspath)
  184. else:
  185. _log.info(
  186. "Not logging commits not made on the default branch: %s",
  187. default_branch,
  188. )
  189. # Notify subscribed users that there are new commits
  190. email_watchcommits = pagure_config.get("EMAIL_ON_WATCHCOMMITS", True)
  191. _log.info("Sending notification about the commit: %s", email_watchcommits)
  192. if email_watchcommits:
  193. pagure.lib.notify.notify_new_commits(abspath, project, branch, commits)
  194. try:
  195. session.commit()
  196. except SQLAlchemyError as err: # pragma: no cover
  197. _log.exception(err)
  198. session.rollback()
  199. def get_files_to_load(title, new_commits_list, abspath):
  200. _log.info("%s: Retrieve the list of files changed" % title)
  201. file_list = []
  202. new_commits_list.reverse()
  203. n = len(new_commits_list)
  204. for idx, commit in enumerate(new_commits_list):
  205. if (idx % 100) == 0:
  206. _log.info(
  207. "Loading files change in commits for %s: %s/%s", title, idx, n
  208. )
  209. if commit == new_commits_list[0]:
  210. filenames = pagure.lib.git.read_git_lines(
  211. [
  212. "diff-tree",
  213. "--no-commit-id",
  214. "--name-only",
  215. "-r",
  216. "--root",
  217. commit,
  218. ],
  219. abspath,
  220. )
  221. else:
  222. filenames = pagure.lib.git.read_git_lines(
  223. ["diff-tree", "--no-commit-id", "--name-only", "-r", commit],
  224. abspath,
  225. )
  226. for line in filenames:
  227. if line.strip():
  228. file_list.append(line.strip())
  229. return file_list
  230. @conn.task(queue=pagure_config.get("LOADJSON_CELERY_QUEUE", None), bind=True)
  231. @pagure_task
  232. def load_json_commits_to_db(
  233. self,
  234. session,
  235. name,
  236. commits,
  237. abspath,
  238. data_type,
  239. agent,
  240. namespace=None,
  241. username=None,
  242. ):
  243. """Loads into the database the specified commits that have been pushed
  244. to either the tickets or the pull-request repository.
  245. """
  246. if data_type not in ["ticket", "pull-request"]:
  247. _log.info("LOADJSON: Invalid data_type retrieved: %s", data_type)
  248. return
  249. _log.info(
  250. "LOADJSON: Looking for project: %s%s of user: %s",
  251. "%s/" % namespace if namespace else "",
  252. name,
  253. username,
  254. )
  255. project = pagure.lib.query._get_project(
  256. session, name, user=username, namespace=namespace
  257. )
  258. if not project:
  259. _log.info("LOADJSON: No project found")
  260. return
  261. _log.info("LOADJSON: Found project: %s", project.fullname)
  262. _log.info(
  263. "LOADJSON: %s: Processing %s commits in %s",
  264. project.fullname,
  265. len(commits),
  266. abspath,
  267. )
  268. file_list = set(get_files_to_load(project.fullname, commits, abspath))
  269. n = len(file_list)
  270. _log.info("LOADJSON: %s files to process" % n)
  271. mail_body = [
  272. "Good Morning",
  273. "",
  274. "This is the log of loading all the files pushed in the git repo into",
  275. "the database. It should ignore files that are not JSON files, this",
  276. "is fine.",
  277. "",
  278. ]
  279. for idx, filename in enumerate(sorted(file_list)):
  280. _log.info(
  281. "LOADJSON: Loading: %s: %s -- %s/%s",
  282. project.fullname,
  283. filename,
  284. idx + 1,
  285. n,
  286. )
  287. tmp = "Loading: %s -- %s/%s" % (filename, idx + 1, n)
  288. try:
  289. json_data = None
  290. data = "".join(
  291. pagure.lib.git.read_git_lines(
  292. ["show", "HEAD:%s" % filename], abspath
  293. )
  294. )
  295. if data and not filename.startswith("files/"):
  296. try:
  297. json_data = json.loads(data)
  298. except ValueError:
  299. pass
  300. if json_data:
  301. if data_type == "ticket":
  302. pagure.lib.git.update_ticket_from_git(
  303. session,
  304. reponame=name,
  305. namespace=namespace,
  306. username=username,
  307. issue_uid=filename,
  308. json_data=json_data,
  309. agent=agent,
  310. )
  311. elif data_type == "pull-request":
  312. pagure.lib.git.update_request_from_git(
  313. session,
  314. reponame=name,
  315. namespace=namespace,
  316. username=username,
  317. request_uid=filename,
  318. json_data=json_data,
  319. )
  320. tmp += " ... ... Done"
  321. else:
  322. tmp += " ... ... SKIPPED - No JSON data"
  323. mail_body.append(tmp)
  324. except Exception as err:
  325. _log.info("data: %s", json_data)
  326. session.rollback()
  327. _log.exception(err)
  328. tmp += " ... ... FAILED\n"
  329. tmp += format_callstack()
  330. break
  331. finally:
  332. mail_body.append(tmp)
  333. try:
  334. session.commit()
  335. _log.info(
  336. "LOADJSON: Emailing results for %s to %s", project.fullname, agent
  337. )
  338. try:
  339. if not agent:
  340. raise pagure.exceptions.PagureException(
  341. "No agent found: %s" % agent
  342. )
  343. if agent != "pagure":
  344. user_obj = pagure.lib.query.get_user(session, agent)
  345. pagure.lib.notify.send_email(
  346. "\n".join(mail_body),
  347. "Issue import report",
  348. user_obj.default_email,
  349. )
  350. except pagure.exceptions.PagureException:
  351. _log.exception("LOADJSON: Could not find user %s" % agent)
  352. except SQLAlchemyError: # pragma: no cover
  353. session.rollback()
  354. _log.info("LOADJSON: Ready for another")
  355. @conn.task(queue=pagure_config.get("CI_CELERY_QUEUE", None), bind=True)
  356. @pagure_task
  357. def trigger_ci_build(
  358. self,
  359. session,
  360. cause,
  361. branch,
  362. branch_to,
  363. ci_type,
  364. project_name=None,
  365. pr_uid=None,
  366. ):
  367. """Triggers a new run of the CI system on the specified pull-request."""
  368. pagure.lib.plugins.get_plugin("Pagure CI")
  369. if not pr_uid and not project_name:
  370. _log.debug("No PR UID nor project name specified, can't trigger CI")
  371. session.close()
  372. return
  373. if pr_uid:
  374. pr = pagure.lib.query.get_request_by_uid(session, pr_uid)
  375. if pr.remote:
  376. project_name = pr.project.fullname
  377. else:
  378. project_name = pr.project_from.fullname
  379. user, namespace, project_name = split_project_fullname(project_name)
  380. _log.info("Pagure-CI: Looking for project: %s", project_name)
  381. project = pagure.lib.query.get_authorized_project(
  382. session=session,
  383. project_name=project_name,
  384. user=user,
  385. namespace=namespace,
  386. )
  387. if project is None:
  388. _log.warning(
  389. "Pagure-CI: No project could be found for the name %s",
  390. project_name,
  391. )
  392. session.close()
  393. return
  394. if project.is_fork:
  395. if (
  396. project.parent.ci_hook is None
  397. or project.parent.ci_hook.ci_url is None
  398. ):
  399. raise pagure.exceptions.PagureException(
  400. "Project %s not configured or incorectly configured for ci",
  401. project.parent.fullname,
  402. )
  403. elif project.ci_hook is None or project.ci_hook.ci_url is None:
  404. raise pagure.exceptions.PagureException(
  405. "Project %s not configured or incorectly configured for ci",
  406. project.fullname,
  407. )
  408. _log.info("Pagure-CI: project retrieved: %s", project.fullname)
  409. _log.info(
  410. "Pagure-CI: Trigger from %s cause (PR# or commit) %s branch: %s",
  411. project.fullname,
  412. cause,
  413. branch,
  414. )
  415. if ci_type == "jenkins":
  416. jenk_project = project
  417. if project.is_fork:
  418. jenk_project = project.parent
  419. trigger_jenkins_build(
  420. project_path=project.path,
  421. url=jenk_project.ci_hook.ci_url,
  422. job=jenk_project.ci_hook.ci_job,
  423. token=jenk_project.ci_hook.pagure_ci_token,
  424. branch=branch,
  425. branch_to=branch_to,
  426. cause=cause,
  427. ci_username=jenk_project.ci_hook.ci_username,
  428. ci_password=jenk_project.ci_hook.ci_password,
  429. )
  430. else:
  431. _log.warning("Pagure-CI:Un-supported CI type")
  432. _log.info("Pagure-CI: Ready for another")