pagure-webhook-server.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. #!/usr/bin/env python
  2. """
  3. (c) 2015 - Copyright Red Hat Inc
  4. Authors:
  5. Pierre-Yves Chibon <pingou@pingoured.fr>
  6. This server listens to messages sent via redis and sends the corresponding
  7. web-hook requests.
  8. Using this mechanism, we no longer block the main application if the
  9. receiving end is offline or so.
  10. """
  11. import datetime
  12. import hashlib
  13. import hmac
  14. import json
  15. import logging
  16. import os
  17. import requests
  18. import time
  19. import uuid
  20. import six
  21. import trollius
  22. import trollius_redis
  23. from kitchen.text.converters import to_bytes
  24. log = logging.getLogger(__name__)
  25. if 'PAGURE_CONFIG' not in os.environ \
  26. and os.path.exists('/etc/pagure/pagure.cfg'):
  27. print 'Using configuration file `/etc/pagure/pagure.cfg`'
  28. os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
  29. import pagure
  30. import pagure.lib
  31. from pagure.exceptions import PagureEvException
  32. _i = 0
  33. def call_web_hooks(project, topic, msg):
  34. ''' Sends the web-hook notification. '''
  35. log.info(
  36. "Processing project: %s - topic: %s", project.fullname, topic)
  37. log.debug('msg: %s', msg)
  38. # Send web-hooks notification
  39. global _i
  40. _i += 1
  41. year = datetime.datetime.now().year
  42. if isinstance(topic, six.text_type):
  43. topic = to_bytes(topic, encoding='utf8', nonstring="passthru")
  44. msg = dict(
  45. topic=topic.decode('utf-8'),
  46. msg=msg,
  47. timestamp=int(time.time()),
  48. msg_id=str(year) + '-' + str(uuid.uuid4()),
  49. i=_i,
  50. )
  51. content = json.dumps(msg)
  52. hashhex = hmac.new(
  53. str(project.hook_token), content, hashlib.sha1).hexdigest()
  54. headers = {
  55. 'X-Pagure-Topic': topic,
  56. 'X-Pagure-Signature': hashhex
  57. }
  58. msg = json.dumps(msg)
  59. for url in project.settings.get('Web-hooks').split('\n'):
  60. url = url.strip()
  61. log.info('Calling url %s' % url)
  62. try:
  63. req = requests.post(
  64. url,
  65. headers=headers,
  66. data={'payload': msg}
  67. )
  68. if not req:
  69. log.info(
  70. 'An error occured while querying: %s - '
  71. 'Error code: %s' % (url, req.status_code))
  72. except (requests.exceptions.RequestException, Exception) as err:
  73. log.info(
  74. 'An error occured while querying: %s - Error: %s' % (
  75. url, err))
  76. @trollius.coroutine
  77. def handle_messages():
  78. connection = yield trollius.From(trollius_redis.Connection.create(
  79. host='0.0.0.0', port=6379, db=0))
  80. # Create subscriber.
  81. subscriber = yield trollius.From(connection.start_subscribe())
  82. # Subscribe to channel.
  83. yield trollius.From(subscriber.subscribe(['pagure.hook']))
  84. # Inside a while loop, wait for incoming events.
  85. while True:
  86. reply = yield trollius.From(subscriber.next_published())
  87. log.info(
  88. 'Received: %s on channel: %s',
  89. repr(reply.value), reply.channel)
  90. data = json.loads(reply.value)
  91. username = None
  92. if data['project'].startswith('forks'):
  93. _, username, projectname = data['project'].split('/', 2)
  94. else:
  95. projectname = data['project']
  96. project = pagure.lib.get_project(
  97. session=pagure.SESSION, name=projectname, user=username)
  98. log.info('Got the project, going to the webhooks')
  99. call_web_hooks(project, data['topic'], data['msg'])
  100. def main():
  101. server = None
  102. try:
  103. loop = trollius.get_event_loop()
  104. tasks = [
  105. trollius.async(handle_messages()),
  106. ]
  107. loop.run_until_complete(trollius.wait(tasks))
  108. loop.run_forever()
  109. except KeyboardInterrupt:
  110. pass
  111. except trollius.ConnectionResetError:
  112. pass
  113. log.info("End Connection")
  114. loop.close()
  115. log.info("End")
  116. if __name__ == '__main__':
  117. log = logging.getLogger("")
  118. formatter = logging.Formatter(
  119. "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
  120. # setup console logging
  121. log.setLevel(logging.DEBUG)
  122. ch = logging.StreamHandler()
  123. ch.setLevel(logging.DEBUG)
  124. aslog = logging.getLogger("asyncio")
  125. aslog.setLevel(logging.DEBUG)
  126. ch.setFormatter(formatter)
  127. log.addHandler(ch)
  128. main()