test_stats.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2019 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from mock import Mock
  16. from twisted.internet import defer
  17. from synapse.api.constants import EventTypes, Membership
  18. from synapse.rest import admin
  19. from synapse.rest.client.v1 import login, room
  20. from tests import unittest
  21. class StatsRoomTests(unittest.HomeserverTestCase):
  22. servlets = [
  23. admin.register_servlets_for_client_rest_resource,
  24. room.register_servlets,
  25. login.register_servlets,
  26. ]
  27. def prepare(self, reactor, clock, hs):
  28. self.store = hs.get_datastore()
  29. self.handler = self.hs.get_stats_handler()
  30. def _add_background_updates(self):
  31. """
  32. Add the background updates we need to run.
  33. """
  34. # Ugh, have to reset this flag
  35. self.store._all_done = False
  36. self.get_success(
  37. self.store._simple_insert(
  38. "background_updates",
  39. {"update_name": "populate_stats_createtables", "progress_json": "{}"},
  40. )
  41. )
  42. self.get_success(
  43. self.store._simple_insert(
  44. "background_updates",
  45. {
  46. "update_name": "populate_stats_process_rooms",
  47. "progress_json": "{}",
  48. "depends_on": "populate_stats_createtables",
  49. },
  50. )
  51. )
  52. self.get_success(
  53. self.store._simple_insert(
  54. "background_updates",
  55. {
  56. "update_name": "populate_stats_cleanup",
  57. "progress_json": "{}",
  58. "depends_on": "populate_stats_process_rooms",
  59. },
  60. )
  61. )
  62. def test_initial_room(self):
  63. """
  64. The background updates will build the table from scratch.
  65. """
  66. r = self.get_success(self.store.get_all_room_state())
  67. self.assertEqual(len(r), 0)
  68. # Disable stats
  69. self.hs.config.stats_enabled = False
  70. self.handler.stats_enabled = False
  71. u1 = self.register_user("u1", "pass")
  72. u1_token = self.login("u1", "pass")
  73. room_1 = self.helper.create_room_as(u1, tok=u1_token)
  74. self.helper.send_state(
  75. room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
  76. )
  77. # Stats disabled, shouldn't have done anything
  78. r = self.get_success(self.store.get_all_room_state())
  79. self.assertEqual(len(r), 0)
  80. # Enable stats
  81. self.hs.config.stats_enabled = True
  82. self.handler.stats_enabled = True
  83. # Do the initial population of the user directory via the background update
  84. self._add_background_updates()
  85. while not self.get_success(self.store.has_completed_background_updates()):
  86. self.get_success(self.store.do_next_background_update(100), by=0.1)
  87. r = self.get_success(self.store.get_all_room_state())
  88. self.assertEqual(len(r), 1)
  89. self.assertEqual(r[0]["topic"], "foo")
  90. def test_initial_earliest_token(self):
  91. """
  92. Ingestion via notify_new_event will ignore tokens that the background
  93. update have already processed.
  94. """
  95. self.reactor.advance(86401)
  96. self.hs.config.stats_enabled = False
  97. self.handler.stats_enabled = False
  98. u1 = self.register_user("u1", "pass")
  99. u1_token = self.login("u1", "pass")
  100. u2 = self.register_user("u2", "pass")
  101. u2_token = self.login("u2", "pass")
  102. u3 = self.register_user("u3", "pass")
  103. u3_token = self.login("u3", "pass")
  104. room_1 = self.helper.create_room_as(u1, tok=u1_token)
  105. self.helper.send_state(
  106. room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
  107. )
  108. # Begin the ingestion by creating the temp tables. This will also store
  109. # the position that the deltas should begin at, once they take over.
  110. self.hs.config.stats_enabled = True
  111. self.handler.stats_enabled = True
  112. self.store._all_done = False
  113. self.get_success(self.store.update_stats_stream_pos(None))
  114. self.get_success(
  115. self.store._simple_insert(
  116. "background_updates",
  117. {"update_name": "populate_stats_createtables", "progress_json": "{}"},
  118. )
  119. )
  120. while not self.get_success(self.store.has_completed_background_updates()):
  121. self.get_success(self.store.do_next_background_update(100), by=0.1)
  122. # Now, before the table is actually ingested, add some more events.
  123. self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
  124. self.helper.join(room=room_1, user=u2, tok=u2_token)
  125. # Now do the initial ingestion.
  126. self.get_success(
  127. self.store._simple_insert(
  128. "background_updates",
  129. {"update_name": "populate_stats_process_rooms", "progress_json": "{}"},
  130. )
  131. )
  132. self.get_success(
  133. self.store._simple_insert(
  134. "background_updates",
  135. {
  136. "update_name": "populate_stats_cleanup",
  137. "progress_json": "{}",
  138. "depends_on": "populate_stats_process_rooms",
  139. },
  140. )
  141. )
  142. self.store._all_done = False
  143. while not self.get_success(self.store.has_completed_background_updates()):
  144. self.get_success(self.store.do_next_background_update(100), by=0.1)
  145. self.reactor.advance(86401)
  146. # Now add some more events, triggering ingestion. Because of the stream
  147. # position being set to before the events sent in the middle, a simpler
  148. # implementation would reprocess those events, and say there were four
  149. # users, not three.
  150. self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
  151. self.helper.join(room=room_1, user=u3, tok=u3_token)
  152. # Get the deltas! There should be two -- day 1, and day 2.
  153. r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
  154. # The oldest has 2 joined members
  155. self.assertEqual(r[-1]["joined_members"], 2)
  156. # The newest has 3
  157. self.assertEqual(r[0]["joined_members"], 3)
  158. def test_incorrect_state_transition(self):
  159. """
  160. If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
  161. (JOIN, INVITE, LEAVE, BAN), an error is raised.
  162. """
  163. events = {
  164. "a1": {"membership": Membership.LEAVE},
  165. "a2": {"membership": "not a real thing"},
  166. }
  167. def get_event(event_id, allow_none=True):
  168. m = Mock()
  169. m.content = events[event_id]
  170. d = defer.Deferred()
  171. self.reactor.callLater(0.0, d.callback, m)
  172. return d
  173. def get_received_ts(event_id):
  174. return defer.succeed(1)
  175. self.store.get_received_ts = get_received_ts
  176. self.store.get_event = get_event
  177. deltas = [
  178. {
  179. "type": EventTypes.Member,
  180. "state_key": "some_user",
  181. "room_id": "room",
  182. "event_id": "a1",
  183. "prev_event_id": "a2",
  184. "stream_id": 60,
  185. }
  186. ]
  187. f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
  188. self.assertEqual(
  189. f.value.args[0], "'not a real thing' is not a valid prev_membership"
  190. )
  191. # And the other way...
  192. deltas = [
  193. {
  194. "type": EventTypes.Member,
  195. "state_key": "some_user",
  196. "room_id": "room",
  197. "event_id": "a2",
  198. "prev_event_id": "a1",
  199. "stream_id": 100,
  200. }
  201. ]
  202. f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
  203. self.assertEqual(
  204. f.value.args[0], "'not a real thing' is not a valid membership"
  205. )
  206. def test_redacted_prev_event(self):
  207. """
  208. If the prev_event does not exist, then it is assumed to be a LEAVE.
  209. """
  210. u1 = self.register_user("u1", "pass")
  211. u1_token = self.login("u1", "pass")
  212. room_1 = self.helper.create_room_as(u1, tok=u1_token)
  213. # Do the initial population of the user directory via the background update
  214. self._add_background_updates()
  215. while not self.get_success(self.store.has_completed_background_updates()):
  216. self.get_success(self.store.do_next_background_update(100), by=0.1)
  217. events = {"a1": None, "a2": {"membership": Membership.JOIN}}
  218. def get_event(event_id, allow_none=True):
  219. if events.get(event_id):
  220. m = Mock()
  221. m.content = events[event_id]
  222. else:
  223. m = None
  224. d = defer.Deferred()
  225. self.reactor.callLater(0.0, d.callback, m)
  226. return d
  227. def get_received_ts(event_id):
  228. return defer.succeed(1)
  229. self.store.get_received_ts = get_received_ts
  230. self.store.get_event = get_event
  231. deltas = [
  232. {
  233. "type": EventTypes.Member,
  234. "state_key": "some_user:test",
  235. "room_id": room_1,
  236. "event_id": "a2",
  237. "prev_event_id": "a1",
  238. "stream_id": 100,
  239. }
  240. ]
  241. # Handle our fake deltas, which has a user going from LEAVE -> JOIN.
  242. self.get_success(self.handler._handle_deltas(deltas))
  243. # One delta, with two joined members -- the room creator, and our fake
  244. # user.
  245. r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
  246. self.assertEqual(len(r), 1)
  247. self.assertEqual(r[0]["joined_members"], 2)