"Fossies" - the Fresh Open Source Software Archive 
Member "netxms-3.8.166/src/server/core/zone.cpp" (23 Feb 2021, 25300 Bytes) of package /linux/misc/netxms-3.8.166.tar.gz:
As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option.
Alternatively, you can view or download the uninterpreted source code file here.
For more information about "zone.cpp" see the
Fossies "Dox" file reference documentation and the last
Fossies "Diffs" side-by-side code changes report:
3.6.300_vs_3.7.95.
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2020 Raden Solutions
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: zone.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 #define DEBUG_TAG_ZONE_PROXY _T("zone.proxy")
26
27 /**
28 * Dump index to console
29 */
30 void DumpIndex(CONSOLE_CTX pCtx, InetAddressIndex *index);
31
32 /**
33 * Zone class default constructor
34 */
35 Zone::Zone() : super()
36 {
37 m_id = 0;
38 m_uin = 0;
39 _tcscpy(m_name, _T("Default"));
40 m_proxyNodes = new ObjectArray<ZoneProxy>(0, 16, Ownership::True);
41 GenerateRandomBytes(m_proxyAuthKey, ZONE_PROXY_KEY_LENGTH);
42 m_idxNodeByAddr = new InetAddressIndex;
43 m_idxInterfaceByAddr = new InetAddressIndex;
44 m_idxSubnetByAddr = new InetAddressIndex;
45 m_lastHealthCheck = NEVER;
46 m_lockedForHealthCheck = false;
47 }
48
49 /**
50 * Constructor for new zone object
51 */
52 Zone::Zone(int32_t uin, const TCHAR *name) : super()
53 {
54 m_id = 0;
55 m_uin = uin;
56 _tcslcpy(m_name, name, MAX_OBJECT_NAME);
57 m_proxyNodes = new ObjectArray<ZoneProxy>(0, 16, Ownership::True);
58 GenerateRandomBytes(m_proxyAuthKey, ZONE_PROXY_KEY_LENGTH);
59 m_idxNodeByAddr = new InetAddressIndex;
60 m_idxInterfaceByAddr = new InetAddressIndex;
61 m_idxSubnetByAddr = new InetAddressIndex;
62 m_lastHealthCheck = NEVER;
63 m_lockedForHealthCheck = false;
64 setCreationTime();
65 }
66
67 /**
68 * Zone class destructor
69 */
70 Zone::~Zone()
71 {
72 delete m_proxyNodes;
73 delete m_idxNodeByAddr;
74 delete m_idxInterfaceByAddr;
75 delete m_idxSubnetByAddr;
76 }
77
78 /**
79 * Create object from database data
80 */
81 bool Zone::loadFromDatabase(DB_HANDLE hdb, UINT32 dwId)
82 {
83 m_id = dwId;
84
85 if (!loadCommonProperties(hdb))
86 return false;
87
88 DB_STATEMENT hStmt = DBPrepare(hdb, _T("SELECT zone_guid FROM zones WHERE id=?"));
89 if (hStmt == nullptr)
90 return false;
91
92 bool success = false;
93
94 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, dwId);
95 DB_RESULT hResult = DBSelectPrepared(hStmt);
96 if (hResult != nullptr)
97 {
98 if (DBGetNumRows(hResult) == 0)
99 {
100 if (dwId == BUILTIN_OID_ZONE0)
101 {
102 m_uin = 0;
103 success = true;
104 }
105 else
106 {
107 nxlog_debug(4, _T("Cannot load zone object %ld - missing record in \"zones\" table"), (long)m_id);
108 }
109 }
110 else
111 {
112 m_uin = DBGetFieldULong(hResult, 0, 0);
113 success = true;
114 }
115 DBFreeResult(hResult);
116 }
117 DBFreeStatement(hStmt);
118
119 if (success)
120 {
121 success = false;
122 hStmt = DBPrepare(hdb, _T("SELECT proxy_node FROM zone_proxies WHERE object_id=?"));
123 if (hStmt != nullptr)
124 {
125 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, dwId);
126 hResult = DBSelectPrepared(hStmt);
127 if (hResult != nullptr)
128 {
129 int count = DBGetNumRows(hResult);
130 for(int i = 0; i < count; i++)
131 m_proxyNodes->add(new ZoneProxy(DBGetFieldULong(hResult, i, 0)));
132 DBFreeResult(hResult);
133 success = true;
134 }
135 DBFreeStatement(hStmt);
136 }
137 }
138
139 // Load access list
140 if (success)
141 success = loadACLFromDB(hdb);
142
143 return success;
144 }
145
146 /**
147 * Save object to database
148 */
149 bool Zone::saveToDatabase(DB_HANDLE hdb)
150 {
151 bool success = super::saveToDatabase(hdb);
152
153 if (success && (m_modified & MODIFY_OTHER))
154 {
155 DB_STATEMENT hStmt;
156 if (IsDatabaseRecordExist(hdb, _T("zones"), _T("id"), m_id))
157 {
158 hStmt = DBPrepare(hdb, _T("UPDATE zones SET zone_guid=? WHERE id=?"));
159 }
160 else
161 {
162 hStmt = DBPrepare(hdb, _T("INSERT INTO zones (zone_guid,id) VALUES (?,?)"));
163 }
164 if (hStmt != nullptr)
165 {
166 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_uin);
167 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_id);
168 success = DBExecute(hStmt);
169 DBFreeStatement(hStmt);
170 }
171 else
172 {
173 success = false;
174 }
175 }
176
177 if (success)
178 success = executeQueryOnObject(hdb, _T("DELETE FROM zone_proxies WHERE object_id=?"));
179
180 lockProperties();
181 if (success && !m_proxyNodes->isEmpty())
182 {
183 DB_STATEMENT hStmt = DBPrepare(hdb, _T("INSERT INTO zone_proxies (object_id,proxy_node) VALUES (?,?)"), m_proxyNodes->size() > 1);
184 if (hStmt != nullptr)
185 {
186 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
187 for (int i = 0; i < m_proxyNodes->size() && success; i++)
188 {
189 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_proxyNodes->get(i)->nodeId);
190 success = DBExecute(hStmt);
191 }
192 DBFreeStatement(hStmt);
193 }
194 else
195 {
196 success = false;
197 }
198 }
199 unlockProperties();
200
201 return success;
202 }
203
204 /**
205 * Delete zone object from database
206 */
207 bool Zone::deleteFromDatabase(DB_HANDLE hdb)
208 {
209 bool success = super::deleteFromDatabase(hdb);
210 if (success)
211 success = executeQueryOnObject(hdb, _T("DELETE FROM zones WHERE id=?"));
212 if (success)
213 success = executeQueryOnObject(hdb, _T("DELETE FROM zone_proxies WHERE object_id=?"));
214 if (success)
215 success = executeQueryOnObject(hdb, _T("DELETE FROM shared_secrets WHERE zone=?"));
216 if (success)
217 success = executeQueryOnObject(hdb, _T("DELETE FROM snmp_communities WHERE zone=?"));
218 if (success)
219 success = executeQueryOnObject(hdb, _T("DELETE FROM usm_credentials WHERE zone=?"));
220 if (success)
221 success = executeQueryOnObject(hdb, _T("DELETE FROM snmp_ports WHERE zone=?"));
222 return success;
223 }
224
225 /**
226 * Create NXCP message with object's data
227 */
228 void Zone::fillMessageInternal(NXCPMessage *msg, UINT32 userId)
229 {
230 super::fillMessageInternal(msg, userId);
231 msg->setField(VID_ZONE_UIN, m_uin);
232
233 auto idList = reinterpret_cast<uint32_t*>(MemAllocLocal(m_proxyNodes->size() * sizeof(uint32_t)));
234 for (int i = 0; i < m_proxyNodes->size(); i++)
235 idList[i] = m_proxyNodes->get(i)->nodeId;
236 msg->setFieldFromInt32Array(VID_ZONE_PROXY_LIST, m_proxyNodes->size(), idList);
237 MemFreeLocal(idList);
238 }
239
240 /**
241 * Modify object from message
242 */
243 UINT32 Zone::modifyFromMessageInternal(NXCPMessage *request)
244 {
245 if (request->isFieldExist(VID_ZONE_PROXY_LIST))
246 {
247 IntegerArray<UINT32> newProxyList;
248 request->getFieldAsInt32Array(VID_ZONE_PROXY_LIST, &newProxyList);
249 for (int i = 0; i < newProxyList.size(); i++)
250 {
251 int j;
252 for(j = 0; j < m_proxyNodes->size(); j++)
253 {
254 if (m_proxyNodes->get(j)->nodeId == newProxyList.get(i))
255 break;
256 }
257 if (j == m_proxyNodes->size())
258 m_proxyNodes->add(new ZoneProxy(newProxyList.get(i)));
259 }
260
261 Iterator<ZoneProxy> *it = m_proxyNodes->iterator();
262 while(it->hasNext())
263 {
264 ZoneProxy *proxy = it->next();
265
266 int j;
267 for(j = 0; j < newProxyList.size(); j++)
268 {
269 if (proxy->nodeId == newProxyList.get(j))
270 break;
271 }
272 if (j == newProxyList.size())
273 it->remove();
274 }
275 delete it;
276 }
277
278 return super::modifyFromMessageInternal(request);
279 }
280
281 /**
282 * Add node as zone proxy
283 */
284 void Zone::addProxy(const Node& node)
285 {
286 uint32_t nodeId = node.getId();
287
288 lockProperties();
289 bool found = false;
290 for(int i = 0; i < m_proxyNodes->size(); i++)
291 if (m_proxyNodes->get(i)->nodeId == nodeId)
292 {
293 found = true;
294 break;
295 }
296 if (!found)
297 {
298 m_proxyNodes->add(new ZoneProxy(nodeId));
299 setModified(MODIFY_OTHER);
300 }
301 unlockProperties();
302 }
303
304 /**
305 * Update interface index
306 */
307 void Zone::updateInterfaceIndex(const InetAddress& oldIp, const InetAddress& newIp, const shared_ptr<Interface>& iface)
308 {
309 m_idxInterfaceByAddr->remove(oldIp);
310 m_idxInterfaceByAddr->put(newIp, iface);
311 }
312
313 /**
314 * Update node index
315 */
316 void Zone::updateNodeIndex(const InetAddress& oldIp, const InetAddress& newIp, const shared_ptr<Node>& node)
317 {
318 m_idxNodeByAddr->remove(oldIp);
319 m_idxNodeByAddr->put(newIp, node);
320 }
321
322 /**
323 * Called by client session handler to check if threshold summary should be shown for this object.
324 */
325 bool Zone::showThresholdSummary() const
326 {
327 return true;
328 }
329
330 /**
331 * Remove interface from index
332 */
333 void Zone::removeFromIndex(const Interface& iface)
334 {
335 const ObjectArray<InetAddress> *list = iface.getIpAddressList()->getList();
336 for(int i = 0; i < list->size(); i++)
337 {
338 InetAddress *addr = list->get(i);
339 if (addr->isValidUnicast())
340 {
341 shared_ptr<NetObj> o = m_idxInterfaceByAddr->get(*addr);
342 if ((o != nullptr) && (o->getId() == iface.getId()))
343 {
344 m_idxInterfaceByAddr->remove(*addr);
345 }
346 }
347 }
348 }
349
350 /**
351 * Callback for processing zone configuration synchronization
352 */
353 static EnumerationCallbackResult ForceConfigurationSync(const UINT32 *nodeId, void *arg)
354 {
355 shared_ptr<Node> node = static_pointer_cast<Node>(FindObjectById(*nodeId, OBJECT_NODE));
356 if (node != nullptr)
357 node->forceSyncDataCollectionConfig();
358 return _CONTINUE;
359 }
360
361 /**
362 * Get proxy node for given object. Always prefers proxy that is already assigned to the object
363 * and will update assigned proxy property if changed.
364 */
365 UINT32 Zone::getProxyNodeId(NetObj *object, bool backup)
366 {
367 ZoneProxy *proxy = nullptr;
368 HashSet<uint32_t> syncSet;
369
370 lockProperties();
371
372 if ((object != nullptr) && (object->getAssignedZoneProxyId(backup) != 0))
373 {
374 for(int i = 0; i < m_proxyNodes->size(); i++)
375 {
376 ZoneProxy *p = m_proxyNodes->get(i);
377 if (p->nodeId == object->getAssignedZoneProxyId(backup))
378 {
379 if (p->isAvailable)
380 {
381 proxy = p;
382 }
383 else
384 {
385 p->assignments--;
386 syncSet.put(p->nodeId);
387
388 uint32_t otherProxy = object->getAssignedZoneProxyId(!backup);
389 if (otherProxy != 0)
390 syncSet.put(otherProxy);
391 }
392 break;
393 }
394 }
395 }
396
397 if (proxy == nullptr)
398 {
399 if (m_proxyNodes->size() > (backup ? 1 : 0))
400 {
401 if (!backup)
402 proxy = m_proxyNodes->get(0);
403 for(int i = 0; i < m_proxyNodes->size(); i++)
404 {
405 ZoneProxy *p = m_proxyNodes->get(i);
406 if (!p->isAvailable)
407 continue;
408
409 if ((object != nullptr) && (p->nodeId == object->getAssignedZoneProxyId(!backup)))
410 continue;
411
412 if ((proxy == nullptr) || (p->compareLoad(proxy) < 0) || !proxy->isAvailable)
413 proxy = p;
414 }
415 }
416 if (object != nullptr)
417 {
418 if (proxy != nullptr)
419 {
420 object->setAssignedZoneProxyId(proxy->nodeId, backup);
421 proxy->assignments++;
422 syncSet.put(proxy->nodeId);
423
424 uint32_t otherProxy = object->getAssignedZoneProxyId(!backup);
425 if (otherProxy != 0)
426 syncSet.put(otherProxy);
427 }
428 else
429 {
430 object->setAssignedZoneProxyId(0, backup);
431 }
432 }
433 }
434
435 uint32_t id = (proxy != nullptr) ? proxy->nodeId : 0;
436 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 8, _T("Zone::getProxyNodeId: selected %s proxy [%u] for object %s [%u] in zone %s [uin=%u]"),
437 backup ? _T("backup") : _T("primary"),
438 id, (object != nullptr) ? object->getName() : _T("(null)"), (object != nullptr) ? object->getId() : 0,
439 m_name, m_uin);
440
441 unlockProperties();
442
443 syncSet.forEach(ForceConfigurationSync, nullptr);
444 return id;
445 }
446
447 /**
448 * Check if given node is a proxy for this zone
449 */
450 bool Zone::isProxyNode(UINT32 nodeId) const
451 {
452 bool result = false;
453 lockProperties();
454 for(int i = 0; i < m_proxyNodes->size(); i++)
455 if (m_proxyNodes->get(i)->nodeId == nodeId)
456 {
457 result = true;
458 break;
459 }
460 unlockProperties();
461 return result;
462 }
463
464 /**
465 * Get number of assignments for given proxy node
466 */
467 UINT32 Zone::getProxyNodeAssignments(UINT32 nodeId) const
468 {
469 UINT32 result = 0;
470 lockProperties();
471 for(int i = 0; i < m_proxyNodes->size(); i++)
472 if (m_proxyNodes->get(i)->nodeId == nodeId)
473 {
474 result = m_proxyNodes->get(i)->assignments;
475 break;
476 }
477 unlockProperties();
478 return result;
479 }
480
481 /**
482 * Check if given proxy node is available.
483 */
484 bool Zone::isProxyNodeAvailable(UINT32 nodeId) const
485 {
486 bool result = false;
487 lockProperties();
488 for(int i = 0; i < m_proxyNodes->size(); i++)
489 if (m_proxyNodes->get(i)->nodeId == nodeId)
490 {
491 result = m_proxyNodes->get(i)->isAvailable;
492 break;
493 }
494 unlockProperties();
495 return result;
496 }
497
498 /**
499 * Get all proxy nodes. Returned array must be destroyed by the caller.
500 */
501 IntegerArray<UINT32> *Zone::getAllProxyNodes() const
502 {
503 lockProperties();
504 IntegerArray<UINT32> *nodes = new IntegerArray<UINT32>(m_proxyNodes->size());
505 for(int i = 0; i < m_proxyNodes->size(); i++)
506 nodes->add(m_proxyNodes->get(i)->nodeId);
507 unlockProperties();
508 return nodes;
509 }
510
511 /**
512 * Fill configuration message for agent
513 */
514 void Zone::fillAgentConfigurationMessage(NXCPMessage *msg) const
515 {
516 lockProperties();
517 msg->setField(VID_ZONE_UIN, m_uin);
518 msg->setField(VID_SHARED_SECRET, m_proxyAuthKey, ZONE_PROXY_KEY_LENGTH);
519
520 UINT32 fieldId = VID_ZONE_PROXY_BASE, count = 0;
521 for(int i = 0; i < m_proxyNodes->size(); i++)
522 {
523 ZoneProxy *p = m_proxyNodes->get(i);
524 shared_ptr<NetObj> node = FindObjectById(p->nodeId, OBJECT_NODE);
525 if (node != nullptr)
526 {
527 msg->setField(fieldId++, p->nodeId);
528 msg->setField(fieldId++, static_cast<Node&>(*node).getIpAddress());
529 fieldId += 8;
530 count++;
531 }
532 }
533 msg->setField(VID_ZONE_PROXY_COUNT, count);
534
535 unlockProperties();
536 }
537
538 /**
539 * Acquire connection to any available proxy node
540 */
541 shared_ptr<AgentConnectionEx> Zone::acquireConnectionToProxy(bool validate)
542 {
543 uint32_t nodeId = getProxyNodeId(nullptr);
544 if (nodeId == 0)
545 {
546 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 7, _T("Zone::acquireConnectionToProxy: no active proxy in zone %s [uin=%u]"), m_name, m_uin);
547 return shared_ptr<AgentConnectionEx>();
548 }
549
550 shared_ptr<NetObj> node = FindObjectById(nodeId, OBJECT_NODE);
551 return (node != nullptr) ? static_cast<Node&>(*node).acquireProxyConnection(ZONE_PROXY, validate) : shared_ptr<AgentConnectionEx>();
552 }
553
554 /**
555 * Update proxy status. Passive mode should be used when actual communication with the proxy
556 * should be avoided (for example during server startup).
557 */
558 void Zone::updateProxyStatus(const shared_ptr<Node>& node, bool activeMode)
559 {
560 lockProperties();
561 for(int i = 0; i < m_proxyNodes->size(); i++)
562 {
563 ZoneProxy *p = m_proxyNodes->get(i);
564 if (p->nodeId == node->getId())
565 {
566 bool isAvailable = node->isNativeAgent() &&
567 ((node->getState() & NSF_AGENT_UNREACHABLE) == 0) &&
568 ((node->getState() & DCSF_UNREACHABLE) == 0) &&
569 (node->getStatus() != STATUS_UNMANAGED) &&
570 ((node->getFlags() & NF_DISABLE_NXCP) == 0);
571 if (isAvailable != p->isAvailable)
572 {
573 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 4, _T("Zone %s [uin=%u] proxy %s [%u] availability changed to %s"),
574 m_name, m_uin, node->getName(), node->getId(), isAvailable ? _T("YES") : _T("NO"));
575 p->isAvailable = isAvailable;
576 }
577 if (isAvailable && activeMode)
578 {
579 // Update should be executed on separate thread to avoid server deadlock
580 ThreadPoolExecute(g_mainThreadPool, self(), &Zone::updateProxyLoadData, node);
581 }
582 break;
583 }
584 }
585 unlockProperties();
586 }
587
588 /**
589 * Update proxy load information
590 */
591 void Zone::updateProxyLoadData(shared_ptr<Node> node)
592 {
593 double cpuLoad = node->getMetricFromAgentAsDouble(_T("System.CPU.LoadAvg15"), -1);
594 double dataCollectorLoad = node->getMetricFromAgentAsDouble(_T("Agent.ThreadPool.LoadAverage15(DATACOLL)"), -1);
595 int64_t dataSenderQueueSize = node->getMetricFromAgentAsInt32(_T("Agent.DataCollectorQueueSize"), -1); // FIXME: rename to Agent.DataSenderQueueSize
596 if ((cpuLoad >= 0) || (dataCollectorLoad >= 0) || (dataSenderQueueSize >= 0))
597 {
598 lockProperties();
599 for(int i = 0; i < m_proxyNodes->size(); i++)
600 {
601 ZoneProxy *p = m_proxyNodes->get(i);
602 if (p->nodeId == node->getId())
603 {
604 if (cpuLoad >= 0)
605 p->cpuLoad = cpuLoad;
606 if (dataCollectorLoad >= 0)
607 p->dataCollectorLoad = dataCollectorLoad;
608 if (dataSenderQueueSize >= 0)
609 {
610 UpdateExpMovingAverage(p->rawDataSenderLoad, EMA_EXP_15, dataSenderQueueSize);
611 double load = GetExpMovingAverageValue(p->rawDataSenderLoad);
612 p->dataSenderLoadTrend = load - p->dataSenderLoad;
613 p->dataSenderLoad = load;
614 }
615 break;
616 }
617 }
618 unlockProperties();
619 }
620 }
621
622 /**
623 * Filter nodes by assigned proxy ID
624 */
625 static bool ProxyFilter(NetObj *object, void *context)
626 {
627 return object->getAssignedZoneProxyId(false) == CAST_FROM_POINTER(context, UINT32);
628 }
629
630 /**
631 * Compare nodes by estimated proxy load
632 */
633 static int CompareNodesByProxyLoad(const NetObj& n1, const NetObj& n2)
634 {
635 double d = static_cast<const Node&>(n1).getProxyLoadFactor() - static_cast<const Node&>(n2).getProxyLoadFactor();
636 return (d < 0) ? -1 : (d > 0) ? 1 : 0;
637 }
638
639 /**
640 * Callback for updating node backup proxy
641 */
642 static void UpdateNodeBackupProxy(void *node)
643 {
644 static_cast<Node*>(node)->getEffectiveSnmpProxy(true);
645 }
646
647 /**
648 * Migrate proxy load
649 */
650 void Zone::migrateProxyLoad(ZoneProxy *source, ZoneProxy *target)
651 {
652 SharedObjectArray<NetObj> *nodes = g_idxNodeById.getObjects(ProxyFilter, CAST_TO_POINTER(source->nodeId, void*));
653 nodes->sort(CompareNodesByProxyLoad);
654
655 double loadFactor = 0;
656 for(int i = 0; i < nodes->size(); i++)
657 loadFactor += static_cast<Node*>(nodes->get(i))->getProxyLoadFactor();
658 double targetLoadFactor = loadFactor * 0.75;
659
660 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 6, _T("ZoneHealthCheck(%s [%u]): %d nodes on proxy [%u], load factor %f, target %f"),
661 m_name, m_uin, nodes->size(), source->nodeId, loadFactor, targetLoadFactor);
662
663 for(int i = 0; (i < nodes->size()) && (loadFactor > targetLoadFactor); i++)
664 {
665 Node *n = static_cast<Node*>(nodes->get(i));
666 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 6, _T("ZoneHealthCheck(%s [%u]): moving node %s [%u] from [%u] to [%u]"),
667 m_name, m_uin, n->getName(), n->getId(), source->nodeId, target->nodeId);
668 source->assignments--;
669 target->assignments++;
670 n->setAssignedZoneProxyId(target->nodeId, false);
671 if (n->getAssignedZoneProxyId(true) == target->nodeId)
672 {
673 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 6, _T("ZoneHealthCheck(%s [%u]): backup proxy for node node %s [%u] should be changed"),
674 m_name, m_uin, n->getName(), n->getId());
675 n->setAssignedZoneProxyId(0, true);
676 target->assignments--;
677 ThreadPoolExecute(g_mainThreadPool, UpdateNodeBackupProxy, n);
678 }
679 loadFactor -= n->getProxyLoadFactor();
680 }
681
682 delete nodes;
683
684 shared_ptr<Node> node = static_pointer_cast<Node>(FindObjectById(target->nodeId, OBJECT_NODE));
685 if (node != nullptr)
686 node->forceSyncDataCollectionConfig();
687
688 node = static_pointer_cast<Node>(FindObjectById(source->nodeId, OBJECT_NODE));
689 if (node != nullptr)
690 node->forceSyncDataCollectionConfig();
691 }
692
693 /**
694 * Do zone health check
695 */
696 void Zone::healthCheck(PollerInfo *poller)
697 {
698 poller->startExecution();
699 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 6, _T("ZoneHealthCheck(%s [%u]): started"), m_name, m_uin);
700
701 lockProperties();
702
703 // Calculate average load
704 int count = 0;
705 double dataSenderLoad = 0, dataCollectorLoad = 0, cpuLoad = 0;
706 UINT32 assignments = 0;
707 for(int i = 0; i < m_proxyNodes->size(); i++)
708 {
709 ZoneProxy *p = m_proxyNodes->get(i);
710 if (!p->isAvailable)
711 continue;
712
713 dataSenderLoad += p->dataSenderLoad;
714 dataCollectorLoad += p->dataCollectorLoad;
715 cpuLoad += p->cpuLoad;
716 assignments += p->assignments;
717 count++;
718 }
719
720 // Check if load balancing is needed
721 if (count > 1)
722 {
723 dataSenderLoad /= count;
724 dataCollectorLoad /= count;
725 cpuLoad /= count;
726 assignments /= count;
727 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 6, _T("ZoneHealthCheck(%s [%u]): %d active proxies, average values: %f/%f/%f"),
728 m_name, m_uin, count, dataSenderLoad, dataCollectorLoad, cpuLoad);
729
730 time_t now = time(nullptr);
731 ObjectArray<ZoneProxy> sources(count); // potential sources for removing load
732 ObjectArray<ZoneProxy> targets(count); // potential sources for adding load
733 for(int i = 0; i < m_proxyNodes->size(); i++)
734 {
735 ZoneProxy *p = m_proxyNodes->get(i);
736 if (!p->isAvailable)
737 continue;
738
739 if ((p->dataSenderLoad <= dataSenderLoad) &&
740 (p->dataCollectorLoad <= dataCollectorLoad) &&
741 (p->cpuLoad <= cpuLoad) &&
742 (p->assignments <= assignments) &&
743 (now - p->loadBalanceTimestamp >= 420)) // was not re-balanced within last 7 minutes
744 {
745 targets.add(p);
746 }
747 else if (((p->dataSenderLoad > dataSenderLoad) ||
748 (p->dataSenderLoadTrend > 0) ||
749 (p->dataCollectorLoad > dataCollectorLoad) ||
750 ((p->cpuLoad > cpuLoad) && (p->cpuLoad > 1)) ||
751 (p->assignments > assignments * 2)) &&
752 (now - p->loadBalanceTimestamp >= 420)) // was not re-balanced within last 7 minutes
753 {
754 sources.add(p);
755 }
756 }
757
758 if (!sources.isEmpty() && !targets.isEmpty())
759 {
760 int j = 0;
761 for(int i = 0; i < sources.size(); i++)
762 {
763 ZoneProxy *s = sources.get(i);
764 ZoneProxy *t = targets.get(j);
765 j++;
766 if (j == targets.size())
767 j = 0;
768
769 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 6, _T("ZoneHealthCheck(%s [%u]): moving load: [%u] -> [%u]"), m_name, m_uin, s->nodeId, t->nodeId);
770 migrateProxyLoad(s, t);
771 s->loadBalanceTimestamp = now;
772 t->loadBalanceTimestamp = now;
773 }
774 }
775 else
776 {
777 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 6, _T("ZoneHealthCheck(%s [%u]): proxy load balancing is not %s"),
778 m_name, m_uin, sources.isEmpty() ? _T("needed") : _T("possible"));
779 }
780 }
781 else
782 {
783 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 6, _T("ZoneHealthCheck(%s [%u]): proxy load balancing is not needed"), m_name, m_uin);
784 }
785
786 m_lastHealthCheck = time(nullptr);
787 m_lockedForHealthCheck = false;
788
789 unlockProperties();
790
791 nxlog_debug_tag(DEBUG_TAG_ZONE_PROXY, 6, _T("ZoneHealthCheck(%s [%u]): completed"), m_name, m_uin);
792 delete poller;
793 }
794
795 /**
796 * Create NXSL object for this object
797 */
798 NXSL_Value *Zone::createNXSLObject(NXSL_VM *vm) const
799 {
800 return vm->createValue(new NXSL_Object(vm, &g_nxslZoneClass, new shared_ptr<Zone>(self())));
801 }
802
803 /**
804 * Dump interface index to console
805 */
806 void Zone::dumpInterfaceIndex(ServerConsole *console) const
807 {
808 DumpIndex(console, m_idxInterfaceByAddr);
809 }
810
811 /**
812 * Dump node index to console
813 */
814 void Zone::dumpNodeIndex(ServerConsole *console) const
815 {
816 DumpIndex(console, m_idxNodeByAddr);
817 }
818
819 /**
820 * Dump subnet index to console
821 */
822 void Zone::dumpSubnetIndex(ServerConsole *console) const
823 {
824 DumpIndex(console, m_idxSubnetByAddr);
825 }
826
827 /**
828 * Dump internal state to console
829 */
830 void Zone::dumpState(ServerConsole *console) const
831 {
832 lockProperties();
833 if (!m_proxyNodes->isEmpty())
834 {
835 console->print(_T(" Proxies:\n"));
836 for(int i = 0; i < m_proxyNodes->size(); i++)
837 {
838 ZoneProxy *p = m_proxyNodes->get(i);
839 console->printf(_T(" [\x1b[33;1m%7u\x1b[0m] assignments=%u available=\x1b[%s\x1b[0m senderLoad=%f(%f) dcLoad=%f cpuLoad=%f\n"),
840 p->nodeId, p->assignments, p->isAvailable ? _T("32;1myes") : _T("31;1mno"),
841 p->dataSenderLoad, p->dataSenderLoadTrend, p->dataCollectorLoad, p->cpuLoad);
842 }
843 }
844 else
845 {
846 console->print(_T(" No proxy nodes defined\n"));
847 }
848 unlockProperties();
849 }
850
851 /**
852 * Serialize object to JSON
853 */
854 json_t *Zone::toJson()
855 {
856 json_t *root = super::toJson();
857
858 lockProperties();
859
860 json_object_set_new(root, "uin", json_integer(m_uin));
861 json_object_set_new(root, "proxyNodeId", json_object_array(m_proxyNodes));
862
863 unlockProperties();
864 return root;
865 }