pagure_stream_server.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. #!/usr/bin/env python
  2. """
  3. (c) 2015-2017 - Copyright Red Hat Inc
  4. Authors:
  5. Pierre-Yves Chibon <pingou@pingoured.fr>
  6. Streaming server for pagure's eventsource feature
  7. This server takes messages sent to redis and publish them at the specified
  8. endpoint
  9. To test, run this script and in another terminal
  10. nc localhost 8080
  11. HELLO
  12. GET /test/issue/26?foo=bar HTTP/1.1
  13. """
  14. from __future__ import unicode_literals, absolute_import
  15. import logging
  16. import os
  17. import redis
  18. import trololio
  19. from six.moves.urllib.parse import urlparse
  20. log = logging.getLogger(__name__)
  21. if "PAGURE_CONFIG" not in os.environ and os.path.exists(
  22. "/etc/pagure/pagure.cfg"
  23. ):
  24. print("Using configuration file `/etc/pagure/pagure.cfg`")
  25. os.environ["PAGURE_CONFIG"] = "/etc/pagure/pagure.cfg"
  26. import pagure # noqa: E402
  27. import pagure.lib.model_base # noqa: E402
  28. import pagure.lib.query # noqa: E402
  29. from pagure.exceptions import PagureException, PagureEvException # noqa: E402
  30. SERVER = None
  31. SESSION = None
  32. POOL = redis.ConnectionPool(
  33. host=pagure.config.config["REDIS_HOST"],
  34. port=pagure.config.config["REDIS_PORT"],
  35. db=pagure.config.config["REDIS_DB"],
  36. )
  37. def _get_session():
  38. global SESSION
  39. if SESSION is None:
  40. print(pagure.config.config["DB_URL"])
  41. SESSION = pagure.lib.model_base.create_session(
  42. pagure.config.config["DB_URL"]
  43. )
  44. return SESSION
  45. def _get_issue(repo, objid):
  46. """Get a Ticket (issue) instance for a given repo (Project) and
  47. objid (issue number).
  48. """
  49. issue = None
  50. if not repo.settings.get("issue_tracker", True):
  51. raise PagureEvException("No issue tracker found for this project")
  52. session = _get_session()
  53. issue = pagure.lib.query.search_issues(session, repo, issueid=objid)
  54. if issue is None or issue.project != repo:
  55. raise PagureEvException("Issue '%s' not found" % objid)
  56. if issue.private:
  57. # TODO: find a way to do auth
  58. raise PagureEvException(
  59. "This issue is private and you are not allowed to view it"
  60. )
  61. return issue
  62. def _get_pull_request(repo, objid):
  63. """Get a PullRequest instance for a given repo (Project) and objid
  64. (request number).
  65. """
  66. if not repo.settings.get("pull_requests", True):
  67. raise PagureEvException(
  68. "No pull-request tracker found for this project"
  69. )
  70. session = _get_session()
  71. request = pagure.lib.query.search_pull_requests(
  72. session, project_id=repo.id, requestid=objid
  73. )
  74. if request is None or request.project != repo:
  75. raise PagureEvException("Pull-Request '%s' not found" % objid)
  76. return request
  77. # Dict representing known object types that we handle requests for,
  78. # and the bound functions for getting an object instance from the
  79. # parsed path data. Has to come after the functions it binds
  80. OBJECTS = {"issue": _get_issue, "pull-request": _get_pull_request}
  81. def get_obj_from_path(path):
  82. """ Return the Ticket or Request object based on the path provided.
  83. """
  84. (username, namespace, reponame, objtype, objid) = pagure.utils.parse_path(
  85. path
  86. )
  87. session = _get_session()
  88. repo = pagure.lib.query.get_authorized_project(
  89. session, reponame, user=username, namespace=namespace
  90. )
  91. if repo is None:
  92. raise PagureEvException("Project '%s' not found" % reponame)
  93. # find the appropriate object getter function from OBJECTS
  94. try:
  95. getfunc = OBJECTS[objtype]
  96. except KeyError:
  97. raise PagureEvException("Invalid object provided: '%s'" % objtype)
  98. return getfunc(repo, objid)
  99. @trololio.coroutine
  100. def handle_client(client_reader, client_writer):
  101. data = None
  102. while True:
  103. # give client a chance to respond, timeout after 10 seconds
  104. line = yield trololio.From(
  105. trololio.asyncio.wait_for(client_reader.readline(), timeout=10.0)
  106. )
  107. if not line.decode().strip():
  108. break
  109. line = line.decode().rstrip()
  110. if data is None:
  111. data = line
  112. if data is None:
  113. log.warning("Expected ticket uid, received None")
  114. return
  115. data = data.decode().rstrip().split()
  116. log.info("Received %s", data)
  117. if not data:
  118. log.warning("No URL provided: %s" % data)
  119. return
  120. if "/" not in data[1]:
  121. log.warning("Invalid URL provided: %s" % data[1])
  122. return
  123. url = urlparse(data[1])
  124. try:
  125. obj = get_obj_from_path(url.path)
  126. except PagureException as err:
  127. log.warning(err.message)
  128. return
  129. origin = pagure.config.config.get("APP_URL")
  130. if origin.endswith("/"):
  131. origin = origin[:-1]
  132. client_writer.write(
  133. (
  134. "HTTP/1.0 200 OK\n"
  135. "Content-Type: text/event-stream\n"
  136. "Cache: nocache\n"
  137. "Connection: keep-alive\n"
  138. "Access-Control-Allow-Origin: %s\n\n" % origin
  139. ).encode()
  140. )
  141. conn = redis.Redis(connection_pool=POOL)
  142. subscriber = conn.pubsub(ignore_subscribe_messages=True)
  143. try:
  144. subscriber.subscribe("pagure.%s" % obj.uid)
  145. # Inside a while loop, wait for incoming events.
  146. oncall = 0
  147. while True:
  148. msg = subscriber.get_message()
  149. if msg is None:
  150. # Send a ping to see if the client is still alive
  151. if oncall >= 5:
  152. # Only send a ping once every 5 seconds
  153. client_writer.write(("event: ping\n\n").encode())
  154. oncall = 0
  155. oncall += 1
  156. yield trololio.From(client_writer.drain())
  157. yield trololio.From(trololio.asyncio.sleep(1))
  158. else:
  159. log.info("Sending %s", msg["data"])
  160. client_writer.write(("data: %s\n\n" % msg["data"]).encode())
  161. yield trololio.From(client_writer.drain())
  162. except OSError:
  163. log.info("Client closed connection")
  164. except trololio.ConnectionResetError as err:
  165. log.exception("ERROR: ConnectionResetError in handle_client")
  166. except Exception as err:
  167. log.exception("ERROR: Exception in handle_client")
  168. log.info(type(err))
  169. finally:
  170. # Wathever happens, close the connection.
  171. log.info("Client left. Goodbye!")
  172. subscriber.close()
  173. client_writer.close()
  174. @trololio.coroutine
  175. def stats(client_reader, client_writer):
  176. try:
  177. log.info("Clients: %s", SERVER.active_count)
  178. client_writer.write(
  179. ("HTTP/1.0 200 OK\n" "Cache: nocache\n\n").encode()
  180. )
  181. client_writer.write(("data: %s\n\n" % SERVER.active_count).encode())
  182. yield trololio.From(client_writer.drain())
  183. except trololio.ConnectionResetError as err:
  184. log.info(err)
  185. finally:
  186. client_writer.close()
  187. return
  188. def main():
  189. global SERVER
  190. _get_session()
  191. try:
  192. loop = trololio.asyncio.get_event_loop()
  193. coro = trololio.asyncio.start_server(
  194. handle_client,
  195. host=None,
  196. port=pagure.config.config["EVENTSOURCE_PORT"],
  197. loop=loop,
  198. )
  199. SERVER = loop.run_until_complete(coro)
  200. log.info(
  201. "Serving server at {}".format(SERVER.sockets[0].getsockname())
  202. )
  203. if pagure.config.config.get("EV_STATS_PORT"):
  204. stats_coro = trololio.asyncio.start_server(
  205. stats,
  206. host=None,
  207. port=pagure.config.config.get("EV_STATS_PORT"),
  208. loop=loop,
  209. )
  210. stats_server = loop.run_until_complete(stats_coro)
  211. log.info(
  212. "Serving stats at {}".format(
  213. stats_server.sockets[0].getsockname()
  214. )
  215. )
  216. loop.run_forever()
  217. except KeyboardInterrupt:
  218. pass
  219. except trololio.ConnectionResetError as err:
  220. log.exception("ERROR: ConnectionResetError in main")
  221. except Exception:
  222. log.exception("ERROR: Exception in main")
  223. finally:
  224. # Close the server
  225. SERVER.close()
  226. if pagure.config.config.get("EV_STATS_PORT"):
  227. stats_server.close()
  228. log.info("End Connection")
  229. loop.run_until_complete(SERVER.wait_closed())
  230. loop.close()
  231. log.info("End")
  232. if __name__ == "__main__":
  233. log = logging.getLogger("")
  234. formatter = logging.Formatter(
  235. "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s"
  236. )
  237. # setup console logging
  238. log.setLevel(logging.DEBUG)
  239. ch = logging.StreamHandler()
  240. ch.setLevel(logging.DEBUG)
  241. aslog = logging.getLogger("asyncio")
  242. aslog.setLevel(logging.DEBUG)
  243. ch.setFormatter(formatter)
  244. log.addHandler(ch)
  245. main()