pagure_logcom_server.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. (c) 2016 - Copyright Red Hat Inc
  5. Authors:
  6. Pierre-Yves Chibon <pingou@pingoured.fr>
  7. This server listens to messages sent via redis post commit and logs the
  8. user's activity in the database.
  9. Using this mechanism, we no longer need to block the git push until all the
  10. activity has been logged (which, if you push the kernel tree for the first
  11. time, can be really time-consuming).
  12. """
  13. import json
  14. import logging
  15. import os
  16. from sqlalchemy.exc import SQLAlchemyError
  17. import trollius
  18. import trollius_redis
  19. _log = logging.getLogger(__name__)
  # Fall back to the system-wide configuration file when the caller did not
  # choose one through the environment.  This is set before ``pagure`` is
  # imported further down (presumably because pagure reads the variable at
  # import time -- confirm against pagure/__init__.py).
  if 'PAGURE_CONFIG' not in os.environ:
      if os.path.exists('/etc/pagure/pagure.cfg'):
          print('Using configuration file `/etc/pagure/pagure.cfg`')
          os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
  24. import pagure
  25. import pagure.lib
  @trollius.coroutine
  def handle_messages():
      ''' Handles connecting to redis and acting upon messages received.

      For every message published on the ``pagure.logcom`` channel, the
      project it refers to is looked up in the database.  When the pushed
      branch is the default branch the commits are logged into the DB, and
      in every case ``pagure.lib.notify.notify_new_commits`` is called for
      the commits.

      Messages that cannot be parsed (invalid JSON or missing keys) are
      logged and skipped so that a single bad message does not stop the
      consumer.

      The currently accepted message format looks like:

      ::

          {
            "project": {
              "name": "foo",
              "namespace": null,
              "parent": null,
              "username": {
                "name": "user"
              }
            },
            "abspath": "/srv/git/repositories/pagure.git",
            "commits": [
              "b7b4059c44d692d7df3227ce58ce01191e5407bd",
              "f8d0899bb6654590ffdef66b539fd3b8cf873b35",
              "9b6fdc48d3edab82d3de28953271ea52b0a96117"
            ],
            "branch": "master",
            "default_branch": "master"
          }

      '''

      host = pagure.APP.config.get('REDIS_HOST', '0.0.0.0')
      port = pagure.APP.config.get('REDIS_PORT', 6379)
      dbname = pagure.APP.config.get('REDIS_DB', 0)
      connection = yield trollius.From(trollius_redis.Connection.create(
          host=host, port=port, db=dbname))

      # Create subscriber.
      subscriber = yield trollius.From(connection.start_subscribe())

      # Subscribe to channel.
      yield trollius.From(subscriber.subscribe(['pagure.logcom']))

      # Inside a while loop, wait for incoming events.
      while True:
          reply = yield trollius.From(subscriber.next_published())
          _log.info(
              'Received: %s on channel: %s',
              repr(reply.value), reply.channel)

          # Extract everything we need up-front: a malformed message must not
          # raise out of the loop and terminate the whole consumer.
          try:
              data = json.loads(reply.value)
              commits = data['commits']
              abspath = data['abspath']
              branch = data['branch']
              default_branch = data['default_branch']
              project_data = data['project']
              repo = project_data['name']
              # The user name is only used to identify the project when the
              # message says it has a parent.
              username = project_data['username']['name'] \
                  if project_data['parent'] else None
              namespace = project_data['namespace']
          except (ValueError, KeyError, TypeError):
              _log.exception('Ignoring malformed message')
              continue

          session = pagure.lib.create_session(pagure.APP.config['DB_URL'])
          # The try/finally guarantees the session is closed on every path,
          # including the early ``continue`` and unexpected errors.
          try:
              _log.info('Looking for project: %s%s of %s',
                        '%s/' % namespace if namespace else '',
                        repo, username)
              # Use the per-message session (not the global pagure.SESSION)
              # so the project and the commit log live in the same session.
              project = pagure.lib._get_project(
                  session, repo, user=username, namespace=namespace)

              if not project:
                  _log.info('No project found')
                  continue

              _log.info('Found project: %s', project.fullname)

              _log.info('Processing %s commits in %s', len(commits), abspath)

              # Only log commits when the branch is the default branch
              if branch == default_branch:
                  pagure.lib.git.log_commits_to_db(
                      session, project, commits, abspath)

              # Notify subscribed users that there are new commits
              pagure.lib.notify.notify_new_commits(
                  abspath, project, branch, commits)

              try:
                  session.commit()
              except SQLAlchemyError:  # pragma: no cover
                  _log.exception('Could not commit the logged activity')
                  session.rollback()
          finally:
              session.close()

          _log.info('Ready for another')
  def main():
      ''' Start the main async loop.

      Schedules :func:`handle_messages` on the trollius event loop and runs
      the loop until interrupted.  ``KeyboardInterrupt`` and
      ``trollius.ConnectionResetError`` end the loop quietly; the loop is
      closed on every exit path.
      '''
      loop = trollius.get_event_loop()

      # ``async`` is a reserved word on Python >= 3.7, so ``trollius.async``
      # cannot even be compiled there.  Prefer ``ensure_future`` when the
      # installed trollius provides it and fall back to the old name.
      schedule = getattr(trollius, 'ensure_future', None) \
          or getattr(trollius, 'async')

      try:
          tasks = [
              schedule(handle_messages()),
          ]
          done, _pending = loop.run_until_complete(trollius.wait(tasks))

          # ``wait`` never re-raises: retrieve and report task failures,
          # otherwise a crashed handler would go unnoticed.
          for task in done:
              if task.cancelled():
                  continue
              error = task.exception()
              if error is not None:
                  _log.error('Task ended with an error: %r', error)

          # NOTE(review): kept from the original -- once the handler task
          # has ended the loop stays alive but idle; confirm this is wanted.
          loop.run_forever()
      except KeyboardInterrupt:
          pass
      except trollius.ConnectionResetError:
          pass
      finally:
          _log.info("End Connection")
          loop.close()
          _log.info("End")
  if __name__ == '__main__':
      # Script entry point: configure logging, then start the server.

      # Root logger at DEBUG; basicConfig installs a default handler on it.
      logging.basicConfig(level=logging.DEBUG)

      # Debug output for the event-loop libraries.
      for aslog_name in ("asyncio", "trollius"):
          logging.getLogger(aslog_name).setLevel(logging.DEBUG)

      # setup console logging
      shellhandler = logging.StreamHandler()
      shellhandler.setLevel(logging.DEBUG)
      shellhandler.setFormatter(logging.Formatter(
          "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s"))
      _log.setLevel(logging.DEBUG)
      _log.addHandler(shellhandler)

      main()