test_state.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from tests import unittest
  16. from twisted.internet import defer
  17. from twisted.python.log import PythonLoggingObserver
  18. from synapse.state import StateHandler
  19. from synapse.storage.pdu import PduEntry
  20. from synapse.federation.pdu_codec import encode_event_id
  21. from synapse.federation.units import Pdu
  22. from collections import namedtuple
  23. from mock import Mock
  24. import mock
  25. ReturnType = namedtuple(
  26. "StateReturnType", ["new_branch", "current_branch"]
  27. )
  28. def _gen_get_power_level(power_level_list):
  29. def get_power_level(room_id, user_id):
  30. return defer.succeed(power_level_list.get(user_id, None))
  31. return get_power_level
  32. class StateTestCase(unittest.TestCase):
  33. def setUp(self):
  34. self.persistence = Mock(spec=[
  35. "get_unresolved_state_tree",
  36. "update_current_state",
  37. "get_latest_pdus_in_context",
  38. "get_current_state_pdu",
  39. "get_pdu",
  40. "get_power_level",
  41. ])
  42. self.replication = Mock(spec=["get_pdu"])
  43. hs = Mock(spec=["get_datastore", "get_replication_layer"])
  44. hs.get_datastore.return_value = self.persistence
  45. hs.get_replication_layer.return_value = self.replication
  46. hs.hostname = "bob.com"
  47. self.state = StateHandler(hs)
  48. @defer.inlineCallbacks
  49. def test_new_state_key(self):
  50. # We've never seen anything for this state before
  51. new_pdu = new_fake_pdu("A", "test", "mem", "x", None, "u")
  52. self.persistence.get_power_level.side_effect = _gen_get_power_level({})
  53. self.persistence.get_unresolved_state_tree.return_value = (
  54. (ReturnType([new_pdu], []), None)
  55. )
  56. is_new = yield self.state.handle_new_state(new_pdu)
  57. self.assertTrue(is_new)
  58. self.persistence.get_unresolved_state_tree.assert_called_once_with(
  59. new_pdu
  60. )
  61. self.assertEqual(1, self.persistence.update_current_state.call_count)
  62. self.assertFalse(self.replication.get_pdu.called)
  63. @defer.inlineCallbacks
  64. def test_direct_overwrite(self):
  65. # We do a direct overwriting of the old state, i.e., the new state
  66. # points to the old state.
  67. old_pdu = new_fake_pdu("A", "test", "mem", "x", None, "u1")
  68. new_pdu = new_fake_pdu("B", "test", "mem", "x", "A", "u2")
  69. self.persistence.get_power_level.side_effect = _gen_get_power_level({
  70. "u1": 10,
  71. "u2": 5,
  72. })
  73. self.persistence.get_unresolved_state_tree.return_value = (
  74. (ReturnType([new_pdu, old_pdu], [old_pdu]), None)
  75. )
  76. is_new = yield self.state.handle_new_state(new_pdu)
  77. self.assertTrue(is_new)
  78. self.persistence.get_unresolved_state_tree.assert_called_once_with(
  79. new_pdu
  80. )
  81. self.assertEqual(1, self.persistence.update_current_state.call_count)
  82. self.assertFalse(self.replication.get_pdu.called)
  83. @defer.inlineCallbacks
  84. def test_overwrite(self):
  85. old_pdu_1 = new_fake_pdu("A", "test", "mem", "x", None, "u1")
  86. old_pdu_2 = new_fake_pdu("B", "test", "mem", "x", "A", "u2")
  87. new_pdu = new_fake_pdu("C", "test", "mem", "x", "B", "u3")
  88. self.persistence.get_power_level.side_effect = _gen_get_power_level({
  89. "u1": 10,
  90. "u2": 5,
  91. "u3": 0,
  92. })
  93. self.persistence.get_unresolved_state_tree.return_value = (
  94. (ReturnType([new_pdu, old_pdu_2, old_pdu_1], [old_pdu_1]), None)
  95. )
  96. is_new = yield self.state.handle_new_state(new_pdu)
  97. self.assertTrue(is_new)
  98. self.persistence.get_unresolved_state_tree.assert_called_once_with(
  99. new_pdu
  100. )
  101. self.assertEqual(1, self.persistence.update_current_state.call_count)
  102. self.assertFalse(self.replication.get_pdu.called)
  103. @defer.inlineCallbacks
  104. def test_power_level_fail(self):
  105. # We try to update the state based on an outdated state, and have a
  106. # too low power level.
  107. old_pdu_1 = new_fake_pdu("A", "test", "mem", "x", None, "u1")
  108. old_pdu_2 = new_fake_pdu("B", "test", "mem", "x", None, "u2")
  109. new_pdu = new_fake_pdu("C", "test", "mem", "x", "A", "u3")
  110. self.persistence.get_power_level.side_effect = _gen_get_power_level({
  111. "u1": 10,
  112. "u2": 10,
  113. "u3": 5,
  114. })
  115. self.persistence.get_unresolved_state_tree.return_value = (
  116. (ReturnType([new_pdu, old_pdu_1], [old_pdu_2, old_pdu_1]), None)
  117. )
  118. is_new = yield self.state.handle_new_state(new_pdu)
  119. self.assertFalse(is_new)
  120. self.persistence.get_unresolved_state_tree.assert_called_once_with(
  121. new_pdu
  122. )
  123. self.assertEqual(0, self.persistence.update_current_state.call_count)
  124. self.assertFalse(self.replication.get_pdu.called)
  125. @defer.inlineCallbacks
  126. def test_power_level_succeed(self):
  127. # We try to update the state based on an outdated state, but have
  128. # sufficient power level to force the update.
  129. old_pdu_1 = new_fake_pdu("A", "test", "mem", "x", None, "u1")
  130. old_pdu_2 = new_fake_pdu("B", "test", "mem", "x", None, "u2")
  131. new_pdu = new_fake_pdu("C", "test", "mem", "x", "A", "u3")
  132. self.persistence.get_power_level.side_effect = _gen_get_power_level({
  133. "u1": 10,
  134. "u2": 10,
  135. "u3": 15,
  136. })
  137. self.persistence.get_unresolved_state_tree.return_value = (
  138. (ReturnType([new_pdu, old_pdu_1], [old_pdu_2, old_pdu_1]), None)
  139. )
  140. is_new = yield self.state.handle_new_state(new_pdu)
  141. self.assertTrue(is_new)
  142. self.persistence.get_unresolved_state_tree.assert_called_once_with(
  143. new_pdu
  144. )
  145. self.assertEqual(1, self.persistence.update_current_state.call_count)
  146. self.assertFalse(self.replication.get_pdu.called)
  147. @defer.inlineCallbacks
  148. def test_power_level_equal_same_len(self):
  149. # We try to update the state based on an outdated state, the power
  150. # levels are the same and so are the branch lengths
  151. old_pdu_1 = new_fake_pdu("A", "test", "mem", "x", None, "u1")
  152. old_pdu_2 = new_fake_pdu("B", "test", "mem", "x", None, "u2")
  153. new_pdu = new_fake_pdu("C", "test", "mem", "x", "A", "u3")
  154. self.persistence.get_power_level.side_effect = _gen_get_power_level({
  155. "u1": 10,
  156. "u2": 10,
  157. "u3": 10,
  158. })
  159. self.persistence.get_unresolved_state_tree.return_value = (
  160. (ReturnType([new_pdu, old_pdu_1], [old_pdu_2, old_pdu_1]), None)
  161. )
  162. is_new = yield self.state.handle_new_state(new_pdu)
  163. self.assertTrue(is_new)
  164. self.persistence.get_unresolved_state_tree.assert_called_once_with(
  165. new_pdu
  166. )
  167. self.assertEqual(1, self.persistence.update_current_state.call_count)
  168. self.assertFalse(self.replication.get_pdu.called)
  169. @defer.inlineCallbacks
  170. def test_power_level_equal_diff_len(self):
  171. # We try to update the state based on an outdated state, the power
  172. # levels are the same but the branch length of the new one is longer.
  173. old_pdu_1 = new_fake_pdu("A", "test", "mem", "x", None, "u1")
  174. old_pdu_2 = new_fake_pdu("B", "test", "mem", "x", None, "u2")
  175. old_pdu_3 = new_fake_pdu("C", "test", "mem", "x", "A", "u3")
  176. new_pdu = new_fake_pdu("D", "test", "mem", "x", "C", "u4")
  177. self.persistence.get_power_level.side_effect = _gen_get_power_level({
  178. "u1": 10,
  179. "u2": 10,
  180. "u3": 10,
  181. "u4": 10,
  182. })
  183. self.persistence.get_unresolved_state_tree.return_value = (
  184. (
  185. ReturnType(
  186. [new_pdu, old_pdu_3, old_pdu_1],
  187. [old_pdu_2, old_pdu_1]
  188. ),
  189. None
  190. )
  191. )
  192. is_new = yield self.state.handle_new_state(new_pdu)
  193. self.assertTrue(is_new)
  194. self.persistence.get_unresolved_state_tree.assert_called_once_with(
  195. new_pdu
  196. )
  197. self.assertEqual(1, self.persistence.update_current_state.call_count)
  198. self.assertFalse(self.replication.get_pdu.called)
  199. @defer.inlineCallbacks
  200. def test_missing_pdu(self):
  201. # We try to update state against a PDU we haven't yet seen,
  202. # triggering a get_pdu request
  203. # The pdu we haven't seen
  204. old_pdu_1 = new_fake_pdu(
  205. "A", "test", "mem", "x", None, "u1", depth=0
  206. )
  207. old_pdu_2 = new_fake_pdu(
  208. "B", "test", "mem", "x", "A", "u2", depth=1
  209. )
  210. new_pdu = new_fake_pdu(
  211. "C", "test", "mem", "x", "A", "u3", depth=2
  212. )
  213. self.persistence.get_power_level.side_effect = _gen_get_power_level({
  214. "u1": 10,
  215. "u2": 10,
  216. "u3": 20,
  217. })
  218. # The return_value of `get_unresolved_state_tree`, which changes after
  219. # the call to get_pdu
  220. tree_to_return = [(ReturnType([new_pdu], [old_pdu_2]), 0)]
  221. def return_tree(p):
  222. return tree_to_return[0]
  223. def set_return_tree(destination, pdu_origin, pdu_id, outlier=False):
  224. tree_to_return[0] = (
  225. ReturnType(
  226. [new_pdu, old_pdu_1], [old_pdu_2, old_pdu_1]
  227. ),
  228. None
  229. )
  230. return defer.succeed(None)
  231. self.persistence.get_unresolved_state_tree.side_effect = return_tree
  232. self.replication.get_pdu.side_effect = set_return_tree
  233. self.persistence.get_pdu.return_value = None
  234. is_new = yield self.state.handle_new_state(new_pdu)
  235. self.assertTrue(is_new)
  236. self.replication.get_pdu.assert_called_with(
  237. destination=new_pdu.origin,
  238. pdu_origin=old_pdu_1.origin,
  239. pdu_id=old_pdu_1.pdu_id,
  240. outlier=True
  241. )
  242. self.persistence.get_unresolved_state_tree.assert_called_with(
  243. new_pdu
  244. )
  245. self.assertEquals(
  246. 2, self.persistence.get_unresolved_state_tree.call_count
  247. )
  248. self.assertEqual(1, self.persistence.update_current_state.call_count)
  249. @defer.inlineCallbacks
  250. def test_missing_pdu_depth_1(self):
  251. # We try to update state against a PDU we haven't yet seen,
  252. # triggering a get_pdu request
  253. # The pdu we haven't seen
  254. old_pdu_1 = new_fake_pdu(
  255. "A", "test", "mem", "x", None, "u1", depth=0
  256. )
  257. old_pdu_2 = new_fake_pdu(
  258. "B", "test", "mem", "x", "A", "u2", depth=2
  259. )
  260. old_pdu_3 = new_fake_pdu(
  261. "C", "test", "mem", "x", "B", "u3", depth=3
  262. )
  263. new_pdu = new_fake_pdu(
  264. "D", "test", "mem", "x", "A", "u4", depth=4
  265. )
  266. self.persistence.get_power_level.side_effect = _gen_get_power_level({
  267. "u1": 10,
  268. "u2": 10,
  269. "u3": 10,
  270. "u4": 20,
  271. })
  272. # The return_value of `get_unresolved_state_tree`, which changes after
  273. # the call to get_pdu
  274. tree_to_return = [
  275. (
  276. ReturnType([new_pdu], [old_pdu_3]),
  277. 0
  278. ),
  279. (
  280. ReturnType(
  281. [new_pdu, old_pdu_1], [old_pdu_3]
  282. ),
  283. 1
  284. ),
  285. (
  286. ReturnType(
  287. [new_pdu, old_pdu_1], [old_pdu_3, old_pdu_2, old_pdu_1]
  288. ),
  289. None
  290. ),
  291. ]
  292. to_return = [0]
  293. def return_tree(p):
  294. return tree_to_return[to_return[0]]
  295. def set_return_tree(destination, pdu_origin, pdu_id, outlier=False):
  296. to_return[0] += 1
  297. return defer.succeed(None)
  298. self.persistence.get_unresolved_state_tree.side_effect = return_tree
  299. self.replication.get_pdu.side_effect = set_return_tree
  300. self.persistence.get_pdu.return_value = None
  301. is_new = yield self.state.handle_new_state(new_pdu)
  302. self.assertTrue(is_new)
  303. self.assertEqual(2, self.replication.get_pdu.call_count)
  304. self.replication.get_pdu.assert_has_calls(
  305. [
  306. mock.call(
  307. destination=new_pdu.origin,
  308. pdu_origin=old_pdu_1.origin,
  309. pdu_id=old_pdu_1.pdu_id,
  310. outlier=True
  311. ),
  312. mock.call(
  313. destination=old_pdu_3.origin,
  314. pdu_origin=old_pdu_2.origin,
  315. pdu_id=old_pdu_2.pdu_id,
  316. outlier=True
  317. ),
  318. ]
  319. )
  320. self.persistence.get_unresolved_state_tree.assert_called_with(
  321. new_pdu
  322. )
  323. self.assertEquals(
  324. 3, self.persistence.get_unresolved_state_tree.call_count
  325. )
  326. self.assertEqual(1, self.persistence.update_current_state.call_count)
  327. @defer.inlineCallbacks
  328. def test_missing_pdu_depth_2(self):
  329. # We try to update state against a PDU we haven't yet seen,
  330. # triggering a get_pdu request
  331. # The pdu we haven't seen
  332. old_pdu_1 = new_fake_pdu(
  333. "A", "test", "mem", "x", None, "u1", depth=0
  334. )
  335. old_pdu_2 = new_fake_pdu(
  336. "B", "test", "mem", "x", "A", "u2", depth=2
  337. )
  338. old_pdu_3 = new_fake_pdu(
  339. "C", "test", "mem", "x", "B", "u3", depth=3
  340. )
  341. new_pdu = new_fake_pdu(
  342. "D", "test", "mem", "x", "A", "u4", depth=1
  343. )
  344. self.persistence.get_power_level.side_effect = _gen_get_power_level({
  345. "u1": 10,
  346. "u2": 10,
  347. "u3": 10,
  348. "u4": 20,
  349. })
  350. # The return_value of `get_unresolved_state_tree`, which changes after
  351. # the call to get_pdu
  352. tree_to_return = [
  353. (
  354. ReturnType([new_pdu], [old_pdu_3]),
  355. 1,
  356. ),
  357. (
  358. ReturnType(
  359. [new_pdu], [old_pdu_3, old_pdu_2]
  360. ),
  361. 0,
  362. ),
  363. (
  364. ReturnType(
  365. [new_pdu, old_pdu_1], [old_pdu_3, old_pdu_2, old_pdu_1]
  366. ),
  367. None
  368. ),
  369. ]
  370. to_return = [0]
  371. def return_tree(p):
  372. return tree_to_return[to_return[0]]
  373. def set_return_tree(destination, pdu_origin, pdu_id, outlier=False):
  374. to_return[0] += 1
  375. return defer.succeed(None)
  376. self.persistence.get_unresolved_state_tree.side_effect = return_tree
  377. self.replication.get_pdu.side_effect = set_return_tree
  378. self.persistence.get_pdu.return_value = None
  379. is_new = yield self.state.handle_new_state(new_pdu)
  380. self.assertTrue(is_new)
  381. self.assertEqual(2, self.replication.get_pdu.call_count)
  382. self.replication.get_pdu.assert_has_calls(
  383. [
  384. mock.call(
  385. destination=old_pdu_3.origin,
  386. pdu_origin=old_pdu_2.origin,
  387. pdu_id=old_pdu_2.pdu_id,
  388. outlier=True
  389. ),
  390. mock.call(
  391. destination=new_pdu.origin,
  392. pdu_origin=old_pdu_1.origin,
  393. pdu_id=old_pdu_1.pdu_id,
  394. outlier=True
  395. ),
  396. ]
  397. )
  398. self.persistence.get_unresolved_state_tree.assert_called_with(
  399. new_pdu
  400. )
  401. self.assertEquals(
  402. 3, self.persistence.get_unresolved_state_tree.call_count
  403. )
  404. self.assertEqual(1, self.persistence.update_current_state.call_count)
  405. @defer.inlineCallbacks
  406. def test_no_common_ancestor(self):
  407. # We do a direct overwriting of the old state, i.e., the new state
  408. # points to the old state.
  409. old_pdu = new_fake_pdu("A", "test", "mem", "x", None, "u1")
  410. new_pdu = new_fake_pdu("B", "test", "mem", "x", None, "u2")
  411. self.persistence.get_power_level.side_effect = _gen_get_power_level({
  412. "u1": 5,
  413. "u2": 10,
  414. })
  415. self.persistence.get_unresolved_state_tree.return_value = (
  416. (ReturnType([new_pdu], [old_pdu]), None)
  417. )
  418. is_new = yield self.state.handle_new_state(new_pdu)
  419. self.assertTrue(is_new)
  420. self.persistence.get_unresolved_state_tree.assert_called_once_with(
  421. new_pdu
  422. )
  423. self.assertEqual(1, self.persistence.update_current_state.call_count)
  424. self.assertFalse(self.replication.get_pdu.called)
  425. @defer.inlineCallbacks
  426. def test_new_event(self):
  427. event = Mock()
  428. event.event_id = "12123123@test"
  429. state_pdu = new_fake_pdu("C", "test", "mem", "x", "A", 20)
  430. snapshot = Mock()
  431. snapshot.prev_state_pdu = state_pdu
  432. event_id = "pdu_id@origin.com"
  433. def fill_out_prev_events(event):
  434. event.prev_events = [event_id]
  435. event.depth = 6
  436. snapshot.fill_out_prev_events = fill_out_prev_events
  437. yield self.state.handle_new_event(event, snapshot)
  438. self.assertLess(5, event.depth)
  439. self.assertEquals(1, len(event.prev_events))
  440. prev_id = event.prev_events[0]
  441. self.assertEqual(event_id, prev_id)
  442. self.assertEqual(
  443. encode_event_id(state_pdu.pdu_id, state_pdu.origin),
  444. event.prev_state
  445. )
  446. def new_fake_pdu(pdu_id, context, pdu_type, state_key, prev_state_id,
  447. user_id, depth=0):
  448. new_pdu = Pdu(
  449. pdu_id=pdu_id,
  450. pdu_type=pdu_type,
  451. state_key=state_key,
  452. user_id=user_id,
  453. prev_state_id=prev_state_id,
  454. origin="example.com",
  455. context="context",
  456. origin_server_ts=1405353060021,
  457. depth=depth,
  458. content_json="{}",
  459. unrecognized_keys="{}",
  460. outlier=True,
  461. is_state=True,
  462. prev_state_origin="example.com",
  463. have_processed=True,
  464. content={},
  465. )
  466. return new_pdu