"Fossies" - the Fresh Open Source Software Archive

Member "netxms-3.1.300/src/server/core/agent.cpp" (7 Jan 2020, 31894 Bytes) of package /linux/misc/netxms-3.1.300.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "agent.cpp", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 3.0.2357_vs_3.1.241.

    1 /*
    2 ** NetXMS - Network Management System
    3 ** Copyright (C) 2003-2019 Victor Kirhenshtein
    4 **
    5 ** This program is free software; you can redistribute it and/or modify
    6 ** it under the terms of the GNU General Public License as published by
    7 ** the Free Software Foundation; either version 2 of the License, or
    8 ** (at your option) any later version.
    9 **
   10 ** This program is distributed in the hope that it will be useful,
   11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
   12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   13 ** GNU General Public License for more details.
   14 **
   15 ** You should have received a copy of the GNU General Public License
   16 ** along with this program; if not, write to the Free Software
   17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
   18 **
   19 ** File: agent.cpp
   20 **
   21 **/
   22 
   23 #include "nxcore.h"
   24 #include <agent_tunnel.h>
   25 
   26 /**
   27  * Externals
   28  */
   29 void ProcessTrap(SNMP_PDU *pdu, const InetAddress& srcAddr, UINT32 zoneUIN, int srcPort, SNMP_Transport *pTransport, SNMP_Engine *localEngine, bool isInformRq);
   30 void QueueProxiedSyslogMessage(const InetAddress &addr, UINT32 zoneUIN, UINT32 nodeId, time_t timestamp, const char *msg, int msgLen);
   31 
   32 /**
   33  * Create normal agent connection
   34  */
   35 AgentConnectionEx::AgentConnectionEx(UINT32 nodeId, const InetAddress& ipAddr, WORD port, int authMethod, const TCHAR *secret, bool allowCompression) :
   36          AgentConnection(ipAddr, port, authMethod, secret, allowCompression)
   37 {
   38    m_nodeId = nodeId;
   39    m_tunnel = NULL;
   40    m_proxyTunnel = NULL;
   41    m_tcpProxySession = NULL;
   42 }
   43 
   44 /**
   45  * Create agent connection within tunnel
   46  */
   47 AgentConnectionEx::AgentConnectionEx(UINT32 nodeId, AgentTunnel *tunnel, int authMethod, const TCHAR *secret, bool allowCompression) :
   48          AgentConnection(InetAddress::INVALID, 0, authMethod, secret, allowCompression)
   49 {
   50    m_nodeId = nodeId;
   51    m_tunnel = tunnel;
   52    m_tunnel->incRefCount();
   53    m_proxyTunnel = NULL;
   54    m_tcpProxySession = NULL;
   55 }
   56 
   57 /**
   58  * Destructor for extended agent connection class
   59  */
   60 AgentConnectionEx::~AgentConnectionEx()
   61 {
   62    if (m_tunnel != NULL)
   63       m_tunnel->decRefCount();
   64    if (m_proxyTunnel != NULL)
   65       m_proxyTunnel->decRefCount();
   66 }
   67 
   68 /**
   69  * Create communication channel
   70  */
   71 AbstractCommChannel *AgentConnectionEx::createChannel()
   72 {
   73    if (m_tunnel != NULL)
   74       return m_tunnel->createChannel();
   75    if (isProxyMode() && (m_proxyTunnel != NULL))
   76       return m_proxyTunnel->createChannel();
   77    return AgentConnection::createChannel();
   78 }
   79 
   80 /**
   81  * Set tunnel to use
   82  */
   83 void AgentConnectionEx::setTunnel(AgentTunnel *tunnel)
   84 {
   85    if (m_tunnel != NULL)
   86       m_tunnel->decRefCount();
   87    m_tunnel = tunnel;
   88    if (m_tunnel != NULL)
   89       m_tunnel->incRefCount();
   90 }
   91 
   92 /**
   93  * Set proxy tunnel to use
   94  */
   95 void AgentConnectionEx::setProxy(AgentTunnel *tunnel, int authMethod, const TCHAR *secret)
   96 {
   97    if (m_proxyTunnel != NULL)
   98       m_proxyTunnel->decRefCount();
   99    m_proxyTunnel = tunnel;
  100    if (m_proxyTunnel != NULL)
  101       m_proxyTunnel->incRefCount();
  102    setProxy(InetAddress::INVALID, 0, authMethod, secret);
  103 }
  104 
  105 /**
  106  * Trap processor
  107  */
  108 void AgentConnectionEx::onTrap(NXCPMessage *pMsg)
  109 {
  110    if (IsShutdownInProgress())
  111       return;
  112 
  113    Node *pNode = NULL;
  114    TCHAR szBuffer[64];
  115 
  116    debugPrintf(3, _T("AgentConnectionEx::onTrap(): Received trap message from agent at %s, node ID %d"), getIpAddr().toString(szBuffer), m_nodeId);
  117     if (m_nodeId != 0)
  118         pNode = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
  119     if (pNode == NULL)
  120       pNode = FindNodeByIP(0, getIpAddr());
  121    if (pNode != NULL)
  122    {
  123       if (pNode->getStatus() != STATUS_UNMANAGED)
  124       {
  125            // Check for duplicate traps - only accept traps with ID
  126            // higher than last received
  127            // agents prior to 1.1.6 will not send trap id
  128            // we should accept trap in that case to maintain compatibility
  129            bool acceptTrap;
  130            QWORD trapId = pMsg->getFieldAsUInt64(VID_TRAP_ID);
  131            if (trapId != 0)
  132            {
  133                acceptTrap = pNode->checkAgentTrapId(trapId);
  134                debugPrintf(5, _T("AgentConnectionEx::onTrap(): trapID is%s valid"), acceptTrap ? _T("") : _T(" not"));
  135            }
  136            else
  137            {
  138                acceptTrap = true;
  139                debugPrintf(5, _T("AgentConnectionEx::onTrap(): trap ID not provided"));
  140            }
  141 
  142            if (acceptTrap)
  143            {
  144                UINT32 dwEventCode = pMsg->getFieldAsUInt32(VID_EVENT_CODE);
  145                if ((dwEventCode == 0) && pMsg->isFieldExist(VID_EVENT_NAME))
  146                {
  147                    TCHAR eventName[256];
  148                    pMsg->getFieldAsString(VID_EVENT_NAME, eventName, 256);
  149                    dwEventCode = EventCodeFromName(eventName, 0);
  150                }
  151                debugPrintf(3, _T("Event from trap: %d"), dwEventCode);
  152 
  153                StringList parameters(pMsg, VID_EVENT_ARG_BASE, VID_NUM_ARGS);
  154                PostEvent(dwEventCode, EventOrigin::AGENT, pMsg->getFieldAsTime(VID_TIMESTAMP), pNode->getId(), parameters);
  155            }
  156       }
  157       else
  158       {
  159          debugPrintf(3, _T("AgentConnectionEx::onTrap(): node %s [%d] in in UNMANAGED state - trap ignored"), pNode->getName(), pNode->getId());
  160       }
  161    }
  162    else
  163    {
  164       debugPrintf(3, _T("AgentConnectionEx::onTrap(): Cannot find node for IP address %s"), getIpAddr().toString(szBuffer));
  165    }
  166 }
  167 
  168 /**
  169  * Incoming syslog message processor
  170  */
  171 void AgentConnectionEx::onSyslogMessage(NXCPMessage *msg)
  172 {
  173    if (IsShutdownInProgress())
  174       return;
  175 
  176    TCHAR buffer[64];
  177    debugPrintf(3, _T("AgentConnectionEx::onSyslogMessage(): Received message from agent at %s, node ID %d"), getIpAddr().toString(buffer), m_nodeId);
  178 
  179    UINT32 zoneUIN = msg->getFieldAsUInt32(VID_ZONE_UIN);
  180    Node *node = NULL;
  181    if (m_nodeId != 0)
  182       node = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
  183    if (node == NULL)
  184       node = FindNodeByIP(zoneUIN, getIpAddr());
  185    if (node != NULL)
  186    {
  187       // Check for duplicate messages - only accept messages with ID
  188       // higher than last received
  189       if (node->checkSyslogMessageId(msg->getFieldAsUInt64(VID_REQUEST_ID)))
  190       {
  191          int msgLen = msg->getFieldAsInt32(VID_MESSAGE_LENGTH);
  192          if (msgLen < 2048)
  193          {
  194             char message[2048];
  195             msg->getFieldAsBinary(VID_MESSAGE, (BYTE *)message, msgLen + 1);
  196             InetAddress sourceAddr = msg->getFieldAsInetAddress(VID_IP_ADDRESS);
  197             UINT32 sourceNodeId = 0;
  198             if (sourceAddr.isLoopback())
  199             {
  200                debugPrintf(5, _T("Source IP address for syslog message is loopback, setting source node ID to %d"), m_nodeId);
  201                sourceNodeId = m_nodeId;
  202             }
  203             QueueProxiedSyslogMessage(sourceAddr, zoneUIN, sourceNodeId,
  204                                       msg->getFieldAsTime(VID_TIMESTAMP), message, msgLen);
  205          }
  206       }
  207       else
  208       {
  209          debugPrintf(5, _T("AgentConnectionEx::onSyslogMessage(): message ID is invalid (node %s [%d])"), node->getName(), node->getId());
  210       }
  211    }
  212    else
  213    {
  214       debugPrintf(5, _T("AgentConnectionEx::onSyslogMessage(): Cannot find node for IP address %s"), getIpAddr().toString(buffer));
  215    }
  216 }
  217 
  218 /**
  219  * Handler for data push
  220  */
  221 void AgentConnectionEx::onDataPush(NXCPMessage *msg)
  222 {
  223    if (IsShutdownInProgress())
  224       return;
  225 
  226     TCHAR name[MAX_PARAM_NAME], value[MAX_RESULT_LENGTH];
  227     msg->getFieldAsString(VID_NAME, name, MAX_PARAM_NAME);
  228     msg->getFieldAsString(VID_VALUE, value, MAX_RESULT_LENGTH);
  229 
  230    Node *sender = NULL;
  231     if (m_nodeId != 0)
  232         sender = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
  233    if (sender == NULL)
  234       sender = FindNodeByIP(0, getIpAddr());
  235 
  236     if (sender != NULL)
  237     {
  238         // Check for duplicate data requests - only accept requests with ID
  239         // higher than last received
  240         // agents prior to 1.2.10 will not send request id
  241         // we should accept data in that case to maintain compatibility
  242         bool acceptRequest;
  243         QWORD requestId = msg->getFieldAsUInt64(VID_REQUEST_ID);
  244         if (requestId != 0)
  245         {
  246             acceptRequest = sender->checkAgentPushRequestId(requestId);
  247             debugPrintf(5, _T("AgentConnectionEx::onDataPush(): requestId is%s valid"), acceptRequest ? _T("") : _T(" not"));
  248         }
  249         else
  250         {
  251             acceptRequest = true;
  252             debugPrintf(5, _T("AgentConnectionEx::onDataPush(): request ID not provided"));
  253         }
  254 
  255         if (acceptRequest)
  256         {
  257          Node *target;
  258          UINT32 objectId = msg->getFieldAsUInt32(VID_OBJECT_ID);
  259          if (objectId != 0)
  260          {
  261             // push on behalf of other node
  262             target = (Node *)FindObjectById(objectId, OBJECT_NODE);
  263             if (target != NULL)
  264             {
  265                if (target->isTrustedNode(sender->getId()))
  266                {
  267                   DbgPrintf(5, _T("%s: agent data push: target set to %s [%d]"), sender->getName(), target->getName(), target->getId());
  268                }
  269                else
  270                {
  271                   DbgPrintf(5, _T("%s: agent data push: not in trusted node list for target %s [%d]"), sender->getName(), target->getName(), target->getId());
  272                   target = NULL;
  273                }
  274             }
  275          }
  276          else
  277          {
  278             target = sender;
  279          }
  280 
  281          if (target != NULL)
  282          {
  283               DbgPrintf(5, _T("%s: agent data push: %s=%s"), target->getName(), name, value);
  284               shared_ptr<DCObject> dci = target->getDCObjectByName(name, 0);
  285               if ((dci != NULL) && (dci->getType() == DCO_TYPE_ITEM) && (dci->getDataSource() == DS_PUSH_AGENT) && (dci->getStatus() == ITEM_STATUS_ACTIVE))
  286               {
  287                   DbgPrintf(5, _T("%s: agent data push: found DCI %d"), target->getName(), dci->getId());
  288                time_t t = msg->getFieldAsTime(VID_TIMESTAMP);
  289                if (t == 0)
  290                      t = time(NULL);
  291                   target->processNewDCValue(dci, t, value);
  292                if (t > dci->getLastPollTime())
  293                      dci->setLastPollTime(t);
  294               }
  295               else
  296               {
  297                  debugPrintf(5, _T("%s: agent data push: DCI not found for %s"), target->getName(), name);
  298               }
  299          }
  300          else
  301          {
  302             debugPrintf(5, _T("%s: agent data push: target node not found or not accessible"), sender->getName());
  303          }
  304       }
  305     }
  306 }
  307 
  308 /**
  309  * Cancel unknown file monitoring
  310  */
  311 static void CancelUnknownFileMonitoring(Node *object,TCHAR *remoteFile)
  312 {
  313    nxlog_debug(6, _T("AgentConnectionEx::onFileMonitoringData: unknown subscription will be canceled"));
  314    AgentConnection *conn = object->createAgentConnection();
  315    if(conn != NULL)
  316    {
  317       NXCPMessage request(conn->getProtocolVersion());
  318       request.setId(conn->generateRequestId());
  319       request.setCode(CMD_CANCEL_FILE_MONITORING);
  320       request.setField(VID_FILE_NAME, remoteFile);
  321       request.setField(VID_OBJECT_ID, object->getId());
  322       NXCPMessage* response = conn->customRequest(&request);
  323       delete response;
  324       conn->decRefCount();
  325    }
  326 }
  327 
  328 /**
  329  * Recieve file monitoring information and resend to all required user sessions
  330  */
  331 void AgentConnectionEx::onFileMonitoringData(NXCPMessage *pMsg)
  332 {
  333    if (IsShutdownInProgress())
  334       return;
  335 
  336     Node *object = NULL;
  337     if (m_nodeId != 0)
  338         object = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
  339     if (object != NULL)
  340     {
  341        TCHAR remoteFile[MAX_PATH];
  342       pMsg->getFieldAsString(VID_FILE_NAME, remoteFile, MAX_PATH);
  343       ObjectArray<ClientSession>* result = g_monitoringList.findClientByFNameAndNodeID(remoteFile, object->getId());
  344       debugPrintf(6, _T("AgentConnectionEx::onFileMonitoringData: found %d sessions for remote file %s on node %s [%d]"), result->size(), remoteFile, object->getName(), object->getId());
  345       int validSessionCount = result->size();
  346       for(int i = 0; i < result->size(); i++)
  347       {
  348          if (!result->get(i)->sendMessage(pMsg))
  349          {
  350             MONITORED_FILE file;
  351             _tcsncpy(file.fileName, remoteFile, MAX_PATH);
  352             file.nodeID = m_nodeId;
  353             file.session = result->get(i);
  354             g_monitoringList.removeMonitoringFile(&file);
  355             validSessionCount--;
  356 
  357             if (validSessionCount == 0)
  358                CancelUnknownFileMonitoring(object, remoteFile);
  359          }
  360       }
  361       if (result->size() == 0)
  362       {
  363          CancelUnknownFileMonitoring(object, remoteFile);
  364       }
  365       delete result;
  366     }
  367     else
  368     {
  369         g_monitoringList.removeDisconnectedNode(m_nodeId);
  370         debugPrintf(6, _T("AgentConnectionEx::onFileMonitoringData: node object not found"));
  371     }
  372 }
  373 
  374 /**
  375  * Ask modules if they can process custom message
  376  */
  377 bool AgentConnectionEx::processCustomMessage(NXCPMessage *msg)
  378 {
  379    if (IsShutdownInProgress())
  380       return false;
  381 
  382    TCHAR buffer[128];
  383    DbgPrintf(6, _T("AgentConnectionEx::processCustomMessage: processing message %s ID %d"),
  384       NXCPMessageCodeName(msg->getCode(), buffer), msg->getId());
  385 
  386    ENUMERATE_MODULES(pfOnAgentMessage)
  387    {
  388       if (g_pModuleList[__i].pfOnAgentMessage(msg, m_nodeId))
  389          return true;    // accepted by module
  390    }
  391    return false;
  392 }
  393 
  394 /**
  395  * Create SNMP proxy transport for sending trap response
  396  */
  397 static SNMP_ProxyTransport *CreateSNMPProxyTransport(AgentConnectionEx *conn, Node *originNode, const InetAddress& originAddr, UINT16 port)
  398 {
  399    conn->incRefCount();
  400    SNMP_ProxyTransport *snmpTransport = new SNMP_ProxyTransport(conn, originAddr, port);
  401    if (originNode != NULL)
  402    {
  403       snmpTransport->setSecurityContext(originNode->getSnmpSecurityContext());
  404    }
  405    return snmpTransport;
  406 }
  407 
  408 /**
  409  * Recieve trap sent throught proxy agent
  410  */
  411 void AgentConnectionEx::onSnmpTrap(NXCPMessage *msg)
  412 {
  413    if (IsShutdownInProgress())
  414       return;
  415 
  416    Node *proxyNode = NULL;
  417    TCHAR ipStringBuffer[4096];
  418 
  419    static BYTE engineId[] = { 0x80, 0x00, 0x00, 0x00, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01, 0x00 };
  420     SNMP_Engine localEngine(engineId, 12);
  421 
  422     debugPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): Received SNMP trap message from agent at %s, node ID %d"),
  423       getIpAddr().toString(ipStringBuffer), m_nodeId);
  424 
  425     if (m_nodeId != 0)
  426         proxyNode = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
  427    if (proxyNode != NULL)
  428    {
  429       // Check for duplicate traps - only accept traps with ID
  430       // higher than last received
  431       bool acceptTrap;
  432       UINT32 trapId = msg->getId();
  433       if (trapId != 0)
  434       {
  435          acceptTrap = proxyNode->checkSNMPTrapId(trapId);
  436          debugPrintf(5, _T("AgentConnectionEx::onSnmpTrap(): SNMP trapID is%s valid"), acceptTrap ? _T("") : _T(" not"));
  437       }
  438       else
  439       {
  440          acceptTrap = false;
  441          debugPrintf(5, _T("AgentConnectionEx::onSnmpTrap(): SNMP trap ID not provided"));
  442       }
  443 
  444       if (acceptTrap)
  445       {
  446          InetAddress originSenderIP = msg->getFieldAsInetAddress(VID_IP_ADDRESS);
  447          size_t pduLenght;
  448          const BYTE *pduBytes = msg->getBinaryFieldPtr(VID_PDU, &pduLenght);
  449          UINT32 zoneUIN = IsZoningEnabled() ? msg->getFieldAsUInt32(VID_ZONE_UIN) : 0;
  450          Node *originNode = FindNodeByIP(zoneUIN, originSenderIP);
  451          if ((originNode != NULL) || ConfigReadBoolean(_T("LogAllSNMPTraps"), false))
  452          {
  453             SNMP_PDU *pdu = new SNMP_PDU;
  454             SNMP_SecurityContext *sctx = (originNode != NULL) ? originNode->getSnmpSecurityContext() : NULL;
  455             if (pdu->parse(pduBytes, pduLenght, sctx, true))
  456             {
  457                nxlog_debug(6, _T("SNMPTrapReceiver: received PDU of type %d"), pdu->getCommand());
  458                if ((pdu->getCommand() == SNMP_TRAP) || (pdu->getCommand() == SNMP_INFORM_REQUEST))
  459                {
  460                   bool isInformRequest = (pdu->getCommand() == SNMP_INFORM_REQUEST);
  461                   SNMP_ProxyTransport *snmpTransport = isInformRequest ? CreateSNMPProxyTransport(this, originNode, originSenderIP, msg->getFieldAsUInt16(VID_PORT)) : NULL;
  462                   if ((pdu->getVersion() == SNMP_VERSION_3) && (pdu->getCommand() == SNMP_INFORM_REQUEST))
  463                   {
  464                      SNMP_SecurityContext *context = snmpTransport->getSecurityContext();
  465                      context->setAuthoritativeEngine(localEngine);
  466                   }
  467                   if (snmpTransport != NULL)
  468                      snmpTransport->setWaitForResponse(false);
  469                   ProcessTrap(pdu, originSenderIP, zoneUIN, msg->getFieldAsUInt16(VID_PORT), snmpTransport, &localEngine, isInformRequest);
  470                   delete snmpTransport;
  471                }
  472                else if ((pdu->getVersion() == SNMP_VERSION_3) && (pdu->getCommand() == SNMP_GET_REQUEST) && (pdu->getAuthoritativeEngine().getIdLen() == 0))
  473                {
  474                   // Engine ID discovery
  475                   nxlog_debug(6, _T("SNMPTrapReceiver: EngineId discovery"));
  476 
  477                   SNMP_ProxyTransport *snmpTransport = CreateSNMPProxyTransport(this, originNode, originSenderIP, msg->getFieldAsUInt16(VID_PORT));
  478 
  479                   SNMP_PDU *response = new SNMP_PDU(SNMP_REPORT, pdu->getRequestId(), pdu->getVersion());
  480                   response->setReportable(false);
  481                   response->setMessageId(pdu->getMessageId());
  482                   response->setContextEngineId(localEngine.getId(), localEngine.getIdLen());
  483 
  484                   SNMP_Variable *var = new SNMP_Variable(_T(".1.3.6.1.6.3.15.1.1.4.0"));
  485                   var->setValueFromString(ASN_INTEGER, _T("2"));
  486                   response->bindVariable(var);
  487 
  488                   SNMP_SecurityContext *context = new SNMP_SecurityContext();
  489                   localEngine.setTime((int)time(NULL));
  490                   context->setAuthoritativeEngine(localEngine);
  491                   context->setSecurityModel(SNMP_SECURITY_MODEL_USM);
  492                   context->setAuthMethod(SNMP_AUTH_NONE);
  493                   context->setPrivMethod(SNMP_ENCRYPT_NONE);
  494                   snmpTransport->setSecurityContext(context);
  495 
  496                   snmpTransport->setWaitForResponse(false);
  497                   snmpTransport->sendMessage(response, 0);
  498                   delete response;
  499                   delete snmpTransport;
  500                }
  501                else if (pdu->getCommand() == SNMP_REPORT)
  502                {
  503                   debugPrintf(6, _T("AgentConnectionEx::onSnmpTrap(): REPORT PDU with error %s"), (const TCHAR *)pdu->getVariable(0)->getName().toString());
  504                }
  505                delete pdu;
  506             }
  507             else if (pdu->getCommand() == SNMP_REPORT)
  508             {
  509                debugPrintf(6, _T("AgentConnectionEx::onSnmpTrap(): REPORT PDU with error %s"), (const TCHAR *)pdu->getVariable(0)->getName().toString());
  510             }
  511             delete sctx;
  512          }
  513          else
  514          {
  515             debugPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): cannot find origin node with IP %s and not accepting traps from unknown sources"), originSenderIP.toString(ipStringBuffer));
  516          }
  517       }
  518    }
  519    else
  520    {
  521       debugPrintf(3, _T("AgentConnectionEx::onSnmpTrap(): Cannot find node for IP address %s"), getIpAddr().toString(ipStringBuffer));
  522    }
  523 }
  524 
  525 /**
  526  * Deploy policy to agent
  527  */
  528 UINT32 AgentConnectionEx::deployPolicy(GenericAgentPolicy *policy, bool newTypeFormatSupported)
  529 {
  530     UINT32 rqId, rcc;
  531     NXCPMessage msg(getProtocolVersion());
  532 
  533    rqId = generateRequestId();
  534    msg.setId(rqId);
  535     msg.setCode(CMD_DEPLOY_AGENT_POLICY);
  536     if (policy->createDeploymentMessage(&msg, newTypeFormatSupported))
  537     {
  538         if (sendMessage(&msg))
  539         {
  540             rcc = waitForRCC(rqId, getCommandTimeout());
  541         }
  542         else
  543         {
  544             rcc = ERR_CONNECTION_BROKEN;
  545         }
  546     }
  547     else
  548     {
  549         rcc = ERR_INTERNAL_ERROR;
  550     }
  551    return rcc;
  552 }
  553 
  554 /**
  555  * Uninstall policy from agent
  556  */
  557 UINT32 AgentConnectionEx::uninstallPolicy(uuid guid, TCHAR *type, bool newTypeFormatSupported)
  558 {
  559     UINT32 rqId, rcc;
  560     NXCPMessage msg(getProtocolVersion());
  561 
  562    rqId = generateRequestId();
  563    msg.setId(rqId);
  564     msg.setCode(CMD_UNINSTALL_AGENT_POLICY);
  565     if (newTypeFormatSupported)
  566    {
  567        msg.setField(VID_POLICY_TYPE, type);
  568    }
  569    else
  570    {
  571       if (!_tcscmp(type, _T("AgentConfig")))
  572       {
  573          msg.setField(VID_POLICY_TYPE, AGENT_POLICY_CONFIG);
  574       }
  575       else if (!_tcscmp(type, _T("LogParserConfig")))
  576       {
  577          msg.setField(VID_POLICY_TYPE, AGENT_POLICY_LOG_PARSER);
  578       }
  579    }
  580    msg.setField(VID_GUID, guid);
  581    if (sendMessage(&msg))
  582    {
  583       rcc = waitForRCC(rqId, getCommandTimeout());
  584    }
  585    else
  586    {
  587       rcc = ERR_CONNECTION_BROKEN;
  588    }
  589    return rcc;
  590 }
  591 
  592 /**
  593  * Process collected data information (for DCI with agent-side cache)
  594  */
  595 UINT32 AgentConnectionEx::processCollectedData(NXCPMessage *msg)
  596 {
  597    if (IsShutdownInProgress())
  598       return ERR_INTERNAL_ERROR;
  599 
  600    if (m_nodeId == 0)
  601    {
  602       debugPrintf(5, _T("AgentConnectionEx::processCollectedData: node ID is 0 for agent session"));
  603       return ERR_INTERNAL_ERROR;
  604    }
  605 
  606     Node *node = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
  607    if (node == NULL)
  608    {
  609       debugPrintf(5, _T("AgentConnectionEx::processCollectedData: cannot find node object (node ID = %d)"), m_nodeId);
  610       return ERR_INTERNAL_ERROR;
  611    }
  612 
  613    int origin = msg->getFieldAsInt16(VID_DCI_SOURCE_TYPE);
  614    if ((origin != DS_NATIVE_AGENT) && (origin != DS_SNMP_AGENT))
  615    {
  616       debugPrintf(5, _T("AgentConnectionEx::processCollectedData: unsupported data source type %d"), origin);
  617       return ERR_INTERNAL_ERROR;
  618    }
  619 
  620    // Check that server is not overloaded with DCI data
  621    INT64 queueSize = GetIDataWriterQueueSize();
  622    if (queueSize > 250000)
  623    {
  624       debugPrintf(5, _T("AgentConnectionEx::processCollectedData: database writer queue is too large (%d) - cannot accept new data"), queueSize);
  625       return ERR_RESOURCE_BUSY;
  626    }
  627 
  628    DataCollectionTarget *target;
  629    uuid targetId = msg->getFieldAsGUID(VID_NODE_ID);
  630    if (!targetId.isNull())
  631    {
  632       NetObj *object = FindObjectByGUID(targetId, -1);
  633       if (object == NULL)
  634       {
  635          TCHAR buffer[64];
  636          debugPrintf(5, _T("AgentConnectionEx::processCollectedData: cannot find target node with GUID %s"), targetId.toString(buffer));
  637          return ERR_INTERNAL_ERROR;
  638       }
  639       if (!object->isDataCollectionTarget())
  640       {
  641          TCHAR buffer[64];
  642          debugPrintf(5, _T("AgentConnectionEx::processCollectedData: object with GUID %s is not a data collection target"), targetId.toString(buffer));
  643          return ERR_INTERNAL_ERROR;
  644       }
  645       target = (DataCollectionTarget *)object;
  646    }
  647    else
  648    {
  649       target = node;
  650    }
  651 
  652    UINT32 dciId = msg->getFieldAsUInt32(VID_DCI_ID);
  653    shared_ptr<DCObject> dcObject = target->getDCObjectById(dciId, 0);
  654    if (dcObject == NULL)
  655    {
  656       debugPrintf(5, _T("AgentConnectionEx::processCollectedData: cannot find DCI with ID %d on object %s [%d]"),
  657                   dciId, target->getName(), target->getId());
  658       return ERR_INTERNAL_ERROR;
  659    }
  660 
  661    int type = msg->getFieldAsInt16(VID_DCOBJECT_TYPE);
  662    if ((dcObject->getType() != type) || (dcObject->getDataSource() != origin) || (dcObject->getAgentCacheMode() != AGENT_CACHE_ON))
  663    {
  664       debugPrintf(5, _T("AgentConnectionEx::processCollectedData: DCI %s [%d] on object %s [%d] configuration mismatch"),
  665                   dcObject->getName().cstr(), dciId, target->getName(), target->getId());
  666       return ERR_INTERNAL_ERROR;
  667    }
  668 
  669    time_t t = msg->getFieldAsTime(VID_TIMESTAMP);
  670    UINT32 status = msg->getFieldAsUInt32(VID_STATUS);
  671    bool success = true;
  672 
  673    debugPrintf(7, _T("AgentConnectionEx::processCollectedData: processing DCI %s [%d] (type=%d) (status=%d) on object %s [%d]"),
  674                dcObject->getName().cstr(), dciId, type, status, target->getName(), target->getId());
  675 
  676    switch(status)
  677    {
  678       case ERR_SUCCESS:
  679       {
  680          void *value;
  681          switch(type)
  682          {
  683             case DCO_TYPE_ITEM:
  684                value = msg->getFieldAsString(VID_VALUE);
  685                break;
  686             case DCO_TYPE_LIST:
  687                value = new StringList();
  688                break;
  689             case DCO_TYPE_TABLE:
  690                value = new Table(msg);
  691                break;
  692             default:
  693                debugPrintf(5, _T("AgentConnectionEx::processCollectedData: invalid type %d of DCI %s [%d] on object %s [%d]"),
  694                            type, dcObject->getName().cstr(), dciId, target->getName(), target->getId());
  695                return ERR_INTERNAL_ERROR;
  696          }
  697 
  698          if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
  699             dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
  700          success = target->processNewDCValue(dcObject, t, value);
  701          if (t > dcObject->getLastPollTime())
  702             dcObject->setLastPollTime(t);
  703 
  704          switch(type)
  705          {
  706             case DCO_TYPE_ITEM:
  707                MemFree(value);
  708                break;
  709             case DCO_TYPE_LIST:
  710                delete static_cast<StringList*>(value);
  711                break;
  712             case DCO_TYPE_TABLE:
  713                // DCTable will keep ownership of created table
  714                break;
  715          }
  716          break;
  717       }
  718       case ERR_UNKNOWN_PARAMETER:
  719          if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
  720             dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
  721          dcObject->processNewError(false, t);
  722          break;
  723       case ERR_NO_SUCH_INSTANCE:
  724          if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
  725             dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
  726          dcObject->processNewError(true, t);
  727          break;
  728       case ERR_INTERNAL_ERROR:
  729          dcObject->processNewError(true, t);
  730          break;
  731    }
  732 
  733    return success ? ERR_SUCCESS : ERR_INTERNAL_ERROR;
  734 }
  735 
  736 /**
  737  * Process collected data information in bulk mode (for DCI with agent-side cache)
  738  */
  739 UINT32 AgentConnectionEx::processBulkCollectedData(NXCPMessage *request, NXCPMessage *response)
  740 {
  741    if (IsShutdownInProgress())
  742       return ERR_INTERNAL_ERROR;
  743 
  744    if (m_nodeId == 0)
  745    {
  746       debugPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: node ID is 0 for agent session"));
  747       return ERR_INTERNAL_ERROR;
  748    }
  749 
  750    Node *node = (Node *)FindObjectById(m_nodeId, OBJECT_NODE);
  751    if (node == NULL)
  752    {
  753       debugPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find node object (node ID = %d)"), m_nodeId);
  754       return ERR_INTERNAL_ERROR;
  755    }
  756 
  757    // Check that server is not overloaded with DCI data
  758    INT64 queueSize = GetIDataWriterQueueSize();
  759    if (queueSize > 250000)
  760    {
  761       debugPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: database writer queue is too large (%d) - cannot accept new data"), queueSize);
  762       return ERR_RESOURCE_BUSY;
  763    }
  764 
  765    int count = request->getFieldAsInt16(VID_NUM_ELEMENTS);
  766    if (count > MAX_BULK_DATA_BLOCK_SIZE)
  767       count = MAX_BULK_DATA_BLOCK_SIZE;
  768    debugPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: %d elements from node %s [%d]"), count, node->getName(), node->getId());
  769 
  770    // Use half timeout for sending progress updates
  771    UINT32 agentTimeout = request->getFieldAsUInt32(VID_TIMEOUT) / 2;
  772 
  773    BYTE status[MAX_BULK_DATA_BLOCK_SIZE];
  774    memset(status, 0, MAX_BULK_DATA_BLOCK_SIZE);
  775    UINT32 fieldId = VID_ELEMENT_LIST_BASE;
  776    INT64 startTime = GetCurrentTimeMs();
  777    for(int i = 0; i < count; i++, fieldId += 10)
  778    {
  779       UINT32 elapsed = static_cast<UINT32>(GetCurrentTimeMs() - startTime);
  780       if ((agentTimeout > 0) && (elapsed >= agentTimeout)) // agent timeout 0 means that agent will not understand processing notifications
  781       {
  782          NXCPMessage msg(CMD_REQUEST_COMPLETED, request->getId(), getProtocolVersion());
  783          msg.setField(VID_RCC, ERR_PROCESSING);
  784          msg.setField(VID_PROGRESS, i * 100 / count);
  785          postRawMessage(msg.serialize(isCompressionAllowed()));
  786          startTime = GetCurrentTimeMs();
  787       }
  788 
  789       if (IsShutdownInProgress())
  790       {
  791          status[i] = BULK_DATA_REC_RETRY;
  792          continue;
  793       }
  794 
  795       int origin = request->getFieldAsInt16(fieldId + 1);
  796       if ((origin != DS_NATIVE_AGENT) && (origin != DS_SNMP_AGENT))
  797       {
  798          debugPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: unsupported data source type %d (element %d)"), origin, i);
  799          status[i] = BULK_DATA_REC_FAILURE;
  800          continue;
  801       }
  802 
  803       DataCollectionTarget *target;
  804       uuid targetId = request->getFieldAsGUID(fieldId + 3);
  805       if (!targetId.isNull())
  806       {
  807          NetObj *object = FindObjectByGUID(targetId, -1);
  808          if (object == NULL)
  809          {
  810             TCHAR buffer[64];
  811             debugPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find target object with GUID %s (element %d)"),
  812                         targetId.toString(buffer), i);
  813             status[i] = BULK_DATA_REC_FAILURE;
  814             continue;
  815          }
  816          if (!object->isDataCollectionTarget())
  817          {
  818             TCHAR buffer[64];
  819             debugPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: object with GUID %s (element %d) is not a data collection target"),
  820                         targetId.toString(buffer), i);
  821             status[i] = BULK_DATA_REC_FAILURE;
  822             continue;
  823          }
  824          target = (DataCollectionTarget *)object;
  825       }
  826       else
  827       {
  828          target = node;
  829       }
  830 
  831       UINT32 dciId = request->getFieldAsUInt32(fieldId);
  832       shared_ptr<DCObject> dcObject = target->getDCObjectById(dciId, 0);
  833       if (dcObject == NULL)
  834       {
  835          debugPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: cannot find DCI with ID %d on object %s [%d] (element %d)"),
  836                      dciId, target->getName(), target->getId(), i);
  837          status[i] = BULK_DATA_REC_FAILURE;
  838          continue;
  839       }
  840 
  841       int type = request->getFieldAsInt16(fieldId + 2);
  842       if ((type != DCO_TYPE_ITEM) || (dcObject->getType() != type) || (dcObject->getDataSource() != origin) || (dcObject->getAgentCacheMode() != AGENT_CACHE_ON))
  843       {
  844          debugPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: DCI %s [%d] on object %s [%d] configuration mismatch (element %d)"),
  845                      dcObject->getName().cstr(), dciId, target->getName(), target->getId(), i);
  846          status[i] = BULK_DATA_REC_FAILURE;
  847          continue;
  848       }
  849 
  850       void *value = request->getFieldAsString(fieldId + 5);
  851       UINT32 statusCode = request->getFieldAsUInt32(fieldId + 6);
  852       debugPrintf(7, _T("AgentConnectionEx::processBulkCollectedData: processing DCI %s [%d] (type=%d) (status=%d) on object %s [%d] (element %d)"),
  853                   dcObject->getName().cstr(), dciId, type, statusCode, target->getName(), target->getId(), i);
  854       time_t t = request->getFieldAsTime(fieldId + 4);
  855       bool success = true;
  856 
  857       switch(statusCode)
  858       {
  859          case ERR_SUCCESS:
  860             if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
  861                dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
  862             success = target->processNewDCValue(dcObject, t, value);
  863             if (t > dcObject->getLastPollTime())
  864                dcObject->setLastPollTime(t);
  865             break;
  866          case ERR_UNKNOWN_PARAMETER:
  867             if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
  868                dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
  869             dcObject->processNewError(false, t);
  870             break;
  871          case ERR_NO_SUCH_INSTANCE:
  872             if (dcObject->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
  873                dcObject->setStatus(ITEM_STATUS_ACTIVE, true);
  874             dcObject->processNewError(true, t);
  875             break;
  876          case ERR_INTERNAL_ERROR:
  877             dcObject->processNewError(true, t);
  878             break;
  879       }
  880 
  881       status[i] = success ? BULK_DATA_REC_SUCCESS : BULK_DATA_REC_FAILURE;
  882       MemFree(value);
  883    }
  884 
  885    response->setField(VID_STATUS, status, count);
  886    return ERR_SUCCESS;
  887 }
  888 
  889 /**
  890  * Set client session for receiving TCP proxy packets
  891  */
  892 void AgentConnectionEx::setTcpProxySession(ClientSession *session)
  893 {
  894    m_tcpProxySession = session;
  895 }
  896 
  897 /**
  898  * Process TCP proxy message
  899  */
  900 void AgentConnectionEx::processTcpProxyData(UINT32 channelId, const void *data, size_t size)
  901 {
  902    if (m_tcpProxySession != NULL)
  903       m_tcpProxySession->processTcpProxyData(this, channelId, data, size);
  904 }