test_replication.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. import json
  2. from unittest.mock import Mock
  3. from twisted.internet import defer
  4. from twisted.trial import unittest
  5. from twisted.web.client import Response
  6. from sydent.threepid import ThreepidAssociation
  7. from sydent.threepid.signer import Signer
  8. from tests.utils import make_request, make_sydent
  9. class ReplicationTestCase(unittest.TestCase):
  10. """Test that a Sydent can correctly replicate data with another Sydent"""
  11. def setUp(self):
  12. # Create a new sydent
  13. config = {
  14. "crypto": {
  15. "ed25519.signingkey": "ed25519 0 FJi1Rnpj3/otydngacrwddFvwz/dTDsBv62uZDN2fZM"
  16. }
  17. }
  18. self.sydent = make_sydent(test_config=config)
  19. # Create a fake peer to replicate to.
  20. peer_public_key_base64 = "+vB8mTaooD/MA8YYZM8t9+vnGhP1937q2icrqPV9JTs"
  21. # Inject our fake peer into the database.
  22. cur = self.sydent.db.cursor()
  23. cur.execute(
  24. "INSERT INTO peers (name, port, lastSentVersion, active) VALUES (?, ?, ?, ?)",
  25. ("fake.server", 1234, 0, 1),
  26. )
  27. cur.execute(
  28. "INSERT INTO peer_pubkeys (peername, alg, key) VALUES (?, ?, ?)",
  29. ("fake.server", "ed25519", peer_public_key_base64),
  30. )
  31. self.sydent.db.commit()
  32. # Build some fake associations.
  33. self.assocs = []
  34. assoc_count = 150
  35. for i in range(assoc_count):
  36. assoc = ThreepidAssociation(
  37. medium="email",
  38. address="bob%d@example.com" % i,
  39. lookup_hash=None,
  40. mxid="@bob%d:example.com" % i,
  41. ts=(i * 10000),
  42. not_before=0,
  43. not_after=99999999999,
  44. )
  45. self.assocs.append(assoc)
  46. def test_incoming_replication(self):
  47. """Impersonate a peer that sends a replication push to Sydent, then checks that it
  48. accepts the payload and saves it correctly.
  49. """
  50. self.sydent.run()
  51. # Configure the Sydent to impersonate. We need to use "fake.server" as the
  52. # server's name because that's the name the recipient Sydent has for it. On top
  53. # of that, the replication servlet expects a TLS certificate in the request so it
  54. # can extract a common name and figure out which peer sent it from its common
  55. # name. The common name of the certificate we use for tests is fake.server.
  56. config = {
  57. "general": {"server.name": "fake.server"},
  58. "crypto": {
  59. "ed25519.signingkey": "ed25519 0 b29eXMMAYCFvFEtq9mLI42aivMtcg4Hl0wK89a+Vb6c"
  60. },
  61. }
  62. fake_sender_sydent = make_sydent(config)
  63. signer = Signer(fake_sender_sydent)
  64. # Sign the associations with the Sydent to impersonate so the recipient Sydent
  65. # can verify the signatures on them.
  66. signed_assocs = {}
  67. for assoc_id, assoc in enumerate(self.assocs):
  68. signed_assoc = signer.signedThreePidAssociation(assoc)
  69. signed_assocs[assoc_id] = signed_assoc
  70. # Send the replication push.
  71. body = json.dumps({"sgAssocs": signed_assocs})
  72. request, channel = make_request(
  73. self.sydent.reactor, "POST", "/_matrix/identity/replicate/v1/push", body
  74. )
  75. request.render(self.sydent.servlets.replicationPush)
  76. self.assertEqual(channel.code, 200)
  77. # Check that the recipient Sydent has correctly saved the associations in the
  78. # push.
  79. cur = self.sydent.db.cursor()
  80. res = cur.execute("SELECT originId, sgAssoc FROM global_threepid_associations")
  81. res_assocs = {}
  82. for row in res.fetchall():
  83. originId = row[0]
  84. signed_assoc = json.loads(row[1])
  85. res_assocs[originId] = signed_assoc
  86. for assoc_id, signed_assoc in signed_assocs.items():
  87. self.assertDictEqual(signed_assoc, res_assocs[assoc_id])
  88. def test_outgoing_replication(self):
  89. """Make a fake peer and associations and make sure Sydent tries to push to it."""
  90. cur = self.sydent.db.cursor()
  91. # Insert the fake associations into the database.
  92. cur.executemany(
  93. "INSERT INTO local_threepid_associations "
  94. "(medium, address, lookup_hash, mxid, ts, notBefore, notAfter) "
  95. "VALUES (?, ?, ?, ?, ?, ?, ?)",
  96. [
  97. (
  98. assoc.medium,
  99. assoc.address,
  100. assoc.lookup_hash,
  101. assoc.mxid,
  102. assoc.ts,
  103. assoc.not_before,
  104. assoc.not_after,
  105. )
  106. for assoc in self.assocs
  107. ],
  108. )
  109. self.sydent.db.commit()
  110. # Manually sign all associations so we can check whether Sydent attempted to
  111. # push the same.
  112. signer = Signer(self.sydent)
  113. signed_assocs = {}
  114. for assoc_id, assoc in enumerate(self.assocs):
  115. signed_assoc = signer.signedThreePidAssociation(assoc)
  116. signed_assocs[assoc_id] = signed_assoc
  117. sent_assocs = {}
  118. def request(method, uri, headers, body):
  119. """
  120. Processes a request sent to the mocked agent.
  121. :param method: The method of the request.
  122. :type method: bytes
  123. :param uri: The URI of the request.
  124. :type uri: bytes
  125. :param headers: The headers of the request.
  126. :type headers: twisted.web.http_headers.Headers
  127. :param body: The body of the request.
  128. :type body: twisted.web.client.FileBodyProducer[io.BytesIO]
  129. :return: A deferred that resolves into a 200 OK response.
  130. :rtype: twisted.internet.defer.Deferred[Response]
  131. """
  132. # Check the method and the URI.
  133. assert method == b"POST"
  134. assert uri == b"https://fake.server:1234/_matrix/identity/replicate/v1/push"
  135. # postJson calls the agent with a BytesIO within a FileBodyProducer, so we
  136. # need to unpack the payload correctly.
  137. payload = json.loads(body._inputFile.read().decode("utf8"))
  138. for assoc_id, assoc in payload["sgAssocs"].items():
  139. sent_assocs[assoc_id] = assoc
  140. # Return with a fake response wrapped in a Deferred.
  141. d = defer.Deferred()
  142. d.callback(Response((b"HTTP", 1, 1), 200, b"OK", None, None))
  143. return d
  144. # Mock the replication client's agent so it runs the custom code instead of
  145. # actually sending the requests.
  146. agent = Mock(spec=["request"])
  147. agent.request.side_effect = request
  148. self.sydent.replicationHttpsClient.agent = agent
  149. # Start Sydent and allow some time for all the necessary pushes to happen.
  150. self.sydent.run()
  151. self.sydent.reactor.advance(1000)
  152. # Check that, now that Sydent pushed all the associations it was meant to, we
  153. # have all of the associations we initially inserted.
  154. self.assertEqual(len(self.assocs), len(sent_assocs))
  155. for assoc_id, assoc in sent_assocs.items():
  156. # Replication payloads use a specific format that causes the JSON encoder to
  157. # convert the numeric indexes to string, so we need to convert them back when
  158. # looking up in signed_assocs. Also, the ID of the first association Sydent
  159. # will push will be 1, so we need to subtract 1 when figuring out which index
  160. # to lookup.
  161. self.assertDictEqual(assoc, signed_assocs[int(assoc_id) - 1])