123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498 |
- # -*- coding: utf-8 -*-
- # Copyright 2014-2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from synapse.api.errors import (
- cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
- )
- from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
- from synapse.util.caches import intern_dict
- from synapse.util.metrics import Measure
- import synapse.metrics
- import synapse.events
- from canonicaljson import (
- encode_canonical_json, encode_pretty_printed_json
- )
- from twisted.internet import defer
- from twisted.python import failure
- from twisted.web import server, resource
- from twisted.web.server import NOT_DONE_YET
- from twisted.web.util import redirectTo
- import collections
- import logging
- import urllib
- import ujson
- logger = logging.getLogger(__name__)
- metrics = synapse.metrics.get_metrics_for(__name__)
- # total number of responses served, split by method/servlet/tag
- response_count = metrics.register_counter(
- "response_count",
- labels=["method", "servlet", "tag"],
- alternative_names=(
- # the following are all deprecated aliases for the same metric
- metrics.name_prefix + x for x in (
- "_requests",
- "_response_time:count",
- "_response_ru_utime:count",
- "_response_ru_stime:count",
- "_response_db_txn_count:count",
- "_response_db_txn_duration:count",
- )
- )
- )
- outgoing_responses_counter = metrics.register_counter(
- "responses",
- labels=["method", "code"],
- )
- response_timer = metrics.register_counter(
- "response_time_seconds",
- labels=["method", "servlet", "tag"],
- alternative_names=(
- metrics.name_prefix + "_response_time:total",
- ),
- )
- response_ru_utime = metrics.register_counter(
- "response_ru_utime_seconds", labels=["method", "servlet", "tag"],
- alternative_names=(
- metrics.name_prefix + "_response_ru_utime:total",
- ),
- )
- response_ru_stime = metrics.register_counter(
- "response_ru_stime_seconds", labels=["method", "servlet", "tag"],
- alternative_names=(
- metrics.name_prefix + "_response_ru_stime:total",
- ),
- )
- response_db_txn_count = metrics.register_counter(
- "response_db_txn_count", labels=["method", "servlet", "tag"],
- alternative_names=(
- metrics.name_prefix + "_response_db_txn_count:total",
- ),
- )
- # seconds spent waiting for db txns, excluding scheduling time, when processing
- # this request
- response_db_txn_duration = metrics.register_counter(
- "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
- alternative_names=(
- metrics.name_prefix + "_response_db_txn_duration:total",
- ),
- )
- # seconds spent waiting for a db connection, when processing this request
- response_db_sched_duration = metrics.register_counter(
- "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
- )
- _next_request_id = 0
- def request_handler(include_metrics=False):
- """Decorator for ``wrap_request_handler``"""
- return lambda request_handler: wrap_request_handler(request_handler, include_metrics)
- def wrap_request_handler(request_handler, include_metrics=False):
- """Wraps a method that acts as a request handler with the necessary logging
- and exception handling.
- The method must have a signature of "handle_foo(self, request)". The
- argument "self" must have "version_string" and "clock" attributes. The
- argument "request" must be a twisted HTTP request.
- The method must return a deferred. If the deferred succeeds we assume that
- a response has been sent. If the deferred fails with a SynapseError we use
- it to send a JSON response with the appropriate HTTP reponse code. If the
- deferred fails with any other type of error we send a 500 reponse.
- We insert a unique request-id into the logging context for this request and
- log the response and duration for this request.
- """
- @defer.inlineCallbacks
- def wrapped_request_handler(self, request):
- global _next_request_id
- request_id = "%s-%s" % (request.method, _next_request_id)
- _next_request_id += 1
- with LoggingContext(request_id) as request_context:
- with Measure(self.clock, "wrapped_request_handler"):
- request_metrics = RequestMetrics()
- # we start the request metrics timer here with an initial stab
- # at the servlet name. For most requests that name will be
- # JsonResource (or a subclass), and JsonResource._async_render
- # will update it once it picks a servlet.
- request_metrics.start(self.clock, name=self.__class__.__name__)
- request_context.request = request_id
- with request.processing():
- try:
- with PreserveLoggingContext(request_context):
- if include_metrics:
- yield request_handler(self, request, request_metrics)
- else:
- yield request_handler(self, request)
- except CodeMessageException as e:
- code = e.code
- if isinstance(e, SynapseError):
- logger.info(
- "%s SynapseError: %s - %s", request, code, e.msg
- )
- else:
- logger.exception(e)
- outgoing_responses_counter.inc(request.method, str(code))
- respond_with_json(
- request, code, cs_exception(e), send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- version_string=self.version_string,
- )
- except Exception:
- # failure.Failure() fishes the original Failure out
- # of our stack, and thus gives us a sensible stack
- # trace.
- f = failure.Failure()
- logger.error(
- "Failed handle request %s.%s on %r: %r: %s",
- request_handler.__module__,
- request_handler.__name__,
- self,
- request,
- f.getTraceback().rstrip(),
- )
- respond_with_json(
- request,
- 500,
- {
- "error": "Internal server error",
- "errcode": Codes.UNKNOWN,
- },
- send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- version_string=self.version_string,
- )
- finally:
- try:
- request_metrics.stop(
- self.clock, request
- )
- except Exception as e:
- logger.warn("Failed to stop metrics: %r", e)
- return wrapped_request_handler
- class HttpServer(object):
- """ Interface for registering callbacks on a HTTP server
- """
- def register_paths(self, method, path_patterns, callback):
- """ Register a callback that gets fired if we receive a http request
- with the given method for a path that matches the given regex.
- If the regex contains groups these gets passed to the calback via
- an unpacked tuple.
- Args:
- method (str): The method to listen to.
- path_patterns (list<SRE_Pattern>): The regex used to match requests.
- callback (function): The function to fire if we receive a matched
- request. The first argument will be the request object and
- subsequent arguments will be any matched groups from the regex.
- This should return a tuple of (code, response).
- """
- pass
- class JsonResource(HttpServer, resource.Resource):
- """ This implements the HttpServer interface and provides JSON support for
- Resources.
- Register callbacks via register_path()
- Callbacks can return a tuple of status code and a dict in which case the
- the dict will automatically be sent to the client as a JSON object.
- The JsonResource is primarily intended for returning JSON, but callbacks
- may send something other than JSON, they may do so by using the methods
- on the request object and instead returning None.
- """
- isLeaf = True
- _PathEntry = collections.namedtuple("_PathEntry", ["pattern", "callback"])
- def __init__(self, hs, canonical_json=True):
- resource.Resource.__init__(self)
- self.canonical_json = canonical_json
- self.clock = hs.get_clock()
- self.path_regexs = {}
- self.version_string = hs.version_string
- self.hs = hs
- def register_paths(self, method, path_patterns, callback):
- for path_pattern in path_patterns:
- logger.debug("Registering for %s %s", method, path_pattern.pattern)
- self.path_regexs.setdefault(method, []).append(
- self._PathEntry(path_pattern, callback)
- )
- def render(self, request):
- """ This gets called by twisted every time someone sends us a request.
- """
- self._async_render(request)
- return server.NOT_DONE_YET
- # Disable metric reporting because _async_render does its own metrics.
- # It does its own metric reporting because _async_render dispatches to
- # a callback and it's the class name of that callback we want to report
- # against rather than the JsonResource itself.
- @request_handler(include_metrics=True)
- @defer.inlineCallbacks
- def _async_render(self, request, request_metrics):
- """ This gets called from render() every time someone sends us a request.
- This checks if anyone has registered a callback for that method and
- path.
- """
- if request.method == "OPTIONS":
- self._send_response(request, 200, {})
- return
- # Loop through all the registered callbacks to check if the method
- # and path regex match
- for path_entry in self.path_regexs.get(request.method, []):
- m = path_entry.pattern.match(request.path)
- if not m:
- continue
- # We found a match! First update the metrics object to indicate
- # which servlet is handling the request.
- callback = path_entry.callback
- servlet_instance = getattr(callback, "__self__", None)
- if servlet_instance is not None:
- servlet_classname = servlet_instance.__class__.__name__
- else:
- servlet_classname = "%r" % callback
- request_metrics.name = servlet_classname
- # Now trigger the callback. If it returns a response, we send it
- # here. If it throws an exception, that is handled by the wrapper
- # installed by @request_handler.
- kwargs = intern_dict({
- name: urllib.unquote(value).decode("UTF-8") if value else value
- for name, value in m.groupdict().items()
- })
- callback_return = yield callback(request, **kwargs)
- if callback_return is not None:
- code, response = callback_return
- self._send_response(request, code, response)
- return
- # Huh. No one wanted to handle that? Fiiiiiine. Send 400.
- request_metrics.name = self.__class__.__name__ + ".UnrecognizedRequest"
- raise UnrecognizedRequestError()
- def _send_response(self, request, code, response_json_object,
- response_code_message=None):
- outgoing_responses_counter.inc(request.method, str(code))
- # TODO: Only enable CORS for the requests that need it.
- respond_with_json(
- request, code, response_json_object,
- send_cors=True,
- response_code_message=response_code_message,
- pretty_print=_request_user_agent_is_curl(request),
- version_string=self.version_string,
- canonical_json=self.canonical_json,
- )
- class RequestMetrics(object):
- def start(self, clock, name):
- self.start = clock.time_msec()
- self.start_context = LoggingContext.current_context()
- self.name = name
- def stop(self, clock, request):
- context = LoggingContext.current_context()
- tag = ""
- if context:
- tag = context.tag
- if context != self.start_context:
- logger.warn(
- "Context have unexpectedly changed %r, %r",
- context, self.start_context
- )
- return
- response_count.inc(request.method, self.name, tag)
- response_timer.inc_by(
- clock.time_msec() - self.start, request.method,
- self.name, tag
- )
- ru_utime, ru_stime = context.get_resource_usage()
- response_ru_utime.inc_by(
- ru_utime, request.method, self.name, tag
- )
- response_ru_stime.inc_by(
- ru_stime, request.method, self.name, tag
- )
- response_db_txn_count.inc_by(
- context.db_txn_count, request.method, self.name, tag
- )
- response_db_txn_duration.inc_by(
- context.db_txn_duration_ms / 1000., request.method, self.name, tag
- )
- response_db_sched_duration.inc_by(
- context.db_sched_duration_ms / 1000., request.method, self.name, tag
- )
- class RootRedirect(resource.Resource):
- """Redirects the root '/' path to another path."""
- def __init__(self, path):
- resource.Resource.__init__(self)
- self.url = path
- def render_GET(self, request):
- return redirectTo(self.url, request)
- def getChild(self, name, request):
- if len(name) == 0:
- return self # select ourselves as the child to render
- return resource.Resource.getChild(self, name, request)
- def respond_with_json(request, code, json_object, send_cors=False,
- response_code_message=None, pretty_print=False,
- version_string="", canonical_json=True):
- # could alternatively use request.notifyFinish() and flip a flag when
- # the Deferred fires, but since the flag is RIGHT THERE it seems like
- # a waste.
- if request._disconnected:
- logger.warn(
- "Not sending response to request %s, already disconnected.",
- request)
- return
- if pretty_print:
- json_bytes = encode_pretty_printed_json(json_object) + "\n"
- else:
- if canonical_json or synapse.events.USE_FROZEN_DICTS:
- json_bytes = encode_canonical_json(json_object)
- else:
- # ujson doesn't like frozen_dicts.
- json_bytes = ujson.dumps(json_object, ensure_ascii=False)
- return respond_with_json_bytes(
- request, code, json_bytes,
- send_cors=send_cors,
- response_code_message=response_code_message,
- version_string=version_string
- )
- def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
- version_string="", response_code_message=None):
- """Sends encoded JSON in response to the given request.
- Args:
- request (twisted.web.http.Request): The http request to respond to.
- code (int): The HTTP response code.
- json_bytes (bytes): The json bytes to use as the response body.
- send_cors (bool): Whether to send Cross-Origin Resource Sharing headers
- http://www.w3.org/TR/cors/
- Returns:
- twisted.web.server.NOT_DONE_YET"""
- request.setResponseCode(code, message=response_code_message)
- request.setHeader(b"Content-Type", b"application/json")
- request.setHeader(b"Server", version_string)
- request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),))
- if send_cors:
- set_cors_headers(request)
- request.write(json_bytes)
- finish_request(request)
- return NOT_DONE_YET
- def set_cors_headers(request):
- """Set the CORs headers so that javascript running in a web browsers can
- use this API
- Args:
- request (twisted.web.http.Request): The http request to add CORs to.
- """
- request.setHeader("Access-Control-Allow-Origin", "*")
- request.setHeader(
- "Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"
- )
- request.setHeader(
- "Access-Control-Allow-Headers",
- "Origin, X-Requested-With, Content-Type, Accept, Authorization"
- )
- def finish_request(request):
- """ Finish writing the response to the request.
- Twisted throws a RuntimeException if the connection closed before the
- response was written but doesn't provide a convenient or reliable way to
- determine if the connection was closed. So we catch and log the RuntimeException
- You might think that ``request.notifyFinish`` could be used to tell if the
- request was finished. However the deferred it returns won't fire if the
- connection was already closed, meaning we'd have to have called the method
- right at the start of the request. By the time we want to write the response
- it will already be too late.
- """
- try:
- request.finish()
- except RuntimeError as e:
- logger.info("Connection disconnected before response was written: %r", e)
- def _request_user_agent_is_curl(request):
- user_agents = request.requestHeaders.getRawHeaders(
- "User-Agent", default=[]
- )
- for user_agent in user_agents:
- if "curl" in user_agent:
- return True
- return False
|