# test_events_worker.py
  1. # Copyright 2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import json
  15. from contextlib import contextmanager
  16. from typing import Generator, List, Tuple
  17. from unittest import mock
  18. from twisted.enterprise.adbapi import ConnectionPool
  19. from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
  20. from twisted.test.proto_helpers import MemoryReactor
  21. from synapse.api.room_versions import EventFormatVersions, RoomVersions
  22. from synapse.events import make_event_from_dict
  23. from synapse.logging.context import LoggingContext
  24. from synapse.rest import admin
  25. from synapse.rest.client import login, room
  26. from synapse.server import HomeServer
  27. from synapse.storage.databases.main.events_worker import (
  28. EVENT_QUEUE_THREADS,
  29. EventsWorkerStore,
  30. )
  31. from synapse.storage.types import Connection
  32. from synapse.util import Clock
  33. from synapse.util.async_helpers import yieldable_gather_results
  34. from tests import unittest
class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
    """Tests for `EventsWorkerStore.have_seen_events` and its caching behaviour.

    The fixture rows are inserted directly into the `rooms`, `events` and
    `event_json` tables rather than going through the event-persistence code.
    """

    def prepare(self, reactor, clock, hs) -> None:
        """Insert two rooms and four events (three in room1, one in room2)."""
        self.store: EventsWorkerStore = hs.get_datastores().main

        # insert some test data
        for rid in ("room1", "room2"):
            self.get_success(
                self.store.db_pool.simple_insert(
                    "rooms",
                    {"room_id": rid, "room_version": 4},
                )
            )

        self.event_ids: List[str] = []
        for idx, rid in enumerate(
            (
                "room1",
                "room1",
                "room1",
                "room2",
            )
        ):
            # Build a minimal event so we get a valid, deterministic event_id.
            event_json = {"type": f"test {idx}", "room_id": rid}
            event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
            event_id = event.event_id

            self.get_success(
                self.store.db_pool.simple_insert(
                    "events",
                    {
                        "event_id": event_id,
                        "room_id": rid,
                        "topological_ordering": idx,
                        "stream_ordering": idx,
                        "type": event.type,
                        "processed": True,
                        "outlier": False,
                    },
                )
            )
            self.get_success(
                self.store.db_pool.simple_insert(
                    "event_json",
                    {
                        "event_id": event_id,
                        "room_id": rid,
                        "json": json.dumps(event_json),
                        "internal_metadata": "{}",
                        "format_version": 3,
                    },
                )
            )
            self.event_ids.append(event_id)

    def test_simple(self) -> None:
        """A lookup hits the DB once, then is served from the cache."""
        with LoggingContext(name="test") as ctx:
            res = self.get_success(
                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
            )
            # Only the event we actually inserted should be reported as seen.
            self.assertEqual(res, {self.event_ids[0]})

            # that should result in a single db query
            self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)

        # a second lookup of the same events should cause no queries
        with LoggingContext(name="test") as ctx:
            res = self.get_success(
                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
            )
            self.assertEqual(res, {self.event_ids[0]})
            self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)

    def test_query_via_event_cache(self) -> None:
        """`have_seen_events` is answered from the event cache when possible."""
        # fetch an event into the event cache
        self.get_success(self.store.get_event(self.event_ids[0]))

        # looking it up should now cause no db hits
        with LoggingContext(name="test") as ctx:
            res = self.get_success(
                self.store.have_seen_events("room1", [self.event_ids[0]])
            )
            self.assertEqual(res, {self.event_ids[0]})
            self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
