|
@@ -112,8 +112,6 @@ def start_reactor(
|
|
|
run_command (Callable[]): callable that actually runs the reactor
|
|
|
"""
|
|
|
|
|
|
- install_dns_limiter(reactor)
|
|
|
-
|
|
|
def run():
|
|
|
logger.info("Running")
|
|
|
setup_jemalloc_stats()
|
|
@@ -398,107 +396,6 @@ def setup_sdnotify(hs):
|
|
|
)
|
|
|
|
|
|
|
|
|
-def install_dns_limiter(reactor, max_dns_requests_in_flight=100):
|
|
|
- """Replaces the resolver with one that limits the number of in flight DNS
|
|
|
- requests.
|
|
|
-
|
|
|
- This is to workaround https://twistedmatrix.com/trac/ticket/9620, where we
|
|
|
- can run out of file descriptors and infinite loop if we attempt to do too
|
|
|
- many DNS queries at once
|
|
|
-
|
|
|
- XXX: I'm confused by this. reactor.nameResolver does not use twisted.names unless
|
|
|
- you explicitly install twisted.names as the resolver; rather it uses a GAIResolver
|
|
|
- backed by the reactor's default threadpool (which is limited to 10 threads). So
|
|
|
- (a) I don't understand why twisted ticket 9620 is relevant, and (b) I don't
|
|
|
- understand why we would run out of FDs if we did too many lookups at once.
|
|
|
- -- richvdh 2020/08/29
|
|
|
- """
|
|
|
- new_resolver = _LimitedHostnameResolver(
|
|
|
- reactor.nameResolver, max_dns_requests_in_flight
|
|
|
- )
|
|
|
-
|
|
|
- reactor.installNameResolver(new_resolver)
|
|
|
-
|
|
|
-
|
|
|
-class _LimitedHostnameResolver:
|
|
|
- """Wraps a IHostnameResolver, limiting the number of in-flight DNS lookups."""
|
|
|
-
|
|
|
- def __init__(self, resolver, max_dns_requests_in_flight):
|
|
|
- self._resolver = resolver
|
|
|
- self._limiter = Linearizer(
|
|
|
- name="dns_client_limiter", max_count=max_dns_requests_in_flight
|
|
|
- )
|
|
|
-
|
|
|
- def resolveHostName(
|
|
|
- self,
|
|
|
- resolutionReceiver,
|
|
|
- hostName,
|
|
|
- portNumber=0,
|
|
|
- addressTypes=None,
|
|
|
- transportSemantics="TCP",
|
|
|
- ):
|
|
|
- # We need this function to return `resolutionReceiver` so we do all the
|
|
|
- # actual logic involving deferreds in a separate function.
|
|
|
-
|
|
|
- # even though this is happening within the depths of twisted, we need to drop
|
|
|
- # our logcontext before starting _resolve, otherwise: (a) _resolve will drop
|
|
|
- # the logcontext if it returns an incomplete deferred; (b) _resolve will
|
|
|
- # call the resolutionReceiver *with* a logcontext, which it won't be expecting.
|
|
|
- with PreserveLoggingContext():
|
|
|
- self._resolve(
|
|
|
- resolutionReceiver,
|
|
|
- hostName,
|
|
|
- portNumber,
|
|
|
- addressTypes,
|
|
|
- transportSemantics,
|
|
|
- )
|
|
|
-
|
|
|
- return resolutionReceiver
|
|
|
-
|
|
|
- @defer.inlineCallbacks
|
|
|
- def _resolve(
|
|
|
- self,
|
|
|
- resolutionReceiver,
|
|
|
- hostName,
|
|
|
- portNumber=0,
|
|
|
- addressTypes=None,
|
|
|
- transportSemantics="TCP",
|
|
|
- ):
|
|
|
-
|
|
|
- with (yield self._limiter.queue(())):
|
|
|
- # resolveHostName doesn't return a Deferred, so we need to hook into
|
|
|
- # the receiver interface to get told when resolution has finished.
|
|
|
-
|
|
|
- deferred = defer.Deferred()
|
|
|
- receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred)
|
|
|
-
|
|
|
- self._resolver.resolveHostName(
|
|
|
- receiver, hostName, portNumber, addressTypes, transportSemantics
|
|
|
- )
|
|
|
-
|
|
|
- yield deferred
|
|
|
-
|
|
|
-
|
|
|
-class _DeferredResolutionReceiver:
|
|
|
- """Wraps a IResolutionReceiver and simply resolves the given deferred when
|
|
|
- resolution is complete
|
|
|
- """
|
|
|
-
|
|
|
- def __init__(self, receiver, deferred):
|
|
|
- self._receiver = receiver
|
|
|
- self._deferred = deferred
|
|
|
-
|
|
|
- def resolutionBegan(self, resolutionInProgress):
|
|
|
- self._receiver.resolutionBegan(resolutionInProgress)
|
|
|
-
|
|
|
- def addressResolved(self, address):
|
|
|
- self._receiver.addressResolved(address)
|
|
|
-
|
|
|
- def resolutionComplete(self):
|
|
|
- self._deferred.callback(())
|
|
|
- self._receiver.resolutionComplete()
|
|
|
-
|
|
|
-
|
|
|
sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")
|
|
|
|
|
|
|