# pusher.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  16. import synapse
  17. from synapse.server import HomeServer
  18. from synapse.util.versionstring import get_version_string
  19. from synapse.config._base import ConfigError
  20. from synapse.config.database import DatabaseConfig
  21. from synapse.config.logger import LoggingConfig
  22. from synapse.replication.slave.storage.events import SlavedEventStore
  23. from synapse.replication.slave.storage.pushers import SlavedPusherStore
  24. from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
  25. from synapse.storage.engines import create_engine
  26. from synapse.storage import DataStore
  27. from synapse.util.async import sleep
  28. from synapse.util.logcontext import (LoggingContext, preserve_fn)
  29. from twisted.internet import reactor, defer
  30. import sys
  31. import logging
  32. logger = logging.getLogger("synapse.app.pusher")
  33. class SlaveConfig(DatabaseConfig):
  34. def read_config(self, config):
  35. self.replication_url = config["replication_url"]
  36. self.server_name = config["server_name"]
  37. self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
  38. "use_insecure_ssl_client_just_for_testing_do_not_use", False
  39. )
  40. self.user_agent_suffix = None
  41. self.start_pushers = True
  42. def default_config(self, **kwargs):
  43. return """\
  44. ## Slave ##
  45. #replication_url: https://localhost:{replication_port}/_synapse/replication
  46. report_stats: False
  47. """
  48. class PusherSlaveConfig(SlaveConfig, LoggingConfig):
  49. pass
  50. class PusherSlaveStore(
  51. SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore
  52. ):
  53. update_pusher_last_stream_ordering_and_success = (
  54. DataStore.update_pusher_last_stream_ordering_and_success.__func__
  55. )
  56. class PusherServer(HomeServer):
  57. def get_db_conn(self, run_new_connection=True):
  58. # Any param beginning with cp_ is a parameter for adbapi, and should
  59. # not be passed to the database engine.
  60. db_params = {
  61. k: v for k, v in self.db_config.get("args", {}).items()
  62. if not k.startswith("cp_")
  63. }
  64. db_conn = self.database_engine.module.connect(**db_params)
  65. if run_new_connection:
  66. self.database_engine.on_new_connection(db_conn)
  67. return db_conn
  68. def setup(self):
  69. logger.info("Setting up.")
  70. self.datastore = PusherSlaveStore(self.get_db_conn(), self)
  71. logger.info("Finished setting up.")
  72. def remove_pusher(self, app_id, push_key, user_id):
  73. http_client = self.get_simple_http_client()
  74. replication_url = self.config.replication_url
  75. url = replication_url + "/remove_pushers"
  76. return http_client.post_json_get_json(url, {
  77. "remove": [{
  78. "app_id": app_id,
  79. "push_key": push_key,
  80. "user_id": user_id,
  81. }]
  82. })
  83. @defer.inlineCallbacks
  84. def replicate(self):
  85. http_client = self.get_simple_http_client()
  86. store = self.get_datastore()
  87. replication_url = self.config.replication_url
  88. pusher_pool = self.get_pusherpool()
  89. def stop_pusher(user_id, app_id, pushkey):
  90. key = "%s:%s" % (app_id, pushkey)
  91. pushers_for_user = pusher_pool.pushers.get(user_id, {})
  92. pusher = pushers_for_user.pop(key, None)
  93. if pusher is None:
  94. return
  95. logger.info("Stopping pusher %r / %r", user_id, key)
  96. pusher.on_stop()
  97. def start_pusher(user_id, app_id, pushkey):
  98. key = "%s:%s" % (app_id, pushkey)
  99. logger.info("Starting pusher %r / %r", user_id, key)
  100. return pusher_pool._refresh_pusher(app_id, pushkey, user_id)
  101. @defer.inlineCallbacks
  102. def poke_pushers(results):
  103. pushers_rows = set(
  104. map(tuple, results.get("pushers", {}).get("rows", []))
  105. )
  106. deleted_pushers_rows = set(
  107. map(tuple, results.get("deleted_pushers", {}).get("rows", []))
  108. )
  109. for row in sorted(pushers_rows | deleted_pushers_rows):
  110. if row in deleted_pushers_rows:
  111. user_id, app_id, pushkey = row[1:4]
  112. stop_pusher(user_id, app_id, pushkey)
  113. elif row in pushers_rows:
  114. user_id = row[1]
  115. app_id = row[5]
  116. pushkey = row[8]
  117. yield start_pusher(user_id, app_id, pushkey)
  118. stream = results.get("events")
  119. if stream:
  120. min_stream_id = stream["rows"][0][0]
  121. max_stream_id = stream["position"]
  122. preserve_fn(pusher_pool.on_new_notifications)(
  123. min_stream_id, max_stream_id
  124. )
  125. stream = results.get("receipts")
  126. if stream:
  127. rows = stream["rows"]
  128. affected_room_ids = set(row[1] for row in rows)
  129. min_stream_id = rows[0][0]
  130. max_stream_id = stream["position"]
  131. preserve_fn(pusher_pool.on_new_receipts)(
  132. min_stream_id, max_stream_id, affected_room_ids
  133. )
  134. while True:
  135. try:
  136. args = store.stream_positions()
  137. args["timeout"] = 30000
  138. result = yield http_client.get_json(replication_url, args=args)
  139. yield store.process_replication(result)
  140. poke_pushers(result)
  141. except:
  142. logger.exception("Error replicating from %r", replication_url)
  143. sleep(30)
  144. def setup(config_options):
  145. try:
  146. config = PusherSlaveConfig.load_config(
  147. "Synapse pusher", config_options
  148. )
  149. except ConfigError as e:
  150. sys.stderr.write("\n" + e.message + "\n")
  151. sys.exit(1)
  152. config.setup_logging()
  153. database_engine = create_engine(config.database_config)
  154. ps = PusherServer(
  155. config.server_name,
  156. db_config=config.database_config,
  157. config=config,
  158. version_string=get_version_string("Synapse", synapse),
  159. database_engine=database_engine,
  160. )
  161. ps.setup()
  162. def start():
  163. ps.replicate()
  164. ps.get_pusherpool().start()
  165. ps.get_datastore().start_profiling()
  166. reactor.callWhenRunning(start)
  167. return ps
  168. if __name__ == '__main__':
  169. with LoggingContext("main"):
  170. ps = setup(sys.argv[1:])
  171. reactor.run()