test_replication.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. import json
  2. from mock import Mock
  3. from sydent.threepid import ThreepidAssociation
  4. from sydent.threepid.signer import Signer
  5. from tests.utils import make_request, make_sydent
  6. from twisted.web.client import Response
  7. from twisted.internet import defer
  8. from twisted.trial import unittest
  9. class ReplicationTestCase(unittest.TestCase):
  10. """Test that a Sydent can correctly replicate data with another Sydent"""
  11. def setUp(self):
  12. # Create a new sydent
  13. config = {
  14. "crypto": {
  15. "ed25519.signingkey": "ed25519 0 FJi1Rnpj3/otydngacrwddFvwz/dTDsBv62uZDN2fZM"
  16. }
  17. }
  18. self.sydent = make_sydent(test_config=config)
  19. # Create a fake peer to replicate to.
  20. peer_public_key_base64 = "+vB8mTaooD/MA8YYZM8t9+vnGhP1937q2icrqPV9JTs"
  21. # Inject our fake peer into the database.
  22. cur = self.sydent.db.cursor()
  23. cur.execute(
  24. "INSERT INTO peers (name, port, lastSentVersion, active) VALUES (?, ?, ?, ?)",
  25. ("fake.server", 1234, 0, 1)
  26. )
  27. cur.execute(
  28. "INSERT INTO peer_pubkeys (peername, alg, key) VALUES (?, ?, ?)",
  29. ("fake.server", "ed25519", peer_public_key_base64)
  30. )
  31. self.sydent.db.commit()
  32. # Build some fake associations.
  33. self.assocs = []
  34. assoc_count = 150
  35. for i in range(assoc_count):
  36. assoc = ThreepidAssociation(
  37. medium="email",
  38. address="bob%d@example.com" % i,
  39. lookup_hash=None,
  40. mxid="@bob%d:example.com" % i,
  41. ts=(i * 10000),
  42. not_before=0,
  43. not_after=99999999999,
  44. )
  45. self.assocs.append(assoc)
  46. def test_incoming_replication(self):
  47. """Impersonate a peer that sends a replication push to Sydent, then checks that it
  48. accepts the payload and saves it correctly.
  49. """
  50. self.sydent.run()
  51. # Configure the Sydent to impersonate. We need to use "fake.server" as the
  52. # server's name because that's the name the recipient Sydent has for it. On top
  53. # of that, the replication servlet expects a TLS certificate in the request so it
  54. # can extract a common name and figure out which peer sent it from its common
  55. # name. The common name of the certificate we use for tests is fake.server.
  56. config = {
  57. "general": {
  58. "server.name": "fake.server"
  59. },
  60. "crypto": {
  61. "ed25519.signingkey": "ed25519 0 b29eXMMAYCFvFEtq9mLI42aivMtcg4Hl0wK89a+Vb6c"
  62. }
  63. }
  64. fake_sender_sydent = make_sydent(config)
  65. signer = Signer(fake_sender_sydent)
  66. # Sign the associations with the Sydent to impersonate so the recipient Sydent
  67. # can verify the signatures on them.
  68. signed_assocs = {}
  69. for assoc_id, assoc in enumerate(self.assocs):
  70. signed_assoc = signer.signedThreePidAssociation(assoc)
  71. signed_assocs[assoc_id] = signed_assoc
  72. # Send the replication push.
  73. body = json.dumps({"sgAssocs": signed_assocs})
  74. request, channel = make_request(
  75. self.sydent.reactor, "POST", "/_matrix/identity/replicate/v1/push", body
  76. )
  77. request.render(self.sydent.servlets.replicationPush)
  78. self.assertEqual(channel.code, 200)
  79. # Check that the recipient Sydent has correctly saved the associations in the
  80. # push.
  81. cur = self.sydent.db.cursor()
  82. res = cur.execute("SELECT originId, sgAssoc FROM global_threepid_associations")
  83. res_assocs = {}
  84. for row in res.fetchall():
  85. originId = row[0]
  86. signed_assoc = json.loads(row[1])
  87. res_assocs[originId] = signed_assoc
  88. for assoc_id, signed_assoc in signed_assocs.items():
  89. self.assertDictEqual(signed_assoc, res_assocs[assoc_id])
  90. def test_outgoing_replication(self):
  91. """Make a fake peer and associations and make sure Sydent tries to push to it.
  92. """
  93. cur = self.sydent.db.cursor()
  94. # Insert the fake associations into the database.
  95. cur.executemany(
  96. "INSERT INTO local_threepid_associations "
  97. "(medium, address, lookup_hash, mxid, ts, notBefore, notAfter) "
  98. "VALUES (?, ?, ?, ?, ?, ?, ?)",
  99. [
  100. (
  101. assoc.medium,
  102. assoc.address,
  103. assoc.lookup_hash,
  104. assoc.mxid,
  105. assoc.ts,
  106. assoc.not_before,
  107. assoc.not_after,
  108. )
  109. for assoc in self.assocs
  110. ]
  111. )
  112. self.sydent.db.commit()
  113. # Manually sign all associations so we can check whether Sydent attempted to
  114. # push the same.
  115. signer = Signer(self.sydent)
  116. signed_assocs = {}
  117. for assoc_id, assoc in enumerate(self.assocs):
  118. signed_assoc = signer.signedThreePidAssociation(assoc)
  119. signed_assocs[assoc_id] = signed_assoc
  120. sent_assocs = {}
  121. def request(method, uri, headers, body):
  122. """
  123. Processes a request sent to the mocked agent.
  124. :param method: The method of the request.
  125. :type method: bytes
  126. :param uri: The URI of the request.
  127. :type uri: bytes
  128. :param headers: The headers of the request.
  129. :type headers: twisted.web.http_headers.Headers
  130. :param body: The body of the request.
  131. :type body: twisted.web.client.FileBodyProducer[io.BytesIO]
  132. :return: A deferred that resolves into a 200 OK response.
  133. :rtype: twisted.internet.defer.Deferred[Response]
  134. """
  135. # Check the method and the URI.
  136. assert method == b'POST'
  137. assert uri == b'https://fake.server:1234/_matrix/identity/replicate/v1/push'
  138. # postJson calls the agent with a BytesIO within a FileBodyProducer, so we
  139. # need to unpack the payload correctly.
  140. payload = json.loads(body._inputFile.read().decode("utf8"))
  141. for assoc_id, assoc in payload['sgAssocs'].items():
  142. sent_assocs[assoc_id] = assoc
  143. # Return with a fake response wrapped in a Deferred.
  144. d = defer.Deferred()
  145. d.callback(Response((b'HTTP', 1, 1), 200, b'OK', None, None))
  146. return d
  147. # Mock the replication client's agent so it runs the custom code instead of
  148. # actually sending the requests.
  149. agent = Mock(spec=['request'])
  150. agent.request.side_effect = request
  151. self.sydent.replicationHttpsClient.agent = agent
  152. # Start Sydent and allow some time for all the necessary pushes to happen.
  153. self.sydent.run()
  154. self.sydent.reactor.advance(1000)
  155. # Check that, now that Sydent pushed all the associations it was meant to, we
  156. # have all of the associations we initially inserted.
  157. self.assertEqual(len(self.assocs), len(sent_assocs))
  158. for assoc_id, assoc in sent_assocs.items():
  159. # Replication payloads use a specific format that causes the JSON encoder to
  160. # convert the numeric indexes to string, so we need to convert them back when
  161. # looking up in signed_assocs. Also, the ID of the first association Sydent
  162. # will push will be 1, so we need to subtract 1 when figuring out which index
  163. # to lookup.
  164. self.assertDictEqual(assoc, signed_assocs[int(assoc_id)-1])