# -*- coding: utf-8 -*-

"""
 (c) 2018 - Copyright Red Hat Inc

 Authors:
   Pierre-Yves Chibon

"""

from __future__ import absolute_import, unicode_literals

import datetime
import hashlib
import hmac
import json
import os
import os.path
import time
import uuid

import requests
import six
from celery import Celery
from celery.signals import after_setup_task_logger
from celery.utils.log import get_task_logger
from kitchen.text.converters import to_bytes
from sqlalchemy.exc import SQLAlchemyError

import pagure.lib.query
from pagure.config import config as pagure_config
from pagure.lib.lib_ci import trigger_jenkins_build
from pagure.lib.tasks_utils import pagure_task
from pagure.mail_logging import format_callstack
from pagure.utils import set_up_logging, split_project_fullname

# logging.config.dictConfig(pagure_config.get('LOGGING') or {'version': 1})
_log = get_task_logger(__name__)

# Counter incremented on every call to call_web_hooks() in this process and
# sent as the ``i`` field of the web-hook payload.
_i = 0


# Select the broker URL, in order of precedence: the PAGURE_BROKER_URL
# environment variable, the BROKER_URL setting, a Redis unix socket, then a
# Redis host/port pair. The same URL is used as the result backend.
if os.environ.get("PAGURE_BROKER_URL"):  # pragma: no cover
    broker_url = os.environ["PAGURE_BROKER_URL"]
elif pagure_config.get("BROKER_URL"):
    broker_url = pagure_config["BROKER_URL"]
elif pagure_config.get("REDIS_SOCKET"):
    broker_url = "redis+socket://%s?virtual_host=%d" % (
        pagure_config["REDIS_SOCKET"],
        pagure_config["REDIS_DB"],
    )
elif "REDIS_HOST" in pagure_config and "REDIS_PORT" in pagure_config:
    broker_url = "redis://%s:%d/%d" % (
        pagure_config["REDIS_HOST"],
        pagure_config["REDIS_PORT"],
        pagure_config["REDIS_DB"],
    )
else:  # pragma: no cover
    # Without this, the Celery() call below would fail with an opaque
    # NameError on ``broker_url``.
    raise RuntimeError(
        "No celery broker configured: set PAGURE_BROKER_URL, BROKER_URL, "
        "REDIS_SOCKET or REDIS_HOST/REDIS_PORT"
    )

conn = Celery("tasks", broker=broker_url, backend=broker_url)
conn.conf.update(pagure_config["CELERY_CONFIG"])


@after_setup_task_logger.connect
def augment_celery_log(**kwargs):
    """Re-apply pagure's logging setup once celery set up its task logger."""
    set_up_logging(force=True)


def call_web_hooks(project, topic, msg, urls):
    """Sends the web-hook notification.

    The payload is a JSON document (keys sorted) signed with the project's
    hook token using both HMAC-SHA1 (``X-Pagure-Signature`` header) and
    HMAC-SHA256 (``X-Pagure-Signature-256`` header). Each URL is POSTed to
    independently: a failure on one URL is logged and does not prevent the
    remaining URLs from being called.

    :arg project: the project the event is about; its ``fullname`` and
        ``hook_token`` attributes are used
    :arg topic: the topic of the notification
    :type topic: str or bytes
    :arg msg: the message to send; it is updated in place with the
        ``pagure_instance`` and ``project_fullname`` keys before being
        wrapped in the payload
    :type msg: dict
    :arg urls: the URLs to notify; surrounding whitespace is stripped and
        blank entries are skipped
    :type urls: list of str
    """
    _log.info("Processing project: %s - topic: %s", project.fullname, topic)
    _log.debug("msg: %s", msg)

    # Send web-hooks notification
    global _i
    _i += 1
    year = datetime.datetime.utcnow().year
    # Normalise the topic to bytes so it can be decoded uniformly below.
    if isinstance(topic, six.text_type):
        topic = to_bytes(topic, encoding="utf8", nonstring="passthru")

    msg["pagure_instance"] = pagure_config["APP_URL"]
    msg["project_fullname"] = project.fullname
    msg = dict(
        topic=topic.decode("utf-8"),
        msg=msg,
        timestamp=int(time.time()),
        msg_id="%s-%s" % (year, uuid.uuid4()),
        i=_i,
    )
    content = json.dumps(msg, sort_keys=True)

    # Sign the exact bytes that are sent so receivers can verify them.
    key = project.hook_token.encode("utf-8")
    payload = content.encode("utf-8")
    hashhex = hmac.new(key, payload, hashlib.sha1).hexdigest()
    hashhex256 = hmac.new(key, payload, hashlib.sha256).hexdigest()

    headers = {
        "X-Pagure": pagure_config["APP_URL"],
        "X-Pagure-project": project.fullname,
        "X-Pagure-Signature": hashhex,
        "X-Pagure-Signature-256": hashhex256,
        "X-Pagure-Topic": topic,
        "Content-Type": "application/json",
    }

    for url in sorted(urls):
        url = url.strip()
        if not url:
            # Empty lines in the Web-hooks setting are not URLs to call.
            continue
        _log.info("Calling url %s", url)
        try:
            req = requests.post(url, headers=headers, data=content, timeout=60)
            # A requests Response is falsy for 4xx/5xx status codes.
            if not req:
                _log.info(
                    "An error occured while querying: %s - Error code: %s",
                    url,
                    req.status_code,
                )
        except Exception as err:
            # Best-effort delivery: log and carry on with the next URL.
            _log.info(
                "An error occured while querying: %s - Error: %s", url, err
            )


