1
0

ShardQueryRunner.php 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
  5. * SPDX-License-Identifier: AGPL-3.0-or-later
  6. */
  7. namespace OC\DB\QueryBuilder\Sharded;
  8. use OC\DB\ArrayResult;
  9. use OCP\DB\IResult;
  10. use OCP\DB\QueryBuilder\IQueryBuilder;
  11. /**
  12. * Logic for running a query across a number of shards, combining the results
  13. */
  14. class ShardQueryRunner {
  15. public function __construct(
  16. private ShardConnectionManager $shardConnectionManager,
  17. private ShardDefinition $shardDefinition,
  18. ) {
  19. }
  20. /**
  21. * Get the shards for a specific query or null if the shards aren't known in advance
  22. *
  23. * @param bool $allShards
  24. * @param int[] $shardKeys
  25. * @return null|int[]
  26. */
  27. public function getShards(bool $allShards, array $shardKeys): ?array {
  28. if ($allShards) {
  29. return $this->shardDefinition->getAllShards();
  30. }
  31. $allConfiguredShards = $this->shardDefinition->getAllShards();
  32. if (count($allConfiguredShards) === 1) {
  33. return $allConfiguredShards;
  34. }
  35. if (empty($shardKeys)) {
  36. return null;
  37. }
  38. $shards = array_map(function ($shardKey) {
  39. return $this->shardDefinition->getShardForKey((int)$shardKey);
  40. }, $shardKeys);
  41. return array_values(array_unique($shards));
  42. }
  43. /**
  44. * Try to get the shards that the keys are likely to be in, based on the shard the row was created
  45. *
  46. * @param int[] $primaryKeys
  47. * @return int[]
  48. */
  49. private function getLikelyShards(array $primaryKeys): array {
  50. $shards = [];
  51. foreach ($primaryKeys as $primaryKey) {
  52. $encodedShard = $primaryKey & ShardDefinition::PRIMARY_KEY_SHARD_MASK;
  53. if ($encodedShard < count($this->shardDefinition->shards) && !in_array($encodedShard, $shards)) {
  54. $shards[] = $encodedShard;
  55. }
  56. }
  57. return $shards;
  58. }
  59. /**
  60. * Execute a SELECT statement across the configured shards
  61. *
  62. * @param IQueryBuilder $query
  63. * @param bool $allShards
  64. * @param int[] $shardKeys
  65. * @param int[] $primaryKeys
  66. * @param array{column: string, order: string}[] $sortList
  67. * @param int|null $limit
  68. * @param int|null $offset
  69. * @return IResult
  70. */
  71. public function executeQuery(
  72. IQueryBuilder $query,
  73. bool $allShards,
  74. array $shardKeys,
  75. array $primaryKeys,
  76. ?array $sortList = null,
  77. ?int $limit = null,
  78. ?int $offset = null,
  79. ): IResult {
  80. $shards = $this->getShards($allShards, $shardKeys);
  81. $results = [];
  82. if ($shards && count($shards) === 1) {
  83. // trivial case
  84. return $query->executeQuery($this->shardConnectionManager->getConnection($this->shardDefinition, $shards[0]));
  85. }
  86. // we have to emulate limit and offset, so we select offset+limit from all shards to ensure we have enough rows
  87. // and then filter them down after we merged the results
  88. if ($limit !== null && $offset !== null) {
  89. $query->setMaxResults($limit + $offset);
  90. }
  91. if ($shards) {
  92. // we know exactly what shards we need to query
  93. foreach ($shards as $shard) {
  94. $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard);
  95. $subResult = $query->executeQuery($shardConnection);
  96. $results = array_merge($results, $subResult->fetchAll());
  97. $subResult->closeCursor();
  98. }
  99. } else {
  100. // we don't know for sure what shards we need to query,
  101. // we first try the shards that are "likely" to have the rows we want, based on the shard that the row was
  102. // originally created in. If we then still haven't found all rows we try the rest of the shards
  103. $likelyShards = $this->getLikelyShards($primaryKeys);
  104. $unlikelyShards = array_diff($this->shardDefinition->getAllShards(), $likelyShards);
  105. $shards = array_merge($likelyShards, $unlikelyShards);
  106. foreach ($shards as $shard) {
  107. $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard);
  108. $subResult = $query->executeQuery($shardConnection);
  109. $rows = $subResult->fetchAll();
  110. $results = array_merge($results, $rows);
  111. $subResult->closeCursor();
  112. if (count($rows) >= count($primaryKeys)) {
  113. // we have all the rows we're looking for
  114. break;
  115. }
  116. }
  117. }
  118. if ($sortList) {
  119. usort($results, function ($a, $b) use ($sortList) {
  120. foreach ($sortList as $sort) {
  121. $valueA = $a[$sort['column']] ?? null;
  122. $valueB = $b[$sort['column']] ?? null;
  123. $cmp = $valueA <=> $valueB;
  124. if ($cmp === 0) {
  125. continue;
  126. }
  127. if ($sort['order'] === 'DESC') {
  128. $cmp = -$cmp;
  129. }
  130. return $cmp;
  131. }
  132. });
  133. }
  134. if ($limit !== null && $offset !== null) {
  135. $results = array_slice($results, $offset, $limit);
  136. } elseif ($limit !== null) {
  137. $results = array_slice($results, 0, $limit);
  138. } elseif ($offset !== null) {
  139. $results = array_slice($results, $offset);
  140. }
  141. return new ArrayResult($results);
  142. }
  143. /**
  144. * Execute an UPDATE or DELETE statement
  145. *
  146. * @param IQueryBuilder $query
  147. * @param bool $allShards
  148. * @param int[] $shardKeys
  149. * @param int[] $primaryKeys
  150. * @return int
  151. * @throws \OCP\DB\Exception
  152. */
  153. public function executeStatement(IQueryBuilder $query, bool $allShards, array $shardKeys, array $primaryKeys): int {
  154. if ($query->getType() === \Doctrine\DBAL\Query\QueryBuilder::INSERT) {
  155. throw new \Exception('insert queries need special handling');
  156. }
  157. $shards = $this->getShards($allShards, $shardKeys);
  158. $maxCount = count($primaryKeys);
  159. if ($shards && count($shards) === 1) {
  160. return $query->executeStatement($this->shardConnectionManager->getConnection($this->shardDefinition, $shards[0]));
  161. } elseif ($shards) {
  162. $maxCount = PHP_INT_MAX;
  163. } else {
  164. // sort the likely shards before the rest, similar logic to `self::executeQuery`
  165. $likelyShards = $this->getLikelyShards($primaryKeys);
  166. $unlikelyShards = array_diff($this->shardDefinition->getAllShards(), $likelyShards);
  167. $shards = array_merge($likelyShards, $unlikelyShards);
  168. }
  169. $count = 0;
  170. foreach ($shards as $shard) {
  171. $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard);
  172. $count += $query->executeStatement($shardConnection);
  173. if ($count >= $maxCount) {
  174. break;
  175. }
  176. }
  177. return $count;
  178. }
  179. }