"Fossies" - the Fresh Open Source Software Archive

Member "netxms-3.8.166/src/agent/core/session.cpp" (23 Feb 2021, 45459 Bytes) of package /linux/misc/netxms-3.8.166.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "session.cpp" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 3.7.145_vs_3.8.120.

    1 /*
    2 ** NetXMS multiplatform core agent
    3 ** Copyright (C) 2003-2021 Raden Solutions
    4 **
    5 ** This program is free software; you can redistribute it and/or modify
    6 ** it under the terms of the GNU General Public License as published by
    7 ** the Free Software Foundation; either version 2 of the License, or
    8 ** (at your option) any later version.
    9 **
   10 ** This program is distributed in the hope that it will be useful,
   11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
   12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   13 ** GNU General Public License for more details.
   14 **
   15 ** You should have received a copy of the GNU General Public License
   16 ** along with this program; if not, write to the Free Software
   17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
   18 **
   19 ** File: session.cpp
   20 **
   21 **/
   22 
   23 #include "nxagentd.h"
   24 
   25 /**
   26  * Externals
   27  */
   28 void UnregisterSession(uint32_t id);
   29 uint32_t DeployPolicy(CommSession *session, NXCPMessage *request);
   30 uint32_t UninstallPolicy(CommSession *session, NXCPMessage *request);
   31 uint32_t GetPolicyInventory(CommSession *session, NXCPMessage *msg);
   32 void ClearDataCollectionConfiguration();
   33 uint32_t AddUserAgentNotification(uint64_t serverId, NXCPMessage *request);
   34 uint32_t RemoveUserAgentNotification(uint64_t serverId, NXCPMessage *request);
   35 uint32_t UpdateUserAgentNotifications(uint64_t serverId, NXCPMessage *request);
   36 void RegisterSessionForNotifications(const shared_ptr<CommSession>& session);
   37 
   38 extern VolatileCounter g_authenticationFailures;
   39 
   40 /**
   41  * Communication request processing thread pool
   42  */
   43 ThreadPool *g_commThreadPool = nullptr;
   44 
   45 /**
   46  * Agent action thread pool
   47  */
   48 ThreadPool *g_executorThreadPool = nullptr;
   49 
   50 /**
   51  * Web service collector thread pool
   52  */
   53 ThreadPool *g_webSvcThreadPool = nullptr;
   54 
   55 /**
   56  * Next free session ID
   57  */
   58 static VolatileCounter s_sessionId = 0;
   59 
   60 /**
   61  * Agent proxy statistics
   62  */
   63 static VolatileCounter64 s_proxyConnectionRequests = 0;
   64 static VolatileCounter s_activeProxySessions = 0;
   65 static VolatileCounter64 s_tcpProxyConnectionRequests = 0;
   66 
   67 /**
   68  * Handler for agent proxy stats parameters
   69  */
   70 LONG H_AgentProxyStats(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
   71 {
   72    switch(*arg)
   73    {
   74       case 'A':
   75          ret_uint(value, static_cast<uint32_t>(s_activeProxySessions));
   76          break;
   77       case 'C':
   78          ret_uint64(value, static_cast<uint64_t>(s_proxyConnectionRequests));
   79          break;
   80       case 'T':
   81          ret_uint64(value, static_cast<uint64_t>(s_tcpProxyConnectionRequests));
   82          break;
   83       default:
   84          return SYSINFO_RC_UNSUPPORTED;
   85    }
   86    return SYSINFO_RC_SUCCESS;
   87 }
   88 
   89 /**
   90  * Client session class constructor
   91  */
   92 CommSession::CommSession(const shared_ptr<AbstractCommChannel>& channel, const InetAddress &serverAddr,
   93          bool masterServer, bool controlServer) : m_downloadFileMap(Ownership::True), m_tcpProxies(0, 16, Ownership::True), m_channel(channel)
   94 {
   95    m_id = InterlockedIncrement(&s_sessionId);
   96    m_index = INVALID_INDEX;
   97    _sntprintf(m_key, 32, _T("CommSession-%u"), m_id);
   98    m_processingQueue = new ObjectQueue<NXCPMessage>(64, Ownership::True);
   99    m_protocolVersion = NXCP_VERSION;
  100    m_hProxySocket = INVALID_SOCKET;
  101    m_processingThread = INVALID_THREAD_HANDLE;
  102    m_proxyReadThread = INVALID_THREAD_HANDLE;
  103    m_tcpProxyReadThread = INVALID_THREAD_HANDLE;
  104    m_serverId = 0;
  105    m_serverAddr = serverAddr;
  106    m_authenticated = (g_dwFlags & AF_REQUIRE_AUTH) ? false : true;
  107    m_masterServer = masterServer;
  108    m_controlServer = controlServer;
  109    m_proxyConnection = false;
  110    m_acceptTraps = false;
  111    m_acceptData = false;
  112    m_acceptFileUpdates = false;
  113    m_ipv6Aware = false;
  114    m_bulkReconciliationSupported = false;
  115    m_disconnected = false;
  116    m_allowCompression = false;
  117    m_ts = time(nullptr);
  118    m_socketWriteMutex = MutexCreate();
  119    m_responseQueue = new MsgWaitQueue();
  120    m_requestId = 0;
  121    m_tcpProxyLock = MutexCreate();
  122 }
  123 
  124 /**
  125  * Callback for aborting active file transfers
  126  */
  127 static EnumerationCallbackResult AbortFileTransfer(const uint32_t& key, DownloadFileInfo *file, CommSession *session)
  128 {
  129    session->debugPrintf(4, _T("Transfer of file %s aborted because of session termination"), file->getFileName());
  130    file->close(false);
  131    return _CONTINUE;
  132 }
  133 
  134 /**
  135  * Destructor
  136  */
  137 CommSession::~CommSession()
  138 {
  139    if (m_proxyConnection)
  140       InterlockedDecrement(&s_activeProxySessions);
  141 
  142    m_channel->shutdown();
  143    if (m_hProxySocket != INVALID_SOCKET)
  144       closesocket(m_hProxySocket);
  145    m_disconnected = true;
  146 
  147    delete m_processingQueue;
  148     MutexDestroy(m_socketWriteMutex);
  149    delete m_responseQueue;
  150    MutexDestroy(m_tcpProxyLock);
  151 
  152    m_downloadFileMap.forEach(AbortFileTransfer, this);
  153 }
  154 
  155 /**
  156  * Debug print in session context
  157  */
  158 void CommSession::debugPrintf(int level, const TCHAR *format, ...)
  159 {
  160    va_list args;
  161    va_start(args, format);
  162    nxlog_debug_tag_object2(_T("comm.cs"), m_id, level, format, args);
  163    va_end(args);
  164 }
  165 
  166 /**
  167  * Start all threads
  168  */
  169 void CommSession::run()
  170 {
  171    m_processingThread = ThreadCreateEx(this, &CommSession::processingThread);
  172    ThreadCreate(self(), &CommSession::readThread);
  173 }
  174 
  175 /**
  176  * Disconnect session
  177  */
  178 void CommSession::disconnect()
  179 {
  180     debugPrintf(5, _T("CommSession::disconnect()"));
  181    MutexLock(m_tcpProxyLock);
  182    m_tcpProxies.clear();
  183    MutexUnlock(m_tcpProxyLock);
  184    m_channel->shutdown();
  185    if (m_hProxySocket != -1)
  186       shutdown(m_hProxySocket, SHUT_RDWR);
  187    m_disconnected = true;
  188 }
  189 
  190 /**
  191  * Reading thread
  192  */
  193 void CommSession::readThread()
  194 {
  195    CommChannelMessageReceiver receiver(m_channel, 4096, MAX_AGENT_MSG_SIZE);
  196    while(!m_disconnected)
  197    {
  198       if (!m_proxyConnection)
  199       {
  200          MessageReceiverResult result;
  201          NXCPMessage *msg = receiver.readMessage((g_dwIdleTimeout + 1) * 1000, &result);
  202 
  203          // Check for decryption error
  204          if (result == MSGRECV_DECRYPTION_FAILURE)
  205          {
  206             debugPrintf(4, _T("Unable to decrypt received message"));
  207             continue;
  208          }
  209 
  210          // Check for timeout
  211          if (result == MSGRECV_TIMEOUT)
  212          {
  213             if (m_ts < time(nullptr) - (time_t)g_dwIdleTimeout)
  214             {
  215                debugPrintf(5, _T("Session disconnected by timeout (last activity timestamp is %d)"), (int)m_ts);
  216                break;
  217             }
  218             continue;
  219          }
  220 
  221          // Receive error
  222          if (msg == nullptr)
  223          {
  224             if (result == MSGRECV_CLOSED)
  225                debugPrintf(5, _T("Communication channel closed by peer"));
  226             else
  227                debugPrintf(5, _T("Message receiving error (%s)"), AbstractMessageReceiver::resultToText(result));
  228             break;
  229          }
  230 
  231          // Update activity timestamp
  232          m_ts = time(nullptr);
  233 
  234          if (nxlog_get_debug_level() >= 8)
  235          {
  236             String msgDump = NXCPMessage::dump(receiver.getRawMessageBuffer(), m_protocolVersion);
  237             debugPrintf(8, _T("Message dump:\n%s"), msgDump.cstr());
  238          }
  239 
  240          if (msg->isBinary())
  241          {
  242             TCHAR buffer[64];
  243             debugPrintf(6, _T("Received raw message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
  244 
  245             if (msg->getCode() == CMD_FILE_DATA)
  246             {
  247                DownloadFileInfo *dInfo = m_downloadFileMap.get(msg->getId());
  248                if (dInfo != nullptr)
  249                {
  250                   if (dInfo->write(msg->getBinaryData(), msg->getBinaryDataSize(), msg->isCompressedStream()))
  251                   {
  252                      if (msg->isEndOfFile())
  253                      {
  254                         debugPrintf(4, _T("Transfer of file %s completed"), dInfo->getFileName());
  255                         dInfo->close(true);
  256                         m_downloadFileMap.remove(msg->getId());
  257 
  258                         NXCPMessage response(CMD_REQUEST_COMPLETED, msg->getId(), m_protocolVersion);
  259                         response.setField(VID_RCC, ERR_SUCCESS);
  260                         sendMessage(&response);
  261                      }
  262                   }
  263                   else
  264                   {
  265                      debugPrintf(4, _T("Transfer of file %s aborted because of local I/O error (%s)"),
  266                            dInfo->getFileName(), _tcserror(errno));
  267                      dInfo->close(false);
  268                      m_downloadFileMap.remove(msg->getId());
  269 
  270                      NXCPMessage response(CMD_REQUEST_COMPLETED, msg->getId(), m_protocolVersion);
  271                      response.setField(VID_RCC, ERR_IO_FAILURE);
  272                      sendMessage(&response);
  273                   }
  274                }
  275             }
  276             else if (msg->getCode() == CMD_TCP_PROXY_DATA)
  277             {
  278                uint32_t proxyId = msg->getId();
  279                MutexLock(m_tcpProxyLock);
  280                for(int i = 0; i < m_tcpProxies.size(); i++)
  281                {
  282                   TcpProxy *p = m_tcpProxies.get(i);
  283                   if (p->getId() == proxyId)
  284                   {
  285                      p->writeSocket(msg->getBinaryData(), msg->getBinaryDataSize());
  286                      break;
  287                   }
  288                }
  289                MutexUnlock(m_tcpProxyLock);
  290             }
  291             delete msg;
  292          }
  293          else if (msg->isControl())
  294          {
  295             TCHAR buffer[64];
  296             debugPrintf(6, _T("Received control message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
  297 
  298             if (msg->getCode() == CMD_GET_NXCP_CAPS)
  299             {
  300                uint32_t peerNXCPVersion = msg->getEncodedProtocolVersion(); // Before NXCP version 5 encoded version will be 0, assume version 4
  301                m_protocolVersion = (peerNXCPVersion == 0) ? 4 : MIN(peerNXCPVersion, NXCP_VERSION);
  302                debugPrintf(4, _T("Using protocol version %d"), m_protocolVersion);
  303 
  304                NXCP_MESSAGE *response = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE);
  305                response->id = htonl(msg->getId());
  306                response->code = htons((WORD)CMD_NXCP_CAPS);
  307                response->flags = htons(MF_CONTROL | MF_NXCP_VERSION(m_protocolVersion));
  308                response->numFields = htonl(m_protocolVersion << 24);
  309                response->size = htonl(NXCP_HEADER_SIZE);
  310                sendRawMessage(response, m_encryptionContext.get());
  311             }
  312             delete msg;
  313          }
  314          else
  315          {
  316             TCHAR buffer[64];
  317             debugPrintf(6, _T("Received message %s (%d)"), NXCPMessageCodeName(msg->getCode(), buffer), msg->getId());
  318 
  319             uint32_t rcc;
  320             switch(msg->getCode())
  321             {
  322                case CMD_REQUEST_COMPLETED:
  323                   m_responseQueue->put(msg);
  324                   break;
  325                case CMD_REQUEST_SESSION_KEY:
  326                   if (!m_encryptionContext)  // cannot use != nullptr because HP-UX compiler does not understand it
  327                   {
  328                      NXCPEncryptionContext *encryptionContext = nullptr;
  329                      NXCPMessage *response = nullptr;
  330                      SetupEncryptionContext(msg, &encryptionContext, &response, nullptr, m_protocolVersion);
  331                      m_encryptionContext = shared_ptr<NXCPEncryptionContext>(encryptionContext);
  332                      sendMessage(response);
  333                      delete response;
  334                      receiver.setEncryptionContext(m_encryptionContext);
  335                   }
  336                   delete msg;
  337                   break;
  338                case CMD_SETUP_PROXY_CONNECTION:
  339                   InterlockedIncrement64(&s_proxyConnectionRequests);
  340                   rcc = setupProxyConnection(msg);
  341                   // When proxy session established incoming messages will
  342                   // not be processed locally. Acknowledgment message sent
  343                   // by setupProxyConnection() in case of success.
  344                   if (rcc == ERR_SUCCESS)
  345                   {
  346                      InterlockedIncrement(&s_activeProxySessions);
  347                      m_processingQueue->clear();
  348                      m_processingQueue->setShutdownMode();
  349                   }
  350                   else
  351                   {
  352                      NXCPMessage response(CMD_REQUEST_COMPLETED, msg->getId(), m_protocolVersion);
  353                      response.setField(VID_RCC, rcc);
  354                      sendMessage(&response);
  355                   }
  356                   delete msg;
  357                   break;
  358                case CMD_SNMP_REQUEST:
  359                   if (m_masterServer && (g_dwFlags & AF_ENABLE_SNMP_PROXY))
  360                   {
  361                      proxySnmpRequest(msg);
  362                   }
  363                   else
  364                   {
  365                      NXCPMessage response(CMD_REQUEST_COMPLETED, msg->getId(), m_protocolVersion);
  366                      response.setField(VID_RCC, ERR_ACCESS_DENIED);
  367                      sendMessage(&response);
  368                      delete msg;
  369                   }
  370                   break;
  371                case CMD_QUERY_WEB_SERVICE:
  372                   if (g_dwFlags & AF_ENABLE_WEBSVC_PROXY)
  373                   {
  374                      queryWebService(msg);
  375                   }
  376                   else
  377                   {
  378                      NXCPMessage response(CMD_REQUEST_COMPLETED, msg->getId(), m_protocolVersion);
  379                      response.setField(VID_RCC, ERR_ACCESS_DENIED);
  380                      sendMessage(&response);
  381                      delete msg;
  382                   }
  383                   break;
  384                default:
  385                   m_processingQueue->put(msg);
  386                   break;
  387             }
  388          }
  389       }
  390       else  // m_proxyConnection
  391       {
  392          int rc = m_channel->poll((g_dwIdleTimeout + 1) * 1000);
  393          if (rc <= 0)
  394             break;
  395          if (rc > 0)
  396          {
  397             // Update activity timestamp
  398             m_ts = time(nullptr);
  399 
  400             char buffer[32768];
  401             ssize_t bytes = m_channel->recv(buffer, 32768);
  402             if (bytes <= 0)
  403                break;
  404             SendEx(m_hProxySocket, buffer, bytes, 0, nullptr);
  405          }
  406       }
  407    }
  408 
  409    // Notify other threads to exit
  410    m_processingQueue->setShutdownMode();
  411    if (m_hProxySocket != INVALID_SOCKET)
  412       shutdown(m_hProxySocket, SHUT_RDWR);
  413 
  414    // Wait for other threads to finish
  415    ThreadJoin(m_processingThread);
  416    if (m_proxyConnection)
  417       ThreadJoin(m_proxyReadThread);
  418 
  419    MutexLock(m_tcpProxyLock);
  420    m_tcpProxies.clear();
  421    MutexUnlock(m_tcpProxyLock);
  422    ThreadJoin(m_tcpProxyReadThread);
  423 
  424    debugPrintf(5, _T("Session with %s closed"), m_serverAddr.toString().cstr());
  425 
  426    UnregisterSession(m_id);
  427    debugPrintf(6, _T("Receiver thread stopped"));
  428 }
  429 
  430 /**
  431  * Send prepared raw message over the network and destroy it
  432  */
  433 bool CommSession::sendRawMessage(NXCP_MESSAGE *msg, NXCPEncryptionContext *ctx)
  434 {
  435    if (m_disconnected)
  436    {
  437       MemFree(msg);
  438       debugPrintf(6, _T("Aborting sendRawMessage call because session is disconnected"));
  439       return false;
  440    }
  441 
  442    bool success = true;
  443    if (nxlog_get_debug_level() >= 6)
  444    {
  445       TCHAR buffer[128];
  446       debugPrintf(6, _T("Sending message %s (ID %d; size %d; %s)"), NXCPMessageCodeName(ntohs(msg->code), buffer),
  447                ntohl(msg->id), ntohl(msg->size),
  448                ntohs(msg->flags) & MF_COMPRESSED ? _T("compressed") : _T("uncompressed"));
  449       if (nxlog_get_debug_level() >= 8)
  450       {
  451          String msgDump = NXCPMessage::dump(msg, m_protocolVersion);
  452          debugPrintf(8, _T("Outgoing message dump:\n%s"), (const TCHAR *)msgDump);
  453       }
  454    }
  455 
  456    if (ctx != nullptr)
  457    {
  458       NXCP_ENCRYPTED_MESSAGE *enMsg = ctx->encryptMessage(msg);
  459       if (enMsg != NULL)
  460       {
  461          if (m_channel->send(enMsg, ntohl(enMsg->size), m_socketWriteMutex) <= 0)
  462          {
  463             success = false;
  464          }
  465          MemFree(enMsg);
  466       }
  467    }
  468    else
  469    {
  470       if (m_channel->send(msg, ntohl(msg->size), m_socketWriteMutex) <= 0)
  471       {
  472          success = false;
  473       }
  474    }
  475 
  476     if (!success)
  477     {
  478       TCHAR buffer[128];
  479        debugPrintf(6, _T("CommSession::SendRawMessage() for %s (size %d) failed (error %d: %s)"),
  480                 NXCPMessageCodeName(ntohs(msg->code), buffer), ntohl(msg->size), WSAGetLastError(), _tcserror(WSAGetLastError()));
  481     }
  482     MemFree(msg);
  483    return success;
  484 }
  485 
  486 /**
  487  * Send message directly to socket
  488  */
  489 bool CommSession::sendMessage(const NXCPMessage *msg)
  490 {
  491    if (m_disconnected)
  492       return false;
  493 
  494    return sendRawMessage(msg->serialize(m_allowCompression), m_encryptionContext.get());
  495 }
  496 
  497 /**
  498  * Send raw message directly to socket
  499  */
  500 bool CommSession::sendRawMessage(const NXCP_MESSAGE *msg)
  501 {
  502    return sendRawMessage(MemCopyBlock(msg, ntohl(msg->size)), m_encryptionContext.get());
  503 }
  504 
  505 /**
  506  * Post message
  507  */
  508 void CommSession::postMessage(const NXCPMessage *msg)
  509 {
  510    if (m_disconnected)
  511       return;
  512    ThreadPoolExecuteSerialized(g_commThreadPool, m_key, self(), &CommSession::sendMessageInBackground, msg->serialize(m_allowCompression));
  513 }
  514 
  515 /**
  516  * Post raw message
  517  */
  518 void CommSession::postRawMessage(const NXCP_MESSAGE *msg)
  519 {
  520    if (m_disconnected)
  521       return;
  522    ThreadPoolExecuteSerialized(g_commThreadPool, m_key, self(), &CommSession::sendMessageInBackground, MemCopyBlock(msg, ntohl(msg->size)));
  523 }
  524 
  525 /**
  526  * Send message on background thread
  527  */
  528 void CommSession::sendMessageInBackground(NXCP_MESSAGE *msg)
  529 {
  530    sendRawMessage(msg, m_encryptionContext.get());
  531 }
  532 
  533 /**
  534  * Message processing thread
  535  */
  536 void CommSession::processingThread()
  537 {
  538    while(true)
  539    {
  540       NXCPMessage *request = m_processingQueue->getOrBlock();
  541       if (request == INVALID_POINTER_VALUE)    // Session termination indicator
  542          break;
  543       uint16_t command = request->getCode();
  544 
  545       // Prepare response message
  546       NXCPMessage response(CMD_REQUEST_COMPLETED, request->getId(), m_protocolVersion);
  547 
  548       // Check if authentication required
  549       if ((!m_authenticated) && (command != CMD_AUTHENTICATE))
  550       {
  551             debugPrintf(6, _T("Authentication required"));
  552             response.setField(VID_RCC, ERR_AUTH_REQUIRED);
  553       }
  554       else if ((g_dwFlags & AF_REQUIRE_ENCRYPTION) && !m_encryptionContext)  // cannot use != nullptr because HP-UX compiler does not understand it
  555       {
  556             debugPrintf(6, _T("Encryption required"));
  557             response.setField(VID_RCC, ERR_ENCRYPTION_REQUIRED);
  558       }
  559       else
  560       {
  561          switch(command)
  562          {
  563             case CMD_AUTHENTICATE:
  564                authenticate(request, &response);
  565                break;
  566             case CMD_GET_PARAMETER:
  567                getParameter(request, &response);
  568                break;
  569             case CMD_GET_LIST:
  570                getList(request, &response);
  571                break;
  572             case CMD_GET_TABLE:
  573                getTable(request, &response);
  574                break;
  575             case CMD_KEEPALIVE:
  576                response.setField(VID_RCC, ERR_SUCCESS);
  577                break;
  578             case CMD_ACTION:
  579                action(request, &response);
  580                break;
  581             case CMD_TRANSFER_FILE:
  582                recvFile(request, &response);
  583                break;
  584             case CMD_UPGRADE_AGENT:
  585                response.setField(VID_RCC, upgrade(request));
  586                break;
  587             case CMD_GET_PARAMETER_LIST:
  588                response.setField(VID_RCC, ERR_SUCCESS);
  589                GetParameterList(&response);
  590                break;
  591             case CMD_GET_ENUM_LIST:
  592                response.setField(VID_RCC, ERR_SUCCESS);
  593                GetEnumList(&response);
  594                break;
  595             case CMD_GET_TABLE_LIST:
  596                response.setField(VID_RCC, ERR_SUCCESS);
  597                GetTableList(&response);
  598                break;
  599             case CMD_READ_AGENT_CONFIG_FILE:
  600                getConfig(&response);
  601                break;
  602             case CMD_WRITE_AGENT_CONFIG_FILE:
  603                updateConfig(request, &response);
  604                break;
  605             case CMD_ENABLE_AGENT_TRAPS:
  606                m_acceptTraps = true;
  607                RegisterSessionForNotifications(self());
  608                response.setField(VID_RCC, ERR_SUCCESS);
  609                break;
  610             case CMD_ENABLE_FILE_UPDATES:
  611                if (m_masterServer)
  612                {
  613                   m_acceptFileUpdates = true;
  614                   response.setField(VID_RCC, ERR_SUCCESS);
  615                }
  616                else
  617                {
  618                   response.setField(VID_RCC, ERR_ACCESS_DENIED);
  619                }
  620                break;
  621                 case CMD_DEPLOY_AGENT_POLICY:
  622                     if (m_masterServer)
  623                     {
  624                        response.setField(VID_RCC, DeployPolicy(this, request));
  625                     }
  626                     else
  627                     {
  628                        response.setField(VID_RCC, ERR_ACCESS_DENIED);
  629                     }
  630                     break;
  631                 case CMD_UNINSTALL_AGENT_POLICY:
  632                     if (m_masterServer)
  633                     {
  634                        response.setField(VID_RCC, UninstallPolicy(this, request));
  635                     }
  636                     else
  637                     {
  638                        response.setField(VID_RCC, ERR_ACCESS_DENIED);
  639                     }
  640                     break;
  641                 case CMD_GET_POLICY_INVENTORY:
  642                     if (m_masterServer)
  643                     {
  644                        response.setField(VID_RCC, GetPolicyInventory(this, &response));
  645                     }
  646                     else
  647                     {
  648                        response.setField(VID_RCC, ERR_ACCESS_DENIED);
  649                     }
  650                     break;
  651             case CMD_TAKE_SCREENSHOT:
  652                     if (m_controlServer)
  653                     {
  654                   TCHAR sessionName[256];
  655                   request->getFieldAsString(VID_NAME, sessionName, 256);
  656                   debugPrintf(6, _T("Take screenshot from session \"%s\""), sessionName);
  657                   SessionAgentConnector *conn = AcquireSessionAgentConnector(sessionName);
  658                   if (conn != NULL)
  659                   {
  660                      debugPrintf(6, _T("Session agent connector acquired"));
  661                      conn->takeScreenshot(&response);
  662                      conn->decRefCount();
  663                   }
  664                   else
  665                   {
  666                      response.setField(VID_RCC, ERR_NO_SESSION_AGENT);
  667                   }
  668                     }
  669                     else
  670                     {
  671                        response.setField(VID_RCC, ERR_ACCESS_DENIED);
  672                     }
  673                     break;
  674             case CMD_GET_HOSTNAME_BY_IPADDR:
  675                getHostNameByAddr(request, &response);
  676                break;
  677             case CMD_SET_SERVER_CAPABILITIES:
  678                // Servers before 2.0 use VID_ENABLED
  679                m_ipv6Aware = request->isFieldExist(VID_IPV6_SUPPORT) ? request->getFieldAsBoolean(VID_IPV6_SUPPORT) : request->getFieldAsBoolean(VID_ENABLED);
  680                m_bulkReconciliationSupported = request->getFieldAsBoolean(VID_BULK_RECONCILIATION);
  681                m_allowCompression = request->getFieldAsBoolean(VID_ENABLE_COMPRESSION);
  682                response.setField(VID_RCC, ERR_SUCCESS);
  683                response.setField(VID_FLAGS, static_cast<uint16_t>((m_controlServer ? 0x01 : 0x00) | (m_masterServer ? 0x02 : 0x00)));
  684                debugPrintf(1, _T("Server capabilities: IPv6: %s; bulk reconciliation: %s; compression: %s"),
  685                            m_ipv6Aware ? _T("yes") : _T("no"),
  686                            m_bulkReconciliationSupported ? _T("yes") : _T("no"),
  687                            m_allowCompression ? _T("yes") : _T("no"));
  688                break;
  689             case CMD_SET_SERVER_ID:
  690                m_serverId = request->getFieldAsUInt64(VID_SERVER_ID);
  691                debugPrintf(1, _T("Server ID set to ") UINT64X_FMT(_T("016")), m_serverId);
  692                response.setField(VID_RCC, ERR_SUCCESS);
  693                break;
  694             case CMD_DATA_COLLECTION_CONFIG:
  695                if (m_serverId != 0)
  696                {
  697                   ConfigureDataCollection(m_serverId, *request);
  698                   m_acceptData = true;
  699                   response.setField(VID_RCC, ERR_SUCCESS);
  700                }
  701                else
  702                {
  703                   debugPrintf(1, _T("Data collection configuration command received but server ID is not set"));
  704                   response.setField(VID_RCC, ERR_SERVER_ID_UNSET);
  705                }
  706                break;
  707             case CMD_CLEAN_AGENT_DCI_CONF:
  708                if (m_masterServer)
  709                {
  710                   ClearDataCollectionConfiguration();
  711                   response.setField(VID_RCC, ERR_SUCCESS);
  712                }
  713                else
  714                {
  715                   response.setField(VID_RCC, ERR_ACCESS_DENIED);
  716                }
  717                break;
  718             case CMD_SETUP_TCP_PROXY:
  719                InterlockedIncrement64(&s_tcpProxyConnectionRequests);
  720                if (m_masterServer && (g_dwFlags & AF_ENABLE_TCP_PROXY))
  721                {
  722                   setupTcpProxy(request, &response);
  723                }
  724                else
  725                {
  726                   response.setField(VID_RCC, ERR_ACCESS_DENIED);
  727                }
  728                break;
  729             case CMD_CLOSE_TCP_PROXY:
  730                if (m_masterServer && (g_dwFlags & AF_ENABLE_TCP_PROXY))
  731                {
  732                   response.setField(VID_RCC, closeTcpProxy(request));
  733                }
  734                else
  735                {
  736                   response.setField(VID_RCC, ERR_ACCESS_DENIED);
  737                }
  738                break;
  739             case CMD_ADD_UA_NOTIFICATION:
  740                if (m_masterServer)
  741                {
  742                   response.setField(VID_RCC, AddUserAgentNotification(m_serverId, request));
  743                }
  744                else
  745                {
  746                   response.setField(VID_RCC, ERR_ACCESS_DENIED);
  747                }
  748                break;
  749             case CMD_RECALL_UA_NOTIFICATION:
  750                if (m_masterServer)
  751                {
  752                   response.setField(VID_RCC, RemoveUserAgentNotification(m_serverId, request));
  753                }
  754                else
  755                {
  756                   response.setField(VID_RCC, ERR_ACCESS_DENIED);
  757                }
  758                break;
  759             case CMD_UPDATE_UA_NOTIFICATIONS:
  760                if (m_masterServer)
  761                {
  762                   response.setField(VID_RCC, UpdateUserAgentNotifications(m_serverId, request));
  763                }
  764                else
  765                {
  766                   response.setField(VID_RCC, ERR_ACCESS_DENIED);
  767                }
  768                break;
  769             default:
  770                // Attempt to process unknown command by subagents
  771                if (!ProcessCommandBySubAgent(command, request, &response, this))
  772                   response.setField(VID_RCC, ERR_UNKNOWN_COMMAND);
  773                break;
  774          }
  775       }
  776       delete request;
  777 
  778       // Send response
  779       sendMessage(&response);
  780    }
  781 }
  782 
  783 /**
  784  * Log authentication failure
  785  */
  786 static inline void LogAuthFailure(const InetAddress& serverAddr, const TCHAR *method)
  787 {
  788    TCHAR buffer[64];
  789    nxlog_write(NXLOG_WARNING, _T("Authentication failed for peer %s (method = %s)"), serverAddr.toString(buffer), method);
  790    InterlockedIncrement(&g_authenticationFailures);
  791 }
  792 
  793 /**
  794  * Authenticate peer
  795  */
  796 void CommSession::authenticate(NXCPMessage *pRequest, NXCPMessage *pMsg)
  797 {
  798    if (m_authenticated)
  799    {
  800       // Already authenticated
  801       pMsg->setField(VID_RCC, (g_dwFlags & AF_REQUIRE_AUTH) ? ERR_ALREADY_AUTHENTICATED : ERR_AUTH_NOT_REQUIRED);
  802    }
  803    else
  804    {
  805       TCHAR szSecret[MAX_SECRET_LENGTH];
  806       BYTE hash[32];
  807 
  808       int authMethod = pRequest->getFieldAsInt16(VID_AUTH_METHOD);
  809       if (authMethod == 0)
  810          authMethod = AUTH_SHA1_HASH;
  811       switch(authMethod)
  812       {
  813          case AUTH_PLAINTEXT:
  814             pRequest->getFieldAsString(VID_SHARED_SECRET, szSecret, MAX_SECRET_LENGTH);
  815             if (!_tcscmp(szSecret, g_szSharedSecret))
  816             {
  817                m_authenticated = true;
  818                pMsg->setField(VID_RCC, ERR_SUCCESS);
  819             }
  820             else
  821             {
  822                LogAuthFailure(m_serverAddr, _T("PLAIN"));
  823                pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
  824             }
  825             break;
  826          case AUTH_MD5_HASH:
  827             pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, MD5_DIGEST_SIZE);
  828 #ifdef UNICODE
  829                 {
  830                     char sharedSecret[256];
  831                     WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
  832                     sharedSecret[255] = 0;
  833                     CalculateMD5Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
  834                 }
  835 #else
  836             CalculateMD5Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
  837 #endif
  838             if (!memcmp(szSecret, hash, MD5_DIGEST_SIZE))
  839             {
  840                m_authenticated = true;
  841                pMsg->setField(VID_RCC, ERR_SUCCESS);
  842             }
  843             else
  844             {
  845                LogAuthFailure(m_serverAddr, _T("MD5"));
  846                pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
  847             }
  848             break;
  849          case AUTH_SHA1_HASH:
  850             pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, SHA1_DIGEST_SIZE);
  851 #ifdef UNICODE
  852                 {
  853                     char sharedSecret[256];
  854                     WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
  855                     sharedSecret[255] = 0;
  856                     CalculateSHA1Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
  857                 }
  858 #else
  859             CalculateSHA1Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
  860 #endif
  861             if (!memcmp(szSecret, hash, SHA1_DIGEST_SIZE))
  862             {
  863                m_authenticated = true;
  864                pMsg->setField(VID_RCC, ERR_SUCCESS);
  865             }
  866             else
  867             {
  868                LogAuthFailure(m_serverAddr, _T("SHA1"));
  869                pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
  870             }
  871             break;
  872          default:
  873             pMsg->setField(VID_RCC, ERR_NOT_IMPLEMENTED);
  874             break;
  875       }
  876    }
  877 }
  878 
  879 /**
  880  * Get parameter's value
  881  */
  882 void CommSession::getParameter(NXCPMessage *pRequest, NXCPMessage *pMsg)
  883 {
  884    TCHAR parameter[MAX_RUNTIME_PARAM_NAME];
  885    pRequest->getFieldAsString(VID_PARAMETER, parameter, MAX_RUNTIME_PARAM_NAME);
  886 
  887    TCHAR value[MAX_RESULT_LENGTH];
  888    UINT32 errorCode = GetParameterValue(parameter, value, this);
  889 
  890    pMsg->setField(VID_RCC, errorCode);
  891    if (errorCode == ERR_SUCCESS)
  892       pMsg->setField(VID_VALUE, value);
  893 }
  894 
  895 /**
  896  * Get list of values
  897  */
  898 void CommSession::getList(NXCPMessage *pRequest, NXCPMessage *pMsg)
  899 {
  900    TCHAR szParameter[MAX_RUNTIME_PARAM_NAME];
  901    pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_RUNTIME_PARAM_NAME);
  902 
  903    StringList value;
  904    UINT32 dwErrorCode = GetListValue(szParameter, &value, this);
  905    pMsg->setField(VID_RCC, dwErrorCode);
  906    if (dwErrorCode == ERR_SUCCESS)
  907    {
  908       value.fillMessage(pMsg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
  909    }
  910 }
  911 
  912 /**
  913  * Get table
  914  */
  915 void CommSession::getTable(NXCPMessage *pRequest, NXCPMessage *pMsg)
  916 {
  917    TCHAR szParameter[MAX_RUNTIME_PARAM_NAME];
  918 
  919    pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_RUNTIME_PARAM_NAME);
  920 
  921    Table value;
  922    uint32_t rcc = GetTableValue(szParameter, &value, this);
  923    pMsg->setField(VID_RCC, rcc);
  924    if (rcc == ERR_SUCCESS)
  925    {
  926         value.fillMessage(*pMsg, 0, -1);    // no row limit
  927    }
  928 }
  929 
  930 /**
  931  * Query web service
  932  */
  933 void CommSession::queryWebService(NXCPMessage *request)
  934 {
  935    TCHAR *url = request->getFieldAsString(VID_URL);
  936    ThreadPoolExecuteSerialized(g_webSvcThreadPool, url, QueryWebService, request, static_cast<AbstractCommSession*>(this));
  937    MemFree(url);
  938 }
  939 
  940 /**
  941  * Perform action on request
  942  */
  943 void CommSession::action(NXCPMessage *pRequest, NXCPMessage *pMsg)
  944 {
  945    if ((g_dwFlags & AF_ENABLE_ACTIONS) && m_controlServer)
  946       ExecuteAction(pRequest, pMsg, self());
  947    else
  948       pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
  949 }
  950 
  951 /**
  952  * Prepare for receiving file
  953  */
  954 void CommSession::recvFile(NXCPMessage *pRequest, NXCPMessage *pMsg)
  955 {
  956     TCHAR szFileName[MAX_PATH], szFullPath[MAX_PATH];
  957 
  958     if (m_masterServer)
  959     {
  960         szFileName[0] = 0;
  961         pRequest->getFieldAsString(VID_FILE_NAME, szFileName, MAX_PATH);
  962         debugPrintf(5, _T("CommSession::recvFile(): Preparing for receiving file \"%s\""), szFileName);
  963       BuildFullPath(szFileName, szFullPath);
  964 
  965         // Check if for some reason we have already opened file
  966       pMsg->setField(VID_RCC, openFile(szFullPath, pRequest->getId(), pRequest->getFieldAsTime(VID_MODIFICATION_TIME)));
  967     }
  968     else
  969     {
  970         pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
  971     }
  972 }
  973 
  974 /**
  975  * Open file for writing
  976  */
  977 uint32_t CommSession::openFile(TCHAR *szFullPath, uint32_t requestId, time_t fileModTime)
  978 {
  979    DownloadFileInfo *fInfo = new DownloadFileInfo(szFullPath, fileModTime);
  980    debugPrintf(4, _T("CommSession::openFile(): Writing to local file \"%s\""), szFullPath);
  981 
  982    if (!fInfo->open())
  983    {
  984       delete fInfo;
  985       debugPrintf(2, _T("CommSession::openFile(): Error opening file \"%s\" for writing (%s)"), szFullPath, _tcserror(errno));
  986       return ERR_IO_FAILURE;
  987    }
  988    else
  989    {
  990       m_downloadFileMap.set(requestId, fInfo);
  991       return ERR_SUCCESS;
  992    }
  993 }
  994 
  995 /**
  996  * Progress callback for file sending
  997  */
  998 static void SendFileProgressCallback(INT64 bytesTransferred, void *cbArg)
  999 {
 1000     static_cast<CommSession*>(cbArg)->updateTimeStamp();
 1001 }
 1002 
 1003 /**
 1004  * Send file to server
 1005  */
 1006 bool CommSession::sendFile(UINT32 requestId, const TCHAR *file, long offset, bool allowCompression, VolatileCounter *cancellationFlag)
 1007 {
 1008    if (m_disconnected)
 1009       return false;
 1010     return SendFileOverNXCP(m_channel.get(), requestId, file, m_encryptionContext.get(), offset, SendFileProgressCallback, this, m_socketWriteMutex,
 1011             allowCompression ? NXCP_STREAM_COMPRESSION_DEFLATE : NXCP_STREAM_COMPRESSION_NONE, cancellationFlag);
 1012 }
 1013 
 1014 /**
 1015  * Upgrade agent from package in the file store
 1016  */
 1017 UINT32 CommSession::upgrade(NXCPMessage *request)
 1018 {
 1019    if (m_masterServer)
 1020    {
 1021       TCHAR packageName[MAX_PATH] = _T("");
 1022       request->getFieldAsString(VID_FILE_NAME, packageName, MAX_PATH);
 1023             
 1024       // Store upgrade file name to delete it after system start
 1025       WriteRegistry(_T("upgrade.file"), packageName);
 1026       debugPrintf(3, _T("Starting agent upgrade using package %s"), packageName);
 1027 
 1028       TCHAR fullPath[MAX_PATH];
 1029       BuildFullPath(packageName, fullPath);
 1030       return UpgradeAgent(fullPath);
 1031    }
 1032    else
 1033    {
 1034       debugPrintf(3, _T("Upgrade request from server which is not master (upgrade will not start)"));
 1035       return ERR_ACCESS_DENIED;
 1036    }
 1037 }
 1038 
 1039 /**
 1040  * Get agent's configuration file
 1041  */
 1042 void CommSession::getConfig(NXCPMessage *pMsg)
 1043 {
 1044    if (m_masterServer)
 1045    {
 1046       pMsg->setField(VID_RCC,
 1047          pMsg->setFieldFromFile(VID_CONFIG_FILE, g_szConfigFile) ? ERR_SUCCESS : ERR_IO_FAILURE);
 1048    }
 1049    else
 1050    {
 1051       pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
 1052    }
 1053 }
 1054 
 1055 /**
 1056  * Update agent's configuration file
 1057  */
 1058 void CommSession::updateConfig(NXCPMessage *request, NXCPMessage *response)
 1059 {
 1060    if (m_masterServer)
 1061    {
 1062       size_t size;
 1063       const BYTE *config = request->getBinaryFieldPtr(VID_CONFIG_FILE, &size);
 1064       if (config != nullptr)
 1065       {
 1066          debugPrintf(5, _T("CommSession::updateConfig(): writing %u bytes to %s"), static_cast<uint32_t>(size), g_szConfigFile);
 1067          SaveFileStatus status = SaveFile(g_szConfigFile, config, size, false, true);
 1068          if (status == SaveFileStatus::SUCCESS)
 1069          {
 1070             response->setField(VID_RCC, ERR_SUCCESS);
 1071             g_restartPending = true;
 1072             debugPrintf(2, _T("CommSession::updateConfig(): agent configuration file replaced"));
 1073          }
 1074          else if ((status == SaveFileStatus::OPEN_ERROR) || (status == SaveFileStatus::RENAME_ERROR))
 1075          {
 1076             debugPrintf(2, _T("CommSession::updateConfig(): cannot opening file \"%s\" for writing (%s)"), g_szConfigFile, _tcserror(errno));
 1077             response->setField(VID_RCC, ERR_FILE_OPEN_ERROR);
 1078          }
 1079          else
 1080          {
 1081             debugPrintf(2, _T("CommSession::updateConfig(): error writing file (%s)"), _tcserror(errno));
 1082             response->setField(VID_RCC, ERR_IO_FAILURE);
 1083          }
 1084       }
 1085       else
 1086       {
 1087          debugPrintf(2, _T("CommSession::updateConfig(): file content not provided in request"));
 1088          response->setField(VID_RCC, ERR_MALFORMED_COMMAND);
 1089       }
 1090    }
 1091    else
 1092    {
 1093       debugPrintf(2, _T("CommSession::updateConfig(): access denied"));
 1094       response->setField(VID_RCC, ERR_ACCESS_DENIED);
 1095    }
 1096 }
 1097 
 1098 /**
 1099  * Get hostname by IP address
 1100  */
 1101 void CommSession::getHostNameByAddr(NXCPMessage *request, NXCPMessage *response)
 1102 {
 1103    InetAddress addr = request->getFieldAsInetAddress(VID_IP_ADDRESS);
 1104    if (addr.isValid())
 1105    {
 1106       TCHAR dnsName[MAX_DNS_NAME];
 1107       if (addr.getHostByAddr(dnsName, MAX_DNS_NAME) != NULL)
 1108       {
 1109          response->setField(VID_NAME, dnsName);
 1110          response->setField(VID_RCC, ERR_SUCCESS);
 1111       }
 1112       else
 1113       {
 1114          response->setField(VID_RCC, ERR_NO_SUCH_INSTANCE);
 1115       }
 1116    }
 1117    else
 1118    {
 1119       response->setField(VID_RCC, ERR_BAD_ARGUMENTS);
 1120    }
 1121 }
 1122 
 1123 /**
 1124  * Setup proxy connection
 1125  */
 1126 UINT32 CommSession::setupProxyConnection(NXCPMessage *request)
 1127 {
 1128    if (!m_masterServer || !(g_dwFlags & AF_ENABLE_PROXY))
 1129       return ERR_ACCESS_DENIED;
 1130 
 1131    InetAddress addr = request->isFieldExist(VID_DESTINATION_ADDRESS) ?
 1132             request->getFieldAsInetAddress(VID_DESTINATION_ADDRESS) :
 1133             request->getFieldAsInetAddress(VID_IP_ADDRESS);
 1134    uint16_t port = request->getFieldAsUInt16(VID_AGENT_PORT);
 1135    m_hProxySocket = ConnectToHost(addr, port, 10000);
 1136    if (m_hProxySocket == INVALID_SOCKET)
 1137    {
 1138       debugPrintf(5, _T("Failed to setup proxy connection to %s:%d"), (const TCHAR *)addr.toString(), port);
 1139       return ERR_CONNECT_FAILED;
 1140    }
 1141 
 1142    // Finish proxy connection setup
 1143    shared_ptr<NXCPEncryptionContext> savedCtx = m_encryptionContext;
 1144    m_encryptionContext.reset();
 1145    m_proxyConnection = true;
 1146    m_proxyReadThread = ThreadCreateEx(this, &CommSession::proxyReadThread);
 1147 
 1148    // Send confirmation message
 1149    // We cannot use sendMessage() because encryption context already overridden
 1150    NXCPMessage msg(CMD_REQUEST_COMPLETED, request->getId());
 1151    msg.setField(VID_RCC, RCC_SUCCESS);
 1152    NXCP_MESSAGE *pRawMsg = msg.serialize();
 1153    sendRawMessage(pRawMsg, savedCtx.get());
 1154 
 1155    debugPrintf(5, _T("Established proxy connection to %s:%d"), addr.toString().cstr(), port);
 1156    return ERR_SUCCESS;
 1157 }
 1158 
 1159 /**
 1160  * Proxy reading thread
 1161  */
 1162 void CommSession::proxyReadThread()
 1163 {
 1164    SocketPoller sp;
 1165    while(true)
 1166    {
 1167       sp.reset();
 1168       sp.add(m_hProxySocket);
 1169       int rc = sp.poll(500);   // Half-second timeout
 1170       if (rc < 0)
 1171          break;
 1172       if (rc > 0)
 1173       {
 1174          char buffer[32768];
 1175          rc = recv(m_hProxySocket, buffer, 32768, 0);
 1176          if (rc <= 0)
 1177             break;
 1178          m_channel->send(buffer, rc, m_socketWriteMutex);
 1179       }
 1180    }
 1181    disconnect();
 1182 }
 1183 
 1184 /**
 1185  * Wait for request completion
 1186  */
 1187 uint32_t CommSession::doRequest(NXCPMessage *msg, uint32_t timeout)
 1188 {
 1189    if (!sendMessage(msg))
 1190       return ERR_CONNECTION_BROKEN;
 1191 
 1192    NXCPMessage *response = m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
 1193    UINT32 rcc;
 1194    if (response != NULL)
 1195    {
 1196       rcc = response->getFieldAsUInt32(VID_RCC);
 1197       delete response;
 1198    }
 1199    else
 1200    {
 1201       rcc = ERR_REQUEST_TIMEOUT;
 1202    }
 1203    return rcc;
 1204 }
 1205 
 1206 /**
 1207  * Wait for request completion
 1208  */
 1209 NXCPMessage *CommSession::doRequestEx(NXCPMessage *msg, uint32_t timeout)
 1210 {
 1211    if (!sendMessage(msg))
 1212       return NULL;
 1213    return m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
 1214 }
 1215 
 1216 /**
 1217  * Wait for specific message
 1218  */
 1219 NXCPMessage *CommSession::waitForMessage(UINT16 code, UINT32 id, UINT32 timeout)
 1220 {
 1221    return m_responseQueue->waitForMessage(code, id, timeout);
 1222 }
 1223 
 1224 /**
 1225  * Generate new request ID
 1226  */
 1227 uint32_t CommSession::generateRequestId()
 1228 {
 1229    return static_cast<uint32_t>(InterlockedIncrement(&m_requestId));
 1230 }
 1231 
 1232 /**
 1233  * Setup TCP proxy
 1234  */
 1235 void CommSession::setupTcpProxy(NXCPMessage *request, NXCPMessage *response)
 1236 {
 1237    uint32_t rcc = ERR_CONNECT_FAILED;
 1238    InetAddress addr = request->getFieldAsInetAddress(VID_IP_ADDRESS);
 1239    UINT16 port = request->getFieldAsUInt16(VID_PORT);
 1240    SOCKET s = ConnectToHost(addr, port, 5000);
 1241    if (s != INVALID_SOCKET)
 1242    {
 1243       TcpProxy *proxy = new TcpProxy(this, s);
 1244       response->setField(VID_CHANNEL_ID, proxy->getId());
 1245       MutexLock(m_tcpProxyLock);
 1246       m_tcpProxies.add(proxy);
 1247       if (m_tcpProxyReadThread == INVALID_THREAD_HANDLE)
 1248          m_tcpProxyReadThread = ThreadCreateEx(this, &CommSession::tcpProxyReadThread);
 1249       MutexUnlock(m_tcpProxyLock);
 1250       debugPrintf(5, _T("TCP proxy %d created (destination address %s port %d)"),
 1251                proxy->getId(), (const TCHAR *)addr.toString(), (int)port);
 1252       rcc = ERR_SUCCESS;
 1253    }
 1254    else
 1255    {
 1256       debugPrintf(5, _T("Cannot setup TCP proxy (cannot connect to %s port %d - %hs)"),
 1257                (const TCHAR *)addr.toString(), (int)port, strerror(errno));
 1258    }
 1259    response->setField(VID_RCC, rcc);
 1260 }
 1261 
 1262 /**
 1263  * Close TCP proxy
 1264  */
 1265 uint32_t CommSession::closeTcpProxy(NXCPMessage *request)
 1266 {
 1267    uint32_t rcc = ERR_INVALID_OBJECT;
 1268    uint32_t id = request->getFieldAsUInt32(VID_CHANNEL_ID);
 1269    MutexLock(m_tcpProxyLock);
 1270    for(int i = 0; i < m_tcpProxies.size(); i++)
 1271    {
 1272       if (m_tcpProxies.get(i)->getId() == id)
 1273       {
 1274          m_tcpProxies.remove(i);
 1275          rcc = ERR_SUCCESS;
 1276          break;
 1277       }
 1278    }
 1279    MutexUnlock(m_tcpProxyLock);
 1280    return rcc;
 1281 }
 1282 
 1283 /**
 1284  * TCP proxy read thread
 1285  */
 1286 void CommSession::tcpProxyReadThread()
 1287 {
 1288    debugPrintf(2, _T("TCP proxy read thread started"));
 1289 
 1290    SocketPoller sp;
 1291    while(!m_disconnected)
 1292    {
 1293       sp.reset();
 1294 
 1295       MutexLock(m_tcpProxyLock);
 1296       if (m_tcpProxies.isEmpty())
 1297       {
 1298          MutexUnlock(m_tcpProxyLock);
 1299          ThreadSleepMs(500);
 1300          continue;
 1301       }
 1302       for(int i = 0; i < m_tcpProxies.size(); i++)
 1303          sp.add(m_tcpProxies.get(i)->getSocket());
 1304       MutexUnlock(m_tcpProxyLock);
 1305 
 1306       int rc = sp.poll(500);
 1307       if (rc < 0)
 1308          break;
 1309 
 1310       if (rc > 0)
 1311       {
 1312          MutexLock(m_tcpProxyLock);
 1313          for(int i = 0; i < m_tcpProxies.size(); i++)
 1314          {
 1315             TcpProxy *p = m_tcpProxies.get(i);
 1316             if (sp.isSet(p->getSocket()))
 1317             {
 1318                if (!p->readSocket())
 1319                {
 1320                   // Socket read error, close proxy
 1321                   debugPrintf(5, _T("TCP proxy %d closed because of socket read error"), p->getId());
 1322                   m_tcpProxies.remove(i);
 1323                   i--;
 1324                }
 1325             }
 1326          }
 1327          MutexUnlock(m_tcpProxyLock);
 1328       }
 1329    }
 1330 
 1331    debugPrintf(2, _T("TCP proxy read thread stopped"));
 1332 }
 1333 
 1334 /**
 1335  * Prepare setup message for proxy session on external subagent side
 1336  */
 1337 void CommSession::prepareProxySessionSetupMsg(NXCPMessage *msg)
 1338 {
 1339    msg->setField(VID_SESSION_ID, m_id);
 1340    msg->setField(VID_SERVER_ID, m_serverId);
 1341    msg->setField(VID_IP_ADDRESS, m_serverAddr);
 1342 
 1343    UINT32 flags = 0;
 1344    if (m_masterServer)
 1345       flags |= 0x01;
 1346    if (m_controlServer)
 1347       flags |= 0x02;
 1348    if (m_acceptData)
 1349       flags |= 0x04;
 1350    if (m_acceptTraps)
 1351       flags |= 0x08;
 1352    if (m_acceptFileUpdates)
 1353       flags |= 0x10;
 1354    if (m_ipv6Aware)
 1355       flags |= 0x20;
 1356    msg->setField(VID_FLAGS, flags);
 1357 }
 1358 
 1359 /**
 1360  * Virtual session constructor
 1361  */
 1362 VirtualSession::VirtualSession(UINT64 serverId)
 1363 {
 1364    m_id = InterlockedIncrement(&s_sessionId);
 1365    m_serverId = serverId;
 1366 }
 1367 
 1368 /**
 1369  * Virtual session destructor
 1370  */
 1371 VirtualSession::~VirtualSession()
 1372 {
 1373 }
 1374 
 1375 /**
 1376  * Debug print in virtual session context
 1377  */
 1378 void VirtualSession::debugPrintf(int level, const TCHAR *format, ...)
 1379 {
 1380    va_list args;
 1381    va_start(args, format);
 1382    nxlog_debug_tag_object2(_T("comm.vs"), m_id, level, format, args);
 1383    va_end(args);
 1384 }
 1385 
 1386 /**
 1387  * Proxy session constructor
 1388  */
 1389 ProxySession::ProxySession(NXCPMessage *msg)
 1390 {
 1391    m_id = msg->getFieldAsUInt32(VID_SESSION_ID);
 1392    m_serverId = msg->getFieldAsUInt64(VID_SERVER_ID);
 1393    m_serverAddress = msg->getFieldAsInetAddress(VID_IP_ADDRESS);
 1394    
 1395    UINT32 flags = msg->getFieldAsUInt32(VID_FLAGS);
 1396    m_masterServer = ((flags & 0x01) != 0);
 1397    m_controlServer = ((flags & 0x02) != 0);
 1398    m_canAcceptData = ((flags & 0x04) != 0);
 1399    m_canAcceptTraps = ((flags & 0x08) != 0);
 1400    m_canAcceptFileUpdates = ((flags & 0x10) != 0);
 1401    m_ipv6Aware = ((flags & 0x20) != 0);
 1402 }
 1403 
 1404 /**
 1405  * Proxy session destructor
 1406  */
 1407 ProxySession::~ProxySession()
 1408 {
 1409 }
 1410 
 1411 /**
 1412  * Debug print in proxy session context
 1413  */
 1414 void ProxySession::debugPrintf(int level, const TCHAR *format, ...)
 1415 {
 1416    va_list args;
 1417    va_start(args, format);
 1418    nxlog_debug_tag_object2(_T("comm.ps"), m_id, level, format, args);
 1419    va_end(args);
 1420 }
 1421 
 1422 /**
 1423  * Send message to client via master agent
 1424  */
 1425 bool ProxySession::sendMessage(const NXCPMessage *msg)
 1426 {
 1427    NXCP_MESSAGE *rawMsg = msg->serialize();
 1428    bool success = sendRawMessage(rawMsg);
 1429    free(rawMsg);
 1430    return success;
 1431 }
 1432 
 1433 /**
 1434  * Post message to client via master agent
 1435  */
 1436 void ProxySession::postMessage(const NXCPMessage *msg)
 1437 {
 1438    sendMessage(msg);
 1439 }
 1440 
 1441 /**
 1442  * Send raw message to client via master agent
 1443  */
 1444 bool ProxySession::sendRawMessage(const NXCP_MESSAGE *msg)
 1445 {
 1446    UINT32 msgSize = ntohl(msg->size);
 1447    NXCP_MESSAGE *pmsg = (NXCP_MESSAGE *)malloc(msgSize + NXCP_HEADER_SIZE);
 1448    pmsg->code = htons(CMD_PROXY_MESSAGE);
 1449    pmsg->id = htonl(m_id);
 1450    pmsg->flags = htons(MF_BINARY);
 1451    pmsg->size = htonl(msgSize + NXCP_HEADER_SIZE);
 1452    pmsg->numFields = msg->size;
 1453    memcpy(pmsg->fields, msg, msgSize);
 1454    bool success = SendRawMessageToMasterAgent(pmsg);
 1455    free(pmsg);
 1456    return success;
 1457 }
 1458 
 1459 /**
 1460  * Post raw message to client via master agent
 1461  */
 1462 void ProxySession::postRawMessage(const NXCP_MESSAGE *msg)
 1463 {
 1464    sendRawMessage(msg);
 1465 }