site.py 12 KB

# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. import contextlib
  15. import logging
  16. import time
  17. from twisted.web.server import Request, Site
  18. from synapse.http import redact_uri
  19. from synapse.http.request_metrics import RequestMetrics, requests_counter
  20. from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Sequence number for the next SynapseRequest to be constructed.
# SynapseRequest.__init__ copies it into `request_seq` and then increments it.
_next_request_seq = 0
  23. class SynapseRequest(Request):
  24. """Class which encapsulates an HTTP request to synapse.
  25. All of the requests processed in synapse are of this type.
  26. It extends twisted's twisted.web.server.Request, and adds:
  27. * Unique request ID
  28. * A log context associated with the request
  29. * Redaction of access_token query-params in __repr__
  30. * Logging at start and end
  31. * Metrics to record CPU, wallclock and DB time by endpoint.
  32. It also provides a method `processing`, which returns a context manager. If this
  33. method is called, the request won't be logged until the context manager is closed;
  34. this is useful for asynchronous request handlers which may go on processing the
  35. request even after the client has disconnected.
  36. Attributes:
  37. logcontext(LoggingContext) : the log context for this request
  38. """
  39. def __init__(self, site, channel, *args, **kw):
  40. Request.__init__(self, channel, *args, **kw)
  41. self.site = site
  42. self._channel = channel # this is used by the tests
  43. self.authenticated_entity = None
  44. self.start_time = 0
  45. # we can't yet create the logcontext, as we don't know the method.
  46. self.logcontext = None
  47. global _next_request_seq
  48. self.request_seq = _next_request_seq
  49. _next_request_seq += 1
  50. # whether an asynchronous request handler has called processing()
  51. self._is_processing = False
  52. # the time when the asynchronous request handler completed its processing
  53. self._processing_finished_time = None
  54. # what time we finished sending the response to the client (or the connection
  55. # dropped)
  56. self.finish_time = None
  57. def __repr__(self):
  58. # We overwrite this so that we don't log ``access_token``
  59. return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
  60. self.__class__.__name__,
  61. id(self),
  62. self.method,
  63. self.get_redacted_uri(),
  64. self.clientproto,
  65. self.site.site_tag,
  66. )
  67. def get_request_id(self):
  68. return "%s-%i" % (self.method, self.request_seq)
  69. def get_redacted_uri(self):
  70. return redact_uri(self.uri)
  71. def get_user_agent(self):
  72. return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
  73. def render(self, resrc):
  74. # this is called once a Resource has been found to serve the request; in our
  75. # case the Resource in question will normally be a JsonResource.
  76. # create a LogContext for this request
  77. request_id = self.get_request_id()
  78. logcontext = self.logcontext = LoggingContext(request_id)
  79. logcontext.request = request_id
  80. # override the Server header which is set by twisted
  81. self.setHeader("Server", self.site.server_version_string)
  82. with PreserveLoggingContext(self.logcontext):
  83. # we start the request metrics timer here with an initial stab
  84. # at the servlet name. For most requests that name will be
  85. # JsonResource (or a subclass), and JsonResource._async_render
  86. # will update it once it picks a servlet.
  87. servlet_name = resrc.__class__.__name__
  88. self._started_processing(servlet_name)
  89. Request.render(self, resrc)
  90. # record the arrival of the request *after*
  91. # dispatching to the handler, so that the handler
  92. # can update the servlet name in the request
  93. # metrics
  94. requests_counter.labels(self.method,
  95. self.request_metrics.name).inc()
  96. @contextlib.contextmanager
  97. def processing(self):
  98. """Record the fact that we are processing this request.
  99. Returns a context manager; the correct way to use this is:
  100. @defer.inlineCallbacks
  101. def handle_request(request):
  102. with request.processing("FooServlet"):
  103. yield really_handle_the_request()
  104. Once the context manager is closed, the completion of the request will be logged,
  105. and the various metrics will be updated.
  106. """
  107. if self._is_processing:
  108. raise RuntimeError("Request is already processing")
  109. self._is_processing = True
  110. try:
  111. yield
  112. except Exception:
  113. # this should already have been caught, and sent back to the client as a 500.
  114. logger.exception("Asynchronous messge handler raised an uncaught exception")
  115. finally:
  116. # the request handler has finished its work and either sent the whole response
  117. # back, or handed over responsibility to a Producer.
  118. self._processing_finished_time = time.time()
  119. self._is_processing = False
  120. # if we've already sent the response, log it now; otherwise, we wait for the
  121. # response to be sent.
  122. if self.finish_time is not None:
  123. self._finished_processing()
  124. def finish(self):
  125. """Called when all response data has been written to this Request.
  126. Overrides twisted.web.server.Request.finish to record the finish time and do
  127. logging.
  128. """
  129. self.finish_time = time.time()
  130. Request.finish(self)
  131. if not self._is_processing:
  132. with PreserveLoggingContext(self.logcontext):
  133. self._finished_processing()
  134. def connectionLost(self, reason):
  135. """Called when the client connection is closed before the response is written.
  136. Overrides twisted.web.server.Request.connectionLost to record the finish time and
  137. do logging.
  138. """
  139. self.finish_time = time.time()
  140. Request.connectionLost(self, reason)
  141. # we only get here if the connection to the client drops before we send
  142. # the response.
  143. #
  144. # It's useful to log it here so that we can get an idea of when
  145. # the client disconnects.
  146. with PreserveLoggingContext(self.logcontext):
  147. logger.warn(
  148. "Error processing request %r: %s %s", self, reason.type, reason.value,
  149. )
  150. if not self._is_processing:
  151. self._finished_processing()
  152. def _started_processing(self, servlet_name):
  153. """Record the fact that we are processing this request.
  154. This will log the request's arrival. Once the request completes,
  155. be sure to call finished_processing.
  156. Args:
  157. servlet_name (str): the name of the servlet which will be
  158. processing this request. This is used in the metrics.
  159. It is possible to update this afterwards by updating
  160. self.request_metrics.name.
  161. """
  162. self.start_time = time.time()
  163. self.request_metrics = RequestMetrics()
  164. self.request_metrics.start(
  165. self.start_time, name=servlet_name, method=self.method.decode('ascii'),
  166. )
  167. self.site.access_logger.info(
  168. "%s - %s - Received request: %s %s",
  169. self.getClientIP(),
  170. self.site.site_tag,
  171. self.method.decode('ascii'),
  172. self.get_redacted_uri()
  173. )
  174. def _finished_processing(self):
  175. """Log the completion of this request and update the metrics
  176. """
  177. if self.logcontext is None:
  178. # this can happen if the connection closed before we read the
  179. # headers (so render was never called). In that case we'll already
  180. # have logged a warning, so just bail out.
  181. return
  182. usage = self.logcontext.get_resource_usage()
  183. if self._processing_finished_time is None:
  184. # we completed the request without anything calling processing()
  185. self._processing_finished_time = time.time()
  186. # the time between receiving the request and the request handler finishing
  187. processing_time = self._processing_finished_time - self.start_time
  188. # the time between the request handler finishing and the response being sent
  189. # to the client (nb may be negative)
  190. response_send_time = self.finish_time - self._processing_finished_time
  191. # need to decode as it could be raw utf-8 bytes
  192. # from a IDN servname in an auth header
  193. authenticated_entity = self.authenticated_entity
  194. if authenticated_entity is not None and isinstance(authenticated_entity, bytes):
  195. authenticated_entity = authenticated_entity.decode("utf-8", "replace")
  196. # ...or could be raw utf-8 bytes in the User-Agent header.
  197. # N.B. if you don't do this, the logger explodes cryptically
  198. # with maximum recursion trying to log errors about
  199. # the charset problem.
  200. # c.f. https://github.com/matrix-org/synapse/issues/3471
  201. user_agent = self.get_user_agent()
  202. if user_agent is not None:
  203. user_agent = user_agent.decode("utf-8", "replace")
  204. else:
  205. user_agent = "-"
  206. code = str(self.code)
  207. if not self.finished:
  208. # we didn't send the full response before we gave up (presumably because
  209. # the connection dropped)
  210. code += "!"
  211. self.site.access_logger.info(
  212. "%s - %s - {%s}"
  213. " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
  214. " %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
  215. self.getClientIP(),
  216. self.site.site_tag,
  217. authenticated_entity,
  218. processing_time,
  219. response_send_time,
  220. usage.ru_utime,
  221. usage.ru_stime,
  222. usage.db_sched_duration_sec,
  223. usage.db_txn_duration_sec,
  224. int(usage.db_txn_count),
  225. self.sentLength,
  226. code,
  227. self.method,
  228. self.get_redacted_uri(),
  229. self.clientproto,
  230. user_agent,
  231. usage.evt_db_fetch_count,
  232. )
  233. try:
  234. self.request_metrics.stop(self.finish_time, self)
  235. except Exception as e:
  236. logger.warn("Failed to stop metrics: %r", e)
  237. class XForwardedForRequest(SynapseRequest):
  238. def __init__(self, *args, **kw):
  239. SynapseRequest.__init__(self, *args, **kw)
  240. """
  241. Add a layer on top of another request that only uses the value of an
  242. X-Forwarded-For header as the result of C{getClientIP}.
  243. """
  244. def getClientIP(self):
  245. """
  246. @return: The client address (the first address) in the value of the
  247. I{X-Forwarded-For header}. If the header is not present, return
  248. C{b"-"}.
  249. """
  250. return self.requestHeaders.getRawHeaders(
  251. b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
  252. class SynapseRequestFactory(object):
  253. def __init__(self, site, x_forwarded_for):
  254. self.site = site
  255. self.x_forwarded_for = x_forwarded_for
  256. def __call__(self, *args, **kwargs):
  257. if self.x_forwarded_for:
  258. return XForwardedForRequest(self.site, *args, **kwargs)
  259. else:
  260. return SynapseRequest(self.site, *args, **kwargs)
  261. class SynapseSite(Site):
  262. """
  263. Subclass of a twisted http Site that does access logging with python's
  264. standard logging
  265. """
  266. def __init__(self, logger_name, site_tag, config, resource,
  267. server_version_string, *args, **kwargs):
  268. Site.__init__(self, resource, *args, **kwargs)
  269. self.site_tag = site_tag
  270. proxied = config.get("x_forwarded", False)
  271. self.requestFactory = SynapseRequestFactory(self, proxied)
  272. self.access_logger = logging.getLogger(logger_name)
  273. self.server_version_string = server_version_string.encode('ascii')
  274. def log(self, request):
  275. pass