multicastmanager.class.php (fogproject-1.5.7) | : | multicastmanager.class.php (fogproject-1.5.8) | ||
---|---|---|---|---|
skipping to change at line 171 | skipping to change at line 171 | |||
* the others, this is its service loop | * the others, this is its service loop | |||
* | * | |||
* @return void | * @return void | |||
*/ | */ | |||
private function _serviceLoop() | private function _serviceLoop() | |||
{ | { | |||
$KnownTasks = []; | $KnownTasks = []; | |||
while (true) { | while (true) { | |||
// Ensure we have a fresh complete and cancel variable. | // Ensure we have a fresh complete and cancel variable. | |||
$completeTasks = $cancelTasks = []; | $completeTasks = $cancelTasks = []; | |||
$queueTasks = []; | ||||
// Handles the sleep timer for us. | // Handles the sleep timer for us. | |||
$date = self::niceDate(); | $date = self::niceDate(); | |||
if (!isset($nextrun)) { | if (!isset($nextrun)) { | |||
$first = true; | $first = true; | |||
$nextrun = clone $date; | $nextrun = clone $date; | |||
} | } | |||
// Actually holds and loops until the proper sleep time is met. | // Actually holds and loops until the proper sleep time is met. | |||
if ($date < $nextrun && $first === false) { | if ($date < $nextrun && $first === false) { | |||
usleep(100000); | usleep(100000); | |||
skipping to change at line 229 | skipping to change at line 230 | |||
$StorageNode->get('id'), | $StorageNode->get('id'), | |||
$queuedStates | $queuedStates | |||
); | ); | |||
$taskCount = count($allTasks ?: []); | $taskCount = count($allTasks ?: []); | |||
if ($taskCount < 1) { | if ($taskCount < 1) { | |||
self::outall( | self::outall( | |||
' * ' . _('No new tasks found') | ' * ' . _('No new tasks found') | |||
); | ); | |||
continue; | continue; | |||
} | } | |||
foreach ($allTasks as &$curTask) { | foreach ($allTasks as &$curTask) { | |||
$totalSlots = $StorageNode->get('maxClients'); | ||||
$usedSlots = $StorageNode->getUsedSlotCount(); | ||||
$queuedSlots = $StorageNode->getQueuedSlotCount(); | ||||
$groupOpenSlots = $totalSlots - $usedSlots; | ||||
$new = self::_isMCTaskNew( | $new = self::_isMCTaskNew( | |||
$KnownTasks, | $KnownTasks, | |||
$curTask->getID() | $curTask->getID() | |||
); | ); | |||
if ($new) { | if ($new) { | |||
$KnownTasks[] = $curTask; | $KnownTasks[] = $curTask; | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('is new') | _('is new') | |||
) | ) | |||
); | ); | |||
if (!file_exists($curTask->getImagePath())) { | if (!file_exists($curTask->getImagePath())) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('failed to execute, image file') | _('failed to execute, image file: ') | |||
. ': ' | ||||
. $curTask->getImagePath() | . $curTask->getImagePath() | |||
. _('not found on this node') | . _('not found on this node') | |||
) | ) | |||
); | ); | |||
continue; | continue; | |||
} | } | |||
if (!$curTask->getClientCount()) { | if (!$curTask->getClientCount()) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_( | _('failed to execute, there are no clien | |||
'failed to execute, ' | ts included') | |||
. 'there are no clients included' | ||||
) | ||||
) | ) | |||
); | ); | |||
continue; | continue; | |||
} | } | |||
if (!is_numeric($curTask->getPortBase()) | if (!is_numeric($curTask->getPortBase()) | |||
|| !($curTask->getPortBase() % 2 == 0) | || !($curTask->getPortBase() % 2 == 0) | |||
) { | ) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_( | _('failed to execute, port must be even | |||
'failed to execute, ' | and numeric') | |||
. 'port must be even and numeric' | ) | |||
) | ); | |||
continue; | ||||
} | ||||
if ($groupOpenSlots < 1) { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_(' No open slots ') | ||||
) | ) | |||
); | ); | |||
$curTask->getSess()->set('stateID', 1); | ||||
if (!$curTask->getSess()->save()) { | ||||
throw new Exception(_('Failed to update Task | ||||
')); | ||||
} else { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_(' Task state has been updated, now | ||||
the task is queued!') | ||||
) | ||||
); | ||||
} | ||||
continue; | continue; | |||
} | } | |||
if (!$curTask->startTask()) { | if (!$curTask->startTask()) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('failed to start') | _('failed to start') | |||
) | ) | |||
); | ); | |||
if (!$curTask->kilTask()) { | if (!$curTask->kilTask()) { | |||
skipping to change at line 336 | skipping to change at line 363 | |||
_('unable to be updated') | _('unable to be updated') | |||
) | ) | |||
); | ); | |||
continue; | continue; | |||
} | } | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('image file found, file') | _('image file found, file: ') | |||
. ': ' | ||||
. $curTask->getImagePath() | . $curTask->getImagePath() | |||
) | ) | |||
); | ); | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
$curTask->getClientCount() | $curTask->getClientCount() | |||
. ' ' | . ' ' | |||
skipping to change at line 362 | skipping to change at line 388 | |||
) | ) | |||
. ' ' | . ' ' | |||
. _('found') | . _('found') | |||
) | ) | |||
); | ); | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('sending on base port') | _('sending on base port ') | |||
. ' ' | ||||
. $curTask->getPortBase() | . $curTask->getPortBase() | |||
) | ) | |||
); | ); | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
" | %s: %s", | " | %s: %s", | |||
_('Command'), | _('Command'), | |||
$curTask->getCMD() | $curTask->getCMD() | |||
) | ) | |||
); | ); | |||
skipping to change at line 385 | skipping to change at line 410 | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$curTask->getID(), | $curTask->getID(), | |||
$curTask->getName(), | $curTask->getName(), | |||
_('has started') | _('has started') | |||
) | ) | |||
); | ); | |||
continue; | continue; | |||
} | } | |||
$jobcancelled = $jobcompleted = false; | $jobcancelled = $jobcompleted = false; | |||
$jobqueued = false; | ||||
$runningTask = self::_getMCExistingTask( | $runningTask = self::_getMCExistingTask( | |||
$KnownTasks, | $KnownTasks, | |||
$curTask->getID() | $curTask->getID() | |||
); | ); | |||
if ($groupOpenSlots > 0 && !$runningTask->isRunning($run | ||||
ningTask->procRef)) { | ||||
if (!$curTask->startTask()) { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('failed to start') | ||||
) | ||||
); | ||||
if (!$curTask->kilTask()) { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('could not be killed') | ||||
) | ||||
); | ||||
} else { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('has been killed') | ||||
) | ||||
); | ||||
} | ||||
// continue; | ||||
} | ||||
$Session = $curTask->getSess(); | ||||
$Session->set('stateID', self::getProgressState()); | ||||
if (!$Session->save()) { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('unable to be updated') | ||||
) | ||||
); | ||||
continue; | ||||
} | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('image file found, file: ') | ||||
. $curTask->getImagePath() | ||||
) | ||||
); | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
$curTask->getClientCount() | ||||
. ' ' | ||||
. ( | ||||
$curTask->getClientCount() == 1 ? | ||||
_('client') : | ||||
_('clients') | ||||
) | ||||
. ' ' | ||||
. _('found') | ||||
) | ||||
); | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('sending on base port ') | ||||
. $curTask->getPortBase() | ||||
) | ||||
); | ||||
self::outall( | ||||
sprintf( | ||||
" | %s: %s", | ||||
_('Command'), | ||||
$curTask->getCMD() | ||||
) | ||||
); | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('has started') | ||||
) | ||||
); | ||||
if (!empty($queueTasks)) { | ||||
$queueTasks = self::_removeFromKnownList( | ||||
$queueTasks, | ||||
$curTask->getID() | ||||
); | ||||
} | ||||
$KnownTasks = self::_removeFromKnownList( | ||||
$KnownTasks, | ||||
$curTask->getID() | ||||
); | ||||
$KnownTasks[] = $curTask; | ||||
continue; | ||||
} | ||||
$taskIDs = $runningTask->getTaskIDs(); | $taskIDs = $runningTask->getTaskIDs(); | |||
$find = []; | $find = []; | |||
$find['id'] = $taskIDs; | $find['id'] = $taskIDs; | |||
$find['stateID'] = self::getCancelledState(); | $find['stateID'] = self::getCancelledState(); | |||
Route::ids( | Route::ids( | |||
'task', | 'task', | |||
$find | $find | |||
); | ); | |||
$inTaskCancelledIDs = json_decode(Route::getData(), true ); | $inTaskCancelledIDs = json_decode(Route::getData(), true ); | |||
$find['stateID'] = self::getCompleteState(); | $find['stateID'] = self::getCompleteState(); | |||
Route::ids( | Route::ids( | |||
'task', | 'task', | |||
$find | $find | |||
); | ); | |||
$inTaskCompletedIDs = json_decode(Route::getData(), true ); | $inTaskCompletedIDs = json_decode(Route::getData(), true ); | |||
$Session = $runningTask->getSess(); | $Session = $runningTask->getSess(); | |||
if ($Session->get('stateID') != $curTask->getSess()->get | ||||
('stateID')) { | ||||
$Session->set('stateID', $curTask->getSess()->get('s | ||||
tateID')); | ||||
if (!$Session->save()) { | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$curTask->getID(), | ||||
$curTask->getName(), | ||||
_('unable to be updated') | ||||
) | ||||
); | ||||
} | ||||
} | ||||
$SessCancelled = $Session->get('stateID') | $SessCancelled = $Session->get('stateID') | |||
== self::getCancelledState(); | == self::getCancelledState(); | |||
$SessCompleted = $Session->get('stateID') | $SessCompleted = $Session->get('stateID') | |||
== self::getCompleteState(); | == self::getCompleteState(); | |||
$SessQueued = $Session->get('stateID') | ||||
== self::getQueuedState(); | ||||
if ($SessCancelled | if ($SessCancelled | |||
|| count($inTaskCancelledIDs) > 0 | || count($inTaskCancelledIDs) > 0 | |||
) { | ) { | |||
$jobcancelled = true; | $jobcancelled = true; | |||
} | } | |||
if ($SessCompleted | if ($SessCompleted | |||
|| count($inTaskCompletedIDs) > 0 | || count($inTaskCompletedIDs) > 0 | |||
|| ($runningTask->isNamedSession() | || ($runningTask->isNamedSession() | |||
&& $runningTask->getSessClients()) | && $runningTask->getSessClients()) | |||
) { | ) { | |||
$jobcompleted = true; | $jobcompleted = true; | |||
} | } | |||
if (!$jobcancelled && !$jobcompleted) { | ||||
if ($SessQueued) { | ||||
$jobqueued = true; | ||||
} | ||||
if (!$jobcancelled && !$jobcompleted && !$jobqueued) { | ||||
if ($runningTask->isRunning($runningTask->procRef)) { | if ($runningTask->isRunning($runningTask->procRef)) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$runningTask->getID(), | $runningTask->getID(), | |||
$runningTask->getName(), | $runningTask->getName(), | |||
_('is already running with pid') | _('is already running with pid: ') | |||
. ': ' | ||||
. $runningTask->getPID($runningTask->pro cRef) | . $runningTask->getPID($runningTask->pro cRef) | |||
) | ) | |||
); | ); | |||
$runningTask->updateStats(); | $runningTask->updateStats(); | |||
} else { | } else { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$runningTask->getID(), | $runningTask->getID(), | |||
$runningTask->getName(), | $runningTask->getName(), | |||
_('is no longer running') | _('is no longer running') | |||
) | ) | |||
); | ); | |||
skipping to change at line 461 | skipping to change at line 620 | |||
); | ); | |||
} else { | } else { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$runningTask->getID(), | $runningTask->getID(), | |||
$runningTask->getName(), | $runningTask->getName(), | |||
_('has been killed') | _('has been killed') | |||
) | ) | |||
); | ); | |||
$KnownTasks = self::_removeFromKnownList( | ||||
$KnownTasks, | ||||
$runningTask->getID() | ||||
); | ||||
} | } | |||
} | } | |||
} else { | } else { | |||
if ($jobcompleted) { | if ($jobcompleted) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$runningTask->getID(), | $runningTask->getID(), | |||
$runningTask->getName(), | $runningTask->getName(), | |||
_('has been completed') | _('has been completed') | |||
skipping to change at line 490 | skipping to change at line 645 | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$runningTask->getID(), | $runningTask->getID(), | |||
$runningTask->getName(), | $runningTask->getName(), | |||
_('has been cancelled') | _('has been cancelled') | |||
) | ) | |||
); | ); | |||
$cancelTasks[] = $runningTask; | $cancelTasks[] = $runningTask; | |||
} | } | |||
if (!$runningTask->killTask()) { | if ($jobqueued) { | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$runningTask->getID(), | $runningTask->getID(), | |||
$runningTask->getName(), | $runningTask->getName(), | |||
_('could not be killed') | _('has been queued') | |||
) | ) | |||
); | ); | |||
$queueTasks[] = $runningTask; | ||||
} else { | } else { | |||
self::outall( | if (!$runningTask->killTask()) { | |||
sprintf( | self::outall( | |||
$startStr, | sprintf( | |||
$runningTask->getID(), | $startStr, | |||
$runningTask->getName(), | $runningTask->getID(), | |||
_('has been killed') | $runningTask->getName(), | |||
) | _('could not be killed') | |||
); | ) | |||
$KnownTasks = self::_removeFromKnownList( | ); | |||
$KnownTasks, | } else { | |||
$runningTask->getID() | self::outall( | |||
); | sprintf( | |||
$startStr, | ||||
$runningTask->getID(), | ||||
$runningTask->getName(), | ||||
_('has been killed') | ||||
) | ||||
); | ||||
$KnownTasks = self::_removeFromKnownList( | ||||
$KnownTasks, | ||||
$runningTask->getID() | ||||
); | ||||
} | ||||
} | } | |||
} | } | |||
unset($curTask); | unset($curTask); | |||
unset($runningTask); | ||||
} | } | |||
unset($StorageNode); | unset($StorageNode); | |||
} | } | |||
// We need to iterate the complete and cancelTasks | // We need to iterate the complete and cancelTasks | |||
foreach ($cancelTasks as &$Task) { | foreach ($cancelTasks as &$Task) { | |||
$Session = $Task->getSess(); | $Session = $Task->getSess(); | |||
self::outall( | self::outall( | |||
sprintf( | sprintf( | |||
$startStr, | $startStr, | |||
$Task->getID(), | $Task->getID(), | |||
skipping to change at line 551 | skipping to change at line 719 | |||
$Task->getName(), | $Task->getName(), | |||
( | ( | |||
$Session->complete() ? | $Session->complete() ? | |||
_('is now completed') : | _('is now completed') : | |||
_('could not be completed') | _('could not be completed') | |||
) | ) | |||
) | ) | |||
); | ); | |||
unset($Task); | unset($Task); | |||
} | } | |||
foreach ($queueTasks as &$Task) { | ||||
$Session = $Task->getSess(); | ||||
self::outall( | ||||
sprintf( | ||||
$startStr, | ||||
$Task->getID(), | ||||
$Task->getName(), | ||||
_('is now queued') | ||||
) | ||||
); | ||||
unset($Task); | ||||
} | ||||
} catch (Exception $e) { | } catch (Exception $e) { | |||
self::outall($e->getMessage()); | self::outall($e->getMessage()); | |||
} | } | |||
if ($first) { | if ($first) { | |||
$first = false; | $first = false; | |||
} | } | |||
$tmpTime = self::getSetting(self::$sleeptime); | $tmpTime = self::getSetting(self::$sleeptime); | |||
if ($tmpTime > 0 && static::$zzz != $tmpTime) { | if ($tmpTime > 0 && static::$zzz != $tmpTime) { | |||
static::$zzz = $tmpTime ?: 10; | static::$zzz = $tmpTime ?: 10; | |||
self::outall( | self::outall( | |||
End of changes. 25 change blocks. | ||||
35 lines changed or deleted | 222 lines changed or added |