SynchronousBackgroundJob.php 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. <?php
  2. namespace OC\TaskProcessing;
  3. use OCP\AppFramework\Utility\ITimeFactory;
  4. use OCP\BackgroundJob\IJobList;
  5. use OCP\BackgroundJob\QueuedJob;
  6. use OCP\Files\GenericFileException;
  7. use OCP\Files\NotPermittedException;
  8. use OCP\Lock\LockedException;
  9. use OCP\TaskProcessing\Exception\Exception;
  10. use OCP\TaskProcessing\Exception\NotFoundException;
  11. use OCP\TaskProcessing\Exception\ProcessingException;
  12. use OCP\TaskProcessing\Exception\ValidationException;
  13. use OCP\TaskProcessing\IManager;
  14. use OCP\TaskProcessing\ISynchronousProvider;
  15. use Psr\Log\LoggerInterface;
  /**
   * Queued background job that runs scheduled TaskProcessing tasks through the
   * registered synchronous providers.
   *
   * Every run handles at most one scheduled task per synchronous provider. The
   * job adds itself to the job list again after a failed task and whenever a
   * synchronous provider's task type still has a scheduled task at the end of
   * the run.
   */
  class SynchronousBackgroundJob extends QueuedJob {
      public function __construct(
          ITimeFactory $timeFactory,
          private readonly IManager $taskProcessingManager,
          private readonly IJobList $jobList,
          private readonly LoggerInterface $logger,
      ) {
          parent::__construct($timeFactory);
      }

      /**
       * Processes the next scheduled task of every synchronous provider.
       *
       * When preparing the input or processing a task fails, the error message
       * is stored as the task result, this job is queued again and the run ends
       * immediately; the remaining providers are left for the next run.
       *
       * @inheritDoc
       */
      protected function run($argument) {
          $providers = $this->taskProcessingManager->getProviders();

          foreach ($providers as $provider) {
              if (!$provider instanceof ISynchronousProvider) {
                  continue;
              }

              $taskType = $provider->getTaskTypeId();
              try {
                  $task = $this->taskProcessingManager->getNextScheduledTask($taskType);
              } catch (NotFoundException $e) {
                  // Nothing is scheduled for this task type
                  continue;
              } catch (Exception $e) {
                  $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
                  continue;
              }

              // The outer try also covers the setTaskResult() calls made from the
              // failure paths below, so a task that vanished in the meantime only
              // produces a log entry and the loop moves on to the next provider.
              try {
                  try {
                      $input = $this->taskProcessingManager->prepareInputData($task);
                  } catch (GenericFileException|NotPermittedException|LockedException|ValidationException $e) {
                      $this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
                      $this->failTaskAndReschedule($task, $e, $argument);
                      return;
                  }

                  try {
                      $output = $provider->process(
                          $task->getUserId(),
                          $input,
                          fn (float $progress) => $this->taskProcessingManager->setTaskProgress($task->getId(), $progress),
                      );
                  } catch (ProcessingException $e) {
                      $this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
                      $this->failTaskAndReschedule($task, $e, $argument);
                      return;
                  } catch (\Throwable $e) {
                      // Providers are third-party code: anything they throw is
                      // recorded on the task instead of aborting the job.
                      $this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
                      $this->failTaskAndReschedule($task, $e, $argument);
                      return;
                  }

                  $this->taskProcessingManager->setTaskResult($task->getId(), null, $output);
              } catch (NotFoundException $e) {
                  $this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
              } catch (Exception $e) {
                  $this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
              }
          }

          if ($this->hasScheduledTasks($providers)) {
              // Schedule again
              $this->jobList->add(self::class, $argument);
          }
      }

      /**
       * Stores the error message as the result of the given task and queues this
       * job again.
       *
       * Exceptions thrown by setTaskResult() are not handled here; they propagate
       * to the caller, in which case the job is not queued by this method.
       *
       * @param object $task Task that failed; only getId() is used
       * @param \Throwable $error Failure whose message becomes the task's error
       * @param mixed $argument Job argument, passed on unchanged to the new job
       */
      private function failTaskAndReschedule($task, \Throwable $error, $argument): void {
          $this->taskProcessingManager->setTaskResult($task->getId(), $error->getMessage(), null);
          $this->jobList->add(self::class, $argument);
      }

      /**
       * Tells whether the task type of any synchronous provider still has a
       * scheduled task.
       *
       * Stops at the first hit. Unexpected errors are logged and the affected
       * task type is treated as having no scheduled task.
       * NOTE(review): assumes getNextScheduledTask() is a pure lookup - confirm.
       *
       * @param array $providers Providers as returned by the manager
       */
      private function hasScheduledTasks(array $providers): bool {
          foreach ($providers as $provider) {
              if (!$provider instanceof ISynchronousProvider) {
                  continue;
              }

              $taskType = $provider->getTaskTypeId();
              try {
                  $this->taskProcessingManager->getNextScheduledTask($taskType);
                  return true;
              } catch (NotFoundException $e) {
                  // Nothing is scheduled for this task type
              } catch (Exception $e) {
                  $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
              }
          }

          return false;
      }
  }