class EventCacheTestCase(unittest.HomeserverTestCase):
    """Test that the various layers of event cache works."""

    servlets = [
        admin.register_servlets,
        room.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor, clock, hs) -> None:
        """Create a user, a room and one message event to fetch in the tests."""
        self.store: EventsWorkerStore = hs.get_datastores().main

        self.user = self.register_user("user", "pass")
        self.token = self.login(self.user, "pass")

        self.room = self.helper.create_room_as(self.user, tok=self.token)

        res = self.helper.send(self.room, tok=self.token)
        self.event_id = res["event_id"]

        # Reset the event cache so the tests start with it empty
        self.get_success(self.store._get_event_cache.clear())

    def test_simple(self) -> None:
        """Test that we cache events that we pull from the DB."""
        with LoggingContext("test") as ctx:
            self.get_success(self.store.get_event(self.event_id))

            # We should have fetched the event from the DB
            self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)

    def test_event_ref(self) -> None:
        """Test that we reuse events that are still in memory but have fallen
        out of the cache, rather than requesting them from the DB.
        """
        # Reset the event cache
        self.get_success(self.store._get_event_cache.clear())

        with LoggingContext("test") as ctx:
            # We keep hold of the event event though we never use it.
            event = self.get_success(self.store.get_event(self.event_id))  # noqa: F841

            # We should have fetched the event from the DB
            self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)

        # Reset the event cache
        self.get_success(self.store._get_event_cache.clear())

        with LoggingContext("test") as ctx:
            self.get_success(self.store.get_event(self.event_id))

            # Since the event is still in memory we shouldn't have fetched it
            # from the DB
            self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 0)

    def test_dedupe(self) -> None:
        """Test that if we request the same event multiple times we only pull it
        out once.
        """
        with LoggingContext("test") as ctx:
            # Two concurrent requests for the same event ID.
            d = yieldable_gather_results(
                self.store.get_event, [self.event_id, self.event_id]
            )
            self.get_success(d)

            # We should have fetched the event from the DB
            self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
class DatabaseOutageTestCase(unittest.HomeserverTestCase):
    """Test event fetching during a database outage."""

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store: EventsWorkerStore = hs.get_datastores().main
        self.room_id = f"!room:{hs.hostname}"
        self._populate_events()

    def _populate_events(self) -> None:
        """Ensure that there are test events in the database.

        When testing with the in-memory SQLite database, all the events are lost during
        the simulated outage.

        To ensure consistency between `room_id`s and `event_id`s before and after the
        outage, rows are built and inserted manually.

        Upserts are used to handle the non-SQLite case where events are not lost.
        """
        self.get_success(
            self.store.db_pool.simple_upsert(
                "rooms",
                {"room_id": self.room_id},
                {"room_version": RoomVersions.V4.identifier},
            )
        )

        self.event_ids: List[str] = []
        for idx in range(20):
            event_json = {
                "type": f"test {idx}",
                "room_id": self.room_id,
            }
            event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
            event_id = event.event_id

            self.get_success(
                self.store.db_pool.simple_upsert(
                    "events",
                    {"event_id": event_id},
                    {
                        "event_id": event_id,
                        "room_id": self.room_id,
                        "topological_ordering": idx,
                        "stream_ordering": idx,
                        "type": event.type,
                        "processed": True,
                        "outlier": False,
                    },
                )
            )
            self.get_success(
                self.store.db_pool.simple_upsert(
                    "event_json",
                    {"event_id": event_id},
                    {
                        "room_id": self.room_id,
                        "json": json.dumps(event_json),
                        "internal_metadata": "{}",
                        "format_version": EventFormatVersions.V3,
                    },
                )
            )
            self.event_ids.append(event_id)

    @contextmanager
    def _outage(self) -> Generator[None, None, None]:
        """Simulate a database outage.

        Returns:
            A context manager. While the context is active, any attempts to connect to
            the database will fail.
        """
        connection_pool = self.store.db_pool._db_pool

        # Close all connections and shut down the database `ThreadPool`.
        connection_pool.close()

        # Restart the database `ThreadPool`.
        connection_pool.start()

        # Swap the pool's connection factory for one that always fails, so every
        # new connection attempt during the outage raises.
        original_connection_factory = connection_pool.connectionFactory

        def connection_factory(_pool: ConnectionPool) -> Connection:
            raise Exception("Could not connect to the database.")

        connection_pool.connectionFactory = connection_factory  # type: ignore[assignment]
        try:
            yield
        finally:
            connection_pool.connectionFactory = original_connection_factory

            # If the in-memory SQLite database is being used, all the events are gone.
            # Restore the test data.
            self._populate_events()

    def test_failure(self) -> None:
        """Test that event fetches do not get stuck during a database outage."""
        with self._outage():
            failure = self.get_failure(
                self.store.get_event(self.event_ids[0]), Exception
            )
            self.assertEqual(str(failure.value), "Could not connect to the database.")

    def test_recovery(self) -> None:
        """Test that event fetchers recover after a database outage."""
        with self._outage():
            # Kick off a bunch of event fetches but do not pump the reactor
            event_deferreds = []
            for event_id in self.event_ids:
                event_deferreds.append(ensureDeferred(self.store.get_event(event_id)))

            # We should have maxed out on event fetcher threads
            self.assertEqual(self.store._event_fetch_ongoing, EVENT_QUEUE_THREADS)

            # All the event fetchers will fail
            self.pump()
            self.assertEqual(self.store._event_fetch_ongoing, 0)

            for event_deferred in event_deferreds:
                failure = self.get_failure(event_deferred, Exception)
                self.assertEqual(
                    str(failure.value), "Could not connect to the database."
                )

        # This next event fetch should succeed
        self.get_success(self.store.get_event(self.event_ids[0]))
