pagure_stream_server.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. #!/usr/bin/env python
  2. """
  3. (c) 2015-2017 - Copyright Red Hat Inc
  4. Authors:
  5. Pierre-Yves Chibon <pingou@pingoured.fr>
  6. Streaming server for pagure's eventsource feature
  7. This server takes messages sent to redis and publish them at the specified
  8. endpoint
  9. To test, run this script and in another terminal
  10. nc localhost 8080
  11. HELLO
  12. GET /test/issue/26?foo=bar HTTP/1.1
  13. """
  14. import logging
  15. import os
  16. import urlparse
  17. import redis
  18. import trollius
  19. log = logging.getLogger(__name__)
  20. if 'PAGURE_CONFIG' not in os.environ \
  21. and os.path.exists('/etc/pagure/pagure.cfg'):
  22. print 'Using configuration file `/etc/pagure/pagure.cfg`'
  23. os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
  24. import pagure # noqa: E402
  25. import pagure.lib # noqa: E402
  26. from pagure.exceptions import PagureEvException # noqa: E402
  27. SERVER = None
  28. POOL = redis.ConnectionPool(
  29. host=pagure.APP.config['REDIS_HOST'],
  30. port=pagure.APP.config['REDIS_PORT'],
  31. db=pagure.APP.config['REDIS_DB'])
  32. def _get_issue(repo, objid):
  33. """Get a Ticket (issue) instance for a given repo (Project) and
  34. objid (issue number).
  35. """
  36. issue = None
  37. if not repo.settings.get('issue_tracker', True):
  38. raise PagureEvException("No issue tracker found for this project")
  39. issue = pagure.lib.search_issues(
  40. pagure.SESSION, repo, issueid=objid)
  41. if issue is None or issue.project != repo:
  42. raise PagureEvException("Issue '%s' not found" % objid)
  43. if issue.private:
  44. # TODO: find a way to do auth
  45. raise PagureEvException(
  46. "This issue is private and you are not allowed to view it")
  47. return issue
  48. def _get_pull_request(repo, objid):
  49. """Get a PullRequest instance for a given repo (Project) and objid
  50. (request number).
  51. """
  52. if not repo.settings.get('pull_requests', True):
  53. raise PagureEvException(
  54. "No pull-request tracker found for this project")
  55. request = pagure.lib.search_pull_requests(
  56. pagure.SESSION, project_id=repo.id, requestid=objid)
  57. if request is None or request.project != repo:
  58. raise PagureEvException("Pull-Request '%s' not found" % objid)
  59. return request
  60. # Dict representing known object types that we handle requests for,
  61. # and the bound functions for getting an object instance from the
  62. # parsed path data. Has to come after the functions it binds
  63. OBJECTS = {
  64. 'issue': _get_issue,
  65. 'pull-request': _get_pull_request
  66. }
  67. def _parse_path(path):
  68. """Get the repo name, object type, object ID, and (if present)
  69. username and/or namespace from a URL path component. Will only
  70. handle the known object types from the OBJECTS dict. Assumes:
  71. * Project name comes immediately before object type
  72. * Object ID comes immediately after object type
  73. * If a fork, path starts with /fork/(username)
  74. * Namespace, if present, comes after fork username (if present) or at start
  75. * No other components come before the project name
  76. * None of the parsed items can contain a /
  77. """
  78. username = None
  79. namespace = None
  80. # path always starts with / so split and throw away first item
  81. items = path.split('/')[1:]
  82. # find the *last* match for any object type
  83. try:
  84. objtype = [item for item in items if item in OBJECTS][-1]
  85. except IndexError:
  86. raise PagureEvException(
  87. "No known object type found in path: %s" % path)
  88. try:
  89. # objid is the item after objtype, we need all items up to it
  90. items = items[:items.index(objtype) + 2]
  91. # now strip the repo, objtype and objid off the end
  92. (repo, objtype, objid) = items[-3:]
  93. items = items[:-3]
  94. except (IndexError, ValueError):
  95. raise PagureEvException(
  96. "No project or object ID found in path: %s" % path)
  97. # now check for a fork
  98. if items and items[0] == 'fork':
  99. try:
  100. # get the username and strip it and 'fork'
  101. username = items[1]
  102. items = items[2:]
  103. except IndexError:
  104. raise PagureEvException(
  105. "Path starts with /fork but no user found! Path: %s" % path)
  106. # if we still have an item left, it must be the namespace
  107. if items:
  108. namespace = items.pop(0)
  109. # if we have any items left at this point, we've no idea
  110. if items:
  111. raise PagureEvException(
  112. "More path components than expected! Path: %s" % path)
  113. return username, namespace, repo, objtype, objid
  114. def get_obj_from_path(path):
  115. """ Return the Ticket or Request object based on the path provided.
  116. """
  117. (username, namespace, reponame, objtype, objid) = _parse_path(path)
  118. repo = pagure.get_authorized_project(
  119. pagure.SESSION, reponame, user=username, namespace=namespace)
  120. if repo is None:
  121. raise PagureEvException("Project '%s' not found" % reponame)
  122. # find the appropriate object getter function from OBJECTS
  123. try:
  124. getfunc = OBJECTS[objtype]
  125. except KeyError:
  126. raise PagureEvException("Invalid object provided: '%s'" % objtype)
  127. return getfunc(repo, objid)
  128. @trollius.coroutine
  129. def handle_client(client_reader, client_writer):
  130. data = None
  131. while True:
  132. # give client a chance to respond, timeout after 10 seconds
  133. line = yield trollius.From(trollius.wait_for(
  134. client_reader.readline(),
  135. timeout=10.0))
  136. if not line.decode().strip():
  137. break
  138. line = line.decode().rstrip()
  139. if data is None:
  140. data = line
  141. if data is None:
  142. log.warning("Expected ticket uid, received None")
  143. return
  144. data = data.decode().rstrip().split()
  145. log.info("Received %s", data)
  146. if not data:
  147. log.warning("No URL provided: %s" % data)
  148. return
  149. if '/' not in data[1]:
  150. log.warning("Invalid URL provided: %s" % data[1])
  151. return
  152. url = urlparse.urlsplit(data[1])
  153. try:
  154. obj = get_obj_from_path(url.path)
  155. except PagureEvException as err:
  156. log.warning(err.message)
  157. return
  158. origin = pagure.APP.config.get('APP_URL')
  159. if origin.endswith('/'):
  160. origin = origin[:-1]
  161. client_writer.write((
  162. "HTTP/1.0 200 OK\n"
  163. "Content-Type: text/event-stream\n"
  164. "Cache: nocache\n"
  165. "Connection: keep-alive\n"
  166. "Access-Control-Allow-Origin: %s\n\n" % origin
  167. ).encode())
  168. conn = redis.Redis(connection_pool=POOL)
  169. subscriber = conn.pubsub(ignore_subscribe_messages=True)
  170. try:
  171. subscriber.subscribe('pagure.%s' % obj.uid)
  172. # Inside a while loop, wait for incoming events.
  173. oncall = 0
  174. while True:
  175. msg = subscriber.get_message()
  176. if msg is None:
  177. # Send a ping to see if the client is still alive
  178. if oncall >= 5:
  179. # Only send a ping once every 5 seconds
  180. client_writer.write(('event: ping\n\n').encode())
  181. oncall = 0
  182. oncall += 1
  183. yield trollius.From(client_writer.drain())
  184. yield trollius.From(trollius.sleep(1))
  185. else:
  186. log.info("Sending %s", msg['data'])
  187. client_writer.write(('data: %s\n\n' % msg['data']).encode())
  188. yield trollius.From(client_writer.drain())
  189. except OSError:
  190. log.info("Client closed connection")
  191. except trollius.ConnectionResetError as err:
  192. log.exception("ERROR: ConnectionResetError in handle_client")
  193. except Exception as err:
  194. log.exception("ERROR: Exception in handle_client")
  195. log.info(type(err))
  196. finally:
  197. # Wathever happens, close the connection.
  198. log.info("Client left. Goodbye!")
  199. subscriber.close()
  200. client_writer.close()
  201. @trollius.coroutine
  202. def stats(client_reader, client_writer):
  203. try:
  204. log.info('Clients: %s', SERVER.active_count)
  205. client_writer.write((
  206. "HTTP/1.0 200 OK\n"
  207. "Cache: nocache\n\n"
  208. ).encode())
  209. client_writer.write(('data: %s\n\n' % SERVER.active_count).encode())
  210. yield trollius.From(client_writer.drain())
  211. except trollius.ConnectionResetError as err:
  212. log.info(err)
  213. finally:
  214. client_writer.close()
  215. return
  216. def main():
  217. global SERVER
  218. try:
  219. loop = trollius.get_event_loop()
  220. coro = trollius.start_server(
  221. handle_client,
  222. host=None,
  223. port=pagure.APP.config['EVENTSOURCE_PORT'],
  224. loop=loop)
  225. SERVER = loop.run_until_complete(coro)
  226. log.info(
  227. 'Serving server at {}'.format(SERVER.sockets[0].getsockname()))
  228. if pagure.APP.config.get('EV_STATS_PORT'):
  229. stats_coro = trollius.start_server(
  230. stats,
  231. host=None,
  232. port=pagure.APP.config.get('EV_STATS_PORT'),
  233. loop=loop)
  234. stats_server = loop.run_until_complete(stats_coro)
  235. log.info('Serving stats at {}'.format(
  236. stats_server.sockets[0].getsockname()))
  237. loop.run_forever()
  238. except KeyboardInterrupt:
  239. pass
  240. except trollius.ConnectionResetError as err:
  241. log.exception("ERROR: ConnectionResetError in main")
  242. except Exception:
  243. log.exception("ERROR: Exception in main")
  244. finally:
  245. # Close the server
  246. SERVER.close()
  247. if pagure.APP.config.get('EV_STATS_PORT'):
  248. stats_server.close()
  249. log.info("End Connection")
  250. loop.run_until_complete(SERVER.wait_closed())
  251. loop.close()
  252. log.info("End")
  253. if __name__ == '__main__':
  254. log = logging.getLogger("")
  255. formatter = logging.Formatter(
  256. "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
  257. # setup console logging
  258. log.setLevel(logging.DEBUG)
  259. ch = logging.StreamHandler()
  260. ch.setLevel(logging.DEBUG)
  261. aslog = logging.getLogger("asyncio")
  262. aslog.setLevel(logging.DEBUG)
  263. ch.setFormatter(formatter)
  264. log.addHandler(ch)
  265. main()