pagure-webhook-server.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. #!/usr/bin/env python
  2. """
  3. (c) 2015 - Copyright Red Hat Inc
  4. Authors:
  5. Pierre-Yves Chibon <pingou@pingoured.fr>
  6. This server listens to messages sent via redis and sends the corresponding
  7. web-hook requests.
  8. Using this mechanism, we no longer block the main application if the
  9. receiving end is offline or otherwise unavailable.
  10. """
  11. import datetime
  12. import hashlib
  13. import hmac
  14. import json
  15. import logging
  16. import os
  17. import requests
  18. import time
  19. import uuid
  20. import six
  21. import trollius
  22. import trollius_redis
  23. from kitchen.text.converters import to_bytes
  24. log = logging.getLogger(__name__)
  25. if 'PAGURE_CONFIG' not in os.environ \
  26. and os.path.exists('/etc/pagure/pagure.cfg'):
  27. print 'Using configuration file `/etc/pagure/pagure.cfg`'
  28. os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
  29. import pagure
  30. import pagure.lib
  31. from pagure.exceptions import PagureEvException
  32. _i = 0
  33. def call_web_hooks(project, topic, msg, urls):
  34. ''' Sends the web-hook notification. '''
  35. log.info(
  36. "Processing project: %s - topic: %s", project.fullname, topic)
  37. log.debug('msg: %s', msg)
  38. # Send web-hooks notification
  39. global _i
  40. _i += 1
  41. year = datetime.datetime.now().year
  42. if isinstance(topic, six.text_type):
  43. topic = to_bytes(topic, encoding='utf8', nonstring="passthru")
  44. msg['pagure_instance'] = pagure.APP.config['APP_URL']
  45. msg['project_fullname'] = project.fullname
  46. msg = dict(
  47. topic=topic.decode('utf-8'),
  48. msg=msg,
  49. timestamp=int(time.time()),
  50. msg_id=str(year) + '-' + str(uuid.uuid4()),
  51. i=_i,
  52. )
  53. content = json.dumps(msg)
  54. hashhex = hmac.new(
  55. str(project.hook_token), content, hashlib.sha1).hexdigest()
  56. hashhex256 = hmac.new(
  57. str(project.hook_token), content, hashlib.sha256).hexdigest()
  58. headers = {
  59. 'X-Pagure': pagure.APP.config['APP_URL'],
  60. 'X-Pagure-project': project.fullname,
  61. 'X-Pagure-Signature': hashhex,
  62. 'X-Pagure-Signature-256': hashhex256,
  63. 'X-Pagure-Topic': topic,
  64. 'Content-Type': 'application/json',
  65. }
  66. for url in urls:
  67. url = url.strip()
  68. log.info('Calling url %s' % url)
  69. try:
  70. req = requests.post(
  71. url,
  72. headers=headers,
  73. data=content,
  74. timeout=60,
  75. )
  76. if not req:
  77. log.info(
  78. 'An error occured while querying: %s - '
  79. 'Error code: %s' % (url, req.status_code))
  80. except (requests.exceptions.RequestException, Exception) as err:
  81. log.info(
  82. 'An error occured while querying: %s - Error: %s' % (
  83. url, err))
  84. @trollius.coroutine
  85. def handle_messages():
  86. host = pagure.APP.config.get('REDIS_HOST', '0.0.0.0')
  87. port = pagure.APP.config.get('REDIS_PORT', 6379)
  88. dbname = pagure.APP.config.get('REDIS_DB', 0)
  89. connection = yield trollius.From(trollius_redis.Connection.create(
  90. host=host, port=port, db=dbname))
  91. # Create subscriber.
  92. subscriber = yield trollius.From(connection.start_subscribe())
  93. # Subscribe to channel.
  94. yield trollius.From(subscriber.subscribe(['pagure.hook']))
  95. # Inside a while loop, wait for incoming events.
  96. while True:
  97. reply = yield trollius.From(subscriber.next_published())
  98. log.info(
  99. 'Received: %s on channel: %s',
  100. repr(reply.value), reply.channel)
  101. data = json.loads(reply.value)
  102. username = None
  103. if data['project'].startswith('forks'):
  104. username, projectname = data['project'].split('/', 2)[1:]
  105. else:
  106. projectname = data['project']
  107. namespace = None
  108. if '/' in projectname:
  109. namespace, projectname = projectname.split('/', 1)
  110. log.info(
  111. 'Searching %s/%s/%s' % (username, namespace, projectname))
  112. session = pagure.lib.create_session(pagure.APP.config['DB_URL'])
  113. project = pagure.lib._get_project(
  114. session=session, name=projectname, user=username,
  115. namespace=namespace)
  116. if not project:
  117. log.info('No project found with these criteria')
  118. session.close()
  119. continue
  120. urls = project.settings.get('Web-hooks')
  121. session.close()
  122. if not urls:
  123. log.info('No URLs set: %s' % urls)
  124. continue
  125. urls = urls.split('\n')
  126. log.info('Got the project, going to the webhooks')
  127. call_web_hooks(project, data['topic'], data['msg'], urls)
  128. def main():
  129. server = None
  130. try:
  131. loop = trollius.get_event_loop()
  132. tasks = [
  133. trollius.async(handle_messages()),
  134. ]
  135. loop.run_until_complete(trollius.wait(tasks))
  136. loop.run_forever()
  137. except KeyboardInterrupt:
  138. pass
  139. except trollius.ConnectionResetError:
  140. pass
  141. log.info("End Connection")
  142. loop.close()
  143. log.info("End")
  144. if __name__ == '__main__':
  145. log = logging.getLogger("")
  146. formatter = logging.Formatter(
  147. "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
  148. # setup console logging
  149. log.setLevel(logging.DEBUG)
  150. ch = logging.StreamHandler()
  151. ch.setLevel(logging.DEBUG)
  152. aslog = logging.getLogger("asyncio")
  153. aslog.setLevel(logging.DEBUG)
  154. ch.setFormatter(formatter)
  155. log.addHandler(ch)
  156. main()