media_repository.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  15. from synapse.storage.background_updates import BackgroundUpdateStore
  16. class MediaRepositoryStore(BackgroundUpdateStore):
  17. """Persistence for attachments and avatars"""
  18. def __init__(self, db_conn, hs):
  19. super(MediaRepositoryStore, self).__init__(db_conn, hs)
  20. self.register_background_index_update(
  21. update_name='local_media_repository_url_idx',
  22. index_name='local_media_repository_url_idx',
  23. table='local_media_repository',
  24. columns=['created_ts'],
  25. where_clause='url_cache IS NOT NULL',
  26. )
  27. def get_local_media(self, media_id):
  28. """Get the metadata for a local piece of media
  29. Returns:
  30. None if the media_id doesn't exist.
  31. """
  32. return self._simple_select_one(
  33. "local_media_repository",
  34. {"media_id": media_id},
  35. (
  36. "media_type", "media_length", "upload_name", "created_ts",
  37. "quarantined_by", "url_cache",
  38. ),
  39. allow_none=True,
  40. desc="get_local_media",
  41. )
  42. def store_local_media(self, media_id, media_type, time_now_ms, upload_name,
  43. media_length, user_id, url_cache=None):
  44. return self._simple_insert(
  45. "local_media_repository",
  46. {
  47. "media_id": media_id,
  48. "media_type": media_type,
  49. "created_ts": time_now_ms,
  50. "upload_name": upload_name,
  51. "media_length": media_length,
  52. "user_id": user_id.to_string(),
  53. "url_cache": url_cache,
  54. },
  55. desc="store_local_media",
  56. )
  57. def get_url_cache(self, url, ts):
  58. """Get the media_id and ts for a cached URL as of the given timestamp
  59. Returns:
  60. None if the URL isn't cached.
  61. """
  62. def get_url_cache_txn(txn):
  63. # get the most recently cached result (relative to the given ts)
  64. sql = (
  65. "SELECT response_code, etag, expires_ts, og, media_id, download_ts"
  66. " FROM local_media_repository_url_cache"
  67. " WHERE url = ? AND download_ts <= ?"
  68. " ORDER BY download_ts DESC LIMIT 1"
  69. )
  70. txn.execute(sql, (url, ts))
  71. row = txn.fetchone()
  72. if not row:
  73. # ...or if we've requested a timestamp older than the oldest
  74. # copy in the cache, return the oldest copy (if any)
  75. sql = (
  76. "SELECT response_code, etag, expires_ts, og, media_id, download_ts"
  77. " FROM local_media_repository_url_cache"
  78. " WHERE url = ? AND download_ts > ?"
  79. " ORDER BY download_ts ASC LIMIT 1"
  80. )
  81. txn.execute(sql, (url, ts))
  82. row = txn.fetchone()
  83. if not row:
  84. return None
  85. return dict(zip((
  86. 'response_code', 'etag', 'expires_ts', 'og', 'media_id', 'download_ts'
  87. ), row))
  88. return self.runInteraction(
  89. "get_url_cache", get_url_cache_txn
  90. )
  91. def store_url_cache(self, url, response_code, etag, expires_ts, og, media_id,
  92. download_ts):
  93. return self._simple_insert(
  94. "local_media_repository_url_cache",
  95. {
  96. "url": url,
  97. "response_code": response_code,
  98. "etag": etag,
  99. "expires_ts": expires_ts,
  100. "og": og,
  101. "media_id": media_id,
  102. "download_ts": download_ts,
  103. },
  104. desc="store_url_cache",
  105. )
  106. def get_local_media_thumbnails(self, media_id):
  107. return self._simple_select_list(
  108. "local_media_repository_thumbnails",
  109. {"media_id": media_id},
  110. (
  111. "thumbnail_width", "thumbnail_height", "thumbnail_method",
  112. "thumbnail_type", "thumbnail_length",
  113. ),
  114. desc="get_local_media_thumbnails",
  115. )
  116. def store_local_thumbnail(self, media_id, thumbnail_width,
  117. thumbnail_height, thumbnail_type,
  118. thumbnail_method, thumbnail_length):
  119. return self._simple_insert(
  120. "local_media_repository_thumbnails",
  121. {
  122. "media_id": media_id,
  123. "thumbnail_width": thumbnail_width,
  124. "thumbnail_height": thumbnail_height,
  125. "thumbnail_method": thumbnail_method,
  126. "thumbnail_type": thumbnail_type,
  127. "thumbnail_length": thumbnail_length,
  128. },
  129. desc="store_local_thumbnail",
  130. )
  131. def get_cached_remote_media(self, origin, media_id):
  132. return self._simple_select_one(
  133. "remote_media_cache",
  134. {"media_origin": origin, "media_id": media_id},
  135. (
  136. "media_type", "media_length", "upload_name", "created_ts",
  137. "filesystem_id", "quarantined_by",
  138. ),
  139. allow_none=True,
  140. desc="get_cached_remote_media",
  141. )
  142. def store_cached_remote_media(self, origin, media_id, media_type,
  143. media_length, time_now_ms, upload_name,
  144. filesystem_id):
  145. return self._simple_insert(
  146. "remote_media_cache",
  147. {
  148. "media_origin": origin,
  149. "media_id": media_id,
  150. "media_type": media_type,
  151. "media_length": media_length,
  152. "created_ts": time_now_ms,
  153. "upload_name": upload_name,
  154. "filesystem_id": filesystem_id,
  155. "last_access_ts": time_now_ms,
  156. },
  157. desc="store_cached_remote_media",
  158. )
  159. def update_cached_last_access_time(self, local_media, remote_media, time_ms):
  160. """Updates the last access time of the given media
  161. Args:
  162. local_media (iterable[str]): Set of media_ids
  163. remote_media (iterable[(str, str)]): Set of (server_name, media_id)
  164. time_ms: Current time in milliseconds
  165. """
  166. def update_cache_txn(txn):
  167. sql = (
  168. "UPDATE remote_media_cache SET last_access_ts = ?"
  169. " WHERE media_origin = ? AND media_id = ?"
  170. )
  171. txn.executemany(sql, (
  172. (time_ms, media_origin, media_id)
  173. for media_origin, media_id in remote_media
  174. ))
  175. sql = (
  176. "UPDATE local_media_repository SET last_access_ts = ?"
  177. " WHERE media_id = ?"
  178. )
  179. txn.executemany(sql, (
  180. (time_ms, media_id)
  181. for media_id in local_media
  182. ))
  183. return self.runInteraction("update_cached_last_access_time", update_cache_txn)
  184. def get_remote_media_thumbnails(self, origin, media_id):
  185. return self._simple_select_list(
  186. "remote_media_cache_thumbnails",
  187. {"media_origin": origin, "media_id": media_id},
  188. (
  189. "thumbnail_width", "thumbnail_height", "thumbnail_method",
  190. "thumbnail_type", "thumbnail_length", "filesystem_id",
  191. ),
  192. desc="get_remote_media_thumbnails",
  193. )
  194. def store_remote_media_thumbnail(self, origin, media_id, filesystem_id,
  195. thumbnail_width, thumbnail_height,
  196. thumbnail_type, thumbnail_method,
  197. thumbnail_length):
  198. return self._simple_insert(
  199. "remote_media_cache_thumbnails",
  200. {
  201. "media_origin": origin,
  202. "media_id": media_id,
  203. "thumbnail_width": thumbnail_width,
  204. "thumbnail_height": thumbnail_height,
  205. "thumbnail_method": thumbnail_method,
  206. "thumbnail_type": thumbnail_type,
  207. "thumbnail_length": thumbnail_length,
  208. "filesystem_id": filesystem_id,
  209. },
  210. desc="store_remote_media_thumbnail",
  211. )
  212. def get_remote_media_before(self, before_ts):
  213. sql = (
  214. "SELECT media_origin, media_id, filesystem_id"
  215. " FROM remote_media_cache"
  216. " WHERE last_access_ts < ?"
  217. )
  218. return self._execute(
  219. "get_remote_media_before", self.cursor_to_dict, sql, before_ts
  220. )
  221. def delete_remote_media(self, media_origin, media_id):
  222. def delete_remote_media_txn(txn):
  223. self._simple_delete_txn(
  224. txn,
  225. "remote_media_cache",
  226. keyvalues={
  227. "media_origin": media_origin, "media_id": media_id
  228. },
  229. )
  230. self._simple_delete_txn(
  231. txn,
  232. "remote_media_cache_thumbnails",
  233. keyvalues={
  234. "media_origin": media_origin, "media_id": media_id
  235. },
  236. )
  237. return self.runInteraction("delete_remote_media", delete_remote_media_txn)
  238. def get_expired_url_cache(self, now_ts):
  239. sql = (
  240. "SELECT media_id FROM local_media_repository_url_cache"
  241. " WHERE expires_ts < ?"
  242. " ORDER BY expires_ts ASC"
  243. " LIMIT 500"
  244. )
  245. def _get_expired_url_cache_txn(txn):
  246. txn.execute(sql, (now_ts,))
  247. return [row[0] for row in txn]
  248. return self.runInteraction("get_expired_url_cache", _get_expired_url_cache_txn)
  249. def delete_url_cache(self, media_ids):
  250. if len(media_ids) == 0:
  251. return
  252. sql = (
  253. "DELETE FROM local_media_repository_url_cache"
  254. " WHERE media_id = ?"
  255. )
  256. def _delete_url_cache_txn(txn):
  257. txn.executemany(sql, [(media_id,) for media_id in media_ids])
  258. return self.runInteraction("delete_url_cache", _delete_url_cache_txn)
  259. def get_url_cache_media_before(self, before_ts):
  260. sql = (
  261. "SELECT media_id FROM local_media_repository"
  262. " WHERE created_ts < ? AND url_cache IS NOT NULL"
  263. " ORDER BY created_ts ASC"
  264. " LIMIT 500"
  265. )
  266. def _get_url_cache_media_before_txn(txn):
  267. txn.execute(sql, (before_ts,))
  268. return [row[0] for row in txn]
  269. return self.runInteraction(
  270. "get_url_cache_media_before", _get_url_cache_media_before_txn,
  271. )
  272. def delete_url_cache_media(self, media_ids):
  273. if len(media_ids) == 0:
  274. return
  275. def _delete_url_cache_media_txn(txn):
  276. sql = (
  277. "DELETE FROM local_media_repository"
  278. " WHERE media_id = ?"
  279. )
  280. txn.executemany(sql, [(media_id,) for media_id in media_ids])
  281. sql = (
  282. "DELETE FROM local_media_repository_thumbnails"
  283. " WHERE media_id = ?"
  284. )
  285. txn.executemany(sql, [(media_id,) for media_id in media_ids])
  286. return self.runInteraction(
  287. "delete_url_cache_media", _delete_url_cache_media_txn,
  288. )