#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse

from synapse.server import HomeServer
from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
from synapse.http.site import SynapseSite
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.storage.engines import create_engine
from synapse.storage import DataStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
from twisted.internet import reactor, defer
from twisted.web.resource import Resource

from daemonize import Daemonize

import sys
import logging

logger = logging.getLogger("synapse.app.pusher")
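

# Configuration read by the pusher worker: where to find the master's
# replication listener, which ports this process listens on, and how to
# daemonize.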
class SlaveConfig(DatabaseConfig):
    def read_config(self, config):
        self.replication_url = config["replication_url"]
        self.server_name = config["server_name"]
        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
            "use_insecure_ssl_client_just_for_testing_do_not_use", False
        )
        self.user_agent_suffix = None
        self.start_pushers = True
        self.listeners = config["listeners"]
        self.soft_file_limit = config.get("soft_file_limit")
        self.daemonize = config.get("daemonize")
        self.pid_file = self.abspath(config.get("pid_file"))

    def default_config(self, server_name, **kwargs):
        pid_file = self.abspath("pusher.pid")
        return """\
# Slave configuration

# The replication listener on the synapse to talk to.
#replication_url: https://localhost:{replication_port}/_synapse/replication

server_name: "%(server_name)s"

listeners: []
# Enable a ssh manhole listener on the pusher.
# - type: manhole
#   port: {manhole_port}
#   bind_address: 127.0.0.1
# Enable a metric listener on the pusher.
# - type: http
#   port: {metrics_port}
#   bind_address: 127.0.0.1
#   resources:
#     - names: ["metrics"]
#       compress: False

report_stats: False

daemonize: False

pid_file: %(pid_file)s
""" % locals()


class PusherSlaveConfig(SlaveConfig, LoggingConfig):
    pass
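

# Slaved store for the pusher worker. It also borrows the pusher bookkeeping
# writes from the main DataStore (via __func__) so this worker can record
# pusher progress.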
class PusherSlaveStore(
    SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore
):
    update_pusher_last_stream_ordering_and_success = (
        DataStore.update_pusher_last_stream_ordering_and_success.__func__
    )

    update_pusher_failing_since = (
        DataStore.update_pusher_failing_since.__func__
    )

    update_pusher_last_stream_ordering = (
        DataStore.update_pusher_last_stream_ordering.__func__
    )
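

# HomeServer variant run by the pusher worker process.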
class PusherServer(HomeServer):
    def get_db_conn(self, run_new_connection=True):
        # Any param beginning with cp_ is a parameter for adbapi, and should
        # not be passed to the database engine.
        db_params = {
            k: v for k, v in self.db_config.get("args", {}).items()
            if not k.startswith("cp_")
        }
        db_conn = self.database_engine.module.connect(**db_params)

        if run_new_connection:
            self.database_engine.on_new_connection(db_conn)
        return db_conn

    def setup(self):
        logger.info("Setting up.")
        self.datastore = PusherSlaveStore(self.get_db_conn(), self)
        logger.info("Finished setting up.")
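
    # Ask the master synapse process to remove pushers, since this worker
    # cannot write to the pushers table itself.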
    def remove_pusher(self, app_id, push_key, user_id):
        http_client = self.get_simple_http_client()
        replication_url = self.config.replication_url
        url = replication_url + "/remove_pushers"
        return http_client.post_json_get_json(url, {
            "remove": [{
                "app_id": app_id,
                "push_key": push_key,
                "user_id": user_id,
            }]
        })
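
    # Set up an HTTP listener; the only resource this worker serves is the
    # metrics endpoint.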
    def _listen_http(self, listener_config):
        port = listener_config["port"]
        bind_address = listener_config.get("bind_address", "")
        site_tag = listener_config.get("tag", port)
        resources = {}
        for res in listener_config["resources"]:
            for name in res["names"]:
                if name == "metrics":
                    resources[METRICS_PREFIX] = MetricsResource(self)

        root_resource = create_resource_tree(resources, Resource())
        reactor.listenTCP(
            port,
            SynapseSite(
                "synapse.access.http.%s" % (site_tag,),
                site_tag,
                listener_config,
                root_resource,
            ),
            interface=bind_address
        )
        logger.info("Synapse pusher now listening on port %d", port)

    def start_listening(self):
        for listener in self.config.listeners:
            if listener["type"] == "http":
                self._listen_http(listener)
            elif listener["type"] == "manhole":
                reactor.listenTCP(
                    listener["port"],
                    manhole(
                        username="matrix",
                        password="rabbithole",
                        globals={"hs": self},
                    ),
                    interface=listener.get("bind_address", '127.0.0.1')
                )
            else:
                logger.warn("Unrecognized listener type: %s", listener["type"])
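
    # Long-poll the master's replication endpoint and feed any new pusher,
    # event and receipt rows into the local pusher pool.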
    @defer.inlineCallbacks
    def replicate(self):
        http_client = self.get_simple_http_client()
        store = self.get_datastore()
        replication_url = self.config.replication_url
        pusher_pool = self.get_pusherpool()
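
        # Helpers to stop or start an individual pusher as rows arrive from
        # the replication stream.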
        def stop_pusher(user_id, app_id, pushkey):
            key = "%s:%s" % (app_id, pushkey)
            pushers_for_user = pusher_pool.pushers.get(user_id, {})
            pusher = pushers_for_user.pop(key, None)
            if pusher is None:
                return
            logger.info("Stopping pusher %r / %r", user_id, key)
            pusher.on_stop()

        def start_pusher(user_id, app_id, pushkey):
            key = "%s:%s" % (app_id, pushkey)
            logger.info("Starting pusher %r / %r", user_id, key)
            return pusher_pool._refresh_pusher(app_id, pushkey, user_id)
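
        # Apply one batch of replication results: start/stop pushers and
        # notify the pusher pool about new events and receipts.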
        @defer.inlineCallbacks
        def poke_pushers(results):
            pushers_rows = set(
                map(tuple, results.get("pushers", {}).get("rows", []))
            )
            deleted_pushers_rows = set(
                map(tuple, results.get("deleted_pushers", {}).get("rows", []))
            )
            for row in sorted(pushers_rows | deleted_pushers_rows):
                if row in deleted_pushers_rows:
                    user_id, app_id, pushkey = row[1:4]
                    stop_pusher(user_id, app_id, pushkey)
                elif row in pushers_rows:
                    user_id = row[1]
                    app_id = row[5]
                    pushkey = row[8]
                    yield start_pusher(user_id, app_id, pushkey)

            stream = results.get("events")
            if stream:
                min_stream_id = stream["rows"][0][0]
                max_stream_id = stream["position"]
                preserve_fn(pusher_pool.on_new_notifications)(
                    min_stream_id, max_stream_id
                )

            stream = results.get("receipts")
            if stream:
                rows = stream["rows"]
                affected_room_ids = set(row[1] for row in rows)
                min_stream_id = rows[0][0]
                max_stream_id = stream["position"]
                preserve_fn(pusher_pool.on_new_receipts)(
                    min_stream_id, max_stream_id, affected_room_ids
                )
        while True:
            try:
                args = store.stream_positions()
                args["timeout"] = 30000
                result = yield http_client.get_json(replication_url, args=args)
                yield store.process_replication(result)
                poke_pushers(result)
            except Exception:
                logger.exception("Error replicating from %r", replication_url)
                yield sleep(30)
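

# Parse the config, construct the PusherServer, and schedule replication and
# the pusher pool to start once the reactor is running.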
def setup(config_options):
    try:
        config = PusherSlaveConfig.load_config(
            "Synapse pusher", config_options
        )
    except ConfigError as e:
        sys.stderr.write("\n" + e.message + "\n")
        sys.exit(1)

    if not config:
        sys.exit(0)

    config.setup_logging()

    database_engine = create_engine(config.database_config)

    ps = PusherServer(
        config.server_name,
        db_config=config.database_config,
        config=config,
        version_string=get_version_string("Synapse", synapse),
        database_engine=database_engine,
    )

    ps.setup()
    ps.start_listening()

    change_resource_limit(ps.config.soft_file_limit)

    def start():
        ps.replicate()
        ps.get_pusherpool().start()
        ps.get_datastore().start_profiling()

    reactor.callWhenRunning(start)

    return ps


if __name__ == '__main__':
    with LoggingContext("main"):
        ps = setup(sys.argv[1:])

        if ps.config.daemonize:
            def run():
                with LoggingContext("run"):
                    change_resource_limit(ps.config.soft_file_limit)
                    reactor.run()

            daemon = Daemonize(
                app="synapse-pusher",
                pid=ps.config.pid_file,
                action=run,
                auto_close_fds=False,
                verbose=True,
                logger=logger,
            )

            daemon.start()
        else:
            reactor.run()