@conn.task(queue=pagure_config.get("WEBHOOK_CELERY_QUEUE", None), bind=True)
@pagure_task
def webhook_notification(
    self, session, topic, msg, namespace=None, name=None, user=None
):
    """Send webhook notifications about an event on that project.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg topic: the topic for the notification
    :type topic: str
    :arg msg: the message to send via web-hook
    :type msg: str
    :kwarg namespace: the namespace of the project
    :type namespace: None or str
    :kwarg name: the name of the project
    :type name: None or str
    :kwarg user: the user of the project, only set if the project is a fork
    :type user: None or str
    :raises RuntimeError: if the project cannot be found in the database

    The URLs are read from the project's ``Web-hooks`` setting, one per line;
    nothing is sent when that setting is empty.
    """
    project = pagure.lib.query._get_project(
        session, namespace=namespace, name=name, user=user
    )

    if not project:
        session.close()
        raise RuntimeError(
            "Project: %s/%s from user: %s not found in the DB"
            % (namespace, name, user)
        )

    urls = project.settings.get("Web-hooks")
    if not urls:
        _log.info("No URLs set: %s", urls)
        return

    urls = urls.split("\n")
    _log.info("Got the project and urls, going to the webhooks")
    call_web_hooks(project, topic, msg, urls)


@conn.task(queue=pagure_config.get("LOGCOM_CELERY_QUEUE", None), bind=True)
@pagure_task
def log_commit_send_notifications(
    self,
    session,
    name,
    commits,
    abspath,
    branch,
    default_branch,
    namespace=None,
    username=None,
):
    """Log the pushed commits in the database and notify commit watchers.

    Commits are only logged when pushed to the project's default branch,
    unless the ``LOG_ALL_COMMITS`` setting is enabled. Users watching the
    project's commits are notified unless ``EMAIL_ON_WATCHCOMMITS`` is
    disabled.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg name: the name of the project
    :type name: str
    :arg commits: the commits that were pushed (presumably commit hashes --
        TODO confirm against the caller)
    :type commits: list
    :arg abspath: the absolute path of the git repository
    :type abspath: str
    :arg branch: the branch the commits were pushed to
    :type branch: str
    :arg default_branch: the default branch of the project
    :type default_branch: str
    :kwarg namespace: the namespace of the project
    :type namespace: None or str
    :kwarg username: the user of the project, only set if the project is a
        fork
    :type username: None or str
    """
    _log.info(
        "Looking for project: %s%s of %s",
        "%s/" % namespace if namespace else "",
        name,
        username,
    )
    project = pagure.lib.query._get_project(
        session, name, user=username, namespace=namespace
    )

    if not project:
        _log.info("No project found")
        return

    _log.info("Found project: %s", project.fullname)

    _log.info("Processing %s commits in %s", len(commits), abspath)

    # Only log commits when the branch is the default branch
    log_all = pagure_config.get("LOG_ALL_COMMITS", False)
    if log_all or branch == default_branch:
        pagure.lib.git.log_commits_to_db(session, project, commits, abspath)
    else:
        _log.info(
            "Not logging commits not made on the default branch: %s",
            default_branch,
        )

    # Notify subscribed users that there are new commits
    email_watchcommits = pagure_config.get("EMAIL_ON_WATCHCOMMITS", True)
    _log.info("Sending notification about the commit: %s", email_watchcommits)
    if email_watchcommits:
        pagure.lib.notify.notify_new_commits(abspath, project, branch, commits)

    try:
        session.commit()
    except SQLAlchemyError as err:  # pragma: no cover
        _log.exception(err)
        session.rollback()


