"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "includes/jobqueue/JobRunner.php" between
mediawiki-1.31.1.tar.gz and mediawiki-1.32.0.tar.gz

About: MediaWiki is a wiki engine (the collaborative editing software that runs for e.g. Wikipedia, the free encyclopedia).

JobRunner.php  (mediawiki-1.31.1):JobRunner.php  (mediawiki-1.32.0)
skipping to change at line 32 skipping to change at line 32
*/ */
use MediaWiki\MediaWikiServices; use MediaWiki\MediaWikiServices;
use MediaWiki\Logger\LoggerFactory; use MediaWiki\Logger\LoggerFactory;
use Liuggio\StatsdClient\Factory\StatsdDataFactory; use Liuggio\StatsdClient\Factory\StatsdDataFactory;
use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Wikimedia\ScopedCallback; use Wikimedia\ScopedCallback;
use Wikimedia\Rdbms\LBFactory; use Wikimedia\Rdbms\LBFactory;
use Wikimedia\Rdbms\DBError; use Wikimedia\Rdbms\DBError;
use Wikimedia\Rdbms\DBReplicationWaitError;
/** /**
* Job queue runner utility methods * Job queue runner utility methods
* *
* @ingroup JobQueue * @ingroup JobQueue
* @since 1.24 * @since 1.24
*/ */
class JobRunner implements LoggerAwareInterface { class JobRunner implements LoggerAwareInterface {
/** @var Config */ /** @var Config */
protected $config; protected $config;
skipping to change at line 72 skipping to change at line 71
/** /**
* @param LoggerInterface $logger * @param LoggerInterface $logger
* @return void * @return void
*/ */
public function setLogger( LoggerInterface $logger ) { public function setLogger( LoggerInterface $logger ) {
$this->logger = $logger; $this->logger = $logger;
} }
/** /**
* @param LoggerInterface $logger * @param LoggerInterface|null $logger
*/ */
public function __construct( LoggerInterface $logger = null ) { public function __construct( LoggerInterface $logger = null ) {
if ( $logger === null ) { if ( $logger === null ) {
$logger = LoggerFactory::getInstance( 'runJobs' ); $logger = LoggerFactory::getInstance( 'runJobs' );
} }
$this->setLogger( $logger ); $this->setLogger( $logger );
$this->config = MediaWikiServices::getInstance()->getMainConfig() ; $this->config = MediaWikiServices::getInstance()->getMainConfig() ;
} }
/** /**
skipping to change at line 112 skipping to change at line 111
* - maxTime : maximum time in seconds before stopping * - maxTime : maximum time in seconds before stopping
* - throttle : whether to respect job backoff configuration * - throttle : whether to respect job backoff configuration
* @return array Summary response that can easily be JSON serialized * @return array Summary response that can easily be JSON serialized
*/ */
public function run( array $options ) { public function run( array $options ) {
$jobClasses = $this->config->get( 'JobClasses' ); $jobClasses = $this->config->get( 'JobClasses' );
$profilerLimits = $this->config->get( 'TrxProfilerLimits' ); $profilerLimits = $this->config->get( 'TrxProfilerLimits' );
$response = [ 'jobs' => [], 'reached' => 'none-ready' ]; $response = [ 'jobs' => [], 'reached' => 'none-ready' ];
$type = isset( $options['type'] ) ? $options['type'] : false; $type = $options['type'] ?? false;
$maxJobs = isset( $options['maxJobs'] ) ? $options['maxJobs'] : f $maxJobs = $options['maxJobs'] ?? false;
alse; $maxTime = $options['maxTime'] ?? false;
$maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : f
alse;
$noThrottle = isset( $options['throttle'] ) && !$options['throttl e']; $noThrottle = isset( $options['throttle'] ) && !$options['throttl e'];
// Bail if job type is invalid // Bail if job type is invalid
if ( $type !== false && !isset( $jobClasses[$type] ) ) { if ( $type !== false && !isset( $jobClasses[$type] ) ) {
$response['reached'] = 'none-possible'; $response['reached'] = 'none-possible';
return $response; return $response;
} }
// Bail out if DB is in read-only mode // Bail out if DB is in read-only mode
if ( wfReadOnly() ) { if ( wfReadOnly() ) {
$response['reached'] = 'read-only'; $response['reached'] = 'read-only';
return $response; return $response;
} }
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerF actory(); $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerF actory();
if ( $lbFactory->hasTransactionRound() ) { if ( $lbFactory->hasTransactionRound() ) {
throw new LogicException( __METHOD__ . ' called with an a ctive transaction round.' ); throw new LogicException( __METHOD__ . ' called with an a ctive transaction round.' );
} }
// Bail out if there is too much DB lag. // Bail out if there is too much DB lag.
// This check should not block as we want to try other wiki queue s. // This check should not block as we want to try other wiki queue s.
list( , $maxLag ) = $lbFactory->getMainLB( wfWikiID() )->getMaxLa g(); list( , $maxLag ) = $lbFactory->getMainLB()->getMaxLag();
if ( $maxLag >= self::MAX_ALLOWED_LAG ) { if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
$response['reached'] = 'replica-lag-limit'; $response['reached'] = 'replica-lag-limit';
return $response; return $response;
} }
// Catch huge single updates that lead to replica DB lag // Catch huge single updates that lead to replica DB lag
$trxProfiler = Profiler::instance()->getTransactionProfiler(); $trxProfiler = Profiler::instance()->getTransactionProfiler();
$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerforman ce' ) ); $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerforman ce' ) );
$trxProfiler->setExpectations( $profilerLimits['JobRunner'], __ME THOD__ ); $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __ME THOD__ );
skipping to change at line 228 skipping to change at line 227
} elseif ( $maxTime && ( microtime( true ) - $sta rtTime ) > $maxTime ) { } elseif ( $maxTime && ( microtime( true ) - $sta rtTime ) > $maxTime ) {
$response['reached'] = 'time-limit'; $response['reached'] = 'time-limit';
break; break;
} }
// Don't let any of the main DB replica DBs get b acked up. // Don't let any of the main DB replica DBs get b acked up.
// This only waits for so long before exiting and letting // This only waits for so long before exiting and letting
// other wikis in the farm (on different masters) get a chance. // other wikis in the farm (on different masters) get a chance.
$timePassed = microtime( true ) - $lastCheckTime; $timePassed = microtime( true ) - $lastCheckTime;
if ( $timePassed >= self::LAG_CHECK_PERIOD || $ti mePassed < 0 ) { if ( $timePassed >= self::LAG_CHECK_PERIOD || $ti mePassed < 0 ) {
try { $success = $lbFactory->waitForReplication
$lbFactory->waitForReplication( [ ( [
'ifWritesSince' => $lastC 'ifWritesSince' => $lastCheckTime
heckTime, ,
'timeout' => self::MAX_AL 'timeout' => self::MAX_ALLOWED_LA
LOWED_LAG G,
] ); ] );
} catch ( DBReplicationWaitError $e ) { if ( !$success ) {
$response['reached'] = 'replica-l ag-limit'; $response['reached'] = 'replica-l ag-limit';
break; break;
} }
$lastCheckTime = microtime( true ); $lastCheckTime = microtime( true );
} }
// Don't let any queue replica DBs/backups fall b
ehind
if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) ==
0 ) {
$group->waitForBackups();
}
// Bail if near-OOM instead of in a job // Bail if near-OOM instead of in a job
if ( !$this->checkMemoryOK() ) { if ( !$this->checkMemoryOK() ) {
$response['reached'] = 'memory-limit'; $response['reached'] = 'memory-limit';
break; break;
} }
} }
} while ( $job ); // stop when there are no jobs } while ( $job ); // stop when there are no jobs
// Sync the persistent backoffs for the next runJobs.php pass // Sync the persistent backoffs for the next runJobs.php pass
skipping to change at line 299 skipping to change at line 293
$rssStart = $this->getMaxRssKb(); $rssStart = $this->getMaxRssKb();
$jobStartTime = microtime( true ); $jobStartTime = microtime( true );
try { try {
$fnameTrxOwner = get_class( $job ) . '::run'; // give run () outer scope $fnameTrxOwner = get_class( $job ) . '::run'; // give run () outer scope
if ( !$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_R OUND ) ) { if ( !$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_R OUND ) ) {
$lbFactory->beginMasterChanges( $fnameTrxOwner ); $lbFactory->beginMasterChanges( $fnameTrxOwner );
} }
$status = $job->run(); $status = $job->run();
$error = $job->getLastError(); $error = $job->getLastError();
$this->commitMasterChanges( $lbFactory, $job, $fnameTrxOw ner ); $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOw ner );
// Important: this must be the last deferred update added
(T100085, T154425)
DeferredUpdates::addCallableUpdate( [ JobQueueGroup::clas
s, 'pushLazyJobs' ] );
// Run any deferred update tasks; doUpdates() manages tra nsactions itself // Run any deferred update tasks; doUpdates() manages tra nsactions itself
DeferredUpdates::doUpdates(); DeferredUpdates::doUpdates();
} catch ( Exception $e ) { } catch ( Exception $e ) {
MWExceptionHandler::rollbackMasterChangesAndLog( $e ); MWExceptionHandler::rollbackMasterChangesAndLog( $e );
$status = false; $status = false;
$error = get_class( $e ) . ': ' . $e->getMessage(); $error = get_class( $e ) . ': ' . $e->getMessage();
} }
// Always attempt to call teardown() even if Job throws exception . // Always attempt to call teardown() even if Job throws exception .
try { try {
$job->teardown( $status ); $job->teardown( $status );
skipping to change at line 545 skipping to change at line 537
* *
* @param LBFactory $lbFactory * @param LBFactory $lbFactory
* @param Job $job * @param Job $job
* @param string $fnameTrxOwner * @param string $fnameTrxOwner
* @throws DBError * @throws DBError
*/ */
private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fn ameTrxOwner ) { private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fn ameTrxOwner ) {
$syncThreshold = $this->config->get( 'JobSerialCommitThreshold' ) ; $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' ) ;
$time = false; $time = false;
$lb = $lbFactory->getMainLB( wfWikiID() ); $lb = $lbFactory->getMainLB();
if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) { if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) {
// Generally, there is one master connection to the local DB // Generally, there is one master connection to the local DB
$dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterInd ex() ); $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterInd ex() );
// We need natively blocking fast locks // We need natively blocking fast locks
if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) { if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
$time = $dbwSerial->pendingWriteQueryDuration( $d bwSerial::ESTIMATE_DB_APPLY ); $time = $dbwSerial->pendingWriteQueryDuration( $d bwSerial::ESTIMATE_DB_APPLY );
if ( $time < $syncThreshold ) { if ( $time < $syncThreshold ) {
$dbwSerial = false; $dbwSerial = false;
} }
} else { } else {
skipping to change at line 585 skipping to change at line 577
$msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms }ms of writes]"; $msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms }ms of writes]";
$this->logger->info( $msg, [ $this->logger->info( $msg, [
'job_type' => $job->getType(), 'job_type' => $job->getType(),
'job_commit_write_ms' => $ms, 'job_commit_write_ms' => $ms,
] ); ] );
$msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]"; $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
$this->debugCallback( $msg ); $this->debugCallback( $msg );
// Wait for an exclusive lock to commit // Wait for an exclusive lock to commit
if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) { if ( !$dbwSerial->lock( 'jobrunner-serial-commit', $fnameTrxOwner , 30 ) ) {
// This will trigger a rollback in the main loop // This will trigger a rollback in the main loop
throw new DBError( $dbwSerial, "Timed out waiting on comm it queue." ); throw new DBError( $dbwSerial, "Timed out waiting on comm it queue." );
} }
$unlocker = new ScopedCallback( function () use ( $dbwSerial ) { $unlocker = new ScopedCallback( function () use ( $dbwSerial, $fn
$dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ ameTrxOwner ) {
); $dbwSerial->unlock( 'jobrunner-serial-commit', $fnameTrxO
wner );
} ); } );
// Wait for the replica DBs to catch up // Wait for the replica DBs to catch up
$pos = $lb->getMasterPos(); $pos = $lb->getMasterPos();
if ( $pos ) { if ( $pos ) {
$lb->waitForAll( $pos ); $lb->waitForAll( $pos );
} }
// Actually commit the DB master changes // Actually commit the DB master changes
$lbFactory->commitMasterChanges( $lbFactory->commitMasterChanges(
 End of changes. 10 change blocks. 
31 lines changed or deleted 19 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)