class GetEventCancellationTestCase(unittest.HomeserverTestCase):
    """Test cancellation of `get_event` calls."""

    servlets = [
        admin.register_servlets,
        room.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """Create a user, a room and one message event to fetch in the tests."""
        self.store: EventsWorkerStore = hs.get_datastores().main

        self.user = self.register_user("user", "pass")
        self.token = self.login(self.user, "pass")

        self.room = self.helper.create_room_as(self.user, tok=self.token)

        res = self.helper.send(self.room, tok=self.token)
        self.event_id = res["event_id"]

        # Reset the event cache so the tests start with it empty
        self.get_success(self.store._get_event_cache.clear())

    @contextmanager
    def blocking_get_event_calls(
        self,
    ) -> Generator[
        Tuple["Deferred[None]", "Deferred[None]", "Deferred[None]"], None, None
    ]:
        """Starts two concurrent `get_event` calls for the same event.

        Both `get_event` calls will use the same database fetch, which will be blocked
        at the time this function returns.

        Returns:
            A tuple containing:
             * A `Deferred` that unblocks the database fetch.
             * A cancellable `Deferred` for the first `get_event` call.
             * A cancellable `Deferred` for the second `get_event` call.
        """
        # Patch `DatabasePool.runWithConnection` to block.
        unblock: "Deferred[None]" = Deferred()
        original_runWithConnection = self.store.db_pool.runWithConnection

        async def runWithConnection(*args, **kwargs):
            # Hold the DB fetch until the caller fires `unblock`.
            await unblock
            return await original_runWithConnection(*args, **kwargs)

        with mock.patch.object(
            self.store.db_pool,
            "runWithConnection",
            new=runWithConnection,
        ):
            ctx1 = LoggingContext("get_event1")
            ctx2 = LoggingContext("get_event2")

            async def get_event(ctx: LoggingContext) -> None:
                with ctx:
                    await self.store.get_event(self.event_id)

            get_event1 = ensureDeferred(get_event(ctx1))
            get_event2 = ensureDeferred(get_event(ctx2))

            # Both `get_event` calls ought to be blocked.
            self.assertNoResult(get_event1)
            self.assertNoResult(get_event2)

            yield unblock, get_event1, get_event2

            # Confirm that the two `get_event` calls shared the same database fetch.
            self.assertEqual(ctx1.get_resource_usage().evt_db_fetch_count, 1)
            self.assertEqual(ctx2.get_resource_usage().evt_db_fetch_count, 0)

    def test_first_get_event_cancelled(self) -> None:
        """Test cancellation of the first `get_event` call sharing a database fetch.

        The first `get_event` call is the one which initiates the fetch. We expect the
        fetch to complete despite the cancellation. Furthermore, the first `get_event`
        call must not abort before the fetch is complete, otherwise the fetch will be
        using a finished logging context.
        """
        with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
            # Cancel the first `get_event` call.
            get_event1.cancel()
            # The first `get_event` call must not abort immediately, otherwise its
            # logging context will be finished while it is still in use by the database
            # fetch.
            self.assertNoResult(get_event1)
            # The second `get_event` call must not be cancelled.
            self.assertNoResult(get_event2)

            # Unblock the database fetch.
            unblock.callback(None)
            # A `CancelledError` should be raised out of the first `get_event` call.
            exc = self.get_failure(get_event1, CancelledError).value
            self.assertIsInstance(exc, CancelledError)
            # The second `get_event` call should complete successfully.
            self.get_success(get_event2)

    def test_second_get_event_cancelled(self) -> None:
        """Test cancellation of the second `get_event` call sharing a database fetch."""
        with self.blocking_get_event_calls() as (unblock, get_event1, get_event2):
            # Cancel the second `get_event` call.
            get_event2.cancel()
            # The first `get_event` call must not be cancelled.
            self.assertNoResult(get_event1)
            # The second `get_event` call gets cancelled immediately.
            exc = self.get_failure(get_event2, CancelledError).value
            self.assertIsInstance(exc, CancelledError)

            # Unblock the database fetch.
            unblock.callback(None)
            # The first `get_event` call should complete successfully.
            self.get_success(get_event1)