homeserver.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # Copyright 2014-2016 OpenMarket Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import gc
  17. import logging
  18. import os
  19. import sys
  20. from twisted.application import service
  21. from twisted.internet import defer, reactor
  22. from twisted.web.resource import EncodingResourceWrapper, NoResource
  23. from twisted.web.server import GzipEncoderFactory
  24. from twisted.web.static import File
  25. import synapse
  26. import synapse.config.logger
  27. from synapse import events
  28. from synapse.api.urls import (
  29. CONTENT_REPO_PREFIX,
  30. FEDERATION_PREFIX,
  31. LEGACY_MEDIA_PREFIX,
  32. MEDIA_PREFIX,
  33. SERVER_KEY_PREFIX,
  34. SERVER_KEY_V2_PREFIX,
  35. STATIC_PREFIX,
  36. WEB_CLIENT_PREFIX,
  37. )
  38. from synapse.app import _base
  39. from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
  40. from synapse.config._base import ConfigError
  41. from synapse.config.homeserver import HomeServerConfig
  42. from synapse.crypto import context_factory
  43. from synapse.federation.transport.server import TransportLayerServer
  44. from synapse.http.additional_resource import AdditionalResource
  45. from synapse.http.server import RootRedirect
  46. from synapse.http.site import SynapseSite
  47. from synapse.metrics import RegistryProxy
  48. from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
  49. from synapse.module_api import ModuleApi
  50. from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
  51. from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
  52. from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
  53. from synapse.rest import ClientRestResource
  54. from synapse.rest.key.v1.server_key_resource import LocalKey
  55. from synapse.rest.key.v2 import KeyApiV2Resource
  56. from synapse.rest.media.v0.content_repository import ContentRepoResource
  57. from synapse.server import HomeServer
  58. from synapse.storage import are_all_users_on_domain
  59. from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
  60. from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
  61. from synapse.util.caches import CACHE_SIZE_FACTOR
  62. from synapse.util.httpresourcetree import create_resource_tree
  63. from synapse.util.logcontext import LoggingContext
  64. from synapse.util.manhole import manhole
  65. from synapse.util.module_loader import load_module
  66. from synapse.util.rlimit import change_resource_limit
  67. from synapse.util.versionstring import get_version_string
  68. logger = logging.getLogger("synapse.app.homeserver")
  69. def gz_wrap(r):
  70. return EncodingResourceWrapper(r, [GzipEncoderFactory()])
  71. def build_resource_for_web_client(hs):
  72. webclient_path = hs.get_config().web_client_location
  73. if not webclient_path:
  74. try:
  75. import syweb
  76. except ImportError:
  77. quit_with_error(
  78. "Could not find a webclient.\n\n"
  79. "Please either install the matrix-angular-sdk or configure\n"
  80. "the location of the source to serve via the configuration\n"
  81. "option `web_client_location`\n\n"
  82. "To install the `matrix-angular-sdk` via pip, run:\n\n"
  83. " pip install '%(dep)s'\n"
  84. "\n"
  85. "You can also disable hosting of the webclient via the\n"
  86. "configuration option `web_client`\n"
  87. % {"dep": CONDITIONAL_REQUIREMENTS["web_client"].keys()[0]}
  88. )
  89. syweb_path = os.path.dirname(syweb.__file__)
  90. webclient_path = os.path.join(syweb_path, "webclient")
  91. # GZip is disabled here due to
  92. # https://twistedmatrix.com/trac/ticket/7678
  93. # (It can stay enabled for the API resources: they call
  94. # write() with the whole body and then finish() straight
  95. # after and so do not trigger the bug.
  96. # GzipFile was removed in commit 184ba09
  97. # return GzipFile(webclient_path) # TODO configurable?
  98. return File(webclient_path) # TODO configurable?
  99. class SynapseHomeServer(HomeServer):
  100. def _listener_http(self, config, listener_config):
  101. port = listener_config["port"]
  102. bind_addresses = listener_config["bind_addresses"]
  103. tls = listener_config.get("tls", False)
  104. site_tag = listener_config.get("tag", port)
  105. if tls and config.no_tls:
  106. return
  107. resources = {}
  108. for res in listener_config["resources"]:
  109. for name in res["names"]:
  110. resources.update(self._configure_named_resource(
  111. name, res.get("compress", False),
  112. ))
  113. additional_resources = listener_config.get("additional_resources", {})
  114. logger.debug("Configuring additional resources: %r",
  115. additional_resources)
  116. module_api = ModuleApi(self, self.get_auth_handler())
  117. for path, resmodule in additional_resources.items():
  118. handler_cls, config = load_module(resmodule)
  119. handler = handler_cls(config, module_api)
  120. resources[path] = AdditionalResource(self, handler.handle_request)
  121. if WEB_CLIENT_PREFIX in resources:
  122. root_resource = RootRedirect(WEB_CLIENT_PREFIX)
  123. else:
  124. root_resource = NoResource()
  125. root_resource = create_resource_tree(resources, root_resource)
  126. if tls:
  127. listen_ssl(
  128. bind_addresses,
  129. port,
  130. SynapseSite(
  131. "synapse.access.https.%s" % (site_tag,),
  132. site_tag,
  133. listener_config,
  134. root_resource,
  135. self.version_string,
  136. ),
  137. self.tls_server_context_factory,
  138. )
  139. else:
  140. listen_tcp(
  141. bind_addresses,
  142. port,
  143. SynapseSite(
  144. "synapse.access.http.%s" % (site_tag,),
  145. site_tag,
  146. listener_config,
  147. root_resource,
  148. self.version_string,
  149. )
  150. )
  151. logger.info("Synapse now listening on port %d", port)
  152. def _configure_named_resource(self, name, compress=False):
  153. """Build a resource map for a named resource
  154. Args:
  155. name (str): named resource: one of "client", "federation", etc
  156. compress (bool): whether to enable gzip compression for this
  157. resource
  158. Returns:
  159. dict[str, Resource]: map from path to HTTP resource
  160. """
  161. resources = {}
  162. if name == "client":
  163. client_resource = ClientRestResource(self)
  164. if compress:
  165. client_resource = gz_wrap(client_resource)
  166. resources.update({
  167. "/_matrix/client/api/v1": client_resource,
  168. "/_matrix/client/r0": client_resource,
  169. "/_matrix/client/unstable": client_resource,
  170. "/_matrix/client/v2_alpha": client_resource,
  171. "/_matrix/client/versions": client_resource,
  172. })
  173. if name == "consent":
  174. from synapse.rest.consent.consent_resource import ConsentResource
  175. consent_resource = ConsentResource(self)
  176. if compress:
  177. consent_resource = gz_wrap(consent_resource)
  178. resources.update({
  179. "/_matrix/consent": consent_resource,
  180. })
  181. if name == "federation":
  182. resources.update({
  183. FEDERATION_PREFIX: TransportLayerServer(self),
  184. })
  185. if name in ["static", "client"]:
  186. resources.update({
  187. STATIC_PREFIX: File(
  188. os.path.join(os.path.dirname(synapse.__file__), "static")
  189. ),
  190. })
  191. if name in ["media", "federation", "client"]:
  192. if self.get_config().enable_media_repo:
  193. media_repo = self.get_media_repository_resource()
  194. resources.update({
  195. MEDIA_PREFIX: media_repo,
  196. LEGACY_MEDIA_PREFIX: media_repo,
  197. CONTENT_REPO_PREFIX: ContentRepoResource(
  198. self, self.config.uploads_path
  199. ),
  200. })
  201. elif name == "media":
  202. raise ConfigError(
  203. "'media' resource conflicts with enable_media_repo=False",
  204. )
  205. if name in ["keys", "federation"]:
  206. resources.update({
  207. SERVER_KEY_PREFIX: LocalKey(self),
  208. SERVER_KEY_V2_PREFIX: KeyApiV2Resource(self),
  209. })
  210. if name == "webclient":
  211. resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
  212. if name == "metrics" and self.get_config().enable_metrics:
  213. resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
  214. if name == "replication":
  215. resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
  216. return resources
  217. def start_listening(self):
  218. config = self.get_config()
  219. for listener in config.listeners:
  220. if listener["type"] == "http":
  221. self._listener_http(config, listener)
  222. elif listener["type"] == "manhole":
  223. listen_tcp(
  224. listener["bind_addresses"],
  225. listener["port"],
  226. manhole(
  227. username="matrix",
  228. password="rabbithole",
  229. globals={"hs": self},
  230. )
  231. )
  232. elif listener["type"] == "replication":
  233. bind_addresses = listener["bind_addresses"]
  234. for address in bind_addresses:
  235. factory = ReplicationStreamProtocolFactory(self)
  236. server_listener = reactor.listenTCP(
  237. listener["port"], factory, interface=address
  238. )
  239. reactor.addSystemEventTrigger(
  240. "before", "shutdown", server_listener.stopListening,
  241. )
  242. elif listener["type"] == "metrics":
  243. if not self.get_config().enable_metrics:
  244. logger.warn(("Metrics listener configured, but "
  245. "enable_metrics is not True!"))
  246. else:
  247. _base.listen_metrics(listener["bind_addresses"],
  248. listener["port"])
  249. else:
  250. logger.warn("Unrecognized listener type: %s", listener["type"])
  251. def run_startup_checks(self, db_conn, database_engine):
  252. all_users_native = are_all_users_on_domain(
  253. db_conn.cursor(), database_engine, self.hostname
  254. )
  255. if not all_users_native:
  256. quit_with_error(
  257. "Found users in database not native to %s!\n"
  258. "You cannot changed a synapse server_name after it's been configured"
  259. % (self.hostname,)
  260. )
  261. try:
  262. database_engine.check_database(db_conn.cursor())
  263. except IncorrectDatabaseSetup as e:
  264. quit_with_error(e.message)
  265. def setup(config_options):
  266. """
  267. Args:
  268. config_options_options: The options passed to Synapse. Usually
  269. `sys.argv[1:]`.
  270. Returns:
  271. HomeServer
  272. """
  273. try:
  274. config = HomeServerConfig.load_or_generate_config(
  275. "Synapse Homeserver",
  276. config_options,
  277. )
  278. except ConfigError as e:
  279. sys.stderr.write("\n" + e.message + "\n")
  280. sys.exit(1)
  281. if not config:
  282. # If a config isn't returned, and an exception isn't raised, we're just
  283. # generating config files and shouldn't try to continue.
  284. sys.exit(0)
  285. synapse.config.logger.setup_logging(config, use_worker_options=False)
  286. # check any extra requirements we have now we have a config
  287. check_requirements(config)
  288. events.USE_FROZEN_DICTS = config.use_frozen_dicts
  289. tls_server_context_factory = context_factory.ServerContextFactory(config)
  290. tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
  291. database_engine = create_engine(config.database_config)
  292. config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
  293. hs = SynapseHomeServer(
  294. config.server_name,
  295. db_config=config.database_config,
  296. tls_server_context_factory=tls_server_context_factory,
  297. tls_client_options_factory=tls_client_options_factory,
  298. config=config,
  299. version_string="Synapse/" + get_version_string(synapse),
  300. database_engine=database_engine,
  301. )
  302. logger.info("Preparing database: %s...", config.database_config['name'])
  303. try:
  304. db_conn = hs.get_db_conn(run_new_connection=False)
  305. prepare_database(db_conn, database_engine, config=config)
  306. database_engine.on_new_connection(db_conn)
  307. hs.run_startup_checks(db_conn, database_engine)
  308. db_conn.commit()
  309. except UpgradeDatabaseException:
  310. sys.stderr.write(
  311. "\nFailed to upgrade database.\n"
  312. "Have you checked for version specific instructions in"
  313. " UPGRADES.rst?\n"
  314. )
  315. sys.exit(1)
  316. logger.info("Database prepared in %s.", config.database_config['name'])
  317. hs.setup()
  318. hs.start_listening()
  319. def start():
  320. hs.get_pusherpool().start()
  321. hs.get_state_handler().start_caching()
  322. hs.get_datastore().start_profiling()
  323. hs.get_datastore().start_doing_background_updates()
  324. hs.get_federation_client().start_get_pdu_cache()
  325. reactor.callWhenRunning(start)
  326. return hs
  327. class SynapseService(service.Service):
  328. """A twisted Service class that will start synapse. Used to run synapse
  329. via twistd and a .tac.
  330. """
  331. def __init__(self, config):
  332. self.config = config
  333. def startService(self):
  334. hs = setup(self.config)
  335. change_resource_limit(hs.config.soft_file_limit)
  336. if hs.config.gc_thresholds:
  337. gc.set_threshold(*hs.config.gc_thresholds)
  338. def stopService(self):
  339. return self._port.stopListening()
  340. def run(hs):
  341. PROFILE_SYNAPSE = False
  342. if PROFILE_SYNAPSE:
  343. def profile(func):
  344. from cProfile import Profile
  345. from threading import current_thread
  346. def profiled(*args, **kargs):
  347. profile = Profile()
  348. profile.enable()
  349. func(*args, **kargs)
  350. profile.disable()
  351. ident = current_thread().ident
  352. profile.dump_stats("/tmp/%s.%s.%i.pstat" % (
  353. hs.hostname, func.__name__, ident
  354. ))
  355. return profiled
  356. from twisted.python.threadpool import ThreadPool
  357. ThreadPool._worker = profile(ThreadPool._worker)
  358. reactor.run = profile(reactor.run)
  359. clock = hs.get_clock()
  360. start_time = clock.time()
  361. stats = {}
  362. # Contains the list of processes we will be monitoring
  363. # currently either 0 or 1
  364. stats_process = []
  365. @defer.inlineCallbacks
  366. def phone_stats_home():
  367. logger.info("Gathering stats for reporting")
  368. now = int(hs.get_clock().time())
  369. uptime = int(now - start_time)
  370. if uptime < 0:
  371. uptime = 0
  372. stats["homeserver"] = hs.config.server_name
  373. stats["timestamp"] = now
  374. stats["uptime_seconds"] = uptime
  375. stats["total_users"] = yield hs.get_datastore().count_all_users()
  376. total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
  377. stats["total_nonbridged_users"] = total_nonbridged_users
  378. daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
  379. for name, count in daily_user_type_results.iteritems():
  380. stats["daily_user_type_" + name] = count
  381. room_count = yield hs.get_datastore().get_room_count()
  382. stats["total_room_count"] = room_count
  383. stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
  384. stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
  385. stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
  386. r30_results = yield hs.get_datastore().count_r30_users()
  387. for name, count in r30_results.iteritems():
  388. stats["r30_users_" + name] = count
  389. daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
  390. stats["daily_sent_messages"] = daily_sent_messages
  391. stats["cache_factor"] = CACHE_SIZE_FACTOR
  392. stats["event_cache_size"] = hs.config.event_cache_size
  393. if len(stats_process) > 0:
  394. stats["memory_rss"] = 0
  395. stats["cpu_average"] = 0
  396. for process in stats_process:
  397. stats["memory_rss"] += process.memory_info().rss
  398. stats["cpu_average"] += int(process.cpu_percent(interval=None))
  399. logger.info("Reporting stats to matrix.org: %s" % (stats,))
  400. try:
  401. yield hs.get_simple_http_client().put_json(
  402. "https://matrix.org/report-usage-stats/push",
  403. stats
  404. )
  405. except Exception as e:
  406. logger.warn("Error reporting stats: %s", e)
  407. def performance_stats_init():
  408. try:
  409. import psutil
  410. process = psutil.Process()
  411. # Ensure we can fetch both, and make the initial request for cpu_percent
  412. # so the next request will use this as the initial point.
  413. process.memory_info().rss
  414. process.cpu_percent(interval=None)
  415. logger.info("report_stats can use psutil")
  416. stats_process.append(process)
  417. except (ImportError, AttributeError):
  418. logger.warn(
  419. "report_stats enabled but psutil is not installed or incorrect version."
  420. " Disabling reporting of memory/cpu stats."
  421. " Ensuring psutil is available will help matrix.org track performance"
  422. " changes across releases."
  423. )
  424. def generate_user_daily_visit_stats():
  425. hs.get_datastore().generate_user_daily_visits()
  426. # Rather than update on per session basis, batch up the requests.
  427. # If you increase the loop period, the accuracy of user_daily_visits
  428. # table will decrease
  429. clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
  430. if hs.config.report_stats:
  431. logger.info("Scheduling stats reporting for 3 hour intervals")
  432. clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
  433. # We need to defer this init for the cases that we daemonize
  434. # otherwise the process ID we get is that of the non-daemon process
  435. clock.call_later(0, performance_stats_init)
  436. # We wait 5 minutes to send the first set of stats as the server can
  437. # be quite busy the first few minutes
  438. clock.call_later(5 * 60, phone_stats_home)
  439. if hs.config.daemonize and hs.config.print_pidfile:
  440. print (hs.config.pid_file)
  441. _base.start_reactor(
  442. "synapse-homeserver",
  443. hs.config.soft_file_limit,
  444. hs.config.gc_thresholds,
  445. hs.config.pid_file,
  446. hs.config.daemonize,
  447. hs.config.cpu_affinity,
  448. logger,
  449. )
  450. def main():
  451. with LoggingContext("main"):
  452. # check base requirements
  453. check_requirements()
  454. hs = setup(sys.argv[1:])
  455. run(hs)
  456. if __name__ == '__main__':
  457. main()