test_invites.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. from mock import Mock
  2. from sydent.http.httpclient import FederationHttpClient
  3. from sydent.db.invite_tokens import JoinTokenStore
  4. from tests.utils import make_sydent
  5. from twisted.web.client import Response
  6. from twisted.trial import unittest
  7. from sydent.http.servlets.store_invite_servlet import StoreInviteServlet
  8. class ThreepidInvitesTestCase(unittest.TestCase):
  9. """Tests features related to storing and delivering 3PID invites."""
  10. def setUp(self):
  11. # Create a new sydent
  12. config = {
  13. "email": {
  14. # Used by test_invited_email_address_obfuscation
  15. "email.third_party_invite_username_obfuscate_characters": "6",
  16. "email.third_party_invite_domain_obfuscate_characters": "8",
  17. },
  18. }
  19. self.sydent = make_sydent(test_config=config)
  20. def test_delete_on_bind(self):
  21. """Tests that 3PID invite tokens are deleted upon delivery after a successful
  22. bind.
  23. """
  24. self.sydent.run()
  25. # The 3PID we're working with.
  26. medium = "email"
  27. address = "john@example.com"
  28. # Mock post_json_get_nothing so the /onBind call doesn't fail.
  29. def post_json_get_nothing(uri, post_json, opts):
  30. return Response((b'HTTP', 1, 1), 200, b'OK', None, None)
  31. FederationHttpClient.post_json_get_nothing = Mock(
  32. side_effect=post_json_get_nothing,
  33. )
  34. # Manually insert an invite token, we'll check later that it's been deleted.
  35. join_token_store = JoinTokenStore(self.sydent)
  36. join_token_store.storeToken(
  37. medium, address, "!someroom:example.com", "@jane:example.com",
  38. "sometoken",
  39. )
  40. # Make sure the token still exists and can be retrieved.
  41. tokens = join_token_store.getTokens(medium, address)
  42. self.assertEqual(len(tokens), 1, tokens)
  43. # Bind the 3PID
  44. self.sydent.threepidBinder.addBinding(
  45. medium, address, "@john:example.com",
  46. )
  47. # Give Sydent some time to call /onBind and delete the token.
  48. self.sydent.reactor.advance(1000)
  49. cur = self.sydent.db.cursor()
  50. # Manually retrieve the tokens for this 3PID. We don't use getTokens because it
  51. # filters out sent tokens, so would return nothing even if the token hasn't been
  52. # deleted.
  53. res = cur.execute(
  54. "SELECT medium, address, room_id, sender, token FROM invite_tokens"
  55. " WHERE medium = ? AND address = ?",
  56. (medium, address,)
  57. )
  58. rows = res.fetchall()
  59. # Check that we didn't get any result.
  60. self.assertEqual(len(rows), 0, rows)
  61. def test_invited_email_address_obfuscation(self):
  62. """Test that email addresses included in third-party invites are properly
  63. obfuscated according to the relevant config options
  64. """
  65. store_invite_servlet = StoreInviteServlet(self.sydent)
  66. email_address = "1234567890@1234567890.com"
  67. redacted_address = store_invite_servlet.redact_email_address(email_address)
  68. self.assertEqual(redacted_address, "123456...@12345678...")
  69. # Even short addresses are redacted
  70. short_email_address = "1@1.com"
  71. redacted_address = store_invite_servlet.redact_email_address(short_email_address)
  72. self.assertEqual(redacted_address, "...@1...")
  73. class ThreepidInvitesNoDeleteTestCase(unittest.TestCase):
  74. """Test that invite tokens are not deleted when that is disabled.
  75. """
  76. def setUp(self):
  77. # Create a new sydent
  78. config = {
  79. "general": {
  80. "delete_tokens_on_bind": "false"
  81. }
  82. }
  83. self.sydent = make_sydent(test_config=config)
  84. def test_no_delete_on_bind(self):
  85. self.sydent.run()
  86. # The 3PID we're working with.
  87. medium = "email"
  88. address = "john@example.com"
  89. # Mock post_json_get_nothing so the /onBind call doesn't fail.
  90. def post_json_get_nothing(uri, post_json, opts):
  91. return Response((b'HTTP', 1, 1), 200, b'OK', None, None)
  92. FederationHttpClient.post_json_get_nothing = Mock(
  93. side_effect=post_json_get_nothing,
  94. )
  95. # Manually insert an invite token, we'll check later that it's been deleted.
  96. join_token_store = JoinTokenStore(self.sydent)
  97. join_token_store.storeToken(
  98. medium, address, "!someroom:example.com", "@jane:example.com",
  99. "sometoken",
  100. )
  101. # Make sure the token still exists and can be retrieved.
  102. tokens = join_token_store.getTokens(medium, address)
  103. self.assertEqual(len(tokens), 1, tokens)
  104. # Bind the 3PID
  105. self.sydent.threepidBinder.addBinding(
  106. medium, address, "@john:example.com",
  107. )
  108. # Give Sydent some time to call /onBind and delete the token.
  109. self.sydent.reactor.advance(1000)
  110. cur = self.sydent.db.cursor()
  111. # Manually retrieve the tokens for this 3PID. We don't use getTokens because it
  112. # filters out sent tokens, so would return nothing even if the token hasn't been
  113. # deleted.
  114. res = cur.execute(
  115. "SELECT medium, address, room_id, sender, token FROM invite_tokens"
  116. " WHERE medium = ? AND address = ?",
  117. (medium, address,)
  118. )
  119. rows = res.fetchall()
  120. # Check that we didn't get any result.
  121. self.assertEqual(len(rows), 1, rows)