"Fossies" - the Fresh Open Source Software Archive

Member "netxms-3.8.166/src/server/core/isc.cpp" (23 Feb 2021, 8222 Bytes) of package /linux/misc/netxms-3.8.166.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "isc.cpp" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 3.7.95_vs_3.7.116.

    1 /* 
    2 ** NetXMS - Network Management System
    3 ** Copyright (C) 2003-2021 Victor Kirhenshtein
    4 **
    5 ** This program is free software; you can redistribute it and/or modify
    6 ** it under the terms of the GNU General Public License as published by
    7 ** the Free Software Foundation; either version 2 of the License, or
    8 ** (at your option) any later version.
    9 **
   10 ** This program is distributed in the hope that it will be useful,
   11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
   12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   13 ** GNU General Public License for more details.
   14 **
   15 ** You should have received a copy of the GNU General Public License
   16 ** along with this program; if not, write to the Free Software
   17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
   18 **
   19 ** File: isc.cpp
   20 **
   21 **/
   22 
   23 #include "nxcore.h"
   24 
   25 
   26 //
   27 // Constants
   28 //
   29 
   30 #define MAX_MSG_SIZE       262144
   31 
   32 #define ISC_STATE_INIT        0
   33 #define ISC_STATE_CONNECTED   1
   34 
   35 /**
   36  * Service handlers
   37  */
   38 BOOL EF_SetupSession(ISCSession *, NXCPMessage *);
   39 void EF_CloseSession(ISCSession *);
   40 BOOL EF_ProcessMessage(ISCSession *, NXCPMessage *, NXCPMessage *);
   41 
   42 /**
   43  * Well-known service list
   44  */
   45 static ISC_SERVICE m_serviceList[] = 
   46 {
   47     { ISC_SERVICE_EVENT_FORWARDER, _T("EventForwarder"),
   48       _T("ReceiveForwardedEvents"), EF_SetupSession, EF_CloseSession, EF_ProcessMessage },
   49     { 0, NULL, NULL }
   50 };
   51 
   52 /**
   53  * Request processing thread
   54  */
   55 static void ProcessingThread(ISCSession *session)
   56 {
   57    SOCKET sock = session->GetSocket();
   58    int i, serviceIndex, state = ISC_STATE_INIT;
   59    NXCPMessage *pRequest, response;
   60    UINT32 serviceId;
   61     TCHAR buffer[256], dbgPrefix[128];
   62 
   63     _sntprintf(dbgPrefix, 128, _T("ISC<%s>:"), IpToStr(session->GetPeerAddress(), buffer));
   64 
   65     SocketMessageReceiver receiver(sock, 4096, MAX_MSG_SIZE);
   66    while(true)
   67    {
   68       MessageReceiverResult result;
   69       pRequest = receiver.readMessage(300000, &result);
   70       if ((result == MSGRECV_CLOSED) || (result == MSGRECV_COMM_FAILURE) || (result == MSGRECV_TIMEOUT) || (result == MSGRECV_PROTOCOL_ERROR))
   71         {
   72             if (result != MSGRECV_CLOSED)
   73             DbgPrintf(5, _T("%s message read failed: %s"), dbgPrefix, AbstractMessageReceiver::resultToText(result));
   74             else
   75             DbgPrintf(5, _T("%s connection closed"), dbgPrefix);
   76          break;   // Communication error or closed connection
   77         }
   78 
   79       if (pRequest == nullptr)
   80          continue;   // Ignore other errors
   81 
   82       if (pRequest->isControl())
   83       {
   84          DbgPrintf(5, _T("%s received control message %s"), dbgPrefix, NXCPMessageCodeName(pRequest->getCode(), buffer));
   85          if (pRequest->getCode() == CMD_GET_NXCP_CAPS)
   86          {
   87             NXCP_MESSAGE *pRawMsgOut = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE);
   88             pRawMsgOut->id = htonl(pRequest->getId());
   89             pRawMsgOut->code = htons((uint16_t)CMD_NXCP_CAPS);
   90             pRawMsgOut->flags = htons(MF_CONTROL);
   91             pRawMsgOut->numFields = htonl(NXCP_VERSION << 24);
   92             pRawMsgOut->size = htonl(NXCP_HEADER_SIZE);
   93                 if (SendEx(sock, pRawMsgOut, NXCP_HEADER_SIZE, 0, nullptr) != NXCP_HEADER_SIZE)
   94                     DbgPrintf(5, _T("%s SendEx() failed in ProcessingThread(): %s"), dbgPrefix, GetLastSocketErrorText(buffer, 256));
   95                 MemFree(pRawMsgOut);
   96          }
   97       }
   98         else
   99         {
  100             DbgPrintf(5, _T("%s message %s received"), dbgPrefix, NXCPMessageCodeName(pRequest->getCode(), buffer));
  101             if (pRequest->getCode() == CMD_KEEPALIVE)
  102             {
  103                 response.setField(VID_RCC, ISC_ERR_SUCCESS);
  104             }
  105             else
  106             {
  107                 if (state == ISC_STATE_INIT)
  108                 {
  109                     if (pRequest->getCode() == CMD_ISC_CONNECT_TO_SERVICE)
  110                     {
  111                         // Find requested service
  112                     serviceId = pRequest->getFieldAsUInt32(VID_SERVICE_ID);
  113                         DbgPrintf(4, _T("%s attempt to connect to service %d"), dbgPrefix, serviceId);
  114                         for(i = 0; m_serviceList[i].id != 0; i++)
  115                             if (m_serviceList[i].id == serviceId)
  116                                 break;
  117                         if (m_serviceList[i].id != 0)
  118                         {
  119                             // Check if service is enabled
  120                             if (ConfigReadBoolean(m_serviceList[i].enableParameter, false))
  121                             {
  122                                 if (m_serviceList[i].setupSession(session, pRequest))
  123                                 {
  124                                     response.setField(VID_RCC, ISC_ERR_SUCCESS);
  125                                     state = ISC_STATE_CONNECTED;
  126                                     serviceIndex = i;
  127                                     DbgPrintf(4, _T("%s connected to service %d"), dbgPrefix, serviceId);
  128                                 }
  129                                 else
  130                                 {
  131                                     response.setField(VID_RCC, ISC_ERR_SESSION_SETUP_FAILED);
  132                                 }
  133                             }
  134                             else
  135                             {
  136                                 response.setField(VID_RCC, ISC_ERR_SERVICE_DISABLED);
  137                             }
  138                         }
  139                         else
  140                         {
  141                             response.setField(VID_RCC, ISC_ERR_UNKNOWN_SERVICE);
  142                         }
  143                     }
  144                     else
  145                     {
  146                         DbgPrintf(4, _T("%s out of state request"), dbgPrefix);
  147                         response.setField(VID_RCC, ISC_ERR_REQUEST_OUT_OF_STATE);
  148                     }
  149                 }
  150                 else    // Established session
  151                 {
  152                     if (m_serviceList[serviceIndex].processMsg(session, pRequest, &response))
  153                         break;  // Service asks to close session
  154                 }
  155             }
  156             
  157             response.setId(pRequest->getId());
  158             response.setCode(CMD_REQUEST_COMPLETED);
  159             NXCP_MESSAGE *pRawMsgOut = response.serialize();
  160             DbgPrintf(5, _T("%s sending message %s"), dbgPrefix, NXCPMessageCodeName(response.getCode(), buffer));
  161             if (SendEx(sock, pRawMsgOut, ntohl(pRawMsgOut->size), 0, NULL) != (int)ntohl(pRawMsgOut->size))
  162                 DbgPrintf(5, _T("%s SendEx() failed in ProcessingThread(): %s"), dbgPrefix, strerror(WSAGetLastError()));
  163       
  164             response.deleteAllFields();
  165             MemFree(pRawMsgOut);
  166         }
  167       delete pRequest;
  168    }
  169 
  170     // Close_session
  171     if (state == ISC_STATE_CONNECTED)
  172         m_serviceList[serviceIndex].closeSession(session);
  173     DbgPrintf(3, _T("%s session closed"), dbgPrefix);
  174 
  175    shutdown(sock, 2);
  176    closesocket(sock);
  177     delete session;
  178 }
  179 
  180 /**
  181  * ISC listener thread
  182  */
  183 void ISCListener()
  184 {
  185    SOCKET sock, sockClient;
  186    struct sockaddr_in servAddr;
  187    int errorCount = 0;
  188    socklen_t iSize;
  189     ISCSession *session;
  190     TCHAR buffer[32];
  191 
  192    // Create socket
  193    if ((sock = CreateSocket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
  194    {
  195       TCHAR buffer[1024];
  196       nxlog_write(NXLOG_ERROR, _T("Unable to create socket for ISC listener (%s)"), GetLastSocketErrorText(buffer, 1024));
  197       return;
  198    }
  199 
  200     SetSocketExclusiveAddrUse(sock);
  201     SetSocketReuseFlag(sock);
  202 #ifndef _WIN32
  203    fcntl(sock, F_SETFD, fcntl(sock, F_GETFD) | FD_CLOEXEC);
  204 #endif
  205 
  206    // Fill in local address structure
  207    memset(&servAddr, 0, sizeof(struct sockaddr_in));
  208    servAddr.sin_family = AF_INET;
  209    servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
  210    servAddr.sin_port = htons(NETXMS_ISC_PORT);
  211 
  212    // Bind socket
  213    if (bind(sock, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
  214    {
  215       TCHAR buffer[1024];
  216       nxlog_write(NXLOG_ERROR, _T("Unable to bind socket for ISC listener (%s)"), GetLastSocketErrorText(buffer, 1024));
  217       closesocket(sock);
  218       /* TODO: we should initiate shutdown from here */
  219       return;
  220    }
  221 
  222    // Set up queue
  223    listen(sock, SOMAXCONN);
  224     DbgPrintf(1, _T("ISC listener started"));
  225 
  226    // Wait for connection requests
  227    while(!IsShutdownInProgress())
  228    {
  229       iSize = sizeof(struct sockaddr_in);
  230       if ((sockClient = accept(sock, (struct sockaddr *)&servAddr, &iSize)) == -1)
  231       {
  232          int error;
  233 
  234 #ifdef _WIN32
  235          error = WSAGetLastError();
  236          if (error != WSAEINTR)
  237 #else
  238          error = errno;
  239          if (error != EINTR)
  240 #endif
  241          {
  242             TCHAR buffer[1024];
  243             nxlog_write(NXLOG_ERROR, _T("Unable to accept incoming ISC connection (%s)"), GetLastSocketErrorText(buffer, 1024));
  244          }
  245          errorCount++;
  246          if (errorCount > 1000)
  247          {
  248             nxlog_write(NXLOG_WARNING, _T("Too many consecutive errors on accept() call in ISC listener"));
  249             errorCount = 0;
  250          }
  251          ThreadSleepMs(500);
  252       }
  253         else
  254         {
  255             errorCount = 0;     // Reset consecutive errors counter
  256 
  257             // Create new session structure and threads
  258             DbgPrintf(3, _T("New ISC connection from %s"), IpToStr(ntohl(servAddr.sin_addr.s_addr), buffer));
  259             session = new ISCSession(sockClient, &servAddr);
  260             ThreadCreate(ProcessingThread, session);
  261         }
  262    }
  263 
  264    closesocket(sock);
  265     DbgPrintf(1, _T("ISC listener stopped"));
  266 }