homeserver.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # Copyright 2014-2016 OpenMarket Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import gc
  17. import logging
  18. import os
  19. import sys
  20. import traceback
  21. from six import iteritems
  22. import psutil
  23. from prometheus_client import Gauge
  24. from twisted.application import service
  25. from twisted.internet import defer, reactor
  26. from twisted.web.resource import EncodingResourceWrapper, NoResource
  27. from twisted.web.server import GzipEncoderFactory
  28. from twisted.web.static import File
  29. import synapse
  30. import synapse.config.logger
  31. from synapse import events
  32. from synapse.api.urls import (
  33. CONTENT_REPO_PREFIX,
  34. FEDERATION_PREFIX,
  35. LEGACY_MEDIA_PREFIX,
  36. MEDIA_PREFIX,
  37. SERVER_KEY_V2_PREFIX,
  38. STATIC_PREFIX,
  39. WEB_CLIENT_PREFIX,
  40. )
  41. from synapse.app import _base
  42. from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
  43. from synapse.config._base import ConfigError
  44. from synapse.config.homeserver import HomeServerConfig
  45. from synapse.crypto import context_factory
  46. from synapse.federation.transport.server import TransportLayerServer
  47. from synapse.http.additional_resource import AdditionalResource
  48. from synapse.http.server import RootRedirect
  49. from synapse.http.site import SynapseSite
  50. from synapse.metrics import RegistryProxy
  51. from synapse.metrics.background_process_metrics import run_as_background_process
  52. from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
  53. from synapse.module_api import ModuleApi
  54. from synapse.python_dependencies import check_requirements
  55. from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
  56. from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
  57. from synapse.rest import ClientRestResource
  58. from synapse.rest.key.v2 import KeyApiV2Resource
  59. from synapse.rest.media.v0.content_repository import ContentRepoResource
  60. from synapse.rest.well_known import WellKnownResource
  61. from synapse.server import HomeServer
  62. from synapse.storage import DataStore, are_all_users_on_domain
  63. from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
  64. from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
  65. from synapse.util.caches import CACHE_SIZE_FACTOR
  66. from synapse.util.httpresourcetree import create_resource_tree
  67. from synapse.util.logcontext import LoggingContext
  68. from synapse.util.manhole import manhole
  69. from synapse.util.module_loader import load_module
  70. from synapse.util.rlimit import change_resource_limit
  71. from synapse.util.versionstring import get_version_string
  # Module-level logger; the name is fixed rather than derived from __name__.
  logger = logging.getLogger("synapse.app.homeserver")
  def gz_wrap(r):
      """Wrap a resource so that it is served through a gzip encoder.

      Args:
          r: the resource to wrap

      Returns:
          EncodingResourceWrapper: `r` wrapped with a single
          GzipEncoderFactory
      """
      encoders = [GzipEncoderFactory()]
      return EncodingResourceWrapper(r, encoders)
  class SynapseHomeServer(HomeServer):
      """HomeServer which starts the listeners described in its config.

      Builds the HTTP resource tree for each "http" listener, starts the
      manhole, replication and metrics listeners, and runs the database sanity
      checks used at start-up.
      """

      DATASTORE_CLASS = DataStore

      def _listener_http(self, config, listener_config):
          """Build the resource tree for one HTTP listener and start listening.

          Args:
              config: the homeserver config; only `no_tls` is read here
              listener_config (dict): config for this listener. Reads the
                  "port", "bind_addresses" and "resources" keys, and the
                  optional "tls", "tag" and "additional_resources" keys.
          """
          port = listener_config["port"]
          bind_addresses = listener_config["bind_addresses"]
          tls = listener_config.get("tls", False)
          site_tag = listener_config.get("tag", port)

          if tls and config.no_tls:
              # TLS listeners are skipped entirely when TLS is disabled.
              return

          resources = {}
          for res in listener_config["resources"]:
              for name in res["names"]:
                  resources.update(self._configure_named_resource(
                      name, res.get("compress", False),
                  ))

          additional_resources = listener_config.get("additional_resources", {})
          logger.debug("Configuring additional resources: %r",
                       additional_resources)
          module_api = ModuleApi(self, self.get_auth_handler())
          for path, resmodule in additional_resources.items():
              # Use a distinct name for the module's own config so that the
              # `config` argument of this method is not shadowed.
              handler_cls, module_config = load_module(resmodule)
              handler = handler_cls(module_config, module_api)
              resources[path] = AdditionalResource(self, handler.handle_request)

          # try to find something useful to redirect '/' to
          if WEB_CLIENT_PREFIX in resources:
              root_resource = RootRedirect(WEB_CLIENT_PREFIX)
          elif STATIC_PREFIX in resources:
              root_resource = RootRedirect(STATIC_PREFIX)
          else:
              root_resource = NoResource()

          root_resource = create_resource_tree(resources, root_resource)

          # The site is identical for both transports apart from the scheme
          # embedded in the access logger name.
          scheme = "https" if tls else "http"
          site = SynapseSite(
              "synapse.access.%s.%s" % (scheme, site_tag),
              site_tag,
              listener_config,
              root_resource,
              self.version_string,
          )

          if tls:
              listen_ssl(
                  bind_addresses,
                  port,
                  site,
                  self.tls_server_context_factory,
              )
          else:
              listen_tcp(bind_addresses, port, site)

          logger.info("Synapse now listening on port %d", port)

      def _configure_named_resource(self, name, compress=False):
          """Build a resource map for a named resource

          Args:
              name (str): named resource: one of "client", "federation", etc
              compress (bool): whether to enable gzip compression for this
                  resource

          Returns:
              dict[str, Resource]: map from path to HTTP resource

          Raises:
              ConfigError: if `name` is "media" but the media repository is
                  disabled in the config
          """
          resources = {}

          # NB: these are independent `if`s rather than an elif chain, because
          # one name may enable several groups (e.g. "client" also pulls in the
          # static and media resources).
          if name == "client":
              client_resource = ClientRestResource(self)
              if compress:
                  client_resource = gz_wrap(client_resource)

              resources.update({
                  "/_matrix/client/api/v1": client_resource,
                  "/_matrix/client/r0": client_resource,
                  "/_matrix/client/unstable": client_resource,
                  "/_matrix/client/v2_alpha": client_resource,
                  "/_matrix/client/versions": client_resource,
                  "/.well-known/matrix/client": WellKnownResource(self),
              })

              if self.get_config().saml2_enabled:
                  from synapse.rest.saml2 import SAML2Resource
                  resources["/_matrix/saml2"] = SAML2Resource(self)

          if name == "consent":
              from synapse.rest.consent.consent_resource import ConsentResource
              consent_resource = ConsentResource(self)
              if compress:
                  consent_resource = gz_wrap(consent_resource)
              resources.update({
                  "/_matrix/consent": consent_resource,
              })

          if name == "federation":
              resources.update({
                  FEDERATION_PREFIX: TransportLayerServer(self),
              })

          if name in ["static", "client"]:
              resources.update({
                  STATIC_PREFIX: File(
                      os.path.join(os.path.dirname(synapse.__file__), "static")
                  ),
              })

          if name in ["media", "federation", "client"]:
              if self.get_config().enable_media_repo:
                  media_repo = self.get_media_repository_resource()
                  resources.update({
                      MEDIA_PREFIX: media_repo,
                      LEGACY_MEDIA_PREFIX: media_repo,
                      CONTENT_REPO_PREFIX: ContentRepoResource(
                          self, self.config.uploads_path
                      ),
                  })
              elif name == "media":
                  # Only an explicit "media" resource is an error; "federation"
                  # and "client" simply go without the media endpoints.
                  raise ConfigError(
                      "'media' resource conflicts with enable_media_repo=False",
                  )

          if name in ["keys", "federation"]:
              resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)

          if name == "webclient":
              webclient_path = self.get_config().web_client_location

              if webclient_path is None:
                  logger.warning(
                      "Not enabling webclient resource, as web_client_location is unset."
                  )
              else:
                  # GZip is disabled here due to
                  # https://twistedmatrix.com/trac/ticket/7678
                  resources[WEB_CLIENT_PREFIX] = File(webclient_path)

          if name == "metrics" and self.get_config().enable_metrics:
              resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)

          if name == "replication":
              resources[REPLICATION_PREFIX] = ReplicationRestResource(self)

          return resources

      def start_listening(self):
          """Start every listener in the config.

          Handles the "http", "manhole", "replication" and "metrics" listener
          types; any other type is logged as a warning and skipped.
          """
          config = self.get_config()

          for listener in config.listeners:
              listener_type = listener["type"]

              if listener_type == "http":
                  self._listener_http(config, listener)
              elif listener_type == "manhole":
                  # NOTE(review): the manhole credentials are hard-coded here;
                  # consider making them configurable.
                  listen_tcp(
                      listener["bind_addresses"],
                      listener["port"],
                      manhole(
                          username="matrix",
                          password="rabbithole",
                          globals={"hs": self},
                      )
                  )
              elif listener_type == "replication":
                  bind_addresses = listener["bind_addresses"]
                  for address in bind_addresses:
                      factory = ReplicationStreamProtocolFactory(self)
                      server_listener = reactor.listenTCP(
                          listener["port"], factory, interface=address
                      )
                      # Close the replication port as part of reactor shutdown.
                      reactor.addSystemEventTrigger(
                          "before", "shutdown", server_listener.stopListening,
                      )
              elif listener_type == "metrics":
                  if not self.get_config().enable_metrics:
                      logger.warning(("Metrics listener configured, but "
                                      "enable_metrics is not True!"))
                  else:
                      _base.listen_metrics(listener["bind_addresses"],
                                           listener["port"])
              else:
                  logger.warning("Unrecognized listener type: %s", listener_type)

      def run_startup_checks(self, db_conn, database_engine):
          """Sanity-check the database before the server starts.

          Reports a fatal error via `quit_with_error` if the database contains
          users that do not belong to this server's hostname, or if the
          database engine reports an incorrect setup.

          Args:
              db_conn: an open database connection
              database_engine: the engine used to check the database
          """
          all_users_native = are_all_users_on_domain(
              db_conn.cursor(), database_engine, self.hostname
          )
          if not all_users_native:
              quit_with_error(
                  "Found users in database not native to %s!\n"
                  "You cannot change a synapse server_name after it's been configured"
                  % (self.hostname,)
              )

          try:
              database_engine.check_database(db_conn.cursor())
          except IncorrectDatabaseSetup as e:
              quit_with_error(str(e))
  # Prometheus gauges publishing the monthly active user (MAU) control figures.
  current_mau_gauge = Gauge(
      "synapse_admin_mau:current",
      "Current MAU",
  )
  max_mau_gauge = Gauge(
      "synapse_admin_mau:max",
      "MAU Limit",
  )
  registered_reserved_users_mau_gauge = Gauge(
      "synapse_admin_mau:registered_reserved_users",
      "Registered users with reserved threepids",
  )
  def setup(config_options):
      """Load the config, prepare the database and build the homeserver.

      Listening and the rest of the start-up work are not done here directly:
      they are registered with `reactor.callWhenRunning` and so happen once
      the reactor is running.

      Exits the process if the config cannot be loaded, if only config
      generation was requested, or if the database upgrade fails.

      Args:
          config_options: The options passed to Synapse. Usually
              `sys.argv[1:]`.

      Returns:
          HomeServer
      """
      try:
          config = HomeServerConfig.load_or_generate_config(
              "Synapse Homeserver", config_options,
          )
      except ConfigError as err:
          sys.stderr.write("\n" + str(err) + "\n")
          sys.exit(1)

      if not config:
          # No config and no exception means we were only asked to generate
          # config files, so there is nothing further to do.
          sys.exit(0)

      synapse.config.logger.setup_logging(config, use_worker_options=False)

      events.USE_FROZEN_DICTS = config.use_frozen_dicts

      database_engine = create_engine(config.database_config)
      config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection

      homeserver = SynapseHomeServer(
          config.server_name,
          db_config=config.database_config,
          config=config,
          version_string="Synapse/" + get_version_string(synapse),
          database_engine=database_engine,
      )

      logger.info("Preparing database: %s...", config.database_config['name'])

      try:
          with homeserver.get_db_conn(run_new_connection=False) as db_conn:
              prepare_database(db_conn, database_engine, config=config)
              database_engine.on_new_connection(db_conn)

              homeserver.run_startup_checks(db_conn, database_engine)

              db_conn.commit()
      except UpgradeDatabaseException:
          sys.stderr.write(
              "\nFailed to upgrade database.\n"
              "Have you checked for version specific instructions in"
              " UPGRADES.rst?\n"
          )
          sys.exit(1)

      logger.info("Database prepared in %s.", config.database_config['name'])

      homeserver.setup()

      @defer.inlineCallbacks
      def start():
          """Provision TLS if needed, then start listeners and services."""
          try:
              # Check if the certificate is still valid.
              days_left = homeserver.config.is_disk_cert_valid()

              if homeserver.config.acme_enabled:
                  # With ACME enabled we may have to provision a certificate
                  # before anything else can start.
                  acme_handler = homeserver.get_acme_handler()

                  # Bring up the web services that answer ACME challenges.
                  yield acme_handler.start_listening()

                  # Reprovision when there is no certificate at all (None), or
                  # when the days remaining do not exceed the re-registration
                  # threshold.
                  needs_provision = days_left is None or not (
                      days_left > homeserver.config.acme_reprovision_threshold
                  )
                  if needs_provision:
                      yield acme_handler.provision_certificate()

              # Read the certificate from disk and build the context factories
              # for TLS.
              homeserver.config.read_certificate_from_disk()
              homeserver.tls_server_context_factory = (
                  context_factory.ServerContextFactory(config)
              )
              homeserver.tls_client_options_factory = (
                  context_factory.ClientTLSOptionsFactory(config)
              )

              # It is now safe to start your Synapse.
              homeserver.start_listening()
              homeserver.get_pusherpool().start()
              homeserver.get_datastore().start_profiling()
              homeserver.get_datastore().start_doing_background_updates()
          except defer.FirstError as err:
              # A DeferredList failed (like in listening on the ACME listener),
              # so the subfailure has to be printed explicitly.
              err.subFailure.printTraceback(sys.stderr)
              sys.exit(1)
          except Exception:
              # Something else went wrong when starting. Print it and bail out.
              traceback.print_exc(file=sys.stderr)
              sys.exit(1)

      reactor.callWhenRunning(start)

      return homeserver
  class SynapseService(service.Service):
      """A twisted Service class that will start synapse. Used to run synapse
      via twistd and a .tac.
      """

      def __init__(self, config):
          """
          Args:
              config: the options handed to `setup` when the service starts
          """
          self.config = config
          # Nothing in this class opens a listening port itself, so start with
          # an explicit "no port" marker. Previously the attribute was never
          # assigned and stopService raised AttributeError.
          self._port = None

      def startService(self):
          """Build the homeserver, then apply its file and GC limits."""
          hs = setup(self.config)
          change_resource_limit(hs.config.soft_file_limit)
          if hs.config.gc_thresholds:
              gc.set_threshold(*hs.config.gc_thresholds)

      def stopService(self):
          """Stop listening on the service's port, if one was ever recorded.

          Returns:
              The result of `stopListening()` on the recorded port, or None if
              no port was recorded.
          """
          port = self._port
          if port is None:
              return None
          return port.stopListening()
  def run(hs):
      """Schedule the periodic background jobs and start the reactor.

      Sets up the looping calls for user daily visit stats, monthly active
      user reaping and gauges and, if enabled in the config, usage stats
      reporting. It then hands over to `_base.start_reactor`.

      Args:
          hs: the homeserver returned by `setup`
      """
      PROFILE_SYNAPSE = False
      if PROFILE_SYNAPSE:
          def profile(func):
              """Wrap `func` so that each call is profiled and dumped to /tmp."""
              from cProfile import Profile
              from threading import current_thread

              def profiled(*args, **kargs):
                  profiler = Profile()
                  profiler.enable()
                  try:
                      # Pass the wrapped function's result through to callers.
                      return func(*args, **kargs)
                  finally:
                      # Dump the stats even if the profiled call raised.
                      profiler.disable()
                      ident = current_thread().ident
                      profiler.dump_stats("/tmp/%s.%s.%i.pstat" % (
                          hs.hostname, func.__name__, ident
                      ))

              return profiled

          from twisted.python.threadpool import ThreadPool
          ThreadPool._worker = profile(ThreadPool._worker)
          reactor.run = profile(reactor.run)

      clock = hs.get_clock()
      start_time = clock.time()

      # Reused across reports: each run overwrites the keys it computes.
      stats = {}

      # Contains the list of processes we will be monitoring
      # currently either 0 or 1
      stats_process = []

      def start_phone_stats_home():
          return run_as_background_process("phone_stats_home", phone_stats_home)

      @defer.inlineCallbacks
      def phone_stats_home():
          """Gather usage stats and PUT them to the matrix.org endpoint."""
          logger.info("Gathering stats for reporting")
          now = int(hs.get_clock().time())
          # Never report a negative uptime.
          uptime = max(0, int(now - start_time))

          store = hs.get_datastore()

          stats["homeserver"] = hs.config.server_name
          stats["timestamp"] = now
          stats["uptime_seconds"] = uptime
          version = sys.version_info
          stats["python_version"] = "{}.{}.{}".format(
              version.major, version.minor, version.micro
          )
          stats["total_users"] = yield store.count_all_users()

          total_nonbridged_users = yield store.count_nonbridged_users()
          stats["total_nonbridged_users"] = total_nonbridged_users

          daily_user_type_results = yield store.count_daily_user_type()
          for name, count in iteritems(daily_user_type_results):
              stats["daily_user_type_" + name] = count

          room_count = yield store.get_room_count()
          stats["total_room_count"] = room_count

          stats["daily_active_users"] = yield store.count_daily_users()
          stats["daily_active_rooms"] = yield store.count_daily_active_rooms()
          stats["daily_messages"] = yield store.count_daily_messages()

          r30_results = yield store.count_r30_users()
          for name, count in iteritems(r30_results):
              stats["r30_users_" + name] = count

          daily_sent_messages = yield store.count_daily_sent_messages()
          stats["daily_sent_messages"] = daily_sent_messages
          stats["cache_factor"] = CACHE_SIZE_FACTOR
          stats["event_cache_size"] = hs.config.event_cache_size

          if stats_process:
              stats["memory_rss"] = 0
              stats["cpu_average"] = 0
              for process in stats_process:
                  stats["memory_rss"] += process.memory_info().rss
                  stats["cpu_average"] += int(process.cpu_percent(interval=None))

          logger.info("Reporting stats to matrix.org: %s", stats)
          try:
              yield hs.get_simple_http_client().put_json(
                  "https://matrix.org/report-usage-stats/push",
                  stats
              )
          except Exception as e:
              # Reporting is best-effort: log the failure and carry on.
              logger.warning("Error reporting stats: %s", e)

      def performance_stats_init():
          """Register this process for memory/cpu reporting if psutil works."""
          try:
              process = psutil.Process()
              # Ensure we can fetch both, and make the initial request for
              # cpu_percent so the next request will use this as the initial
              # point.
              process.memory_info().rss
              process.cpu_percent(interval=None)
              logger.info("report_stats can use psutil")
              stats_process.append(process)
          except AttributeError:
              logger.warning(
                  "Unable to read memory/cpu stats. Disabling reporting."
              )

      def generate_user_daily_visit_stats():
          return run_as_background_process(
              "generate_user_daily_visits",
              hs.get_datastore().generate_user_daily_visits,
          )

      # Rather than update on per session basis, batch up the requests.
      # If you increase the loop period, the accuracy of user_daily_visits
      # table will decrease
      clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)

      # monthly active user limiting functionality
      def reap_monthly_active_users():
          return run_as_background_process(
              "reap_monthly_active_users",
              hs.get_datastore().reap_monthly_active_users,
          )

      clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
      reap_monthly_active_users()

      @defer.inlineCallbacks
      def generate_monthly_active_users():
          """Refresh the MAU gauges; counts stay 0 unless MAU is tracked."""
          current_mau_count = 0
          reserved_count = 0
          store = hs.get_datastore()
          if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
              current_mau_count = yield store.get_monthly_active_count()
              reserved_count = yield store.get_registered_reserved_users_count()
          current_mau_gauge.set(float(current_mau_count))
          registered_reserved_users_mau_gauge.set(float(reserved_count))
          max_mau_gauge.set(float(hs.config.max_mau_value))

      def start_generate_monthly_active_users():
          return run_as_background_process(
              "generate_monthly_active_users",
              generate_monthly_active_users,
          )

      # Always populate the gauges once; only keep refreshing them when MAU
      # tracking is actually switched on.
      start_generate_monthly_active_users()
      if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
          clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
      # End of monthly active user settings

      if hs.config.report_stats:
          logger.info("Scheduling stats reporting for 3 hour intervals")
          clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)

          # We need to defer this init for the cases that we daemonize
          # otherwise the process ID we get is that of the non-daemon process
          clock.call_later(0, performance_stats_init)

          # We wait 5 minutes to send the first set of stats as the server can
          # be quite busy the first few minutes
          clock.call_later(5 * 60, start_phone_stats_home)

      if hs.config.daemonize and hs.config.print_pidfile:
          print(hs.config.pid_file)

      _base.start_reactor(
          "synapse-homeserver",
          hs.config.soft_file_limit,
          hs.config.gc_thresholds,
          hs.config.pid_file,
          hs.config.daemonize,
          hs.config.cpu_affinity,
          logger,
      )
  def main():
      """Command-line entry point: build the homeserver and run it.

      Everything happens inside a "main" logging context. The base
      requirements are checked before the homeserver is set up from the
      command-line arguments.
      """
      with LoggingContext("main"):
          # check base requirements
          check_requirements()
          run(setup(sys.argv[1:]))
  # Only start the homeserver when executed as a script, not when imported.
  if __name__ == "__main__":
      main()