media_repository.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from synapse.storage._base import SQLBaseStore
  16. from synapse.storage.database import DatabasePool
  class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
      """Registers the background updates used by the media repository tables."""

      def __init__(self, database: DatabasePool, db_conn, hs):
          super().__init__(database, db_conn, hs)

          # Build an index on local_media_repository(created_ts), restricted to
          # rows whose url_cache is non-NULL. It is registered as a background
          # update rather than created inline. Presumably it serves the
          # "created_ts < ? AND url_cache IS NOT NULL" lookup used when purging
          # old URL-cache media.
          self.db_pool.updates.register_background_index_update(
              update_name="local_media_repository_url_idx",
              index_name="local_media_repository_url_idx",
              table="local_media_repository",
              columns=["created_ts"],
              where_clause="url_cache IS NOT NULL",
          )
  29. class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
  30. """Persistence for attachments and avatars"""
  31. def __init__(self, database: DatabasePool, db_conn, hs):
  32. super(MediaRepositoryStore, self).__init__(database, db_conn, hs)
  33. def get_local_media(self, media_id):
  34. """Get the metadata for a local piece of media
  35. Returns:
  36. None if the media_id doesn't exist.
  37. """
  38. return self.db_pool.simple_select_one(
  39. "local_media_repository",
  40. {"media_id": media_id},
  41. (
  42. "media_type",
  43. "media_length",
  44. "upload_name",
  45. "created_ts",
  46. "quarantined_by",
  47. "url_cache",
  48. ),
  49. allow_none=True,
  50. desc="get_local_media",
  51. )
  52. def store_local_media(
  53. self,
  54. media_id,
  55. media_type,
  56. time_now_ms,
  57. upload_name,
  58. media_length,
  59. user_id,
  60. url_cache=None,
  61. ):
  62. return self.db_pool.simple_insert(
  63. "local_media_repository",
  64. {
  65. "media_id": media_id,
  66. "media_type": media_type,
  67. "created_ts": time_now_ms,
  68. "upload_name": upload_name,
  69. "media_length": media_length,
  70. "user_id": user_id.to_string(),
  71. "url_cache": url_cache,
  72. },
  73. desc="store_local_media",
  74. )
  75. def mark_local_media_as_safe(self, media_id: str):
  76. """Mark a local media as safe from quarantining."""
  77. return self.db_pool.simple_update_one(
  78. table="local_media_repository",
  79. keyvalues={"media_id": media_id},
  80. updatevalues={"safe_from_quarantine": True},
  81. desc="mark_local_media_as_safe",
  82. )
  83. def get_url_cache(self, url, ts):
  84. """Get the media_id and ts for a cached URL as of the given timestamp
  85. Returns:
  86. None if the URL isn't cached.
  87. """
  88. def get_url_cache_txn(txn):
  89. # get the most recently cached result (relative to the given ts)
  90. sql = (
  91. "SELECT response_code, etag, expires_ts, og, media_id, download_ts"
  92. " FROM local_media_repository_url_cache"
  93. " WHERE url = ? AND download_ts <= ?"
  94. " ORDER BY download_ts DESC LIMIT 1"
  95. )
  96. txn.execute(sql, (url, ts))
  97. row = txn.fetchone()
  98. if not row:
  99. # ...or if we've requested a timestamp older than the oldest
  100. # copy in the cache, return the oldest copy (if any)
  101. sql = (
  102. "SELECT response_code, etag, expires_ts, og, media_id, download_ts"
  103. " FROM local_media_repository_url_cache"
  104. " WHERE url = ? AND download_ts > ?"
  105. " ORDER BY download_ts ASC LIMIT 1"
  106. )
  107. txn.execute(sql, (url, ts))
  108. row = txn.fetchone()
  109. if not row:
  110. return None
  111. return dict(
  112. zip(
  113. (
  114. "response_code",
  115. "etag",
  116. "expires_ts",
  117. "og",
  118. "media_id",
  119. "download_ts",
  120. ),
  121. row,
  122. )
  123. )
  124. return self.db_pool.runInteraction("get_url_cache", get_url_cache_txn)
  125. def store_url_cache(
  126. self, url, response_code, etag, expires_ts, og, media_id, download_ts
  127. ):
  128. return self.db_pool.simple_insert(
  129. "local_media_repository_url_cache",
  130. {
  131. "url": url,
  132. "response_code": response_code,
  133. "etag": etag,
  134. "expires_ts": expires_ts,
  135. "og": og,
  136. "media_id": media_id,
  137. "download_ts": download_ts,
  138. },
  139. desc="store_url_cache",
  140. )
  141. def get_local_media_thumbnails(self, media_id):
  142. return self.db_pool.simple_select_list(
  143. "local_media_repository_thumbnails",
  144. {"media_id": media_id},
  145. (
  146. "thumbnail_width",
  147. "thumbnail_height",
  148. "thumbnail_method",
  149. "thumbnail_type",
  150. "thumbnail_length",
  151. ),
  152. desc="get_local_media_thumbnails",
  153. )
  154. def store_local_thumbnail(
  155. self,
  156. media_id,
  157. thumbnail_width,
  158. thumbnail_height,
  159. thumbnail_type,
  160. thumbnail_method,
  161. thumbnail_length,
  162. ):
  163. return self.db_pool.simple_insert(
  164. "local_media_repository_thumbnails",
  165. {
  166. "media_id": media_id,
  167. "thumbnail_width": thumbnail_width,
  168. "thumbnail_height": thumbnail_height,
  169. "thumbnail_method": thumbnail_method,
  170. "thumbnail_type": thumbnail_type,
  171. "thumbnail_length": thumbnail_length,
  172. },
  173. desc="store_local_thumbnail",
  174. )
  175. def get_cached_remote_media(self, origin, media_id):
  176. return self.db_pool.simple_select_one(
  177. "remote_media_cache",
  178. {"media_origin": origin, "media_id": media_id},
  179. (
  180. "media_type",
  181. "media_length",
  182. "upload_name",
  183. "created_ts",
  184. "filesystem_id",
  185. "quarantined_by",
  186. ),
  187. allow_none=True,
  188. desc="get_cached_remote_media",
  189. )
  190. def store_cached_remote_media(
  191. self,
  192. origin,
  193. media_id,
  194. media_type,
  195. media_length,
  196. time_now_ms,
  197. upload_name,
  198. filesystem_id,
  199. ):
  200. return self.db_pool.simple_insert(
  201. "remote_media_cache",
  202. {
  203. "media_origin": origin,
  204. "media_id": media_id,
  205. "media_type": media_type,
  206. "media_length": media_length,
  207. "created_ts": time_now_ms,
  208. "upload_name": upload_name,
  209. "filesystem_id": filesystem_id,
  210. "last_access_ts": time_now_ms,
  211. },
  212. desc="store_cached_remote_media",
  213. )
  214. def update_cached_last_access_time(self, local_media, remote_media, time_ms):
  215. """Updates the last access time of the given media
  216. Args:
  217. local_media (iterable[str]): Set of media_ids
  218. remote_media (iterable[(str, str)]): Set of (server_name, media_id)
  219. time_ms: Current time in milliseconds
  220. """
  221. def update_cache_txn(txn):
  222. sql = (
  223. "UPDATE remote_media_cache SET last_access_ts = ?"
  224. " WHERE media_origin = ? AND media_id = ?"
  225. )
  226. txn.executemany(
  227. sql,
  228. (
  229. (time_ms, media_origin, media_id)
  230. for media_origin, media_id in remote_media
  231. ),
  232. )
  233. sql = (
  234. "UPDATE local_media_repository SET last_access_ts = ?"
  235. " WHERE media_id = ?"
  236. )
  237. txn.executemany(sql, ((time_ms, media_id) for media_id in local_media))
  238. return self.db_pool.runInteraction(
  239. "update_cached_last_access_time", update_cache_txn
  240. )
  241. def get_remote_media_thumbnails(self, origin, media_id):
  242. return self.db_pool.simple_select_list(
  243. "remote_media_cache_thumbnails",
  244. {"media_origin": origin, "media_id": media_id},
  245. (
  246. "thumbnail_width",
  247. "thumbnail_height",
  248. "thumbnail_method",
  249. "thumbnail_type",
  250. "thumbnail_length",
  251. "filesystem_id",
  252. ),
  253. desc="get_remote_media_thumbnails",
  254. )
  255. def store_remote_media_thumbnail(
  256. self,
  257. origin,
  258. media_id,
  259. filesystem_id,
  260. thumbnail_width,
  261. thumbnail_height,
  262. thumbnail_type,
  263. thumbnail_method,
  264. thumbnail_length,
  265. ):
  266. return self.db_pool.simple_insert(
  267. "remote_media_cache_thumbnails",
  268. {
  269. "media_origin": origin,
  270. "media_id": media_id,
  271. "thumbnail_width": thumbnail_width,
  272. "thumbnail_height": thumbnail_height,
  273. "thumbnail_method": thumbnail_method,
  274. "thumbnail_type": thumbnail_type,
  275. "thumbnail_length": thumbnail_length,
  276. "filesystem_id": filesystem_id,
  277. },
  278. desc="store_remote_media_thumbnail",
  279. )
  280. def get_remote_media_before(self, before_ts):
  281. sql = (
  282. "SELECT media_origin, media_id, filesystem_id"
  283. " FROM remote_media_cache"
  284. " WHERE last_access_ts < ?"
  285. )
  286. return self.db_pool.execute(
  287. "get_remote_media_before", self.db_pool.cursor_to_dict, sql, before_ts
  288. )
  289. def delete_remote_media(self, media_origin, media_id):
  290. def delete_remote_media_txn(txn):
  291. self.db_pool.simple_delete_txn(
  292. txn,
  293. "remote_media_cache",
  294. keyvalues={"media_origin": media_origin, "media_id": media_id},
  295. )
  296. self.db_pool.simple_delete_txn(
  297. txn,
  298. "remote_media_cache_thumbnails",
  299. keyvalues={"media_origin": media_origin, "media_id": media_id},
  300. )
  301. return self.db_pool.runInteraction(
  302. "delete_remote_media", delete_remote_media_txn
  303. )
  304. def get_expired_url_cache(self, now_ts):
  305. sql = (
  306. "SELECT media_id FROM local_media_repository_url_cache"
  307. " WHERE expires_ts < ?"
  308. " ORDER BY expires_ts ASC"
  309. " LIMIT 500"
  310. )
  311. def _get_expired_url_cache_txn(txn):
  312. txn.execute(sql, (now_ts,))
  313. return [row[0] for row in txn]
  314. return self.db_pool.runInteraction(
  315. "get_expired_url_cache", _get_expired_url_cache_txn
  316. )
  317. async def delete_url_cache(self, media_ids):
  318. if len(media_ids) == 0:
  319. return
  320. sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
  321. def _delete_url_cache_txn(txn):
  322. txn.executemany(sql, [(media_id,) for media_id in media_ids])
  323. return await self.db_pool.runInteraction(
  324. "delete_url_cache", _delete_url_cache_txn
  325. )
  326. def get_url_cache_media_before(self, before_ts):
  327. sql = (
  328. "SELECT media_id FROM local_media_repository"
  329. " WHERE created_ts < ? AND url_cache IS NOT NULL"
  330. " ORDER BY created_ts ASC"
  331. " LIMIT 500"
  332. )
  333. def _get_url_cache_media_before_txn(txn):
  334. txn.execute(sql, (before_ts,))
  335. return [row[0] for row in txn]
  336. return self.db_pool.runInteraction(
  337. "get_url_cache_media_before", _get_url_cache_media_before_txn
  338. )
  339. async def delete_url_cache_media(self, media_ids):
  340. if len(media_ids) == 0:
  341. return
  342. def _delete_url_cache_media_txn(txn):
  343. sql = "DELETE FROM local_media_repository WHERE media_id = ?"
  344. txn.executemany(sql, [(media_id,) for media_id in media_ids])
  345. sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
  346. txn.executemany(sql, [(media_id,) for media_id in media_ids])
  347. return await self.db_pool.runInteraction(
  348. "delete_url_cache_media", _delete_url_cache_media_txn
  349. )