SynchronousBackgroundJob.php 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. <?php
  2. /**
  3. * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
  4. * SPDX-License-Identifier: AGPL-3.0-or-later
  5. */
  6. namespace OC\TaskProcessing;
  7. use OCP\AppFramework\Utility\ITimeFactory;
  8. use OCP\BackgroundJob\IJobList;
  9. use OCP\BackgroundJob\QueuedJob;
  10. use OCP\Files\GenericFileException;
  11. use OCP\Files\NotPermittedException;
  12. use OCP\Lock\LockedException;
  13. use OCP\TaskProcessing\Exception\Exception;
  14. use OCP\TaskProcessing\Exception\NotFoundException;
  15. use OCP\TaskProcessing\Exception\ProcessingException;
  16. use OCP\TaskProcessing\Exception\UnauthorizedException;
  17. use OCP\TaskProcessing\Exception\ValidationException;
  18. use OCP\TaskProcessing\IManager;
  19. use OCP\TaskProcessing\ISynchronousProvider;
  20. use OCP\TaskProcessing\Task;
  21. use Psr\Log\LoggerInterface;
  22. class SynchronousBackgroundJob extends QueuedJob {
  23. public function __construct(
  24. ITimeFactory $timeFactory,
  25. private readonly IManager $taskProcessingManager,
  26. private readonly IJobList $jobList,
  27. private readonly LoggerInterface $logger,
  28. ) {
  29. parent::__construct($timeFactory);
  30. }
  31. /**
  32. * @inheritDoc
  33. */
  34. protected function run($argument) {
  35. $providers = $this->taskProcessingManager->getProviders();
  36. foreach ($providers as $provider) {
  37. if (!$provider instanceof ISynchronousProvider) {
  38. continue;
  39. }
  40. $taskType = $provider->getTaskTypeId();
  41. try {
  42. $task = $this->taskProcessingManager->getNextScheduledTask([$taskType]);
  43. } catch (NotFoundException $e) {
  44. continue;
  45. } catch (Exception $e) {
  46. $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
  47. continue;
  48. }
  49. try {
  50. try {
  51. $input = $this->taskProcessingManager->prepareInputData($task);
  52. } catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
  53. $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
  54. $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
  55. // Schedule again
  56. $this->jobList->add(self::class, $argument);
  57. return;
  58. }
  59. try {
  60. $this->taskProcessingManager->setTaskStatus($task, Task::STATUS_RUNNING);
  61. $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->taskProcessingManager->setTaskProgress($task->getId(), $progress));
  62. } catch (ProcessingException $e) {
  63. $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
  64. $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
  65. // Schedule again
  66. $this->jobList->add(self::class, $argument);
  67. return;
  68. } catch (\Throwable $e) {
  69. $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
  70. $this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
  71. // Schedule again
  72. $this->jobList->add(self::class, $argument);
  73. return;
  74. }
  75. $this->taskProcessingManager->setTaskResult($task->getId(), null, $output);
  76. } catch (NotFoundException $e) {
  77. $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
  78. } catch (Exception $e) {
  79. $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
  80. }
  81. }
  82. $synchronousProviders = array_filter($providers, fn ($provider) =>
  83. $provider instanceof ISynchronousProvider);
  84. $taskTypes = array_values(array_map(fn ($provider) =>
  85. $provider->getTaskTypeId(),
  86. $synchronousProviders
  87. ));
  88. $taskTypesWithTasks = array_filter($taskTypes, function ($taskType) {
  89. try {
  90. $this->taskProcessingManager->getNextScheduledTask([$taskType]);
  91. return true;
  92. } catch (NotFoundException|Exception $e) {
  93. return false;
  94. }
  95. });
  96. if (count($taskTypesWithTasks) > 0) {
  97. // Schedule again
  98. $this->jobList->add(self::class, $argument);
  99. }
  100. }
  101. }