123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- (c) 2017 - Copyright Red Hat Inc
- Authors:
- Pierre-Yves Chibon <pingou@pingoured.fr>
- This server listens to messages sent to redis via post-commit hooks, finds
- the list of files modified by the commits listed in each message and syncs
- them into the database.
- Using this mechanism, we no longer need to block the git push until all the
- files have been uploaded (which when migrating some large projects over to
- pagure can be really time-consuming).
- """
- import json
- import logging
- import os
- import traceback
- import inspect
- import trollius
- import trollius_redis
- from sqlalchemy.exc import SQLAlchemyError
- _log = logging.getLogger(__name__)
# Fall back to the system-wide configuration file when PAGURE_CONFIG is not
# already set in the environment.
# NOTE(review): this sits ahead of ``import pagure``, presumably because
# pagure reads the variable at import time -- confirm before moving it.
if 'PAGURE_CONFIG' not in os.environ \
        and os.path.exists('/etc/pagure/pagure.cfg'):
    # Parenthesised single-argument form: prints the same text with the
    # Python 2 print statement and with the Python 3 print function.
    print('Using configuration file `/etc/pagure/pagure.cfg`')
    os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
- import pagure
- import pagure.exceptions
- import pagure.lib
- import pagure.lib.notify
def format_callstack():
    """ Format the current call stack as a printable string.

    The innermost frames are inspected and the leading run of frames that
    belong to the ``logging`` package is trimmed from the result.  When
    there is nothing to trim the whole stack is returned, including the
    frame of this function.

    :return: one entry per remaining frame, outermost first, joined with
        newlines.
    """
    # Index, counting from the innermost frame, of the first frame whose
    # top-level module is not ``logging``.
    ind = 0
    for ind, frame in enumerate(f[0] for f in inspect.stack()):
        if '__name__' not in frame.f_globals:
            continue
        modname = frame.f_globals['__name__'].split('.')[0]
        if modname != "logging":
            break

    def _format_frame(frame):
        """ Format the frame. """
        # ``tuple()`` makes the entry usable with ``%`` whether it is a
        # plain 4-tuple or an iterable FrameSummary object.
        return ' File "%s", line %i in %s\n %s' % tuple(frame)

    stack = traceback.extract_stack()
    # ``stack[:-0]`` is an empty list, so compute the end index explicitly
    # to keep every frame when ``ind`` is 0.
    stack = stack[:len(stack) - ind]
    return "\n".join([_format_frame(frame) for frame in stack])
def get_files_to_load(title, new_commits_list, abspath):
    """ Return the names of the files changed by the given commits.

    :arg title: label used as a prefix in the log messages.
    :arg new_commits_list: list of commit ids.  They are processed in the
        reverse of the order given; the list itself is left untouched.
    :arg abspath: handed to ``pagure.lib.git.read_git_lines`` together with
        the git command (presumably the path of the git repository --
        confirm against that helper).
    :return: list of the stripped, non-empty lines returned by
        ``git diff-tree --name-only`` for each commit.  A file changed by
        several commits appears several times.
    """
    _log.info('%s: Retrieve the list of files changed', title)
    file_list = []
    # Work on a reversed copy so the caller's list is not reordered.
    commits = list(reversed(new_commits_list))
    n = len(commits)
    for idx, commit in enumerate(commits):
        if (idx % 100) == 0:
            _log.info(
                'Loading files change in commits for %s: %s/%s',
                title, idx, n)
        cmd = ['diff-tree', '--no-commit-id', '--name-only', '-r']
        if commit == commits[0]:
            # ``--root`` is only passed for the first commit processed.
            cmd.append('--root')
        cmd.append(commit)
        filenames = pagure.lib.git.read_git_lines(cmd, abspath)
        for line in filenames:
            if line.strip():
                file_list.append(line.strip())
    return file_list
@trollius.coroutine
def handle_messages():
    ''' Handles connecting to redis and acting upon messages received.

    Subscribes to the ``pagure.loadjson`` channel and, for every message
    published there, looks up the project, collects the files changed by
    the listed commits, reads each of them at ``HEAD`` and, when the
    content is JSON and ``data_type`` is ``ticket``, hands it to
    ``pagure.lib.git.update_ticket_from_git``.  A per-file report is then
    emailed to the ``agent`` named in the message.

    Messages that cannot be decoded, that miss one of the expected keys or
    that carry an unsupported ``data_type`` are logged and skipped.

    The currently accepted message format looks like:

    ::

        {
          "project": {
            "name": "foo",
            "namespace": null,
            "parent": null,
            "username": {
              "name": "user"
            }
          },
          "abspath": "/srv/git/repositories/pagure.git",
          "commits": [
            "b7b4059c44d692d7df3227ce58ce01191e5407bd",
            "f8d0899bb6654590ffdef66b539fd3b8cf873b35",
            "9b6fdc48d3edab82d3de28953271ea52b0a96117"
          ],
          "data_type": "ticket",
          "agent": "pingou"
        }

    '''

    host = pagure.APP.config.get('REDIS_HOST', '0.0.0.0')
    port = pagure.APP.config.get('REDIS_PORT', 6379)
    dbname = pagure.APP.config.get('REDIS_DB', 0)
    connection = yield trollius.From(trollius_redis.Connection.create(
        host=host, port=port, db=dbname))

    # Create subscriber.
    subscriber = yield trollius.From(connection.start_subscribe())

    # Subscribe to channel.
    yield trollius.From(subscriber.subscribe(['pagure.loadjson']))

    # Inside a while loop, wait for incoming events.
    while True:
        reply = yield trollius.From(subscriber.next_published())
        _log.info(
            'Received: %s on channel: %s',
            repr(reply.value), reply.channel)
        _log.info('Loading the json')
        # A single malformed message must not take the whole listener
        # down: log it and wait for the next one.
        try:
            data = json.loads(reply.value)
            _log.info('Done: loading the json')
            commits = data['commits']
            abspath = data['abspath']
            repo = data['project']['name']
            # The user name is only looked at when the project has a parent.
            username = data['project']['username']['name'] \
                if data['project']['parent'] else None
            namespace = data['project']['namespace']
            data_type = data['data_type']
            agent = data['agent']
        except (ValueError, KeyError, TypeError):
            _log.exception('Invalid message received, ignoring it')
            continue

        if data_type not in ['ticket', 'pull-request']:
            _log.info('Invalid data_type retrieved: %s', data_type)
            continue

        session = pagure.lib.create_session(pagure.APP.config['DB_URL'])

        _log.info('Looking for project: %s%s of user: %s',
                  '%s/' % namespace if namespace else '',
                  repo, username)
        project = pagure.lib._get_project(
            session, repo, user=username, namespace=namespace)

        if not project:
            _log.info('No project found')
            # Release the session before skipping the message, otherwise
            # every unknown project leaves one open.
            session.close()
            continue

        _log.info('Found project: %s', project.fullname)

        _log.info(
            '%s: Processing %s commits in %s', project.fullname,
            len(commits), abspath)

        # De-duplicate: a file changed by several commits is loaded once.
        file_list = set(get_files_to_load(project.fullname, commits, abspath))
        n = len(file_list)
        _log.info('%s files to process', n)
        mail_body = []

        for idx, filename in enumerate(file_list):
            _log.info(
                'Loading: %s: %s -- %s/%s', project.fullname, filename,
                idx+1, n)
            tmp = 'Loading: %s -- %s/%s' % (filename, idx+1, n)
            json_data = None
            # Own name for the file content so the decoded message kept in
            # ``data`` is not overwritten.
            content = ''.join(
                pagure.lib.git.read_git_lines(
                    ['show', 'HEAD:%s' % filename], abspath))
            # Nothing under ``files/`` is parsed as JSON.
            if content and not filename.startswith('files/'):
                try:
                    json_data = json.loads(content)
                except ValueError:
                    # Not JSON: reported below as skipped.
                    pass
            if json_data:
                try:
                    # NOTE(review): 'pull-request' passes the data_type
                    # check above but nothing is imported for it here, yet
                    # the file is still reported as Done -- confirm intent.
                    if data_type == 'ticket':
                        pagure.lib.git.update_ticket_from_git(
                            session,
                            reponame=repo,
                            namespace=namespace,
                            username=username,
                            issue_uid=filename,
                            json_data=json_data
                        )
                    tmp += ' ... ... Done'
                except Exception as err:
                    _log.info('data: %s', json_data)
                    session.rollback()
                    _log.exception(err)
                    tmp += ' ... ... FAILED\n'
                    tmp += format_callstack()
                    # Stop at the first failure; the report built so far
                    # is still sent below.
                    break
                finally:
                    mail_body.append(tmp)
            else:
                tmp += ' ... ... SKIPPED - No JSON data'
                mail_body.append(tmp)

        try:
            session.commit()
            _log.info(
                'Emailing results for %s to %s', project.fullname, agent)
            try:
                if not agent:
                    raise pagure.exceptions.PagureException(
                        'No agent found: %s' % agent)
                user_obj = pagure.lib.get_user(session, agent)
                pagure.lib.notify.send_email(
                    '\n'.join(mail_body),
                    'Issue import report',
                    user_obj.default_email)
            except pagure.exceptions.PagureException:
                _log.exception('Could not find user %s' % agent)
        except SQLAlchemyError:  # pragma: no cover
            session.rollback()
        finally:
            session.close()
        _log.info('Ready for another')
def main():
    ''' Start the main async loop.

    Schedules ``handle_messages`` on the event loop and runs it until it is
    interrupted from the keyboard or the connection is reset, then closes
    the loop.
    '''
    # ``async`` is a reserved word from Python 3.7 on, so the attribute
    # access ``trollius.async`` cannot even be parsed there.  Look the
    # scheduling function up by name instead, preferring ``ensure_future``
    # when the installed trollius provides it.
    schedule = getattr(trollius, 'ensure_future', None) \
        or getattr(trollius, 'async')

    # Fetched outside the ``try`` so ``loop`` is always bound when it is
    # closed below.
    loop = trollius.get_event_loop()
    try:
        tasks = [
            schedule(handle_messages()),
        ]
        loop.run_until_complete(trollius.wait(tasks))
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except trollius.ConnectionResetError:
        pass

    _log.info("End Connection")
    loop.close()
    _log.info("End")
if __name__ == '__main__':
    # Root logger configured at DEBUG.
    logging.basicConfig(level=logging.DEBUG)

    # Console handler, with its own format, attached to this module's logger.
    shellhandler = logging.StreamHandler()
    shellhandler.setLevel(logging.DEBUG)
    shellhandler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s"))
    _log.setLevel(logging.DEBUG)
    _log.addHandler(shellhandler)

    # Event-loop loggers are set to DEBUG as well.
    for loop_logger in ("asyncio", "trollius"):
        logging.getLogger(loop_logger).setLevel(logging.DEBUG)

    # Turn down the logs coming from python-markdown
    logging.getLogger("MARKDOWN").setLevel(logging.WARN)

    main()
|