test_events_worker.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. # Copyright 2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import json
  15. from contextlib import contextmanager
  16. from typing import Generator, List, Tuple
  17. from unittest import mock
  18. from twisted.enterprise.adbapi import ConnectionPool
  19. from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
  20. from twisted.test.proto_helpers import MemoryReactor
  21. from synapse.api.room_versions import EventFormatVersions, RoomVersions
  22. from synapse.events import make_event_from_dict
  23. from synapse.logging.context import LoggingContext
  24. from synapse.rest import admin
  25. from synapse.rest.client import login, room
  26. from synapse.server import HomeServer
  27. from synapse.storage.databases.main.events_worker import (
  28. EVENT_QUEUE_THREADS,
  29. EventsWorkerStore,
  30. )
  31. from synapse.storage.types import Connection
  32. from synapse.util import Clock
  33. from synapse.util.async_helpers import yieldable_gather_results
  34. from tests import unittest
  35. from tests.test_utils.event_injection import create_event, inject_event
  36. class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
  37. servlets = [
  38. admin.register_servlets,
  39. room.register_servlets,
  40. login.register_servlets,
  41. ]
  42. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  43. self.hs = hs
  44. self.store: EventsWorkerStore = hs.get_datastores().main
  45. self.user = self.register_user("user", "pass")
  46. self.token = self.login(self.user, "pass")
  47. self.room_id = self.helper.create_room_as(self.user, tok=self.token)
  48. self.event_ids: List[str] = []
  49. for i in range(3):
  50. event = self.get_success(
  51. inject_event(
  52. hs,
  53. room_version=RoomVersions.V7.identifier,
  54. room_id=self.room_id,
  55. sender=self.user,
  56. type="test_event_type",
  57. content={"body": f"foobarbaz{i}"},
  58. )
  59. )
  60. self.event_ids.append(event.event_id)
  61. def test_simple(self) -> None:
  62. with LoggingContext(name="test") as ctx:
  63. res = self.get_success(
  64. self.store.have_seen_events(
  65. self.room_id, [self.event_ids[0], "eventdoesnotexist"]
  66. )
  67. )
  68. self.assertEqual(res, {self.event_ids[0]})
  69. # that should result in a single db query
  70. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  71. # a second lookup of the same events should cause no queries
  72. with LoggingContext(name="test") as ctx:
  73. res = self.get_success(
  74. self.store.have_seen_events(
  75. self.room_id, [self.event_ids[0], "eventdoesnotexist"]
  76. )
  77. )
  78. self.assertEqual(res, {self.event_ids[0]})
  79. self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
  80. def test_persisting_event_invalidates_cache(self) -> None:
  81. """
  82. Test to make sure that the `have_seen_event` cache
  83. is invalidated after we persist an event and returns
  84. the updated value.
  85. """
  86. event, event_context = self.get_success(
  87. create_event(
  88. self.hs,
  89. room_id=self.room_id,
  90. sender=self.user,
  91. type="test_event_type",
  92. content={"body": "garply"},
  93. )
  94. )
  95. with LoggingContext(name="test") as ctx:
  96. # First, check `have_seen_event` for an event we have not seen yet
  97. # to prime the cache with a `false` value.
  98. res = self.get_success(
  99. self.store.have_seen_events(event.room_id, [event.event_id])
  100. )
  101. self.assertEqual(res, set())
  102. # That should result in a single db query to lookup
  103. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  104. # Persist the event which should invalidate or prefill the
  105. # `have_seen_event` cache so we don't return stale values.
  106. persistence = self.hs.get_storage_controllers().persistence
  107. self.get_success(
  108. persistence.persist_event(
  109. event,
  110. event_context,
  111. )
  112. )
  113. with LoggingContext(name="test") as ctx:
  114. # Check `have_seen_event` again and we should see the updated fact
  115. # that we have now seen the event after persisting it.
  116. res = self.get_success(
  117. self.store.have_seen_events(event.room_id, [event.event_id])
  118. )
  119. self.assertEqual(res, {event.event_id})
  120. # That should result in a single db query to lookup
  121. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  122. def test_invalidate_cache_by_room_id(self) -> None:
  123. """
  124. Test to make sure that all events associated with the given `(room_id,)`
  125. are invalidated in the `have_seen_event` cache.
  126. """
  127. with LoggingContext(name="test") as ctx:
  128. # Prime the cache with some values
  129. res = self.get_success(
  130. self.store.have_seen_events(self.room_id, self.event_ids)
  131. )
  132. self.assertEqual(res, set(self.event_ids))
  133. # That should result in a single db query to lookup
  134. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  135. # Clear the cache with any events associated with the `room_id`
  136. self.store.have_seen_event.invalidate((self.room_id,))
  137. with LoggingContext(name="test") as ctx:
  138. res = self.get_success(
  139. self.store.have_seen_events(self.room_id, self.event_ids)
  140. )
  141. self.assertEqual(res, set(self.event_ids))
  142. # Since we cleared the cache, it should result in another db query to lookup
  143. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  144. class EventCacheTestCase(unittest.HomeserverTestCase):
  145. """Test that the various layers of event cache works."""
  146. servlets = [
  147. admin.register_servlets,
  148. room.register_servlets,
  149. login.register_servlets,
  150. ]
  151. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  152. self.store: EventsWorkerStore = hs.get_datastores().main
  153. self.user = self.register_user("user", "pass")
  154. self.token = self.login(self.user, "pass")
  155. self.room = self.helper.create_room_as(self.user, tok=self.token)
  156. res = self.helper.send(self.room, tok=self.token)
  157. self.event_id = res["event_id"]
  158. # Reset the event cache so the tests start with it empty
  159. self.get_success(self.store._get_event_cache.clear())
  160. def test_simple(self) -> None:
  161. """Test that we cache events that we pull from the DB."""
  162. with LoggingContext("test") as ctx:
  163. self.get_success(self.store.get_event(self.event_id))
  164. # We should have fetched the event from the DB
  165. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
  166. def test_event_ref(self) -> None:
  167. """Test that we reuse events that are still in memory but have fallen
  168. out of the cache, rather than requesting them from the DB.
  169. """
  170. # Reset the event cache
  171. self.get_success(self.store._get_event_cache.clear())
  172. with LoggingContext("test") as ctx:
  173. # We keep hold of the event event though we never use it.
  174. event = self.get_success(self.store.get_event(self.event_id)) # noqa: F841
  175. # We should have fetched the event from the DB
  176. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
  177. # Reset the event cache
  178. self.get_success(self.store._get_event_cache.clear())
  179. with LoggingContext("test") as ctx:
  180. self.get_success(self.store.get_event(self.event_id))
  181. # Since the event is still in memory we shouldn't have fetched it
  182. # from the DB
  183. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 0)
  184. def test_dedupe(self) -> None:
  185. """Test that if we request the same event multiple times we only pull it
  186. out once.
  187. """
  188. with LoggingContext("test") as ctx:
  189. d = yieldable_gather_results(
  190. self.store.get_event, [self.event_id, self.event_id]
  191. )
  192. self.get_success(d)
  193. # We should have fetched the event from the DB
  194. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
  195. class DatabaseOutageTestCase(unittest.HomeserverTestCase):
  196. """Test event fetching during a database outage."""
  197. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  198. self.store: EventsWorkerStore = hs.get_datastores().main
  199. self.room_id = f"!room:{hs.hostname}"
  200. self._populate_events()
  201. def _populate_events(self) -> None:
  202. """Ensure that there are test events in the database.
  203. When testing with the in-memory SQLite database, all the events are lost during
  204. the simulated outage.
  205. To ensure consistency between `room_id`s and `event_id`s before and after the
  206. outage, rows are built and inserted manually.
  207. Upserts are used to handle the non-SQLite case where events are not lost.
  208. """
  209. self.get_success(
  210. self.store.db_pool.simple_upsert(
  211. "rooms",
  212. {"room_id": self.room_id},
  213. {"room_version": RoomVersions.V4.identifier},
  214. )
  215. )
  216. self.event_ids: List[str] = []
  217. for idx in range(20):
  218. event_json = {
  219. "type": f"test {idx}",
  220. "room_id": self.room_id,
  221. }
  222. event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
  223. event_id = event.event_id
  224. self.get_success(
  225. self.store.db_pool.simple_upsert(
  226. "events",
  227. {"event_id": event_id},
  228. {
  229. "event_id": event_id,
  230. "room_id": self.room_id,
  231. "topological_ordering": idx,
  232. "stream_ordering": idx,
  233. "type": event.type,
  234. "processed": True,
  235. "outlier": False,
  236. },
  237. )
  238. )
  239. self.get_success(
  240. self.store.db_pool.simple_upsert(
  241. "event_json",
  242. {"event_id": event_id},
  243. {
  244. "room_id": self.room_id,
  245. "json": json.dumps(event_json),
  246. "internal_metadata": "{}",
  247. "format_version": EventFormatVersions.ROOM_V4_PLUS,
  248. },
  249. )
  250. )
  251. self.event_ids.append(event_id)
  252. @contextmanager
  253. def _outage(self) -> Generator[None, None, None]:
  254. """Simulate a database outage.
  255. Returns:
  256. A context manager. While the context is active, any attempts to connect to
  257. the database will fail.
  258. """
  259. connection_pool = self.store.db_pool._db_pool
  260. # Close all connections and shut down the database `ThreadPool`.
  261. connection_pool.close()
  262. # Restart the database `ThreadPool`.
  263. connection_pool.start()
  264. original_connection_factory = connection_pool.connectionFactory
  265. def connection_factory(_pool: ConnectionPool) -> Connection:
  266. raise Exception("Could not connect to the database.")
  267. connection_pool.connectionFactory = connection_factory # type: ignore[assignment]
  268. try:
  269. yield
  270. finally:
  271. connection_pool.connectionFactory = original_connection_factory
  272. # If the in-memory SQLite database is being used, all the events are gone.
  273. # Restore the test data.
  274. self._populate_events()
  275. def test_failure(self) -> None:
  276. """Test that event fetches do not get stuck during a database outage."""
  277. with self._outage():
  278. failure = self.get_failure(
  279. self.store.get_event(self.event_ids[0]), Exception
  280. )
  281. self.assertEqual(str(failure.value), "Could not connect to the database.")
  282. def test_recovery(self) -> None:
  283. """Test that event fetchers recover after a database outage."""
  284. with self._outage():
  285. # Kick off a bunch of event fetches but do not pump the reactor
  286. event_deferreds = []
  287. for event_id in self.event_ids:
  288. event_deferreds.append(ensureDeferred(self.store.get_event(event_id)))
  289. # We should have maxed out on event fetcher threads
  290. self.assertEqual(self.store._event_fetch_ongoing, EVENT_QUEUE_THREADS)
  291. # All the event fetchers will fail
  292. self.pump()
  293. self.assertEqual(self.store._event_fetch_ongoing, 0)
  294. for event_deferred in event_deferreds:
  295. failure = self.get_failure(event_deferred, Exception)
  296. self.assertEqual(
  297. str(failure.value), "Could not connect to the database."
  298. )
  299. # This next event fetch should succeed
  300. self.get_success(self.store.get_event(self.event_ids[0]))
  301. class GetEventCancellationTestCase(unittest.HomeserverTestCase):
  302. """Test cancellation of `get_event` calls."""
  303. servlets = [
  304. admin.register_servlets,
  305. room.register_servlets,
  306. login.register_servlets,
  307. ]
  308. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  309. self.store: EventsWorkerStore = hs.get_datastores().main
  310. self.user = self.register_user("user", "pass")
  311. self.token = self.login(self.user, "pass")
  312. self.room = self.helper.create_room_as(self.user, tok=self.token)
  313. res = self.helper.send(self.room, tok=self.token)
  314. self.event_id = res["event_id"]
  315. # Reset the event cache so the tests start with it empty
  316. self.get_success(self.store._get_event_cache.clear())
  317. @contextmanager
  318. def blocking_get_event_calls(
  319. self,
  320. ) -> Generator[
  321. Tuple["Deferred[None]", "Deferred[None]", "Deferred[None]"], None, None
  322. ]:
  323. """Starts two concurrent `get_event` calls for the same event.
  324. Both `get_event` calls will use the same database fetch, which will be blocked
  325. at the time this function returns.
  326. Returns:
  327. A tuple containing:
  328. * A `Deferred` that unblocks the database fetch.
  329. * A cancellable `Deferred` for the first `get_event` call.
  330. * A cancellable `Deferred` for the second `get_event` call.
  331. """
  332. # Patch `DatabasePool.runWithConnection` to block.
  333. unblock: "Deferred[None]" = Deferred()
  334. original_runWithConnection = self.store.db_pool.runWithConnection
  335. # Don't bother with the types here, we just pass into the original function.
  336. async def runWithConnection(*args, **kwargs): # type: ignore[no-untyped-def]
  337. await unblock
  338. return await original_runWithConnection(*args, **kwargs)
  339. with mock.patch.object(
  340. self.store.db_pool,
  341. "runWithConnection",
  342. new=runWithConnection,
  343. ):
  344. ctx1 = LoggingContext("get_event1")
  345. ctx2 = LoggingContext("get_event2")
  346. async def get_event(ctx: LoggingContext) -> None:
  347. with ctx:
  348. await self.store.get_event(self.event_id)
  349. get_event1 = ensureDeferred(get_event(ctx1))
  350. get_event2 = ensureDeferred(get_event(ctx2))
  351. # Both `get_event` calls ought to be blocked.
  352. self.assertNoResult(get_event1)
  353. self.assertNoResult(get_event2)
  354. yield unblock, get_event1, get_event2
  355. # Confirm that the two `get_event` calls shared the same database fetch.
  356. self.assertEqual(ctx1.get_resource_usage().evt_db_fetch_count, 1)
  357. self.assertEqual(ctx2.get_resource_usage().evt_db_fetch_count, 0)
  358. def test_first_get_event_cancelled(self) -> None:
  359. """Test cancellation of the first `get_event` call sharing a database fetch.
  360. The first `get_event` call is the one which initiates the fetch. We expect the
  361. fetch to complete despite the cancellation. Furthermore, the first `get_event`
  362. call must not abort before the fetch is complete, otherwise the fetch will be
  363. using a finished logging context.
  364. """
  365. with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
  366. # Cancel the first `get_event` call.
  367. get_event1.cancel()
  368. # The first `get_event` call must not abort immediately, otherwise its
  369. # logging context will be finished while it is still in use by the database
  370. # fetch.
  371. self.assertNoResult(get_event1)
  372. # The second `get_event` call must not be cancelled.
  373. self.assertNoResult(get_event2)
  374. # Unblock the database fetch.
  375. unblock.callback(None)
  376. # A `CancelledError` should be raised out of the first `get_event` call.
  377. exc = self.get_failure(get_event1, CancelledError).value
  378. self.assertIsInstance(exc, CancelledError)
  379. # The second `get_event` call should complete successfully.
  380. self.get_success(get_event2)
  381. def test_second_get_event_cancelled(self) -> None:
  382. """Test cancellation of the second `get_event` call sharing a database fetch."""
  383. with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
  384. # Cancel the second `get_event` call.
  385. get_event2.cancel()
  386. # The first `get_event` call must not be cancelled.
  387. self.assertNoResult(get_event1)
  388. # The second `get_event` call gets cancelled immediately.
  389. exc = self.get_failure(get_event2, CancelledError).value
  390. self.assertIsInstance(exc, CancelledError)
  391. # Unblock the database fetch.
  392. unblock.callback(None)
  393. # The first `get_event` call should complete successfully.
  394. self.get_success(get_event1)