# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from canonicaljson import encode_canonical_json

from synapse.api.room_versions import RoomVersions
from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.handlers.room import RoomEventSource
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.storage.roommember import RoomsForUser

from ._base import BaseSlavedStoreTestCase

USER_ID = "@feeling:blue"
USER_ID_2 = "@bright:blue"
OUTLIER = {"outlier": True}
ROOM_ID = "!room:blue"

logger = logging.getLogger(__name__)
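
# dict_equals is patched in as __eq__ on the event classes (see patch__eq__
# below) so that assertEqual can compare events, and lists of events, by their
# canonical-JSON PDU representation.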
def dict_equals(self, other):
    me = encode_canonical_json(self.get_pdu_json())
    them = encode_canonical_json(other.get_pdu_json())
    return me == them


def patch__eq__(cls):
    eq = getattr(cls, "__eq__", None)
    cls.__eq__ = dict_equals

    def unpatch():
        if eq is not None:
            cls.__eq__ = eq

    return unpatch
class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
    STORE_TYPE = SlavedEventStore

    def setUp(self):
        # Patch up the equality operator for events so that we can check
        # whether lists of events match using assertEquals
        self.unpatches = [
            patch__eq__(_EventInternalMetadata),
            patch__eq__(FrozenEvent),
        ]
        return super(SlavedEventStoreTestCase, self).setUp()

    def prepare(self, *args, **kwargs):
        super().prepare(*args, **kwargs)

        self.get_success(
            self.master_store.store_room(
                ROOM_ID, USER_ID, is_public=False, room_version=RoomVersions.V1,
            )
        )

    def tearDown(self):
        [unpatch() for unpatch in self.unpatches]
    def test_get_latest_event_ids_in_room(self):
        create = self.persist(type="m.room.create", key="", creator=USER_ID)
        self.replicate()
        self.check("get_latest_event_ids_in_room", (ROOM_ID,), [create.event_id])

        join = self.persist(
            type="m.room.member",
            key=USER_ID,
            membership="join",
            prev_events=[(create.event_id, {})],
        )
        self.replicate()
        self.check("get_latest_event_ids_in_room", (ROOM_ID,), [join.event_id])
    def test_redactions(self):
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.persist(type="m.room.member", key=USER_ID, membership="join")

        msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
        self.replicate()
        self.check("get_event", [msg.event_id], msg)

        redaction = self.persist(type="m.room.redaction", redacts=msg.event_id)
        self.replicate()

        msg_dict = msg.get_dict()
        msg_dict["content"] = {}
        msg_dict["unsigned"]["redacted_by"] = redaction.event_id
        msg_dict["unsigned"]["redacted_because"] = redaction
        redacted = make_event_from_dict(
            msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict()
        )
        self.check("get_event", [msg.event_id], redacted)

    def test_backfilled_redactions(self):
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.persist(type="m.room.member", key=USER_ID, membership="join")

        msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
        self.replicate()
        self.check("get_event", [msg.event_id], msg)

        redaction = self.persist(
            type="m.room.redaction", redacts=msg.event_id, backfill=True
        )
        self.replicate()

        msg_dict = msg.get_dict()
        msg_dict["content"] = {}
        msg_dict["unsigned"]["redacted_by"] = redaction.event_id
        msg_dict["unsigned"]["redacted_because"] = redaction
        redacted = make_event_from_dict(
            msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict()
        )
        self.check("get_event", [msg.event_id], redacted)
    def test_invites(self):
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
        event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
        self.replicate()

        self.check(
            "get_invited_rooms_for_local_user",
            [USER_ID_2],
            [
                RoomsForUser(
                    ROOM_ID,
                    USER_ID,
                    "invite",
                    event.event_id,
                    event.internal_metadata.stream_ordering,
                )
            ],
        )
    def test_push_actions_for_user(self):
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.persist(type="m.room.join", key=USER_ID, membership="join")
        self.persist(
            type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join"
        )
        event1 = self.persist(type="m.room.message", msgtype="m.text", body="hello")
        self.replicate()
        self.check(
            "get_unread_event_push_actions_by_room_for_user",
            [ROOM_ID, USER_ID_2, event1.event_id],
            {"highlight_count": 0, "notify_count": 0},
        )

        self.persist(
            type="m.room.message",
            msgtype="m.text",
            body="world",
            push_actions=[(USER_ID_2, ["notify"])],
        )
        self.replicate()
        self.check(
            "get_unread_event_push_actions_by_room_for_user",
            [ROOM_ID, USER_ID_2, event1.event_id],
            {"highlight_count": 0, "notify_count": 1},
        )

        self.persist(
            type="m.room.message",
            msgtype="m.text",
            body="world",
            push_actions=[
                (USER_ID_2, ["notify", {"set_tweak": "highlight", "value": True}])
            ],
        )
        self.replicate()
        self.check(
            "get_unread_event_push_actions_by_room_for_user",
            [ROOM_ID, USER_ID_2, event1.event_id],
            {"highlight_count": 1, "notify_count": 2},
        )
    def test_get_rooms_for_user_with_stream_ordering(self):
        """Check that the cache on get_rooms_for_user_with_stream_ordering is
        invalidated by rows in the events stream.
        """
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.persist(type="m.room.member", key=USER_ID, membership="join")
        self.replicate()
        self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())

        j2 = self.persist(
            type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
        )
        self.replicate()
        self.check(
            "get_rooms_for_user_with_stream_ordering",
            (USER_ID_2,),
            {(ROOM_ID, j2.internal_metadata.stream_ordering)},
        )
    def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(self):
        """Check that current_state invalidation happens correctly with multiple events
        in the persistence batch.

        This test attempts to reproduce a race condition between the event persistence
        loop and a worker-based Sync handler.

        The problem occurred when the master persisted several events in one batch. It
        only updates the current_state at the end of each batch, so the obvious thing
        to do is then to issue a current_state_delta stream update corresponding to the
        last stream_id in the batch.

        However, that raises the possibility that a worker will see the replication
        notification for a join event before the current_state caches are invalidated.

        The test involves:
         * creating a join and a message event for a user, and persisting them in the
           same batch

         * controlling the replication stream, so that updates are sent gradually

         * checking, between each batch of replication updates, that we see a
           consistent snapshot of the state.
        """
        self.persist(type="m.room.create", key="", creator=USER_ID)
        self.persist(type="m.room.member", key=USER_ID, membership="join")
        self.replicate()
        self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())

        # limit the replication rate
        repl_transport = self.server_to_client_transport
        repl_transport.autoflush = False

        # build the join and message events and persist them in the same batch.
        logger.info("----- build test events ------")
        j2, j2ctx = self.build_event(
            type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
        )
        msg, msgctx = self.build_event()
        self.get_success(
            self.storage.persistence.persist_events([(j2, j2ctx), (msg, msgctx)])
        )
        self.replicate()

        event_source = RoomEventSource(self.hs)
        event_source.store = self.slaved_store
        current_token = self.get_success(event_source.get_current_key())

        # gradually stream out the replication
        while repl_transport.buffer:
            logger.info("------ flush ------")
            repl_transport.flush(30)
            self.pump(0)

            prev_token = current_token
            current_token = self.get_success(event_source.get_current_key())

            # attempt to replicate the behaviour of the sync handler.
            #
            # First, we get a list of the rooms we are joined to
            joined_rooms = self.get_success(
                self.slaved_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
            )

            # Then, we get a list of the events since the last sync
            membership_changes = self.get_success(
                self.slaved_store.get_membership_changes_for_user(
                    USER_ID_2, prev_token, current_token
                )
            )

            logger.info(
                "%s->%s: joined_rooms=%r membership_changes=%r",
                prev_token,
                current_token,
                joined_rooms,
                membership_changes,
            )

            # the membership change is only any use to us if the room is in the
            # joined_rooms list.
            if membership_changes:
                self.assertEqual(
                    joined_rooms, {(ROOM_ID, j2.internal_metadata.stream_ordering)}
                )
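
    # Counter used by build_event to generate unique event IDs, depths and
    # origin_server_ts values; incremented once per built event.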
    event_id = 0
    def persist(self, backfill=False, **kwargs):
        """Build an event (via build_event) and persist it on the master store.

        Args:
            backfill (bool): if True, persist the event as a backfilled event.

        Returns:
            synapse.events.FrozenEvent: The event that was persisted.
        """
        event, context = self.build_event(**kwargs)

        if backfill:
            self.get_success(
                self.storage.persistence.persist_events(
                    [(event, context)], backfilled=True
                )
            )
        else:
            self.get_success(self.storage.persistence.persist_event(event, context))

        return event
    def build_event(
        self,
        sender=USER_ID,
        room_id=ROOM_ID,
        type="m.room.message",
        key=None,
        internal={},
        state=None,
        depth=None,
        prev_events=[],
        auth_events=[],
        prev_state=[],
        redacts=None,
        push_actions=[],
        **content
    ):
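        """Build a test event and compute its EventContext, without persisting it.

        ``key``, if given, becomes the event's state_key; ``push_actions`` is a
        list of (user_id, actions) pairs to stage for the event; any remaining
        keyword arguments become the event's content.
        """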
        if depth is None:
            depth = self.event_id

        if not prev_events:
            latest_event_ids = self.get_success(
                self.master_store.get_latest_event_ids_in_room(room_id)
            )
            prev_events = [(ev_id, {}) for ev_id in latest_event_ids]

        event_dict = {
            "sender": sender,
            "type": type,
            "content": content,
            "event_id": "$%d:blue" % (self.event_id,),
            "room_id": room_id,
            "depth": depth,
            "origin_server_ts": self.event_id,
            "prev_events": prev_events,
            "auth_events": auth_events,
        }
        if key is not None:
            event_dict["state_key"] = key
            event_dict["prev_state"] = prev_state

        if redacts is not None:
            event_dict["redacts"] = redacts

        event = make_event_from_dict(event_dict, internal_metadata_dict=internal)

        self.event_id += 1

        if state is not None:
            state_ids = {key: e.event_id for key, e in state.items()}
            context = EventContext.with_state(
                state_group=None, current_state_ids=state_ids, prev_state_ids=state_ids
            )
        else:
            state_handler = self.hs.get_state_handler()
            context = self.get_success(state_handler.compute_event_context(event))

        self.master_store.add_push_actions_to_staging(
            event.event_id, {user_id: actions for user_id, actions in push_actions}
        )

        return event, context