media_repository.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from synapse.storage._base import SQLBaseStore
  16. from synapse.storage.database import Database
  17. class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
  18. def __init__(self, database: Database, db_conn, hs):
  19. super(MediaRepositoryBackgroundUpdateStore, self).__init__(
  20. database, db_conn, hs
  21. )
  22. self.db.updates.register_background_index_update(
  23. update_name="local_media_repository_url_idx",
  24. index_name="local_media_repository_url_idx",
  25. table="local_media_repository",
  26. columns=["created_ts"],
  27. where_clause="url_cache IS NOT NULL",
  28. )
  29. class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
  30. """Persistence for attachments and avatars"""
  31. def __init__(self, database: Database, db_conn, hs):
  32. super(MediaRepositoryStore, self).__init__(database, db_conn, hs)
  33. def get_local_media(self, media_id):
  34. """Get the metadata for a local piece of media
  35. Returns:
  36. None if the media_id doesn't exist.
  37. """
  38. return self.db.simple_select_one(
  39. "local_media_repository",
  40. {"media_id": media_id},
  41. (
  42. "media_type",
  43. "media_length",
  44. "upload_name",
  45. "created_ts",
  46. "quarantined_by",
  47. "url_cache",
  48. ),
  49. allow_none=True,
  50. desc="get_local_media",
  51. )
  52. def store_local_media(
  53. self,
  54. media_id,
  55. media_type,
  56. time_now_ms,
  57. upload_name,
  58. media_length,
  59. user_id,
  60. url_cache=None,
  61. ):
  62. return self.db.simple_insert(
  63. "local_media_repository",
  64. {
  65. "media_id": media_id,
  66. "media_type": media_type,
  67. "created_ts": time_now_ms,
  68. "upload_name": upload_name,
  69. "media_length": media_length,
  70. "user_id": user_id.to_string(),
  71. "url_cache": url_cache,
  72. },
  73. desc="store_local_media",
  74. )
  75. def get_url_cache(self, url, ts):
  76. """Get the media_id and ts for a cached URL as of the given timestamp
  77. Returns:
  78. None if the URL isn't cached.
  79. """
  80. def get_url_cache_txn(txn):
  81. # get the most recently cached result (relative to the given ts)
  82. sql = (
  83. "SELECT response_code, etag, expires_ts, og, media_id, download_ts"
  84. " FROM local_media_repository_url_cache"
  85. " WHERE url = ? AND download_ts <= ?"
  86. " ORDER BY download_ts DESC LIMIT 1"
  87. )
  88. txn.execute(sql, (url, ts))
  89. row = txn.fetchone()
  90. if not row:
  91. # ...or if we've requested a timestamp older than the oldest
  92. # copy in the cache, return the oldest copy (if any)
  93. sql = (
  94. "SELECT response_code, etag, expires_ts, og, media_id, download_ts"
  95. " FROM local_media_repository_url_cache"
  96. " WHERE url = ? AND download_ts > ?"
  97. " ORDER BY download_ts ASC LIMIT 1"
  98. )
  99. txn.execute(sql, (url, ts))
  100. row = txn.fetchone()
  101. if not row:
  102. return None
  103. return dict(
  104. zip(
  105. (
  106. "response_code",
  107. "etag",
  108. "expires_ts",
  109. "og",
  110. "media_id",
  111. "download_ts",
  112. ),
  113. row,
  114. )
  115. )
  116. return self.db.runInteraction("get_url_cache", get_url_cache_txn)
  117. def store_url_cache(
  118. self, url, response_code, etag, expires_ts, og, media_id, download_ts
  119. ):
  120. return self.db.simple_insert(
  121. "local_media_repository_url_cache",
  122. {
  123. "url": url,
  124. "response_code": response_code,
  125. "etag": etag,
  126. "expires_ts": expires_ts,
  127. "og": og,
  128. "media_id": media_id,
  129. "download_ts": download_ts,
  130. },
  131. desc="store_url_cache",
  132. )
  133. def get_local_media_thumbnails(self, media_id):
  134. return self.db.simple_select_list(
  135. "local_media_repository_thumbnails",
  136. {"media_id": media_id},
  137. (
  138. "thumbnail_width",
  139. "thumbnail_height",
  140. "thumbnail_method",
  141. "thumbnail_type",
  142. "thumbnail_length",
  143. ),
  144. desc="get_local_media_thumbnails",
  145. )
  146. def store_local_thumbnail(
  147. self,
  148. media_id,
  149. thumbnail_width,
  150. thumbnail_height,
  151. thumbnail_type,
  152. thumbnail_method,
  153. thumbnail_length,
  154. ):
  155. return self.db.simple_insert(
  156. "local_media_repository_thumbnails",
  157. {
  158. "media_id": media_id,
  159. "thumbnail_width": thumbnail_width,
  160. "thumbnail_height": thumbnail_height,
  161. "thumbnail_method": thumbnail_method,
  162. "thumbnail_type": thumbnail_type,
  163. "thumbnail_length": thumbnail_length,
  164. },
  165. desc="store_local_thumbnail",
  166. )
  167. def get_cached_remote_media(self, origin, media_id):
  168. return self.db.simple_select_one(
  169. "remote_media_cache",
  170. {"media_origin": origin, "media_id": media_id},
  171. (
  172. "media_type",
  173. "media_length",
  174. "upload_name",
  175. "created_ts",
  176. "filesystem_id",
  177. "quarantined_by",
  178. ),
  179. allow_none=True,
  180. desc="get_cached_remote_media",
  181. )
  182. def store_cached_remote_media(
  183. self,
  184. origin,
  185. media_id,
  186. media_type,
  187. media_length,
  188. time_now_ms,
  189. upload_name,
  190. filesystem_id,
  191. ):
  192. return self.db.simple_insert(
  193. "remote_media_cache",
  194. {
  195. "media_origin": origin,
  196. "media_id": media_id,
  197. "media_type": media_type,
  198. "media_length": media_length,
  199. "created_ts": time_now_ms,
  200. "upload_name": upload_name,
  201. "filesystem_id": filesystem_id,
  202. "last_access_ts": time_now_ms,
  203. },
  204. desc="store_cached_remote_media",
  205. )
  206. def update_cached_last_access_time(self, local_media, remote_media, time_ms):
  207. """Updates the last access time of the given media
  208. Args:
  209. local_media (iterable[str]): Set of media_ids
  210. remote_media (iterable[(str, str)]): Set of (server_name, media_id)
  211. time_ms: Current time in milliseconds
  212. """
  213. def update_cache_txn(txn):
  214. sql = (
  215. "UPDATE remote_media_cache SET last_access_ts = ?"
  216. " WHERE media_origin = ? AND media_id = ?"
  217. )
  218. txn.executemany(
  219. sql,
  220. (
  221. (time_ms, media_origin, media_id)
  222. for media_origin, media_id in remote_media
  223. ),
  224. )
  225. sql = (
  226. "UPDATE local_media_repository SET last_access_ts = ?"
  227. " WHERE media_id = ?"
  228. )
  229. txn.executemany(sql, ((time_ms, media_id) for media_id in local_media))
  230. return self.db.runInteraction(
  231. "update_cached_last_access_time", update_cache_txn
  232. )
  233. def get_remote_media_thumbnails(self, origin, media_id):
  234. return self.db.simple_select_list(
  235. "remote_media_cache_thumbnails",
  236. {"media_origin": origin, "media_id": media_id},
  237. (
  238. "thumbnail_width",
  239. "thumbnail_height",
  240. "thumbnail_method",
  241. "thumbnail_type",
  242. "thumbnail_length",
  243. "filesystem_id",
  244. ),
  245. desc="get_remote_media_thumbnails",
  246. )
  247. def store_remote_media_thumbnail(
  248. self,
  249. origin,
  250. media_id,
  251. filesystem_id,
  252. thumbnail_width,
  253. thumbnail_height,
  254. thumbnail_type,
  255. thumbnail_method,
  256. thumbnail_length,
  257. ):
  258. return self.db.simple_insert(
  259. "remote_media_cache_thumbnails",
  260. {
  261. "media_origin": origin,
  262. "media_id": media_id,
  263. "thumbnail_width": thumbnail_width,
  264. "thumbnail_height": thumbnail_height,
  265. "thumbnail_method": thumbnail_method,
  266. "thumbnail_type": thumbnail_type,
  267. "thumbnail_length": thumbnail_length,
  268. "filesystem_id": filesystem_id,
  269. },
  270. desc="store_remote_media_thumbnail",
  271. )
  272. def get_remote_media_before(self, before_ts):
  273. sql = (
  274. "SELECT media_origin, media_id, filesystem_id"
  275. " FROM remote_media_cache"
  276. " WHERE last_access_ts < ?"
  277. )
  278. return self.db.execute(
  279. "get_remote_media_before", self.db.cursor_to_dict, sql, before_ts
  280. )
  281. def delete_remote_media(self, media_origin, media_id):
  282. def delete_remote_media_txn(txn):
  283. self.db.simple_delete_txn(
  284. txn,
  285. "remote_media_cache",
  286. keyvalues={"media_origin": media_origin, "media_id": media_id},
  287. )
  288. self.db.simple_delete_txn(
  289. txn,
  290. "remote_media_cache_thumbnails",
  291. keyvalues={"media_origin": media_origin, "media_id": media_id},
  292. )
  293. return self.db.runInteraction("delete_remote_media", delete_remote_media_txn)
  294. def get_expired_url_cache(self, now_ts):
  295. sql = (
  296. "SELECT media_id FROM local_media_repository_url_cache"
  297. " WHERE expires_ts < ?"
  298. " ORDER BY expires_ts ASC"
  299. " LIMIT 500"
  300. )
  301. def _get_expired_url_cache_txn(txn):
  302. txn.execute(sql, (now_ts,))
  303. return [row[0] for row in txn]
  304. return self.db.runInteraction(
  305. "get_expired_url_cache", _get_expired_url_cache_txn
  306. )
  307. def delete_url_cache(self, media_ids):
  308. if len(media_ids) == 0:
  309. return
  310. sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
  311. def _delete_url_cache_txn(txn):
  312. txn.executemany(sql, [(media_id,) for media_id in media_ids])
  313. return self.db.runInteraction("delete_url_cache", _delete_url_cache_txn)
  314. def get_url_cache_media_before(self, before_ts):
  315. sql = (
  316. "SELECT media_id FROM local_media_repository"
  317. " WHERE created_ts < ? AND url_cache IS NOT NULL"
  318. " ORDER BY created_ts ASC"
  319. " LIMIT 500"
  320. )
  321. def _get_url_cache_media_before_txn(txn):
  322. txn.execute(sql, (before_ts,))
  323. return [row[0] for row in txn]
  324. return self.db.runInteraction(
  325. "get_url_cache_media_before", _get_url_cache_media_before_txn
  326. )
  327. def delete_url_cache_media(self, media_ids):
  328. if len(media_ids) == 0:
  329. return
  330. def _delete_url_cache_media_txn(txn):
  331. sql = "DELETE FROM local_media_repository WHERE media_id = ?"
  332. txn.executemany(sql, [(media_id,) for media_id in media_ids])
  333. sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
  334. txn.executemany(sql, [(media_id,) for media_id in media_ids])
  335. return self.db.runInteraction(
  336. "delete_url_cache_media", _delete_url_cache_media_txn
  337. )