|
@@ -15,23 +15,13 @@
|
|
|
|
|
|
from mock import Mock, NonCallableMock
|
|
|
|
|
|
-from synapse.replication.tcp.client import (
|
|
|
- DirectTcpReplicationClientFactory,
|
|
|
- ReplicationDataHandler,
|
|
|
-)
|
|
|
-from synapse.replication.tcp.handler import ReplicationCommandHandler
|
|
|
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
|
|
|
-from synapse.storage.database import make_conn
|
|
|
+from tests.replication._base import BaseStreamTestCase
|
|
|
|
|
|
-from tests import unittest
|
|
|
-from tests.server import FakeTransport
|
|
|
|
|
|
-
|
|
|
-class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
|
|
|
+class BaseSlavedStoreTestCase(BaseStreamTestCase):
|
|
|
def make_homeserver(self, reactor, clock):
|
|
|
|
|
|
hs = self.setup_test_homeserver(
|
|
|
- "blue",
|
|
|
federation_client=Mock(),
|
|
|
ratelimiter=NonCallableMock(spec_set=["can_do_action"]),
|
|
|
)
|
|
@@ -41,39 +31,13 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
|
|
|
return hs
|
|
|
|
|
|
def prepare(self, reactor, clock, hs):
|
|
|
+ super().prepare(reactor, clock, hs)
|
|
|
|
|
|
- db_config = hs.config.database.get_single_database()
|
|
|
- self.master_store = self.hs.get_datastore()
|
|
|
- self.storage = hs.get_storage()
|
|
|
- database = hs.get_datastores().databases[0]
|
|
|
- self.slaved_store = self.STORE_TYPE(
|
|
|
- database, make_conn(db_config, database.engine), self.hs
|
|
|
- )
|
|
|
- self.event_id = 0
|
|
|
-
|
|
|
- server_factory = ReplicationStreamProtocolFactory(self.hs)
|
|
|
- self.streamer = hs.get_replication_streamer()
|
|
|
-
|
|
|
- # We now do some gut wrenching so that we have a client that is based
|
|
|
- # off of the slave store rather than the main store.
|
|
|
- self.replication_handler = ReplicationCommandHandler(self.hs)
|
|
|
- self.replication_handler._instance_name = "worker"
|
|
|
- self.replication_handler._replication_data_handler = ReplicationDataHandler(
|
|
|
- self.slaved_store
|
|
|
- )
|
|
|
+ self.reconnect()
|
|
|
|
|
|
- client_factory = DirectTcpReplicationClientFactory(
|
|
|
- self.hs, "client_name", self.replication_handler
|
|
|
- )
|
|
|
- client_factory.handler = self.replication_handler
|
|
|
-
|
|
|
- server = server_factory.buildProtocol(None)
|
|
|
- client = client_factory.buildProtocol(None)
|
|
|
-
|
|
|
- client.makeConnection(FakeTransport(server, reactor))
|
|
|
-
|
|
|
- self.server_to_client_transport = FakeTransport(client, reactor)
|
|
|
- server.makeConnection(self.server_to_client_transport)
|
|
|
+ self.master_store = hs.get_datastore()
|
|
|
+ self.slaved_store = self.worker_hs.get_datastore()
|
|
|
+ self.storage = hs.get_storage()
|
|
|
|
|
|
def replicate(self):
|
|
|
"""Tell the master side of replication that something has happened, and then
|