site.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. # Copyright 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import contextlib
  15. import logging
  16. import time
  17. from twisted.web.server import Request, Site
  18. from synapse.http import redact_uri
  19. from synapse.http.request_metrics import RequestMetrics, requests_counter
  20. from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
  # Module-level logger, named after this module.
  logger = logging.getLogger(__name__)

  # Sequence number handed to the next SynapseRequest: each request copies the
  # current value in its constructor and then increments this counter.
  _next_request_seq = 0
  class SynapseRequest(Request):
      """Class which encapsulates an HTTP request to synapse.

      All of the requests processed in synapse are of this type.

      It extends twisted's twisted.web.server.Request, and adds:
       * Unique request ID
       * A log context associated with the request
       * Redaction of access_token query-params in __repr__
       * Logging at start and end
       * Metrics to record CPU, wallclock and DB time by endpoint.

      It also provides a method `processing`, which returns a context manager. If this
      method is called, the request won't be logged until the context manager is closed;
      this is useful for asynchronous request handlers which may go on processing the
      request even after the client has disconnected.

      Attributes:
          logcontext(LoggingContext) : the log context for this request; None
              until render() has been called
          site: the site object this request was created for
          request_seq (int): process-wide sequence number of this request
          start_time (float): time.time() at which processing started (0 until
              render() has been called)
          finish_time (float|None): time.time() at which the response finished
              being sent, or the connection was lost
      """

      def __init__(self, site, channel, *args, **kw):
          Request.__init__(self, channel, *args, **kw)
          self.site = site
          self._channel = channel  # this is used by the tests
          self.authenticated_entity = None
          self.start_time = 0

          # we can't yet create the logcontext, as we don't know the method.
          self.logcontext = None

          global _next_request_seq
          self.request_seq = _next_request_seq
          _next_request_seq += 1

          # whether an asynchronous request handler has called processing()
          self._is_processing = False

          # the time when the asynchronous request handler completed its processing
          self._processing_finished_time = None

          # what time we finished sending the response to the client (or the connection
          # dropped)
          self.finish_time = None

      def __repr__(self):
          # We overwrite this so that we don't log ``access_token``
          return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
              self.__class__.__name__,
              id(self),
              self.get_method(),
              self.get_redacted_uri(),
              self.clientproto.decode('ascii', errors='replace'),
              self.site.site_tag,
          )

      def get_request_id(self):
          """Build the identifier used for this request's log context.

          Returns:
              str: "<method>-<sequence number>"
          """
          return "%s-%i" % (self.get_method(), self.request_seq)

      def get_redacted_uri(self):
          """Gets the request URI as a str, passed through `redact_uri`.

          Bytes that are not valid ASCII are substituted rather than raising:
          this method is called from __repr__ and from the access logging, and
          a malformed URI sent by a client must not make those fail.

          Returns:
              str
          """
          uri = self.uri
          if isinstance(uri, bytes):
              uri = uri.decode('ascii', errors='replace')
          return redact_uri(uri)

      def get_method(self):
          """Gets the method associated with the request (or placeholder if no
          method has yet been received).

          Note: This is necessary as the placeholder value in twisted is str
          rather than bytes, so we need to sanitise `self.method`.

          Returns:
              str
          """
          method = self.method
          if isinstance(method, bytes):
              method = method.decode('ascii')
          return method

      def get_user_agent(self):
          """Gets the raw value of the last User-Agent header on the request.

          Returns:
              the undecoded header value, or None if the header is absent
          """
          return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]

      def render(self, resrc):
          # this is called once a Resource has been found to serve the request; in our
          # case the Resource in question will normally be a JsonResource.

          # create a LogContext for this request
          request_id = self.get_request_id()
          logcontext = self.logcontext = LoggingContext(request_id)
          logcontext.request = request_id

          # override the Server header which is set by twisted
          self.setHeader("Server", self.site.server_version_string)

          with PreserveLoggingContext(self.logcontext):
              # we start the request metrics timer here with an initial stab
              # at the servlet name. For most requests that name will be
              # JsonResource (or a subclass), and JsonResource._async_render
              # will update it once it picks a servlet.
              servlet_name = resrc.__class__.__name__
              self._started_processing(servlet_name)

              Request.render(self, resrc)

              # record the arrival of the request *after*
              # dispatching to the handler, so that the handler
              # can update the servlet name in the request
              # metrics
              requests_counter.labels(self.get_method(),
                                      self.request_metrics.name).inc()

      @contextlib.contextmanager
      def processing(self):
          """Record the fact that we are processing this request.

          Returns a context manager; the correct way to use this is:

          @defer.inlineCallbacks
          def handle_request(request):
              with request.processing():
                  yield really_handle_the_request()

          Once the context manager is closed, the completion of the request will be logged,
          and the various metrics will be updated.

          An exception escaping the managed block is logged and then swallowed.

          Raises:
              RuntimeError: if processing() is already active for this request
          """
          if self._is_processing:
              raise RuntimeError("Request is already processing")
          self._is_processing = True

          try:
              yield
          except Exception:
              # this should already have been caught, and sent back to the client as a 500.
              logger.exception("Asynchronous messge handler raised an uncaught exception")
          finally:
              # the request handler has finished its work and either sent the whole response
              # back, or handed over responsibility to a Producer.

              self._processing_finished_time = time.time()
              self._is_processing = False

              # if we've already sent the response, log it now; otherwise, we wait for the
              # response to be sent.
              if self.finish_time is not None:
                  self._finished_processing()

      def finish(self):
          """Called when all response data has been written to this Request.

          Overrides twisted.web.server.Request.finish to record the finish time and do
          logging.
          """
          self.finish_time = time.time()
          Request.finish(self)
          # if a handler is still inside processing(), the logging is deferred
          # until that context manager exits.
          if not self._is_processing:
              with PreserveLoggingContext(self.logcontext):
                  self._finished_processing()

      def connectionLost(self, reason):
          """Called when the client connection is closed before the response is written.

          Overrides twisted.web.server.Request.connectionLost to record the finish time and
          do logging.
          """
          self.finish_time = time.time()
          Request.connectionLost(self, reason)

          # we only get here if the connection to the client drops before we send
          # the response.
          #
          # It's useful to log it here so that we can get an idea of when
          # the client disconnects.
          with PreserveLoggingContext(self.logcontext):
              logger.warning(
                  "Error processing request %r: %s %s", self, reason.type, reason.value,
              )

              if not self._is_processing:
                  self._finished_processing()

      def _started_processing(self, servlet_name):
          """Record the fact that we are processing this request.

          This will log the request's arrival. Once the request completes,
          be sure to call _finished_processing.

          Args:
              servlet_name (str): the name of the servlet which will be
                  processing this request. This is used in the metrics.

                  It is possible to update this afterwards by updating
                  self.request_metrics.name.
          """
          self.start_time = time.time()
          self.request_metrics = RequestMetrics()
          self.request_metrics.start(
              self.start_time, name=servlet_name, method=self.get_method(),
          )

          self.site.access_logger.info(
              "%s - %s - Received request: %s %s",
              self.getClientIP(),
              self.site.site_tag,
              self.get_method(),
              self.get_redacted_uri()
          )

      def _finished_processing(self):
          """Log the completion of this request and update the metrics
          """

          if self.logcontext is None:
              # this can happen if the connection closed before we read the
              # headers (so render was never called). In that case we'll already
              # have logged a warning, so just bail out.
              return

          usage = self.logcontext.get_resource_usage()

          if self._processing_finished_time is None:
              # we completed the request without anything calling processing()
              self._processing_finished_time = time.time()

          # the time between receiving the request and the request handler finishing
          processing_time = self._processing_finished_time - self.start_time

          # the time between the request handler finishing and the response being sent
          # to the client (nb may be negative)
          response_send_time = self.finish_time - self._processing_finished_time

          # need to decode as it could be raw utf-8 bytes
          # from a IDN servname in an auth header
          authenticated_entity = self.authenticated_entity
          if isinstance(authenticated_entity, bytes):
              authenticated_entity = authenticated_entity.decode("utf-8", "replace")

          # ...or could be raw utf-8 bytes in the User-Agent header.
          # N.B. if you don't do this, the logger explodes cryptically
          # with maximum recursion trying to log errors about
          # the charset problem.
          # c.f. https://github.com/matrix-org/synapse/issues/3471

          user_agent = self.get_user_agent()
          if user_agent is not None:
              user_agent = user_agent.decode("utf-8", "replace")
          else:
              user_agent = "-"

          code = str(self.code)
          if not self.finished:
              # we didn't send the full response before we gave up (presumably because
              # the connection dropped)
              code += "!"

          self.site.access_logger.info(
              "%s - %s - {%s}"
              " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
              " %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
              self.getClientIP(),
              self.site.site_tag,
              authenticated_entity,
              processing_time,
              response_send_time,
              usage.ru_utime,
              usage.ru_stime,
              usage.db_sched_duration_sec,
              usage.db_txn_duration_sec,
              int(usage.db_txn_count),
              self.sentLength,
              code,
              self.get_method(),
              self.get_redacted_uri(),
              self.clientproto.decode('ascii', errors='replace'),
              user_agent,
              usage.evt_db_fetch_count,
          )

          # a failure to record metrics must not break request completion, so it
          # is logged rather than propagated.
          try:
              self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
          except Exception as e:
              logger.warning("Failed to stop metrics: %r", e)
  class XForwardedForRequest(SynapseRequest):
      """
      Add a layer on top of another request that only uses the value of an
      X-Forwarded-For header as the result of C{getClientIP}.

      Construction is inherited unchanged from C{SynapseRequest}.
      """

      def getClientIP(self):
          """
          @return: The client address (the first address) in the value of the
              I{X-Forwarded-For header}, decoded from ASCII to C{str}. If the
              header is not present, return C{"-"}.
          """
          # only the first X-Forwarded-For header is consulted, and only the
          # first comma-separated entry within it.
          return self.requestHeaders.getRawHeaders(
              b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii')
  class SynapseRequestFactory(object):
      """Callable which creates the request objects for a site.

      Args:
          site: passed as the first constructor argument of every request
              this factory creates
          x_forwarded_for: if truthy, the factory creates XForwardedForRequest
              objects; otherwise it creates plain SynapseRequest objects
      """

      def __init__(self, site, x_forwarded_for):
          self.site = site
          self.x_forwarded_for = x_forwarded_for

      def __call__(self, *args, **kwargs):
          """Create a request, forwarding all arguments after the site."""
          request_class = (
              XForwardedForRequest if self.x_forwarded_for else SynapseRequest
          )
          return request_class(self.site, *args, **kwargs)
  class SynapseSite(Site):
      """
      Subclass of a twisted http Site that does access logging with python's
      standard logging

      Args:
          logger_name (str): name of the logger which receives the access log
          site_tag: label for this site, included in the access log lines
          config: mapping-like object; its "x_forwarded" entry (default False)
              selects whether requests take the client IP from X-Forwarded-For
          resource: the root resource, passed through to Site
          server_version_string (str): ASCII text stored, encoded to bytes, as
              `server_version_string`
      """

      def __init__(self, logger_name, site_tag, config, resource,
                   server_version_string, *args, **kwargs):
          Site.__init__(self, resource, *args, **kwargs)

          self.site_tag = site_tag

          # requests for this site are built by the factory, which picks the
          # request class according to the "x_forwarded" setting.
          self.requestFactory = SynapseRequestFactory(
              self, config.get("x_forwarded", False),
          )

          self.access_logger = logging.getLogger(logger_name)
          self.server_version_string = server_version_string.encode('ascii')

      def log(self, request):
          # intentionally a no-op: the access log lines are written to
          # `access_logger` by the request objects themselves.
          pass