site.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. # Copyright 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import contextlib
  15. import logging
  16. import time
  17. from twisted.web.server import Request, Site
  18. from synapse.http import redact_uri
  19. from synapse.http.request_metrics import RequestMetrics, requests_counter
  20. from synapse.logging.context import LoggingContext, PreserveLoggingContext
# Module-level logger; the request class below uses it for warnings and for
# uncaught handler errors (access-log lines go to the per-site access_logger).
logger = logging.getLogger(__name__)

# Sequence number to hand to the next SynapseRequest: SynapseRequest.__init__ copies
# it into `request_seq` and then increments it.
_next_request_seq = 0
  23. class SynapseRequest(Request):
  24. """Class which encapsulates an HTTP request to synapse.
  25. All of the requests processed in synapse are of this type.
  26. It extends twisted's twisted.web.server.Request, and adds:
  27. * Unique request ID
  28. * A log context associated with the request
  29. * Redaction of access_token query-params in __repr__
  30. * Logging at start and end
  31. * Metrics to record CPU, wallclock and DB time by endpoint.
  32. It also provides a method `processing`, which returns a context manager. If this
  33. method is called, the request won't be logged until the context manager is closed;
  34. this is useful for asynchronous request handlers which may go on processing the
  35. request even after the client has disconnected.
  36. Attributes:
  37. logcontext(LoggingContext) : the log context for this request
  38. """
  39. def __init__(self, site, channel, *args, **kw):
  40. Request.__init__(self, channel, *args, **kw)
  41. self.site = site
  42. self._channel = channel # this is used by the tests
  43. self.authenticated_entity = None
  44. self.start_time = 0
  45. # we can't yet create the logcontext, as we don't know the method.
  46. self.logcontext = None
  47. global _next_request_seq
  48. self.request_seq = _next_request_seq
  49. _next_request_seq += 1
  50. # whether an asynchronous request handler has called processing()
  51. self._is_processing = False
  52. # the time when the asynchronous request handler completed its processing
  53. self._processing_finished_time = None
  54. # what time we finished sending the response to the client (or the connection
  55. # dropped)
  56. self.finish_time = None
  57. def __repr__(self):
  58. # We overwrite this so that we don't log ``access_token``
  59. return "<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>" % (
  60. self.__class__.__name__,
  61. id(self),
  62. self.get_method(),
  63. self.get_redacted_uri(),
  64. self.clientproto.decode("ascii", errors="replace"),
  65. self.site.site_tag,
  66. )
  67. def get_request_id(self):
  68. return "%s-%i" % (self.get_method(), self.request_seq)
  69. def get_redacted_uri(self):
  70. uri = self.uri
  71. if isinstance(uri, bytes):
  72. uri = self.uri.decode("ascii")
  73. return redact_uri(uri)
  74. def get_method(self):
  75. """Gets the method associated with the request (or placeholder if not
  76. method has yet been received).
  77. Note: This is necessary as the placeholder value in twisted is str
  78. rather than bytes, so we need to sanitise `self.method`.
  79. Returns:
  80. str
  81. """
  82. method = self.method
  83. if isinstance(method, bytes):
  84. method = self.method.decode("ascii")
  85. return method
  86. def get_user_agent(self):
  87. return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
  88. def render(self, resrc):
  89. # this is called once a Resource has been found to serve the request; in our
  90. # case the Resource in question will normally be a JsonResource.
  91. # create a LogContext for this request
  92. request_id = self.get_request_id()
  93. logcontext = self.logcontext = LoggingContext(request_id)
  94. logcontext.request = request_id
  95. # override the Server header which is set by twisted
  96. self.setHeader("Server", self.site.server_version_string)
  97. with PreserveLoggingContext(self.logcontext):
  98. # we start the request metrics timer here with an initial stab
  99. # at the servlet name. For most requests that name will be
  100. # JsonResource (or a subclass), and JsonResource._async_render
  101. # will update it once it picks a servlet.
  102. servlet_name = resrc.__class__.__name__
  103. self._started_processing(servlet_name)
  104. Request.render(self, resrc)
  105. # record the arrival of the request *after*
  106. # dispatching to the handler, so that the handler
  107. # can update the servlet name in the request
  108. # metrics
  109. requests_counter.labels(self.get_method(), self.request_metrics.name).inc()
  110. @contextlib.contextmanager
  111. def processing(self):
  112. """Record the fact that we are processing this request.
  113. Returns a context manager; the correct way to use this is:
  114. @defer.inlineCallbacks
  115. def handle_request(request):
  116. with request.processing("FooServlet"):
  117. yield really_handle_the_request()
  118. Once the context manager is closed, the completion of the request will be logged,
  119. and the various metrics will be updated.
  120. """
  121. if self._is_processing:
  122. raise RuntimeError("Request is already processing")
  123. self._is_processing = True
  124. try:
  125. yield
  126. except Exception:
  127. # this should already have been caught, and sent back to the client as a 500.
  128. logger.exception("Asynchronous messge handler raised an uncaught exception")
  129. finally:
  130. # the request handler has finished its work and either sent the whole response
  131. # back, or handed over responsibility to a Producer.
  132. self._processing_finished_time = time.time()
  133. self._is_processing = False
  134. # if we've already sent the response, log it now; otherwise, we wait for the
  135. # response to be sent.
  136. if self.finish_time is not None:
  137. self._finished_processing()
  138. def finish(self):
  139. """Called when all response data has been written to this Request.
  140. Overrides twisted.web.server.Request.finish to record the finish time and do
  141. logging.
  142. """
  143. self.finish_time = time.time()
  144. Request.finish(self)
  145. if not self._is_processing:
  146. with PreserveLoggingContext(self.logcontext):
  147. self._finished_processing()
  148. def connectionLost(self, reason):
  149. """Called when the client connection is closed before the response is written.
  150. Overrides twisted.web.server.Request.connectionLost to record the finish time and
  151. do logging.
  152. """
  153. self.finish_time = time.time()
  154. Request.connectionLost(self, reason)
  155. # we only get here if the connection to the client drops before we send
  156. # the response.
  157. #
  158. # It's useful to log it here so that we can get an idea of when
  159. # the client disconnects.
  160. with PreserveLoggingContext(self.logcontext):
  161. logger.warn(
  162. "Error processing request %r: %s %s", self, reason.type, reason.value
  163. )
  164. if not self._is_processing:
  165. self._finished_processing()
  166. def _started_processing(self, servlet_name):
  167. """Record the fact that we are processing this request.
  168. This will log the request's arrival. Once the request completes,
  169. be sure to call finished_processing.
  170. Args:
  171. servlet_name (str): the name of the servlet which will be
  172. processing this request. This is used in the metrics.
  173. It is possible to update this afterwards by updating
  174. self.request_metrics.name.
  175. """
  176. self.start_time = time.time()
  177. self.request_metrics = RequestMetrics()
  178. self.request_metrics.start(
  179. self.start_time, name=servlet_name, method=self.get_method()
  180. )
  181. self.site.access_logger.info(
  182. "%s - %s - Received request: %s %s",
  183. self.getClientIP(),
  184. self.site.site_tag,
  185. self.get_method(),
  186. self.get_redacted_uri(),
  187. )
  188. def _finished_processing(self):
  189. """Log the completion of this request and update the metrics
  190. """
  191. if self.logcontext is None:
  192. # this can happen if the connection closed before we read the
  193. # headers (so render was never called). In that case we'll already
  194. # have logged a warning, so just bail out.
  195. return
  196. usage = self.logcontext.get_resource_usage()
  197. if self._processing_finished_time is None:
  198. # we completed the request without anything calling processing()
  199. self._processing_finished_time = time.time()
  200. # the time between receiving the request and the request handler finishing
  201. processing_time = self._processing_finished_time - self.start_time
  202. # the time between the request handler finishing and the response being sent
  203. # to the client (nb may be negative)
  204. response_send_time = self.finish_time - self._processing_finished_time
  205. # need to decode as it could be raw utf-8 bytes
  206. # from a IDN servname in an auth header
  207. authenticated_entity = self.authenticated_entity
  208. if authenticated_entity is not None and isinstance(authenticated_entity, bytes):
  209. authenticated_entity = authenticated_entity.decode("utf-8", "replace")
  210. # ...or could be raw utf-8 bytes in the User-Agent header.
  211. # N.B. if you don't do this, the logger explodes cryptically
  212. # with maximum recursion trying to log errors about
  213. # the charset problem.
  214. # c.f. https://github.com/matrix-org/synapse/issues/3471
  215. user_agent = self.get_user_agent()
  216. if user_agent is not None:
  217. user_agent = user_agent.decode("utf-8", "replace")
  218. else:
  219. user_agent = "-"
  220. code = str(self.code)
  221. if not self.finished:
  222. # we didn't send the full response before we gave up (presumably because
  223. # the connection dropped)
  224. code += "!"
  225. self.site.access_logger.info(
  226. "%s - %s - {%s}"
  227. " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
  228. ' %sB %s "%s %s %s" "%s" [%d dbevts]',
  229. self.getClientIP(),
  230. self.site.site_tag,
  231. authenticated_entity,
  232. processing_time,
  233. response_send_time,
  234. usage.ru_utime,
  235. usage.ru_stime,
  236. usage.db_sched_duration_sec,
  237. usage.db_txn_duration_sec,
  238. int(usage.db_txn_count),
  239. self.sentLength,
  240. code,
  241. self.get_method(),
  242. self.get_redacted_uri(),
  243. self.clientproto.decode("ascii", errors="replace"),
  244. user_agent,
  245. usage.evt_db_fetch_count,
  246. )
  247. try:
  248. self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
  249. except Exception as e:
  250. logger.warn("Failed to stop metrics: %r", e)
  251. class XForwardedForRequest(SynapseRequest):
  252. def __init__(self, *args, **kw):
  253. SynapseRequest.__init__(self, *args, **kw)
  254. """
  255. Add a layer on top of another request that only uses the value of an
  256. X-Forwarded-For header as the result of C{getClientIP}.
  257. """
  258. def getClientIP(self):
  259. """
  260. @return: The client address (the first address) in the value of the
  261. I{X-Forwarded-For header}. If the header is not present, return
  262. C{b"-"}.
  263. """
  264. return (
  265. self.requestHeaders.getRawHeaders(b"x-forwarded-for", [b"-"])[0]
  266. .split(b",")[0]
  267. .strip()
  268. .decode("ascii")
  269. )
  270. class SynapseRequestFactory(object):
  271. def __init__(self, site, x_forwarded_for):
  272. self.site = site
  273. self.x_forwarded_for = x_forwarded_for
  274. def __call__(self, *args, **kwargs):
  275. if self.x_forwarded_for:
  276. return XForwardedForRequest(self.site, *args, **kwargs)
  277. else:
  278. return SynapseRequest(self.site, *args, **kwargs)
  279. class SynapseSite(Site):
  280. """
  281. Subclass of a twisted http Site that does access logging with python's
  282. standard logging
  283. """
  284. def __init__(
  285. self,
  286. logger_name,
  287. site_tag,
  288. config,
  289. resource,
  290. server_version_string,
  291. *args,
  292. **kwargs
  293. ):
  294. Site.__init__(self, resource, *args, **kwargs)
  295. self.site_tag = site_tag
  296. proxied = config.get("x_forwarded", False)
  297. self.requestFactory = SynapseRequestFactory(self, proxied)
  298. self.access_logger = logging.getLogger(logger_name)
  299. self.server_version_string = server_version_string.encode("ascii")
  300. def log(self, request):
  301. pass