123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498 |
- # -*- coding: utf-8 -*-
- """
- (c) 2018 - Copyright Red Hat Inc
- Authors:
- Pierre-Yves Chibon <pingou@pingoured.fr>
- """
- from __future__ import absolute_import, unicode_literals
- import datetime
- import hashlib
- import hmac
- import json
- import os
- import os.path
- import time
- import uuid
- import requests
- import six
- from celery import Celery
- from celery.signals import after_setup_task_logger
- from celery.utils.log import get_task_logger
- from kitchen.text.converters import to_bytes
- from sqlalchemy.exc import SQLAlchemyError
- import pagure.lib.query
- from pagure.config import config as pagure_config
- from pagure.lib.lib_ci import trigger_jenkins_build
- from pagure.lib.tasks_utils import pagure_task
- from pagure.mail_logging import format_callstack
- from pagure.utils import set_up_logging, split_project_fullname
- # logging.config.dictConfig(pagure_config.get('LOGGING') or {'version': 1})
_log = get_task_logger(__name__)

# Counter incremented by call_web_hooks() each time it builds a payload;
# its value is sent along as the ``i`` field of the payload.
_i = 0

# Select the celery broker URL from the first source that is set:
#   1. the PAGURE_BROKER_URL environment variable
#   2. the BROKER_URL configuration key
#   3. a redis unix socket (REDIS_SOCKET + REDIS_DB)
#   4. a redis host/port pair (REDIS_HOST + REDIS_PORT + REDIS_DB)
if os.environ.get("PAGURE_BROKER_URL"):  # pragma: no cover
    broker_url = os.environ["PAGURE_BROKER_URL"]
elif pagure_config.get("BROKER_URL"):
    broker_url = pagure_config["BROKER_URL"]
elif pagure_config.get("REDIS_SOCKET"):
    broker_url = "redis+socket://%s?virtual_host=%d" % (
        pagure_config["REDIS_SOCKET"],
        pagure_config["REDIS_DB"],
    )
elif "REDIS_HOST" in pagure_config and "REDIS_PORT" in pagure_config:
    broker_url = "redis://%s:%d/%d" % (
        pagure_config["REDIS_HOST"],
        pagure_config["REDIS_PORT"],
        pagure_config["REDIS_DB"],
    )
# NOTE(review): if none of the branches above matches, ``broker_url`` is
# never bound and the Celery() call below fails with a NameError.

# The same URL is used both as the broker and as the result backend.
conn = Celery("tasks", broker=broker_url, backend=broker_url)
conn.conf.update(pagure_config["CELERY_CONFIG"])
@after_setup_task_logger.connect
def augment_celery_log(**kwargs):
    """Call ``set_up_logging(force=True)`` once celery has set up its
    task logger.

    This function is connected to celery's ``after_setup_task_logger``
    signal. The keyword arguments sent with the signal are accepted but
    not used.
    """
    set_up_logging(force=True)
def call_web_hooks(project, topic, msg, urls):
    """Sign a notification and POST it to each of the web-hook URLs.

    The message is wrapped in an envelope holding the topic, a timestamp,
    a unique ``msg_id`` and the per-process counter ``i``. The envelope is
    serialized to JSON and signed with HMAC-SHA1 and HMAC-SHA256 using the
    project's ``hook_token``; both signatures are sent as HTTP headers.
    A failure to reach one URL is logged and does not prevent the other
    URLs from being called.

    :arg project: the project the notification is about, its ``fullname``
        and ``hook_token`` attributes are used
    :arg topic: the topic of the notification
    :type topic: str or bytes
    :arg msg: the message to send, the object passed in is not modified
    :type msg: dict
    :arg urls: the URLs to call, blank entries are ignored
    :type urls: iterable of str

    """
    global _i

    _log.info("Processing project: %s - topic: %s", project.fullname, topic)
    _log.debug("msg: %s", msg)

    # Send web-hooks notification
    _i += 1
    year = datetime.datetime.utcnow().year
    if isinstance(topic, six.text_type):
        topic = to_bytes(topic, encoding="utf8", nonstring="passthru")

    # Work on a shallow copy so the caller's message is left untouched.
    body = dict(msg)
    body["pagure_instance"] = pagure_config["APP_URL"]
    body["project_fullname"] = project.fullname
    envelope = dict(
        topic=topic.decode("utf-8"),
        msg=body,
        timestamp=int(time.time()),
        msg_id="%s-%s" % (year, uuid.uuid4()),
        i=_i,
    )

    # sort_keys gives a stable serialization: the signatures below are
    # computed on this exact string, which is also the request body.
    content = json.dumps(envelope, sort_keys=True)
    key = project.hook_token.encode("utf-8")
    payload = content.encode("utf-8")
    hashhex = hmac.new(key, payload, hashlib.sha1).hexdigest()
    hashhex256 = hmac.new(key, payload, hashlib.sha256).hexdigest()

    headers = {
        "X-Pagure": pagure_config["APP_URL"],
        "X-Pagure-project": project.fullname,
        "X-Pagure-Signature": hashhex,
        "X-Pagure-Signature-256": hashhex256,
        "X-Pagure-Topic": topic,
        "Content-Type": "application/json",
    }

    for url in sorted(urls):
        url = url.strip()
        if not url:
            # Blank entries (e.g. empty lines in the project setting) can
            # never be valid URLs, do not even try to call them.
            continue
        _log.info("Calling url %s", url)
        try:
            req = requests.post(url, headers=headers, data=content, timeout=60)
            # A Response object is falsy when the HTTP status is an error.
            if not req:
                _log.info(
                    "An error occured while querying: %s - "
                    "Error code: %s",
                    url,
                    req.status_code,
                )
        except Exception as err:
            # Deliberately broad: a misbehaving web-hook must not prevent
            # the remaining URLs from being notified.
            _log.info(
                "An error occured while querying: %s - Error: %s", url, err
            )
