multicastmanager.class.php (fogproject-1.5.8) | : | multicastmanager.class.php (fogproject-1.5.9) | ||
---|---|---|---|---|
skipping to change at line 105 | skipping to change at line 105 | |||
); | ); | |||
} | } | |||
/** | /** | |||
* Tests if the multicast task is new | * Tests if the multicast task is new | |||
* | * | |||
* @param array $KnownTasks the known tasks | * @param array $KnownTasks the known tasks | |||
* @param int $id test if the id is new | * @param int $id test if the id is new | |||
* | * | |||
* @return bool | * @return bool | |||
*/ | */ | |||
private static function _isMCTaskNew( | private static function _isMCTaskInList( | |||
$KnownTasks, | $Tasks, | |||
$id | $id | |||
) { | ) { | |||
foreach ((array)$KnownTasks as &$Known) { | if (count($Tasks) < 1) { | |||
if ($Known->getID() == $id) { | return false; | |||
return false; | } | |||
foreach ((array)$Tasks as &$Task) { | ||||
if ($Task->getID() == $id) { | ||||
return true; | ||||
} | } | |||
unset($Known); | unset($Task); | |||
} | } | |||
return true; | return false; | |||
} | } | |||
/** | /** | |||
* Gets the multicast task | * Gets the multicast task | |||
* | * | |||
* @param array $KnownTasks the known tasks | * @param array $KnownTasks the known tasks | |||
* @param int $id the id to get | * @param int $id the id to get | |||
* | * | |||
* @return object | * @return object | |||
*/ | */ | |||
private static function _getMCExistingTask( | private static function _getMCExistingTask( | |||
$KnownTasks, | $KnownTasks, | |||
$id | $curTask | |||
) { | ) { | |||
foreach ((array)$KnownTasks as &$Known) { | foreach ((array)$KnownTasks as &$Known) { | |||
if ($Known->getID() == $id) { | if ($Known->getID() == $curTask->getID()) { | |||
// This is very important for MC session joins via PXE menu | ||||
$curTaskTaskIDs = $curTask->getTaskIDs(); | ||||
if (count($curTaskTaskIDs) > count($Known->getTaskIDs())) { | ||||
$Known->setTaskIDs($curTaskTaskIDs); | ||||
} | ||||
return $Known; | return $Known; | |||
} | } | |||
unset($Known); | unset($Known); | |||
} | } | |||
return false; | return false; | |||
} | } | |||
/** | /** | |||
* Removes task from the known list | * Removes task from the known list | |||
* | * | |||
* @param array $KnownTasks the known tasks | * @param array $KnownTasks the known tasks | |||
skipping to change at line 168 | skipping to change at line 176 | |||
} | } | |||
/** | /** | |||
* Multicast tasks are a bit more than | * Multicast tasks are a bit more than | |||
* the others, this is its service loop | * the others, this is its service loop | |||
* | * | |||
* @return void | * @return void | |||
*/ | */ | |||
private function _serviceLoop() | private function _serviceLoop() | |||
{ | { | |||
$KnownTasks = []; | $KnownTasks = []; | |||
$queueTasks = []; | ||||
while (true) { | while (true) { | |||
// Ensure we have a fresh complete and cancel variable. | // Ensure we have a fresh complete and cancel variable. | |||
$completeTasks = $cancelTasks = []; | $completeTasks = $cancelTasks = []; | |||
$queueTasks = []; | ||||
// Handles the sleep timer for us. | // Handles the sleep timer for us. | |||
$date = self::niceDate(); | $date = self::niceDate(); | |||
if (!isset($nextrun)) { | if (!isset($nextrun)) { | |||
$first = true; | $first = true; | |||
$nextrun = clone $date; | $nextrun = clone $date; | |||
} | } | |||
// Actually holds and loops until the proper sleep time is met. | // Actually holds and loops until the proper sleep time is met. | |||
if ($date < $nextrun && $first === false) { | if ($date < $nextrun && $first === false) { | |||
usleep(100000); | usleep(100000); | |||
skipping to change at line 237 | skipping to change at line 245 | |||
); | ); | |||
continue; | continue; | |||
} | } | |||
foreach ($allTasks as &$curTask) { | foreach ($allTasks as &$curTask) { | |||
$totalSlots = $StorageNode->get('maxClients'); | $totalSlots = $StorageNode->get('maxClients'); | |||
$usedSlots = $StorageNode->getUsedSlotCount(); | $usedSlots = $StorageNode->getUsedSlotCount(); | |||
$queuedSlots = $StorageNode->getQueuedSlotCount(); | $queuedSlots = $StorageNode->getQueuedSlotCount(); | |||
$groupOpenSlots = $totalSlots - $usedSlots; | $groupOpenSlots = $totalSlots - $usedSlots; | |||
$new = self::_isMCTaskNew( | $existing = self::_isMCTaskInList( | |||
$KnownTasks, | $KnownTasks, | |||
$curTask->getID() | $curTask->getID() | |||
); | ); | |||
$queued = self::_isMCTaskInList( | ||||
$queueTasks, | ||||
$curTask->getID() | ||||
); | ||||
if ($new) { | if (!$existing) { | |||
$KnownTasks[] = $curTask; | if ($groupOpenSlots < 1) { | |||
self::outall( | if ($queued) { | |||
sprintf( | continue; | |||
$startStr, | } | |||
$curTask->getID(), | self::outall( | |||
$curTask->getName(), | sprintf( | |||
_('is new') | $startStr, | |||
) | $curTask->getID(), | |||
); | $curTask->getName(), | |||
_(' No open slots ') | ||||
) | ||||
); | ||||
$curTask->getSess()->set('stateID', 1); | ||||
if (!$curTask->getSess()->save()) { | ||||
throw new Exception(_('Failed to update Task | ||||
')); | ||||
} else { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_(' Task state has been updated, now | ||||
the task is queued!') | ||||
) | ||||
); | ||||
} | ||||
$queueTasks[] = $curTask; | ||||
continue; | ||||
} | ||||
if (!file_exists($curTask->getImagePath())) { | if (!file_exists($curTask->getImagePath())) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('failed to execute, image file: ') | _('failed to execute, image file: ') | |||
. $curTask->getImagePath() | . $curTask->getImagePath() | |||
. _('not found on this node') | . _('not found on this node') | |||
) | ) | |||
skipping to change at line 289 | skipping to change at line 320 | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('failed to execute, port must be even and numeric') | _('failed to execute, port must be even and numeric') | |||
) | ) | |||
); | ); | |||
continue; | continue; | |||
} | } | |||
if ($groupOpenSlots < 1) { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_(' No open slots ') | ||||
) | ||||
); | ||||
$curTask->getSess()->set('stateID', 1); | ||||
if (!$curTask->getSess()->save()) { | ||||
throw new Exception(_('Failed to update Task | ||||
')); | ||||
} else { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_(' Task state has been updated, now | ||||
the task is queued!') | ||||
) | ||||
); | ||||
} | ||||
continue; | ||||
} | ||||
if (!$curTask->startTask()) { | if (!$curTask->startTask()) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('failed to start') | _('failed to start') | |||
) | ) | |||
); | ); | |||
if (!$curTask->kilTask()) { | if (!$curTask->killTask()) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('could not be killed') | _('could not be killed') | |||
) | ) | |||
); | ); | |||
} else { | } else { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('has been killed') | _('has been killed') | |||
) | ) | |||
); | ); | |||
} | } | |||
continue; | continue; | |||
} | } | |||
$Session = $curTask->getSess(); | if ($queued) { | |||
$Session->set('stateID', self::getProgressState()); | $queueTasks = self::_removeFromKnownList( | |||
if (!$Session->save()) { | $queueTasks, | |||
self::outall( | $curTask->getID() | |||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('unable to be updated') | ||||
) | ||||
); | ); | |||
continue; | ||||
} | } | |||
$KnownTasks[] = $curTask; | ||||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('image file found, file: ') | _('is new') | |||
. $curTask->getImagePath() | ||||
) | ||||
); | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
$curTask->getClientCount() | ||||
. ' ' | ||||
. ( | ||||
$curTask->getClientCount() == 1 ? | ||||
_('client') : | ||||
_('clients') | ||||
) | ||||
. ' ' | ||||
. _('found') | ||||
) | ||||
); | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('sending on base port ') | ||||
. $curTask->getPortBase() | ||||
) | ||||
); | ||||
self::outall( | ||||
sprintf( | ||||
" | %s: %s", | ||||
_('Command'), | ||||
$curTask->getCMD() | ||||
) | ||||
); | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('has started') | ||||
) | ) | |||
); | ); | |||
continue; | ||||
} | ||||
$jobcancelled = $jobcompleted = false; | ||||
$jobqueued = false; | ||||
$runningTask = self::_getMCExistingTask( | ||||
$KnownTasks, | ||||
$curTask->getID() | ||||
); | ||||
if ($groupOpenSlots > 0 && !$runningTask->isRunning($run | ||||
ningTask->procRef)) { | ||||
if (!$curTask->startTask()) { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('failed to start') | ||||
) | ||||
); | ||||
if (!$curTask->kilTask()) { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('could not be killed') | ||||
) | ||||
); | ||||
} else { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('has been killed') | ||||
) | ||||
); | ||||
} | ||||
// continue; | ||||
} | ||||
$Session = $curTask->getSess(); | $Session = $curTask->getSess(); | |||
$Session->set('stateID', self::getProgressState()); | $Session->set('stateID', self::getProgressState()); | |||
if (!$Session->save()) { | if (!$Session->save()) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('unable to be updated') | _('unable to be updated') | |||
) | ) | |||
skipping to change at line 511 | skipping to change at line 428 | |||
) | ) | |||
); | ); | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('has started') | _('has started') | |||
) | ) | |||
); | ); | |||
if (!empty($queueTasks)) { | ||||
$queueTasks = self::_removeFromKnownList( | ||||
$queueTasks, | ||||
$curTask->getID() | ||||
); | ||||
} | ||||
$KnownTasks = self::_removeFromKnownList( | ||||
$KnownTasks, | ||||
$curTask->getID() | ||||
); | ||||
$KnownTasks[] = $curTask; | ||||
continue; | continue; | |||
} | } | |||
$jobcancelled = $jobcompleted = false; | ||||
$runningTask = self::_getMCExistingTask( | ||||
$KnownTasks, | ||||
$curTask | ||||
); | ||||
$taskIDs = $runningTask->getTaskIDs(); | $taskIDs = $runningTask->getTaskIDs(); | |||
$find = []; | $find = []; | |||
$find['id'] = $taskIDs; | $find['id'] = $taskIDs; | |||
$find['stateID'] = self::getCancelledState(); | $find['stateID'] = self::getCancelledState(); | |||
Route::ids( | Route::ids( | |||
'task', | 'task', | |||
$find | $find | |||
); | ); | |||
$inTaskCancelledIDs = json_decode(Route::getData(), true ); | $inTaskCancelledIDs = json_decode(Route::getData(), true ); | |||
skipping to change at line 561 | skipping to change at line 471 | |||
_('unable to be updated') | _('unable to be updated') | |||
) | ) | |||
); | ); | |||
} | } | |||
} | } | |||
$SessCancelled = $Session->get('stateID') | $SessCancelled = $Session->get('stateID') | |||
== self::getCancelledState(); | == self::getCancelledState(); | |||
$SessCompleted = $Session->get('stateID') | $SessCompleted = $Session->get('stateID') | |||
== self::getCompleteState(); | == self::getCompleteState(); | |||
$SessQueued = $Session->get('stateID') | ||||
== self::getQueuedState(); | ||||
if ($SessCancelled | if ($SessCancelled | |||
|| count($inTaskCancelledIDs) > 0 | || count($inTaskCancelledIDs) > 0 | |||
) { | ) { | |||
$jobcancelled = true; | $jobcancelled = true; | |||
} | } | |||
if ($SessCompleted | if ($SessCompleted | |||
|| count($inTaskCompletedIDs) > 0 | || (count($inTaskCompletedIDs) > 0 && count($inTaskC | |||
|| ($runningTask->isNamedSession() | ompletedIDs) >= count($taskIDs)) | |||
&& $runningTask->getSessClients()) | || ($runningTask->isNamedSessionFinished()) | |||
) { | ) { | |||
$jobcompleted = true; | $jobcompleted = true; | |||
} | } | |||
if ($SessQueued) { | if (!$jobcancelled && !$jobcompleted) { | |||
$jobqueued = true; | ||||
} | ||||
if (!$jobcancelled && !$jobcompleted && !$jobqueued) { | ||||
if ($runningTask->isRunning($runningTask->procRef)) { | if ($runningTask->isRunning($runningTask->procRef)) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$runningTask->getID(), | $runningTask->getID(), | |||
$runningTask->getName(), | $runningTask->getName(), | |||
_('is already running with pid: ') | _('is already running with pid: ') | |||
. $runningTask->getPID($runningTask->pro cRef) | . $runningTask->getPID($runningTask->pro cRef) | |||
) | ) | |||
); | ); | |||
skipping to change at line 611 | skipping to change at line 514 | |||
); | ); | |||
if (!$runningTask->killTask()) { | if (!$runningTask->killTask()) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$runningTask->getID(), | $runningTask->getID(), | |||
$runningTask->getName(), | $runningTask->getName(), | |||
_('could not be killed') | _('could not be killed') | |||
) | ) | |||
); | ); | |||
} else { | } | |||
self::outall( | // Set msClients to zero as a marker for a compl | |||
sprintf( | eted | |||
$startStr, | // multicast session with unregistered clients | |||
$runningTask->getID(), | if (count($taskIDs) == 0) { | |||
$runningTask->getName(), | $Session->set('clients', 0)->save(); | |||
_('has been killed') | ||||
) | ||||
); | ||||
} | } | |||
} | } | |||
} else { | } else { | |||
if ($jobcompleted) { | if ($jobcompleted) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$runningTask->getID(), | $runningTask->getID(), | |||
$runningTask->getName(), | $runningTask->getName(), | |||
_('has been completed') | _('has been completed') | |||
skipping to change at line 644 | skipping to change at line 543 | |||
if ($jobcancelled) { | if ($jobcancelled) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$runningTask->getID(), | $runningTask->getID(), | |||
$runningTask->getName(), | $runningTask->getName(), | |||
_('has been cancelled') | _('has been cancelled') | |||
) | ) | |||
); | ); | |||
$cancelTasks[] = $runningTask; | $cancelTasks[] = $runningTask; | |||
} | ||||
if ($jobqueued) { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$runningTask->getID(), | ||||
$runningTask->getName(), | ||||
_('has been queued') | ||||
) | ||||
); | ||||
$queueTasks[] = $runningTask; | ||||
} else { | } else { | |||
if (!$runningTask->killTask()) { | if (!$runningTask->killTask()) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$runningTask->getID(), | $runningTask->getID(), | |||
$runningTask->getName(), | $runningTask->getName(), | |||
_('could not be killed') | _('could not be killed') | |||
) | ) | |||
); | ); | |||
skipping to change at line 719 | skipping to change at line 607 | |||
$Task->getName(), | $Task->getName(), | |||
( | ( | |||
$Session->complete() ? | $Session->complete() ? | |||
_('is now completed') : | _('is now completed') : | |||
_('could not be completed') | _('could not be completed') | |||
) | ) | |||
) | ) | |||
); | ); | |||
unset($Task); | unset($Task); | |||
} | } | |||
foreach ($queueTasks as &$Task) { | ||||
$Session = $Task->getSess(); | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$Task->getID(), | ||||
$Task->getName(), | ||||
_('is now queued') | ||||
) | ||||
); | ||||
unset($Task); | ||||
} | ||||
} catch (Exception $e) { | } catch (Exception $e) { | |||
self::outall($e->getMessage()); | self::outall($e->getMessage()); | |||
} | } | |||
if ($first) { | if ($first) { | |||
$first = false; | $first = false; | |||
} | } | |||
$tmpTime = self::getSetting(self::$sleeptime); | $tmpTime = self::getSetting(self::$sleeptime); | |||
if ($tmpTime > 0 && static::$zzz != $tmpTime) { | if ($tmpTime > 0 && static::$zzz != $tmpTime) { | |||
static::$zzz = $tmpTime ?: 10; | static::$zzz = $tmpTime ?: 10; | |||
self::outall( | self::outall( | |||
End of changes. 26 change blocks. | ||||
199 lines changed or deleted | 76 lines changed or added |