def get_files_to_load(title, new_commits_list, abspath):
    """Return the paths of the files changed by the given commits.

    The commits are processed in the reverse of the order given (presumably
    the list is newest-first, making this oldest-first -- TODO confirm).
    The caller's list is not modified.

    :arg title: label used in the log messages
    :type title: str
    :arg new_commits_list: the commits to inspect
    :type new_commits_list: list of str
    :arg abspath: the path of the git repository containing the commits
    :type abspath: str
    :return: the changed file paths, one entry per file per commit, so a
        file changed by several commits appears several times
    :rtype: list of str
    """
    _log.info("%s: Retrieve the list of files changed", title)
    file_list = []
    # Iterate over a reversed copy rather than reversing the caller's list.
    commits = list(reversed(new_commits_list))
    n = len(commits)
    for idx, commit in enumerate(commits):
        if (idx % 100) == 0:
            _log.info(
                "Loading files change in commits for %s: %s/%s", title, idx, n
            )
        cmd = ["diff-tree", "--no-commit-id", "--name-only", "-r"]
        if idx == 0:
            # --root makes git list the files of a root commit, which
            # diff-tree would otherwise show no changes for.
            cmd.append("--root")
        cmd.append(commit)
        filenames = pagure.lib.git.read_git_lines(cmd, abspath)
        file_list.extend(line.strip() for line in filenames if line.strip())

    return file_list


@conn.task(queue=pagure_config.get("LOADJSON_CELERY_QUEUE", None), bind=True)
@pagure_task
def load_json_commits_to_db(
    self,
    session,
    name,
    commits,
    abspath,
    data_type,
    agent,
    namespace=None,
    username=None,
):
    """Loads into the database the specified commits that have been pushed
    to either the tickets or the pull-request repository.

    Each file changed by the commits is read from ``HEAD``. Files under
    ``files/`` and files that do not parse as JSON are skipped. Processing
    stops at the first file that fails to load. A report of every file
    processed is emailed to ``agent`` unless ``agent`` is ``"pagure"``.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg name: the name of the project
    :type name: str
    :arg commits: the commits that were pushed
    :type commits: list
    :arg abspath: the absolute path of the git repository
    :type abspath: str
    :arg data_type: either ``"ticket"`` or ``"pull-request"``; anything
        else makes the task return without doing anything
    :type data_type: str
    :arg agent: the username to send the report to
    :type agent: str
    :kwarg namespace: the namespace of the project
    :type namespace: None or str
    :kwarg username: the user of the project, only set if the project is a
        fork
    :type username: None or str
    """
    if data_type not in ["ticket", "pull-request"]:
        _log.info("LOADJSON: Invalid data_type retrieved: %s", data_type)
        return

    _log.info(
        "LOADJSON: Looking for project: %s%s of user: %s",
        "%s/" % namespace if namespace else "",
        name,
        username,
    )

    project = pagure.lib.query._get_project(
        session, name, user=username, namespace=namespace
    )

    if not project:
        _log.info("LOADJSON: No project found")
        return

    _log.info("LOADJSON: Found project: %s", project.fullname)

    _log.info(
        "LOADJSON: %s: Processing %s commits in %s",
        project.fullname,
        len(commits),
        abspath,
    )

    file_list = set(get_files_to_load(project.fullname, commits, abspath))
    n = len(file_list)
    _log.info("LOADJSON: %s files to process", n)
    mail_body = [
        "Good Morning",
        "",
        "This is the log of loading all the files pushed in the git repo into",
        "the database. It should ignore files that are not JSON files, this",
        "is fine.",
        "",
    ]

    for idx, filename in enumerate(sorted(file_list)):
        _log.info(
            "LOADJSON: Loading: %s: %s -- %s/%s",
            project.fullname,
            filename,
            idx + 1,
            n,
        )
        tmp = "Loading: %s -- %s/%s" % (filename, idx + 1, n)
        json_data = None
        try:
            data = "".join(
                pagure.lib.git.read_git_lines(
                    ["show", "HEAD:%s" % filename], abspath
                )
            )
            if data and not filename.startswith("files/"):
                try:
                    json_data = json.loads(data)
                except ValueError:
                    # Not a JSON file: reported as skipped below.
                    pass
            if json_data:
                if data_type == "ticket":
                    pagure.lib.git.update_ticket_from_git(
                        session,
                        reponame=name,
                        namespace=namespace,
                        username=username,
                        issue_uid=filename,
                        json_data=json_data,
                        agent=agent,
                    )
                elif data_type == "pull-request":
                    pagure.lib.git.update_request_from_git(
                        session,
                        reponame=name,
                        namespace=namespace,
                        username=username,
                        request_uid=filename,
                        json_data=json_data,
                    )
                tmp += " ... ... Done"
            else:
                tmp += " ... ... SKIPPED - No JSON data"
        except Exception as err:
            _log.info("data: %s", json_data)
            session.rollback()
            _log.exception(err)
            tmp += " ... ... FAILED\n"
            tmp += format_callstack()
            break
        finally:
            # Record the outcome of this file exactly once, whichever path
            # (done, skipped or failed) was taken.
            mail_body.append(tmp)

    try:
        session.commit()
        _log.info(
            "LOADJSON: Emailing results for %s to %s", project.fullname, agent
        )
        try:
            if not agent:
                raise pagure.exceptions.PagureException(
                    "No agent found: %s" % agent
                )
            if agent != "pagure":
                user_obj = pagure.lib.query.get_user(session, agent)
                pagure.lib.notify.send_email(
                    "\n".join(mail_body),
                    "Issue import report",
                    user_obj.default_email,
                )
        except pagure.exceptions.PagureException:
            _log.exception("LOADJSON: Could not find user %s", agent)
    except SQLAlchemyError:  # pragma: no cover
        session.rollback()
    _log.info("LOADJSON: Ready for another")


