JobRunner.php (mediawiki-1.31.1) | : | JobRunner.php (mediawiki-1.32.0) | ||
---|---|---|---|---|
skipping to change at line 32 | skipping to change at line 32 | |||
*/ | */ | |||
use MediaWiki\MediaWikiServices; | use MediaWiki\MediaWikiServices; | |||
use MediaWiki\Logger\LoggerFactory; | use MediaWiki\Logger\LoggerFactory; | |||
use Liuggio\StatsdClient\Factory\StatsdDataFactory; | use Liuggio\StatsdClient\Factory\StatsdDataFactory; | |||
use Psr\Log\LoggerAwareInterface; | use Psr\Log\LoggerAwareInterface; | |||
use Psr\Log\LoggerInterface; | use Psr\Log\LoggerInterface; | |||
use Wikimedia\ScopedCallback; | use Wikimedia\ScopedCallback; | |||
use Wikimedia\Rdbms\LBFactory; | use Wikimedia\Rdbms\LBFactory; | |||
use Wikimedia\Rdbms\DBError; | use Wikimedia\Rdbms\DBError; | |||
use Wikimedia\Rdbms\DBReplicationWaitError; | ||||
/** | /** | |||
* Job queue runner utility methods | * Job queue runner utility methods | |||
* | * | |||
* @ingroup JobQueue | * @ingroup JobQueue | |||
* @since 1.24 | * @since 1.24 | |||
*/ | */ | |||
class JobRunner implements LoggerAwareInterface { | class JobRunner implements LoggerAwareInterface { | |||
/** @var Config */ | /** @var Config */ | |||
protected $config; | protected $config; | |||
skipping to change at line 72 | skipping to change at line 71 | |||
/** | /** | |||
* @param LoggerInterface $logger | * @param LoggerInterface $logger | |||
* @return void | * @return void | |||
*/ | */ | |||
public function setLogger( LoggerInterface $logger ) { | public function setLogger( LoggerInterface $logger ) { | |||
$this->logger = $logger; | $this->logger = $logger; | |||
} | } | |||
/** | /** | |||
* @param LoggerInterface $logger | * @param LoggerInterface|null $logger | |||
*/ | */ | |||
public function __construct( LoggerInterface $logger = null ) { | public function __construct( LoggerInterface $logger = null ) { | |||
if ( $logger === null ) { | if ( $logger === null ) { | |||
$logger = LoggerFactory::getInstance( 'runJobs' ); | $logger = LoggerFactory::getInstance( 'runJobs' ); | |||
} | } | |||
$this->setLogger( $logger ); | $this->setLogger( $logger ); | |||
$this->config = MediaWikiServices::getInstance()->getMainConfig(); | $this->config = MediaWikiServices::getInstance()->getMainConfig(); | |||
} | } | |||
/** | /** | |||
skipping to change at line 112 | skipping to change at line 111 | |||
* - maxTime : maximum time in seconds before stopping | * - maxTime : maximum time in seconds before stopping | |||
* - throttle : whether to respect job backoff configuration | * - throttle : whether to respect job backoff configuration | |||
* @return array Summary response that can easily be JSON serialized | * @return array Summary response that can easily be JSON serialized | |||
*/ | */ | |||
public function run( array $options ) { | public function run( array $options ) { | |||
$jobClasses = $this->config->get( 'JobClasses' ); | $jobClasses = $this->config->get( 'JobClasses' ); | |||
$profilerLimits = $this->config->get( 'TrxProfilerLimits' ); | $profilerLimits = $this->config->get( 'TrxProfilerLimits' ); | |||
$response = [ 'jobs' => [], 'reached' => 'none-ready' ]; | $response = [ 'jobs' => [], 'reached' => 'none-ready' ]; | |||
$type = isset( $options['type'] ) ? $options['type'] : false; | $type = $options['type'] ?? false; | |||
$maxJobs = isset( $options['maxJobs'] ) ? $options['maxJobs'] : false; | $maxJobs = $options['maxJobs'] ?? false; | |||
$maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false; | $maxTime = $options['maxTime'] ?? false; | |||
$noThrottle = isset( $options['throttle'] ) && !$options['throttle']; | $noThrottle = isset( $options['throttle'] ) && !$options['throttle']; | |||
// Bail if job type is invalid | // Bail if job type is invalid | |||
if ( $type !== false && !isset( $jobClasses[$type] ) ) { | if ( $type !== false && !isset( $jobClasses[$type] ) ) { | |||
$response['reached'] = 'none-possible'; | $response['reached'] = 'none-possible'; | |||
return $response; | return $response; | |||
} | } | |||
// Bail out if DB is in read-only mode | // Bail out if DB is in read-only mode | |||
if ( wfReadOnly() ) { | if ( wfReadOnly() ) { | |||
$response['reached'] = 'read-only'; | $response['reached'] = 'read-only'; | |||
return $response; | return $response; | |||
} | } | |||
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); | $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); | |||
if ( $lbFactory->hasTransactionRound() ) { | if ( $lbFactory->hasTransactionRound() ) { | |||
throw new LogicException( __METHOD__ . ' called with an active transaction round.' ); | throw new LogicException( __METHOD__ . ' called with an active transaction round.' ); | |||
} | } | |||
// Bail out if there is too much DB lag. | // Bail out if there is too much DB lag. | |||
// This check should not block as we want to try other wiki queues. | // This check should not block as we want to try other wiki queues. | |||
list( , $maxLag ) = $lbFactory->getMainLB( wfWikiID() )->getMaxLag(); | list( , $maxLag ) = $lbFactory->getMainLB()->getMaxLag(); | |||
if ( $maxLag >= self::MAX_ALLOWED_LAG ) { | if ( $maxLag >= self::MAX_ALLOWED_LAG ) { | |||
$response['reached'] = 'replica-lag-limit'; | $response['reached'] = 'replica-lag-limit'; | |||
return $response; | return $response; | |||
} | } | |||
// Catch huge single updates that lead to replica DB lag | // Catch huge single updates that lead to replica DB lag | |||
$trxProfiler = Profiler::instance()->getTransactionProfiler(); | $trxProfiler = Profiler::instance()->getTransactionProfiler(); | |||
$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) ); | $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) ); | |||
$trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ ); | $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ ); | |||
skipping to change at line 228 | skipping to change at line 227 | |||
} elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) { | } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) { | |||
$response['reached'] = 'time-limit'; | $response['reached'] = 'time-limit'; | |||
break; | break; | |||
} | } | |||
// Don't let any of the main DB replica DBs get backed up. | // Don't let any of the main DB replica DBs get backed up. | |||
// This only waits for so long before exiting and letting | // This only waits for so long before exiting and letting | |||
// other wikis in the farm (on different masters) get a chance. | // other wikis in the farm (on different masters) get a chance. | |||
$timePassed = microtime( true ) - $lastCheckTime; | $timePassed = microtime( true ) - $lastCheckTime; | |||
if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) { | if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) { | |||
try { | ||||
$lbFactory->waitForReplication( [ | $success = $lbFactory->waitForReplication( [ | |||
'ifWritesSince' => $lastCheckTime, | 'ifWritesSince' => $lastCheckTime, | |||
'timeout' => self::MAX_ALLOWED_LAG | 'timeout' => self::MAX_ALLOWED_LAG, | |||
] ); | ] ); | |||
} catch ( DBReplicationWaitError $e ) { | if ( !$success ) { | |||
$response['reached'] = 'replica-lag-limit'; | $response['reached'] = 'replica-lag-limit'; | |||
break; | break; | |||
} | } | |||
$lastCheckTime = microtime( true ); | $lastCheckTime = microtime( true ); | |||
} | } | |||
// Don't let any queue replica DBs/backups fall behind | ||||
if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) { | ||||
$group->waitForBackups(); | ||||
} | ||||
// Bail if near-OOM instead of in a job | // Bail if near-OOM instead of in a job | |||
if ( !$this->checkMemoryOK() ) { | if ( !$this->checkMemoryOK() ) { | |||
$response['reached'] = 'memory-limit'; | $response['reached'] = 'memory-limit'; | |||
break; | break; | |||
} | } | |||
} | } | |||
} while ( $job ); // stop when there are no jobs | } while ( $job ); // stop when there are no jobs | |||
// Sync the persistent backoffs for the next runJobs.php pass | // Sync the persistent backoffs for the next runJobs.php pass | |||
skipping to change at line 299 | skipping to change at line 293 | |||
$rssStart = $this->getMaxRssKb(); | $rssStart = $this->getMaxRssKb(); | |||
$jobStartTime = microtime( true ); | $jobStartTime = microtime( true ); | |||
try { | try { | |||
$fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope | $fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope | |||
if ( !$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) { | if ( !$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) { | |||
$lbFactory->beginMasterChanges( $fnameTrxOwner ); | $lbFactory->beginMasterChanges( $fnameTrxOwner ); | |||
} | } | |||
$status = $job->run(); | $status = $job->run(); | |||
$error = $job->getLastError(); | $error = $job->getLastError(); | |||
$this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner ); | $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner ); | |||
// Important: this must be the last deferred update added (T100085, T154425) | ||||
DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] ); | ||||
// Run any deferred update tasks; doUpdates() manages transactions itself | // Run any deferred update tasks; doUpdates() manages transactions itself | |||
DeferredUpdates::doUpdates(); | DeferredUpdates::doUpdates(); | |||
} catch ( Exception $e ) { | } catch ( Exception $e ) { | |||
MWExceptionHandler::rollbackMasterChangesAndLog( $e ); | MWExceptionHandler::rollbackMasterChangesAndLog( $e ); | |||
$status = false; | $status = false; | |||
$error = get_class( $e ) . ': ' . $e->getMessage(); | $error = get_class( $e ) . ': ' . $e->getMessage(); | |||
} | } | |||
// Always attempt to call teardown() even if Job throws exception. | // Always attempt to call teardown() even if Job throws exception. | |||
try { | try { | |||
$job->teardown( $status ); | $job->teardown( $status ); | |||
skipping to change at line 545 | skipping to change at line 537 | |||
* | * | |||
* @param LBFactory $lbFactory | * @param LBFactory $lbFactory | |||
* @param Job $job | * @param Job $job | |||
* @param string $fnameTrxOwner | * @param string $fnameTrxOwner | |||
* @throws DBError | * @throws DBError | |||
*/ | */ | |||
private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) { | private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) { | |||
$syncThreshold = $this->config->get( 'JobSerialCommitThreshold' ); | $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' ); | |||
$time = false; | $time = false; | |||
$lb = $lbFactory->getMainLB( wfWikiID() ); | $lb = $lbFactory->getMainLB(); | |||
if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) { | if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) { | |||
// Generally, there is one master connection to the local DB | // Generally, there is one master connection to the local DB | |||
$dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() ); | $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() ); | |||
// We need natively blocking fast locks | // We need natively blocking fast locks | |||
if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) { | if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) { | |||
$time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY ); | $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY ); | |||
if ( $time < $syncThreshold ) { | if ( $time < $syncThreshold ) { | |||
$dbwSerial = false; | $dbwSerial = false; | |||
} | } | |||
} else { | } else { | |||
skipping to change at line 585 | skipping to change at line 577 | |||
$msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]"; | $msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]"; | |||
$this->logger->info( $msg, [ | $this->logger->info( $msg, [ | |||
'job_type' => $job->getType(), | 'job_type' => $job->getType(), | |||
'job_commit_write_ms' => $ms, | 'job_commit_write_ms' => $ms, | |||
] ); | ] ); | |||
$msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]"; | $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]"; | |||
$this->debugCallback( $msg ); | $this->debugCallback( $msg ); | |||
// Wait for an exclusive lock to commit | // Wait for an exclusive lock to commit | |||
if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) { | if ( !$dbwSerial->lock( 'jobrunner-serial-commit', $fnameTrxOwner, 30 ) ) { | |||
// This will trigger a rollback in the main loop | // This will trigger a rollback in the main loop | |||
throw new DBError( $dbwSerial, "Timed out waiting on commit queue." ); | throw new DBError( $dbwSerial, "Timed out waiting on commit queue." ); | |||
} | } | |||
$unlocker = new ScopedCallback( function () use ( $dbwSerial ) { | $unlocker = new ScopedCallback( function () use ( $dbwSerial, $fnameTrxOwner ) { | |||
$dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ ); | $dbwSerial->unlock( 'jobrunner-serial-commit', $fnameTrxOwner ); | |||
} ); | } ); | |||
// Wait for the replica DBs to catch up | // Wait for the replica DBs to catch up | |||
$pos = $lb->getMasterPos(); | $pos = $lb->getMasterPos(); | |||
if ( $pos ) { | if ( $pos ) { | |||
$lb->waitForAll( $pos ); | $lb->waitForAll( $pos ); | |||
} | } | |||
// Actually commit the DB master changes | // Actually commit the DB master changes | |||
$lbFactory->commitMasterChanges( | $lbFactory->commitMasterChanges( | |||
End of changes. 10 change blocks. | ||||
31 lines changed or deleted | 19 lines changed or added |