  1. # Copyright 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from typing import Iterable, Optional
  16. from canonicaljson import encode_canonical_json
  17. from synapse.api.room_versions import RoomVersions
  18. from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict
  19. from synapse.handlers.room import RoomEventSource
  20. from synapse.replication.slave.storage.events import SlavedEventStore
  21. from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
  22. from synapse.types import PersistedEventPosition
  23. from tests.server import FakeTransport
  24. from ._base import BaseSlavedStoreTestCase
# Fixture mxids/room id shared by all tests in this file.
USER_ID = "@feeling:test"
USER_ID_2 = "@bright:test"

# Internal-metadata dict that marks a built event as an outlier.
OUTLIER = {"outlier": True}
ROOM_ID = "!room:test"

logger = logging.getLogger(__name__)
  30. def dict_equals(self, other):
  31. me = encode_canonical_json(self.get_pdu_json())
  32. them = encode_canonical_json(other.get_pdu_json())
  33. return me == them
  34. def patch__eq__(cls):
  35. eq = getattr(cls, "__eq__", None)
  36. cls.__eq__ = dict_equals
  37. def unpatch():
  38. if eq is not None:
  39. cls.__eq__ = eq
  40. return unpatch
class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
    """Tests that events persisted on the master are correctly replicated to,
    and readable from, a SlavedEventStore.
    """

    STORE_TYPE = SlavedEventStore

    def setUp(self):
        # Patch up the equality operator for events so that we can check
        # whether lists of events match using assertEquals
        self.unpatches = [patch__eq__(_EventInternalMetadata), patch__eq__(FrozenEvent)]
        return super().setUp()

    def prepare(self, *args, **kwargs):
        # Create the room that every test persists its events into.
        super().prepare(*args, **kwargs)

        self.get_success(
            self.master_store.store_room(
                ROOM_ID,
                USER_ID,
                is_public=False,
                room_version=RoomVersions.V1,
            )
        )

    def tearDown(self):
        # Undo the __eq__ patches installed by setUp.
        [unpatch() for unpatch in self.unpatches]

    def test_get_latest_event_ids_in_room(self):
        """The room's forward extremities should track replicated events."""
        create = self.persist(type="m.room.create", key="", creator=USER_ID)
        self.replicate()
        self.check("get_latest_event_ids_in_room", (ROOM_ID,), [create.event_id])

        join = self.persist(
            type="m.room.member",
            key=USER_ID,
            membership="join",
            prev_events=[(create.event_id, {})],
        )
        self.replicate()
        self.check("get_latest_event_ids_in_room", (ROOM_ID,), [join.event_id])

    def test_redactions(self):
        """A replicated redaction should cause get_event to return the
        redacted form of the target event."""
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.persist(type="m.room.member", key=USER_ID, membership="join")

        msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
        self.replicate()
        self.check("get_event", [msg.event_id], msg)

        redaction = self.persist(type="m.room.redaction", redacts=msg.event_id)
        self.replicate()

        # Build the event we expect the slave to return: empty content, with
        # the redaction recorded in the unsigned section.
        msg_dict = msg.get_dict()
        msg_dict["content"] = {}
        msg_dict["unsigned"]["redacted_by"] = redaction.event_id
        msg_dict["unsigned"]["redacted_because"] = redaction
        redacted = make_event_from_dict(
            msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict()
        )
        self.check("get_event", [msg.event_id], redacted)

    def test_backfilled_redactions(self):
        """Same as test_redactions, but with the redaction arriving via
        backfill rather than the normal persistence path."""
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.persist(type="m.room.member", key=USER_ID, membership="join")

        msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
        self.replicate()
        self.check("get_event", [msg.event_id], msg)

        redaction = self.persist(
            type="m.room.redaction", redacts=msg.event_id, backfill=True
        )
        self.replicate()

        msg_dict = msg.get_dict()
        msg_dict["content"] = {}
        msg_dict["unsigned"]["redacted_by"] = redaction.event_id
        msg_dict["unsigned"]["redacted_because"] = redaction
        redacted = make_event_from_dict(
            msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict()
        )
        self.check("get_event", [msg.event_id], redacted)

    def test_invites(self):
        """An invite event should show up in the invited user's room list on
        the slave store after replication."""
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
        event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
        self.replicate()

        self.check(
            "get_invited_rooms_for_local_user",
            [USER_ID_2],
            [
                RoomsForUser(
                    ROOM_ID,
                    USER_ID,
                    "invite",
                    event.event_id,
                    event.internal_metadata.stream_ordering,
                    RoomVersions.V1.identifier,
                )
            ],
        )

    def test_push_actions_for_user(self):
        """Push-action counts on the slave should track replicated events and
        their staged push actions."""
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.persist(type="m.room.join", key=USER_ID, membership="join")
        self.persist(
            type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join"
        )
        event1 = self.persist(type="m.room.message", msgtype="m.text", body="hello")
        self.replicate()

        # No push actions yet.
        self.check(
            "get_unread_event_push_actions_by_room_for_user",
            [ROOM_ID, USER_ID_2, event1.event_id],
            {"highlight_count": 0, "unread_count": 0, "notify_count": 0},
        )

        # A message with a "notify" action bumps notify_count.
        self.persist(
            type="m.room.message",
            msgtype="m.text",
            body="world",
            push_actions=[(USER_ID_2, ["notify"])],
        )
        self.replicate()
        self.check(
            "get_unread_event_push_actions_by_room_for_user",
            [ROOM_ID, USER_ID_2, event1.event_id],
            {"highlight_count": 0, "unread_count": 0, "notify_count": 1},
        )

        # A highlight tweak bumps highlight_count as well.
        self.persist(
            type="m.room.message",
            msgtype="m.text",
            body="world",
            push_actions=[
                (USER_ID_2, ["notify", {"set_tweak": "highlight", "value": True}])
            ],
        )
        self.replicate()
        self.check(
            "get_unread_event_push_actions_by_room_for_user",
            [ROOM_ID, USER_ID_2, event1.event_id],
            {"highlight_count": 1, "unread_count": 0, "notify_count": 2},
        )

    def test_get_rooms_for_user_with_stream_ordering(self):
        """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
        by rows in the events stream
        """
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.persist(type="m.room.member", key=USER_ID, membership="join")
        self.replicate()
        self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())

        j2 = self.persist(
            type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
        )
        self.replicate()

        expected_pos = PersistedEventPosition(
            "master", j2.internal_metadata.stream_ordering
        )
        self.check(
            "get_rooms_for_user_with_stream_ordering",
            (USER_ID_2,),
            {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
        )

    def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(self):
        """Check that current_state invalidation happens correctly with multiple events
        in the persistence batch.

        This test attempts to reproduce a race condition between the event persistence
        loop and a worker-based Sync handler.

        The problem occurred when the master persisted several events in one batch. It
        only updates the current_state at the end of each batch, so the obvious thing
        to do is then to issue a current_state_delta stream update corresponding to the
        last stream_id in the batch.

        However, that raises the possibility that a worker will see the replication
        notification for a join event before the current_state caches are invalidated.

        The test involves:
         * creating a join and a message event for a user, and persisting them in the
           same batch

         * controlling the replication stream so that updates are sent gradually

         * between each bunch of replication updates, check that we see a consistent
           snapshot of the state.
        """
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.persist(type="m.room.member", key=USER_ID, membership="join")
        self.replicate()
        self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())

        # limit the replication rate
        repl_transport = self._server_transport
        assert isinstance(repl_transport, FakeTransport)
        repl_transport.autoflush = False

        # build the join and message events and persist them in the same batch.
        logger.info("----- build test events ------")
        j2, j2ctx = self.build_event(
            type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
        )
        msg, msgctx = self.build_event()
        self.get_success(
            self.storage.persistence.persist_events([(j2, j2ctx), (msg, msgctx)])
        )
        self.replicate()

        event_source = RoomEventSource(self.hs)
        event_source.store = self.slaved_store
        current_token = self.get_success(event_source.get_current_key())

        # gradually stream out the replication
        while repl_transport.buffer:
            logger.info("------ flush ------")
            repl_transport.flush(30)
            self.pump(0)

            prev_token = current_token
            current_token = self.get_success(event_source.get_current_key())

            # attempt to replicate the behaviour of the sync handler.
            #
            # First, we get a list of the rooms we are joined to
            joined_rooms = self.get_success(
                self.slaved_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
            )

            # Then, we get a list of the events since the last sync
            membership_changes = self.get_success(
                self.slaved_store.get_membership_changes_for_user(
                    USER_ID_2, prev_token, current_token
                )
            )

            logger.info(
                "%s->%s: joined_rooms=%r membership_changes=%r",
                prev_token,
                current_token,
                joined_rooms,
                membership_changes,
            )

            # the membership change is only any use to us if the room is in the
            # joined_rooms list.
            if membership_changes:
                expected_pos = PersistedEventPosition(
                    "master", j2.internal_metadata.stream_ordering
                )
                self.assertEqual(
                    joined_rooms,
                    {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
                )

    # Monotonic counter used to synthesise event ids, depths and timestamps
    # in build_event.
    event_id = 0

    def persist(self, backfill=False, **kwargs):
        """Build an event (via build_event) and persist it on the master,
        optionally through the backfill path.

        Returns:
            synapse.events.FrozenEvent: The event that was persisted.
        """
        event, context = self.build_event(**kwargs)
        if backfill:
            self.get_success(
                self.storage.persistence.persist_events(
                    [(event, context)], backfilled=True
                )
            )
        else:
            self.get_success(self.storage.persistence.persist_event(event, context))
        return event

    def build_event(
        self,
        sender=USER_ID,
        room_id=ROOM_ID,
        type="m.room.message",
        key=None,
        internal: Optional[dict] = None,
        depth=None,
        prev_events: Optional[list] = None,
        auth_events: Optional[list] = None,
        prev_state: Optional[list] = None,
        redacts=None,
        push_actions: Iterable = frozenset(),
        **content,
    ):
        """Construct an event and its context without persisting it.

        Args:
            sender: mxid of the event's sender.
            room_id: room to build the event in.
            type: event type.
            key: if not None, the event's state_key (making it a state event).
            internal: internal-metadata dict for the event (e.g. OUTLIER).
            depth: event depth; defaults to the per-class event counter.
            prev_events: prev_events list; defaults to the room's current
                forward extremities as reported by the master store.
            auth_events: auth_events list.
            prev_state: prev_state list; only attached for state events.
            redacts: event_id which this event redacts, if any.
            push_actions: iterable of (user_id, actions) pairs to stage as
                push actions for this event on the master store.
            **content: the event's content dict.

        Returns:
            A (FrozenEvent, EventContext) pair.
        """
        prev_events = prev_events or []
        auth_events = auth_events or []
        prev_state = prev_state or []

        if depth is None:
            depth = self.event_id

        if not prev_events:
            latest_event_ids = self.get_success(
                self.master_store.get_latest_event_ids_in_room(room_id)
            )
            prev_events = [(ev_id, {}) for ev_id in latest_event_ids]

        event_dict = {
            "sender": sender,
            "type": type,
            "content": content,
            # event ids are synthesised from the monotonic counter
            "event_id": "$%d:blue" % (self.event_id,),
            "room_id": room_id,
            "depth": depth,
            "origin_server_ts": self.event_id,
            "prev_events": prev_events,
            "auth_events": auth_events,
        }
        if key is not None:
            event_dict["state_key"] = key
            event_dict["prev_state"] = prev_state

        if redacts is not None:
            event_dict["redacts"] = redacts

        event = make_event_from_dict(event_dict, internal_metadata_dict=internal or {})

        self.event_id += 1
        state_handler = self.hs.get_state_handler()
        context = self.get_success(state_handler.compute_event_context(event))

        # Stage any requested push actions so persistence picks them up.
        self.get_success(
            self.master_store.add_push_actions_to_staging(
                event.event_id,
                {user_id: actions for user_id, actions in push_actions},
                False,
            )
        )
        return event, context