@conn.task(queue=pagure_config.get("WEBHOOK_CELERY_QUEUE", None), bind=True)
@pagure_task
def webhook_notification(
    self, session, topic, msg, namespace=None, name=None, user=None
):
    """Look up a project and call the web-hooks configured for it.

    The URLs are read from the ``Web-hooks`` entry of the project's
    settings, one URL per line. Nothing is sent when that entry is empty.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg topic: the topic for the notification
    :type topic: str
    :arg msg: the message to send via web-hook, handed over unchanged to
        ``call_web_hooks`` (which uses it as a mapping)
    :kwarg namespace: the namespace of the project
    :type namespace: None or str
    :kwarg name: the name of the project
    :type name: None or str
    :kwarg user: the user of the project, only set if the project is a fork
    :type user: None or str
    :raises RuntimeError: if the project could not be found in the database

    """
    project = pagure.lib.query._get_project(
        session, namespace=namespace, name=name, user=user
    )
    if not project:
        session.close()
        not_found = "Project: %s/%s from user: %s not found in the DB" % (
            namespace,
            name,
            user,
        )
        raise RuntimeError(not_found)

    hooks_setting = project.settings.get("Web-hooks")
    if hooks_setting:
        # The setting stores one URL per line.
        hook_urls = hooks_setting.split("\n")
        _log.info("Got the project and urls, going to the webhooks")
        call_web_hooks(project, topic, msg, hook_urls)
    else:
        _log.info("No URLs set: %s" % hooks_setting)
@conn.task(queue=pagure_config.get("LOGCOM_CELERY_QUEUE", None), bind=True)
@pagure_task
def log_commit_send_notifications(
    self,
    session,
    name,
    commits,
    abspath,
    branch,
    default_branch,
    namespace=None,
    username=None,
):
    """Log the specified commits in the database and notify about them.

    The commits are logged in the database when ``branch`` is the default
    branch or when the ``LOG_ALL_COMMITS`` configuration key is set.
    Notifications about the new commits are sent unless the
    ``EMAIL_ON_WATCHCOMMITS`` configuration key is turned off. Nothing is
    done if the project cannot be found.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg name: the name of the project
    :type name: str
    :arg commits: the commits to process
    :arg abspath: the path handed to the git helpers, presumably the
        absolute path to the git repository
    :arg branch: the branch the commits are on
    :arg default_branch: the default branch of the repository
    :kwarg namespace: the namespace of the project
    :type namespace: None or str
    :kwarg username: the user of the project, looked up as the ``user``
        of the project
    :type username: None or str

    """
    namespace_prefix = "%s/" % namespace if namespace else ""
    _log.info(
        "Looking for project: %s%s of %s", namespace_prefix, name, username
    )
    project = pagure.lib.query._get_project(
        session, name, user=username, namespace=namespace
    )
    if not project:
        _log.info("No project found")
        return

    _log.info("Found project: %s", project.fullname)
    _log.info("Processing %s commits in %s", len(commits), abspath)

    # Only log commits when the branch is the default branch
    on_default_branch = branch == default_branch
    if pagure_config.get("LOG_ALL_COMMITS", False) or on_default_branch:
        pagure.lib.git.log_commits_to_db(session, project, commits, abspath)
    else:
        _log.info(
            "Not logging commits not made on the default branch: %s",
            default_branch,
        )

    # Notify subscribed users that there are new commits
    email_watchcommits = pagure_config.get("EMAIL_ON_WATCHCOMMITS", True)
    _log.info("Sending notification about the commit: %s", email_watchcommits)
    if email_watchcommits:
        pagure.lib.notify.notify_new_commits(abspath, project, branch, commits)

    try:
        session.commit()
    except SQLAlchemyError as err:  # pragma: no cover
        _log.exception(err)
        session.rollback()
