# test_events_worker.py
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. import json
  15. from contextlib import contextmanager
  16. from typing import Generator
  17. from twisted.enterprise.adbapi import ConnectionPool
  18. from twisted.internet.defer import ensureDeferred
  19. from twisted.test.proto_helpers import MemoryReactor
  20. from synapse.api.room_versions import EventFormatVersions, RoomVersions
  21. from synapse.logging.context import LoggingContext
  22. from synapse.rest import admin
  23. from synapse.rest.client import login, room
  24. from synapse.server import HomeServer
  25. from synapse.storage.databases.main.events_worker import (
  26. EVENT_QUEUE_THREADS,
  27. EventsWorkerStore,
  28. )
  29. from synapse.storage.types import Connection
  30. from synapse.util import Clock
  31. from synapse.util.async_helpers import yieldable_gather_results
  32. from tests import unittest
  33. class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
  34. def prepare(self, reactor, clock, hs):
  35. self.store: EventsWorkerStore = hs.get_datastore()
  36. # insert some test data
  37. for rid in ("room1", "room2"):
  38. self.get_success(
  39. self.store.db_pool.simple_insert(
  40. "rooms",
  41. {"room_id": rid, "room_version": 4},
  42. )
  43. )
  44. for idx, (rid, eid) in enumerate(
  45. (
  46. ("room1", "event10"),
  47. ("room1", "event11"),
  48. ("room1", "event12"),
  49. ("room2", "event20"),
  50. )
  51. ):
  52. self.get_success(
  53. self.store.db_pool.simple_insert(
  54. "events",
  55. {
  56. "event_id": eid,
  57. "room_id": rid,
  58. "topological_ordering": idx,
  59. "stream_ordering": idx,
  60. "type": "test",
  61. "processed": True,
  62. "outlier": False,
  63. },
  64. )
  65. )
  66. self.get_success(
  67. self.store.db_pool.simple_insert(
  68. "event_json",
  69. {
  70. "event_id": eid,
  71. "room_id": rid,
  72. "json": json.dumps({"type": "test", "room_id": rid}),
  73. "internal_metadata": "{}",
  74. "format_version": 3,
  75. },
  76. )
  77. )
  78. def test_simple(self):
  79. with LoggingContext(name="test") as ctx:
  80. res = self.get_success(
  81. self.store.have_seen_events("room1", ["event10", "event19"])
  82. )
  83. self.assertEquals(res, {"event10"})
  84. # that should result in a single db query
  85. self.assertEquals(ctx.get_resource_usage().db_txn_count, 1)
  86. # a second lookup of the same events should cause no queries
  87. with LoggingContext(name="test") as ctx:
  88. res = self.get_success(
  89. self.store.have_seen_events("room1", ["event10", "event19"])
  90. )
  91. self.assertEquals(res, {"event10"})
  92. self.assertEquals(ctx.get_resource_usage().db_txn_count, 0)
  93. def test_query_via_event_cache(self):
  94. # fetch an event into the event cache
  95. self.get_success(self.store.get_event("event10"))
  96. # looking it up should now cause no db hits
  97. with LoggingContext(name="test") as ctx:
  98. res = self.get_success(self.store.have_seen_events("room1", ["event10"]))
  99. self.assertEquals(res, {"event10"})
  100. self.assertEquals(ctx.get_resource_usage().db_txn_count, 0)
  101. class EventCacheTestCase(unittest.HomeserverTestCase):
  102. """Test that the various layers of event cache works."""
  103. servlets = [
  104. admin.register_servlets,
  105. room.register_servlets,
  106. login.register_servlets,
  107. ]
  108. def prepare(self, reactor, clock, hs):
  109. self.store: EventsWorkerStore = hs.get_datastore()
  110. self.user = self.register_user("user", "pass")
  111. self.token = self.login(self.user, "pass")
  112. self.room = self.helper.create_room_as(self.user, tok=self.token)
  113. res = self.helper.send(self.room, tok=self.token)
  114. self.event_id = res["event_id"]
  115. # Reset the event cache so the tests start with it empty
  116. self.store._get_event_cache.clear()
  117. def test_simple(self):
  118. """Test that we cache events that we pull from the DB."""
  119. with LoggingContext("test") as ctx:
  120. self.get_success(self.store.get_event(self.event_id))
  121. # We should have fetched the event from the DB
  122. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
  123. def test_dedupe(self):
  124. """Test that if we request the same event multiple times we only pull it
  125. out once.
  126. """
  127. with LoggingContext("test") as ctx:
  128. d = yieldable_gather_results(
  129. self.store.get_event, [self.event_id, self.event_id]
  130. )
  131. self.get_success(d)
  132. # We should have fetched the event from the DB
  133. self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
  134. class DatabaseOutageTestCase(unittest.HomeserverTestCase):
  135. """Test event fetching during a database outage."""
  136. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
  137. self.store: EventsWorkerStore = hs.get_datastore()
  138. self.room_id = f"!room:{hs.hostname}"
  139. self.event_ids = [f"event{i}" for i in range(20)]
  140. self._populate_events()
  141. def _populate_events(self) -> None:
  142. """Ensure that there are test events in the database.
  143. When testing with the in-memory SQLite database, all the events are lost during
  144. the simulated outage.
  145. To ensure consistency between `room_id`s and `event_id`s before and after the
  146. outage, rows are built and inserted manually.
  147. Upserts are used to handle the non-SQLite case where events are not lost.
  148. """
  149. self.get_success(
  150. self.store.db_pool.simple_upsert(
  151. "rooms",
  152. {"room_id": self.room_id},
  153. {"room_version": RoomVersions.V4.identifier},
  154. )
  155. )
  156. self.event_ids = [f"event{i}" for i in range(20)]
  157. for idx, event_id in enumerate(self.event_ids):
  158. self.get_success(
  159. self.store.db_pool.simple_upsert(
  160. "events",
  161. {"event_id": event_id},
  162. {
  163. "event_id": event_id,
  164. "room_id": self.room_id,
  165. "topological_ordering": idx,
  166. "stream_ordering": idx,
  167. "type": "test",
  168. "processed": True,
  169. "outlier": False,
  170. },
  171. )
  172. )
  173. self.get_success(
  174. self.store.db_pool.simple_upsert(
  175. "event_json",
  176. {"event_id": event_id},
  177. {
  178. "room_id": self.room_id,
  179. "json": json.dumps({"type": "test", "room_id": self.room_id}),
  180. "internal_metadata": "{}",
  181. "format_version": EventFormatVersions.V3,
  182. },
  183. )
  184. )
  185. @contextmanager
  186. def _outage(self) -> Generator[None, None, None]:
  187. """Simulate a database outage.
  188. Returns:
  189. A context manager. While the context is active, any attempts to connect to
  190. the database will fail.
  191. """
  192. connection_pool = self.store.db_pool._db_pool
  193. # Close all connections and shut down the database `ThreadPool`.
  194. connection_pool.close()
  195. # Restart the database `ThreadPool`.
  196. connection_pool.start()
  197. original_connection_factory = connection_pool.connectionFactory
  198. def connection_factory(_pool: ConnectionPool) -> Connection:
  199. raise Exception("Could not connect to the database.")
  200. connection_pool.connectionFactory = connection_factory # type: ignore[assignment]
  201. try:
  202. yield
  203. finally:
  204. connection_pool.connectionFactory = original_connection_factory
  205. # If the in-memory SQLite database is being used, all the events are gone.
  206. # Restore the test data.
  207. self._populate_events()
  208. def test_failure(self) -> None:
  209. """Test that event fetches do not get stuck during a database outage."""
  210. with self._outage():
  211. failure = self.get_failure(
  212. self.store.get_event(self.event_ids[0]), Exception
  213. )
  214. self.assertEqual(str(failure.value), "Could not connect to the database.")
  215. def test_recovery(self) -> None:
  216. """Test that event fetchers recover after a database outage."""
  217. with self._outage():
  218. # Kick off a bunch of event fetches but do not pump the reactor
  219. event_deferreds = []
  220. for event_id in self.event_ids:
  221. event_deferreds.append(ensureDeferred(self.store.get_event(event_id)))
  222. # We should have maxed out on event fetcher threads
  223. self.assertEqual(self.store._event_fetch_ongoing, EVENT_QUEUE_THREADS)
  224. # All the event fetchers will fail
  225. self.pump()
  226. self.assertEqual(self.store._event_fetch_ongoing, 0)
  227. for event_deferred in event_deferreds:
  228. failure = self.get_failure(event_deferred, Exception)
  229. self.assertEqual(
  230. str(failure.value), "Could not connect to the database."
  231. )
  232. # This next event fetch should succeed
  233. self.get_success(self.store.get_event(self.event_ids[0]))