_base.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2017 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import gc
  16. import logging
  17. import sys
  18. from daemonize import Daemonize
  19. from twisted.internet import error, reactor
  20. from synapse.util import PreserveLoggingContext
  21. from synapse.util.rlimit import change_resource_limit
  22. try:
  23. import affinity
  24. except Exception:
  25. affinity = None
  26. logger = logging.getLogger(__name__)
  27. def start_worker_reactor(appname, config):
  28. """ Run the reactor in the main process
  29. Daemonizes if necessary, and then configures some resources, before starting
  30. the reactor. Pulls configuration from the 'worker' settings in 'config'.
  31. Args:
  32. appname (str): application name which will be sent to syslog
  33. config (synapse.config.Config): config object
  34. """
  35. logger = logging.getLogger(config.worker_app)
  36. start_reactor(
  37. appname,
  38. config.soft_file_limit,
  39. config.gc_thresholds,
  40. config.worker_pid_file,
  41. config.worker_daemonize,
  42. config.worker_cpu_affinity,
  43. logger,
  44. )
  45. def start_reactor(
  46. appname,
  47. soft_file_limit,
  48. gc_thresholds,
  49. pid_file,
  50. daemonize,
  51. cpu_affinity,
  52. logger,
  53. ):
  54. """ Run the reactor in the main process
  55. Daemonizes if necessary, and then configures some resources, before starting
  56. the reactor
  57. Args:
  58. appname (str): application name which will be sent to syslog
  59. soft_file_limit (int):
  60. gc_thresholds:
  61. pid_file (str): name of pid file to write to if daemonize is True
  62. daemonize (bool): true to run the reactor in a background process
  63. cpu_affinity (int|None): cpu affinity mask
  64. logger (logging.Logger): logger instance to pass to Daemonize
  65. """
  66. def run():
  67. # make sure that we run the reactor with the sentinel log context,
  68. # otherwise other PreserveLoggingContext instances will get confused
  69. # and complain when they see the logcontext arbitrarily swapping
  70. # between the sentinel and `run` logcontexts.
  71. with PreserveLoggingContext():
  72. logger.info("Running")
  73. if cpu_affinity is not None:
  74. if not affinity:
  75. quit_with_error(
  76. "Missing package 'affinity' required for cpu_affinity\n"
  77. "option\n\n"
  78. "Install by running:\n\n"
  79. " pip install affinity\n\n"
  80. )
  81. logger.info("Setting CPU affinity to %s" % cpu_affinity)
  82. affinity.set_process_affinity_mask(0, cpu_affinity)
  83. change_resource_limit(soft_file_limit)
  84. if gc_thresholds:
  85. gc.set_threshold(*gc_thresholds)
  86. reactor.run()
  87. if daemonize:
  88. daemon = Daemonize(
  89. app=appname,
  90. pid=pid_file,
  91. action=run,
  92. auto_close_fds=False,
  93. verbose=True,
  94. logger=logger,
  95. )
  96. daemon.start()
  97. else:
  98. run()
  99. def quit_with_error(error_string):
  100. message_lines = error_string.split("\n")
  101. line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
  102. sys.stderr.write("*" * line_length + '\n')
  103. for line in message_lines:
  104. sys.stderr.write(" %s\n" % (line.rstrip(),))
  105. sys.stderr.write("*" * line_length + '\n')
  106. sys.exit(1)
  107. def listen_metrics(bind_addresses, port):
  108. """
  109. Start Prometheus metrics server.
  110. """
  111. from synapse.metrics import RegistryProxy
  112. from prometheus_client import start_http_server
  113. for host in bind_addresses:
  114. reactor.callInThread(start_http_server, int(port),
  115. addr=host, registry=RegistryProxy)
  116. logger.info("Metrics now reporting on %s:%d", host, port)
  117. def listen_tcp(bind_addresses, port, factory, backlog=50):
  118. """
  119. Create a TCP socket for a port and several addresses
  120. """
  121. for address in bind_addresses:
  122. try:
  123. reactor.listenTCP(
  124. port,
  125. factory,
  126. backlog,
  127. address
  128. )
  129. except error.CannotListenError as e:
  130. check_bind_error(e, address, bind_addresses)
  131. def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
  132. """
  133. Create an SSL socket for a port and several addresses
  134. """
  135. for address in bind_addresses:
  136. try:
  137. reactor.listenSSL(
  138. port,
  139. factory,
  140. context_factory,
  141. backlog,
  142. address
  143. )
  144. except error.CannotListenError as e:
  145. check_bind_error(e, address, bind_addresses)
  146. def check_bind_error(e, address, bind_addresses):
  147. """
  148. This method checks an exception occurred while binding on 0.0.0.0.
  149. If :: is specified in the bind addresses a warning is shown.
  150. The exception is still raised otherwise.
  151. Binding on both 0.0.0.0 and :: causes an exception on Linux and macOS
  152. because :: binds on both IPv4 and IPv6 (as per RFC 3493).
  153. When binding on 0.0.0.0 after :: this can safely be ignored.
  154. Args:
  155. e (Exception): Exception that was caught.
  156. address (str): Address on which binding was attempted.
  157. bind_addresses (list): Addresses on which the service listens.
  158. """
  159. if address == '0.0.0.0' and '::' in bind_addresses:
  160. logger.warn('Failed to listen on 0.0.0.0, continuing because listening on [::]')
  161. else:
  162. raise e