# test_events.py (13 KB)
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from canonicaljson import encode_canonical_json

from synapse.events import FrozenEvent, _EventInternalMetadata
from synapse.events.snapshot import EventContext
from synapse.handlers.room import RoomEventSource
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.storage.roommember import RoomsForUser

from ._base import BaseSlavedStoreTestCase
  22. USER_ID = "@feeling:blue"
  23. USER_ID_2 = "@bright:blue"
  24. OUTLIER = {"outlier": True}
  25. ROOM_ID = "!room:blue"
  26. logger = logging.getLogger(__name__)
  27. def dict_equals(self, other):
  28. me = encode_canonical_json(self.get_pdu_json())
  29. them = encode_canonical_json(other.get_pdu_json())
  30. return me == them
  31. def patch__eq__(cls):
  32. eq = getattr(cls, "__eq__", None)
  33. cls.__eq__ = dict_equals
  34. def unpatch():
  35. if eq is not None:
  36. cls.__eq__ = eq
  37. return unpatch
  38. class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
  39. STORE_TYPE = SlavedEventStore
  40. def setUp(self):
  41. # Patch up the equality operator for events so that we can check
  42. # whether lists of events match using assertEquals
  43. self.unpatches = [patch__eq__(_EventInternalMetadata), patch__eq__(FrozenEvent)]
  44. return super(SlavedEventStoreTestCase, self).setUp()
  45. def tearDown(self):
  46. [unpatch() for unpatch in self.unpatches]
  47. def test_get_latest_event_ids_in_room(self):
  48. create = self.persist(type="m.room.create", key="", creator=USER_ID)
  49. self.replicate()
  50. self.check("get_latest_event_ids_in_room", (ROOM_ID,), [create.event_id])
  51. join = self.persist(
  52. type="m.room.member",
  53. key=USER_ID,
  54. membership="join",
  55. prev_events=[(create.event_id, {})],
  56. )
  57. self.replicate()
  58. self.check("get_latest_event_ids_in_room", (ROOM_ID,), [join.event_id])
  59. def test_redactions(self):
  60. self.persist(type="m.room.create", key="", creator=USER_ID)
  61. self.persist(type="m.room.member", key=USER_ID, membership="join")
  62. msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
  63. self.replicate()
  64. self.check("get_event", [msg.event_id], msg)
  65. redaction = self.persist(type="m.room.redaction", redacts=msg.event_id)
  66. self.replicate()
  67. msg_dict = msg.get_dict()
  68. msg_dict["content"] = {}
  69. msg_dict["unsigned"]["redacted_by"] = redaction.event_id
  70. msg_dict["unsigned"]["redacted_because"] = redaction
  71. redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict())
  72. self.check("get_event", [msg.event_id], redacted)
  73. def test_backfilled_redactions(self):
  74. self.persist(type="m.room.create", key="", creator=USER_ID)
  75. self.persist(type="m.room.member", key=USER_ID, membership="join")
  76. msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
  77. self.replicate()
  78. self.check("get_event", [msg.event_id], msg)
  79. redaction = self.persist(
  80. type="m.room.redaction", redacts=msg.event_id, backfill=True
  81. )
  82. self.replicate()
  83. msg_dict = msg.get_dict()
  84. msg_dict["content"] = {}
  85. msg_dict["unsigned"]["redacted_by"] = redaction.event_id
  86. msg_dict["unsigned"]["redacted_because"] = redaction
  87. redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict())
  88. self.check("get_event", [msg.event_id], redacted)
  89. def test_invites(self):
  90. self.persist(type="m.room.create", key="", creator=USER_ID)
  91. self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
  92. event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
  93. self.replicate()
  94. self.check(
  95. "get_invited_rooms_for_local_user",
  96. [USER_ID_2],
  97. [
  98. RoomsForUser(
  99. ROOM_ID,
  100. USER_ID,
  101. "invite",
  102. event.event_id,
  103. event.internal_metadata.stream_ordering,
  104. )
  105. ],
  106. )
  107. def test_push_actions_for_user(self):
  108. self.persist(type="m.room.create", key="", creator=USER_ID)
  109. self.persist(type="m.room.join", key=USER_ID, membership="join")
  110. self.persist(
  111. type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join"
  112. )
  113. event1 = self.persist(type="m.room.message", msgtype="m.text", body="hello")
  114. self.replicate()
  115. self.check(
  116. "get_unread_event_push_actions_by_room_for_user",
  117. [ROOM_ID, USER_ID_2, event1.event_id],
  118. {"highlight_count": 0, "notify_count": 0},
  119. )
  120. self.persist(
  121. type="m.room.message",
  122. msgtype="m.text",
  123. body="world",
  124. push_actions=[(USER_ID_2, ["notify"])],
  125. )
  126. self.replicate()
  127. self.check(
  128. "get_unread_event_push_actions_by_room_for_user",
  129. [ROOM_ID, USER_ID_2, event1.event_id],
  130. {"highlight_count": 0, "notify_count": 1},
  131. )
  132. self.persist(
  133. type="m.room.message",
  134. msgtype="m.text",
  135. body="world",
  136. push_actions=[
  137. (USER_ID_2, ["notify", {"set_tweak": "highlight", "value": True}])
  138. ],
  139. )
  140. self.replicate()
  141. self.check(
  142. "get_unread_event_push_actions_by_room_for_user",
  143. [ROOM_ID, USER_ID_2, event1.event_id],
  144. {"highlight_count": 1, "notify_count": 2},
  145. )
  146. def test_get_rooms_for_user_with_stream_ordering(self):
  147. """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
  148. by rows in the events stream
  149. """
  150. self.persist(type="m.room.create", key="", creator=USER_ID)
  151. self.persist(type="m.room.member", key=USER_ID, membership="join")
  152. self.replicate()
  153. self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
  154. j2 = self.persist(
  155. type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
  156. )
  157. self.replicate()
  158. self.check(
  159. "get_rooms_for_user_with_stream_ordering",
  160. (USER_ID_2,),
  161. {(ROOM_ID, j2.internal_metadata.stream_ordering)},
  162. )
  163. def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(self):
  164. """Check that current_state invalidation happens correctly with multiple events
  165. in the persistence batch.
  166. This test attempts to reproduce a race condition between the event persistence
  167. loop and a worker-based Sync handler.
  168. The problem occurred when the master persisted several events in one batch. It
  169. only updates the current_state at the end of each batch, so the obvious thing
  170. to do is then to issue a current_state_delta stream update corresponding to the
  171. last stream_id in the batch.
  172. However, that raises the possibility that a worker will see the replication
  173. notification for a join event before the current_state caches are invalidated.
  174. The test involves:
  175. * creating a join and a message event for a user, and persisting them in the
  176. same batch
  177. * controlling the replication stream so that updates are sent gradually
  178. * between each bunch of replication updates, check that we see a consistent
  179. snapshot of the state.
  180. """
  181. self.persist(type="m.room.create", key="", creator=USER_ID)
  182. self.persist(type="m.room.member", key=USER_ID, membership="join")
  183. self.replicate()
  184. self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
  185. # limit the replication rate
  186. repl_transport = self.server_to_client_transport
  187. repl_transport.autoflush = False
  188. # build the join and message events and persist them in the same batch.
  189. logger.info("----- build test events ------")
  190. j2, j2ctx = self.build_event(
  191. type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
  192. )
  193. msg, msgctx = self.build_event()
  194. self.get_success(
  195. self.storage.persistence.persist_events([(j2, j2ctx), (msg, msgctx)])
  196. )
  197. self.replicate()
  198. event_source = RoomEventSource(self.hs)
  199. event_source.store = self.slaved_store
  200. current_token = self.get_success(event_source.get_current_key())
  201. # gradually stream out the replication
  202. while repl_transport.buffer:
  203. logger.info("------ flush ------")
  204. repl_transport.flush(30)
  205. self.pump(0)
  206. prev_token = current_token
  207. current_token = self.get_success(event_source.get_current_key())
  208. # attempt to replicate the behaviour of the sync handler.
  209. #
  210. # First, we get a list of the rooms we are joined to
  211. joined_rooms = self.get_success(
  212. self.slaved_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
  213. )
  214. # Then, we get a list of the events since the last sync
  215. membership_changes = self.get_success(
  216. self.slaved_store.get_membership_changes_for_user(
  217. USER_ID_2, prev_token, current_token
  218. )
  219. )
  220. logger.info(
  221. "%s->%s: joined_rooms=%r membership_changes=%r",
  222. prev_token,
  223. current_token,
  224. joined_rooms,
  225. membership_changes,
  226. )
  227. # the membership change is only any use to us if the room is in the
  228. # joined_rooms list.
  229. if membership_changes:
  230. self.assertEqual(
  231. joined_rooms, {(ROOM_ID, j2.internal_metadata.stream_ordering)}
  232. )
  233. event_id = 0
  234. def persist(self, backfill=False, **kwargs):
  235. """
  236. Returns:
  237. synapse.events.FrozenEvent: The event that was persisted.
  238. """
  239. event, context = self.build_event(**kwargs)
  240. if backfill:
  241. self.get_success(
  242. self.storage.persistence.persist_events(
  243. [(event, context)], backfilled=True
  244. )
  245. )
  246. else:
  247. self.get_success(self.storage.persistence.persist_event(event, context))
  248. return event
  249. def build_event(
  250. self,
  251. sender=USER_ID,
  252. room_id=ROOM_ID,
  253. type="m.room.message",
  254. key=None,
  255. internal={},
  256. state=None,
  257. depth=None,
  258. prev_events=[],
  259. auth_events=[],
  260. prev_state=[],
  261. redacts=None,
  262. push_actions=[],
  263. **content
  264. ):
  265. if depth is None:
  266. depth = self.event_id
  267. if not prev_events:
  268. latest_event_ids = self.get_success(
  269. self.master_store.get_latest_event_ids_in_room(room_id)
  270. )
  271. prev_events = [(ev_id, {}) for ev_id in latest_event_ids]
  272. event_dict = {
  273. "sender": sender,
  274. "type": type,
  275. "content": content,
  276. "event_id": "$%d:blue" % (self.event_id,),
  277. "room_id": room_id,
  278. "depth": depth,
  279. "origin_server_ts": self.event_id,
  280. "prev_events": prev_events,
  281. "auth_events": auth_events,
  282. }
  283. if key is not None:
  284. event_dict["state_key"] = key
  285. event_dict["prev_state"] = prev_state
  286. if redacts is not None:
  287. event_dict["redacts"] = redacts
  288. event = FrozenEvent(event_dict, internal_metadata_dict=internal)
  289. self.event_id += 1
  290. if state is not None:
  291. state_ids = {key: e.event_id for key, e in state.items()}
  292. context = EventContext.with_state(
  293. state_group=None, current_state_ids=state_ids, prev_state_ids=state_ids
  294. )
  295. else:
  296. state_handler = self.hs.get_state_handler()
  297. context = self.get_success(state_handler.compute_event_context(event))
  298. self.master_store.add_push_actions_to_staging(
  299. event.event_id, {user_id: actions for user_id, actions in push_actions}
  300. )
  301. return event, context