@conn.task(queue=pagure_config.get("CI_CELERY_QUEUE", None), bind=True)
@pagure_task
def trigger_ci_build(
    self,
    session,
    cause,
    branch,
    branch_to,
    ci_type,
    project_name=None,
    pr_uid=None,
):
    """Triggers a new run of the CI system on the specified pull-request.

    The project is taken from the pull-request when ``pr_uid`` is given,
    otherwise from ``project_name``. For forks, the parent project's CI
    configuration is used. Only the ``"jenkins"`` CI type is supported;
    any other type is logged and ignored.

    :arg session: SQLAlchemy session object
    :type session: sqlalchemy.orm.session.Session
    :arg cause: what triggered the build (PR number or commit)
    :arg branch: the branch to build
    :arg branch_to: the target branch
    :arg ci_type: the CI system to trigger
    :type ci_type: str
    :kwarg project_name: the full name of the project
    :type project_name: None or str
    :kwarg pr_uid: the uid of the pull-request
    :type pr_uid: None or str
    :raises pagure.exceptions.PagureException: if the project (or, for a
        fork, its parent) has no CI hook or no CI URL configured
    """
    # NOTE(review): called for its side effect only, presumably to make sure
    # the CI plugin (and the ``ci_hook`` relationship used below) is loaded.
    pagure.lib.plugins.get_plugin("Pagure CI")

    if not pr_uid and not project_name:
        _log.debug("No PR UID nor project name specified, can't trigger CI")
        session.close()
        return

    if pr_uid:
        pr = pagure.lib.query.get_request_by_uid(session, pr_uid)
        if pr is None:
            _log.warning(
                "Pagure-CI: No pull-request could be found for the uid %s",
                pr_uid,
            )
            session.close()
            return
        if pr.remote:
            project_name = pr.project.fullname
        else:
            project_name = pr.project_from.fullname

    user, namespace, project_name = split_project_fullname(project_name)

    _log.info("Pagure-CI: Looking for project: %s", project_name)
    project = pagure.lib.query.get_authorized_project(
        session=session,
        project_name=project_name,
        user=user,
        namespace=namespace,
    )

    if project is None:
        _log.warning(
            "Pagure-CI: No project could be found for the name %s",
            project_name,
        )
        session.close()
        return

    # Forks build with their parent's CI configuration.
    ci_project = project.parent if project.is_fork else project
    if ci_project.ci_hook is None or ci_project.ci_hook.ci_url is None:
        raise pagure.exceptions.PagureException(
            "Project %s not configured or incorrectly configured for ci"
            % ci_project.fullname
        )

    _log.info("Pagure-CI: project retrieved: %s", project.fullname)

    _log.info(
        "Pagure-CI: Trigger from %s cause (PR# or commit) %s branch: %s",
        project.fullname,
        cause,
        branch,
    )

    if ci_type == "jenkins":
        trigger_jenkins_build(
            project_path=project.path,
            url=ci_project.ci_hook.ci_url,
            job=ci_project.ci_hook.ci_job,
            token=ci_project.ci_hook.pagure_ci_token,
            branch=branch,
            branch_to=branch_to,
            cause=cause,
            ci_username=ci_project.ci_hook.ci_username,
            ci_password=ci_project.ci_hook.ci_password,
        )
    else:
        _log.warning("Pagure-CI:Un-supported CI type")

    _log.info("Pagure-CI: Ready for another")