homeserver.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # Copyright 2014-2016 OpenMarket Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import gc
  17. import logging
  18. import os
  19. import sys
  20. import traceback
  21. from six import iteritems
  22. import psutil
  23. from prometheus_client import Gauge
  24. from twisted.application import service
  25. from twisted.internet import defer, reactor
  26. from twisted.web.resource import EncodingResourceWrapper, NoResource
  27. from twisted.web.server import GzipEncoderFactory
  28. from twisted.web.static import File
  29. import synapse
  30. import synapse.config.logger
  31. from synapse import events
  32. from synapse.api.urls import (
  33. CONTENT_REPO_PREFIX,
  34. FEDERATION_PREFIX,
  35. LEGACY_MEDIA_PREFIX,
  36. MEDIA_PREFIX,
  37. SERVER_KEY_V2_PREFIX,
  38. STATIC_PREFIX,
  39. WEB_CLIENT_PREFIX,
  40. )
  41. from synapse.app import _base
  42. from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
  43. from synapse.config._base import ConfigError
  44. from synapse.config.homeserver import HomeServerConfig
  45. from synapse.federation.transport.server import TransportLayerServer
  46. from synapse.http.additional_resource import AdditionalResource
  47. from synapse.http.server import RootRedirect
  48. from synapse.http.site import SynapseSite
  49. from synapse.metrics import RegistryProxy
  50. from synapse.metrics.background_process_metrics import run_as_background_process
  51. from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
  52. from synapse.module_api import ModuleApi
  53. from synapse.python_dependencies import check_requirements
  54. from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
  55. from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
  56. from synapse.rest import ClientRestResource
  57. from synapse.rest.key.v2 import KeyApiV2Resource
  58. from synapse.rest.media.v0.content_repository import ContentRepoResource
  59. from synapse.rest.well_known import WellKnownResource
  60. from synapse.server import HomeServer
  61. from synapse.storage import DataStore, are_all_users_on_domain
  62. from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
  63. from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
  64. from synapse.util.caches import CACHE_SIZE_FACTOR
  65. from synapse.util.httpresourcetree import create_resource_tree
  66. from synapse.util.logcontext import LoggingContext
  67. from synapse.util.manhole import manhole
  68. from synapse.util.module_loader import load_module
  69. from synapse.util.rlimit import change_resource_limit
  70. from synapse.util.versionstring import get_version_string
  71. logger = logging.getLogger("synapse.app.homeserver")
  72. def gz_wrap(r):
  73. return EncodingResourceWrapper(r, [GzipEncoderFactory()])
  74. class SynapseHomeServer(HomeServer):
  75. DATASTORE_CLASS = DataStore
  76. _listening_services = []
  77. def _listener_http(self, config, listener_config):
  78. port = listener_config["port"]
  79. bind_addresses = listener_config["bind_addresses"]
  80. tls = listener_config.get("tls", False)
  81. site_tag = listener_config.get("tag", port)
  82. if tls and config.no_tls:
  83. raise ConfigError(
  84. "Listener on port %i has TLS enabled, but no_tls is set" % (port,),
  85. )
  86. resources = {}
  87. for res in listener_config["resources"]:
  88. for name in res["names"]:
  89. if name == "openid" and "federation" in res["names"]:
  90. # Skip loading openid resource if federation is defined
  91. # since federation resource will include openid
  92. continue
  93. resources.update(self._configure_named_resource(
  94. name, res.get("compress", False),
  95. ))
  96. additional_resources = listener_config.get("additional_resources", {})
  97. logger.debug("Configuring additional resources: %r",
  98. additional_resources)
  99. module_api = ModuleApi(self, self.get_auth_handler())
  100. for path, resmodule in additional_resources.items():
  101. handler_cls, config = load_module(resmodule)
  102. handler = handler_cls(config, module_api)
  103. resources[path] = AdditionalResource(self, handler.handle_request)
  104. # try to find something useful to redirect '/' to
  105. if WEB_CLIENT_PREFIX in resources:
  106. root_resource = RootRedirect(WEB_CLIENT_PREFIX)
  107. elif STATIC_PREFIX in resources:
  108. root_resource = RootRedirect(STATIC_PREFIX)
  109. else:
  110. root_resource = NoResource()
  111. root_resource = create_resource_tree(resources, root_resource)
  112. if tls:
  113. return listen_ssl(
  114. bind_addresses,
  115. port,
  116. SynapseSite(
  117. "synapse.access.https.%s" % (site_tag,),
  118. site_tag,
  119. listener_config,
  120. root_resource,
  121. self.version_string,
  122. ),
  123. self.tls_server_context_factory,
  124. reactor=self.get_reactor(),
  125. )
  126. else:
  127. return listen_tcp(
  128. bind_addresses,
  129. port,
  130. SynapseSite(
  131. "synapse.access.http.%s" % (site_tag,),
  132. site_tag,
  133. listener_config,
  134. root_resource,
  135. self.version_string,
  136. ),
  137. reactor=self.get_reactor(),
  138. )
  139. def _configure_named_resource(self, name, compress=False):
  140. """Build a resource map for a named resource
  141. Args:
  142. name (str): named resource: one of "client", "federation", etc
  143. compress (bool): whether to enable gzip compression for this
  144. resource
  145. Returns:
  146. dict[str, Resource]: map from path to HTTP resource
  147. """
  148. resources = {}
  149. if name == "client":
  150. client_resource = ClientRestResource(self)
  151. if compress:
  152. client_resource = gz_wrap(client_resource)
  153. resources.update({
  154. "/_matrix/client/api/v1": client_resource,
  155. "/_matrix/client/r0": client_resource,
  156. "/_matrix/client/unstable": client_resource,
  157. "/_matrix/client/v2_alpha": client_resource,
  158. "/_matrix/client/versions": client_resource,
  159. "/.well-known/matrix/client": WellKnownResource(self),
  160. })
  161. if self.get_config().saml2_enabled:
  162. from synapse.rest.saml2 import SAML2Resource
  163. resources["/_matrix/saml2"] = SAML2Resource(self)
  164. if name == "consent":
  165. from synapse.rest.consent.consent_resource import ConsentResource
  166. consent_resource = ConsentResource(self)
  167. if compress:
  168. consent_resource = gz_wrap(consent_resource)
  169. resources.update({
  170. "/_matrix/consent": consent_resource,
  171. })
  172. if name == "federation":
  173. resources.update({
  174. FEDERATION_PREFIX: TransportLayerServer(self),
  175. })
  176. if name == "openid":
  177. resources.update({
  178. FEDERATION_PREFIX: TransportLayerServer(self, servlet_groups=["openid"]),
  179. })
  180. if name in ["static", "client"]:
  181. resources.update({
  182. STATIC_PREFIX: File(
  183. os.path.join(os.path.dirname(synapse.__file__), "static")
  184. ),
  185. })
  186. if name in ["media", "federation", "client"]:
  187. if self.get_config().enable_media_repo:
  188. media_repo = self.get_media_repository_resource()
  189. resources.update({
  190. MEDIA_PREFIX: media_repo,
  191. LEGACY_MEDIA_PREFIX: media_repo,
  192. CONTENT_REPO_PREFIX: ContentRepoResource(
  193. self, self.config.uploads_path
  194. ),
  195. })
  196. elif name == "media":
  197. raise ConfigError(
  198. "'media' resource conflicts with enable_media_repo=False",
  199. )
  200. if name in ["keys", "federation"]:
  201. resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
  202. if name == "webclient":
  203. webclient_path = self.get_config().web_client_location
  204. if webclient_path is None:
  205. logger.warning(
  206. "Not enabling webclient resource, as web_client_location is unset."
  207. )
  208. else:
  209. # GZip is disabled here due to
  210. # https://twistedmatrix.com/trac/ticket/7678
  211. resources[WEB_CLIENT_PREFIX] = File(webclient_path)
  212. if name == "metrics" and self.get_config().enable_metrics:
  213. resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
  214. if name == "replication":
  215. resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
  216. return resources
  217. def start_listening(self, listeners):
  218. config = self.get_config()
  219. for listener in listeners:
  220. if listener["type"] == "http":
  221. self._listening_services.extend(
  222. self._listener_http(config, listener)
  223. )
  224. elif listener["type"] == "manhole":
  225. listen_tcp(
  226. listener["bind_addresses"],
  227. listener["port"],
  228. manhole(
  229. username="matrix",
  230. password="rabbithole",
  231. globals={"hs": self},
  232. )
  233. )
  234. elif listener["type"] == "replication":
  235. bind_addresses = listener["bind_addresses"]
  236. for address in bind_addresses:
  237. factory = ReplicationStreamProtocolFactory(self)
  238. server_listener = reactor.listenTCP(
  239. listener["port"], factory, interface=address
  240. )
  241. reactor.addSystemEventTrigger(
  242. "before", "shutdown", server_listener.stopListening,
  243. )
  244. elif listener["type"] == "metrics":
  245. if not self.get_config().enable_metrics:
  246. logger.warn(("Metrics listener configured, but "
  247. "enable_metrics is not True!"))
  248. else:
  249. _base.listen_metrics(listener["bind_addresses"],
  250. listener["port"])
  251. else:
  252. logger.warn("Unrecognized listener type: %s", listener["type"])
  253. def run_startup_checks(self, db_conn, database_engine):
  254. all_users_native = are_all_users_on_domain(
  255. db_conn.cursor(), database_engine, self.hostname
  256. )
  257. if not all_users_native:
  258. quit_with_error(
  259. "Found users in database not native to %s!\n"
  260. "You cannot changed a synapse server_name after it's been configured"
  261. % (self.hostname,)
  262. )
  263. try:
  264. database_engine.check_database(db_conn.cursor())
  265. except IncorrectDatabaseSetup as e:
  266. quit_with_error(str(e))
  267. # Gauges to expose monthly active user control metrics
  268. current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
  269. max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
  270. registered_reserved_users_mau_gauge = Gauge(
  271. "synapse_admin_mau:registered_reserved_users",
  272. "Registered users with reserved threepids"
  273. )
  274. def setup(config_options):
  275. """
  276. Args:
  277. config_options_options: The options passed to Synapse. Usually
  278. `sys.argv[1:]`.
  279. Returns:
  280. HomeServer
  281. """
  282. try:
  283. config = HomeServerConfig.load_or_generate_config(
  284. "Synapse Homeserver",
  285. config_options,
  286. )
  287. except ConfigError as e:
  288. sys.stderr.write("\n" + str(e) + "\n")
  289. sys.exit(1)
  290. if not config:
  291. # If a config isn't returned, and an exception isn't raised, we're just
  292. # generating config files and shouldn't try to continue.
  293. sys.exit(0)
  294. synapse.config.logger.setup_logging(
  295. config,
  296. use_worker_options=False
  297. )
  298. events.USE_FROZEN_DICTS = config.use_frozen_dicts
  299. database_engine = create_engine(config.database_config)
  300. config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
  301. hs = SynapseHomeServer(
  302. config.server_name,
  303. db_config=config.database_config,
  304. config=config,
  305. version_string="Synapse/" + get_version_string(synapse),
  306. database_engine=database_engine,
  307. )
  308. logger.info("Preparing database: %s...", config.database_config['name'])
  309. try:
  310. with hs.get_db_conn(run_new_connection=False) as db_conn:
  311. prepare_database(db_conn, database_engine, config=config)
  312. database_engine.on_new_connection(db_conn)
  313. hs.run_startup_checks(db_conn, database_engine)
  314. db_conn.commit()
  315. except UpgradeDatabaseException:
  316. sys.stderr.write(
  317. "\nFailed to upgrade database.\n"
  318. "Have you checked for version specific instructions in"
  319. " UPGRADES.rst?\n"
  320. )
  321. sys.exit(1)
  322. logger.info("Database prepared in %s.", config.database_config['name'])
  323. hs.setup()
  324. @defer.inlineCallbacks
  325. def start():
  326. try:
  327. # Check if the certificate is still valid.
  328. cert_days_remaining = hs.config.is_disk_cert_valid()
  329. if hs.config.acme_enabled:
  330. # If ACME is enabled, we might need to provision a certificate
  331. # before starting.
  332. acme = hs.get_acme_handler()
  333. # Start up the webservices which we will respond to ACME
  334. # challenges with.
  335. yield acme.start_listening()
  336. # We want to reprovision if cert_days_remaining is None (meaning no
  337. # certificate exists), or the days remaining number it returns
  338. # is less than our re-registration threshold.
  339. if (cert_days_remaining is None) or (
  340. not cert_days_remaining > hs.config.acme_reprovision_threshold
  341. ):
  342. yield acme.provision_certificate()
  343. _base.start(hs, config.listeners)
  344. hs.get_pusherpool().start()
  345. hs.get_datastore().start_doing_background_updates()
  346. except Exception as e:
  347. # If a DeferredList failed (like in listening on the ACME listener),
  348. # we need to print the subfailure explicitly.
  349. if isinstance(e, defer.FirstError):
  350. e.subFailure.printTraceback(sys.stderr)
  351. sys.exit(1)
  352. # Something else went wrong when starting. Print it and bail out.
  353. traceback.print_exc(file=sys.stderr)
  354. sys.exit(1)
  355. reactor.callWhenRunning(start)
  356. return hs
  357. class SynapseService(service.Service):
  358. """A twisted Service class that will start synapse. Used to run synapse
  359. via twistd and a .tac.
  360. """
  361. def __init__(self, config):
  362. self.config = config
  363. def startService(self):
  364. hs = setup(self.config)
  365. change_resource_limit(hs.config.soft_file_limit)
  366. if hs.config.gc_thresholds:
  367. gc.set_threshold(*hs.config.gc_thresholds)
  368. def stopService(self):
  369. return self._port.stopListening()
  370. def run(hs):
  371. PROFILE_SYNAPSE = False
  372. if PROFILE_SYNAPSE:
  373. def profile(func):
  374. from cProfile import Profile
  375. from threading import current_thread
  376. def profiled(*args, **kargs):
  377. profile = Profile()
  378. profile.enable()
  379. func(*args, **kargs)
  380. profile.disable()
  381. ident = current_thread().ident
  382. profile.dump_stats("/tmp/%s.%s.%i.pstat" % (
  383. hs.hostname, func.__name__, ident
  384. ))
  385. return profiled
  386. from twisted.python.threadpool import ThreadPool
  387. ThreadPool._worker = profile(ThreadPool._worker)
  388. reactor.run = profile(reactor.run)
  389. clock = hs.get_clock()
  390. start_time = clock.time()
  391. stats = {}
  392. # Contains the list of processes we will be monitoring
  393. # currently either 0 or 1
  394. stats_process = []
  395. def start_phone_stats_home():
  396. return run_as_background_process("phone_stats_home", phone_stats_home)
  397. @defer.inlineCallbacks
  398. def phone_stats_home():
  399. logger.info("Gathering stats for reporting")
  400. now = int(hs.get_clock().time())
  401. uptime = int(now - start_time)
  402. if uptime < 0:
  403. uptime = 0
  404. stats["homeserver"] = hs.config.server_name
  405. stats["timestamp"] = now
  406. stats["uptime_seconds"] = uptime
  407. version = sys.version_info
  408. stats["python_version"] = "{}.{}.{}".format(
  409. version.major, version.minor, version.micro
  410. )
  411. stats["total_users"] = yield hs.get_datastore().count_all_users()
  412. total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
  413. stats["total_nonbridged_users"] = total_nonbridged_users
  414. daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
  415. for name, count in iteritems(daily_user_type_results):
  416. stats["daily_user_type_" + name] = count
  417. room_count = yield hs.get_datastore().get_room_count()
  418. stats["total_room_count"] = room_count
  419. stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
  420. stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
  421. stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
  422. r30_results = yield hs.get_datastore().count_r30_users()
  423. for name, count in iteritems(r30_results):
  424. stats["r30_users_" + name] = count
  425. daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
  426. stats["daily_sent_messages"] = daily_sent_messages
  427. stats["cache_factor"] = CACHE_SIZE_FACTOR
  428. stats["event_cache_size"] = hs.config.event_cache_size
  429. if len(stats_process) > 0:
  430. stats["memory_rss"] = 0
  431. stats["cpu_average"] = 0
  432. for process in stats_process:
  433. stats["memory_rss"] += process.memory_info().rss
  434. stats["cpu_average"] += int(process.cpu_percent(interval=None))
  435. logger.info("Reporting stats to matrix.org: %s" % (stats,))
  436. try:
  437. yield hs.get_simple_http_client().put_json(
  438. "https://matrix.org/report-usage-stats/push",
  439. stats
  440. )
  441. except Exception as e:
  442. logger.warn("Error reporting stats: %s", e)
  443. def performance_stats_init():
  444. try:
  445. process = psutil.Process()
  446. # Ensure we can fetch both, and make the initial request for cpu_percent
  447. # so the next request will use this as the initial point.
  448. process.memory_info().rss
  449. process.cpu_percent(interval=None)
  450. logger.info("report_stats can use psutil")
  451. stats_process.append(process)
  452. except (AttributeError):
  453. logger.warning(
  454. "Unable to read memory/cpu stats. Disabling reporting."
  455. )
  456. def generate_user_daily_visit_stats():
  457. return run_as_background_process(
  458. "generate_user_daily_visits",
  459. hs.get_datastore().generate_user_daily_visits,
  460. )
  461. # Rather than update on per session basis, batch up the requests.
  462. # If you increase the loop period, the accuracy of user_daily_visits
  463. # table will decrease
  464. clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
  465. # monthly active user limiting functionality
  466. def reap_monthly_active_users():
  467. return run_as_background_process(
  468. "reap_monthly_active_users",
  469. hs.get_datastore().reap_monthly_active_users,
  470. )
  471. clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
  472. reap_monthly_active_users()
  473. @defer.inlineCallbacks
  474. def generate_monthly_active_users():
  475. current_mau_count = 0
  476. reserved_count = 0
  477. store = hs.get_datastore()
  478. if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
  479. current_mau_count = yield store.get_monthly_active_count()
  480. reserved_count = yield store.get_registered_reserved_users_count()
  481. current_mau_gauge.set(float(current_mau_count))
  482. registered_reserved_users_mau_gauge.set(float(reserved_count))
  483. max_mau_gauge.set(float(hs.config.max_mau_value))
  484. def start_generate_monthly_active_users():
  485. return run_as_background_process(
  486. "generate_monthly_active_users",
  487. generate_monthly_active_users,
  488. )
  489. start_generate_monthly_active_users()
  490. if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
  491. clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
  492. # End of monthly active user settings
  493. if hs.config.report_stats:
  494. logger.info("Scheduling stats reporting for 3 hour intervals")
  495. clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
  496. # We need to defer this init for the cases that we daemonize
  497. # otherwise the process ID we get is that of the non-daemon process
  498. clock.call_later(0, performance_stats_init)
  499. # We wait 5 minutes to send the first set of stats as the server can
  500. # be quite busy the first few minutes
  501. clock.call_later(5 * 60, start_phone_stats_home)
  502. if hs.config.daemonize and hs.config.print_pidfile:
  503. print(hs.config.pid_file)
  504. _base.start_reactor(
  505. "synapse-homeserver",
  506. hs.config.soft_file_limit,
  507. hs.config.gc_thresholds,
  508. hs.config.pid_file,
  509. hs.config.daemonize,
  510. hs.config.cpu_affinity,
  511. logger,
  512. )
  513. def main():
  514. with LoggingContext("main"):
  515. # check base requirements
  516. check_requirements()
  517. hs = setup(sys.argv[1:])
  518. run(hs)
  519. if __name__ == '__main__':
  520. main()