123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- <?php
- declare(strict_types=1);
- /**
- * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
- * SPDX-License-Identifier: AGPL-3.0-or-later
- */
- namespace OC\DB\QueryBuilder\Sharded;
- use OC\DB\ArrayResult;
- use OCP\DB\IResult;
- use OCP\DB\QueryBuilder\IQueryBuilder;
- /**
- * Logic for running a query across a number of shards, combining the results
- */
- class ShardQueryRunner {
- public function __construct(
- private ShardConnectionManager $shardConnectionManager,
- private ShardDefinition $shardDefinition,
- ) {
- }
- /**
- * Get the shards for a specific query or null if the shards aren't known in advance
- *
- * @param bool $allShards
- * @param int[] $shardKeys
- * @return null|int[]
- */
- public function getShards(bool $allShards, array $shardKeys): ?array {
- if ($allShards) {
- return $this->shardDefinition->getAllShards();
- }
- $allConfiguredShards = $this->shardDefinition->getAllShards();
- if (count($allConfiguredShards) === 1) {
- return $allConfiguredShards;
- }
- if (empty($shardKeys)) {
- return null;
- }
- $shards = array_map(function ($shardKey) {
- return $this->shardDefinition->getShardForKey((int)$shardKey);
- }, $shardKeys);
- return array_values(array_unique($shards));
- }
- /**
- * Try to get the shards that the keys are likely to be in, based on the shard the row was created
- *
- * @param int[] $primaryKeys
- * @return int[]
- */
- private function getLikelyShards(array $primaryKeys): array {
- $shards = [];
- foreach ($primaryKeys as $primaryKey) {
- $encodedShard = $primaryKey & ShardDefinition::PRIMARY_KEY_SHARD_MASK;
- if ($encodedShard < count($this->shardDefinition->shards) && !in_array($encodedShard, $shards)) {
- $shards[] = $encodedShard;
- }
- }
- return $shards;
- }
- /**
- * Execute a SELECT statement across the configured shards
- *
- * @param IQueryBuilder $query
- * @param bool $allShards
- * @param int[] $shardKeys
- * @param int[] $primaryKeys
- * @param array{column: string, order: string}[] $sortList
- * @param int|null $limit
- * @param int|null $offset
- * @return IResult
- */
- public function executeQuery(
- IQueryBuilder $query,
- bool $allShards,
- array $shardKeys,
- array $primaryKeys,
- ?array $sortList = null,
- ?int $limit = null,
- ?int $offset = null,
- ): IResult {
- $shards = $this->getShards($allShards, $shardKeys);
- $results = [];
- if ($shards && count($shards) === 1) {
- // trivial case
- return $query->executeQuery($this->shardConnectionManager->getConnection($this->shardDefinition, $shards[0]));
- }
- // we have to emulate limit and offset, so we select offset+limit from all shards to ensure we have enough rows
- // and then filter them down after we merged the results
- if ($limit !== null && $offset !== null) {
- $query->setMaxResults($limit + $offset);
- }
- if ($shards) {
- // we know exactly what shards we need to query
- foreach ($shards as $shard) {
- $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard);
- $subResult = $query->executeQuery($shardConnection);
- $results = array_merge($results, $subResult->fetchAll());
- $subResult->closeCursor();
- }
- } else {
- // we don't know for sure what shards we need to query,
- // we first try the shards that are "likely" to have the rows we want, based on the shard that the row was
- // originally created in. If we then still haven't found all rows we try the rest of the shards
- $likelyShards = $this->getLikelyShards($primaryKeys);
- $unlikelyShards = array_diff($this->shardDefinition->getAllShards(), $likelyShards);
- $shards = array_merge($likelyShards, $unlikelyShards);
- foreach ($shards as $shard) {
- $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard);
- $subResult = $query->executeQuery($shardConnection);
- $rows = $subResult->fetchAll();
- $results = array_merge($results, $rows);
- $subResult->closeCursor();
- if (count($rows) >= count($primaryKeys)) {
- // we have all the rows we're looking for
- break;
- }
- }
- }
- if ($sortList) {
- usort($results, function ($a, $b) use ($sortList) {
- foreach ($sortList as $sort) {
- $valueA = $a[$sort['column']] ?? null;
- $valueB = $b[$sort['column']] ?? null;
- $cmp = $valueA <=> $valueB;
- if ($cmp === 0) {
- continue;
- }
- if ($sort['order'] === 'DESC') {
- $cmp = -$cmp;
- }
- return $cmp;
- }
- });
- }
- if ($limit !== null && $offset !== null) {
- $results = array_slice($results, $offset, $limit);
- } elseif ($limit !== null) {
- $results = array_slice($results, 0, $limit);
- } elseif ($offset !== null) {
- $results = array_slice($results, $offset);
- }
- return new ArrayResult($results);
- }
- /**
- * Execute an UPDATE or DELETE statement
- *
- * @param IQueryBuilder $query
- * @param bool $allShards
- * @param int[] $shardKeys
- * @param int[] $primaryKeys
- * @return int
- * @throws \OCP\DB\Exception
- */
- public function executeStatement(IQueryBuilder $query, bool $allShards, array $shardKeys, array $primaryKeys): int {
- if ($query->getType() === \Doctrine\DBAL\Query\QueryBuilder::INSERT) {
- throw new \Exception('insert queries need special handling');
- }
- $shards = $this->getShards($allShards, $shardKeys);
- $maxCount = count($primaryKeys);
- if ($shards && count($shards) === 1) {
- return $query->executeStatement($this->shardConnectionManager->getConnection($this->shardDefinition, $shards[0]));
- } elseif ($shards) {
- $maxCount = PHP_INT_MAX;
- } else {
- // sort the likely shards before the rest, similar logic to `self::executeQuery`
- $likelyShards = $this->getLikelyShards($primaryKeys);
- $unlikelyShards = array_diff($this->shardDefinition->getAllShards(), $likelyShards);
- $shards = array_merge($likelyShards, $unlikelyShards);
- }
- $count = 0;
- foreach ($shards as $shard) {
- $shardConnection = $this->shardConnectionManager->getConnection($this->shardDefinition, $shard);
- $count += $query->executeStatement($shardConnection);
- if ($count >= $maxCount) {
- break;
- }
- }
- return $count;
- }
- }
|