1
0

SynchronousBackgroundJob.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. <?php
  2. /**
  3. * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
  4. * SPDX-License-Identifier: AGPL-3.0-or-later
  5. */
  6. namespace OC\TaskProcessing;
  7. use OCP\AppFramework\Utility\ITimeFactory;
  8. use OCP\BackgroundJob\IJobList;
  9. use OCP\BackgroundJob\QueuedJob;
  10. use OCP\Files\GenericFileException;
  11. use OCP\Files\NotPermittedException;
  12. use OCP\Lock\LockedException;
  13. use OCP\TaskProcessing\Exception\Exception;
  14. use OCP\TaskProcessing\Exception\NotFoundException;
  15. use OCP\TaskProcessing\Exception\ProcessingException;
  16. use OCP\TaskProcessing\Exception\ValidationException;
  17. use OCP\TaskProcessing\IManager;
  18. use OCP\TaskProcessing\ISynchronousProvider;
  19. use Psr\Log\LoggerInterface;
  20. class SynchronousBackgroundJob extends QueuedJob {
  21. public function __construct(
  22. ITimeFactory $timeFactory,
  23. private readonly IManager $taskProcessingManager,
  24. private readonly IJobList $jobList,
  25. private readonly LoggerInterface $logger,
  26. ) {
  27. parent::__construct($timeFactory);
  28. }
  29. /**
  30. * @inheritDoc
  31. */
  32. protected function run($argument) {
  33. $providers = $this->taskProcessingManager->getProviders();
  34. foreach ($providers as $provider) {
  35. if (!$provider instanceof ISynchronousProvider) {
  36. continue;
  37. }
  38. $taskType = $provider->getTaskTypeId();
  39. try {
  40. $task = $this->taskProcessingManager->getNextScheduledTask($taskType);
  41. } catch (NotFoundException $e) {
  42. continue;
  43. } catch (Exception $e) {
  44. $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
  45. continue;
  46. }
  47. try {
  48. try {
  49. $input = $this->taskProcessingManager->prepareInputData($task);
  50. } catch (GenericFileException|NotPermittedException|LockedException|ValidationException $e) {
  51. $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
  52. $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
  53. // Schedule again
  54. $this->jobList->add(self::class, $argument);
  55. return;
  56. }
  57. try {
  58. $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->taskProcessingManager->setTaskProgress($task->getId(), $progress));
  59. } catch (ProcessingException $e) {
  60. $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
  61. $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
  62. // Schedule again
  63. $this->jobList->add(self::class, $argument);
  64. return;
  65. } catch (\Throwable $e) {
  66. $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
  67. $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
  68. // Schedule again
  69. $this->jobList->add(self::class, $argument);
  70. return;
  71. }
  72. $this->taskProcessingManager->setTaskResult($task->getId(), null, $output);
  73. } catch (NotFoundException $e) {
  74. $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
  75. } catch (Exception $e) {
  76. $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
  77. }
  78. }
  79. $synchronousProviders = array_filter($providers, fn ($provider) =>
  80. $provider instanceof ISynchronousProvider);
  81. $taskTypes = array_values(array_map(fn ($provider) =>
  82. $provider->getTaskTypeId(),
  83. $synchronousProviders
  84. ));
  85. $taskTypesWithTasks = array_filter($taskTypes, function ($taskType) {
  86. try {
  87. $this->taskProcessingManager->getNextScheduledTask($taskType);
  88. return true;
  89. } catch (NotFoundException|Exception $e) {
  90. return false;
  91. }
  92. });
  93. if (count($taskTypesWithTasks) > 0) {
  94. // Schedule again
  95. $this->jobList->add(self::class, $argument);
  96. }
  97. }
  98. }