def get_files_to_load(title, new_commits_list, abspath):
    """Return the names of the files changed by the specified commits.

    The commits are processed in the reverse order of the list provided
    and ``git diff-tree`` is run for each of them. The result may list
    the same file several times if several commits changed it.

    :arg title: a label used in the log messages
    :type title: str
    :arg new_commits_list: the commits to inspect, this list is not
        modified
    :type new_commits_list: list of str
    :arg abspath: the path handed to ``pagure.lib.git.read_git_lines``,
        presumably the absolute path to the git repository
    :return: the names of the files changed, blank lines excluded
    :rtype: list of str

    """
    _log.info("%s: Retrieve the list of files changed", title)

    # Iterate over a reversed copy rather than reversing the caller's list
    # in place.
    commits = list(reversed(new_commits_list))
    n = len(commits)

    file_list = []
    for idx, commit in enumerate(commits):
        if (idx % 100) == 0:
            _log.info(
                "Loading files change in commits for %s: %s/%s", title, idx, n
            )

        cmd = ["diff-tree", "--no-commit-id", "--name-only", "-r"]
        if commit == commits[0]:
            # The first commit processed may have no parent, ``--root``
            # makes git list its files instead of printing nothing.
            cmd.append("--root")
        cmd.append(commit)

        for line in pagure.lib.git.read_git_lines(cmd, abspath):
            filename = line.strip()
            if filename:
                file_list.append(filename)

    return file_list
@conn.task(queue=pagure_config.get("LOADJSON_CELERY_QUEUE", None), bind=True)
@pagure_task
def load_json_commits_to_db(
    self,
    session,
    name,
    commits,
    abspath,
    data_type,
    agent,
    namespace=None,
    username=None,
):
    """Loads into the database the specified commits that have been pushed
    to either the tickets or the pull-request repository.

    Every file changed by the commits is read from ``HEAD`` and, if it
    contains JSON, used to update the corresponding ticket or
    pull-request. Files located under ``files/`` and files that are not
    valid JSON are skipped. Processing stops at the first file that fails
    to load. A report with one line per file processed is then emailed to
    the agent, unless the agent is ``pagure``.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg name: the name of the project
    :type name: str
    :arg commits: the commits that have been pushed
    :type commits: list of str
    :arg abspath: the path handed to the git helpers, presumably the
        absolute path to the git repository
    :arg data_type: the type of data to load, either ``ticket`` or
        ``pull-request``, nothing is done for any other value
    :type data_type: str
    :arg agent: the username of the user the report is emailed to
    :type agent: str
    :kwarg namespace: the namespace of the project
    :type namespace: None or str
    :kwarg username: the user of the project, looked up as the ``user``
        of the project
    :type username: None or str

    """
    if data_type not in ["ticket", "pull-request"]:
        _log.info("LOADJSON: Invalid data_type retrieved: %s", data_type)
        return

    _log.info(
        "LOADJSON: Looking for project: %s%s of user: %s",
        "%s/" % namespace if namespace else "",
        name,
        username,
    )

    project = pagure.lib.query._get_project(
        session, name, user=username, namespace=namespace
    )

    if not project:
        _log.info("LOADJSON: No project found")
        return

    _log.info("LOADJSON: Found project: %s", project.fullname)

    _log.info(
        "LOADJSON: %s: Processing %s commits in %s",
        project.fullname,
        len(commits),
        abspath,
    )

    # A file changed by several commits only needs to be loaded once.
    file_list = set(get_files_to_load(project.fullname, commits, abspath))
    n = len(file_list)
    _log.info("LOADJSON: %s files to process", n)
    mail_body = [
        "Good Morning",
        "",
        "This is the log of loading all the files pushed in the git repo into",
        "the database. It should ignore files that are not JSON files, this",
        "is fine.",
        "",
    ]

    for idx, filename in enumerate(sorted(file_list)):
        _log.info(
            "LOADJSON: Loading: %s: %s -- %s/%s",
            project.fullname,
            filename,
            idx + 1,
            n,
        )
        tmp = "Loading: %s -- %s/%s" % (filename, idx + 1, n)
        json_data = None
        try:
            data = "".join(
                pagure.lib.git.read_git_lines(
                    ["show", "HEAD:%s" % filename], abspath
                )
            )
            if data and not filename.startswith("files/"):
                try:
                    json_data = json.loads(data)
                except ValueError:
                    # Not a JSON file: reported as skipped below.
                    pass
            if json_data:
                if data_type == "ticket":
                    pagure.lib.git.update_ticket_from_git(
                        session,
                        reponame=name,
                        namespace=namespace,
                        username=username,
                        issue_uid=filename,
                        json_data=json_data,
                        agent=agent,
                    )
                elif data_type == "pull-request":
                    pagure.lib.git.update_request_from_git(
                        session,
                        reponame=name,
                        namespace=namespace,
                        username=username,
                        request_uid=filename,
                        json_data=json_data,
                    )
                tmp += " ... ... Done"
            else:
                tmp += " ... ... SKIPPED - No JSON data"
        except Exception as err:
            _log.info("data: %s", json_data)
            session.rollback()
            _log.exception(err)
            tmp += " ... ... FAILED\n"
            tmp += format_callstack()
            # Stop at the first failure, the ``finally`` clause still
            # records it in the report.
            break
        finally:
            # Single place where the line is added to the report, so that
            # each file shows up exactly once whatever the outcome.
            mail_body.append(tmp)

    try:
        session.commit()
        _log.info(
            "LOADJSON: Emailing results for %s to %s", project.fullname, agent
        )
        try:
            if not agent:
                raise pagure.exceptions.PagureException(
                    "No agent found: %s" % agent
                )
            if agent != "pagure":
                # NOTE(review): get_user() presumably raises a
                # PagureException when the user does not exist -- confirm.
                user_obj = pagure.lib.query.get_user(session, agent)
                pagure.lib.notify.send_email(
                    "\n".join(mail_body),
                    "Issue import report",
                    user_obj.default_email,
                )
        except pagure.exceptions.PagureException:
            _log.exception("LOADJSON: Could not find user %s", agent)
    except SQLAlchemyError:  # pragma: no cover
        session.rollback()
    _log.info("LOADJSON: Ready for another")
