test_events_worker.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. import json
  15. from contextlib import contextmanager
  16. from typing import Generator, List, Tuple
  17. from unittest import mock
  18. from twisted.enterprise.adbapi import ConnectionPool
  19. from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
  20. from twisted.test.proto_helpers import MemoryReactor
  21. from synapse.api.room_versions import EventFormatVersions, RoomVersions
  22. from synapse.events import make_event_from_dict
  23. from synapse.logging.context import LoggingContext
  24. from synapse.rest import admin
  25. from synapse.rest.client import login, room
  26. from synapse.server import HomeServer
  27. from synapse.storage.databases.main.events_worker import (
  28. EVENT_QUEUE_THREADS,
  29. EventsWorkerStore,
  30. )
  31. from synapse.storage.types import Connection
  32. from synapse.util import Clock
  33. from synapse.util.async_helpers import yieldable_gather_results
  34. from tests import unittest
  35. from tests.test_utils.event_injection import create_event, inject_event
  36. class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
  37. servlets = [
  38. admin.register_servlets,
  39. room.register_servlets,
  40. login.register_servlets,
  41. ]
  42. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  43. self.hs = hs
  44. self.store: EventsWorkerStore = hs.get_datastores().main
  45. self.user = self.register_user("user", "pass")
  46. self.token = self.login(self.user, "pass")
  47. self.room_id = self.helper.create_room_as(self.user, tok=self.token)
  48. self.event_ids: List[str] = []
  49. for i in range(3):
  50. event = self.get_success(
  51. inject_event(
  52. hs,
  53. room_version=RoomVersions.V7.identifier,
  54. room_id=self.room_id,
  55. sender=self.user,
  56. type="test_event_type",
  57. content={"body": f"foobarbaz{i}"},
  58. )
  59. )
  60. self.event_ids.append(event.event_id)
  61. def test_simple(self) -> None:
  62. with LoggingContext(name="test") as ctx:
  63. res = self.get_success(
  64. self.store.have_seen_events(
  65. self.room_id, [self.event_ids[0], "eventdoesnotexist"]
  66. )
  67. )
  68. self.assertEqual(res, {self.event_ids[0]})
  69. # that should result in a single db query
  70. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  71. # a second lookup of the same events should cause no queries
  72. with LoggingContext(name="test") as ctx:
  73. res = self.get_success(
  74. self.store.have_seen_events(
  75. self.room_id, [self.event_ids[0], "eventdoesnotexist"]
  76. )
  77. )
  78. self.assertEqual(res, {self.event_ids[0]})
  79. self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
  80. def test_persisting_event_invalidates_cache(self) -> None:
  81. """
  82. Test to make sure that the `have_seen_event` cache
  83. is invalidated after we persist an event and returns
  84. the updated value.
  85. """
  86. event, event_context = self.get_success(
  87. create_event(
  88. self.hs,
  89. room_id=self.room_id,
  90. sender=self.user,
  91. type="test_event_type",
  92. content={"body": "garply"},
  93. )
  94. )
  95. with LoggingContext(name="test") as ctx:
  96. # First, check `have_seen_event` for an event we have not seen yet
  97. # to prime the cache with a `false` value.
  98. res = self.get_success(
  99. self.store.have_seen_events(event.room_id, [event.event_id])
  100. )
  101. self.assertEqual(res, set())
  102. # That should result in a single db query to lookup
  103. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  104. # Persist the event which should invalidate or prefill the
  105. # `have_seen_event` cache so we don't return stale values.
  106. persistence = self.hs.get_storage_controllers().persistence
  107. assert persistence is not None
  108. self.get_success(
  109. persistence.persist_event(
  110. event,
  111. event_context,
  112. )
  113. )
  114. with LoggingContext(name="test") as ctx:
  115. # Check `have_seen_event` again and we should see the updated fact
  116. # that we have now seen the event after persisting it.
  117. res = self.get_success(
  118. self.store.have_seen_events(event.room_id, [event.event_id])
  119. )
  120. self.assertEqual(res, {event.event_id})
  121. # That should result in a single db query to lookup
  122. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  123. def test_invalidate_cache_by_room_id(self) -> None:
  124. """
  125. Test to make sure that all events associated with the given `(room_id,)`
  126. are invalidated in the `have_seen_event` cache.
  127. """
  128. with LoggingContext(name="test") as ctx:
  129. # Prime the cache with some values
  130. res = self.get_success(
  131. self.store.have_seen_events(self.room_id, self.event_ids)
  132. )
  133. self.assertEqual(res, set(self.event_ids))
  134. # That should result in a single db query to lookup
  135. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  136. # Clear the cache with any events associated with the `room_id`
  137. self.store.have_seen_event.invalidate((self.room_id,))
  138. with LoggingContext(name="test") as ctx:
  139. res = self.get_success(
  140. self.store.have_seen_events(self.room_id, self.event_ids)
  141. )
  142. self.assertEqual(res, set(self.event_ids))
  143. # Since we cleared the cache, it should result in another db query to lookup
  144. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  145. class EventCacheTestCase(unittest.HomeserverTestCase):
  146. """Test that the various layers of event cache works."""
  147. servlets = [
  148. admin.register_servlets,
  149. room.register_servlets,
  150. login.register_servlets,
  151. ]
  152. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  153. self.store: EventsWorkerStore = hs.get_datastores().main
  154. self.user = self.register_user("user", "pass")
  155. self.token = self.login(self.user, "pass")
  156. self.room = self.helper.create_room_as(self.user, tok=self.token)
  157. res = self.helper.send(self.room, tok=self.token)
  158. self.event_id = res["event_id"]
  159. # Reset the event cache so the tests start with it empty
  160. self.get_success(self.store._get_event_cache.clear())
  161. def test_simple(self) -> None:
  162. """Test that we cache events that we pull from the DB."""
  163. with LoggingContext("test") as ctx:
  164. self.get_success(self.store.get_event(self.event_id))
  165. # We should have fetched the event from the DB
  166. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
  167. def test_event_ref(self) -> None:
  168. """Test that we reuse events that are still in memory but have fallen
  169. out of the cache, rather than requesting them from the DB.
  170. """
  171. # Reset the event cache
  172. self.get_success(self.store._get_event_cache.clear())
  173. with LoggingContext("test") as ctx:
  174. # We keep hold of the event event though we never use it.
  175. event = self.get_success(self.store.get_event(self.event_id)) # noqa: F841
  176. # We should have fetched the event from the DB
  177. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
  178. # Reset the event cache
  179. self.get_success(self.store._get_event_cache.clear())
  180. with LoggingContext("test") as ctx:
  181. self.get_success(self.store.get_event(self.event_id))
  182. # Since the event is still in memory we shouldn't have fetched it
  183. # from the DB
  184. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 0)
  185. def test_dedupe(self) -> None:
  186. """Test that if we request the same event multiple times we only pull it
  187. out once.
  188. """
  189. with LoggingContext("test") as ctx:
  190. d = yieldable_gather_results(
  191. self.store.get_event, [self.event_id, self.event_id]
  192. )
  193. self.get_success(d)
  194. # We should have fetched the event from the DB
  195. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
  196. class DatabaseOutageTestCase(unittest.HomeserverTestCase):
  197. """Test event fetching during a database outage."""
  198. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  199. self.store: EventsWorkerStore = hs.get_datastores().main
  200. self.room_id = f"!room:{hs.hostname}"
  201. self._populate_events()
  202. def _populate_events(self) -> None:
  203. """Ensure that there are test events in the database.
  204. When testing with the in-memory SQLite database, all the events are lost during
  205. the simulated outage.
  206. To ensure consistency between `room_id`s and `event_id`s before and after the
  207. outage, rows are built and inserted manually.
  208. Upserts are used to handle the non-SQLite case where events are not lost.
  209. """
  210. self.get_success(
  211. self.store.db_pool.simple_upsert(
  212. "rooms",
  213. {"room_id": self.room_id},
  214. {"room_version": RoomVersions.V4.identifier},
  215. )
  216. )
  217. self.event_ids: List[str] = []
  218. for idx in range(20):
  219. event_json = {
  220. "type": f"test {idx}",
  221. "room_id": self.room_id,
  222. }
  223. event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
  224. event_id = event.event_id
  225. self.get_success(
  226. self.store.db_pool.simple_upsert(
  227. "events",
  228. {"event_id": event_id},
  229. {
  230. "event_id": event_id,
  231. "room_id": self.room_id,
  232. "topological_ordering": idx,
  233. "stream_ordering": idx,
  234. "type": event.type,
  235. "processed": True,
  236. "outlier": False,
  237. },
  238. )
  239. )
  240. self.get_success(
  241. self.store.db_pool.simple_upsert(
  242. "event_json",
  243. {"event_id": event_id},
  244. {
  245. "room_id": self.room_id,
  246. "json": json.dumps(event_json),
  247. "internal_metadata": "{}",
  248. "format_version": EventFormatVersions.ROOM_V4_PLUS,
  249. },
  250. )
  251. )
  252. self.event_ids.append(event_id)
  253. @contextmanager
  254. def _outage(self) -> Generator[None, None, None]:
  255. """Simulate a database outage.
  256. Returns:
  257. A context manager. While the context is active, any attempts to connect to
  258. the database will fail.
  259. """
  260. connection_pool = self.store.db_pool._db_pool
  261. # Close all connections and shut down the database `ThreadPool`.
  262. connection_pool.close()
  263. # Restart the database `ThreadPool`.
  264. connection_pool.start()
  265. original_connection_factory = connection_pool.connectionFactory
  266. def connection_factory(_pool: ConnectionPool) -> Connection:
  267. raise Exception("Could not connect to the database.")
  268. connection_pool.connectionFactory = connection_factory # type: ignore[assignment]
  269. try:
  270. yield
  271. finally:
  272. connection_pool.connectionFactory = original_connection_factory
  273. # If the in-memory SQLite database is being used, all the events are gone.
  274. # Restore the test data.
  275. self._populate_events()
  276. def test_failure(self) -> None:
  277. """Test that event fetches do not get stuck during a database outage."""
  278. with self._outage():
  279. failure = self.get_failure(
  280. self.store.get_event(self.event_ids[0]), Exception
  281. )
  282. self.assertEqual(str(failure.value), "Could not connect to the database.")
  283. def test_recovery(self) -> None:
  284. """Test that event fetchers recover after a database outage."""
  285. with self._outage():
  286. # Kick off a bunch of event fetches but do not pump the reactor
  287. event_deferreds = []
  288. for event_id in self.event_ids:
  289. event_deferreds.append(ensureDeferred(self.store.get_event(event_id)))
  290. # We should have maxed out on event fetcher threads
  291. self.assertEqual(self.store._event_fetch_ongoing, EVENT_QUEUE_THREADS)
  292. # All the event fetchers will fail
  293. self.pump()
  294. self.assertEqual(self.store._event_fetch_ongoing, 0)
  295. for event_deferred in event_deferreds:
  296. failure = self.get_failure(event_deferred, Exception)
  297. self.assertEqual(
  298. str(failure.value), "Could not connect to the database."
  299. )
  300. # This next event fetch should succeed
  301. self.get_success(self.store.get_event(self.event_ids[0]))
  302. class GetEventCancellationTestCase(unittest.HomeserverTestCase):
  303. """Test cancellation of `get_event` calls."""
  304. servlets = [
  305. admin.register_servlets,
  306. room.register_servlets,
  307. login.register_servlets,
  308. ]
  309. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  310. self.store: EventsWorkerStore = hs.get_datastores().main
  311. self.user = self.register_user("user", "pass")
  312. self.token = self.login(self.user, "pass")
  313. self.room = self.helper.create_room_as(self.user, tok=self.token)
  314. res = self.helper.send(self.room, tok=self.token)
  315. self.event_id = res["event_id"]
  316. # Reset the event cache so the tests start with it empty
  317. self.get_success(self.store._get_event_cache.clear())
  318. @contextmanager
  319. def blocking_get_event_calls(
  320. self,
  321. ) -> Generator[
  322. Tuple["Deferred[None]", "Deferred[None]", "Deferred[None]"], None, None
  323. ]:
  324. """Starts two concurrent `get_event` calls for the same event.
  325. Both `get_event` calls will use the same database fetch, which will be blocked
  326. at the time this function returns.
  327. Returns:
  328. A tuple containing:
  329. * A `Deferred` that unblocks the database fetch.
  330. * A cancellable `Deferred` for the first `get_event` call.
  331. * A cancellable `Deferred` for the second `get_event` call.
  332. """
  333. # Patch `DatabasePool.runWithConnection` to block.
  334. unblock: "Deferred[None]" = Deferred()
  335. original_runWithConnection = self.store.db_pool.runWithConnection
  336. # Don't bother with the types here, we just pass into the original function.
  337. async def runWithConnection(*args, **kwargs): # type: ignore[no-untyped-def]
  338. await unblock
  339. return await original_runWithConnection(*args, **kwargs)
  340. with mock.patch.object(
  341. self.store.db_pool,
  342. "runWithConnection",
  343. new=runWithConnection,
  344. ):
  345. ctx1 = LoggingContext("get_event1")
  346. ctx2 = LoggingContext("get_event2")
  347. async def get_event(ctx: LoggingContext) -> None:
  348. with ctx:
  349. await self.store.get_event(self.event_id)
  350. get_event1 = ensureDeferred(get_event(ctx1))
  351. get_event2 = ensureDeferred(get_event(ctx2))
  352. # Both `get_event` calls ought to be blocked.
  353. self.assertNoResult(get_event1)
  354. self.assertNoResult(get_event2)
  355. yield unblock, get_event1, get_event2
  356. # Confirm that the two `get_event` calls shared the same database fetch.
  357. self.assertEqual(ctx1.get_resource_usage().evt_db_fetch_count, 1)
  358. self.assertEqual(ctx2.get_resource_usage().evt_db_fetch_count, 0)
  359. def test_first_get_event_cancelled(self) -> None:
  360. """Test cancellation of the first `get_event` call sharing a database fetch.
  361. The first `get_event` call is the one which initiates the fetch. We expect the
  362. fetch to complete despite the cancellation. Furthermore, the first `get_event`
  363. call must not abort before the fetch is complete, otherwise the fetch will be
  364. using a finished logging context.
  365. """
  366. with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
  367. # Cancel the first `get_event` call.
  368. get_event1.cancel()
  369. # The first `get_event` call must not abort immediately, otherwise its
  370. # logging context will be finished while it is still in use by the database
  371. # fetch.
  372. self.assertNoResult(get_event1)
  373. # The second `get_event` call must not be cancelled.
  374. self.assertNoResult(get_event2)
  375. # Unblock the database fetch.
  376. unblock.callback(None)
  377. # A `CancelledError` should be raised out of the first `get_event` call.
  378. exc = self.get_failure(get_event1, CancelledError).value
  379. self.assertIsInstance(exc, CancelledError)
  380. # The second `get_event` call should complete successfully.
  381. self.get_success(get_event2)
  382. def test_second_get_event_cancelled(self) -> None:
  383. """Test cancellation of the second `get_event` call sharing a database fetch."""
  384. with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
  385. # Cancel the second `get_event` call.
  386. get_event2.cancel()
  387. # The first `get_event` call must not be cancelled.
  388. self.assertNoResult(get_event1)
  389. # The second `get_event` call gets cancelled immediately.
  390. exc = self.get_failure(get_event2, CancelledError).value
  391. self.assertIsInstance(exc, CancelledError)
  392. # Unblock the database fetch.
  393. unblock.callback(None)
  394. # The first `get_event` call should complete successfully.
  395. self.get_success(get_event1)