pagure_stream_server.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. #!/usr/bin/env python
  2. """
  3. (c) 2015-2017 - Copyright Red Hat Inc
  4. Authors:
  5. Pierre-Yves Chibon <pingou@pingoured.fr>
  6. Streaming server for pagure's eventsource feature
  7. This server takes messages sent to redis and publish them at the specified
  8. endpoint
  9. To test, run this script and in another terminal
  10. nc localhost 8080
  11. HELLO
  12. GET /test/issue/26?foo=bar HTTP/1.1
  13. """
  14. from __future__ import unicode_literals
  15. import logging
  16. import os
  17. import redis
  18. from trololio import asyncio as trololio
  19. from six.moves.urllib.parse import urlparse
  20. log = logging.getLogger(__name__)
  21. if 'PAGURE_CONFIG' not in os.environ \
  22. and os.path.exists('/etc/pagure/pagure.cfg'):
  23. print('Using configuration file `/etc/pagure/pagure.cfg`')
  24. os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
  25. import pagure # noqa: E402
  26. import pagure.lib.query # noqa: E402
  27. from pagure.exceptions import PagureException, PagureEvException # noqa: E402
  28. SERVER = None
  29. SESSION = None
  30. POOL = redis.ConnectionPool(
  31. host=pagure.config.config['REDIS_HOST'],
  32. port=pagure.config.config['REDIS_PORT'],
  33. db=pagure.config.config['REDIS_DB'])
  34. def _get_session():
  35. global SESSION
  36. if SESSION is None:
  37. print(pagure.config.config['DB_URL'])
  38. SESSION = pagure.lib.query.create_session(
  39. pagure.config.config['DB_URL'])
  40. return SESSION
  41. def _get_issue(repo, objid):
  42. """Get a Ticket (issue) instance for a given repo (Project) and
  43. objid (issue number).
  44. """
  45. issue = None
  46. if not repo.settings.get('issue_tracker', True):
  47. raise PagureEvException("No issue tracker found for this project")
  48. session = _get_session()
  49. issue = pagure.lib.query.search_issues(session, repo, issueid=objid)
  50. if issue is None or issue.project != repo:
  51. raise PagureEvException("Issue '%s' not found" % objid)
  52. if issue.private:
  53. # TODO: find a way to do auth
  54. raise PagureEvException(
  55. "This issue is private and you are not allowed to view it")
  56. return issue
  57. def _get_pull_request(repo, objid):
  58. """Get a PullRequest instance for a given repo (Project) and objid
  59. (request number).
  60. """
  61. if not repo.settings.get('pull_requests', True):
  62. raise PagureEvException(
  63. "No pull-request tracker found for this project")
  64. session = _get_session()
  65. request = pagure.lib.query.search_pull_requests(
  66. session, project_id=repo.id, requestid=objid)
  67. if request is None or request.project != repo:
  68. raise PagureEvException("Pull-Request '%s' not found" % objid)
  69. return request
  70. # Dict representing known object types that we handle requests for,
  71. # and the bound functions for getting an object instance from the
  72. # parsed path data. Has to come after the functions it binds
  73. OBJECTS = {
  74. 'issue': _get_issue,
  75. 'pull-request': _get_pull_request
  76. }
  77. def get_obj_from_path(path):
  78. """ Return the Ticket or Request object based on the path provided.
  79. """
  80. (username, namespace, reponame, objtype, objid) = pagure.utils.parse_path(
  81. path)
  82. session = _get_session()
  83. repo = pagure.lib.query.get_authorized_project(
  84. session, reponame, user=username, namespace=namespace)
  85. if repo is None:
  86. raise PagureEvException("Project '%s' not found" % reponame)
  87. # find the appropriate object getter function from OBJECTS
  88. try:
  89. getfunc = OBJECTS[objtype]
  90. except KeyError:
  91. raise PagureEvException("Invalid object provided: '%s'" % objtype)
  92. return getfunc(repo, objid)
  93. @trololio.coroutine
  94. def handle_client(client_reader, client_writer):
  95. data = None
  96. while True:
  97. # give client a chance to respond, timeout after 10 seconds
  98. line = yield trololio.From(trololio.wait_for(
  99. client_reader.readline(),
  100. timeout=10.0))
  101. if not line.decode().strip():
  102. break
  103. line = line.decode().rstrip()
  104. if data is None:
  105. data = line
  106. if data is None:
  107. log.warning("Expected ticket uid, received None")
  108. return
  109. data = data.decode().rstrip().split()
  110. log.info("Received %s", data)
  111. if not data:
  112. log.warning("No URL provided: %s" % data)
  113. return
  114. if '/' not in data[1]:
  115. log.warning("Invalid URL provided: %s" % data[1])
  116. return
  117. url = urlparse(data[1])
  118. try:
  119. obj = get_obj_from_path(url.path)
  120. except PagureException as err:
  121. log.warning(err.message)
  122. return
  123. origin = pagure.config.config.get('APP_URL')
  124. if origin.endswith('/'):
  125. origin = origin[:-1]
  126. client_writer.write((
  127. "HTTP/1.0 200 OK\n"
  128. "Content-Type: text/event-stream\n"
  129. "Cache: nocache\n"
  130. "Connection: keep-alive\n"
  131. "Access-Control-Allow-Origin: %s\n\n" % origin
  132. ).encode())
  133. conn = redis.Redis(connection_pool=POOL)
  134. subscriber = conn.pubsub(ignore_subscribe_messages=True)
  135. try:
  136. subscriber.subscribe('pagure.%s' % obj.uid)
  137. # Inside a while loop, wait for incoming events.
  138. oncall = 0
  139. while True:
  140. msg = subscriber.get_message()
  141. if msg is None:
  142. # Send a ping to see if the client is still alive
  143. if oncall >= 5:
  144. # Only send a ping once every 5 seconds
  145. client_writer.write(('event: ping\n\n').encode())
  146. oncall = 0
  147. oncall += 1
  148. yield trololio.From(client_writer.drain())
  149. yield trololio.From(trololio.sleep(1))
  150. else:
  151. log.info("Sending %s", msg['data'])
  152. client_writer.write(('data: %s\n\n' % msg['data']).encode())
  153. yield trololio.From(client_writer.drain())
  154. except OSError:
  155. log.info("Client closed connection")
  156. except trololio.ConnectionResetError as err:
  157. log.exception("ERROR: ConnectionResetError in handle_client")
  158. except Exception as err:
  159. log.exception("ERROR: Exception in handle_client")
  160. log.info(type(err))
  161. finally:
  162. # Wathever happens, close the connection.
  163. log.info("Client left. Goodbye!")
  164. subscriber.close()
  165. client_writer.close()
  166. @trololio.coroutine
  167. def stats(client_reader, client_writer):
  168. try:
  169. log.info('Clients: %s', SERVER.active_count)
  170. client_writer.write((
  171. "HTTP/1.0 200 OK\n"
  172. "Cache: nocache\n\n"
  173. ).encode())
  174. client_writer.write(('data: %s\n\n' % SERVER.active_count).encode())
  175. yield trololio.From(client_writer.drain())
  176. except trololio.ConnectionResetError as err:
  177. log.info(err)
  178. finally:
  179. client_writer.close()
  180. return
  181. def main():
  182. global SERVER
  183. _get_session()
  184. try:
  185. loop = trololio.get_event_loop()
  186. coro = trololio.start_server(
  187. handle_client,
  188. host=None,
  189. port=pagure.config.config['EVENTSOURCE_PORT'],
  190. loop=loop)
  191. SERVER = loop.run_until_complete(coro)
  192. log.info(
  193. 'Serving server at {}'.format(SERVER.sockets[0].getsockname()))
  194. if pagure.config.config.get('EV_STATS_PORT'):
  195. stats_coro = trololio.start_server(
  196. stats,
  197. host=None,
  198. port=pagure.config.config.get('EV_STATS_PORT'),
  199. loop=loop)
  200. stats_server = loop.run_until_complete(stats_coro)
  201. log.info('Serving stats at {}'.format(
  202. stats_server.sockets[0].getsockname()))
  203. loop.run_forever()
  204. except KeyboardInterrupt:
  205. pass
  206. except trololio.ConnectionResetError as err:
  207. log.exception("ERROR: ConnectionResetError in main")
  208. except Exception:
  209. log.exception("ERROR: Exception in main")
  210. finally:
  211. # Close the server
  212. SERVER.close()
  213. if pagure.config.config.get('EV_STATS_PORT'):
  214. stats_server.close()
  215. log.info("End Connection")
  216. loop.run_until_complete(SERVER.wait_closed())
  217. loop.close()
  218. log.info("End")
  219. if __name__ == '__main__':
  220. log = logging.getLogger("")
  221. formatter = logging.Formatter(
  222. "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
  223. # setup console logging
  224. log.setLevel(logging.DEBUG)
  225. ch = logging.StreamHandler()
  226. ch.setLevel(logging.DEBUG)
  227. aslog = logging.getLogger("asyncio")
  228. aslog.setLevel(logging.DEBUG)
  229. ch.setFormatter(formatter)
  230. log.addHandler(ch)
  231. main()