123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- # Copyright 2023 The Matrix.org Foundation C.I.C.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from typing import TYPE_CHECKING, Any, List, Optional, Tuple, cast
- from synapse.storage._base import SQLBaseStore, db_to_json
- from synapse.storage.database import (
- DatabasePool,
- LoggingDatabaseConnection,
- LoggingTransaction,
- make_in_list_sql_clause,
- )
- from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
- from synapse.util import json_encoder
- if TYPE_CHECKING:
- from synapse.server import HomeServer
# A raw `scheduled_tasks` row, in the positional order unpacked by
# `TaskSchedulerWorkerStore._convert_row_to_task`:
# (id, action, status, timestamp, resource_id, params, result, error).
# `params` and `result` are decoded with `db_to_json` when they are not None.
ScheduledTaskRow = Tuple[str, str, str, int, str, str, str, str]
class TaskSchedulerWorkerStore(SQLBaseStore):
    """Storage methods for the `scheduled_tasks` table.

    Offers insertion, lookup (single task or filtered list), update and
    deletion of `ScheduledTask` entries.
    """

    # Columns of `scheduled_tasks`, listed in the exact order that
    # `_convert_row_to_task` unpacks them. Every read selects these columns
    # explicitly so the positional unpacking never depends on the physical
    # column order of the table (which `SELECT *` would).
    _SCHEDULED_TASK_COLUMNS: Tuple[str, ...] = (
        "id",
        "action",
        "status",
        "timestamp",
        "resource_id",
        "params",
        "result",
        "error",
    )

    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        super().__init__(database, db_conn, hs)

    @staticmethod
    def _convert_row_to_task(row: ScheduledTaskRow) -> ScheduledTask:
        """Build a `ScheduledTask` from a raw DB row.

        Args:
            row: the column values, ordered as in `_SCHEDULED_TASK_COLUMNS`

        Returns: the corresponding `ScheduledTask`; `params` and `result` are
            decoded with `db_to_json` unless they are `None`
        """
        task_id, action, status, timestamp, resource_id, params, result, error = row
        return ScheduledTask(
            id=task_id,
            action=action,
            status=TaskStatus(status),
            timestamp=timestamp,
            resource_id=resource_id,
            params=db_to_json(params) if params is not None else None,
            result=db_to_json(result) if result is not None else None,
            error=error,
        )

    async def get_scheduled_tasks(
        self,
        *,
        actions: Optional[List[str]] = None,
        resource_id: Optional[str] = None,
        statuses: Optional[List[TaskStatus]] = None,
        max_timestamp: Optional[int] = None,
        limit: Optional[int] = None,
    ) -> List[ScheduledTask]:
        """Get a list of scheduled tasks from the DB.

        All provided filters are combined with `AND`.

        Args:
            actions: Limit the returned tasks to those specific action names
            resource_id: Limit the returned tasks to the specific resource id.
                Any falsy value (`None` or the empty string) disables this
                filter.
            statuses: Limit the returned tasks to the specific statuses
            max_timestamp: Limit the returned tasks to the ones that have
                a timestamp lower than or equal to the specified one
            limit: Only return `limit` number of rows if set.

        Returns: a list of `ScheduledTask`, ordered by increasing timestamps
        """
        columns = self._SCHEDULED_TASK_COLUMNS

        def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[ScheduledTaskRow]:
            clauses: List[str] = []
            args: List[Any] = []

            # Truthiness check kept on purpose: an empty resource id has
            # always meant "no filter" for existing callers.
            if resource_id:
                clauses.append("resource_id = ?")
                args.append(resource_id)

            if actions is not None:
                clause, temp_args = make_in_list_sql_clause(
                    txn.database_engine, "action", actions
                )
                clauses.append(clause)
                args.extend(temp_args)

            if statuses is not None:
                clause, temp_args = make_in_list_sql_clause(
                    txn.database_engine, "status", statuses
                )
                clauses.append(clause)
                args.extend(temp_args)

            if max_timestamp is not None:
                clauses.append("timestamp <= ?")
                args.append(max_timestamp)

            # Name the columns instead of `SELECT *` so the result always
            # matches the positional layout of `ScheduledTaskRow`.
            sql = "SELECT " + ", ".join(columns) + " FROM scheduled_tasks"
            if clauses:
                sql = sql + " WHERE " + " AND ".join(clauses)

            sql = sql + " ORDER BY timestamp"

            if limit is not None:
                sql += " LIMIT ?"
                args.append(limit)

            txn.execute(sql, args)
            return cast(List[ScheduledTaskRow], txn.fetchall())

        rows = await self.db_pool.runInteraction(
            "get_scheduled_tasks", get_scheduled_tasks_txn
        )
        return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]

    async def insert_scheduled_task(self, task: ScheduledTask) -> None:
        """Insert a specified `ScheduledTask` in the DB.

        `params` and `result` are stored JSON-encoded, or as NULL when they
        are `None`.

        Args:
            task: the `ScheduledTask` to insert
        """
        await self.db_pool.simple_insert(
            "scheduled_tasks",
            {
                "id": task.id,
                "action": task.action,
                "status": task.status,
                "timestamp": task.timestamp,
                "resource_id": task.resource_id,
                "params": None
                if task.params is None
                else json_encoder.encode(task.params),
                "result": None
                if task.result is None
                else json_encoder.encode(task.result),
                "error": task.error,
            },
            desc="insert_scheduled_task",
        )

    async def update_scheduled_task(
        self,
        id: str,
        timestamp: int,
        *,
        status: Optional[TaskStatus] = None,
        result: Optional[JsonMapping] = None,
        error: Optional[str] = None,
    ) -> bool:
        """Update a scheduled task in the DB with some new value(s).

        The timestamp is always written. `status`, `result` and `error` are
        only written when they are not `None`, so this method cannot be used
        to reset one of those columns to NULL.

        Args:
            id: id of the `ScheduledTask` to update
            timestamp: new timestamp of the task
            status: new status of the task
            result: new result of the task
            error: new error of the task

        Returns: `False` if no matching row was found, `True` otherwise
        """
        updatevalues: JsonDict = {"timestamp": timestamp}
        if status is not None:
            updatevalues["status"] = status
        if result is not None:
            updatevalues["result"] = json_encoder.encode(result)
        if error is not None:
            updatevalues["error"] = error

        nb_rows = await self.db_pool.simple_update(
            "scheduled_tasks",
            {"id": id},
            updatevalues,
            desc="update_scheduled_task",
        )
        return nb_rows > 0

    async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
        """Get a specific `ScheduledTask` from its id.

        Args:
            id: the id of the task to retrieve

        Returns: the task if available, `None` otherwise
        """
        columns = self._SCHEDULED_TASK_COLUMNS
        row = await self.db_pool.simple_select_one(
            table="scheduled_tasks",
            keyvalues={"id": id},
            retcols=columns,
            allow_none=True,
            desc="get_scheduled_task",
        )
        if not row:
            return None

        # Re-order the named values into the positional layout expected by
        # `_convert_row_to_task`.
        return TaskSchedulerWorkerStore._convert_row_to_task(
            cast(ScheduledTaskRow, tuple(row[col] for col in columns))
        )

    async def delete_scheduled_task(self, id: str) -> None:
        """Delete a specific task from its id.

        Args:
            id: the id of the task to delete
        """
        await self.db_pool.simple_delete(
            "scheduled_tasks",
            keyvalues={"id": id},
            desc="delete_scheduled_task",
        )
|