pagure_loadjson_server.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. (c) 2017 - Copyright Red Hat Inc
  5. Authors:
  6. Pierre-Yves Chibon <pingou@pingoured.fr>
  7. This server listens to messages sent to redis via the post-commit hook, finds
  8. the list of files modified by the commits listed in the message and syncs
  9. them into the database.
  10. Using this mechanism, we no longer need to block the git push until all the
  11. files have been uploaded (which when migrating some large projects over to
  12. pagure can be really time-consuming).
  13. """
  14. import json
  15. import logging
  16. import os
  17. import traceback
  18. import inspect
  19. import trollius
  20. import trollius_redis
  21. from sqlalchemy.exc import SQLAlchemyError
  22. _log = logging.getLogger(__name__)
  23. if 'PAGURE_CONFIG' not in os.environ \
  24. and os.path.exists('/etc/pagure/pagure.cfg'):
  25. print 'Using configuration file `/etc/pagure/pagure.cfg`'
  26. os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
  27. import pagure
  28. import pagure.exceptions
  29. import pagure.lib
  30. import pagure.lib.notify
  31. def format_callstack():
  32. """ Format the callstack to find out the stack trace. """
  33. ind = 0
  34. for ind, frame in enumerate(f[0] for f in inspect.stack()):
  35. if '__name__' not in frame.f_globals:
  36. continue
  37. modname = frame.f_globals['__name__'].split('.')[0]
  38. if modname != "logging":
  39. break
  40. def _format_frame(frame):
  41. """ Format the frame. """
  42. return ' File "%s", line %i in %s\n %s' % (frame)
  43. stack = traceback.extract_stack()
  44. stack = stack[:-ind]
  45. return "\n".join([_format_frame(frame) for frame in stack])
  46. def get_files_to_load(title, new_commits_list, abspath):
  47. _log.info('%s: Retrieve the list of files changed' % title)
  48. file_list = []
  49. new_commits_list.reverse()
  50. n = len(new_commits_list)
  51. for idx, commit in enumerate(new_commits_list):
  52. if (idx % 100) == 0:
  53. _log.info(
  54. 'Loading files change in commits for %s: %s/%s',
  55. title, idx, n)
  56. if commit == new_commits_list[0]:
  57. filenames = pagure.lib.git.read_git_lines(
  58. ['diff-tree', '--no-commit-id', '--name-only', '-r', '--root',
  59. commit], abspath)
  60. else:
  61. filenames = pagure.lib.git.read_git_lines(
  62. ['diff-tree', '--no-commit-id', '--name-only', '-r', commit],
  63. abspath)
  64. for line in filenames:
  65. if line.strip():
  66. file_list.append(line.strip())
  67. return file_list
  68. @trollius.coroutine
  69. def handle_messages():
  70. ''' Handles connecting to redis and acting upon messages received.
  71. In this case, it means logging into the DB the commits specified in the
  72. message for the specified repo.
  73. The currently accepted message format looks like:
  74. ::
  75. {
  76. "project": {
  77. "name": "foo",
  78. "namespace": null,
  79. "parent": null,
  80. "username": {
  81. "name": "user"
  82. }
  83. },
  84. "abspath": "/srv/git/repositories/pagure.git",
  85. "commits": [
  86. "b7b4059c44d692d7df3227ce58ce01191e5407bd",
  87. "f8d0899bb6654590ffdef66b539fd3b8cf873b35",
  88. "9b6fdc48d3edab82d3de28953271ea52b0a96117"
  89. ],
  90. "data_type": "ticket",
  91. "agent": "pingou",
  92. }
  93. '''
  94. host = pagure.APP.config.get('REDIS_HOST', '0.0.0.0')
  95. port = pagure.APP.config.get('REDIS_PORT', 6379)
  96. dbname = pagure.APP.config.get('REDIS_DB', 0)
  97. connection = yield trollius.From(trollius_redis.Connection.create(
  98. host=host, port=port, db=dbname))
  99. # Create subscriber.
  100. subscriber = yield trollius.From(connection.start_subscribe())
  101. # Subscribe to channel.
  102. yield trollius.From(subscriber.subscribe(['pagure.loadjson']))
  103. # Inside a while loop, wait for incoming events.
  104. while True:
  105. reply = yield trollius.From(subscriber.next_published())
  106. _log.info(
  107. 'Received: %s on channel: %s',
  108. repr(reply.value), reply.channel)
  109. _log.info('Loading the json')
  110. data = json.loads(reply.value)
  111. _log.info('Done: loading the json')
  112. commits = data['commits']
  113. abspath = data['abspath']
  114. repo = data['project']['name']
  115. username = data['project']['username']['name'] \
  116. if data['project']['parent'] else None
  117. namespace = data['project']['namespace']
  118. data_type = data['data_type']
  119. agent = data['agent']
  120. if data_type not in ['ticket', 'pull-request']:
  121. _log.info('Invalid data_type retrieved: %s', data_type)
  122. continue
  123. session = pagure.lib.create_session(pagure.APP.config['DB_URL'])
  124. _log.info('Looking for project: %s%s of user: %s',
  125. '%s/' % namespace if namespace else '',
  126. repo, username)
  127. project = pagure.lib._get_project(
  128. session, repo, user=username, namespace=namespace)
  129. if not project:
  130. _log.info('No project found')
  131. continue
  132. _log.info('Found project: %s', project.fullname)
  133. _log.info(
  134. '%s: Processing %s commits in %s', project.fullname,
  135. len(commits), abspath)
  136. file_list = set(get_files_to_load(project.fullname, commits, abspath))
  137. n = len(file_list)
  138. _log.info('%s files to process' % n)
  139. mail_body = []
  140. for idx, filename in enumerate(file_list):
  141. _log.info(
  142. 'Loading: %s: %s -- %s/%s', project.fullname, filename,
  143. idx+1, n)
  144. tmp = 'Loading: %s -- %s/%s' % (filename, idx+1, n)
  145. json_data = None
  146. data = ''.join(
  147. pagure.lib.git.read_git_lines(
  148. ['show', 'HEAD:%s' % filename], abspath))
  149. if data and not filename.startswith('files/'):
  150. try:
  151. json_data = json.loads(data)
  152. except ValueError:
  153. pass
  154. if json_data:
  155. try:
  156. if data_type == 'ticket':
  157. pagure.lib.git.update_ticket_from_git(
  158. session,
  159. reponame=repo,
  160. namespace=namespace,
  161. username=username,
  162. issue_uid=filename,
  163. json_data=json_data
  164. )
  165. tmp += ' ... ... Done'
  166. except Exception as err:
  167. _log.info('data: %s', json_data)
  168. session.rollback()
  169. _log.exception(err)
  170. tmp += ' ... ... FAILED\n'
  171. tmp += format_callstack()
  172. break
  173. finally:
  174. mail_body.append(tmp)
  175. else:
  176. tmp += ' ... ... SKIPPED - No JSON data'
  177. mail_body.append(tmp)
  178. try:
  179. session.commit()
  180. _log.info(
  181. 'Emailing results for %s to %s', project.fullname, agent)
  182. try:
  183. if not agent:
  184. raise pagure.exceptions.PagureException(
  185. 'No agent found: %s' % agent)
  186. user_obj = pagure.lib.get_user(session, agent)
  187. pagure.lib.notify.send_email(
  188. '\n'.join(mail_body),
  189. 'Issue import report',
  190. user_obj.default_email)
  191. except pagure.exceptions.PagureException as err:
  192. _log.exception('Could not find user %s' % agent)
  193. except SQLAlchemyError as err: # pragma: no cover
  194. session.rollback()
  195. finally:
  196. session.close()
  197. _log.info('Ready for another')
  198. def main():
  199. ''' Start the main async loop. '''
  200. try:
  201. loop = trollius.get_event_loop()
  202. tasks = [
  203. trollius.async(handle_messages()),
  204. ]
  205. loop.run_until_complete(trollius.wait(tasks))
  206. loop.run_forever()
  207. except KeyboardInterrupt:
  208. pass
  209. except trollius.ConnectionResetError:
  210. pass
  211. _log.info("End Connection")
  212. loop.close()
  213. _log.info("End")
  214. if __name__ == '__main__':
  215. formatter = logging.Formatter(
  216. "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
  217. logging.basicConfig(level=logging.DEBUG)
  218. # setup console logging
  219. _log.setLevel(logging.DEBUG)
  220. shellhandler = logging.StreamHandler()
  221. shellhandler.setLevel(logging.DEBUG)
  222. aslog = logging.getLogger("asyncio")
  223. aslog.setLevel(logging.DEBUG)
  224. aslog = logging.getLogger("trollius")
  225. aslog.setLevel(logging.DEBUG)
  226. # Turn down the logs coming from python-markdown
  227. mklog = logging.getLogger("MARKDOWN")
  228. mklog.setLevel(logging.WARN)
  229. shellhandler.setFormatter(formatter)
  230. _log.addHandler(shellhandler)
  231. main()