test_typing.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2020 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from mock import Mock
  16. from synapse.handlers.typing import RoomMember
  17. from synapse.replication.tcp.streams import TypingStream
  18. from synapse.util.caches.stream_change_cache import StreamChangeCache
  19. from tests.replication._base import BaseStreamTestCase
  20. USER_ID = "@feeling:blue"
  21. USER_ID_2 = "@da-ba-dee:blue"
  22. ROOM_ID = "!bar:blue"
  23. ROOM_ID_2 = "!foo:blue"
  24. class TypingStreamTestCase(BaseStreamTestCase):
  25. def _build_replication_data_handler(self):
  26. return Mock(wraps=super()._build_replication_data_handler())
  27. def test_typing(self):
  28. typing = self.hs.get_typing_handler()
  29. self.reconnect()
  30. typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
  31. self.reactor.advance(0)
  32. # We should now see an attempt to connect to the master
  33. request = self.handle_http_replication_attempt()
  34. self.assert_request_is_get_repl_stream_updates(request, "typing")
  35. self.test_handler.on_rdata.assert_called_once()
  36. stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
  37. self.assertEqual(stream_name, "typing")
  38. self.assertEqual(1, len(rdata_rows))
  39. row = rdata_rows[0] # type: TypingStream.TypingStreamRow
  40. self.assertEqual(ROOM_ID, row.room_id)
  41. self.assertEqual([USER_ID], row.user_ids)
  42. # Now let's disconnect and insert some data.
  43. self.disconnect()
  44. self.test_handler.on_rdata.reset_mock()
  45. typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=False)
  46. self.test_handler.on_rdata.assert_not_called()
  47. self.reconnect()
  48. self.pump(0.1)
  49. # We should now see an attempt to connect to the master
  50. request = self.handle_http_replication_attempt()
  51. self.assert_request_is_get_repl_stream_updates(request, "typing")
  52. # The from token should be the token from the last RDATA we got.
  53. self.assertEqual(int(request.args[b"from_token"][0]), token)
  54. self.test_handler.on_rdata.assert_called_once()
  55. stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
  56. self.assertEqual(stream_name, "typing")
  57. self.assertEqual(1, len(rdata_rows))
  58. row = rdata_rows[0]
  59. self.assertEqual(ROOM_ID, row.room_id)
  60. self.assertEqual([], row.user_ids)
  61. def test_reset(self):
  62. """
  63. Test what happens when a typing stream resets.
  64. This is emulated by jumping the stream ahead, then reconnecting (which
  65. sends the proper position and RDATA).
  66. """
  67. typing = self.hs.get_typing_handler()
  68. self.reconnect()
  69. typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
  70. self.reactor.advance(0)
  71. # We should now see an attempt to connect to the master
  72. request = self.handle_http_replication_attempt()
  73. self.assert_request_is_get_repl_stream_updates(request, "typing")
  74. self.test_handler.on_rdata.assert_called_once()
  75. stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
  76. self.assertEqual(stream_name, "typing")
  77. self.assertEqual(1, len(rdata_rows))
  78. row = rdata_rows[0] # type: TypingStream.TypingStreamRow
  79. self.assertEqual(ROOM_ID, row.room_id)
  80. self.assertEqual([USER_ID], row.user_ids)
  81. # Push the stream forward a bunch so it can be reset.
  82. for i in range(100):
  83. typing._push_update(
  84. member=RoomMember(ROOM_ID, "@test%s:blue" % i), typing=True
  85. )
  86. self.reactor.advance(0)
  87. # Disconnect.
  88. self.disconnect()
  89. # Reset the typing handler
  90. self.hs.get_replication_streams()["typing"].last_token = 0
  91. self.hs.get_tcp_replication()._streams["typing"].last_token = 0
  92. typing._latest_room_serial = 0
  93. typing._typing_stream_change_cache = StreamChangeCache(
  94. "TypingStreamChangeCache", typing._latest_room_serial
  95. )
  96. typing._reset()
  97. # Reconnect.
  98. self.reconnect()
  99. self.pump(0.1)
  100. # We should now see an attempt to connect to the master
  101. request = self.handle_http_replication_attempt()
  102. self.assert_request_is_get_repl_stream_updates(request, "typing")
  103. # Reset the test code.
  104. self.test_handler.on_rdata.reset_mock()
  105. self.test_handler.on_rdata.assert_not_called()
  106. # Push additional data.
  107. typing._push_update(member=RoomMember(ROOM_ID_2, USER_ID_2), typing=False)
  108. self.reactor.advance(0)
  109. self.test_handler.on_rdata.assert_called_once()
  110. stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
  111. self.assertEqual(stream_name, "typing")
  112. self.assertEqual(1, len(rdata_rows))
  113. row = rdata_rows[0]
  114. self.assertEqual(ROOM_ID_2, row.room_id)
  115. self.assertEqual([], row.user_ids)
  116. # The token should have been reset.
  117. self.assertEqual(token, 1)