CrossShardMoveHelper.php 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * SPDX-FileCopyrightText: 2024 Robin Appelman <robin@icewind.nl>
  5. * SPDX-License-Identifier: AGPL-3.0-or-later
  6. */
  7. namespace OC\DB\QueryBuilder\Sharded;
  8. use OCP\DB\QueryBuilder\IQueryBuilder;
  9. use OCP\IDBConnection;
  10. /**
  11. * Utility methods for implementing logic that moves data across shards
  12. */
  13. class CrossShardMoveHelper {
  14. public function __construct(
  15. private ShardConnectionManager $connectionManager,
  16. ) {
  17. }
  18. public function getConnection(ShardDefinition $shardDefinition, int $shardKey): IDBConnection {
  19. return $this->connectionManager->getConnection($shardDefinition, $shardDefinition->getShardForKey($shardKey));
  20. }
  21. /**
  22. * Update the shard key of a set of rows, moving them to a different shard if needed
  23. *
  24. * @param ShardDefinition $shardDefinition
  25. * @param string $table
  26. * @param string $shardColumn
  27. * @param int $sourceShardKey
  28. * @param int $targetShardKey
  29. * @param string $primaryColumn
  30. * @param int[] $primaryKeys
  31. * @return void
  32. */
  33. public function moveCrossShards(ShardDefinition $shardDefinition, string $table, string $shardColumn, int $sourceShardKey, int $targetShardKey, string $primaryColumn, array $primaryKeys): void {
  34. $sourceShard = $shardDefinition->getShardForKey($sourceShardKey);
  35. $targetShard = $shardDefinition->getShardForKey($targetShardKey);
  36. $sourceConnection = $this->connectionManager->getConnection($shardDefinition, $sourceShard);
  37. if ($sourceShard === $targetShard) {
  38. $this->updateItems($sourceConnection, $table, $shardColumn, $targetShardKey, $primaryColumn, $primaryKeys);
  39. return;
  40. }
  41. $targetConnection = $this->connectionManager->getConnection($shardDefinition, $targetShard);
  42. $sourceItems = $this->loadItems($sourceConnection, $table, $primaryColumn, $primaryKeys);
  43. foreach ($sourceItems as &$sourceItem) {
  44. $sourceItem[$shardColumn] = $targetShardKey;
  45. }
  46. if (!$sourceItems) {
  47. return;
  48. }
  49. $sourceConnection->beginTransaction();
  50. $targetConnection->beginTransaction();
  51. try {
  52. $this->saveItems($targetConnection, $table, $sourceItems);
  53. $this->deleteItems($sourceConnection, $table, $primaryColumn, $primaryKeys);
  54. $targetConnection->commit();
  55. $sourceConnection->commit();
  56. } catch (\Exception $e) {
  57. $sourceConnection->rollback();
  58. $targetConnection->rollback();
  59. throw $e;
  60. }
  61. }
  62. /**
  63. * Load rows from a table to move
  64. *
  65. * @param IDBConnection $connection
  66. * @param string $table
  67. * @param string $primaryColumn
  68. * @param int[] $primaryKeys
  69. * @return array[]
  70. */
  71. public function loadItems(IDBConnection $connection, string $table, string $primaryColumn, array $primaryKeys): array {
  72. $query = $connection->getQueryBuilder();
  73. $query->select('*')
  74. ->from($table)
  75. ->where($query->expr()->in($primaryColumn, $query->createParameter('keys')));
  76. $chunks = array_chunk($primaryKeys, 1000);
  77. $results = [];
  78. foreach ($chunks as $chunk) {
  79. $query->setParameter('keys', $chunk, IQueryBuilder::PARAM_INT_ARRAY);
  80. $results = array_merge($results, $query->execute()->fetchAll());
  81. }
  82. return $results;
  83. }
  84. /**
  85. * Save modified rows
  86. *
  87. * @param IDBConnection $connection
  88. * @param string $table
  89. * @param array[] $items
  90. * @return void
  91. */
  92. public function saveItems(IDBConnection $connection, string $table, array $items): void {
  93. if (count($items) === 0) {
  94. return;
  95. }
  96. $query = $connection->getQueryBuilder();
  97. $query->insert($table);
  98. foreach ($items[0] as $column => $value) {
  99. $query->setValue($column, $query->createParameter($column));
  100. }
  101. foreach ($items as $item) {
  102. foreach ($item as $column => $value) {
  103. if (is_int($column)) {
  104. $query->setParameter($column, $value, IQueryBuilder::PARAM_INT);
  105. } else {
  106. $query->setParameter($column, $value);
  107. }
  108. }
  109. $query->executeStatement();
  110. }
  111. }
  112. /**
  113. * @param IDBConnection $connection
  114. * @param string $table
  115. * @param string $primaryColumn
  116. * @param int[] $primaryKeys
  117. * @return void
  118. */
  119. public function updateItems(IDBConnection $connection, string $table, string $shardColumn, int $targetShardKey, string $primaryColumn, array $primaryKeys): void {
  120. $query = $connection->getQueryBuilder();
  121. $query->update($table)
  122. ->set($shardColumn, $query->createNamedParameter($targetShardKey, IQueryBuilder::PARAM_INT))
  123. ->where($query->expr()->in($primaryColumn, $query->createNamedParameter($primaryKeys, IQueryBuilder::PARAM_INT_ARRAY)));
  124. $query->executeQuery()->fetchAll();
  125. }
  126. /**
  127. * @param IDBConnection $connection
  128. * @param string $table
  129. * @param string $primaryColumn
  130. * @param int[] $primaryKeys
  131. * @return void
  132. */
  133. public function deleteItems(IDBConnection $connection, string $table, string $primaryColumn, array $primaryKeys): void {
  134. $query = $connection->getQueryBuilder();
  135. $query->delete($table)
  136. ->where($query->expr()->in($primaryColumn, $query->createParameter('keys')));
  137. $chunks = array_chunk($primaryKeys, 1000);
  138. foreach ($chunks as $chunk) {
  139. $query->setParameter('keys', $chunk, IQueryBuilder::PARAM_INT_ARRAY);
  140. $query->executeStatement();
  141. }
  142. }
  143. }