async.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer, reactor
  16. from .logcontext import (
  17. PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
  18. )
  19. from synapse.util import unwrapFirstError
  20. from contextlib import contextmanager
  21. @defer.inlineCallbacks
  22. def sleep(seconds):
  23. d = defer.Deferred()
  24. with PreserveLoggingContext():
  25. reactor.callLater(seconds, d.callback, seconds)
  26. res = yield d
  27. defer.returnValue(res)
  28. def run_on_reactor():
  29. """ This will cause the rest of the function to be invoked upon the next
  30. iteration of the main loop
  31. """
  32. return sleep(0)
  33. class ObservableDeferred(object):
  34. """Wraps a deferred object so that we can add observer deferreds. These
  35. observer deferreds do not affect the callback chain of the original
  36. deferred.
  37. If consumeErrors is true errors will be captured from the origin deferred.
  38. Cancelling or otherwise resolving an observer will not affect the original
  39. ObservableDeferred.
  40. """
  41. __slots__ = ["_deferred", "_observers", "_result"]
  42. def __init__(self, deferred, consumeErrors=False):
  43. object.__setattr__(self, "_deferred", deferred)
  44. object.__setattr__(self, "_result", None)
  45. object.__setattr__(self, "_observers", set())
  46. def callback(r):
  47. object.__setattr__(self, "_result", (True, r))
  48. while self._observers:
  49. try:
  50. # TODO: Handle errors here.
  51. self._observers.pop().callback(r)
  52. except:
  53. pass
  54. return r
  55. def errback(f):
  56. object.__setattr__(self, "_result", (False, f))
  57. while self._observers:
  58. try:
  59. # TODO: Handle errors here.
  60. self._observers.pop().errback(f)
  61. except:
  62. pass
  63. if consumeErrors:
  64. return None
  65. else:
  66. return f
  67. deferred.addCallbacks(callback, errback)
  68. def observe(self):
  69. if not self._result:
  70. d = defer.Deferred()
  71. def remove(r):
  72. self._observers.discard(d)
  73. return r
  74. d.addBoth(remove)
  75. self._observers.add(d)
  76. return d
  77. else:
  78. success, res = self._result
  79. return defer.succeed(res) if success else defer.fail(res)
  80. def observers(self):
  81. return self._observers
  82. def has_called(self):
  83. return self._result is not None
  84. def has_succeeded(self):
  85. return self._result is not None and self._result[0] is True
  86. def get_result(self):
  87. return self._result[1]
  88. def __getattr__(self, name):
  89. return getattr(self._deferred, name)
  90. def __setattr__(self, name, value):
  91. setattr(self._deferred, name, value)
  92. def __repr__(self):
  93. return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
  94. id(self), self._result, self._deferred,
  95. )
  96. def concurrently_execute(func, args, limit):
  97. """Executes the function with each argument conncurrently while limiting
  98. the number of concurrent executions.
  99. Args:
  100. func (func): Function to execute, should return a deferred.
  101. args (list): List of arguments to pass to func, each invocation of func
  102. gets a signle argument.
  103. limit (int): Maximum number of conccurent executions.
  104. Returns:
  105. deferred: Resolved when all function invocations have finished.
  106. """
  107. it = iter(args)
  108. @defer.inlineCallbacks
  109. def _concurrently_execute_inner():
  110. try:
  111. while True:
  112. yield func(it.next())
  113. except StopIteration:
  114. pass
  115. return preserve_context_over_deferred(defer.gatherResults([
  116. preserve_fn(_concurrently_execute_inner)()
  117. for _ in xrange(limit)
  118. ], consumeErrors=True)).addErrback(unwrapFirstError)
  119. class Linearizer(object):
  120. """Linearizes access to resources based on a key. Useful to ensure only one
  121. thing is happening at a time on a given resource.
  122. Example:
  123. with (yield linearizer.queue("test_key")):
  124. # do some work.
  125. """
  126. def __init__(self):
  127. self.key_to_defer = {}
  128. @defer.inlineCallbacks
  129. def queue(self, key):
  130. # If there is already a deferred in the queue, we pull it out so that
  131. # we can wait on it later.
  132. # Then we replace it with a deferred that we resolve *after* the
  133. # context manager has exited.
  134. # We only return the context manager after the previous deferred has
  135. # resolved.
  136. # This all has the net effect of creating a chain of deferreds that
  137. # wait for the previous deferred before starting their work.
  138. current_defer = self.key_to_defer.get(key)
  139. new_defer = defer.Deferred()
  140. self.key_to_defer[key] = new_defer
  141. if current_defer:
  142. with PreserveLoggingContext():
  143. yield current_defer
  144. @contextmanager
  145. def _ctx_manager():
  146. try:
  147. yield
  148. finally:
  149. new_defer.callback(None)
  150. current_d = self.key_to_defer.get(key)
  151. if current_d is new_defer:
  152. self.key_to_defer.pop(key, None)
  153. defer.returnValue(_ctx_manager())
  154. class Limiter(object):
  155. """Limits concurrent access to resources based on a key. Useful to ensure
  156. only a few thing happen at a time on a given resource.
  157. Example:
  158. with (yield limiter.queue("test_key")):
  159. # do some work.
  160. """
  161. def __init__(self, max_count):
  162. """
  163. Args:
  164. max_count(int): The maximum number of concurrent access
  165. """
  166. self.max_count = max_count
  167. # key_to_defer is a map from the key to a 2 element list where
  168. # the first element is the number of things executing
  169. # the second element is a list of deferreds for the things blocked from
  170. # executing.
  171. self.key_to_defer = {}
  172. @defer.inlineCallbacks
  173. def queue(self, key):
  174. entry = self.key_to_defer.setdefault(key, [0, []])
  175. # If the number of things executing is greater than the maximum
  176. # then add a deferred to the list of blocked items
  177. # When on of the things currently executing finishes it will callback
  178. # this item so that it can continue executing.
  179. if entry[0] >= self.max_count:
  180. new_defer = defer.Deferred()
  181. entry[1].append(new_defer)
  182. with PreserveLoggingContext():
  183. yield new_defer
  184. entry[0] += 1
  185. @contextmanager
  186. def _ctx_manager():
  187. try:
  188. yield
  189. finally:
  190. # We've finished executing so check if there are any things
  191. # blocked waiting to execute and start one of them
  192. entry[0] -= 1
  193. try:
  194. entry[1].pop(0).callback(None)
  195. except IndexError:
  196. # If nothing else is executing for this key then remove it
  197. # from the map
  198. if entry[0] == 0:
  199. self.key_to_defer.pop(key, None)
  200. defer.returnValue(_ctx_manager())
  201. class ReadWriteLock(object):
  202. """A deferred style read write lock.
  203. Example:
  204. with (yield read_write_lock.read("test_key")):
  205. # do some work
  206. """
  207. # IMPLEMENTATION NOTES
  208. #
  209. # We track the most recent queued reader and writer deferreds (which get
  210. # resolved when they release the lock).
  211. #
  212. # Read: We know its safe to acquire a read lock when the latest writer has
  213. # been resolved. The new reader is appeneded to the list of latest readers.
  214. #
  215. # Write: We know its safe to acquire the write lock when both the latest
  216. # writers and readers have been resolved. The new writer replaces the latest
  217. # writer.
  218. def __init__(self):
  219. # Latest readers queued
  220. self.key_to_current_readers = {}
  221. # Latest writer queued
  222. self.key_to_current_writer = {}
  223. @defer.inlineCallbacks
  224. def read(self, key):
  225. new_defer = defer.Deferred()
  226. curr_readers = self.key_to_current_readers.setdefault(key, set())
  227. curr_writer = self.key_to_current_writer.get(key, None)
  228. curr_readers.add(new_defer)
  229. # We wait for the latest writer to finish writing. We can safely ignore
  230. # any existing readers... as they're readers.
  231. yield curr_writer
  232. @contextmanager
  233. def _ctx_manager():
  234. try:
  235. yield
  236. finally:
  237. new_defer.callback(None)
  238. self.key_to_current_readers.get(key, set()).discard(new_defer)
  239. defer.returnValue(_ctx_manager())
  240. @defer.inlineCallbacks
  241. def write(self, key):
  242. new_defer = defer.Deferred()
  243. curr_readers = self.key_to_current_readers.get(key, set())
  244. curr_writer = self.key_to_current_writer.get(key, None)
  245. # We wait on all latest readers and writer.
  246. to_wait_on = list(curr_readers)
  247. if curr_writer:
  248. to_wait_on.append(curr_writer)
  249. # We can clear the list of current readers since the new writer waits
  250. # for them to finish.
  251. curr_readers.clear()
  252. self.key_to_current_writer[key] = new_defer
  253. yield preserve_context_over_deferred(defer.gatherResults(to_wait_on))
  254. @contextmanager
  255. def _ctx_manager():
  256. try:
  257. yield
  258. finally:
  259. new_defer.callback(None)
  260. if self.key_to_current_writer[key] == new_defer:
  261. self.key_to_current_writer.pop(key)
  262. defer.returnValue(_ctx_manager())