@conn.task(queue=pagure_config.get("CI_CELERY_QUEUE", None), bind=True)
@pagure_task
def trigger_ci_build(
    self,
    session,
    cause,
    branch,
    branch_to,
    ci_type,
    project_name=None,
    pr_uid=None,
):
    """Triggers a new run of the CI system on the specified pull-request.

    The project is either the one named by ``project_name`` or, when
    ``pr_uid`` is specified, derived from the pull-request. For a fork,
    the CI configuration of the parent project is used.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg cause: what triggered the build (PR number or commit)
    :arg branch: the branch to build
    :arg branch_to: the branch targeted, handed over to the CI service
    :arg ci_type: the type of CI service, only ``jenkins`` is supported
    :type ci_type: str
    :kwarg project_name: the full name of the project, ignored when
        ``pr_uid`` is specified
    :type project_name: None or str
    :kwarg pr_uid: the unique identifier of the pull-request
    :type pr_uid: None or str
    :raises pagure.exceptions.PagureException: if the project (or its
        parent for a fork) has no CI hook or no CI URL configured

    """
    # The return value is not used here; presumably this call is only
    # needed for its loading side effects -- TODO confirm.
    pagure.lib.plugins.get_plugin("Pagure CI")

    if not pr_uid and not project_name:
        _log.debug("No PR UID nor project name specified, can't trigger CI")
        session.close()
        return

    if pr_uid:
        pr = pagure.lib.query.get_request_by_uid(session, pr_uid)
        if pr.remote:
            project_name = pr.project.fullname
        else:
            project_name = pr.project_from.fullname

    user, namespace, project_name = split_project_fullname(project_name)

    _log.info("Pagure-CI: Looking for project: %s", project_name)
    project = pagure.lib.query.get_authorized_project(
        session=session,
        project_name=project_name,
        user=user,
        namespace=namespace,
    )

    if project is None:
        _log.warning(
            "Pagure-CI: No project could be found for the name %s",
            project_name,
        )
        session.close()
        return

    # Forks rely on the CI configuration of their parent project.
    ci_project = project.parent if project.is_fork else project
    if ci_project.ci_hook is None or ci_project.ci_hook.ci_url is None:
        # Interpolate the name here: exceptions, unlike logging calls, do
        # not apply %-formatting to their arguments.
        raise pagure.exceptions.PagureException(
            "Project %s not configured or incorectly configured for ci"
            % ci_project.fullname
        )

    _log.info("Pagure-CI: project retrieved: %s", project.fullname)

    _log.info(
        "Pagure-CI: Trigger from %s cause (PR# or commit) %s branch: %s",
        project.fullname,
        cause,
        branch,
    )

    if ci_type == "jenkins":
        ci_hook = ci_project.ci_hook
        trigger_jenkins_build(
            project_path=project.path,
            url=ci_hook.ci_url,
            job=ci_hook.ci_job,
            token=ci_hook.pagure_ci_token,
            branch=branch,
            branch_to=branch_to,
            cause=cause,
            ci_username=ci_hook.ci_username,
            ci_password=ci_hook.ci_password,
        )

    else:
        _log.warning("Pagure-CI:Un-supported CI type")

    _log.info("Pagure-CI: Ready for another")
|