"Fossies" - the Fresh Open Source Software Archive 
Member "netxms-3.8.166/src/server/core/isc.cpp" (23 Feb 2021, 8222 Bytes) of package /linux/misc/netxms-3.8.166.tar.gz:
As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style:
standard) with prefixed line numbers and a
code folding option.
Alternatively, you can
view or
download the uninterpreted source code file here.
For more information about "isc.cpp" see the
Fossies "Dox" file reference documentation and the last
Fossies "Diffs" side-by-side code changes report:
3.7.95_vs_3.7.116.
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2021 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: isc.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25
/**
 * Size value handed to the socket message receiver created by the
 * request processing thread (262144 bytes = 256 KiB).
 */
#define MAX_MSG_SIZE 262144

/**
 * Session states tracked by the request processing thread.
 * Kept as an unnamed enum (instead of macros) so the values are visible
 * in a debugger and cannot be affected by macro expansion; names and
 * numeric values are unchanged.
 */
enum
{
   ISC_STATE_INIT = 0,        // no service selected yet
   ISC_STATE_CONNECTED = 1    // session successfully attached to a service
};
34
35 /**
36 * Service handlers
37 */
38 BOOL EF_SetupSession(ISCSession *, NXCPMessage *);
39 void EF_CloseSession(ISCSession *);
40 BOOL EF_ProcessMessage(ISCSession *, NXCPMessage *, NXCPMessage *);
41
42 /**
43 * Well-known service list
44 */
45 static ISC_SERVICE m_serviceList[] =
46 {
47 { ISC_SERVICE_EVENT_FORWARDER, _T("EventForwarder"),
48 _T("ReceiveForwardedEvents"), EF_SetupSession, EF_CloseSession, EF_ProcessMessage },
49 { 0, NULL, NULL }
50 };
51
52 /**
53 * Request processing thread
54 */
55 static void ProcessingThread(ISCSession *session)
56 {
57 SOCKET sock = session->GetSocket();
58 int i, serviceIndex, state = ISC_STATE_INIT;
59 NXCPMessage *pRequest, response;
60 UINT32 serviceId;
61 TCHAR buffer[256], dbgPrefix[128];
62
63 _sntprintf(dbgPrefix, 128, _T("ISC<%s>:"), IpToStr(session->GetPeerAddress(), buffer));
64
65 SocketMessageReceiver receiver(sock, 4096, MAX_MSG_SIZE);
66 while(true)
67 {
68 MessageReceiverResult result;
69 pRequest = receiver.readMessage(300000, &result);
70 if ((result == MSGRECV_CLOSED) || (result == MSGRECV_COMM_FAILURE) || (result == MSGRECV_TIMEOUT) || (result == MSGRECV_PROTOCOL_ERROR))
71 {
72 if (result != MSGRECV_CLOSED)
73 DbgPrintf(5, _T("%s message read failed: %s"), dbgPrefix, AbstractMessageReceiver::resultToText(result));
74 else
75 DbgPrintf(5, _T("%s connection closed"), dbgPrefix);
76 break; // Communication error or closed connection
77 }
78
79 if (pRequest == nullptr)
80 continue; // Ignore other errors
81
82 if (pRequest->isControl())
83 {
84 DbgPrintf(5, _T("%s received control message %s"), dbgPrefix, NXCPMessageCodeName(pRequest->getCode(), buffer));
85 if (pRequest->getCode() == CMD_GET_NXCP_CAPS)
86 {
87 NXCP_MESSAGE *pRawMsgOut = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE);
88 pRawMsgOut->id = htonl(pRequest->getId());
89 pRawMsgOut->code = htons((uint16_t)CMD_NXCP_CAPS);
90 pRawMsgOut->flags = htons(MF_CONTROL);
91 pRawMsgOut->numFields = htonl(NXCP_VERSION << 24);
92 pRawMsgOut->size = htonl(NXCP_HEADER_SIZE);
93 if (SendEx(sock, pRawMsgOut, NXCP_HEADER_SIZE, 0, nullptr) != NXCP_HEADER_SIZE)
94 DbgPrintf(5, _T("%s SendEx() failed in ProcessingThread(): %s"), dbgPrefix, GetLastSocketErrorText(buffer, 256));
95 MemFree(pRawMsgOut);
96 }
97 }
98 else
99 {
100 DbgPrintf(5, _T("%s message %s received"), dbgPrefix, NXCPMessageCodeName(pRequest->getCode(), buffer));
101 if (pRequest->getCode() == CMD_KEEPALIVE)
102 {
103 response.setField(VID_RCC, ISC_ERR_SUCCESS);
104 }
105 else
106 {
107 if (state == ISC_STATE_INIT)
108 {
109 if (pRequest->getCode() == CMD_ISC_CONNECT_TO_SERVICE)
110 {
111 // Find requested service
112 serviceId = pRequest->getFieldAsUInt32(VID_SERVICE_ID);
113 DbgPrintf(4, _T("%s attempt to connect to service %d"), dbgPrefix, serviceId);
114 for(i = 0; m_serviceList[i].id != 0; i++)
115 if (m_serviceList[i].id == serviceId)
116 break;
117 if (m_serviceList[i].id != 0)
118 {
119 // Check if service is enabled
120 if (ConfigReadBoolean(m_serviceList[i].enableParameter, false))
121 {
122 if (m_serviceList[i].setupSession(session, pRequest))
123 {
124 response.setField(VID_RCC, ISC_ERR_SUCCESS);
125 state = ISC_STATE_CONNECTED;
126 serviceIndex = i;
127 DbgPrintf(4, _T("%s connected to service %d"), dbgPrefix, serviceId);
128 }
129 else
130 {
131 response.setField(VID_RCC, ISC_ERR_SESSION_SETUP_FAILED);
132 }
133 }
134 else
135 {
136 response.setField(VID_RCC, ISC_ERR_SERVICE_DISABLED);
137 }
138 }
139 else
140 {
141 response.setField(VID_RCC, ISC_ERR_UNKNOWN_SERVICE);
142 }
143 }
144 else
145 {
146 DbgPrintf(4, _T("%s out of state request"), dbgPrefix);
147 response.setField(VID_RCC, ISC_ERR_REQUEST_OUT_OF_STATE);
148 }
149 }
150 else // Established session
151 {
152 if (m_serviceList[serviceIndex].processMsg(session, pRequest, &response))
153 break; // Service asks to close session
154 }
155 }
156
157 response.setId(pRequest->getId());
158 response.setCode(CMD_REQUEST_COMPLETED);
159 NXCP_MESSAGE *pRawMsgOut = response.serialize();
160 DbgPrintf(5, _T("%s sending message %s"), dbgPrefix, NXCPMessageCodeName(response.getCode(), buffer));
161 if (SendEx(sock, pRawMsgOut, ntohl(pRawMsgOut->size), 0, NULL) != (int)ntohl(pRawMsgOut->size))
162 DbgPrintf(5, _T("%s SendEx() failed in ProcessingThread(): %s"), dbgPrefix, strerror(WSAGetLastError()));
163
164 response.deleteAllFields();
165 MemFree(pRawMsgOut);
166 }
167 delete pRequest;
168 }
169
170 // Close_session
171 if (state == ISC_STATE_CONNECTED)
172 m_serviceList[serviceIndex].closeSession(session);
173 DbgPrintf(3, _T("%s session closed"), dbgPrefix);
174
175 shutdown(sock, 2);
176 closesocket(sock);
177 delete session;
178 }
179
180 /**
181 * ISC listener thread
182 */
183 void ISCListener()
184 {
185 SOCKET sock, sockClient;
186 struct sockaddr_in servAddr;
187 int errorCount = 0;
188 socklen_t iSize;
189 ISCSession *session;
190 TCHAR buffer[32];
191
192 // Create socket
193 if ((sock = CreateSocket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
194 {
195 TCHAR buffer[1024];
196 nxlog_write(NXLOG_ERROR, _T("Unable to create socket for ISC listener (%s)"), GetLastSocketErrorText(buffer, 1024));
197 return;
198 }
199
200 SetSocketExclusiveAddrUse(sock);
201 SetSocketReuseFlag(sock);
202 #ifndef _WIN32
203 fcntl(sock, F_SETFD, fcntl(sock, F_GETFD) | FD_CLOEXEC);
204 #endif
205
206 // Fill in local address structure
207 memset(&servAddr, 0, sizeof(struct sockaddr_in));
208 servAddr.sin_family = AF_INET;
209 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
210 servAddr.sin_port = htons(NETXMS_ISC_PORT);
211
212 // Bind socket
213 if (bind(sock, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
214 {
215 TCHAR buffer[1024];
216 nxlog_write(NXLOG_ERROR, _T("Unable to bind socket for ISC listener (%s)"), GetLastSocketErrorText(buffer, 1024));
217 closesocket(sock);
218 /* TODO: we should initiate shutdown from here */
219 return;
220 }
221
222 // Set up queue
223 listen(sock, SOMAXCONN);
224 DbgPrintf(1, _T("ISC listener started"));
225
226 // Wait for connection requests
227 while(!IsShutdownInProgress())
228 {
229 iSize = sizeof(struct sockaddr_in);
230 if ((sockClient = accept(sock, (struct sockaddr *)&servAddr, &iSize)) == -1)
231 {
232 int error;
233
234 #ifdef _WIN32
235 error = WSAGetLastError();
236 if (error != WSAEINTR)
237 #else
238 error = errno;
239 if (error != EINTR)
240 #endif
241 {
242 TCHAR buffer[1024];
243 nxlog_write(NXLOG_ERROR, _T("Unable to accept incoming ISC connection (%s)"), GetLastSocketErrorText(buffer, 1024));
244 }
245 errorCount++;
246 if (errorCount > 1000)
247 {
248 nxlog_write(NXLOG_WARNING, _T("Too many consecutive errors on accept() call in ISC listener"));
249 errorCount = 0;
250 }
251 ThreadSleepMs(500);
252 }
253 else
254 {
255 errorCount = 0; // Reset consecutive errors counter
256
257 // Create new session structure and threads
258 DbgPrintf(3, _T("New ISC connection from %s"), IpToStr(ntohl(servAddr.sin_addr.s_addr), buffer));
259 session = new ISCSession(sockClient, &servAddr);
260 ThreadCreate(ProcessingThread, session);
261 }
262 }
263
264 closesocket(sock);
265 DbgPrintf(1, _T("ISC listener stopped"));
266 }