"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "includes/jobqueue/JobQueueDB.php" between
mediawiki-1.31.1.tar.gz and mediawiki-1.32.0.tar.gz

About: MediaWiki is a wiki engine (the collaborative editing software that runs for e.g. Wikipedia, the free encyclopedia).

JobQueueDB.php  (mediawiki-1.31.1):JobQueueDB.php  (mediawiki-1.32.0)
skipping to change at line 58 skipping to change at line 58
* Additional parameters include: * Additional parameters include:
* - cluster : The name of an external cluster registered via LBFactory . * - cluster : The name of an external cluster registered via LBFactory .
* If not specified, the primary DB cluster for the wiki wi ll be used. * If not specified, the primary DB cluster for the wiki wi ll be used.
* This can be overridden with a custom cluster so that DB handles will * This can be overridden with a custom cluster so that DB handles will
* be retrieved via LBFactory::getExternalLB() and getConne ction(). * be retrieved via LBFactory::getExternalLB() and getConne ction().
* @param array $params * @param array $params
*/ */
protected function __construct( array $params ) { protected function __construct( array $params ) {
parent::__construct( $params ); parent::__construct( $params );
$this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false; $this->cluster = $params['cluster'] ?? false;
$this->cache = ObjectCache::getMainWANInstance(); $this->cache = ObjectCache::getMainWANInstance();
} }
protected function supportedOrders() { protected function supportedOrders() {
return [ 'random', 'timestamp', 'fifo' ]; return [ 'random', 'timestamp', 'fifo' ];
} }
protected function optimalOrder() { protected function optimalOrder() {
return 'random'; return 'random';
} }
skipping to change at line 199 skipping to change at line 199
// a) sqlite; DB connection is probably a regular round-aware han dle. // a) sqlite; DB connection is probably a regular round-aware han dle.
// If the connection is busy with a transaction, then defer the j ob writes // If the connection is busy with a transaction, then defer the j ob writes
// until right before the main round commit step. Any errors that bubble // until right before the main round commit step. Any errors that bubble
// up will rollback the main commit round. // up will rollback the main commit round.
// b) mysql/postgres; DB connection is generally a separate CONN_ TRX_AUTOCOMMIT handle. // b) mysql/postgres; DB connection is generally a separate CONN_ TRX_AUTOCOMMIT handle.
// No transaction is active nor will be started by writes, so enq ueue the jobs // No transaction is active nor will be started by writes, so enq ueue the jobs
// now so that any errors will show up immediately as the interfa ce expects. Any // now so that any errors will show up immediately as the interfa ce expects. Any
// errors that bubble up will rollback the main commit round. // errors that bubble up will rollback the main commit round.
$fname = __METHOD__; $fname = __METHOD__;
$dbw->onTransactionPreCommitOrIdle( $dbw->onTransactionPreCommitOrIdle(
function () use ( $dbw, $jobs, $flags, $fname ) { function ( IDatabase $dbw ) use ( $jobs, $flags, $fname ) {
$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname ); $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
}, },
$fname $fname
); );
} }
/** /**
* This function should *not* be called outside of JobQueueDB * This function should *not* be called outside of JobQueueDB
* *
* @param IDatabase $dbw * @param IDatabase $dbw
skipping to change at line 224 skipping to change at line 224
* @return void * @return void
*/ */
public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) { public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
if ( !count( $jobs ) ) { if ( !count( $jobs ) ) {
return; return;
} }
$rowSet = []; // (sha1 => job) map for jobs that are de-duplicate d $rowSet = []; // (sha1 => job) map for jobs that are de-duplicate d
$rowList = []; // list of jobs for jobs that are not de-duplicate d $rowList = []; // list of jobs for jobs that are not de-duplicate d
foreach ( $jobs as $job ) { foreach ( $jobs as $job ) {
$row = $this->insertFields( $job ); $row = $this->insertFields( $job, $dbw );
if ( $job->ignoreDuplicates() ) { if ( $job->ignoreDuplicates() ) {
$rowSet[$row['job_sha1']] = $row; $rowSet[$row['job_sha1']] = $row;
} else { } else {
$rowList[] = $row; $rowList[] = $row;
} }
} }
if ( $flags & self::QOS_ATOMIC ) { if ( $flags & self::QOS_ATOMIC ) {
$dbw->startAtomic( $method ); // wrap all the job additio ns in one transaction $dbw->startAtomic( $method ); // wrap all the job additio ns in one transaction
} }
skipping to change at line 267 skipping to change at line 267
JobQueue::incrStats( 'inserts', $this->type, count( $rows ) ); JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
JobQueue::incrStats( 'dupe_inserts', $this->type, JobQueue::incrStats( 'dupe_inserts', $this->type,
count( $rowSet ) + count( $rowList ) - count( $ro ws ) count( $rowSet ) + count( $rowList ) - count( $ro ws )
); );
} catch ( DBError $e ) { } catch ( DBError $e ) {
$this->throwDBException( $e ); $this->throwDBException( $e );
} }
if ( $flags & self::QOS_ATOMIC ) { if ( $flags & self::QOS_ATOMIC ) {
$dbw->endAtomic( $method ); $dbw->endAtomic( $method );
} }
return;
} }
/** /**
* @see JobQueue::doPop() * @see JobQueue::doPop()
* @return Job|bool * @return Job|bool
*/ */
protected function doPop() { protected function doPop() {
$dbw = $this->getMasterDB(); $dbw = $this->getMasterDB();
try { try {
$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setti ng $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setti ng
skipping to change at line 503 skipping to change at line 501
* @return bool * @return bool
*/ */
protected function doDeduplicateRootJob( IJobSpecification $job ) { protected function doDeduplicateRootJob( IJobSpecification $job ) {
$params = $job->getParams(); $params = $job->getParams();
if ( !isset( $params['rootJobSignature'] ) ) { if ( !isset( $params['rootJobSignature'] ) ) {
throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
} elseif ( !isset( $params['rootJobTimestamp'] ) ) { } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
} }
$key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
// Callers should call batchInsert() and then this function so th at if the insert // Callers should call JobQueueGroup::push() before this method s o that if the insert
// fails, the de-duplication registration will be aborted. Since the insert is // fails, the de-duplication registration will be aborted. Since the insert is
// deferred till "transaction idle", do the same here, so that th e ordering is // deferred till "transaction idle", do the same here, so that th e ordering is
// maintained. Having only the de-duplication registration succee d would cause // maintained. Having only the de-duplication registration succee d would cause
// jobs to become no-ops without any actual jobs that made them r edundant. // jobs to become no-ops without any actual jobs that made them r edundant.
$dbw = $this->getMasterDB(); $dbw = $this->getMasterDB();
$cache = $this->dupCache; $cache = $this->dupCache;
$dbw->onTransactionIdle( $dbw->onTransactionCommitOrIdle(
function () use ( $cache, $params, $key, $dbw ) { function () use ( $cache, $params, $key ) {
$timestamp = $cache->get( $key ); // current last timestamp of this job $timestamp = $cache->get( $key ); // current last timestamp of this job
if ( $timestamp && $timestamp >= $params['rootJob Timestamp'] ) { if ( $timestamp && $timestamp >= $params['rootJob Timestamp'] ) {
return true; // a newer version of this r oot job was enqueued return true; // a newer version of this r oot job was enqueued
} }
// Update the timestamp of the last root job star ted at the location... // Update the timestamp of the last root job star ted at the location...
return $cache->set( $key, $params['rootJobTimesta mp'], JobQueueDB::ROOTJOB_TTL ); return $cache->set( $key, $params['rootJobTimesta mp'], JobQueueDB::ROOTJOB_TTL );
}, },
__METHOD__ __METHOD__
); );
skipping to change at line 725 skipping to change at line 723
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD_ _ ); $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD_ _ );
} catch ( DBError $e ) { } catch ( DBError $e ) {
$this->throwDBException( $e ); $this->throwDBException( $e );
} }
return $count; return $count;
} }
/** /**
* @param IJobSpecification $job * @param IJobSpecification $job
* @param IDatabase $db
* @return array * @return array
*/ */
protected function insertFields( IJobSpecification $job ) { protected function insertFields( IJobSpecification $job, IDatabase $db )
$dbw = $this->getMasterDB(); {
return [ return [
// Fields that describe the nature of the job // Fields that describe the nature of the job
'job_cmd' => $job->getType(), 'job_cmd' => $job->getType(),
'job_namespace' => $job->getTitle()->getNamespace(), 'job_namespace' => $job->getTitle()->getNamespace(),
'job_title' => $job->getTitle()->getDBkey(), 'job_title' => $job->getTitle()->getDBkey(),
'job_params' => self::makeBlob( $job->getParams() ), 'job_params' => self::makeBlob( $job->getParams() ),
// Additional job metadata // Additional job metadata
'job_timestamp' => $dbw->timestamp(), 'job_timestamp' => $db->timestamp(),
'job_sha1' => Wikimedia\base_convert( 'job_sha1' => Wikimedia\base_convert(
sha1( serialize( $job->getDeduplicationInfo() ) ) , sha1( serialize( $job->getDeduplicationInfo() ) ) ,
16, 36, 31 16, 36, 31
), ),
'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
]; ];
} }
/** /**
* @throws JobQueueConnectionError * @throws JobQueueConnectionError
skipping to change at line 793 skipping to change at line 790
? $lb->getConnectionRef( $index, [], $this->wiki, $lb::CO NN_TRX_AUTOCOMMIT ) ? $lb->getConnectionRef( $index, [], $this->wiki, $lb::CO NN_TRX_AUTOCOMMIT )
// Jobs insertion will be defered until the PRESEND stage to reduce contention. // Jobs insertion will be defered until the PRESEND stage to reduce contention.
: $lb->getConnectionRef( $index, [], $this->wiki ); : $lb->getConnectionRef( $index, [], $this->wiki );
} }
/** /**
* @param string $property * @param string $property
* @return string * @return string
*/ */
private function getCacheKey( $property ) { private function getCacheKey( $property ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
$cluster = is_string( $this->cluster ) ? $this->cluster : 'main'; $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $thi return $this->cache->makeGlobalKey(
s->type, $property ); 'jobqueue', $this->wiki, $cluster, $this->type, $property
);
} }
/** /**
* @param array|bool $params * @param array|bool $params
* @return string * @return string
*/ */
protected static function makeBlob( $params ) { protected static function makeBlob( $params ) {
if ( $params !== false ) { if ( $params !== false ) {
return serialize( $params ); return serialize( $params );
} else { } else {
 End of changes. 11 change blocks. 
15 lines changed or deleted 13 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)