test_events_worker.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. # Copyright 2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import json
  15. from contextlib import contextmanager
  16. from typing import Generator, List, Tuple
  17. from unittest import mock
  18. from twisted.enterprise.adbapi import ConnectionPool
  19. from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
  20. from twisted.test.proto_helpers import MemoryReactor
  21. from synapse.api.room_versions import EventFormatVersions, RoomVersions
  22. from synapse.events import make_event_from_dict
  23. from synapse.logging.context import LoggingContext
  24. from synapse.rest import admin
  25. from synapse.rest.client import login, room
  26. from synapse.server import HomeServer
  27. from synapse.storage.databases.main.events_worker import (
  28. EVENT_QUEUE_THREADS,
  29. EventsWorkerStore,
  30. )
  31. from synapse.storage.types import Connection
  32. from synapse.util import Clock
  33. from synapse.util.async_helpers import yieldable_gather_results
  34. from tests import unittest
  35. from tests.test_utils.event_injection import create_event, inject_event
  36. class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
  37. servlets = [
  38. admin.register_servlets,
  39. room.register_servlets,
  40. login.register_servlets,
  41. ]
  42. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  43. self.hs = hs
  44. self.store: EventsWorkerStore = hs.get_datastores().main
  45. self.user = self.register_user("user", "pass")
  46. self.token = self.login(self.user, "pass")
  47. self.room_id = self.helper.create_room_as(self.user, tok=self.token)
  48. self.event_ids: List[str] = []
  49. for i in range(3):
  50. event = self.get_success(
  51. inject_event(
  52. hs,
  53. room_version=RoomVersions.V7.identifier,
  54. room_id=self.room_id,
  55. sender=self.user,
  56. type="test_event_type",
  57. content={"body": f"foobarbaz{i}"},
  58. )
  59. )
  60. self.event_ids.append(event.event_id)
  61. def test_simple(self) -> None:
  62. with LoggingContext(name="test") as ctx:
  63. res = self.get_success(
  64. self.store.have_seen_events(
  65. self.room_id, [self.event_ids[0], "eventdoesnotexist"]
  66. )
  67. )
  68. self.assertEqual(res, {self.event_ids[0]})
  69. # that should result in a single db query
  70. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  71. # a second lookup of the same events should cause no queries
  72. with LoggingContext(name="test") as ctx:
  73. res = self.get_success(
  74. self.store.have_seen_events(
  75. self.room_id, [self.event_ids[0], "eventdoesnotexist"]
  76. )
  77. )
  78. self.assertEqual(res, {self.event_ids[0]})
  79. self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
  80. def test_persisting_event_invalidates_cache(self) -> None:
  81. """
  82. Test to make sure that the `have_seen_event` cache
  83. is invalidated after we persist an event and returns
  84. the updated value.
  85. """
  86. event, event_context = self.get_success(
  87. create_event(
  88. self.hs,
  89. room_id=self.room_id,
  90. sender=self.user,
  91. type="test_event_type",
  92. content={"body": "garply"},
  93. )
  94. )
  95. with LoggingContext(name="test") as ctx:
  96. # First, check `have_seen_event` for an event we have not seen yet
  97. # to prime the cache with a `false` value.
  98. res = self.get_success(
  99. self.store.have_seen_events(event.room_id, [event.event_id])
  100. )
  101. self.assertEqual(res, set())
  102. # That should result in a single db query to lookup
  103. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  104. # Persist the event which should invalidate or prefill the
  105. # `have_seen_event` cache so we don't return stale values.
  106. persistence = self.hs.get_storage_controllers().persistence
  107. assert persistence is not None
  108. self.get_success(
  109. persistence.persist_event(
  110. event,
  111. event_context,
  112. )
  113. )
  114. with LoggingContext(name="test") as ctx:
  115. # Check `have_seen_event` again and we should see the updated fact
  116. # that we have now seen the event after persisting it.
  117. res = self.get_success(
  118. self.store.have_seen_events(event.room_id, [event.event_id])
  119. )
  120. self.assertEqual(res, {event.event_id})
  121. # That should result in a single db query to lookup
  122. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  123. def test_persisting_event_prefills_get_event_cache(self) -> None:
  124. """
  125. Test to make sure that the `_get_event_cache` is prefilled after we persist an
  126. event and returns the updated value.
  127. """
  128. event, event_context = self.get_success(
  129. create_event(
  130. self.hs,
  131. room_id=self.room_id,
  132. sender=self.user,
  133. type="test_event_type",
  134. content={"body": "conflabulation"},
  135. )
  136. )
  137. # First, check `_get_event_cache` for the event we just made
  138. # to verify it's not in the cache.
  139. res = self.store._get_event_cache.get_local((event.event_id,))
  140. self.assertEqual(res, None, "Event was cached when it should not have been.")
  141. with LoggingContext(name="test") as ctx:
  142. # Persist the event which should invalidate then prefill the
  143. # `_get_event_cache` so we don't return stale values.
  144. # Side Note: Apparently, persisting an event isn't a transaction in the
  145. # sense that it is recorded in the LoggingContext
  146. persistence = self.hs.get_storage_controllers().persistence
  147. assert persistence is not None
  148. self.get_success(
  149. persistence.persist_event(
  150. event,
  151. event_context,
  152. )
  153. )
  154. # Check `_get_event_cache` again and we should see the updated fact
  155. # that we now have the event cached after persisting it.
  156. res = self.store._get_event_cache.get_local((event.event_id,))
  157. self.assertEqual(res.event, event, "Event not cached as expected.") # type: ignore
  158. # Try and fetch the event from the database.
  159. self.get_success(self.store.get_event(event.event_id))
  160. # Verify that the database hit was avoided.
  161. self.assertEqual(
  162. ctx.get_resource_usage().evt_db_fetch_count,
  163. 0,
  164. "Database was hit, which would not happen if event was cached.",
  165. )
  166. def test_invalidate_cache_by_room_id(self) -> None:
  167. """
  168. Test to make sure that all events associated with the given `(room_id,)`
  169. are invalidated in the `have_seen_event` cache.
  170. """
  171. with LoggingContext(name="test") as ctx:
  172. # Prime the cache with some values
  173. res = self.get_success(
  174. self.store.have_seen_events(self.room_id, self.event_ids)
  175. )
  176. self.assertEqual(res, set(self.event_ids))
  177. # That should result in a single db query to lookup
  178. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  179. # Clear the cache with any events associated with the `room_id`
  180. self.store.have_seen_event.invalidate((self.room_id,))
  181. with LoggingContext(name="test") as ctx:
  182. res = self.get_success(
  183. self.store.have_seen_events(self.room_id, self.event_ids)
  184. )
  185. self.assertEqual(res, set(self.event_ids))
  186. # Since we cleared the cache, it should result in another db query to lookup
  187. self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
  188. class EventCacheTestCase(unittest.HomeserverTestCase):
  189. """Test that the various layers of event cache works."""
  190. servlets = [
  191. admin.register_servlets,
  192. room.register_servlets,
  193. login.register_servlets,
  194. ]
  195. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  196. self.store: EventsWorkerStore = hs.get_datastores().main
  197. self.user = self.register_user("user", "pass")
  198. self.token = self.login(self.user, "pass")
  199. self.room = self.helper.create_room_as(self.user, tok=self.token)
  200. res = self.helper.send(self.room, tok=self.token)
  201. self.event_id = res["event_id"]
  202. # Reset the event cache so the tests start with it empty
  203. self.store._get_event_cache.clear()
  204. def test_simple(self) -> None:
  205. """Test that we cache events that we pull from the DB."""
  206. with LoggingContext("test") as ctx:
  207. self.get_success(self.store.get_event(self.event_id))
  208. # We should have fetched the event from the DB
  209. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
  210. def test_event_ref(self) -> None:
  211. """Test that we reuse events that are still in memory but have fallen
  212. out of the cache, rather than requesting them from the DB.
  213. """
  214. # Reset the event cache
  215. self.store._get_event_cache.clear()
  216. with LoggingContext("test") as ctx:
  217. # We keep hold of the event event though we never use it.
  218. event = self.get_success(self.store.get_event(self.event_id)) # noqa: F841
  219. # We should have fetched the event from the DB
  220. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
  221. # Reset the event cache
  222. self.store._get_event_cache.clear()
  223. with LoggingContext("test") as ctx:
  224. self.get_success(self.store.get_event(self.event_id))
  225. # Since the event is still in memory we shouldn't have fetched it
  226. # from the DB
  227. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 0)
  228. def test_dedupe(self) -> None:
  229. """Test that if we request the same event multiple times we only pull it
  230. out once.
  231. """
  232. with LoggingContext("test") as ctx:
  233. d = yieldable_gather_results(
  234. self.store.get_event, [self.event_id, self.event_id]
  235. )
  236. self.get_success(d)
  237. # We should have fetched the event from the DB
  238. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
  239. class DatabaseOutageTestCase(unittest.HomeserverTestCase):
  240. """Test event fetching during a database outage."""
  241. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  242. self.store: EventsWorkerStore = hs.get_datastores().main
  243. self.room_id = f"!room:{hs.hostname}"
  244. self._populate_events()
  245. def _populate_events(self) -> None:
  246. """Ensure that there are test events in the database.
  247. When testing with the in-memory SQLite database, all the events are lost during
  248. the simulated outage.
  249. To ensure consistency between `room_id`s and `event_id`s before and after the
  250. outage, rows are built and inserted manually.
  251. Upserts are used to handle the non-SQLite case where events are not lost.
  252. """
  253. self.get_success(
  254. self.store.db_pool.simple_upsert(
  255. "rooms",
  256. {"room_id": self.room_id},
  257. {"room_version": RoomVersions.V4.identifier},
  258. )
  259. )
  260. self.event_ids: List[str] = []
  261. for idx in range(20):
  262. event_json = {
  263. "type": f"test {idx}",
  264. "room_id": self.room_id,
  265. }
  266. event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
  267. event_id = event.event_id
  268. self.get_success(
  269. self.store.db_pool.simple_upsert(
  270. "events",
  271. {"event_id": event_id},
  272. {
  273. "event_id": event_id,
  274. "room_id": self.room_id,
  275. "topological_ordering": idx,
  276. "stream_ordering": idx,
  277. "type": event.type,
  278. "processed": True,
  279. "outlier": False,
  280. },
  281. )
  282. )
  283. self.get_success(
  284. self.store.db_pool.simple_upsert(
  285. "event_json",
  286. {"event_id": event_id},
  287. {
  288. "room_id": self.room_id,
  289. "json": json.dumps(event_json),
  290. "internal_metadata": "{}",
  291. "format_version": EventFormatVersions.ROOM_V4_PLUS,
  292. },
  293. )
  294. )
  295. self.event_ids.append(event_id)
  296. @contextmanager
  297. def _outage(self) -> Generator[None, None, None]:
  298. """Simulate a database outage.
  299. Returns:
  300. A context manager. While the context is active, any attempts to connect to
  301. the database will fail.
  302. """
  303. connection_pool = self.store.db_pool._db_pool
  304. # Close all connections and shut down the database `ThreadPool`.
  305. connection_pool.close()
  306. # Restart the database `ThreadPool`.
  307. connection_pool.start()
  308. original_connection_factory = connection_pool.connectionFactory
  309. def connection_factory(_pool: ConnectionPool) -> Connection:
  310. raise Exception("Could not connect to the database.")
  311. connection_pool.connectionFactory = connection_factory # type: ignore[assignment]
  312. try:
  313. yield
  314. finally:
  315. connection_pool.connectionFactory = original_connection_factory
  316. # If the in-memory SQLite database is being used, all the events are gone.
  317. # Restore the test data.
  318. self._populate_events()
  319. def test_failure(self) -> None:
  320. """Test that event fetches do not get stuck during a database outage."""
  321. with self._outage():
  322. failure = self.get_failure(
  323. self.store.get_event(self.event_ids[0]), Exception
  324. )
  325. self.assertEqual(str(failure.value), "Could not connect to the database.")
  326. def test_recovery(self) -> None:
  327. """Test that event fetchers recover after a database outage."""
  328. with self._outage():
  329. # Kick off a bunch of event fetches but do not pump the reactor
  330. event_deferreds = []
  331. for event_id in self.event_ids:
  332. event_deferreds.append(ensureDeferred(self.store.get_event(event_id)))
  333. # We should have maxed out on event fetcher threads
  334. self.assertEqual(self.store._event_fetch_ongoing, EVENT_QUEUE_THREADS)
  335. # All the event fetchers will fail
  336. self.pump()
  337. self.assertEqual(self.store._event_fetch_ongoing, 0)
  338. for event_deferred in event_deferreds:
  339. failure = self.get_failure(event_deferred, Exception)
  340. self.assertEqual(
  341. str(failure.value), "Could not connect to the database."
  342. )
  343. # This next event fetch should succeed
  344. self.get_success(self.store.get_event(self.event_ids[0]))
  345. class GetEventCancellationTestCase(unittest.HomeserverTestCase):
  346. """Test cancellation of `get_event` calls."""
  347. servlets = [
  348. admin.register_servlets,
  349. room.register_servlets,
  350. login.register_servlets,
  351. ]
  352. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  353. self.store: EventsWorkerStore = hs.get_datastores().main
  354. self.user = self.register_user("user", "pass")
  355. self.token = self.login(self.user, "pass")
  356. self.room = self.helper.create_room_as(self.user, tok=self.token)
  357. res = self.helper.send(self.room, tok=self.token)
  358. self.event_id = res["event_id"]
  359. # Reset the event cache so the tests start with it empty
  360. self.store._get_event_cache.clear()
  361. @contextmanager
  362. def blocking_get_event_calls(
  363. self,
  364. ) -> Generator[
  365. Tuple["Deferred[None]", "Deferred[None]", "Deferred[None]"], None, None
  366. ]:
  367. """Starts two concurrent `get_event` calls for the same event.
  368. Both `get_event` calls will use the same database fetch, which will be blocked
  369. at the time this function returns.
  370. Returns:
  371. A tuple containing:
  372. * A `Deferred` that unblocks the database fetch.
  373. * A cancellable `Deferred` for the first `get_event` call.
  374. * A cancellable `Deferred` for the second `get_event` call.
  375. """
  376. # Patch `DatabasePool.runWithConnection` to block.
  377. unblock: "Deferred[None]" = Deferred()
  378. original_runWithConnection = self.store.db_pool.runWithConnection
  379. # Don't bother with the types here, we just pass into the original function.
  380. async def runWithConnection(*args, **kwargs): # type: ignore[no-untyped-def]
  381. await unblock
  382. return await original_runWithConnection(*args, **kwargs)
  383. with mock.patch.object(
  384. self.store.db_pool,
  385. "runWithConnection",
  386. new=runWithConnection,
  387. ):
  388. ctx1 = LoggingContext("get_event1")
  389. ctx2 = LoggingContext("get_event2")
  390. async def get_event(ctx: LoggingContext) -> None:
  391. with ctx:
  392. await self.store.get_event(self.event_id)
  393. get_event1 = ensureDeferred(get_event(ctx1))
  394. get_event2 = ensureDeferred(get_event(ctx2))
  395. # Both `get_event` calls ought to be blocked.
  396. self.assertNoResult(get_event1)
  397. self.assertNoResult(get_event2)
  398. yield unblock, get_event1, get_event2
  399. # Confirm that the two `get_event` calls shared the same database fetch.
  400. self.assertEqual(ctx1.get_resource_usage().evt_db_fetch_count, 1)
  401. self.assertEqual(ctx2.get_resource_usage().evt_db_fetch_count, 0)
  402. def test_first_get_event_cancelled(self) -> None:
  403. """Test cancellation of the first `get_event` call sharing a database fetch.
  404. The first `get_event` call is the one which initiates the fetch. We expect the
  405. fetch to complete despite the cancellation. Furthermore, the first `get_event`
  406. call must not abort before the fetch is complete, otherwise the fetch will be
  407. using a finished logging context.
  408. """
  409. with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
  410. # Cancel the first `get_event` call.
  411. get_event1.cancel()
  412. # The first `get_event` call must not abort immediately, otherwise its
  413. # logging context will be finished while it is still in use by the database
  414. # fetch.
  415. self.assertNoResult(get_event1)
  416. # The second `get_event` call must not be cancelled.
  417. self.assertNoResult(get_event2)
  418. # Unblock the database fetch.
  419. unblock.callback(None)
  420. # A `CancelledError` should be raised out of the first `get_event` call.
  421. exc = self.get_failure(get_event1, CancelledError).value
  422. self.assertIsInstance(exc, CancelledError)
  423. # The second `get_event` call should complete successfully.
  424. self.get_success(get_event2)
  425. def test_second_get_event_cancelled(self) -> None:
  426. """Test cancellation of the second `get_event` call sharing a database fetch."""
  427. with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
  428. # Cancel the second `get_event` call.
  429. get_event2.cancel()
  430. # The first `get_event` call must not be cancelled.
  431. self.assertNoResult(get_event1)
  432. # The second `get_event` call gets cancelled immediately.
  433. exc = self.get_failure(get_event2, CancelledError).value
  434. self.assertIsInstance(exc, CancelledError)
  435. # Unblock the database fetch.
  436. unblock.callback(None)
  437. # The first `get_event` call should complete successfully.
  438. self.get_success(get_event1)