"Fossies" - the Fresh Open Source Software Archive

Member "netxms-3.8.166/src/server/core/npe.cpp" (23 Feb 2021, 12647 Bytes) of package /linux/misc/netxms-3.8.166.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "npe.cpp" see the Fossies "Dox" file reference documentation.

    1 /*
    2 ** NetXMS - Network Management System
    3 ** Copyright (C) 2003-2020 Victor Kirhenshtein
    4 **
    5 ** This program is free software; you can redistribute it and/or modify
    6 ** it under the terms of the GNU General Public License as published by
    7 ** the Free Software Foundation; either version 2 of the License, or
    8 ** (at your option) any later version.
    9 **
   10 ** This program is distributed in the hope that it will be useful,
   11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
   12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   13 ** GNU General Public License for more details.
   14 **
   15 ** You should have received a copy of the GNU General Public License
   16 ** along with this program; if not, write to the Free Software
   17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
   18 **
   19 ** File: npe.cpp
   20 **
   21 **/
   22 
   23 #include "nxcore.h"
   24 #include <npe.h>
   25 
   26 /**
   27  * Base class constructor
   28  */
   29 PredictionEngine::PredictionEngine()
   30 {
   31 }
   32 
   33 /**
   34  * Base class destructor
   35  */
   36 PredictionEngine::~PredictionEngine()
   37 {
   38 }
   39 
   40 /**
   41  * Default initialization method - always returns true
   42  */
   43 bool PredictionEngine::initialize(TCHAR *errorMessage)
   44 {
   45    errorMessage[0] = 0;
   46    return true;
   47 }
   48 
   49 /**
   50  * Default implementation - always return false
   51  */
   52 bool PredictionEngine::requiresTraining()
   53 {
   54    return false;
   55 }
   56 
   57 /**
   58  * Default trainig method - do nothing
   59  */
   60 void PredictionEngine::train(UINT32 nodeId, UINT32 dciId, DCObjectStorageClass storageClass)
   61 {
   62 }
   63 
   64 /**
   65  * Get series of predicted values starting with current time. Default implementation
   66  * calls getPredictedValue with incrementing timestamp.
   67  *
   68  * @param nodeId Node object ID
   69  * @param dciId DCI ID
   70  * @param storageClass DCI storage class
   71  * @param count number of values to retrieve
   72  * @param series buffer for values
   73  * @return true on success
   74  */
   75 bool PredictionEngine::getPredictedSeries(UINT32 nodeId, UINT32 dciId, DCObjectStorageClass storageClass, int count, double *series)
   76 {
   77    shared_ptr<NetObj> object = FindObjectById(nodeId);
   78    if ((object == nullptr) || !object->isDataCollectionTarget())
   79       return false;
   80 
   81    shared_ptr<DCObject> dci = static_cast<DataCollectionTarget*>(object.get())->getDCObjectById(dciId, 0);
   82    if (dci->getType() != DCO_TYPE_ITEM)
   83       return false;
   84 
   85    time_t interval = dci->getEffectivePollingInterval();
   86    time_t t = time(nullptr);
   87    for(int i = 0; i < count; i++)
   88    {
   89       series[i] = getPredictedValue(nodeId, dciId, storageClass, t);
   90       t += interval;
   91    }
   92    return true;
   93 }
   94 
   95 /**
   96  * Helper method to read last N values of given DCI
   97  */
   98 StructArray<DciValue> *PredictionEngine::getDciValues(UINT32 nodeId, UINT32 dciId, DCObjectStorageClass storageClass, int maxRows)
   99 {
  100    TCHAR query[1024];
  101    switch(g_dbSyntax)
  102    {
  103       case DB_SYNTAX_MSSQL:
  104          if (g_flags & AF_SINGLE_TABLE_PERF_DATA)
  105             _sntprintf(query, 1024, _T("SELECT TOP %d idata_timestamp,idata_value FROM idata WHERE node_id=%u AND item_id=%u ORDER BY idata_timestamp DESC"), maxRows, nodeId, dciId);
  106          else
  107             _sntprintf(query, 1024, _T("SELECT TOP %d idata_timestamp,idata_value FROM idata_%u WHERE item_id=%u ORDER BY idata_timestamp DESC"), maxRows, nodeId, dciId);
  108          break;
  109       case DB_SYNTAX_ORACLE:
  110          if (g_flags & AF_SINGLE_TABLE_PERF_DATA)
  111             _sntprintf(query, 1024, _T("SELECT * FROM (SELECT idata_timestamp,idata_value FROM idata WHERE node_id=%u AND item_id=%u ORDER BY idata_timestamp DESC) WHERE ROWNUM<=%d"), nodeId, dciId, maxRows);
  112          else
  113             _sntprintf(query, 1024, _T("SELECT * FROM (SELECT idata_timestamp,idata_value FROM idata_%u WHERE item_id=%u ORDER BY idata_timestamp DESC) WHERE ROWNUM<=%d"), nodeId, dciId, maxRows);
  114          break;
  115       case DB_SYNTAX_MYSQL:
  116       case DB_SYNTAX_PGSQL:
  117       case DB_SYNTAX_SQLITE:
  118          if (g_flags & AF_SINGLE_TABLE_PERF_DATA)
  119             _sntprintf(query, 1024, _T("SELECT idata_timestamp,idata_value FROM idata WHERE node_id=%u AND item_id=%u ORDER BY idata_timestamp DESC LIMIT %d"), nodeId, dciId, maxRows);
  120          else
  121             _sntprintf(query, 1024, _T("SELECT idata_timestamp,idata_value FROM idata_%u WHERE item_id=%u ORDER BY idata_timestamp DESC LIMIT %d"), nodeId, dciId, maxRows);
  122          break;
  123       case DB_SYNTAX_TSDB:
  124          if (g_flags & AF_SINGLE_TABLE_PERF_DATA)
  125             _sntprintf(query, 1024, _T("SELECT date_part('epoch',idata_timestamp)::int,idata_value FROM idata_sc_%s WHERE node_id=%u AND item_id=%u ORDER BY idata_timestamp DESC LIMIT %d"),
  126                      DCObject::getStorageClassName(storageClass), nodeId, dciId, maxRows);
  127          else
  128             _sntprintf(query, 1024, _T("SELECT idata_timestamp,idata_value FROM idata_%u WHERE item_id=%u ORDER BY idata_timestamp DESC LIMIT %d"), nodeId, dciId, maxRows);
  129          break;
  130       case DB_SYNTAX_DB2:
  131          if (g_flags & AF_SINGLE_TABLE_PERF_DATA)
  132             _sntprintf(query, 1024, _T("SELECT idata_timestamp,idata_value FROM idata WHERE node_id=%u AND item_id=%u ORDER BY idata_timestamp DESC FETCH FIRST %d ROWS ONLY"), nodeId, dciId, maxRows);
  133          else
  134             _sntprintf(query, 1024, _T("SELECT idata_timestamp,idata_value FROM idata_%u WHERE item_id=%u ORDER BY idata_timestamp DESC FETCH FIRST %d ROWS ONLY"), nodeId, dciId, maxRows);
  135          break;
  136       default:
  137          nxlog_debug(1, _T("INTERNAL ERROR: unsupported database in PredictionEngine::getDciValues"));
  138          return nullptr;   // Unsupported database
  139    }
  140 
  141    StructArray<DciValue> *values = nullptr;
  142 
  143    DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
  144    DB_RESULT hResult = DBSelect(hdb, query);
  145    if (hResult != nullptr)
  146    {
  147       int count = DBGetNumRows(hResult);
  148       values = new StructArray<DciValue>(count);
  149       for(int i = 0; i < count; i++)
  150       {
  151          DciValue v;
  152          v.timestamp = DBGetFieldULong(hResult, i, 0);
  153          v.value = DBGetFieldDouble(hResult, i, 1);
  154          values->add(&v);
  155       }
  156       DBFreeResult(hResult);
  157    }
  158 
  159    DBConnectionPoolReleaseConnection(hdb);
  160    return values;
  161 }
  162 
  163 /**
  164  * Prediction engine registry
  165  */
  166 static StringObjectMap<PredictionEngine> s_engines(Ownership::True);
  167 
  168 /**
  169  * Prediction engine thread pool
  170  */
  171 static ThreadPool *s_npeThreadPool = nullptr;
  172 
  173 /**
  174  * Register prediction engines on startup
  175  */
  176 void RegisterPredictionEngines()
  177 {
  178    s_npeThreadPool = ThreadPoolCreate(_T("NPE"), 0, 1024);
  179    ENUMERATE_MODULES(pfGetPredictionEngines)
  180    {
  181       ObjectArray<PredictionEngine> *engines = CURRENT_MODULE.pfGetPredictionEngines();
  182       engines->setOwner(Ownership::False);
  183       for(int i = 0; i < engines->size(); i++)
  184       {
  185          PredictionEngine *e = engines->get(i);
  186          TCHAR errorMessage[MAX_NPE_ERROR_MESSAGE_LEN];
  187          if (e->initialize(errorMessage))
  188          {
  189             s_engines.set(e->getName(), e);
  190             nxlog_write(NXLOG_INFO, _T("Prediction engine %s version %s registered"), e->getName(), e->getVersion());
  191          }
  192          else
  193          {
  194             nxlog_write(NXLOG_ERROR, _T("Initialization of prediction engine %s version %s failed"), e->getName(), e->getVersion());
  195             delete e;
  196          }
  197       }
  198       delete engines;
  199    }
  200 }
  201 
  202 /**
  203  * Shutdown prediction engines
  204  */
  205 void ShutdownPredictionEngines()
  206 {
  207    ThreadPoolDestroy(s_npeThreadPool);
  208    s_engines.clear();
  209 }
  210 
  211 /**
  212  * Callback for ShowPredictionEngines
  213  */
  214 static EnumerationCallbackResult ShowEngineDetails(const TCHAR *key, const void *value, void *data)
  215 {
  216    const PredictionEngine *p = (const PredictionEngine *)value;
  217    ConsolePrintf((CONSOLE_CTX)data, _T("%-16s | %-24s | %s\n"), key, p->getVersion(), p->getVendor());
  218    return _CONTINUE;
  219 }
  220 
  221 /**
  222  * Show registered prediction engines on console
  223  */
  224 void ShowPredictionEngines(CONSOLE_CTX console)
  225 {
  226    if (s_engines.size() == 0)
  227    {
  228       ConsolePrintf(console, _T("No prediction engines registered\n"));
  229       return;
  230    }
  231 
  232    ConsolePrintf(console, _T("Name             | Version                  | Vendor\n"));
  233    ConsolePrintf(console, _T("-----------------+--------------------------+------------------------------\n"));
  234    s_engines.forEach(ShowEngineDetails, console);
  235 }
  236 
  237 /**
  238  * Find prediction engine by name
  239  */
  240 PredictionEngine NXCORE_EXPORTABLE *FindPredictionEngine(const TCHAR *name)
  241 {
  242    return s_engines.get(name);
  243 }
  244 
  245 /**
  246  * Get list of registered engines into NXCP message
  247  */
  248 void GetPredictionEngines(NXCPMessage *msg)
  249 {
  250    StructArray<KeyValuePair<PredictionEngine>> *a = s_engines.toArray();
  251    UINT32 fieldId = VID_ELEMENT_LIST_BASE;
  252    for(int i = 0; i < a->size(); i++)
  253    {
  254       const PredictionEngine *e = a->get(i)->value;
  255       msg->setField(fieldId++, e->getName());
  256       msg->setField(fieldId++, e->getDescription());
  257       msg->setField(fieldId++, e->getVersion());
  258       msg->setField(fieldId++, e->getVendor());
  259       fieldId += 6;
  260    }
  261    msg->setField(VID_NUM_ELEMENTS, a->size());
  262    delete a;
  263 }
  264 
  265 /**
  266  * Get predicted data for DCI
  267  */
  268 bool GetPredictedData(ClientSession *session, const NXCPMessage *request, NXCPMessage *response, const DataCollectionTarget& dcTarget)
  269 {
  270    static UINT32 s_rowSize[] = { 8, 8, 16, 16, 516, 16, 8, 8, 16 };
  271 
  272    // Find DCI object
  273    shared_ptr<DCObject> dci = dcTarget.getDCObjectById(request->getFieldAsUInt32(VID_DCI_ID), session->getUserId());
  274    if (dci == nullptr)
  275    {
  276       response->setField(VID_RCC, RCC_INVALID_DCI_ID);
  277       return false;
  278    }
  279 
  280    if (dci->getType() != DCO_TYPE_ITEM)
  281    {
  282       response->setField(VID_RCC, RCC_INCOMPATIBLE_OPERATION);
  283       return false;
  284    }
  285 
  286    PredictionEngine *engine = FindPredictionEngine(static_cast<DCItem*>(dci.get())->getPredictionEngine());
  287 
  288    // Send CMD_REQUEST_COMPLETED message
  289    response->setField(VID_RCC, RCC_SUCCESS);
  290    static_cast<DCItem*>(dci.get())->fillMessageWithThresholds(response, false);
  291    session->sendMessage(response);
  292 
  293    int dataType = static_cast<DCItem*>(dci.get())->getDataType();
  294    time_t timeFrom = request->getFieldAsTime(VID_TIME_FROM);
  295    time_t timestamp = request->getFieldAsTime(VID_TIME_TO);
  296    time_t interval = dci->getEffectivePollingInterval();
  297 
  298    // Allocate memory for data and prepare data header
  299    char buffer[64];
  300    int count = std::min(static_cast<int>((timestamp - timeFrom) / interval), MAX_DCI_DATA_RECORDS);
  301    DCI_DATA_HEADER *pData = (DCI_DATA_HEADER *)malloc(count * s_rowSize[dataType] + sizeof(DCI_DATA_HEADER));
  302    pData->dataType = htonl((UINT32)dataType);
  303    pData->dciId = htonl(dci->getId());
  304 
  305    // Fill memory block with records
  306    double *series = MemAllocArray<double>(count);
  307    if (timestamp >= timeFrom)
  308    {
  309       engine->getPredictedSeries(dci->getOwner()->getId(), dci->getId(), dci->getStorageClass(), count, series);
  310 
  311       DCI_DATA_ROW *pCurr = (DCI_DATA_ROW *)(((char *)pData) + sizeof(DCI_DATA_HEADER));
  312       for(int i = 0; i < count; i++)
  313       {
  314          pCurr->timeStamp = htonl((UINT32)timestamp);
  315          switch(dataType)
  316          {
  317             case DCI_DT_INT:
  318                pCurr->value.int32 = htonl((UINT32)((INT32)series[i]));
  319                break;
  320             case DCI_DT_UINT:
  321             case DCI_DT_COUNTER32:
  322                pCurr->value.int32 = htonl((UINT32)series[i]);
  323                break;
  324             case DCI_DT_INT64:
  325                pCurr->value.ext.v64.int64 = htonq((UINT64)((INT64)series[i]));
  326                break;
  327             case DCI_DT_UINT64:
  328             case DCI_DT_COUNTER64:
  329                pCurr->value.ext.v64.int64 = htonq((UINT64)series[i]);
  330                break;
  331             case DCI_DT_FLOAT:
  332                pCurr->value.ext.v64.real = htond(series[i]);
  333                break;
  334             case DCI_DT_STRING:
  335                snprintf(buffer, 64, "%f", series[i]);
  336                mb_to_ucs2(buffer, -1, pCurr->value.string, MAX_DCI_STRING_VALUE);
  337                SwapUCS2String(pCurr->value.string);
  338                break;
  339          }
  340          pCurr = (DCI_DATA_ROW *)(((char *)pCurr) + s_rowSize[dataType]);
  341          timestamp -= interval;
  342       }
  343       pData->numRows = htonl(count);
  344    }
  345    MemFree(series);
  346 
  347    // Prepare and send raw message with fetched data
  348    NXCP_MESSAGE *msg =
  349       CreateRawNXCPMessage(CMD_DCI_DATA, request->getId(), 0,
  350                            pData, count * s_rowSize[dataType] + sizeof(DCI_DATA_HEADER),
  351                            nullptr, session->isCompressionEnabled());
  352    MemFree(pData);
  353    session->sendRawMessage(msg);
  354    MemFree(msg);
  355    return true;
  356 }
  357 
  358 /**
  359  * Queue training run for prediction engine
  360  */
  361 void QueuePredictionEngineTraining(PredictionEngine *engine, DCItem *dci)
  362 {
  363    ThreadPoolExecute(s_npeThreadPool, engine, &PredictionEngine::train, dci->getOwner()->getId(), dci->getId(), dci->getStorageClass());
  364 }