test_receipts.py 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. # Copyright 2019 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # type: ignore
  15. from unittest.mock import Mock
  16. from synapse.replication.tcp.streams._base import ReceiptsStream
  17. from tests.replication._base import BaseStreamTestCase
# User ID under which the test inserts receipts; the replicated rows are
# expected to carry this same value in their ``user_id`` field.
USER_ID = "@feeling:blue"
  19. class ReceiptsStreamTestCase(BaseStreamTestCase):
  20. def _build_replication_data_handler(self):
  21. return Mock(wraps=super()._build_replication_data_handler())
  22. def test_receipt(self):
  23. self.reconnect()
  24. # tell the master to send a new receipt
  25. self.get_success(
  26. self.hs.get_datastores().main.insert_receipt(
  27. "!room:blue",
  28. "m.read",
  29. USER_ID,
  30. ["$event:blue"],
  31. thread_id=None,
  32. data={"a": 1},
  33. )
  34. )
  35. self.replicate()
  36. # there should be one RDATA command
  37. self.test_handler.on_rdata.assert_called_once()
  38. stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
  39. self.assertEqual(stream_name, "receipts")
  40. self.assertEqual(1, len(rdata_rows))
  41. row: ReceiptsStream.ReceiptsStreamRow = rdata_rows[0]
  42. self.assertEqual("!room:blue", row.room_id)
  43. self.assertEqual("m.read", row.receipt_type)
  44. self.assertEqual(USER_ID, row.user_id)
  45. self.assertEqual("$event:blue", row.event_id)
  46. self.assertIsNone(row.thread_id)
  47. self.assertEqual({"a": 1}, row.data)
  48. # Now let's disconnect and insert some data.
  49. self.disconnect()
  50. self.test_handler.on_rdata.reset_mock()
  51. self.get_success(
  52. self.hs.get_datastores().main.insert_receipt(
  53. "!room2:blue",
  54. "m.read",
  55. USER_ID,
  56. ["$event2:foo"],
  57. thread_id=None,
  58. data={"a": 2},
  59. )
  60. )
  61. self.replicate()
  62. # Nothing should have happened as we are disconnected
  63. self.test_handler.on_rdata.assert_not_called()
  64. self.reconnect()
  65. self.pump(0.1)
  66. # We should now have caught up and get the missing data
  67. self.test_handler.on_rdata.assert_called_once()
  68. stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
  69. self.assertEqual(stream_name, "receipts")
  70. self.assertEqual(token, 3)
  71. self.assertEqual(1, len(rdata_rows))
  72. row: ReceiptsStream.ReceiptsStreamRow = rdata_rows[0]
  73. self.assertEqual("!room2:blue", row.room_id)
  74. self.assertEqual("m.read", row.receipt_type)
  75. self.assertEqual(USER_ID, row.user_id)
  76. self.assertEqual("$event2:foo", row.event_id)
  77. self.assertEqual({"a": 2}, row.data)