123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- # -*- coding: utf-8 -*-
- # Copyright 2014-2016 OpenMarket Ltd
- # Copyright 2019 New Vector Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- import random
- import time
- import attr
- from twisted.internet import defer
- from twisted.internet.error import ConnectError
- from twisted.names import client, dns
- from twisted.names.error import DNSNameError, DomainError
- from synapse.logging.context import make_deferred_yieldable
logger = logging.getLogger(__name__)

# Default cache used by every SrvResolver that is not given its own `cache`
# argument. Maps a service name to the list of Server records stored by the
# most recent lookup that returned answers for it.
SERVER_CACHE = {}
@attr.s
class Server(object):
    """
    Our record of an individual server which can be tried to reach a destination.

    Attributes:
        host (bytes): target hostname
        port (int): port on the target host
        priority (int): priority of this server; pick_server_from_list only
            considers the servers with the lowest priority value
        weight (int): relative weight used when choosing at random between
            servers of the same priority
        expires (int): when the cache should expire this record - in *seconds* since
            the epoch
    """

    # Field order matters: attrs generates the positional constructor from it.
    host = attr.ib()
    port = attr.ib()
    priority = attr.ib(default=0)
    weight = attr.ib(default=0)
    expires = attr.ib(default=0)
def pick_server_from_list(server_list):
    """Randomly choose a server from the server list.

    Only the servers sharing the lowest priority value are considered. Among
    those, a server is chosen with probability proportional to its weight. If
    every candidate has a weight of zero, one of them is chosen uniformly at
    random. A zero-weight server is never chosen while a positively weighted
    server of the same priority is available.

    Args:
        server_list (list[Server]): list of candidate servers

    Returns:
        Tuple[bytes, int]: (host, port) pair for the chosen server

    Raises:
        RuntimeError: if server_list is empty
    """
    if not server_list:
        raise RuntimeError("pick_server_from_list called with empty list")

    # TODO: currently we only use the lowest-priority servers. We should maintain a
    # cache of servers known to be "down" and filter them out

    min_priority = min(s.priority for s in server_list)
    eligible_servers = [s for s in server_list if s.priority == min_priority]
    total_weight = sum(s.weight for s in eligible_servers)

    if not total_weight:
        # Nothing to weight by: fall back to a uniform choice rather than
        # always returning the first record.
        chosen = random.choice(eligible_servers)
        return chosen.host, chosen.port

    # Draw from 1..total_weight (both inclusive) so that each server owns exactly
    # `weight` of the possible outcomes. Starting the range at 0 would add one
    # extra outcome that always lands on the first server and skews the odds.
    target_weight = random.randint(1, total_weight)
    for s in eligible_servers:
        target_weight -= s.weight
        if target_weight <= 0:
            return s.host, s.port

    # this should be impossible.
    raise RuntimeError("pick_server_from_list got to end of eligible server list.")
class SrvResolver(object):
    """Interface to the dns client to do SRV lookups, with result caching.

    The default resolver in twisted.names doesn't do any caching (it has a
    CacheResolver, but the cache never gets populated), so we add our own caching
    layer here.

    Args:
        dns_client (twisted.internet.interfaces.IResolver): twisted resolver impl
        cache (dict): cache object, mapping a service name to the list of Server
            records from its last lookup. Defaults to the module-level
            SERVER_CACHE, which is shared between instances.
        get_time (callable): clock implementation. Should return seconds since the epoch
    """

    def __init__(self, dns_client=client, cache=SERVER_CACHE, get_time=time.time):
        self._dns_client = dns_client
        self._cache = cache
        self._get_time = get_time

    @defer.inlineCallbacks
    def resolve_service(self, service_name):
        """Look up a SRV record

        A cached result is returned without touching the network as long as none
        of its records has expired. If the lookup fails with a DomainError other
        than DNSNameError, the cached records are returned even if expired.

        Args:
            service_name (bytes): record to look up

        Returns:
            Deferred[list[Server]]:
                a list of the SRV records, or an empty list if none found

        Raises:
            TypeError: if service_name is not a byte string
            ConnectError: if the only answer is a SRV record whose target is "."
            DomainError: if the lookup fails and there is no cached result to
                fall back to
        """
        now = int(self._get_time())

        if not isinstance(service_name, bytes):
            raise TypeError("%r is not a byte string" % (service_name,))

        cache_entry = self._cache.get(service_name, None)
        if cache_entry and all(s.expires > now for s in cache_entry):
            # Hand out a copy so callers cannot mutate the cached list.
            return list(cache_entry)

        try:
            answers, _, _ = yield make_deferred_yieldable(
                self._dns_client.lookupService(service_name)
            )
        except DNSNameError:
            # TODO: cache this. We can get the SOA out of the exception, and use
            # the negative-TTL value.
            return []
        except DomainError as e:
            # We failed to resolve the name (other than a NameError)
            # Try something in the cache, else re-raise
            cache_entry = self._cache.get(service_name, None)
            if cache_entry:
                logger.warning(
                    "Failed to resolve %r, falling back to cache. %r", service_name, e
                )
                return list(cache_entry)
            # Bare raise keeps the original traceback intact.
            raise

        # A single SRV answer with a target of "." is treated as the service
        # being unavailable.
        if (
            len(answers) == 1
            and answers[0].type == dns.SRV
            and answers[0].payload
            and answers[0].payload.target == dns.Name(b".")
        ):
            raise ConnectError("Service %s unavailable" % service_name)

        servers = []
        for answer in answers:
            # Skip anything in the answer section that is not a usable SRV record.
            if answer.type != dns.SRV or not answer.payload:
                continue

            payload = answer.payload
            servers.append(
                Server(
                    host=payload.target.name,
                    port=payload.port,
                    priority=payload.priority,
                    weight=payload.weight,
                    expires=now + answer.ttl,
                )
            )

        # Cache a copy so later changes to the returned list don't leak into it.
        self._cache[service_name] = list(servers)
        return servers
|