"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/server/pdsdrv/influxdb/influxdb.cpp" between
netxms-3.8.382.tar.gz and netxms-3.8.405.tar.gz

About: NetXMS is a multi-platform open source network management and monitoring system.

influxdb.cpp  (netxms-3.8.382):influxdb.cpp  (netxms-3.8.405)
skipping to change at line 47 skipping to change at line 47
// debug pdsdrv.influxdb 1-8 // debug pdsdrv.influxdb 1-8
#define DEBUG_TAG _T("pdsdrv.influxdb") #define DEBUG_TAG _T("pdsdrv.influxdb")
/** /**
* Driver class definition * Driver class definition
*/ */
class InfluxDBStorageDriver : public PerfDataStorageDriver class InfluxDBStorageDriver : public PerfDataStorageDriver
{ {
private: private:
const TCHAR *m_hostname; const TCHAR *m_hostname;
UINT16 m_port; uint16_t m_port;
SOCKET m_socket; SOCKET m_socket;
std::string m_queuedMessages; std::string m_queuedMessages;
UINT32 m_queuedMessageCount; uint32_t m_queuedMessageCount;
UINT32 m_maxQueueSize; uint32_t m_maxQueueSize;
Mutex m_mutex; Mutex m_mutex;
void queuePush(const std::string& data); void queuePush(const std::string& data);
static std::string normalizeString(std::string str); static std::string normalizeString(std::string str);
static std::string getString(const TCHAR *tstr); static std::string getString(const TCHAR *tstr);
static void findAndReplaceAll(std::string& data, const std::string& toSearch, const std::string& replaceStr); static void findAndReplaceAll(std::string& data, const std::string& toSearch, const std::string& replaceStr);
static void toLowerCase(std::string& data); static void toLowerCase(std::string& data);
public: public:
skipping to change at line 140 skipping to change at line 140
#ifdef UNICODE_UCS4 #ifdef UNICODE_UCS4
size_t len = ucs4_utf8len(tstr, -1); size_t len = ucs4_utf8len(tstr, -1);
#else #else
size_t len = ucs2_utf8len(tstr, -1); size_t len = ucs2_utf8len(tstr, -1);
#endif #endif
#if HAVE_ALLOCA #if HAVE_ALLOCA
char *buffer = static_cast<char*>(alloca(len + 1)); char *buffer = static_cast<char*>(alloca(len + 1));
#else #else
char *buffer = static_cast<char*>(MemAlloc(len + 1)); char *buffer = static_cast<char*>(MemAlloc(len + 1));
#endif #endif
WideCharToMultiByte(CP_UTF8, 0, tstr, -1, buffer, (int)len + 1, nullptr, null ptr); wchar_to_utf8(tstr, -1, buffer, len + 1);
#if HAVE_ALLOCA #if HAVE_ALLOCA
return std::string(buffer); return std::string(buffer);
#else #else
std::string result(buffer); std::string result(buffer);
MemFree(buffer); MemFree(buffer);
return result; return result;
#endif #endif
#else #else
return std::string(tstr); return std::string(tstr);
#endif #endif
skipping to change at line 178 skipping to change at line 178
} }
} }
/** /**
* Metric Queuing and Packet sending * Metric Queuing and Packet sending
*/ */
void InfluxDBStorageDriver::queuePush(const std::string& data) void InfluxDBStorageDriver::queuePush(const std::string& data)
{ {
m_mutex.lock(); m_mutex.lock();
bool flushNow = data.empty(); bool bufferOverflow = false;
if (!flushNow) bool forceFlush = data.empty();
if (!forceFlush)
{ {
// Check that packet is not longer than 64K // Check that packet is not longer than 64K
if (data.length() + m_queuedMessages.length() < 65534) if (data.length() + m_queuedMessages.length() < 65534)
{ {
m_queuedMessages += data + "\n"; m_queuedMessages += data + "\n";
m_queuedMessageCount++; m_queuedMessageCount++;
} }
else else
{ {
flushNow = true; bufferOverflow = true;
} }
} }
if ((m_queuedMessageCount >= m_maxQueueSize) || flushNow) if ((m_queuedMessageCount >= m_maxQueueSize) || forceFlush || bufferOverflow)
{ {
nxlog_debug_tag(DEBUG_TAG, 7, _T("Queue size: %u / %u (sending)"), m_queue dMessageCount, m_maxQueueSize); nxlog_debug_tag(DEBUG_TAG, 7, _T("Queue size: %u / %u (sending)"), m_queue dMessageCount, m_maxQueueSize);
if (m_queuedMessages.size() > 0) if (m_queuedMessages.size() > 0)
{ {
if (SendEx(m_socket, m_queuedMessages.c_str(), m_queuedMessages.size(), 0, INVALID_MUTEX_HANDLE) <= 0) if (SendEx(m_socket, m_queuedMessages.c_str(), m_queuedMessages.size(), 0, INVALID_MUTEX_HANDLE) <= 0)
{ {
nxlog_debug_tag(DEBUG_TAG, 8, _T("socket error: %s"), _tcserror(errn o)); nxlog_debug_tag(DEBUG_TAG, 8, _T("socket error: %s"), _tcserror(errn o));
// Ignore; will be re-sent with the next message // Ignore; will be re-sent with the next message
} }
else else
{ {
// Data sent - empty queue // Data sent - empty queue
m_queuedMessages.clear(); m_queuedMessages.clear();
m_queuedMessageCount = 0; m_queuedMessageCount = 0;
if (bufferOverflow)
{
m_queuedMessages += data + "\n";
m_queuedMessageCount++;
}
} }
} }
} }
else else
{ {
if (flushNow && !data.empty()) m_queuedMessages += data + "\n";
{ m_queuedMessageCount++;
m_queuedMessages += data + "\n";
m_queuedMessageCount++;
}
nxlog_debug_tag(DEBUG_TAG, 7, _T("Queue size: %u / %u"), m_queuedMessageCo unt, m_maxQueueSize); nxlog_debug_tag(DEBUG_TAG, 7, _T("Queue size: %u / %u"), m_queuedMessageCo unt, m_maxQueueSize);
} }
m_mutex.unlock(); m_mutex.unlock();
} }
/** /**
* Get name * Get name
*/ */
const TCHAR *InfluxDBStorageDriver::getName() const TCHAR *InfluxDBStorageDriver::getName()
 End of changes. 8 change blocks. 
13 lines changed or deleted 16 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)