1
0

PartitionedQueryBuilder.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
  5. * SPDX-License-Identifier: AGPL-3.0-only
  6. */
  7. namespace OC\DB\QueryBuilder\Partitioned;
  8. use OC\DB\QueryBuilder\CompositeExpression;
  9. use OC\DB\QueryBuilder\QuoteHelper;
  10. use OC\DB\QueryBuilder\Sharded\AutoIncrementHandler;
  11. use OC\DB\QueryBuilder\Sharded\ShardConnectionManager;
  12. use OC\DB\QueryBuilder\Sharded\ShardedQueryBuilder;
  13. use OCP\DB\IResult;
  14. use OCP\DB\QueryBuilder\IQueryBuilder;
  15. use OCP\DB\QueryBuilder\IQueryFunction;
  16. use OCP\IDBConnection;
  17. /**
  18. * A special query builder that automatically splits queries that span across multiple database partitions[1].
  19. *
  20. * This is done by inspecting the query as it's being built, and when a cross-partition join is detected,
  21. * the part of the query that touches the partition is split of into a different sub-query.
  22. * Then, when the query is executed, the results from the sub-queries are automatically merged.
  23. *
  24. * This whole process is intended to be transparent to any code using the query builder, however it does impose some extra
  25. * limitation for queries that work cross-partition. See the documentation from `InvalidPartitionedQueryException` for more details.
  26. *
  27. * When a join is created in the query, this builder checks if it belongs to the same partition as the table from the
  28. * original FROM/UPDATE/DELETE/INSERT and if not, creates a new "sub query" for the partition.
  29. * Then for every part that is added the query, the part is analyzed to determine which partition the query part is referencing
  30. * and the query part is added to the sub query for that partition.
  31. *
  32. * [1]: A set of tables which can't be queried together with the rest of the tables, such as when sharding is used.
  33. */
  34. class PartitionedQueryBuilder extends ShardedQueryBuilder {
  35. /** @var array<string, PartitionQuery> $splitQueries */
  36. private array $splitQueries = [];
  37. /** @var list<PartitionSplit> */
  38. private array $partitions = [];
  39. /** @var array{'select': string|array, 'alias': ?string}[] */
  40. private array $selects = [];
  41. private ?PartitionSplit $mainPartition = null;
  42. private bool $hasPositionalParameter = false;
  43. private QuoteHelper $quoteHelper;
  44. private ?int $limit = null;
  45. private ?int $offset = null;
  46. public function __construct(
  47. IQueryBuilder $builder,
  48. array $shardDefinitions,
  49. ShardConnectionManager $shardConnectionManager,
  50. AutoIncrementHandler $autoIncrementHandler,
  51. ) {
  52. parent::__construct($builder, $shardDefinitions, $shardConnectionManager, $autoIncrementHandler);
  53. $this->quoteHelper = new QuoteHelper();
  54. }
  55. private function newQuery(): IQueryBuilder {
  56. // get a fresh, non-partitioning query builder
  57. $builder = $this->builder->getConnection()->getQueryBuilder();
  58. if ($builder instanceof PartitionedQueryBuilder) {
  59. $builder = $builder->builder;
  60. }
  61. return new ShardedQueryBuilder(
  62. $builder,
  63. $this->shardDefinitions,
  64. $this->shardConnectionManager,
  65. $this->autoIncrementHandler,
  66. );
  67. }
  68. // we need to save selects until we know all the table aliases
  69. public function select(...$selects) {
  70. $this->selects = [];
  71. $this->addSelect(...$selects);
  72. return $this;
  73. }
  74. public function addSelect(...$select) {
  75. $select = array_map(function ($select) {
  76. return ['select' => $select, 'alias' => null];
  77. }, $select);
  78. $this->selects = array_merge($this->selects, $select);
  79. return $this;
  80. }
  81. public function selectAlias($select, $alias) {
  82. $this->selects[] = ['select' => $select, 'alias' => $alias];
  83. return $this;
  84. }
  85. /**
  86. * Ensure that a column is being selected by the query
  87. *
  88. * This is mainly used to ensure that the returned rows from both sides of a partition contains the columns of the join predicate
  89. *
  90. * @param string $column
  91. * @return void
  92. */
  93. private function ensureSelect(string|IQueryFunction $column, ?string $alias = null): void {
  94. $checkColumn = $alias ?: $column;
  95. if (str_contains($checkColumn, '.')) {
  96. [, $checkColumn] = explode('.', $checkColumn);
  97. }
  98. foreach ($this->selects as $select) {
  99. if ($select['select'] === $checkColumn || $select['select'] === '*' || str_ends_with($select['select'], '.' . $checkColumn)) {
  100. return;
  101. }
  102. }
  103. if ($alias) {
  104. $this->selectAlias($column, $alias);
  105. } else {
  106. $this->addSelect($column);
  107. }
  108. }
  109. /**
  110. * Distribute the select statements to the correct partition
  111. *
  112. * This is done at the end instead of when the `select` call is made, because the `select` calls are generally done
  113. * before we know what tables are involved in the query
  114. *
  115. * @return void
  116. */
  117. private function applySelects(): void {
  118. foreach ($this->selects as $select) {
  119. foreach ($this->partitions as $partition) {
  120. if (is_string($select['select']) && (
  121. $select['select'] === '*' ||
  122. $partition->isColumnInPartition($select['select']))
  123. ) {
  124. if (isset($this->splitQueries[$partition->name])) {
  125. if ($select['alias']) {
  126. $this->splitQueries[$partition->name]->query->selectAlias($select['select'], $select['alias']);
  127. } else {
  128. $this->splitQueries[$partition->name]->query->addSelect($select['select']);
  129. }
  130. if ($select['select'] !== '*') {
  131. continue 2;
  132. }
  133. }
  134. }
  135. }
  136. if ($select['alias']) {
  137. parent::selectAlias($select['select'], $select['alias']);
  138. } else {
  139. parent::addSelect($select['select']);
  140. }
  141. }
  142. $this->selects = [];
  143. }
  144. public function addPartition(PartitionSplit $partition): void {
  145. $this->partitions[] = $partition;
  146. }
  147. private function getPartition(string $table): ?PartitionSplit {
  148. foreach ($this->partitions as $partition) {
  149. if ($partition->containsTable($table) || $partition->containsAlias($table)) {
  150. return $partition;
  151. }
  152. }
  153. return null;
  154. }
  155. public function from($from, $alias = null) {
  156. if (is_string($from) && $partition = $this->getPartition($from)) {
  157. $this->mainPartition = $partition;
  158. if ($alias) {
  159. $this->mainPartition->addAlias($from, $alias);
  160. }
  161. }
  162. return parent::from($from, $alias);
  163. }
  164. public function innerJoin($fromAlias, $join, $alias, $condition = null): self {
  165. return $this->join($fromAlias, $join, $alias, $condition);
  166. }
  167. public function leftJoin($fromAlias, $join, $alias, $condition = null): self {
  168. return $this->join($fromAlias, (string)$join, $alias, $condition, PartitionQuery::JOIN_MODE_LEFT);
  169. }
  170. public function join($fromAlias, $join, $alias, $condition = null, $joinMode = PartitionQuery::JOIN_MODE_INNER): self {
  171. $partition = $this->getPartition($join);
  172. $fromPartition = $this->getPartition($fromAlias);
  173. if ($partition && $partition !== $this->mainPartition) {
  174. // join from the main db to a partition
  175. $joinCondition = JoinCondition::parse($condition, $join, $alias, $fromAlias);
  176. $partition->addAlias($join, $alias);
  177. if (!isset($this->splitQueries[$partition->name])) {
  178. $this->splitQueries[$partition->name] = new PartitionQuery(
  179. $this->newQuery(),
  180. $joinCondition->fromAlias ?? $joinCondition->fromColumn, $joinCondition->toAlias ?? $joinCondition->toColumn,
  181. $joinMode
  182. );
  183. $this->splitQueries[$partition->name]->query->from($join, $alias);
  184. $this->ensureSelect($joinCondition->fromColumn, $joinCondition->fromAlias);
  185. $this->ensureSelect($joinCondition->toColumn, $joinCondition->toAlias);
  186. } else {
  187. $query = $this->splitQueries[$partition->name]->query;
  188. if ($partition->containsAlias($fromAlias)) {
  189. $query->innerJoin($fromAlias, $join, $alias, $condition);
  190. } else {
  191. throw new InvalidPartitionedQueryException("Can't join across partition boundaries more than once");
  192. }
  193. }
  194. $this->splitQueries[$partition->name]->query->andWhere(...$joinCondition->toConditions);
  195. parent::andWhere(...$joinCondition->fromConditions);
  196. return $this;
  197. } elseif ($fromPartition && $fromPartition !== $partition) {
  198. // join from partition, to the main db
  199. $joinCondition = JoinCondition::parse($condition, $join, $alias, $fromAlias);
  200. if (str_starts_with($fromPartition->name, 'from_')) {
  201. $partitionName = $fromPartition->name;
  202. } else {
  203. $partitionName = 'from_' . $fromPartition->name;
  204. }
  205. if (!isset($this->splitQueries[$partitionName])) {
  206. $newPartition = new PartitionSplit($partitionName, [$join]);
  207. $newPartition->addAlias($join, $alias);
  208. $this->partitions[] = $newPartition;
  209. $this->splitQueries[$partitionName] = new PartitionQuery(
  210. $this->newQuery(),
  211. $joinCondition->fromAlias ?? $joinCondition->fromColumn, $joinCondition->toAlias ?? $joinCondition->toColumn,
  212. $joinMode
  213. );
  214. $this->ensureSelect($joinCondition->fromColumn, $joinCondition->fromAlias);
  215. $this->ensureSelect($joinCondition->toColumn, $joinCondition->toAlias);
  216. $this->splitQueries[$partitionName]->query->from($join, $alias);
  217. $this->splitQueries[$partitionName]->query->andWhere(...$joinCondition->toConditions);
  218. parent::andWhere(...$joinCondition->fromConditions);
  219. } else {
  220. $fromPartition->addTable($join);
  221. $fromPartition->addAlias($join, $alias);
  222. $query = $this->splitQueries[$partitionName]->query;
  223. $query->innerJoin($fromAlias, $join, $alias, $condition);
  224. }
  225. return $this;
  226. } else {
  227. // join within the main db or a partition
  228. if ($joinMode === PartitionQuery::JOIN_MODE_INNER) {
  229. return parent::innerJoin($fromAlias, $join, $alias, $condition);
  230. } elseif ($joinMode === PartitionQuery::JOIN_MODE_LEFT) {
  231. return parent::leftJoin($fromAlias, $join, $alias, $condition);
  232. } elseif ($joinMode === PartitionQuery::JOIN_MODE_RIGHT) {
  233. return parent::rightJoin($fromAlias, $join, $alias, $condition);
  234. } else {
  235. throw new \InvalidArgumentException("Invalid join mode: $joinMode");
  236. }
  237. }
  238. }
  239. /**
  240. * Flatten a list of predicates by merging the parts of any "AND" expression into the list of predicates
  241. *
  242. * @param array $predicates
  243. * @return array
  244. */
  245. private function flattenPredicates(array $predicates): array {
  246. $result = [];
  247. foreach ($predicates as $predicate) {
  248. if ($predicate instanceof CompositeExpression && $predicate->getType() === CompositeExpression::TYPE_AND) {
  249. $result = array_merge($result, $this->flattenPredicates($predicate->getParts()));
  250. } else {
  251. $result[] = $predicate;
  252. }
  253. }
  254. return $result;
  255. }
  256. /**
  257. * Split an array of predicates (WHERE query parts) by the partition they reference
  258. * @param array $predicates
  259. * @return array<string, array>
  260. */
  261. private function splitPredicatesByParts(array $predicates): array {
  262. $predicates = $this->flattenPredicates($predicates);
  263. $partitionPredicates = [];
  264. foreach ($predicates as $predicate) {
  265. $partition = $this->getPartitionForPredicate((string)$predicate);
  266. if ($this->mainPartition === $partition) {
  267. $partitionPredicates[''][] = $predicate;
  268. } elseif ($partition) {
  269. $partitionPredicates[$partition->name][] = $predicate;
  270. } else {
  271. $partitionPredicates[''][] = $predicate;
  272. }
  273. }
  274. return $partitionPredicates;
  275. }
  276. public function where(...$predicates) {
  277. return $this->andWhere(...$predicates);
  278. }
  279. public function andWhere(...$where) {
  280. if ($where) {
  281. foreach ($this->splitPredicatesByParts($where) as $alias => $predicates) {
  282. if (isset($this->splitQueries[$alias])) {
  283. // when there is a condition on a table being left-joined it starts to behave as if it's an inner join
  284. // since any joined column that doesn't have the left part will not match the condition
  285. // when there the condition is `$joinToColumn IS NULL` we instead mark the query as excluding the left half
  286. if ($this->splitQueries[$alias]->joinMode === PartitionQuery::JOIN_MODE_LEFT) {
  287. $this->splitQueries[$alias]->joinMode = PartitionQuery::JOIN_MODE_INNER;
  288. $column = $this->quoteHelper->quoteColumnName($this->splitQueries[$alias]->joinToColumn);
  289. foreach ($predicates as $predicate) {
  290. if ((string)$predicate === "$column IS NULL") {
  291. $this->splitQueries[$alias]->joinMode = PartitionQuery::JOIN_MODE_LEFT_NULL;
  292. } else {
  293. $this->splitQueries[$alias]->query->andWhere($predicate);
  294. }
  295. }
  296. } else {
  297. $this->splitQueries[$alias]->query->andWhere(...$predicates);
  298. }
  299. } else {
  300. parent::andWhere(...$predicates);
  301. }
  302. }
  303. }
  304. return $this;
  305. }
  306. private function getPartitionForPredicate(string $predicate): ?PartitionSplit {
  307. foreach ($this->partitions as $partition) {
  308. if (str_contains($predicate, '?')) {
  309. $this->hasPositionalParameter = true;
  310. }
  311. if ($partition->checkPredicateForTable($predicate)) {
  312. return $partition;
  313. }
  314. }
  315. return null;
  316. }
  317. public function update($update = null, $alias = null) {
  318. return parent::update($update, $alias);
  319. }
  320. public function insert($insert = null) {
  321. return parent::insert($insert);
  322. }
  323. public function delete($delete = null, $alias = null) {
  324. return parent::delete($delete, $alias);
  325. }
  326. public function setMaxResults($maxResults) {
  327. if ($maxResults > 0) {
  328. $this->limit = (int)$maxResults;
  329. }
  330. return parent::setMaxResults($maxResults);
  331. }
  332. public function setFirstResult($firstResult) {
  333. if ($firstResult > 0) {
  334. $this->offset = (int)$firstResult;
  335. }
  336. return parent::setFirstResult($firstResult);
  337. }
  338. public function executeQuery(?IDBConnection $connection = null): IResult {
  339. $this->applySelects();
  340. if ($this->splitQueries && $this->hasPositionalParameter) {
  341. throw new InvalidPartitionedQueryException("Partitioned queries aren't allowed to to positional arguments");
  342. }
  343. foreach ($this->splitQueries as $split) {
  344. $split->query->setParameters($this->getParameters(), $this->getParameterTypes());
  345. }
  346. if (count($this->splitQueries) > 0) {
  347. $hasNonLeftJoins = array_reduce($this->splitQueries, function (bool $hasNonLeftJoins, PartitionQuery $query) {
  348. return $hasNonLeftJoins || $query->joinMode !== PartitionQuery::JOIN_MODE_LEFT;
  349. }, false);
  350. if ($hasNonLeftJoins) {
  351. if (is_int($this->limit)) {
  352. throw new InvalidPartitionedQueryException('Limit is not allowed in partitioned queries');
  353. }
  354. if (is_int($this->offset)) {
  355. throw new InvalidPartitionedQueryException('Offset is not allowed in partitioned queries');
  356. }
  357. }
  358. }
  359. $s = $this->getSQL();
  360. $result = parent::executeQuery($connection);
  361. if (count($this->splitQueries) > 0) {
  362. return new PartitionedResult($this->splitQueries, $result);
  363. } else {
  364. return $result;
  365. }
  366. }
  367. public function executeStatement(?IDBConnection $connection = null): int {
  368. if (count($this->splitQueries)) {
  369. throw new InvalidPartitionedQueryException("Partitioning write queries isn't supported");
  370. }
  371. return parent::executeStatement($connection);
  372. }
  373. public function getSQL() {
  374. $this->applySelects();
  375. return parent::getSQL();
  376. }
  377. public function getPartitionCount(): int {
  378. return count($this->splitQueries) + 1;
  379. }
  380. }