"Fossies" - the Fresh Open Source Software Archive

Member "netxms-3.8.166/src/libnetxms/msgwq.cpp" (23 Feb 2021, 11168 Bytes) of package /linux/misc/netxms-3.8.166.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "msgwq.cpp" see the Fossies "Dox" file reference documentation.

    1 /* 
    2 ** NetXMS - Network Management System
    3 ** NetXMS Foundation Library
    4 ** Copyright (C) 2003-2020 Victor Kirhenshtein
    5 **
    6 ** This program is free software; you can redistribute it and/or modify
    7 ** it under the terms of the GNU Lesser General Public License as published
    8 ** by the Free Software Foundation; either version 3 of the License, or
    9 ** (at your option) any later version.
   10 **
   11 ** This program is distributed in the hope that it will be useful,
   12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
   13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   14 ** GNU General Public License for more details.
   15 **
   16 ** You should have received a copy of the GNU Lesser General Public License
   17 ** along with this program; if not, write to the Free Software
   18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
   19 **
   20 ** File: msgwq.cpp
   21 **
   22 **/
   23 
   24 #include "libnetxms.h"
   25 #include <nxcpapi.h>
   26 
   27 /** 
   28  * Interval between checking messages TTL in milliseconds
   29  */
   30 #define TTL_CHECK_INTERVAL    30000
   31 
   32 /**
   33  * Buffer allocation step
   34  */
   35 #define ALLOCATION_STEP       16
   36 
   37 /**
   38  * Housekeeper data
   39  */
   40 MUTEX __EXPORT MsgWaitQueue::m_housekeeperLock = MutexCreate();
   41 HashMap<uint64_t, MsgWaitQueue> __EXPORT *MsgWaitQueue::m_activeQueues = new HashMap<uint64_t, MsgWaitQueue>(Ownership::False);
   42 CONDITION __EXPORT MsgWaitQueue::m_shutdownCondition = ConditionCreate(true);
   43 THREAD __EXPORT MsgWaitQueue::m_housekeeperThread = INVALID_THREAD_HANDLE;
   44 
   45 /**
   46  * Constructor
   47  */
   48 MsgWaitQueue::MsgWaitQueue()
   49 {
   50    m_holdTime = 30000;      // Default message TTL is 30 seconds
   51    m_size = 0;
   52    m_allocated = 0;
   53    m_elements = nullptr;
   54    m_sequence = 1;
   55 #if defined(_WIN32)
   56    InitializeCriticalSectionAndSpinCount(&m_mutex, 4000);
   57    memset(m_wakeupEvents, 0, MAX_MSGQUEUE_WAITERS * sizeof(HANDLE));
   58    m_wakeupEvents[0] = CreateEvent(nullptr, FALSE, FALSE, nullptr);
   59    memset(m_waiters, 0, MAX_MSGQUEUE_WAITERS);
   60 #elif defined(_USE_GNU_PTH)
   61    pth_mutex_init(&m_mutex);
   62    pth_cond_init(&m_wakeupCondition);
   63 #else
   64    pthread_mutex_init(&m_mutex, nullptr);
   65    pthread_cond_init(&m_wakeupCondition, nullptr);
   66 #endif
   67 
   68    // register new queue
   69    MutexLock(m_housekeeperLock);
   70    if (m_activeQueues != nullptr)
   71       m_activeQueues->set(CAST_FROM_POINTER(this, uint64_t), this);
   72    if (m_housekeeperThread == INVALID_THREAD_HANDLE)
   73    {
   74       m_housekeeperThread = ThreadCreateEx(MsgWaitQueue::housekeeperThread, 0, nullptr);
   75    }
   76    MutexUnlock(m_housekeeperLock);
   77 }
   78 
   79 /**
   80  * Destructor
   81  */
   82 MsgWaitQueue::~MsgWaitQueue()
   83 {
   84    // unregister queue
   85    MutexLock(m_housekeeperLock);
   86    if (m_activeQueues != nullptr)
   87       m_activeQueues->remove(CAST_FROM_POINTER(this, uint64_t));
   88    MutexUnlock(m_housekeeperLock);
   89 
   90    clear();
   91 
   92 #if defined(_WIN32)
   93    DeleteCriticalSection(&m_mutex);
   94    for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
   95       if (m_wakeupEvents[i] != NULL)
   96          CloseHandle(m_wakeupEvents[i]);
   97 #elif defined(_USE_GNU_PTH)
   98    // nothing to do if libpth is used
   99 #else
  100    pthread_mutex_destroy(&m_mutex);
  101    pthread_cond_destroy(&m_wakeupCondition);
  102 #endif
  103 }
  104 
  105 /**
  106  * Clear queue
  107  */
  108 void MsgWaitQueue::clear()
  109 {
  110    lock();
  111 
  112    for(int i = 0; i < m_allocated; i++)
  113    {
  114       if (m_elements[i].msg == NULL)
  115          continue;
  116 
  117       if (m_elements[i].isBinary)
  118       {
  119          MemFree(m_elements[i].msg);
  120       }
  121       else
  122       {
  123          delete (NXCPMessage *)(m_elements[i].msg);
  124       }
  125    }
  126    m_size = 0;
  127    m_allocated = 0;
  128    MemFreeAndNull(m_elements);
  129    unlock();
  130 }
  131 
  132 /**
  133  * Put message into queue
  134  */
  135 void MsgWaitQueue::put(NXCPMessage *pMsg)
  136 {
  137    lock();
  138 
  139    int pos;
  140    if (m_size == m_allocated)
  141    {
  142       pos = m_allocated;
  143       m_allocated += ALLOCATION_STEP;
  144       m_elements = MemRealloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
  145       memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
  146    }
  147    else
  148    {
  149       for(pos = 0; m_elements[pos].msg != NULL; pos++);
  150    }
  151     
  152    m_elements[pos].code = pMsg->getCode();
  153    m_elements[pos].isBinary = 0;
  154    m_elements[pos].id = pMsg->getId();
  155    m_elements[pos].ttl = m_holdTime;
  156    m_elements[pos].msg = pMsg;
  157    m_elements[pos].sequence = m_sequence++;
  158    m_size++;
  159 
  160 #if defined(_WIN32)
  161    for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
  162       if (m_waiters[i])
  163          SetEvent(m_wakeupEvents[i]);
  164 #elif defined(_USE_GNU_PTH)
  165    pth_cond_notify(&m_wakeupCondition, TRUE);
  166 #else
  167    pthread_cond_broadcast(&m_wakeupCondition);
  168 #endif
  169 
  170    unlock();
  171 }
  172 
  173 /**
  174  * Put raw message into queue
  175  */
  176 void MsgWaitQueue::put(NXCP_MESSAGE *pMsg)
  177 {
  178    lock();
  179 
  180    int pos;
  181    if (m_size == m_allocated)
  182    {
  183       pos = m_allocated;
  184       m_allocated += ALLOCATION_STEP;
  185       m_elements = MemRealloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
  186       memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
  187    }
  188    else
  189    {
  190       for(pos = 0; m_elements[pos].msg != NULL; pos++);
  191    }
  192 
  193    m_elements[pos].code = pMsg->code;
  194    m_elements[pos].isBinary = 1;
  195    m_elements[pos].id = pMsg->id;
  196    m_elements[pos].ttl = m_holdTime;
  197    m_elements[pos].msg = pMsg;
  198    m_elements[pos].sequence = m_sequence++;
  199    m_size++;
  200 
  201 #ifdef _WIN32
  202    for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
  203       if (m_waiters[i])
  204          SetEvent(m_wakeupEvents[i]);
  205 #elif defined(_USE_GNU_PTH)
  206    pth_cond_notify(&m_wakeupCondition, TRUE);
  207 #else
  208    pthread_cond_broadcast(&m_wakeupCondition);
  209 #endif
  210 
  211    unlock();
  212 }
  213 
  214 /**
  215  * Wait for message with specific code and ID
  216  * Function return pointer to the message on success or
  217  * NULL on timeout or error
  218  */
  219 void *MsgWaitQueue::waitForMessageInternal(UINT16 isBinary, UINT16 wCode, UINT32 dwId, UINT32 dwTimeOut)
  220 {
  221    lock();
  222 
  223 #ifdef _WIN32
  224    int slot = -1;
  225 #endif
  226 
  227    do
  228    {
  229       UINT64 minSeq = _ULL(0xFFFFFFFFFFFFFFFF);
  230       int index = -1;
  231       for(int i = 0; i < m_allocated; i++)
  232         {
  233          if ((m_elements[i].msg != NULL) && 
  234              (m_elements[i].id == dwId) &&
  235              (m_elements[i].code == wCode) &&
  236              (m_elements[i].isBinary == isBinary))
  237          {
  238             if (m_elements[i].sequence < minSeq)
  239             {
  240                minSeq = m_elements[i].sequence;
  241                index = i;
  242             }
  243          }
  244         }
  245 
  246       if (index != -1)
  247       {
  248          void *msg = m_elements[index].msg;
  249          m_elements[index].msg = NULL;
  250          m_size--;
  251 #ifdef _WIN32
  252          if (slot != -1)
  253             m_waiters[slot] = 0;    // release waiter slot
  254 #endif
  255          unlock();
  256          return msg;
  257       }
  258 
  259       INT64 startTime = GetCurrentTimeMs();
  260        
  261 #if defined(_WIN32)
  262       // Find free slot if needed
  263       if (slot == -1)
  264       {
  265          for(slot = 0; slot < MAX_MSGQUEUE_WAITERS; slot++)
  266             if (!m_waiters[slot])
  267             {
  268                m_waiters[slot] = 1;
  269                if (m_wakeupEvents[slot] == NULL)
  270                   m_wakeupEvents[slot] = CreateEvent(NULL, FALSE, FALSE, NULL);
  271                break;
  272             }
  273 
  274          if (slot == MAX_MSGQUEUE_WAITERS)
  275          {
  276             slot = -1;
  277          }
  278       }
  279 
  280       LeaveCriticalSection(&m_mutex);
  281       if (slot != -1)
  282          WaitForSingleObject(m_wakeupEvents[slot], dwTimeOut);
  283       else
  284          Sleep(50);  // Just sleep if there are no waiter slots (highly unlikely during normal operation)
  285       EnterCriticalSection(&m_mutex);
  286 #elif HAVE_PTHREAD_COND_RELTIMEDWAIT_NP
  287        struct timespec ts;
  288 
  289        ts.tv_sec = dwTimeOut / 1000;
  290        ts.tv_nsec = (dwTimeOut % 1000) * 1000000;
  291       pthread_cond_reltimedwait_np(&m_wakeupCondition, &m_mutex, &ts);
  292 #elif defined(_USE_GNU_PTH)
  293       pth_event_t ev = pth_event(PTH_EVENT_TIME, pth_timeout(dwTimeOut / 1000, (dwTimeOut % 1000) * 1000));
  294       pth_cond_await(&m_wakeupCondition, &m_mutex, ev);
  295       pth_event_free(ev, PTH_FREE_ALL);
  296 #else
  297        struct timeval now;
  298        struct timespec ts;
  299 
  300        gettimeofday(&now, NULL);
  301        ts.tv_sec = now.tv_sec + (dwTimeOut / 1000);
  302 
  303        now.tv_usec += (dwTimeOut % 1000) * 1000;
  304        ts.tv_sec += now.tv_usec / 1000000;
  305        ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
  306 
  307        pthread_cond_timedwait(&m_wakeupCondition, &m_mutex, &ts);
  308 #endif   /* _WIN32 */
  309 
  310       UINT32 sleepTime = (UINT32)(GetCurrentTimeMs() - startTime);
  311       dwTimeOut -= std::min(sleepTime, dwTimeOut);
  312    } while(dwTimeOut > 0);
  313 
  314 #ifdef _WIN32
  315    if (slot != -1)
  316       m_waiters[slot] = 0;    // release waiter slot
  317 #endif
  318 
  319    unlock();
  320    return NULL;
  321 }
  322 
  323 /**
  324  * Housekeeping run
  325  */
  326 void MsgWaitQueue::housekeeperRun()
  327 {
  328    lock();
  329    if (m_size > 0)
  330    {
  331       for(int i = 0; i < m_allocated; i++)
  332        {
  333          if (m_elements[i].msg == NULL)
  334             continue;
  335 
  336          if (m_elements[i].ttl <= TTL_CHECK_INTERVAL)
  337          {
  338             if (m_elements[i].isBinary)
  339             {
  340                MemFree(m_elements[i].msg);
  341             }
  342             else
  343             {
  344                delete (NXCPMessage *)(m_elements[i].msg);
  345             }
  346             m_elements[i].msg = NULL;
  347             m_size--;
  348          }
  349          else
  350          {
  351             m_elements[i].ttl -= TTL_CHECK_INTERVAL;
  352          }
  353        }
  354 
  355       // compact queue if possible
  356       if ((m_allocated > ALLOCATION_STEP) && (m_size == 0))
  357       {
  358          m_allocated = ALLOCATION_STEP;
  359          MemFree(m_elements);
  360          m_elements = MemAllocArray<WAIT_QUEUE_ELEMENT>(m_allocated);
  361       }
  362    }
  363    unlock();
  364 }
  365 
  366 /**
  367  * Callback for enumerating active queues
  368  */
  369 EnumerationCallbackResult MsgWaitQueue::houseKeeperCallback(const uint64_t& key, MsgWaitQueue *queue)
  370 {
  371    queue->housekeeperRun();
  372    return _CONTINUE;
  373 }
  374 
  375 /**
  376  * Housekeeper thread
  377  */
  378 THREAD_RESULT THREAD_CALL MsgWaitQueue::housekeeperThread(void *arg)
  379 {
  380    ThreadSetName("MsgWaitQueue");
  381    while(!ConditionWait(m_shutdownCondition, TTL_CHECK_INTERVAL))
  382    {
  383       MutexLock(m_housekeeperLock);
  384       m_activeQueues->forEach(MsgWaitQueue::houseKeeperCallback);
  385       MutexUnlock(m_housekeeperLock);
  386    }
  387    return THREAD_OK;
  388 }
  389 
  390 /**
  391  * Shutdown message wait queue background tasks
  392  */
  393 void MsgWaitQueue::shutdown()
  394 {
  395    ConditionSet(m_shutdownCondition);
  396    ThreadJoin(m_housekeeperThread);
  397    MutexLock(m_housekeeperLock);
  398    m_housekeeperThread = INVALID_THREAD_HANDLE;
  399    delete_and_null(m_activeQueues);
  400    MutexUnlock(m_housekeeperLock);
  401    ConditionDestroy(m_shutdownCondition);
  402    MutexDestroy(m_housekeeperLock);
  403 }
  404 
  405 /**
  406  * Diag info callback
  407  */
  408 EnumerationCallbackResult MsgWaitQueue::diagInfoCallback(const uint64_t& key, MsgWaitQueue *queue, StringBuffer *output)
  409 {
  410    TCHAR buffer[256];
  411    _sntprintf(buffer, 256, _T("   %p size=%d holdTime=%d\n"), queue, queue->m_size, queue->m_holdTime);
  412    output->append(buffer);
  413    return _CONTINUE;
  414 }
  415 
  416 /**
  417  * Get diagnostic info
  418  */
  419 StringBuffer MsgWaitQueue::getDiagInfo()
  420 {
  421    StringBuffer out;
  422    MutexLock(m_housekeeperLock);
  423    out.append(m_activeQueues->size());
  424    out.append(_T(" active queues\nHousekeeper thread state is "));
  425    out.append((m_housekeeperThread != INVALID_THREAD_HANDLE) ? _T("RUNNING\n") : _T("STOPPED\n"));
  426    if (m_activeQueues->size() > 0)
  427    {
  428       out.append(_T("Active queues:\n"));
  429       m_activeQueues->forEach(MsgWaitQueue::diagInfoCallback, &out);
  430    }
  431    MutexUnlock(m_housekeeperLock);
  432    return out;
  433 }