test_invites.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. from unittest.mock import Mock
  2. from twisted.trial import unittest
  3. from twisted.web.client import Response
  4. from sydent.db.invite_tokens import JoinTokenStore
  5. from sydent.http.httpclient import FederationHttpClient
  6. from sydent.http.servlets.store_invite_servlet import StoreInviteServlet
  7. from tests.utils import make_sydent
  8. class ThreepidInvitesTestCase(unittest.TestCase):
  9. """Tests features related to storing and delivering 3PID invites."""
  10. def setUp(self):
  11. # Create a new sydent
  12. config = {
  13. "email": {
  14. # Used by test_invited_email_address_obfuscation
  15. "email.third_party_invite_username_obfuscate_characters": "6",
  16. "email.third_party_invite_domain_obfuscate_characters": "8",
  17. },
  18. }
  19. self.sydent = make_sydent(test_config=config)
  20. def test_delete_on_bind(self):
  21. """Tests that 3PID invite tokens are deleted upon delivery after a successful
  22. bind.
  23. """
  24. self.sydent.run()
  25. # The 3PID we're working with.
  26. medium = "email"
  27. address = "john@example.com"
  28. # Mock post_json_get_nothing so the /onBind call doesn't fail.
  29. async def post_json_get_nothing(uri, post_json, opts):
  30. return Response((b"HTTP", 1, 1), 200, b"OK", None, None)
  31. FederationHttpClient.post_json_get_nothing = Mock(
  32. side_effect=post_json_get_nothing,
  33. )
  34. # Manually insert an invite token, we'll check later that it's been deleted.
  35. join_token_store = JoinTokenStore(self.sydent)
  36. join_token_store.storeToken(
  37. medium,
  38. address,
  39. "!someroom:example.com",
  40. "@jane:example.com",
  41. "sometoken",
  42. )
  43. # Make sure the token still exists and can be retrieved.
  44. tokens = join_token_store.getTokens(medium, address)
  45. self.assertEqual(len(tokens), 1, tokens)
  46. # Bind the 3PID
  47. self.sydent.threepidBinder.addBinding(
  48. medium,
  49. address,
  50. "@john:example.com",
  51. )
  52. # Give Sydent some time to call /onBind and delete the token.
  53. self.sydent.reactor.advance(1000)
  54. cur = self.sydent.db.cursor()
  55. # Manually retrieve the tokens for this 3PID. We don't use getTokens because it
  56. # filters out sent tokens, so would return nothing even if the token hasn't been
  57. # deleted.
  58. res = cur.execute(
  59. "SELECT medium, address, room_id, sender, token FROM invite_tokens"
  60. " WHERE medium = ? AND address = ?",
  61. (
  62. medium,
  63. address,
  64. ),
  65. )
  66. rows = res.fetchall()
  67. # Check that we didn't get any result.
  68. self.assertEqual(len(rows), 0, rows)
  69. def test_invited_email_address_obfuscation(self):
  70. """Test that email addresses included in third-party invites are properly
  71. obfuscated according to the relevant config options
  72. """
  73. store_invite_servlet = StoreInviteServlet(self.sydent)
  74. email_address = "1234567890@1234567890.com"
  75. redacted_address = store_invite_servlet.redact_email_address(email_address)
  76. self.assertEqual(redacted_address, "123456...@12345678...")
  77. # Even short addresses are redacted
  78. short_email_address = "1@1.com"
  79. redacted_address = store_invite_servlet.redact_email_address(
  80. short_email_address
  81. )
  82. self.assertEqual(redacted_address, "...@1...")
  83. class ThreepidInvitesNoDeleteTestCase(unittest.TestCase):
  84. """Test that invite tokens are not deleted when that is disabled."""
  85. def setUp(self):
  86. # Create a new sydent
  87. config = {"general": {"delete_tokens_on_bind": "false"}}
  88. self.sydent = make_sydent(test_config=config)
  89. def test_no_delete_on_bind(self):
  90. self.sydent.run()
  91. # The 3PID we're working with.
  92. medium = "email"
  93. address = "john@example.com"
  94. # Mock post_json_get_nothing so the /onBind call doesn't fail.
  95. async def post_json_get_nothing(uri, post_json, opts):
  96. return Response((b"HTTP", 1, 1), 200, b"OK", None, None)
  97. FederationHttpClient.post_json_get_nothing = Mock(
  98. side_effect=post_json_get_nothing,
  99. )
  100. # Manually insert an invite token, we'll check later that it's been deleted.
  101. join_token_store = JoinTokenStore(self.sydent)
  102. join_token_store.storeToken(
  103. medium,
  104. address,
  105. "!someroom:example.com",
  106. "@jane:example.com",
  107. "sometoken",
  108. )
  109. # Make sure the token still exists and can be retrieved.
  110. tokens = join_token_store.getTokens(medium, address)
  111. self.assertEqual(len(tokens), 1, tokens)
  112. # Bind the 3PID
  113. self.sydent.threepidBinder.addBinding(
  114. medium,
  115. address,
  116. "@john:example.com",
  117. )
  118. # Give Sydent some time to call /onBind and delete the token.
  119. self.sydent.reactor.advance(1000)
  120. cur = self.sydent.db.cursor()
  121. # Manually retrieve the tokens for this 3PID. We don't use getTokens because it
  122. # filters out sent tokens, so would return nothing even if the token hasn't been
  123. # deleted.
  124. res = cur.execute(
  125. "SELECT medium, address, room_id, sender, token FROM invite_tokens"
  126. " WHERE medium = ? AND address = ?",
  127. (
  128. medium,
  129. address,
  130. ),
  131. )
  132. rows = res.fetchall()
  133. # Check that we didn't get any result.
  134. self.assertEqual(len(rows), 1, rows)