task_scheduler.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. from typing import TYPE_CHECKING, Any, List, Optional, Tuple, cast
  15. from synapse.storage._base import SQLBaseStore, db_to_json
  16. from synapse.storage.database import (
  17. DatabasePool,
  18. LoggingDatabaseConnection,
  19. LoggingTransaction,
  20. make_in_list_sql_clause,
  21. )
  22. from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
  23. from synapse.util import json_encoder
  24. if TYPE_CHECKING:
  25. from synapse.server import HomeServer
  26. ScheduledTaskRow = Tuple[str, str, str, int, str, str, str, str]
class TaskSchedulerWorkerStore(SQLBaseStore):
    """Store giving access to the `scheduled_tasks` table.

    Provides methods to list, fetch, insert, update and delete the rows
    backing `ScheduledTask` objects.
    """

  28. def __init__(
  29. self,
  30. database: DatabasePool,
  31. db_conn: LoggingDatabaseConnection,
  32. hs: "HomeServer",
  33. ):
  34. super().__init__(database, db_conn, hs)
  35. @staticmethod
  36. def _convert_row_to_task(row: ScheduledTaskRow) -> ScheduledTask:
  37. task_id, action, status, timestamp, resource_id, params, result, error = row
  38. return ScheduledTask(
  39. id=task_id,
  40. action=action,
  41. status=TaskStatus(status),
  42. timestamp=timestamp,
  43. resource_id=resource_id,
  44. params=db_to_json(params) if params is not None else None,
  45. result=db_to_json(result) if result is not None else None,
  46. error=error,
  47. )
  48. async def get_scheduled_tasks(
  49. self,
  50. *,
  51. actions: Optional[List[str]] = None,
  52. resource_id: Optional[str] = None,
  53. statuses: Optional[List[TaskStatus]] = None,
  54. max_timestamp: Optional[int] = None,
  55. limit: Optional[int] = None,
  56. ) -> List[ScheduledTask]:
  57. """Get a list of scheduled tasks from the DB.
  58. Args:
  59. actions: Limit the returned tasks to those specific action names
  60. resource_id: Limit the returned tasks to the specific resource id, if specified
  61. statuses: Limit the returned tasks to the specific statuses
  62. max_timestamp: Limit the returned tasks to the ones that have
  63. a timestamp inferior to the specified one
  64. limit: Only return `limit` number of rows if set.
  65. Returns: a list of `ScheduledTask`, ordered by increasing timestamps
  66. """
  67. def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[ScheduledTaskRow]:
  68. clauses: List[str] = []
  69. args: List[Any] = []
  70. if resource_id:
  71. clauses.append("resource_id = ?")
  72. args.append(resource_id)
  73. if actions is not None:
  74. clause, temp_args = make_in_list_sql_clause(
  75. txn.database_engine, "action", actions
  76. )
  77. clauses.append(clause)
  78. args.extend(temp_args)
  79. if statuses is not None:
  80. clause, temp_args = make_in_list_sql_clause(
  81. txn.database_engine, "status", statuses
  82. )
  83. clauses.append(clause)
  84. args.extend(temp_args)
  85. if max_timestamp is not None:
  86. clauses.append("timestamp <= ?")
  87. args.append(max_timestamp)
  88. sql = "SELECT * FROM scheduled_tasks"
  89. if clauses:
  90. sql = sql + " WHERE " + " AND ".join(clauses)
  91. sql = sql + " ORDER BY timestamp"
  92. if limit is not None:
  93. sql += " LIMIT ?"
  94. args.append(limit)
  95. txn.execute(sql, args)
  96. return cast(List[ScheduledTaskRow], txn.fetchall())
  97. rows = await self.db_pool.runInteraction(
  98. "get_scheduled_tasks", get_scheduled_tasks_txn
  99. )
  100. return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]
  101. async def insert_scheduled_task(self, task: ScheduledTask) -> None:
  102. """Insert a specified `ScheduledTask` in the DB.
  103. Args:
  104. task: the `ScheduledTask` to insert
  105. """
  106. await self.db_pool.simple_insert(
  107. "scheduled_tasks",
  108. {
  109. "id": task.id,
  110. "action": task.action,
  111. "status": task.status,
  112. "timestamp": task.timestamp,
  113. "resource_id": task.resource_id,
  114. "params": None
  115. if task.params is None
  116. else json_encoder.encode(task.params),
  117. "result": None
  118. if task.result is None
  119. else json_encoder.encode(task.result),
  120. "error": task.error,
  121. },
  122. desc="insert_scheduled_task",
  123. )
  124. async def update_scheduled_task(
  125. self,
  126. id: str,
  127. timestamp: int,
  128. *,
  129. status: Optional[TaskStatus] = None,
  130. result: Optional[JsonMapping] = None,
  131. error: Optional[str] = None,
  132. ) -> bool:
  133. """Update a scheduled task in the DB with some new value(s).
  134. Args:
  135. id: id of the `ScheduledTask` to update
  136. timestamp: new timestamp of the task
  137. status: new status of the task
  138. result: new result of the task
  139. error: new error of the task
  140. Returns: `False` if no matching row was found, `True` otherwise
  141. """
  142. updatevalues: JsonDict = {"timestamp": timestamp}
  143. if status is not None:
  144. updatevalues["status"] = status
  145. if result is not None:
  146. updatevalues["result"] = json_encoder.encode(result)
  147. if error is not None:
  148. updatevalues["error"] = error
  149. nb_rows = await self.db_pool.simple_update(
  150. "scheduled_tasks",
  151. {"id": id},
  152. updatevalues,
  153. desc="update_scheduled_task",
  154. )
  155. return nb_rows > 0
  156. async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
  157. """Get a specific `ScheduledTask` from its id.
  158. Args:
  159. id: the id of the task to retrieve
  160. Returns: the task if available, `None` otherwise
  161. """
  162. row = await self.db_pool.simple_select_one(
  163. table="scheduled_tasks",
  164. keyvalues={"id": id},
  165. retcols=(
  166. "id",
  167. "action",
  168. "status",
  169. "timestamp",
  170. "resource_id",
  171. "params",
  172. "result",
  173. "error",
  174. ),
  175. allow_none=True,
  176. desc="get_scheduled_task",
  177. )
  178. return (
  179. TaskSchedulerWorkerStore._convert_row_to_task(
  180. (
  181. row["id"],
  182. row["action"],
  183. row["status"],
  184. row["timestamp"],
  185. row["resource_id"],
  186. row["params"],
  187. row["result"],
  188. row["error"],
  189. )
  190. )
  191. if row
  192. else None
  193. )
  194. async def delete_scheduled_task(self, id: str) -> None:
  195. """Delete a specific task from its id.
  196. Args:
  197. id: the id of the task to delete
  198. """
  199. await self.db_pool.simple_delete(
  200. "scheduled_tasks",
  201. keyvalues={"id": id},
  202. desc="delete_scheduled_task",
  203. )