media_storage.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import contextlib
  16. import logging
  17. import os
  18. import shutil
  19. from twisted.internet import defer
  20. from twisted.protocols.basic import FileSender
  21. from synapse.logging.context import defer_to_thread, make_deferred_yieldable
  22. from synapse.util.file_consumer import BackgroundFileConsumer
  23. from ._base import Responder
  24. logger = logging.getLogger(__name__)
  25. class MediaStorage(object):
  26. """Responsible for storing/fetching files from local sources.
  27. Args:
  28. hs (synapse.server.Homeserver)
  29. local_media_directory (str): Base path where we store media on disk
  30. filepaths (MediaFilePaths)
  31. storage_providers ([StorageProvider]): List of StorageProvider that are
  32. used to fetch and store files.
  33. """
  34. def __init__(self, hs, local_media_directory, filepaths, storage_providers):
  35. self.hs = hs
  36. self.local_media_directory = local_media_directory
  37. self.filepaths = filepaths
  38. self.storage_providers = storage_providers
  39. @defer.inlineCallbacks
  40. def store_file(self, source, file_info):
  41. """Write `source` to the on disk media store, and also any other
  42. configured storage providers
  43. Args:
  44. source: A file like object that should be written
  45. file_info (FileInfo): Info about the file to store
  46. Returns:
  47. Deferred[str]: the file path written to in the primary media store
  48. """
  49. with self.store_into_file(file_info) as (f, fname, finish_cb):
  50. # Write to the main repository
  51. yield defer_to_thread(
  52. self.hs.get_reactor(), _write_file_synchronously, source, f
  53. )
  54. yield finish_cb()
  55. return fname
  56. @contextlib.contextmanager
  57. def store_into_file(self, file_info):
  58. """Context manager used to get a file like object to write into, as
  59. described by file_info.
  60. Actually yields a 3-tuple (file, fname, finish_cb), where file is a file
  61. like object that can be written to, fname is the absolute path of file
  62. on disk, and finish_cb is a function that returns a Deferred.
  63. fname can be used to read the contents from after upload, e.g. to
  64. generate thumbnails.
  65. finish_cb must be called and waited on after the file has been
  66. successfully been written to. Should not be called if there was an
  67. error.
  68. Args:
  69. file_info (FileInfo): Info about the file to store
  70. Example:
  71. with media_storage.store_into_file(info) as (f, fname, finish_cb):
  72. # .. write into f ...
  73. yield finish_cb()
  74. """
  75. path = self._file_info_to_path(file_info)
  76. fname = os.path.join(self.local_media_directory, path)
  77. dirname = os.path.dirname(fname)
  78. if not os.path.exists(dirname):
  79. os.makedirs(dirname)
  80. finished_called = [False]
  81. @defer.inlineCallbacks
  82. def finish():
  83. for provider in self.storage_providers:
  84. yield provider.store_file(path, file_info)
  85. finished_called[0] = True
  86. try:
  87. with open(fname, "wb") as f:
  88. yield f, fname, finish
  89. except Exception:
  90. try:
  91. os.remove(fname)
  92. except Exception:
  93. pass
  94. raise
  95. if not finished_called:
  96. raise Exception("Finished callback not called")
  97. @defer.inlineCallbacks
  98. def fetch_media(self, file_info):
  99. """Attempts to fetch media described by file_info from the local cache
  100. and configured storage providers.
  101. Args:
  102. file_info (FileInfo)
  103. Returns:
  104. Deferred[Responder|None]: Returns a Responder if the file was found,
  105. otherwise None.
  106. """
  107. path = self._file_info_to_path(file_info)
  108. local_path = os.path.join(self.local_media_directory, path)
  109. if os.path.exists(local_path):
  110. return FileResponder(open(local_path, "rb"))
  111. for provider in self.storage_providers:
  112. res = yield provider.fetch(path, file_info)
  113. if res:
  114. logger.debug("Streaming %s from %s", path, provider)
  115. return res
  116. return None
  117. @defer.inlineCallbacks
  118. def ensure_media_is_in_local_cache(self, file_info):
  119. """Ensures that the given file is in the local cache. Attempts to
  120. download it from storage providers if it isn't.
  121. Args:
  122. file_info (FileInfo)
  123. Returns:
  124. Deferred[str]: Full path to local file
  125. """
  126. path = self._file_info_to_path(file_info)
  127. local_path = os.path.join(self.local_media_directory, path)
  128. if os.path.exists(local_path):
  129. return local_path
  130. dirname = os.path.dirname(local_path)
  131. if not os.path.exists(dirname):
  132. os.makedirs(dirname)
  133. for provider in self.storage_providers:
  134. res = yield provider.fetch(path, file_info)
  135. if res:
  136. with res:
  137. consumer = BackgroundFileConsumer(
  138. open(local_path, "wb"), self.hs.get_reactor()
  139. )
  140. yield res.write_to_consumer(consumer)
  141. yield consumer.wait()
  142. return local_path
  143. raise Exception("file could not be found")
  144. def _file_info_to_path(self, file_info):
  145. """Converts file_info into a relative path.
  146. The path is suitable for storing files under a directory, e.g. used to
  147. store files on local FS under the base media repository directory.
  148. Args:
  149. file_info (FileInfo)
  150. Returns:
  151. str
  152. """
  153. if file_info.url_cache:
  154. if file_info.thumbnail:
  155. return self.filepaths.url_cache_thumbnail_rel(
  156. media_id=file_info.file_id,
  157. width=file_info.thumbnail_width,
  158. height=file_info.thumbnail_height,
  159. content_type=file_info.thumbnail_type,
  160. method=file_info.thumbnail_method,
  161. )
  162. return self.filepaths.url_cache_filepath_rel(file_info.file_id)
  163. if file_info.server_name:
  164. if file_info.thumbnail:
  165. return self.filepaths.remote_media_thumbnail_rel(
  166. server_name=file_info.server_name,
  167. file_id=file_info.file_id,
  168. width=file_info.thumbnail_width,
  169. height=file_info.thumbnail_height,
  170. content_type=file_info.thumbnail_type,
  171. method=file_info.thumbnail_method,
  172. )
  173. return self.filepaths.remote_media_filepath_rel(
  174. file_info.server_name, file_info.file_id
  175. )
  176. if file_info.thumbnail:
  177. return self.filepaths.local_media_thumbnail_rel(
  178. media_id=file_info.file_id,
  179. width=file_info.thumbnail_width,
  180. height=file_info.thumbnail_height,
  181. content_type=file_info.thumbnail_type,
  182. method=file_info.thumbnail_method,
  183. )
  184. return self.filepaths.local_media_filepath_rel(file_info.file_id)
  185. def _write_file_synchronously(source, dest):
  186. """Write `source` to the file like `dest` synchronously. Should be called
  187. from a thread.
  188. Args:
  189. source: A file like object that's to be written
  190. dest: A file like object to be written to
  191. """
  192. source.seek(0) # Ensure we read from the start of the file
  193. shutil.copyfileobj(source, dest)
  194. class FileResponder(Responder):
  195. """Wraps an open file that can be sent to a request.
  196. Args:
  197. open_file (file): A file like object to be streamed ot the client,
  198. is closed when finished streaming.
  199. """
  200. def __init__(self, open_file):
  201. self.open_file = open_file
  202. def write_to_consumer(self, consumer):
  203. return make_deferred_yieldable(
  204. FileSender().beginFileTransfer(self.open_file, consumer)
  205. )
  206. def __exit__(self, exc_type, exc_val, exc_tb):
  207. self.open_file.close()