background_updates.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from ._base import SQLBaseStore
  16. from . import engines
  17. from twisted.internet import defer
  18. import ujson as json
  19. import logging
  20. logger = logging.getLogger(__name__)
  21. class BackgroundUpdatePerformance(object):
  22. """Tracks the how long a background update is taking to update its items"""
  23. def __init__(self, name):
  24. self.name = name
  25. self.total_item_count = 0
  26. self.total_duration_ms = 0
  27. self.avg_item_count = 0
  28. self.avg_duration_ms = 0
  29. def update(self, item_count, duration_ms):
  30. """Update the stats after doing an update"""
  31. self.total_item_count += item_count
  32. self.total_duration_ms += duration_ms
  33. # Exponential moving averages for the number of items updated and
  34. # the duration.
  35. self.avg_item_count += 0.1 * (item_count - self.avg_item_count)
  36. self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms)
  37. def average_items_per_ms(self):
  38. """An estimate of how long it takes to do a single update.
  39. Returns:
  40. A duration in ms as a float
  41. """
  42. if self.total_item_count == 0:
  43. return None
  44. else:
  45. # Use the exponential moving average so that we can adapt to
  46. # changes in how long the update process takes.
  47. return float(self.avg_item_count) / float(self.avg_duration_ms)
  48. def total_items_per_ms(self):
  49. """An estimate of how long it takes to do a single update.
  50. Returns:
  51. A duration in ms as a float
  52. """
  53. if self.total_item_count == 0:
  54. return None
  55. else:
  56. return float(self.total_item_count) / float(self.total_duration_ms)
  57. class BackgroundUpdateStore(SQLBaseStore):
  58. """ Background updates are updates to the database that run in the
  59. background. Each update processes a batch of data at once. We attempt to
  60. limit the impact of each update by monitoring how long each batch takes to
  61. process and autotuning the batch size.
  62. """
  63. MINIMUM_BACKGROUND_BATCH_SIZE = 100
  64. DEFAULT_BACKGROUND_BATCH_SIZE = 100
  65. BACKGROUND_UPDATE_INTERVAL_MS = 1000
  66. BACKGROUND_UPDATE_DURATION_MS = 100
  67. def __init__(self, hs):
  68. super(BackgroundUpdateStore, self).__init__(hs)
  69. self._background_update_performance = {}
  70. self._background_update_queue = []
  71. self._background_update_handlers = {}
  72. self._background_update_timer = None
  73. @defer.inlineCallbacks
  74. def start_doing_background_updates(self):
  75. assert self._background_update_timer is None, \
  76. "background updates already running"
  77. logger.info("Starting background schema updates")
  78. while True:
  79. sleep = defer.Deferred()
  80. self._background_update_timer = self._clock.call_later(
  81. self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
  82. )
  83. try:
  84. yield sleep
  85. finally:
  86. self._background_update_timer = None
  87. try:
  88. result = yield self.do_next_background_update(
  89. self.BACKGROUND_UPDATE_DURATION_MS
  90. )
  91. except:
  92. logger.exception("Error doing update")
  93. else:
  94. if result is None:
  95. logger.info(
  96. "No more background updates to do."
  97. " Unscheduling background update task."
  98. )
  99. defer.returnValue(None)
  100. @defer.inlineCallbacks
  101. def do_next_background_update(self, desired_duration_ms):
  102. """Does some amount of work on the next queued background update
  103. Args:
  104. desired_duration_ms(float): How long we want to spend
  105. updating.
  106. Returns:
  107. A deferred that completes once some amount of work is done.
  108. The deferred will have a value of None if there is currently
  109. no more work to do.
  110. """
  111. if not self._background_update_queue:
  112. updates = yield self._simple_select_list(
  113. "background_updates",
  114. keyvalues=None,
  115. retcols=("update_name", "depends_on"),
  116. )
  117. in_flight = set(update["update_name"] for update in updates)
  118. for update in updates:
  119. if update["depends_on"] not in in_flight:
  120. self._background_update_queue.append(update['update_name'])
  121. if not self._background_update_queue:
  122. # no work left to do
  123. defer.returnValue(None)
  124. # pop from the front, and add back to the back
  125. update_name = self._background_update_queue.pop(0)
  126. self._background_update_queue.append(update_name)
  127. res = yield self._do_background_update(update_name, desired_duration_ms)
  128. defer.returnValue(res)
  129. @defer.inlineCallbacks
  130. def _do_background_update(self, update_name, desired_duration_ms):
  131. logger.info("Starting update batch on background update '%s'",
  132. update_name)
  133. update_handler = self._background_update_handlers[update_name]
  134. performance = self._background_update_performance.get(update_name)
  135. if performance is None:
  136. performance = BackgroundUpdatePerformance(update_name)
  137. self._background_update_performance[update_name] = performance
  138. items_per_ms = performance.average_items_per_ms()
  139. if items_per_ms is not None:
  140. batch_size = int(desired_duration_ms * items_per_ms)
  141. # Clamp the batch size so that we always make progress
  142. batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE)
  143. else:
  144. batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
  145. progress_json = yield self._simple_select_one_onecol(
  146. "background_updates",
  147. keyvalues={"update_name": update_name},
  148. retcol="progress_json"
  149. )
  150. progress = json.loads(progress_json)
  151. time_start = self._clock.time_msec()
  152. items_updated = yield update_handler(progress, batch_size)
  153. time_stop = self._clock.time_msec()
  154. duration_ms = time_stop - time_start
  155. logger.info(
  156. "Updating %r. Updated %r items in %rms."
  157. " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)",
  158. update_name, items_updated, duration_ms,
  159. performance.total_items_per_ms(),
  160. performance.average_items_per_ms(),
  161. performance.total_item_count,
  162. batch_size,
  163. )
  164. performance.update(items_updated, duration_ms)
  165. defer.returnValue(len(self._background_update_performance))
  166. def register_background_update_handler(self, update_name, update_handler):
  167. """Register a handler for doing a background update.
  168. The handler should take two arguments:
  169. * A dict of the current progress
  170. * An integer count of the number of items to update in this batch.
  171. The handler should return a deferred integer count of items updated.
  172. The hander is responsible for updating the progress of the update.
  173. Args:
  174. update_name(str): The name of the update that this code handles.
  175. update_handler(function): The function that does the update.
  176. """
  177. self._background_update_handlers[update_name] = update_handler
  178. def register_background_index_update(self, update_name, index_name,
  179. table, columns, where_clause=None):
  180. """Helper for store classes to do a background index addition
  181. To use:
  182. 1. use a schema delta file to add a background update. Example:
  183. INSERT INTO background_updates (update_name, progress_json) VALUES
  184. ('my_new_index', '{}');
  185. 2. In the Store constructor, call this method
  186. Args:
  187. update_name (str): update_name to register for
  188. index_name (str): name of index to add
  189. table (str): table to add index to
  190. columns (list[str]): columns/expressions to include in index
  191. """
  192. # if this is postgres, we add the indexes concurrently. Otherwise
  193. # we fall back to doing it inline
  194. if isinstance(self.database_engine, engines.PostgresEngine):
  195. conc = True
  196. else:
  197. conc = False
  198. # We don't use partial indices on SQLite as it wasn't introduced
  199. # until 3.8, and wheezy has 3.7
  200. where_clause = None
  201. sql = (
  202. "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)"
  203. " %(where_clause)s"
  204. ) % {
  205. "conc": "CONCURRENTLY" if conc else "",
  206. "name": index_name,
  207. "table": table,
  208. "columns": ", ".join(columns),
  209. "where_clause": "WHERE " + where_clause if where_clause else ""
  210. }
  211. def create_index_concurrently(conn):
  212. conn.rollback()
  213. # postgres insists on autocommit for the index
  214. conn.set_session(autocommit=True)
  215. c = conn.cursor()
  216. c.execute(sql)
  217. conn.set_session(autocommit=False)
  218. def create_index(conn):
  219. c = conn.cursor()
  220. c.execute(sql)
  221. @defer.inlineCallbacks
  222. def updater(progress, batch_size):
  223. logger.info("Adding index %s to %s", index_name, table)
  224. if conc:
  225. yield self.runWithConnection(create_index_concurrently)
  226. else:
  227. yield self.runWithConnection(create_index)
  228. yield self._end_background_update(update_name)
  229. defer.returnValue(1)
  230. self.register_background_update_handler(update_name, updater)
  231. def start_background_update(self, update_name, progress):
  232. """Starts a background update running.
  233. Args:
  234. update_name: The update to set running.
  235. progress: The initial state of the progress of the update.
  236. Returns:
  237. A deferred that completes once the task has been added to the
  238. queue.
  239. """
  240. # Clear the background update queue so that we will pick up the new
  241. # task on the next iteration of do_background_update.
  242. self._background_update_queue = []
  243. progress_json = json.dumps(progress)
  244. return self._simple_insert(
  245. "background_updates",
  246. {"update_name": update_name, "progress_json": progress_json}
  247. )
  248. def _end_background_update(self, update_name):
  249. """Removes a completed background update task from the queue.
  250. Args:
  251. update_name(str): The name of the completed task to remove
  252. Returns:
  253. A deferred that completes once the task is removed.
  254. """
  255. self._background_update_queue = [
  256. name for name in self._background_update_queue if name != update_name
  257. ]
  258. return self._simple_delete_one(
  259. "background_updates", keyvalues={"update_name": update_name}
  260. )
  261. def _background_update_progress_txn(self, txn, update_name, progress):
  262. """Update the progress of a background update
  263. Args:
  264. txn(cursor): The transaction.
  265. update_name(str): The name of the background update task
  266. progress(dict): The progress of the update.
  267. """
  268. progress_json = json.dumps(progress)
  269. self._simple_update_one_txn(
  270. txn,
  271. "background_updates",
  272. keyvalues={"update_name": update_name},
  273. updatevalues={"progress_json": progress_json},
  274. )