pagure_stream_server.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
#!/usr/bin/env python

"""
 (c) 2015-2017 - Copyright Red Hat Inc

 Authors:
   Pierre-Yves Chibon <pingou@pingoured.fr>

Streaming server for pagure's eventsource feature.

This server takes messages sent to redis and publishes them at the
specified endpoint.

To test, run this script and, in another terminal, run:

    nc localhost 8080
      HELLO
      GET /test/issue/26?foo=bar HTTP/1.1
"""
  14. from __future__ import unicode_literals, absolute_import
  15. import logging
  16. import os
  17. import redis
  18. from trololio import asyncio as trololio
  19. from six.moves.urllib.parse import urlparse
  20. log = logging.getLogger(__name__)
  21. if 'PAGURE_CONFIG' not in os.environ \
  22. and os.path.exists('/etc/pagure/pagure.cfg'):
  23. print('Using configuration file `/etc/pagure/pagure.cfg`')
  24. os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
  25. import pagure # noqa: E402
  26. import pagure.lib.model_base # noqa: E402
  27. import pagure.lib.query # noqa: E402
  28. from pagure.exceptions import PagureException, PagureEvException # noqa: E402
  29. SERVER = None
  30. SESSION = None
  31. POOL = redis.ConnectionPool(
  32. host=pagure.config.config['REDIS_HOST'],
  33. port=pagure.config.config['REDIS_PORT'],
  34. db=pagure.config.config['REDIS_DB'])
  35. def _get_session():
  36. global SESSION
  37. if SESSION is None:
  38. print(pagure.config.config['DB_URL'])
  39. SESSION = pagure.lib.model_base.create_session(
  40. pagure.config.config['DB_URL'])
  41. return SESSION
  42. def _get_issue(repo, objid):
  43. """Get a Ticket (issue) instance for a given repo (Project) and
  44. objid (issue number).
  45. """
  46. issue = None
  47. if not repo.settings.get('issue_tracker', True):
  48. raise PagureEvException("No issue tracker found for this project")
  49. session = _get_session()
  50. issue = pagure.lib.query.search_issues(session, repo, issueid=objid)
  51. if issue is None or issue.project != repo:
  52. raise PagureEvException("Issue '%s' not found" % objid)
  53. if issue.private:
  54. # TODO: find a way to do auth
  55. raise PagureEvException(
  56. "This issue is private and you are not allowed to view it")
  57. return issue
  58. def _get_pull_request(repo, objid):
  59. """Get a PullRequest instance for a given repo (Project) and objid
  60. (request number).
  61. """
  62. if not repo.settings.get('pull_requests', True):
  63. raise PagureEvException(
  64. "No pull-request tracker found for this project")
  65. session = _get_session()
  66. request = pagure.lib.query.search_pull_requests(
  67. session, project_id=repo.id, requestid=objid)
  68. if request is None or request.project != repo:
  69. raise PagureEvException("Pull-Request '%s' not found" % objid)
  70. return request
  71. # Dict representing known object types that we handle requests for,
  72. # and the bound functions for getting an object instance from the
  73. # parsed path data. Has to come after the functions it binds
  74. OBJECTS = {
  75. 'issue': _get_issue,
  76. 'pull-request': _get_pull_request
  77. }
  78. def get_obj_from_path(path):
  79. """ Return the Ticket or Request object based on the path provided.
  80. """
  81. (username, namespace, reponame, objtype, objid) = pagure.utils.parse_path(
  82. path)
  83. session = _get_session()
  84. repo = pagure.lib.query.get_authorized_project(
  85. session, reponame, user=username, namespace=namespace)
  86. if repo is None:
  87. raise PagureEvException("Project '%s' not found" % reponame)
  88. # find the appropriate object getter function from OBJECTS
  89. try:
  90. getfunc = OBJECTS[objtype]
  91. except KeyError:
  92. raise PagureEvException("Invalid object provided: '%s'" % objtype)
  93. return getfunc(repo, objid)
  94. @trololio.coroutine
  95. def handle_client(client_reader, client_writer):
  96. data = None
  97. while True:
  98. # give client a chance to respond, timeout after 10 seconds
  99. line = yield trololio.From(trololio.wait_for(
  100. client_reader.readline(),
  101. timeout=10.0))
  102. if not line.decode().strip():
  103. break
  104. line = line.decode().rstrip()
  105. if data is None:
  106. data = line
  107. if data is None:
  108. log.warning("Expected ticket uid, received None")
  109. return
  110. data = data.decode().rstrip().split()
  111. log.info("Received %s", data)
  112. if not data:
  113. log.warning("No URL provided: %s" % data)
  114. return
  115. if '/' not in data[1]:
  116. log.warning("Invalid URL provided: %s" % data[1])
  117. return
  118. url = urlparse(data[1])
  119. try:
  120. obj = get_obj_from_path(url.path)
  121. except PagureException as err:
  122. log.warning(err.message)
  123. return
  124. origin = pagure.config.config.get('APP_URL')
  125. if origin.endswith('/'):
  126. origin = origin[:-1]
  127. client_writer.write((
  128. "HTTP/1.0 200 OK\n"
  129. "Content-Type: text/event-stream\n"
  130. "Cache: nocache\n"
  131. "Connection: keep-alive\n"
  132. "Access-Control-Allow-Origin: %s\n\n" % origin
  133. ).encode())
  134. conn = redis.Redis(connection_pool=POOL)
  135. subscriber = conn.pubsub(ignore_subscribe_messages=True)
  136. try:
  137. subscriber.subscribe('pagure.%s' % obj.uid)
  138. # Inside a while loop, wait for incoming events.
  139. oncall = 0
  140. while True:
  141. msg = subscriber.get_message()
  142. if msg is None:
  143. # Send a ping to see if the client is still alive
  144. if oncall >= 5:
  145. # Only send a ping once every 5 seconds
  146. client_writer.write(('event: ping\n\n').encode())
  147. oncall = 0
  148. oncall += 1
  149. yield trololio.From(client_writer.drain())
  150. yield trololio.From(trololio.sleep(1))
  151. else:
  152. log.info("Sending %s", msg['data'])
  153. client_writer.write(('data: %s\n\n' % msg['data']).encode())
  154. yield trololio.From(client_writer.drain())
  155. except OSError:
  156. log.info("Client closed connection")
  157. except trololio.ConnectionResetError as err:
  158. log.exception("ERROR: ConnectionResetError in handle_client")
  159. except Exception as err:
  160. log.exception("ERROR: Exception in handle_client")
  161. log.info(type(err))
  162. finally:
  163. # Wathever happens, close the connection.
  164. log.info("Client left. Goodbye!")
  165. subscriber.close()
  166. client_writer.close()
  167. @trololio.coroutine
  168. def stats(client_reader, client_writer):
  169. try:
  170. log.info('Clients: %s', SERVER.active_count)
  171. client_writer.write((
  172. "HTTP/1.0 200 OK\n"
  173. "Cache: nocache\n\n"
  174. ).encode())
  175. client_writer.write(('data: %s\n\n' % SERVER.active_count).encode())
  176. yield trololio.From(client_writer.drain())
  177. except trololio.ConnectionResetError as err:
  178. log.info(err)
  179. finally:
  180. client_writer.close()
  181. return
  182. def main():
  183. global SERVER
  184. _get_session()
  185. try:
  186. loop = trololio.get_event_loop()
  187. coro = trololio.start_server(
  188. handle_client,
  189. host=None,
  190. port=pagure.config.config['EVENTSOURCE_PORT'],
  191. loop=loop)
  192. SERVER = loop.run_until_complete(coro)
  193. log.info(
  194. 'Serving server at {}'.format(SERVER.sockets[0].getsockname()))
  195. if pagure.config.config.get('EV_STATS_PORT'):
  196. stats_coro = trololio.start_server(
  197. stats,
  198. host=None,
  199. port=pagure.config.config.get('EV_STATS_PORT'),
  200. loop=loop)
  201. stats_server = loop.run_until_complete(stats_coro)
  202. log.info('Serving stats at {}'.format(
  203. stats_server.sockets[0].getsockname()))
  204. loop.run_forever()
  205. except KeyboardInterrupt:
  206. pass
  207. except trololio.ConnectionResetError as err:
  208. log.exception("ERROR: ConnectionResetError in main")
  209. except Exception:
  210. log.exception("ERROR: Exception in main")
  211. finally:
  212. # Close the server
  213. SERVER.close()
  214. if pagure.config.config.get('EV_STATS_PORT'):
  215. stats_server.close()
  216. log.info("End Connection")
  217. loop.run_until_complete(SERVER.wait_closed())
  218. loop.close()
  219. log.info("End")
  220. if __name__ == '__main__':
  221. log = logging.getLogger("")
  222. formatter = logging.Formatter(
  223. "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
  224. # setup console logging
  225. log.setLevel(logging.DEBUG)
  226. ch = logging.StreamHandler()
  227. ch.setLevel(logging.DEBUG)
  228. aslog = logging.getLogger("asyncio")
  229. aslog.setLevel(logging.DEBUG)
  230. ch.setFormatter(formatter)
  231. log.addHandler(ch)
  232. main()