JobWorker.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * SPDX-FileCopyrightText: 2021 Nextcloud GmbH and Nextcloud contributors
  5. * SPDX-License-Identifier: AGPL-3.0-or-later
  6. */
  7. namespace OC\Core\Command\Background;
  8. use OC\Core\Command\InterruptedException;
  9. use OC\Files\SetupManager;
  10. use OCP\BackgroundJob\IJobList;
  11. use OCP\ITempManager;
  12. use Psr\Log\LoggerInterface;
  13. use Symfony\Component\Console\Input\InputArgument;
  14. use Symfony\Component\Console\Input\InputInterface;
  15. use Symfony\Component\Console\Input\InputOption;
  16. use Symfony\Component\Console\Output\OutputInterface;
  17. class JobWorker extends JobBase {
  18. public function __construct(
  19. protected IJobList $jobList,
  20. protected LoggerInterface $logger,
  21. private ITempManager $tempManager,
  22. private SetupManager $setupManager,
  23. ) {
  24. parent::__construct($jobList, $logger);
  25. }
  26. protected function configure(): void {
  27. parent::configure();
  28. $this
  29. ->setName('background-job:worker')
  30. ->setDescription('Run a background job worker')
  31. ->addArgument(
  32. 'job-classes',
  33. InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
  34. 'The classes of the jobs to look for in the database'
  35. )
  36. ->addOption(
  37. 'once',
  38. null,
  39. InputOption::VALUE_NONE,
  40. 'Only execute the worker once (as a regular cron execution would do it)'
  41. )
  42. ->addOption(
  43. 'interval',
  44. 'i',
  45. InputOption::VALUE_OPTIONAL,
  46. 'Interval in seconds in which the worker should repeat already processed jobs (set to 0 for no repeat)',
  47. 5
  48. )
  49. ->addOption(
  50. 'stop_after',
  51. 't',
  52. InputOption::VALUE_OPTIONAL,
  53. 'Duration after which the worker should stop and exit. The worker won\'t kill a potential running job, it will exit after this job has finished running (supported values are: "30" or "30s" for 30 seconds, "10m" for 10 minutes and "2h" for 2 hours)'
  54. )
  55. ;
  56. }
  57. protected function execute(InputInterface $input, OutputInterface $output): int {
  58. $startTime = time();
  59. $stopAfterOptionValue = $input->getOption('stop_after');
  60. $stopAfterSeconds = $stopAfterOptionValue === null
  61. ? null
  62. : $this->parseStopAfter($stopAfterOptionValue);
  63. if ($stopAfterSeconds !== null) {
  64. $output->writeln('<info>Background job worker will stop after ' . $stopAfterSeconds . ' seconds</info>');
  65. }
  66. $jobClasses = $input->getArgument('job-classes');
  67. $jobClasses = empty($jobClasses) ? null : $jobClasses;
  68. if ($jobClasses !== null) {
  69. // at least one class is invalid
  70. foreach ($jobClasses as $jobClass) {
  71. if (!class_exists($jobClass)) {
  72. $output->writeln('<error>Invalid job class: ' . $jobClass . '</error>');
  73. return 1;
  74. }
  75. }
  76. }
  77. while (true) {
  78. // Stop if we exceeded stop_after value
  79. if ($stopAfterSeconds !== null && ($startTime + $stopAfterSeconds) < time()) {
  80. $output->writeln('stop_after time has been exceeded, exiting...', OutputInterface::VERBOSITY_VERBOSE);
  81. break;
  82. }
  83. // Handle canceling of the process
  84. try {
  85. $this->abortIfInterrupted();
  86. } catch (InterruptedException $e) {
  87. $output->writeln('<info>Background job worker stopped</info>');
  88. break;
  89. }
  90. $this->printSummary($input, $output);
  91. usleep(50000);
  92. $job = $this->jobList->getNext(false, $jobClasses);
  93. if (!$job) {
  94. if ($input->getOption('once') === true) {
  95. if ($jobClasses === null) {
  96. $output->writeln('No job is currently queued', OutputInterface::VERBOSITY_VERBOSE);
  97. } else {
  98. $output->writeln('No job of classes [' . implode(', ', $jobClasses) . '] is currently queued', OutputInterface::VERBOSITY_VERBOSE);
  99. }
  100. $output->writeln('Exiting...', OutputInterface::VERBOSITY_VERBOSE);
  101. break;
  102. }
  103. $output->writeln('Waiting for new jobs to be queued', OutputInterface::VERBOSITY_VERBOSE);
  104. // Re-check interval for new jobs
  105. sleep(1);
  106. continue;
  107. }
  108. $output->writeln('Running job ' . get_class($job) . ' with ID ' . $job->getId());
  109. if ($output->isVerbose()) {
  110. $this->printJobInfo($job->getId(), $job, $output);
  111. }
  112. /** @psalm-suppress DeprecatedMethod Calling execute until it is removed, then will switch to start */
  113. $job->execute($this->jobList);
  114. $output->writeln('Job ' . $job->getId() . ' has finished', OutputInterface::VERBOSITY_VERBOSE);
  115. // clean up after unclean jobs
  116. $this->setupManager->tearDown();
  117. $this->tempManager->clean();
  118. $this->jobList->setLastJob($job);
  119. $this->jobList->unlockJob($job);
  120. if ($input->getOption('once') === true) {
  121. break;
  122. }
  123. }
  124. return 0;
  125. }
  126. private function printSummary(InputInterface $input, OutputInterface $output): void {
  127. if (!$output->isVeryVerbose()) {
  128. return;
  129. }
  130. $output->writeln('<comment>Summary</comment>');
  131. $counts = [];
  132. foreach ($this->jobList->countByClass() as $row) {
  133. $counts[] = $row;
  134. }
  135. $this->writeTableInOutputFormat($input, $output, $counts);
  136. }
  137. private function parseStopAfter(string $value): ?int {
  138. if (is_numeric($value)) {
  139. return (int)$value;
  140. }
  141. if (preg_match("/^(\d+)s$/i", $value, $matches)) {
  142. return (int)$matches[0];
  143. }
  144. if (preg_match("/^(\d+)m$/i", $value, $matches)) {
  145. return 60 * ((int)$matches[0]);
  146. }
  147. if (preg_match("/^(\d+)h$/i", $value, $matches)) {
  148. return 60 * 60 * ((int)$matches[0]);
  149. }
  150. return null;
  151. }
  152. }