1
0

ShardedQueryBuilder.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
  5. * SPDX-License-Identifier: AGPL-3.0-or-later
  6. */
  7. namespace OC\DB\QueryBuilder\Sharded;
  8. use OC\DB\QueryBuilder\CompositeExpression;
  9. use OC\DB\QueryBuilder\ExtendedQueryBuilder;
  10. use OC\DB\QueryBuilder\Parameter;
  11. use OCP\DB\IResult;
  12. use OCP\DB\QueryBuilder\IQueryBuilder;
  13. use OCP\IDBConnection;
  /**
   * A special query builder that automatically distributes queries over multiple database shards.
   *
   * This relies on `PartitionedQueryBuilder` to split the parts of a query that touch sharded tables from the
   * parts that touch non-sharded tables. A query built here should therefore touch either only sharded tables
   * or only non-sharded tables.
   *
   * Most of the logic in this class is concerned with extracting either the shard key (e.g. "storage") or
   * primary key (e.g. "fileid") from the query. The logic for actually running the query across the shards is
   * mostly delegated to `ShardQueryRunner`.
   */
  class ShardedQueryBuilder extends ExtendedQueryBuilder {
      /**
       * Unresolved shard key values collected from the query: literal values, `:placeholder` names
       * or Parameter objects. Resolved to concrete values by getShardKeys().
       */
      private array $shardKeys = [];
      /**
       * Unresolved primary (and companion) key values collected from the query, in the same raw form
       * as $shardKeys. Resolved to concrete values by getPrimaryKeys().
       */
      private array $primaryKeys = [];
      /** Definition of the sharded table set the query acts on, null if no registered definition matched */
      private ?ShardDefinition $shardDefinition = null;
      /** @var bool Run the query across all shards */
      private bool $allShards = false;
      /** Table passed to insert(), null when the query is not an insert */
      private ?string $insertTable = null;
      /** Primary key generated for the last insert by the auto increment handler, if any */
      private mixed $lastInsertId = null;
      /** Shard connection the last insert was executed on, if any */
      private ?IDBConnection $lastInsertConnection = null;
      /**
       * Raw value (literal, `:placeholder` or Parameter) the shard key column is being set to by set(),
       * null if the query does not change the shard key.
       */
      private mixed $updateShardKey = null;
      /** Limit requested through setMaxResults(), null when no positive limit is set */
      private ?int $limit = null;
      /** Offset requested through setFirstResult(), null when no positive offset is set */
      private ?int $offset = null;
      /** @var array{column: string, order: string}[] */
      private array $sortList = [];
      /** Last table passed to from(), update(), insert() or delete() */
      private string $mainTable = '';

      /**
       * @param IQueryBuilder $builder the wrapped query builder that builds the actual SQL
       * @param array $shardDefinitions the registered shard definitions; each entry is used as a ShardDefinition
       * @param ShardConnectionManager $shardConnectionManager provides the connection for a given shard
       * @param AutoIncrementHandler $autoIncrementHandler generates primary keys for inserts into sharded tables
       */
      public function __construct(
          IQueryBuilder $builder,
          protected array $shardDefinitions,
          protected ShardConnectionManager $shardConnectionManager,
          protected AutoIncrementHandler $autoIncrementHandler,
      ) {
          parent::__construct($builder);
      }

      /**
       * Get the resolved, de-duplicated shard key values found in the query so far.
       */
      public function getShardKeys(): array {
          return $this->getKeyValues($this->shardKeys);
      }

      /**
       * Get the resolved, de-duplicated primary/companion key values found in the query so far.
       */
      public function getPrimaryKeys(): array {
          return $this->getKeyValues($this->primaryKeys);
      }

      /**
       * Resolve a list of raw key values (see getKeyValue()) into a flat list of unique values.
       */
      private function getKeyValues(array $keys): array {
          $values = [];
          foreach ($keys as $key) {
              $values = array_merge($values, $this->getKeyValue($key));
          }
          return array_values(array_unique($values));
      }

      /**
       * Resolve a single raw key value into the list of concrete values it stands for.
       *
       * - a Parameter object is converted to its string form first
       * - a string starting with ':' is treated as a named placeholder and replaced by the bound parameter
       *   value (an array parameter yields all its items)
       * - any other non-null value is returned as-is
       * - null yields an empty list
       */
      private function getKeyValue(mixed $value): array {
          if ($value instanceof Parameter) {
              $value = (string)$value;
          }
          if (is_string($value) && str_starts_with($value, ':')) {
              $param = $this->getParameter(substr($value, 1));
              return is_array($param) ? $param : [$param];
          }
          if ($value !== null) {
              return [$value];
          }
          return [];
      }

      /**
       * Add predicates to the query.
       *
       * Delegates to andWhere(), so the predicates are added to the existing ones and scanned for keys.
       */
      public function where(...$predicates) {
          return $this->andWhere(...$predicates);
      }

      /**
       * Add predicates to the query and try to extract shard/primary keys from each of them.
       *
       * Does nothing when called without predicates.
       */
      public function andWhere(...$where) {
          if ($where) {
              foreach ($where as $predicate) {
                  $this->tryLoadShardKey($predicate);
              }
              parent::andWhere(...$where);
          }
          return $this;
      }

      /**
       * Record any shard key, primary key or companion key comparison found in the predicate.
       *
       * No-op as long as no shard definition has been selected by from()/update()/insert()/delete().
       */
      private function tryLoadShardKey(mixed $predicate): void {
          if (!$this->shardDefinition) {
              return;
          }
          // NOTE(review): `+=` is an array union on list-indexed arrays, so entries whose numeric index
          // already exists are dropped instead of appended. The predicates are combined with AND, so the
          // keys of any single predicate should still identify every shard that can match; confirm this
          // is intended before switching to a merge, as that would widen the set of shards queried.
          if ($keys = $this->tryExtractShardKeys($predicate, $this->shardDefinition->shardKey)) {
              $this->shardKeys += $keys;
          }
          if ($keys = $this->tryExtractShardKeys($predicate, $this->shardDefinition->primaryKey)) {
              $this->primaryKeys += $keys;
          }
          foreach ($this->shardDefinition->companionKeys as $companionKey) {
              if ($keys = $this->tryExtractShardKeys($predicate, $companionKey)) {
                  $this->primaryKeys += $keys;
              }
          }
      }

      /**
       * Try to extract the value(s) a column is compared against from a predicate.
       *
       * @param mixed $predicate a CompositeExpression or anything that can be cast to a string
       * @param string $column unquoted column name to look for
       * @return string[] the raw right-hand sides (placeholders or literals), empty if nothing usable was found
       */
      private function tryExtractShardKeys(mixed $predicate, string $column): array {
          if ($predicate instanceof CompositeExpression) {
              $values = [];
              foreach ($predicate->getParts() as $part) {
                  $partValues = $this->tryExtractShardKeys($part, $column);
                  // for OR expressions, we can only rely on the predicate if all parts contain the comparison
                  if ($predicate->getType() === CompositeExpression::TYPE_OR && !$partValues) {
                      return [];
                  }
                  $values = array_merge($values, $partValues);
              }
              return $values;
          }

          $predicate = (string)$predicate;
          // expect a condition in the form of 'alias1.column1 = placeholder' or 'alias1.column1 in placeholder'
          if (substr_count($predicate, ' ') > 2) {
              return [];
          }

          $quotedColumn = "`{$column}`";
          $qualifiedSuffix = "`.`{$column}`";

          if (str_contains($predicate, ' = ')) {
              $parts = explode(' = ', $predicate);
              if ($parts[0] === $quotedColumn || str_ends_with($parts[0], $qualifiedSuffix)) {
                  return [$parts[1]];
              }
              return [];
          }

          if (str_contains($predicate, ' IN ')) {
              $parts = explode(' IN ', $predicate);
              if ($parts[0] === $quotedColumn || str_ends_with($parts[0], $qualifiedSuffix)) {
                  // strip the parentheses around the placeholder
                  return [trim(trim($parts[1], '('), ')')];
              }
              return [];
          }

          return [];
      }

      /**
       * Set a column to a new value in an update query.
       *
       * When the shard key column is updated, the new value is remembered so validate() can ensure
       * the update doesn't move rows to a different shard.
       */
      public function set($key, $value) {
          if ($this->shardDefinition && $key === $this->shardDefinition->shardKey) {
              // must be stored on the instance: validate() reads it when the statement is executed
              $this->updateShardKey = $value;
          }
          return parent::set($key, $value);
      }

      /**
       * Set the value of a column in an insert query, recording it when the column is a key of the shard definition.
       */
      public function setValue($column, $value) {
          if ($this->shardDefinition) {
              if ($this->shardDefinition->isKey($column)) {
                  $this->primaryKeys[] = $value;
              }
              if ($column === $this->shardDefinition->shardKey) {
                  $this->shardKeys[] = $value;
              }
          }
          return parent::setValue($column, $value);
      }

      /**
       * Set multiple insert values; every column goes through setValue() so keys get recorded.
       */
      public function values(array $values) {
          foreach ($values as $column => $value) {
              $this->setValue($column, $value);
          }
          return $this;
      }

      /**
       * Remember the table the query acts on and select the shard definition that contains it, if any.
       *
       * If several definitions contain the table, the last one wins.
       */
      private function actOnTable(string $table): void {
          $this->mainTable = $table;
          foreach ($this->shardDefinitions as $shardDefinition) {
              if ($shardDefinition->hasTable($table)) {
                  $this->shardDefinition = $shardDefinition;
              }
          }
      }

      public function from($from, $alias = null) {
          if (is_string($from) && $from) {
              $this->actOnTable($from);
          }
          return parent::from($from, $alias);
      }

      public function update($update = null, $alias = null) {
          if (is_string($update) && $update) {
              $this->actOnTable($update);
          }
          return parent::update($update, $alias);
      }

      public function insert($insert = null) {
          if (is_string($insert) && $insert) {
              $this->insertTable = $insert;
              $this->actOnTable($insert);
          }
          return parent::insert($insert);
      }

      public function delete($delete = null, $alias = null) {
          if (is_string($delete) && $delete) {
              $this->actOnTable($delete);
          }
          return parent::delete($delete, $alias);
      }

      /**
       * Ensure a join is allowed for a sharded query.
       *
       * @throws InvalidShardedQueryException when a sharded query joins its own main table or a table
       *                                      outside of the shard definition
       */
      private function checkJoin(string $table): void {
          if (!$this->shardDefinition) {
              return;
          }
          if ($table === $this->mainTable) {
              throw new InvalidShardedQueryException("Sharded query on {$this->mainTable} isn't allowed to join on itself");
          }
          if (!$this->shardDefinition->hasTable($table)) {
              // this generally shouldn't happen as the partitioning logic should prevent this
              // but the check is here just in case
              throw new InvalidShardedQueryException("Sharded query on {$this->shardDefinition->table} isn't allowed to join on $table");
          }
      }

      public function innerJoin($fromAlias, $join, $alias, $condition = null) {
          $this->checkJoin($join);
          return parent::innerJoin($fromAlias, $join, $alias, $condition);
      }

      public function leftJoin($fromAlias, $join, $alias, $condition = null) {
          $this->checkJoin($join);
          return parent::leftJoin($fromAlias, $join, $alias, $condition);
      }

      /**
       * @throws InvalidShardedQueryException right joins are never allowed on sharded queries
       */
      public function rightJoin($fromAlias, $join, $alias, $condition = null) {
          if ($this->shardDefinition) {
              throw new InvalidShardedQueryException("Sharded query on {$this->shardDefinition->table} isn't allowed to right join");
          }
          return parent::rightJoin($fromAlias, $join, $alias, $condition);
      }

      public function join($fromAlias, $join, $alias, $condition = null) {
          return $this->innerJoin($fromAlias, $join, $alias, $condition);
      }

      /**
       * Set the limit, keeping a copy that is handed to the shard query runner.
       *
       * A null or non-positive value clears the remembered limit so it never goes stale.
       */
      public function setMaxResults($maxResults) {
          $this->limit = ($maxResults > 0) ? (int)$maxResults : null;
          return parent::setMaxResults($maxResults);
      }

      /**
       * Set the offset, keeping a copy that is handed to the shard query runner.
       *
       * A null or non-positive value clears the remembered offset so it never goes stale.
       * When the query runs against a definition with more than one shard, the offset is not passed
       * to the wrapped builder.
       */
      public function setFirstResult($firstResult) {
          $this->offset = ($firstResult > 0) ? (int)$firstResult : null;
          if ($this->shardDefinition && count($this->shardDefinition->shards) > 1) {
              // we have to emulate offset
              return $this;
          }
          return parent::setFirstResult($firstResult);
      }

      /**
       * Add an additional sort column, keeping the previously registered ones.
       */
      public function addOrderBy($sort, $order = null) {
          // the cast binds tighter than `??`, so the default has to be applied before casting
          $this->registerOrder((string)$sort, (string)($order ?? 'ASC'));
          // append to the existing ORDER BY clause so the wrapped builder stays in sync with $sortList
          return parent::addOrderBy($sort, $order);
      }

      /**
       * Set the sort column, replacing any previously registered ones.
       */
      public function orderBy($sort, $order = null) {
          $this->sortList = [];
          // the cast binds tighter than `??`, so the default has to be applied before casting
          $this->registerOrder((string)$sort, (string)($order ?? 'ASC'));
          return parent::orderBy($sort, $order);
      }

      /**
       * Append a sort column to the list handed to the shard query runner.
       */
      private function registerOrder(string $column, string $order): void {
          // handle `mime + 0` and similar by just sorting on the first part of the expression
          [$column] = explode(' ', $column);
          $column = trim($column, '`');
          $this->sortList[] = [
              'column' => $column,
              'order' => strtoupper($order),
          ];
      }

      /**
       * Manually provide a key value for queries where it can't be extracted from the predicates.
       *
       * @param string $column the shard key, primary key or another key column of the shard definition
       * @param mixed $value the value (or placeholder) for the column
       * @param bool $overwrite discard all keys collected so far before adding the hint
       */
      public function hintShardKey(string $column, mixed $value, bool $overwrite = false): self {
          if ($overwrite) {
              $this->primaryKeys = [];
              $this->shardKeys = [];
          }
          if ($this->shardDefinition?->isKey($column)) {
              $this->primaryKeys[] = $value;
          }
          if ($column === $this->shardDefinition?->shardKey) {
              $this->shardKeys[] = $value;
          }
          return $this;
      }

      /**
       * Mark the query to be run on every shard instead of only the shards matching the collected keys.
       */
      public function runAcrossAllShards(): self {
          $this->allShards = true;
          return $this;
      }

      /**
       * Check that the query can be executed on the sharded setup.
       *
       * @throws InvalidShardedQueryException if the target shard(s) can't be determined, or if the query
       *                                      would move rows to a different shard
       */
      public function validate(): void {
          if ($this->shardDefinition && $this->insertTable) {
              if ($this->allShards) {
                  throw new InvalidShardedQueryException("Can't insert across all shards");
              }
              if (empty($this->getShardKeys())) {
                  throw new InvalidShardedQueryException("Can't insert without shard key");
              }
          }

          if ($this->shardDefinition && !$this->allShards) {
              if (empty($this->getShardKeys()) && empty($this->getPrimaryKeys())) {
                  throw new InvalidShardedQueryException('No shard key or primary key set for query');
              }
          }

          if ($this->shardDefinition && $this->updateShardKey !== null) {
              $newShardKey = $this->getKeyValue($this->updateShardKey);
              $oldShardKeys = $this->getShardKeys();
              if (count($newShardKey) !== 1) {
                  throw new InvalidShardedQueryException("Can't set shard key to an array");
              }
              $newShardKey = current($newShardKey);
              if (empty($oldShardKeys)) {
                  throw new InvalidShardedQueryException("Can't update without shard key");
              }
              // the target shard can only be checked for values that resolve to a number
              if (!is_numeric($newShardKey)) {
                  throw new InvalidShardedQueryException("Can't determine the shard for the new shard key");
              }
              $oldShards = array_values(array_unique(array_map(function ($shardKey) {
                  return $this->shardDefinition->getShardForKey((int)$shardKey);
              }, $oldShardKeys)));
              $newShard = $this->shardDefinition->getShardForKey((int)$newShardKey);
              // the update is only safe when every affected row already lives on the target shard
              if ($oldShards !== [$newShard]) {
                  throw new InvalidShardedQueryException('Update statement would move rows to a different shard');
              }
          }
      }

      /**
       * Execute a select query, through the shard query runner when the query targets sharded tables.
       *
       * @throws InvalidShardedQueryException
       */
      public function executeQuery(?IDBConnection $connection = null): IResult {
          $this->validate();
          if ($this->shardDefinition) {
              $runner = new ShardQueryRunner($this->shardConnectionManager, $this->shardDefinition);
              return $runner->executeQuery($this->builder, $this->allShards, $this->getShardKeys(), $this->getPrimaryKeys(), $this->sortList, $this->limit, $this->offset);
          }
          return parent::executeQuery($connection);
      }

      /**
       * Execute an insert/update/delete statement.
       *
       * Inserts into sharded tables are executed directly on the connection of the target shard; a primary
       * key is generated when none was provided and the insert targets the main table of the shard
       * definition. Other statements on sharded tables are delegated to the shard query runner.
       *
       * @return int the number of affected rows
       * @throws InvalidShardedQueryException
       */
      public function executeStatement(?IDBConnection $connection = null): int {
          $this->validate();
          if (!$this->shardDefinition) {
              return parent::executeStatement($connection);
          }

          $runner = new ShardQueryRunner($this->shardConnectionManager, $this->shardDefinition);
          if (!$this->insertTable) {
              return $runner->executeStatement($this->builder, $this->allShards, $this->getShardKeys(), $this->getPrimaryKeys());
          }

          $shards = $runner->getShards($this->allShards, $this->getShardKeys());
          if (!$shards) {
              throw new InvalidShardedQueryException("Can't insert without shard key");
          }
          $count = 0;
          foreach ($shards as $shard) {
              $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard);
              if (!$this->primaryKeys && $this->shardDefinition->table === $this->insertTable) {
                  $id = $this->autoIncrementHandler->getNextPrimaryKey($this->shardDefinition, $shard);
                  // bypass our own setValue() so the generated key isn't recorded in $primaryKeys
                  parent::setValue($this->shardDefinition->primaryKey, $this->createParameter('__generated_primary_key'));
                  $this->setParameter('__generated_primary_key', $id, self::PARAM_INT);
                  $this->lastInsertId = $id;
              }
              $count += parent::executeStatement($shardConnection);
              $this->lastInsertConnection = $shardConnection;
          }
          return $count;
      }

      /**
       * Get the id of the last inserted row.
       *
       * Prefers the generated primary key, then the last insert id of the shard connection the insert ran on,
       * and falls back to the wrapped builder otherwise.
       */
      public function getLastInsertId(): int {
          if ($this->lastInsertId) {
              return $this->lastInsertId;
          }
          if ($this->lastInsertConnection) {
              $table = $this->builder->prefixTableName($this->insertTable);
              return $this->lastInsertConnection->lastInsertId($table);
          }
          return parent::getLastInsertId();
      }
  }