_base.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2017 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import gc
  16. import logging
  17. import sys
  18. import psutil
  19. from daemonize import Daemonize
  20. from twisted.internet import error, reactor
  21. from synapse.util import PreserveLoggingContext
  22. from synapse.util.rlimit import change_resource_limit
  23. logger = logging.getLogger(__name__)
  24. def start_worker_reactor(appname, config):
  25. """ Run the reactor in the main process
  26. Daemonizes if necessary, and then configures some resources, before starting
  27. the reactor. Pulls configuration from the 'worker' settings in 'config'.
  28. Args:
  29. appname (str): application name which will be sent to syslog
  30. config (synapse.config.Config): config object
  31. """
  32. logger = logging.getLogger(config.worker_app)
  33. start_reactor(
  34. appname,
  35. config.soft_file_limit,
  36. config.gc_thresholds,
  37. config.worker_pid_file,
  38. config.worker_daemonize,
  39. config.worker_cpu_affinity,
  40. logger,
  41. )
  42. def start_reactor(
  43. appname,
  44. soft_file_limit,
  45. gc_thresholds,
  46. pid_file,
  47. daemonize,
  48. cpu_affinity,
  49. logger,
  50. ):
  51. """ Run the reactor in the main process
  52. Daemonizes if necessary, and then configures some resources, before starting
  53. the reactor
  54. Args:
  55. appname (str): application name which will be sent to syslog
  56. soft_file_limit (int):
  57. gc_thresholds:
  58. pid_file (str): name of pid file to write to if daemonize is True
  59. daemonize (bool): true to run the reactor in a background process
  60. cpu_affinity (int|None): cpu affinity mask
  61. logger (logging.Logger): logger instance to pass to Daemonize
  62. """
  63. def run():
  64. # make sure that we run the reactor with the sentinel log context,
  65. # otherwise other PreserveLoggingContext instances will get confused
  66. # and complain when they see the logcontext arbitrarily swapping
  67. # between the sentinel and `run` logcontexts.
  68. with PreserveLoggingContext():
  69. logger.info("Running")
  70. if cpu_affinity is not None:
  71. # Turn the bitmask into bits, reverse it so we go from 0 up
  72. mask_to_bits = bin(cpu_affinity)[2:][::-1]
  73. cpus = []
  74. cpu_num = 0
  75. for i in mask_to_bits:
  76. if i == "1":
  77. cpus.append(cpu_num)
  78. cpu_num += 1
  79. p = psutil.Process()
  80. p.cpu_affinity(cpus)
  81. change_resource_limit(soft_file_limit)
  82. if gc_thresholds:
  83. gc.set_threshold(*gc_thresholds)
  84. reactor.run()
  85. if daemonize:
  86. daemon = Daemonize(
  87. app=appname,
  88. pid=pid_file,
  89. action=run,
  90. auto_close_fds=False,
  91. verbose=True,
  92. logger=logger,
  93. )
  94. daemon.start()
  95. else:
  96. run()
  97. def quit_with_error(error_string):
  98. message_lines = error_string.split("\n")
  99. line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
  100. sys.stderr.write("*" * line_length + '\n')
  101. for line in message_lines:
  102. sys.stderr.write(" %s\n" % (line.rstrip(),))
  103. sys.stderr.write("*" * line_length + '\n')
  104. sys.exit(1)
  105. def listen_metrics(bind_addresses, port):
  106. """
  107. Start Prometheus metrics server.
  108. """
  109. from synapse.metrics import RegistryProxy
  110. from prometheus_client import start_http_server
  111. for host in bind_addresses:
  112. reactor.callInThread(start_http_server, int(port),
  113. addr=host, registry=RegistryProxy)
  114. logger.info("Metrics now reporting on %s:%d", host, port)
  115. def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
  116. """
  117. Create a TCP socket for a port and several addresses
  118. """
  119. for address in bind_addresses:
  120. try:
  121. reactor.listenTCP(
  122. port,
  123. factory,
  124. backlog,
  125. address
  126. )
  127. except error.CannotListenError as e:
  128. check_bind_error(e, address, bind_addresses)
  129. def listen_ssl(
  130. bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
  131. ):
  132. """
  133. Create an SSL socket for a port and several addresses
  134. """
  135. for address in bind_addresses:
  136. try:
  137. reactor.listenSSL(
  138. port,
  139. factory,
  140. context_factory,
  141. backlog,
  142. address
  143. )
  144. except error.CannotListenError as e:
  145. check_bind_error(e, address, bind_addresses)
  146. def check_bind_error(e, address, bind_addresses):
  147. """
  148. This method checks an exception occurred while binding on 0.0.0.0.
  149. If :: is specified in the bind addresses a warning is shown.
  150. The exception is still raised otherwise.
  151. Binding on both 0.0.0.0 and :: causes an exception on Linux and macOS
  152. because :: binds on both IPv4 and IPv6 (as per RFC 3493).
  153. When binding on 0.0.0.0 after :: this can safely be ignored.
  154. Args:
  155. e (Exception): Exception that was caught.
  156. address (str): Address on which binding was attempted.
  157. bind_addresses (list): Addresses on which the service listens.
  158. """
  159. if address == '0.0.0.0' and '::' in bind_addresses:
  160. logger.warn('Failed to listen on 0.0.0.0, continuing because listening on [::]')
  161. else:
  162. raise e