Browse Source

Lock jobs while executing them, to allow multiple executors to run in parallel

Joas Schilling 8 years ago
parent
commit
d0a2fa0506

+ 1 - 0
cron.php

@@ -138,6 +138,7 @@ try {
 		$executedJobs = [];
 		while ($job = $jobList->getNext()) {
 			if (isset($executedJobs[$job->getId()])) {
+				$jobList->unlockJob($job);
 				break;
 			}
 

+ 17 - 0
db_structure.xml

@@ -968,12 +968,29 @@
 			</field>
 
 			<field>
+				<!-- timestamp when the job was executed the last time -->
 				<name>last_run</name>
 				<type>integer</type>
 				<default></default>
 				<notnull>false</notnull>
 			</field>
 
+			<field>
+				<!-- timestamp when the job was checked if it needs execution the last time -->
+				<name>last_checked</name>
+				<type>integer</type>
+				<default></default>
+				<notnull>false</notnull>
+			</field>
+
+			<field>
+				<!-- timestamp when the job was reserved the last time, 1 day timeout -->
+				<name>reserved_at</name>
+				<type>integer</type>
+				<default></default>
+				<notnull>false</notnull>
+			</field>
+
 			<index>
 				<name>job_class_index</name>
 				<field>

+ 53 - 33
lib/private/BackgroundJob/JobList.php

@@ -25,27 +25,34 @@
 namespace OC\BackgroundJob;
 
 use OCP\AppFramework\QueryException;
+use OCP\AppFramework\Utility\ITimeFactory;
 use OCP\BackgroundJob\IJob;
 use OCP\BackgroundJob\IJobList;
 use OCP\AutoloadNotAllowedException;
 use OCP\DB\QueryBuilder\IQueryBuilder;
+use OCP\IConfig;
+use OCP\IDBConnection;
 
 class JobList implements IJobList {
-	/** @var \OCP\IDBConnection */
+
+	/** @var IDBConnection */
 	protected $connection;
 
-	/**
-	 * @var \OCP\IConfig $config
-	 */
+	/**@var IConfig */
 	protected $config;
 
+	/**@var ITimeFactory */
+	protected $timeFactory;
+
 	/**
-	 * @param \OCP\IDBConnection $connection
-	 * @param \OCP\IConfig $config
+	 * @param IDBConnection $connection
+	 * @param IConfig $config
+	 * @param ITimeFactory $timeFactory
 	 */
-	public function __construct($connection, $config) {
+	public function __construct(IDBConnection $connection, IConfig $config, ITimeFactory $timeFactory) {
 		$this->connection = $connection;
 		$this->config = $config;
+		$this->timeFactory = $timeFactory;
 	}
 
 	/**
@@ -71,6 +78,7 @@ class JobList implements IJobList {
 					'class' => $query->createNamedParameter($class),
 					'argument' => $query->createNamedParameter($argument),
 					'last_run' => $query->createNamedParameter(0, IQueryBuilder::PARAM_INT),
+					'last_checked' => $query->createNamedParameter($this->timeFactory->getTime(), IQueryBuilder::PARAM_INT),
 				]);
 			$query->execute();
 		}
@@ -167,45 +175,40 @@ class JobList implements IJobList {
 	 * @return IJob|null
 	 */
 	public function getNext() {
-		$lastId = $this->getLastJob();
-
 		$query = $this->connection->getQueryBuilder();
 		$query->select('*')
 			->from('jobs')
-			->where($query->expr()->lt('id', $query->createNamedParameter($lastId, IQueryBuilder::PARAM_INT)))
-			->orderBy('id', 'DESC')
+			->where($query->expr()->lte('reserved_at', $query->createNamedParameter($this->timeFactory->getTime() - 12 * 3600, IQueryBuilder::PARAM_INT)))
+			->orderBy('last_checked', 'ASC')
 			->setMaxResults(1);
+
+		$update = $this->connection->getQueryBuilder();
+		$update->update('jobs')
+			->set('reserved_at', $update->createNamedParameter($this->timeFactory->getTime()))
+			->set('last_checked', $update->createNamedParameter($this->timeFactory->getTime()))
+			->where($update->expr()->eq('id', $update->createParameter('jobid')));
+
+		$this->connection->lockTable('jobs');
 		$result = $query->execute();
 		$row = $result->fetch();
 		$result->closeCursor();
 
 		if ($row) {
-			$jobId = $row['id'];
+			$update->setParameter('jobid', $row['id']);
+			$update->execute();
+			$this->connection->unlockTable();
+
 			$job = $this->buildJob($row);
-		} else {
-			//begin at the start of the queue
-			$query = $this->connection->getQueryBuilder();
-			$query->select('*')
-				->from('jobs')
-				->orderBy('id', 'DESC')
-				->setMaxResults(1);
-			$result = $query->execute();
-			$row = $result->fetch();
-			$result->closeCursor();
-
-			if ($row) {
-				$jobId = $row['id'];
-				$job = $this->buildJob($row);
-			} else {
-				return null; //empty job list
+
+			if ($job === null) {
+				// Background job from disabled app, try again.
+				return $this->getNext();
 			}
-		}
 
-		if (is_null($job)) {
-			$this->removeById($jobId);
-			return $this->getNext();
-		} else {
 			return $job;
+		} else {
+			$this->connection->unlockTable();
+			return null;
 		}
 	}
 
@@ -267,13 +270,30 @@ class JobList implements IJobList {
 	 * @param IJob $job
 	 */
 	public function setLastJob($job) {
+		$this->unlockJob($job);
 		$this->config->setAppValue('backgroundjob', 'lastjob', $job->getId());
 	}
 
+	/**
+	 * Remove the reservation for a job
+	 *
+	 * @param IJob $job
+	 */
+	public function unlockJob($job) {
+		$query = $this->connection->getQueryBuilder();
+		$query->update('jobs')
+			->set('reserved_at', $query->expr()->literal(0, IQueryBuilder::PARAM_INT))
+			->where($query->expr()->eq('id', $query->createNamedParameter($job->getId(), IQueryBuilder::PARAM_INT)));
+		$query->execute();
+	}
+
 	/**
 	 * get the id of the last ran job
 	 *
 	 * @return int
+	 * @deprecated 9.1.0 - The functionality behind the value is deprecated, it
+	 *    only tells you which job finished last, but since we now allow multiple
+	 *    executors to run in parallel, it's not used to calculate the next job.
 	 */
 	public function getLastJob() {
 		return (int) $this->config->getAppValue('backgroundjob', 'lastjob', 0);

+ 5 - 1
lib/private/Server.php

@@ -362,7 +362,11 @@ class Server extends ServerContainer implements IServerContainer {
 		});
 		$this->registerService('JobList', function (Server $c) {
 			$config = $c->getConfig();
-			return new \OC\BackgroundJob\JobList($c->getDatabaseConnection(), $config);
+			return new \OC\BackgroundJob\JobList(
+				$c->getDatabaseConnection(),
+				$config,
+				new TimeFactory()
+			);
 		});
 		$this->registerService('Router', function (Server $c) {
 			$cacheFactory = $c->getMemCacheFactory();

+ 11 - 0
lib/public/BackgroundJob/IJobList.php

@@ -92,11 +92,22 @@ interface IJobList {
 	 */
 	public function setLastJob($job);
 
+	/**
+	 * Remove the reservation for a job
+	 *
+	 * @param IJob $job
+	 * @since 9.1.0
+	 */
+	public function unlockJob($job);
+
 	/**
 	 * get the id of the last ran job
 	 *
 	 * @return int
 	 * @since 7.0.0
+	 * @deprecated 9.1.0 - The functionality behind the value is deprecated, it
+	 *    only tells you which job finished last, but since we now allow multiple
+	 *    executors to run in parallel, it's not used to calculate the next job.
 	 */
 	public function getLastJob();
 

+ 60 - 66
tests/lib/BackgroundJob/JobListTest.php

@@ -9,7 +9,7 @@
 namespace Test\BackgroundJob;
 
 use OCP\BackgroundJob\IJob;
-use OCP\IDBConnection;
+use OCP\DB\QueryBuilder\IQueryBuilder;
 use Test\TestCase;
 
 /**
@@ -22,20 +22,31 @@ class JobListTest extends TestCase {
 	/** @var \OC\BackgroundJob\JobList */
 	protected $instance;
 
+	/** @var \OCP\IDBConnection */
+	protected $connection;
+
 	/** @var \OCP\IConfig|\PHPUnit_Framework_MockObject_MockObject */
 	protected $config;
 
+	/** @var \OCP\AppFramework\Utility\ITimeFactory|\PHPUnit_Framework_MockObject_MockObject */
+	protected $timeFactory;
+
 	protected function setUp() {
 		parent::setUp();
 
-		$connection = \OC::$server->getDatabaseConnection();
-		$this->clearJobsList($connection);
-		$this->config = $this->getMock('\OCP\IConfig');
-		$this->instance = new \OC\BackgroundJob\JobList($connection, $this->config);
+		$this->connection = \OC::$server->getDatabaseConnection();
+		$this->clearJobsList();
+		$this->config = $this->getMock('OCP\IConfig');
+		$this->timeFactory = $this->getMock('OCP\AppFramework\Utility\ITimeFactory');
+		$this->instance = new \OC\BackgroundJob\JobList(
+			$this->connection,
+			$this->config,
+			$this->timeFactory
+		);
 	}
 
-	protected function clearJobsList(IDBConnection $connection) {
-		$query = $connection->getQueryBuilder();
+	protected function clearJobsList() {
+		$query = $this->connection->getQueryBuilder();
 		$query->delete('jobs');
 		$query->execute();
 	}
@@ -131,8 +142,6 @@ class JobListTest extends TestCase {
 		$this->instance->add($job, $argument);
 
 		$this->assertFalse($this->instance->has($job, 10));
-
-		$this->instance->remove($job, $argument);
 	}
 
 	public function testGetLastJob() {
@@ -144,50 +153,65 @@ class JobListTest extends TestCase {
 		$this->assertEquals(15, $this->instance->getLastJob());
 	}
 
+	protected function createTempJob($class, $argument, $reservedTime = 0, $lastChecked = 0) {
+		if ($lastChecked === 0) {
+			$lastChecked = time();
+		}
+
+		$query = $this->connection->getQueryBuilder();
+		$query->insert('jobs')
+			->values([
+				'class' => $query->createNamedParameter($class),
+				'argument' => $query->createNamedParameter($argument),
+				'last_run' => $query->createNamedParameter(0, IQueryBuilder::PARAM_INT),
+				'last_checked' => $query->createNamedParameter($lastChecked, IQueryBuilder::PARAM_INT),
+				'reserved_at' => $query->createNamedParameter($reservedTime, IQueryBuilder::PARAM_INT),
+			]);
+		$query->execute();
+	}
+
 	public function testGetNext() {
 		$job = new TestJob();
-		$this->instance->add($job, 1);
-		$this->instance->add($job, 2);
+		$this->createTempJob(get_class($job), 1, 0, 12345);
+		$this->createTempJob(get_class($job), 2, 0, 12346);
 
 		$jobs = $this->getAllSorted();
+		$savedJob1 = $jobs[0];
 
-		$savedJob1 = $jobs[count($jobs) - 2];
-		$savedJob2 = $jobs[count($jobs) - 1];
-
-		$this->config->expects($this->once())
-			->method('getAppValue')
-			->with('backgroundjob', 'lastjob', 0)
-			->will($this->returnValue($savedJob2->getId()));
-
+		$this->timeFactory->expects($this->atLeastOnce())
+			->method('getTime')
+			->willReturn(123456789);
 		$nextJob = $this->instance->getNext();
 
 		$this->assertEquals($savedJob1, $nextJob);
-
-		$this->instance->remove($job, 1);
-		$this->instance->remove($job, 2);
 	}
 
-	public function testGetNextWrapAround() {
+	public function testGetNextSkipReserved() {
 		$job = new TestJob();
-		$this->instance->add($job, 1);
-		$this->instance->add($job, 2);
+		$this->createTempJob(get_class($job), 1, 123456789, 12345);
+		$this->createTempJob(get_class($job), 2, 0, 12346);
 
-		$jobs = $this->getAllSorted();
+		$this->timeFactory->expects($this->atLeastOnce())
+			->method('getTime')
+			->willReturn(123456789);
+		$nextJob = $this->instance->getNext();
 
-		$savedJob1 = $jobs[count($jobs) - 2];
-		$savedJob2 = $jobs[count($jobs) - 1];
+		$this->assertEquals(get_class($job), get_class($nextJob));
+		$this->assertEquals(2, $nextJob->getArgument());
+	}
 
-		$this->config->expects($this->once())
-			->method('getAppValue')
-			->with('backgroundjob', 'lastjob', 0)
-			->will($this->returnValue($savedJob1->getId()));
+	public function testGetNextSkipNonExisting() {
+		$job = new TestJob();
+		$this->createTempJob('\OC\Non\Existing\Class', 1, 0, 12345);
+		$this->createTempJob(get_class($job), 2, 0, 12346);
 
+		$this->timeFactory->expects($this->atLeastOnce())
+			->method('getTime')
+			->willReturn(123456789);
 		$nextJob = $this->instance->getNext();
 
-		$this->assertEquals($savedJob2, $nextJob);
-
-		$this->instance->remove($job, 1);
-		$this->instance->remove($job, 2);
+		$this->assertEquals(get_class($job), get_class($nextJob));
+		$this->assertEquals(2, $nextJob->getArgument());
 	}
 
 	/**
@@ -203,8 +227,6 @@ class JobListTest extends TestCase {
 		$addedJob = $jobs[count($jobs) - 1];
 
 		$this->assertEquals($addedJob, $this->instance->getById($addedJob->getId()));
-
-		$this->instance->remove($job, $argument);
 	}
 
 	public function testSetLastRun() {
@@ -223,33 +245,5 @@ class JobListTest extends TestCase {
 
 		$this->assertGreaterThanOrEqual($timeStart, $addedJob->getLastRun());
 		$this->assertLessThanOrEqual($timeEnd, $addedJob->getLastRun());
-
-		$this->instance->remove($job);
-	}
-
-	public function testGetNextNonExisting() {
-		$job = new TestJob();
-		$this->instance->add($job, 1);
-		$this->instance->add('\OC\Non\Existing\Class');
-		$this->instance->add($job, 2);
-
-		$jobs = $this->getAllSorted();
-
-		$savedJob1 = $jobs[count($jobs) - 2];
-		$savedJob2 = $jobs[count($jobs) - 1];
-
-		$this->config->expects($this->any())
-			->method('getAppValue')
-			->with('backgroundjob', 'lastjob', 0)
-			->will($this->returnValue($savedJob1->getId()));
-
-		$this->instance->getNext();
-
-		$nextJob = $this->instance->getNext();
-
-		$this->assertEquals($savedJob2, $nextJob);
-
-		$this->instance->remove($job, 1);
-		$this->instance->remove($job, 2);
 	}
 }

+ 1 - 1
version.php

@@ -26,7 +26,7 @@
 // We only can count up. The 4. digit is only for the internal patchlevel to trigger DB upgrades
 // between betas, final and RCs. This is _not_ the public version number. Reset minor/patchlevel
 // when updating major/minor version number.
-$OC_Version = array(9, 1, 0, 2);
+$OC_Version = array(9, 1, 0, 3);
 
 // The human readable string
 $OC_VersionString = '9.1.0 pre alpha';