Browse Source

Wrap connections in an N minute timeout to ensure they get reaped correctly

Erik Johnston 7 years ago
parent
commit
5b6672c66d
2 changed files with 61 additions and 5 deletions
  1. 54 4
      synapse/http/endpoint.py
  2. 7 1
      synapse/http/matrixfederationclient.py

+ 54 - 4
synapse/http/endpoint.py

@@ -14,7 +14,7 @@
 # limitations under the License.
 # limitations under the License.
 
 
 from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint
 from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint
-from twisted.internet import defer
+from twisted.internet import defer, reactor, task
 from twisted.internet.error import ConnectError
 from twisted.internet.error import ConnectError
 from twisted.names import client, dns
 from twisted.names import client, dns
 from twisted.names.error import DNSNameError, DomainError
 from twisted.names.error import DNSNameError, DomainError
@@ -66,13 +66,63 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
         default_port = 8448
         default_port = 8448
 
 
     if port is None:
     if port is None:
-        return SRVClientEndpoint(
+        return _WrappingEndointFac(SRVClientEndpoint(
             reactor, "matrix", domain, protocol="tcp",
             reactor, "matrix", domain, protocol="tcp",
             default_port=default_port, endpoint=transport_endpoint,
             default_port=default_port, endpoint=transport_endpoint,
             endpoint_kw_args=endpoint_kw_args
             endpoint_kw_args=endpoint_kw_args
-        )
+        ))
     else:
     else:
-        return transport_endpoint(reactor, domain, port, **endpoint_kw_args)
+        return _WrappingEndointFac(transport_endpoint(reactor, domain, port, **endpoint_kw_args))
+
+
+class _WrappingEndointFac(object):
+    def __init__(self, endpoint_fac):
+        self.endpoint_fac = endpoint_fac
+
+    @defer.inlineCallbacks
+    def connect(self, protocolFactory):
+        conn = yield self.endpoint_fac.connect(protocolFactory)
+        conn = _WrappedConnection(conn)
+        defer.returnValue(conn)
+
+
+class _WrappedConnection(object):
+    """Wraps a connection and calls abort on it if it hasn't seen any actio
+    for 5 minutes
+    """
+    __slots__ = ["conn", "last_request"]
+
+    def __init__(self, conn):
+        object.__setattr__(self, "conn", conn)
+        object.__setattr__(self, "last_request", time.time())
+
+    def __getattr__(self, name):
+        return getattr(self.conn, name)
+
+    def __setattr__(self, name, value):
+        setattr(self.conn, name, value)
+
+    def _time_things_out_maybe(self):
+        if time.time() - self.last_request >= 2 * 60:
+            self.abort()
+
+    def request(self, request):
+        self.last_request = time.time()
+
+        # Time this connection out if we haven't send a request in the last
+        # N minutes
+        reactor.callLater(3 * 60, self._time_things_out_maybe)
+
+        d = self.conn.request(request)
+
+        def update_request_time(res):
+            self.last_request = time.time()
+            reactor.callLater(3 * 60, self._time_things_out_maybe)
+            return res
+
+        d.addCallback(update_request_time)
+
+        return d
 
 
 
 
 class SpiderEndpoint(object):
 class SpiderEndpoint(object):

+ 7 - 1
synapse/http/matrixfederationclient.py

@@ -61,6 +61,11 @@ MAX_LONG_RETRIES = 10
 MAX_SHORT_RETRIES = 3
 MAX_SHORT_RETRIES = 3
 
 
 
 
+def test(conn):
+    conn.loseConnection()
+    return conn
+
+
 class MatrixFederationEndpointFactory(object):
 class MatrixFederationEndpointFactory(object):
     def __init__(self, hs):
     def __init__(self, hs):
         self.tls_server_context_factory = hs.tls_server_context_factory
         self.tls_server_context_factory = hs.tls_server_context_factory
@@ -88,7 +93,8 @@ class MatrixFederationHttpClient(object):
         self.signing_key = hs.config.signing_key[0]
         self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
         self.server_name = hs.hostname
         pool = HTTPConnectionPool(reactor)
         pool = HTTPConnectionPool(reactor)
-        pool.maxPersistentPerHost = 10
+        pool.maxPersistentPerHost = 5
+        pool.cachedConnectionTimeout = 2 * 60
         self.agent = Agent.usingEndpointFactory(
         self.agent = Agent.usingEndpointFactory(
             reactor, MatrixFederationEndpointFactory(hs), pool=pool
             reactor, MatrixFederationEndpointFactory(hs), pool=pool
         )
         )