# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import logging
import os
import signal
import socket
import sys
import traceback

from daemonize import Daemonize

from twisted.internet import defer, error, reactor
from twisted.protocols.tls import TLSMemoryBIOFactory

import synapse
from synapse.app import check_bind_error
from synapse.crypto import context_factory
from synapse.logging.context import PreserveLoggingContext
from synapse.util.async_helpers import Linearizer
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string

logger = logging.getLogger(__name__)

# list of tuples of function, args list, kwargs dict
_sighup_callbacks = []


def register_sighup(func, *args, **kwargs):
    """
    Register a function to be called when a SIGHUP occurs.

    Args:
        func (function): Function to be called when sent a SIGHUP signal.
            Will be called with a single default argument, the homeserver.
        *args, **kwargs: args and kwargs to be passed to the target function.
    """
    _sighup_callbacks.append((func, args, kwargs))
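

# Example usage (an illustrative sketch, not part of the original module;
# `log_reload` is a hypothetical callback):
#
#     def log_reload(hs):
#         logger.info("Got SIGHUP; homeserver for %s is reloading", hs.hostname)
#
#     register_sighup(log_reload)
#
# Each registered callback is later invoked as func(hs, *args, **kwargs) by the
# SIGHUP handler installed in start() below.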


def start_worker_reactor(appname, config, run_command=reactor.run):
    """ Run the reactor in the main process

    Daemonizes if necessary, and then configures some resources, before starting
    the reactor. Pulls configuration from the 'worker' settings in 'config'.

    Args:
        appname (str): application name which will be sent to syslog
        config (synapse.config.Config): config object
        run_command (Callable[]): callable that actually runs the reactor
    """

    logger = logging.getLogger(config.worker_app)

    start_reactor(
        appname,
        soft_file_limit=config.soft_file_limit,
        gc_thresholds=config.gc_thresholds,
        pid_file=config.worker_pid_file,
        daemonize=config.worker_daemonize,
        print_pidfile=config.print_pidfile,
        logger=logger,
        run_command=run_command,
    )


def start_reactor(
    appname,
    soft_file_limit,
    gc_thresholds,
    pid_file,
    daemonize,
    print_pidfile,
    logger,
    run_command=reactor.run,
):
    """ Run the reactor in the main process

    Daemonizes if necessary, and then configures some resources, before starting
    the reactor

    Args:
        appname (str): application name which will be sent to syslog
        soft_file_limit (int): set the soft limit on open file descriptors to
            this value
        gc_thresholds: generational GC thresholds to pass to gc.set_threshold(),
            or None to leave the defaults in place
        pid_file (str): name of pid file to write to if daemonize is True
        daemonize (bool): true to run the reactor in a background process
        print_pidfile (bool): whether to print the pid file, if daemonize is True
        logger (logging.Logger): logger instance to pass to Daemonize
        run_command (Callable[]): callable that actually runs the reactor
    """

    install_dns_limiter(reactor)

    def run():
        logger.info("Running")
        change_resource_limit(soft_file_limit)
        if gc_thresholds:
            gc.set_threshold(*gc_thresholds)
        run_command()

    # make sure that we run the reactor with the sentinel log context,
    # otherwise other PreserveLoggingContext instances will get confused
    # and complain when they see the logcontext arbitrarily swapping
    # between the sentinel and `run` logcontexts.
    #
    # We also need to drop the logcontext before forking if we're daemonizing,
    # otherwise the cputime metrics get confused about the per-thread resource usage
    # appearing to go backwards.
    with PreserveLoggingContext():
        if daemonize:
            if print_pidfile:
                print(pid_file)

            daemon = Daemonize(
                app=appname,
                pid=pid_file,
                action=run,
                auto_close_fds=False,
                verbose=True,
                logger=logger,
            )
            daemon.start()
        else:
            run()


def quit_with_error(error_string):
    message_lines = error_string.split("\n")
    # `default` guards against the case where every line is >= 80 characters,
    # which would otherwise make max() raise on an empty sequence.
    line_length = max((len(l) for l in message_lines if len(l) < 80), default=78) + 2
    sys.stderr.write("*" * line_length + "\n")
    for line in message_lines:
        sys.stderr.write(" %s\n" % (line.rstrip(),))
    sys.stderr.write("*" * line_length + "\n")
    sys.exit(1)


def listen_metrics(bind_addresses, port):
    """
    Start Prometheus metrics server.
    """
    from synapse.metrics import RegistryProxy, start_http_server

    for host in bind_addresses:
        logger.info("Starting metrics listener on %s:%d", host, port)
        start_http_server(port, addr=host, registry=RegistryProxy)
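

# Example (illustrative, not part of the original module): expose metrics on
# port 9090 on all IPv4 interfaces, for a Prometheus server to scrape:
#
#     listen_metrics(["0.0.0.0"], 9090)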


def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
    """
    Create a TCP socket for a port and several addresses

    Returns:
        list[twisted.internet.tcp.Port]: listening for TCP connections
    """
    r = []
    for address in bind_addresses:
        try:
            r.append(reactor.listenTCP(port, factory, backlog, address))
        except error.CannotListenError as e:
            check_bind_error(e, address, bind_addresses)

    return r
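

# Example (an illustrative sketch using a stock Twisted protocol, not part of
# the original module):
#
#     from twisted.internet.protocol import Factory
#     from twisted.protocols.wire import Echo
#
#     ports = listen_tcp(["127.0.0.1"], 9999, Factory.forProtocol(Echo))
#
# Addresses that fail to bind are reported through check_bind_error; the Ports
# that did bind are returned so they can be stopped or inspected later.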


def listen_ssl(
    bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
):
    """
    Create a TLS-over-TCP socket for a port and several addresses

    Returns:
        list of twisted.internet.tcp.Port listening for TLS connections
    """
    r = []
    for address in bind_addresses:
        try:
            r.append(
                reactor.listenSSL(port, factory, context_factory, backlog, address)
            )
        except error.CannotListenError as e:
            check_bind_error(e, address, bind_addresses)

    return r


def refresh_certificate(hs):
    """
    Refresh the TLS certificates that Synapse is using by re-reading them from
    disk and updating the TLS context factories to use them.
    """

    if not hs.config.has_tls_listener():
        # attempt to reload the certs for the good of the tls_fingerprints
        hs.config.read_certificate_from_disk(require_cert_and_key=False)
        return

    hs.config.read_certificate_from_disk(require_cert_and_key=True)
    hs.tls_server_context_factory = context_factory.ServerContextFactory(hs.config)

    if hs._listening_services:
        logger.info("Updating context factories...")
        for i in hs._listening_services:
            # When you listenSSL, it doesn't make an SSL port but a TCP one
            # with a TLS wrapping factory around the factory you actually want
            # to receive requests. This factory attribute is public but missing
            # from Twisted's documentation.
            if isinstance(i.factory, TLSMemoryBIOFactory):
                addr = i.getHost()
                logger.info(
                    "Replacing TLS context factory on [%s]:%i", addr.host, addr.port
                )
                # We want to replace TLS factories with a new one, with the new
                # TLS configuration. We do this by reaching in and pulling out
                # the wrappedFactory, and then re-wrapping it.
                i.factory = TLSMemoryBIOFactory(
                    hs.tls_server_context_factory, False, i.factory.wrappedFactory
                )
        logger.info("Context factories updated.")
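

# For reference, the wrapping which listenSSL sets up looks roughly like this
# (a sketch, not part of the original module; MyHttpFactory is hypothetical):
#
#     wrapped = MyHttpFactory()
#     tls_factory = TLSMemoryBIOFactory(context_factory, False, wrapped)
#     port = reactor.listenTCP(8448, tls_factory)
#
# so swapping port.factory for a fresh TLSMemoryBIOFactory built around
# port.factory.wrappedFactory re-keys TLS without rebinding the socket.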


def start(hs, listeners=None):
    """
    Start a Synapse server or worker.

    Should be called once the reactor is running and (if we're using ACME) the
    TLS certificates are in place.

    Will start the main HTTP listeners and do some other startup tasks, and then
    notify systemd.

    Args:
        hs (synapse.server.HomeServer)
        listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
    """
    try:
        # Set up the SIGHUP machinery.
        if hasattr(signal, "SIGHUP"):

            def handle_sighup(*args, **kwargs):
                # Tell systemd our state, if we're using it. This will silently fail if
                # we're not using systemd.
                sdnotify(b"RELOADING=1")

                for i, args, kwargs in _sighup_callbacks:
                    i(hs, *args, **kwargs)

                sdnotify(b"READY=1")

            signal.signal(signal.SIGHUP, handle_sighup)

            register_sighup(refresh_certificate)

        # Load the certificate from disk.
        refresh_certificate(hs)

        # Start the tracer
        synapse.logging.opentracing.init_tracer(  # type: ignore[attr-defined] # noqa
            hs.config
        )

        # It is now safe to start your Synapse.
        hs.start_listening(listeners)
        hs.get_datastore().db.start_profiling()

        setup_sentry(hs)
        setup_sdnotify(hs)

        # We now freeze all allocated objects in the hopes that (almost)
        # everything currently allocated are things that will be used for the
        # rest of time. Doing so means less work each GC (hopefully).
        #
        # This only works on Python 3.7 and above, where gc.freeze() was added.
        if sys.version_info >= (3, 7):
            gc.collect()
            gc.freeze()
    except Exception:
        traceback.print_exc(file=sys.stderr)
        reactor = hs.get_reactor()
        if reactor.running:
            reactor.stop()
        sys.exit(1)
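

# Example (illustrative; Synapse's per-app entry points arrange something along
# these lines, with the surrounding setup elided here):
#
#     reactor.callWhenRunning(lambda: start(hs, hs.config.listeners))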


def setup_sentry(hs):
    """Enable sentry integration, if enabled in configuration

    Args:
        hs (synapse.server.HomeServer)
    """

    if not hs.config.sentry_enabled:
        return

    import sentry_sdk

    sentry_sdk.init(dsn=hs.config.sentry_dsn, release=get_version_string(synapse))

    # We set some default tags that give some context to this instance
    with sentry_sdk.configure_scope() as scope:
        scope.set_tag("matrix_server_name", hs.config.server_name)

        app = hs.config.worker_app if hs.config.worker_app else "synapse.app.homeserver"
        name = hs.config.worker_name if hs.config.worker_name else "master"
        scope.set_tag("worker_app", app)
        scope.set_tag("worker_name", name)


def setup_sdnotify(hs):
    """Adds process state hooks to tell systemd what we are up to.
    """

    # Tell systemd our state, if we're using it. This will silently fail if
    # we're not using systemd.
    sdnotify(b"READY=1\nMAINPID=%i" % (os.getpid(),))

    hs.get_reactor().addSystemEventTrigger(
        "before", "shutdown", sdnotify, b"STOPPING=1"
    )


def install_dns_limiter(reactor, max_dns_requests_in_flight=100):
    """Replaces the resolver with one that limits the number of in-flight DNS
    requests.

    This is to work around https://twistedmatrix.com/trac/ticket/9620, where we
    can run out of file descriptors and enter an infinite loop if we attempt to
    do too many DNS queries at once.
    """
    new_resolver = _LimitedHostnameResolver(
        reactor.nameResolver, max_dns_requests_in_flight
    )

    reactor.installNameResolver(new_resolver)
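

# The limiting pattern used by _LimitedHostnameResolver, in isolation (a
# sketch, not part of the original module): a Linearizer with max_count > 1
# behaves like a semaphore, queueing acquisitions beyond the limit.
#
#     limiter = Linearizer(name="example_limiter", max_count=2)
#
#     @defer.inlineCallbacks
#     def limited_task():
#         with (yield limiter.queue(())):
#             ...  # at most two of these bodies run concurrently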


class _LimitedHostnameResolver(object):
    """Wraps an IHostnameResolver, limiting the number of in-flight DNS lookups.
    """

    def __init__(self, resolver, max_dns_requests_in_flight):
        self._resolver = resolver
        self._limiter = Linearizer(
            name="dns_client_limiter", max_count=max_dns_requests_in_flight
        )

    def resolveHostName(
        self,
        resolutionReceiver,
        hostName,
        portNumber=0,
        addressTypes=None,
        transportSemantics="TCP",
    ):
        # We need this function to return `resolutionReceiver` so we do all the
        # actual logic involving deferreds in a separate function.

        # even though this is happening within the depths of twisted, we need to drop
        # our logcontext before starting _resolve, otherwise: (a) _resolve will drop
        # the logcontext if it returns an incomplete deferred; (b) _resolve will
        # call the resolutionReceiver *with* a logcontext, which it won't be expecting.
        with PreserveLoggingContext():
            self._resolve(
                resolutionReceiver,
                hostName,
                portNumber,
                addressTypes,
                transportSemantics,
            )

        return resolutionReceiver

    @defer.inlineCallbacks
    def _resolve(
        self,
        resolutionReceiver,
        hostName,
        portNumber=0,
        addressTypes=None,
        transportSemantics="TCP",
    ):

        with (yield self._limiter.queue(())):
            # resolveHostName doesn't return a Deferred, so we need to hook into
            # the receiver interface to get told when resolution has finished.

            deferred = defer.Deferred()
            receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred)

            self._resolver.resolveHostName(
                receiver, hostName, portNumber, addressTypes, transportSemantics
            )

            yield deferred


class _DeferredResolutionReceiver(object):
    """Wraps an IResolutionReceiver and simply resolves the given deferred when
    resolution is complete
    """

    def __init__(self, receiver, deferred):
        self._receiver = receiver
        self._deferred = deferred

    def resolutionBegan(self, resolutionInProgress):
        self._receiver.resolutionBegan(resolutionInProgress)

    def addressResolved(self, address):
        self._receiver.addressResolved(address)

    def resolutionComplete(self):
        self._deferred.callback(())
        self._receiver.resolutionComplete()


sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")


def sdnotify(state):
    """
    Send a notification to systemd, if the NOTIFY_SOCKET env var is set.

    This function is based on the sdnotify python package, but since it's only a few
    lines of code, it's easier to duplicate it here than to add a dependency on a
    package which many OSes don't include as a matter of principle.

    Args:
        state (bytes): notification to send
    """
    if not isinstance(state, bytes):
        raise TypeError("sdnotify should be called with a bytes argument")
    if not sdnotify_sockaddr:
        return
    addr = sdnotify_sockaddr
    if addr[0] == "@":
        # an abstract-namespace socket: systemd encodes the leading NUL byte
        # of the address as "@" in the environment variable.
        addr = "\0" + addr[1:]

    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
            sock.connect(addr)
            sock.sendall(state)
    except Exception as e:
        # this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET
        # unless systemd is expecting us to notify it.
        logger.warning("Unable to send notification to systemd: %s", e)
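

# Example (illustrative, not part of the original module; systemd only sets
# NOTIFY_SOCKET for services run with Type=notify, so outside such a unit
# these calls are silent no-ops):
#
#     sdnotify(b"READY=1")                  # startup complete
#     sdnotify(b"STATUS=Serving requests")  # free-form status string
#     sdnotify(b"STOPPING=1")               # beginning shutdown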