"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/zabbix_server/preprocessor/preproc_manager.c" between
zabbix-5.4.0.tar.gz and zabbix-5.4.1.tar.gz

About: ZABBIX is an enterprise-class distributed monitoring solution for servers and applications.

preproc_manager.c  (zabbix-5.4.0):preproc_manager.c  (zabbix-5.4.1)
skipping to change at line 1146 skipping to change at line 1146
worker->task = NULL; worker->task = NULL;
preprocessor_free_direct_request(direct_request); preprocessor_free_direct_request(direct_request);
preprocessor_assign_tasks(manager); preprocessor_assign_tasks(manager);
preprocessing_flush_queue(manager); preprocessing_flush_queue(manager);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
} }
static void preprocessor_get_items_totals(zbx_preprocessing_manager_t *manage
r, int *total, int *queued,
int *processing, int *done, int *pending)
{
#define ZBX_MAX_REQUEST_STATE_PRINT_LIMIT 25
zbx_preproc_item_stats_t *item;
zbx_list_iterator_t iterator;
zbx_preprocessing_request_t *request;
zbx_hashset_t items;
*total = 0;
*queued = 0;
*processing = 0;
*done = 0;
*pending = 0;
zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAUL
T_UINT64_COMPARE_FUNC);
zbx_list_iterator_init(&manager->queue, &iterator);
while (SUCCEED == zbx_list_iterator_next(&iterator))
{
zbx_list_iterator_peek(&iterator, (void **)&request);
if (NULL == (item = zbx_hashset_search(&items, &request->value.it
emid)))
{
zbx_preproc_item_stats_t item_local = {.itemid = r
equest->value.itemid};
item = zbx_hashset_insert(&items, &item_local, sizeof(ite
m_local));
}
switch(request->state)
{
case REQUEST_STATE_QUEUED:
if (*queued < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
{
zabbix_log(LOG_LEVEL_DEBUG, "oldest queue
d itemid: " ZBX_FS_UI64
" values:%d pos:%d", item
->itemid, item->values_num, *total);
}
(*queued)++;
break;
case REQUEST_STATE_PROCESSING:
if (*processing < ZBX_MAX_REQUEST_STATE_PRINT_LIM
IT)
{
zabbix_log(LOG_LEVEL_DEBUG, "oldest proce
ssing itemid: " ZBX_FS_UI64
" values:%d pos:%d", item
->itemid, item->values_num, *total);
}
(*processing)++;
break;
case REQUEST_STATE_DONE:
if (*done < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
{
zabbix_log(LOG_LEVEL_DEBUG, "oldest done
itemid: " ZBX_FS_UI64
" values:%d pos:%d", item
->itemid, item->values_num, *total);
}
(*done)++;
break;
case REQUEST_STATE_PENDING:
if (*pending < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
{
zabbix_log(LOG_LEVEL_DEBUG, "oldest pendi
ng itemid: " ZBX_FS_UI64
" values:%d pos:%d", item
->itemid, item->values_num, *total);
}
(*pending)++;
break;
}
item->values_num++;
(*total)++;
}
zbx_hashset_destroy(&items);
#undef ZBX_MAX_REQUEST_STATE_PRINT_LIMIT
}
/****************************************************************************** /******************************************************************************
* * * *
* Function: preprocessor_get_diag_stats * * Function: preprocessor_get_diag_stats *
* * * *
* Purpose: return diagnostic statistics * * Purpose: return diagnostic statistics *
* * * *
* Parameters: manager - [IN] preprocessing manager * * Parameters: manager - [IN] preprocessing manager *
* client - [IN] IPC client * * client - [IN] IPC client *
* * * *
******************************************************************************/ ******************************************************************************/
static void preprocessor_get_diag_stats(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client) static void preprocessor_get_diag_stats(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client)
{ {
unsigned char *data; unsigned char *data;
zbx_uint32_t data_len; zbx_uint32_t data_len;
int total, queued, processing, done, pending;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
data_len = zbx_preprocessor_pack_diag_stats(&data, manager->queued_num, m preprocessor_get_items_totals(manager, &total, &queued, &processing, &don
anager->preproc_num); e, &pending);
data_len = zbx_preprocessor_pack_diag_stats(&data, total, queued, process
ing, done, pending);
zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT, data, data_len); zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT, data, data_len);
zbx_free(data); zbx_free(data);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
} }
/****************************************************************************** /******************************************************************************
* * * *
* Function: preproc_sort_item_by_values_desc * * Function: preproc_sort_item_by_values_desc *
* * * *
skipping to change at line 1185 skipping to change at line 1262
* * * *
******************************************************************************/ ******************************************************************************/
static int preproc_sort_item_by_values_desc(const void *d1, const void *d2) static int preproc_sort_item_by_values_desc(const void *d1, const void *d2)
{ {
zbx_preproc_item_stats_t *i1 = *(zbx_preproc_item_stats_t **)d1; zbx_preproc_item_stats_t *i1 = *(zbx_preproc_item_stats_t **)d1;
zbx_preproc_item_stats_t *i2 = *(zbx_preproc_item_stats_t **)d2; zbx_preproc_item_stats_t *i2 = *(zbx_preproc_item_stats_t **)d2;
return i2->values_num - i1->values_num; return i2->values_num - i1->values_num;
} }
static void preprocessor_get_items_view(zbx_preprocessing_manager_t *manager,
zbx_hashset_t *items,
zbx_vector_ptr_t *view)
{
zbx_preproc_item_stats_t *item;
zbx_list_iterator_t iterator;
zbx_preprocessing_request_t *request;
zbx_list_iterator_init(&manager->queue, &iterator);
while (SUCCEED == zbx_list_iterator_next(&iterator))
{
zbx_list_iterator_peek(&iterator, (void **)&request);
if (NULL == (item = zbx_hashset_search(items, &request->value.ite
mid)))
{
zbx_preproc_item_stats_t item_local = {.itemid = r
equest->value.itemid};
item = zbx_hashset_insert(items, &item_local, sizeof(item
_local));
zbx_vector_ptr_append(view, item);
}
/* There might be processed, but not yet flushed items at the sta
rt of queue with */
/* freed preprocessing steps and steps_num being zero. Because of
that keep updating */
/* items steps_num to have preprocessing steps of last queued ite
m. */
item->steps_num = request->steps_num;
item->values_num++;
}
}
/****************************************************************************** /******************************************************************************
* * * *
* Function: preprocessor_get_top_items * * Function: preprocessor_get_top_items *
* * * *
* Purpose: return diagnostic top view * * Purpose: return diagnostic top view *
* * * *
* Parameters: manager - [IN] preprocessing manager * * Parameters: manager - [IN] preprocessing manager *
* client - [IN] IPC client * * client - [IN] IPC client *
* message - [IN] the message with request * * message - [IN] the message with request *
* * * *
******************************************************************************/ ******************************************************************************/
static void preprocessor_get_top_items(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client, static void preprocessor_get_top_items(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
zbx_ipc_message_t *message) zbx_ipc_message_t *message)
{ {
int limit; int limit;
unsigned char *data; unsigned char *data;
zbx_uint32_t data_len; zbx_uint32_t data_len;
zbx_hashset_t items; zbx_hashset_t items;
zbx_vector_ptr_t view; zbx_vector_ptr_t view;
zbx_list_iterator_t iterator;
zbx_preprocessing_request_t *request;
zbx_preproc_item_stats_t *item;
int items_num;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
zbx_preprocessor_unpack_top_request(&limit, message->data); zbx_preprocessor_unpack_top_request(&limit, message->data);
zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAUL T_UINT64_COMPARE_FUNC); zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAUL T_UINT64_COMPARE_FUNC);
zbx_vector_ptr_create(&view); zbx_vector_ptr_create(&view);
zbx_list_iterator_init(&manager->queue, &iterator); preprocessor_get_items_view(manager, &items, &view);
while (SUCCEED == zbx_list_iterator_next(&iterator))
zbx_vector_ptr_sort(&view, preproc_sort_item_by_values_desc);
data_len = zbx_preprocessor_pack_top_items_result(&data, (zbx_preproc_ite
m_stats_t **)view.values,
MIN(limit, view.values_num));
zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_ITEMS_RESULT, data,
data_len);
zbx_free(data);
zbx_vector_ptr_destroy(&view);
zbx_hashset_destroy(&items);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
static void preprocessor_get_oldest_preproc_items(zbx_preprocessing_manager_t
*manager, zbx_ipc_client_t *client,
zbx_ipc_message_t *message)
{
int limit, i;
unsigned char *data;
zbx_uint32_t data_len;
zbx_hashset_t items;
zbx_vector_ptr_t view, view_preproc;
zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
zbx_preprocessor_unpack_top_request(&limit, message->data);
zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAUL
T_UINT64_COMPARE_FUNC);
zbx_vector_ptr_create(&view);
zbx_vector_ptr_create(&view_preproc);
zbx_vector_ptr_reserve(&view_preproc, limit);
preprocessor_get_items_view(manager, &items, &view);
for (i = 0; i < view.values_num && 0 < limit; i++)
{ {
zbx_list_iterator_peek(&iterator, (void **)&request); zbx_preproc_item_stats_t *item;
if (NULL == (item = zbx_hashset_search(&items, &request->value.it item = (zbx_preproc_item_stats_t *)view.values[i];
emid)))
{
zbx_preproc_item_stats_t item_local = {.itemid = r
equest->value.itemid};
item = zbx_hashset_insert(&items, &item_local, sizeof(ite /* only items with preprocessing can slow down queue */
m_local)); if (0 == item->steps_num)
zbx_vector_ptr_append(&view, item); continue;
}
/* There might be processed, but not yet flushed items at the sta
rt of queue with */
/* freed preprocessing steps and steps_num being zero. Because of
that keep updating */
/* items steps_num to have preprocessing steps of last queued ite
m. */
item->steps_num = request->steps_num;
item->values_num++;
}
zbx_vector_ptr_sort(&view, preproc_sort_item_by_values_desc); zbx_vector_ptr_append(&view_preproc, item);
items_num = MIN(limit, view.values_num); limit--;
}
data_len = zbx_preprocessor_pack_top_items_result(&data, (zbx_preproc_ite data_len = zbx_preprocessor_pack_top_items_result(&data, (zbx_preproc_ite
m_stats_t **)view.values, items_num); m_stats_t **)view_preproc.values,
MIN(limit, view_preproc.values_num));
zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_ITEMS_RESULT, data, data_len); zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_ITEMS_RESULT, data, data_len);
zbx_free(data); zbx_free(data);
zbx_vector_ptr_destroy(&view_preproc);
zbx_vector_ptr_destroy(&view); zbx_vector_ptr_destroy(&view);
zbx_hashset_destroy(&items); zbx_hashset_destroy(&items);
zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
} }
/****************************************************************************** /******************************************************************************
* * * *
* Function: preprocessor_init_manager * * Function: preprocessor_init_manager *
* * * *
skipping to change at line 1445 skipping to change at line 1574
break; break;
case ZBX_IPC_PREPROCESSOR_TEST_RESULT: case ZBX_IPC_PREPROCESSOR_TEST_RESULT:
preprocessor_flush_test_result(&manager, client, message); preprocessor_flush_test_result(&manager, client, message);
break; break;
case ZBX_IPC_PREPROCESSOR_DIAG_STATS: case ZBX_IPC_PREPROCESSOR_DIAG_STATS:
preprocessor_get_diag_stats(&manager, cli ent); preprocessor_get_diag_stats(&manager, cli ent);
break; break;
case ZBX_IPC_PREPROCESSOR_TOP_ITEMS: case ZBX_IPC_PREPROCESSOR_TOP_ITEMS:
preprocessor_get_top_items(&manager, clie nt, message); preprocessor_get_top_items(&manager, clie nt, message);
break; break;
case ZBX_IPC_PREPROCESSOR_TOP_OLDEST_PREPROC_ITEM
S:
preprocessor_get_oldest_preproc_items(&ma
nager, client, message);
break;
} }
zbx_ipc_message_free(message); zbx_ipc_message_free(message);
} }
if (NULL != client) if (NULL != client)
zbx_ipc_client_release(client); zbx_ipc_client_release(client);
if (0 == manager.preproc_num || 1 < time_now - time_flush) if (0 == manager.preproc_num || 1 < time_now - time_flush)
{ {
 End of changes. 13 change blocks. 
36 lines changed or deleted 190 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)