123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- from unittest.mock import Mock
- from twisted.trial import unittest
- from twisted.web.client import Response
- from sydent.db.invite_tokens import JoinTokenStore
- from sydent.http.httpclient import FederationHttpClient
- from sydent.http.servlets.store_invite_servlet import StoreInviteServlet
- from tests.utils import make_sydent
- class ThreepidInvitesTestCase(unittest.TestCase):
- """Tests features related to storing and delivering 3PID invites."""
- def setUp(self):
- # Create a new sydent
- config = {
- "email": {
- # Used by test_invited_email_address_obfuscation
- "email.third_party_invite_username_obfuscate_characters": "6",
- "email.third_party_invite_domain_obfuscate_characters": "8",
- },
- }
- self.sydent = make_sydent(test_config=config)
- def test_delete_on_bind(self):
- """Tests that 3PID invite tokens are deleted upon delivery after a successful
- bind.
- """
- self.sydent.run()
- # The 3PID we're working with.
- medium = "email"
- address = "john@example.com"
- # Mock post_json_get_nothing so the /onBind call doesn't fail.
- async def post_json_get_nothing(uri, post_json, opts):
- return Response((b"HTTP", 1, 1), 200, b"OK", None, None)
- FederationHttpClient.post_json_get_nothing = Mock(
- side_effect=post_json_get_nothing,
- )
- # Manually insert an invite token, we'll check later that it's been deleted.
- join_token_store = JoinTokenStore(self.sydent)
- join_token_store.storeToken(
- medium,
- address,
- "!someroom:example.com",
- "@jane:example.com",
- "sometoken",
- )
- # Make sure the token still exists and can be retrieved.
- tokens = join_token_store.getTokens(medium, address)
- self.assertEqual(len(tokens), 1, tokens)
- # Bind the 3PID
- self.sydent.threepidBinder.addBinding(
- medium,
- address,
- "@john:example.com",
- )
- # Give Sydent some time to call /onBind and delete the token.
- self.sydent.reactor.advance(1000)
- cur = self.sydent.db.cursor()
- # Manually retrieve the tokens for this 3PID. We don't use getTokens because it
- # filters out sent tokens, so would return nothing even if the token hasn't been
- # deleted.
- res = cur.execute(
- "SELECT medium, address, room_id, sender, token FROM invite_tokens"
- " WHERE medium = ? AND address = ?",
- (
- medium,
- address,
- ),
- )
- rows = res.fetchall()
- # Check that we didn't get any result.
- self.assertEqual(len(rows), 0, rows)
- def test_invited_email_address_obfuscation(self):
- """Test that email addresses included in third-party invites are properly
- obfuscated according to the relevant config options
- """
- store_invite_servlet = StoreInviteServlet(self.sydent)
- email_address = "1234567890@1234567890.com"
- redacted_address = store_invite_servlet.redact_email_address(email_address)
- self.assertEqual(redacted_address, "123456...@12345678...")
- # Even short addresses are redacted
- short_email_address = "1@1.com"
- redacted_address = store_invite_servlet.redact_email_address(
- short_email_address
- )
- self.assertEqual(redacted_address, "...@1...")
- class ThreepidInvitesNoDeleteTestCase(unittest.TestCase):
- """Test that invite tokens are not deleted when that is disabled."""
- def setUp(self):
- # Create a new sydent
- config = {"general": {"delete_tokens_on_bind": "false"}}
- self.sydent = make_sydent(test_config=config)
- def test_no_delete_on_bind(self):
- self.sydent.run()
- # The 3PID we're working with.
- medium = "email"
- address = "john@example.com"
- # Mock post_json_get_nothing so the /onBind call doesn't fail.
- async def post_json_get_nothing(uri, post_json, opts):
- return Response((b"HTTP", 1, 1), 200, b"OK", None, None)
- FederationHttpClient.post_json_get_nothing = Mock(
- side_effect=post_json_get_nothing,
- )
- # Manually insert an invite token, we'll check later that it's been deleted.
- join_token_store = JoinTokenStore(self.sydent)
- join_token_store.storeToken(
- medium,
- address,
- "!someroom:example.com",
- "@jane:example.com",
- "sometoken",
- )
- # Make sure the token still exists and can be retrieved.
- tokens = join_token_store.getTokens(medium, address)
- self.assertEqual(len(tokens), 1, tokens)
- # Bind the 3PID
- self.sydent.threepidBinder.addBinding(
- medium,
- address,
- "@john:example.com",
- )
- # Give Sydent some time to call /onBind and delete the token.
- self.sydent.reactor.advance(1000)
- cur = self.sydent.db.cursor()
- # Manually retrieve the tokens for this 3PID. We don't use getTokens because it
- # filters out sent tokens, so would return nothing even if the token hasn't been
- # deleted.
- res = cur.execute(
- "SELECT medium, address, room_id, sender, token FROM invite_tokens"
- " WHERE medium = ? AND address = ?",
- (
- medium,
- address,
- ),
- )
- rows = res.fetchall()
- # Check that we didn't get any result.
- self.assertEqual(len(rows), 1, rows)
|