"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "modules/diskq/tests/test_diskq_truncate.c" between
syslog-ng-3.33.1.tar.gz and syslog-ng-3.33.2.tar.gz

About: syslog-ng (syslog "next generation") is a high performance, stable central logging system. Open source edition.

test_diskq_truncate.c  (syslog-ng-3.33.1):test_diskq_truncate.c  (syslog-ng-3.33.2)
skipping to change at line 39 skipping to change at line 39
#include "apphook.h" #include "apphook.h"
#include "diskq-config.h" #include "diskq-config.h"
#include "queue_utils_lib.h" #include "queue_utils_lib.h"
#include "test_diskq_tools.h" #include "test_diskq_tools.h"
#include <criterion/criterion.h> #include <criterion/criterion.h>
#include <stdlib.h> #include <stdlib.h>
#include <sys/stat.h> #include <sys/stat.h>
#define TEST_DISKQ_SIZE 1100000
static LogQueue * static LogQueue *
_get_diskqueue(gchar *filename, DiskQueueOptions *options) _get_non_reliable_diskqueue(gchar *filename, DiskQueueOptions *options)
{ {
LogQueue *q = log_queue_disk_non_reliable_new(options, NULL); LogQueue *q = log_queue_disk_non_reliable_new(options, NULL);
log_queue_set_use_backlog(q, FALSE); log_queue_set_use_backlog(q, FALSE);
log_queue_disk_load_queue(q, filename); log_queue_disk_load_queue(q, filename);
return q; return q;
} }
static LogQueue * static LogQueue *
_create_diskqueue(gchar *filename, DiskQueueOptions *options) _create_non_reliable_diskqueue(gchar *filename, DiskQueueOptions *options, gint qout_size, gdouble truncate_size_ratio)
{ {
LogQueue *q; LogQueue *q;
unlink(filename); unlink(filename);
_construct_options(options, 1100000, 0, FALSE); _construct_options(options, TEST_DISKQ_SIZE, 0, FALSE);
options->qout_size = 40;
q = _get_diskqueue(filename, options); options->qout_size = qout_size;
if (truncate_size_ratio < 0)
truncate_size_ratio = disk_queue_config_get_truncate_size_ratio(configuratio
n);
options->truncate_size_ratio = truncate_size_ratio;
q = _get_non_reliable_diskqueue(filename, options);
return q; return q;
} }
static void static void
_save_diskqueue(LogQueue *q) _save_diskqueue(LogQueue *q)
{ {
gboolean persistent; gboolean persistent;
log_queue_disk_save_queue(q, &persistent); log_queue_disk_save_queue(q, &persistent);
log_queue_unref(q); log_queue_unref(q);
} }
skipping to change at line 140 skipping to change at line 148
gchar *filename; gchar *filename;
} TruncateTestParams; } TruncateTestParams;
static void static void
_test_diskq_truncate(TruncateTestParams params) _test_diskq_truncate(TruncateTestParams params)
{ {
LogQueue *q; LogQueue *q;
DiskQueueOptions options; DiskQueueOptions options;
cr_assert(cfg_init(configuration), "cfg_init failed!"); cr_assert(cfg_init(configuration), "cfg_init failed!");
q = _create_diskqueue(params.filename, &options); q = _create_non_reliable_diskqueue(params.filename, &options, 40, 0);
cr_assert_eq(log_queue_get_length(q), 0, "No messages should be in a newly cre ated disk-queue file!"); cr_assert_eq(log_queue_get_length(q), 0, "No messages should be in a newly cre ated disk-queue file!");
feed_some_messages(q, params.number_of_msgs_to_push); feed_some_messages(q, params.number_of_msgs_to_push);
cr_assert_eq(log_queue_get_length(q), params.number_of_msgs_to_push, "Not all messages have been queued!"); cr_assert_eq(log_queue_get_length(q), params.number_of_msgs_to_push, "Not all messages have been queued!");
gint64 messages_on_disk_after_push = qdisk_get_length(((LogQueueDisk *)q)->qdi sk); gint64 messages_on_disk_after_push = qdisk_get_length(((LogQueueDisk *)q)->qdi sk);
_assert_diskq_actual_file_size_with_stored(q); _assert_diskq_actual_file_size_with_stored(q);
send_some_messages(q, params.number_of_msgs_to_pop); send_some_messages(q, params.number_of_msgs_to_pop);
cr_assert_eq(log_queue_get_length(q), params.number_of_msgs_to_push - params.n umber_of_msgs_to_pop, cr_assert_eq(log_queue_get_length(q), params.number_of_msgs_to_push - params.n umber_of_msgs_to_pop,
"Invalid number of messages in disk-queue after messages have bee n popped!"); "Invalid number of messages in disk-queue after messages have bee n popped!");
_save_diskqueue(q); _save_diskqueue(q);
q = _get_diskqueue(params.filename, &options); q = _get_non_reliable_diskqueue(params.filename, &options);
cr_assert_eq(log_queue_get_length(q), params.number_of_msgs_to_push - params.n umber_of_msgs_to_pop, cr_assert_eq(log_queue_get_length(q), params.number_of_msgs_to_push - params.n umber_of_msgs_to_pop,
"Invalid number of messages in disk-queue after opened existing o ne"); "Invalid number of messages in disk-queue after opened existing o ne");
_assert_diskq_actual_file_size_with_stored(q); _assert_diskq_actual_file_size_with_stored(q);
_assert_diskq_actual_file_size_with_expected(q, params.should_file_be_empty_af ter_truncate, _assert_diskq_actual_file_size_with_expected(q, params.should_file_be_empty_af ter_truncate,
messages_on_disk_after_push); messages_on_disk_after_push);
unlink(params.filename); unlink(params.filename);
log_queue_unref(q); log_queue_unref(q);
disk_queue_options_destroy(&options); disk_queue_options_destroy(&options);
skipping to change at line 199 skipping to change at line 207
}); });
} }
static LogQueue * static LogQueue *
_create_reliable_diskqueue(gchar *filename, DiskQueueOptions *options, gboolean use_backlog, _create_reliable_diskqueue(gchar *filename, DiskQueueOptions *options, gboolean use_backlog,
gdouble truncate_size_ratio) gdouble truncate_size_ratio)
{ {
LogQueue *q; LogQueue *q;
unlink(filename); unlink(filename);
_construct_options(options, 1100000, 0, TRUE); _construct_options(options, TEST_DISKQ_SIZE, 0, TRUE);
if (truncate_size_ratio < 0) if (truncate_size_ratio < 0)
truncate_size_ratio = disk_queue_config_get_truncate_size_ratio(configuratio n); truncate_size_ratio = disk_queue_config_get_truncate_size_ratio(configuratio n);
options->truncate_size_ratio = truncate_size_ratio; options->truncate_size_ratio = truncate_size_ratio;
q = log_queue_disk_reliable_new(options, "persist-name"); q = log_queue_disk_reliable_new(options, "persist-name");
log_queue_set_use_backlog(q, use_backlog); log_queue_set_use_backlog(q, use_backlog);
log_queue_disk_load_queue(q, filename); log_queue_disk_load_queue(q, filename);
return q; return q;
} }
Test(diskq_truncate, test_diskq_truncate_on_push) Test(diskq_truncate, test_diskq_truncate_on_push)
{ {
const gint full_disk_message_number = 8194; // measured for disk_buf_size 1100 000 const gint full_disk_message_number = 8194; // measured for disk_buf_size TEST _DISKQ_SIZE
const gint read_is_on_end_message_number = full_disk_message_number - 200; const gint read_is_on_end_message_number = full_disk_message_number - 200;
const gint write_wraps_message_number = read_is_on_end_message_number - 200; const gint write_wraps_message_number = read_is_on_end_message_number - 200;
const gint read_wraps_message_number = 300; const gint read_wraps_message_number = 300;
const gint trigger_truncate_message_number = 1; const gint trigger_truncate_message_number = 1;
LogQueue *q; LogQueue *q;
GString *filename = g_string_new("test_dq_truncate_on_write.rqf"); GString *filename = g_string_new("test_dq_truncate_on_write.rqf");
DiskQueueOptions options; DiskQueueOptions options;
q = _create_reliable_diskqueue(filename->str, &options, FALSE, -1); q = _create_reliable_diskqueue(filename->str, &options, FALSE, -1);
cr_assert_eq(log_queue_get_length(q), 0, "No messages should be in a newly cre ated disk-queue file!"); cr_assert_eq(log_queue_get_length(q), 0, "No messages should be in a newly cre ated disk-queue file!");
skipping to change at line 267 skipping to change at line 275
g_string_free(filename, TRUE); g_string_free(filename, TRUE);
_save_diskqueue(q); _save_diskqueue(q);
} }
static void static void
_assert_cursors_are_at_start(LogQueue *q) _assert_cursors_are_at_start(LogQueue *q)
{ {
QDisk *qdisk = ((LogQueueDisk *)q)->qdisk; QDisk *qdisk = ((LogQueueDisk *)q)->qdisk;
cr_assert_eq(qdisk->hdr->read_head, QDISK_RESERVED_SPACE, "Read head was not r cr_assert_eq(qdisk_get_reader_head(qdisk), QDISK_RESERVED_SPACE, "Read head wa
eset!"); s not reset!");
cr_assert_eq(qdisk->hdr->write_head, QDISK_RESERVED_SPACE, "Write head was not cr_assert_eq(qdisk_get_writer_head(qdisk), QDISK_RESERVED_SPACE, "Write head w
reset!"); as not reset!");
cr_assert_eq(qdisk->hdr->backlog_head, QDISK_RESERVED_SPACE, "Backlog head was cr_assert_eq(qdisk_get_backlog_head(qdisk), QDISK_RESERVED_SPACE, "Backlog hea
not reset!"); d was not reset!");
} }
Test(diskq_truncate, test_diskq_truncate_size_ratio_default) Test(diskq_truncate, test_diskq_truncate_size_ratio_default)
{ {
const gint truncate_threshold = (gint)(1100000 * disk_queue_config_get_truncat e_size_ratio(configuration)); const gint truncate_threshold = (gint)(TEST_DISKQ_SIZE * disk_queue_config_get _truncate_size_ratio(configuration));
const gint empty_log_msg_size = 134; const gint empty_log_msg_size = 134;
const gint below_threshold_message_number = truncate_threshold / empty_log_msg _size; const gint below_threshold_message_number = truncate_threshold / empty_log_msg _size;
const gint above_threshold_message_number = below_threshold_message_number + 1 ; const gint above_threshold_message_number = below_threshold_message_number + 1 ;
LogQueue *q; LogQueue *q;
GString *filename = g_string_new("test_dq_truncate_size_ratio_default.rqf"); GString *filename = g_string_new("test_dq_truncate_size_ratio_default.rqf");
DiskQueueOptions options; DiskQueueOptions options;
q = _create_reliable_diskqueue(filename->str, &options, TRUE, -1); q = _create_reliable_diskqueue(filename->str, &options, TRUE, -1);
cr_assert_eq(log_queue_get_length(q), 0, "No messages should be in a newly cre ated disk-queue file!"); cr_assert_eq(log_queue_get_length(q), 0, "No messages should be in a newly cre ated disk-queue file!");
skipping to change at line 349 skipping to change at line 357
_assert_diskq_actual_file_size_with_stored(q); _assert_diskq_actual_file_size_with_stored(q);
unlink(filename->str); unlink(filename->str);
g_string_free(filename, TRUE); g_string_free(filename, TRUE);
_save_diskqueue(q); _save_diskqueue(q);
} }
Test(diskq_truncate, test_diskq_truncate_size_ratio_1) Test(diskq_truncate, test_diskq_truncate_size_ratio_1)
{ {
const gint full_disk_message_number = 8194; // measured for disk_buf_size 1100 000 const gint full_disk_message_number = 8194; // measured for disk_buf_size TEST _DISKQ_SIZE
LogQueue *q; LogQueue *q;
GString *filename = g_string_new("test_dq_truncate_size_ratio_1.rqf"); GString *filename = g_string_new("test_dq_truncate_size_ratio_1.rqf");
DiskQueueOptions options; DiskQueueOptions options;
q = _create_reliable_diskqueue(filename->str, &options, TRUE, 1); q = _create_reliable_diskqueue(filename->str, &options, TRUE, 1);
cr_assert_eq(log_queue_get_length(q), 0, "No messages should be in a newly cre ated disk-queue file!"); cr_assert_eq(log_queue_get_length(q), 0, "No messages should be in a newly cre ated disk-queue file!");
// 1. feed to full // 1. feed to full
feed_some_messages(q, full_disk_message_number); feed_some_messages(q, full_disk_message_number);
cr_assert_eq(log_queue_get_length(q), full_disk_message_number, cr_assert_eq(log_queue_get_length(q), full_disk_message_number,
skipping to change at line 379 skipping to change at line 387
_assert_diskq_actual_file_size_with_stored(q); _assert_diskq_actual_file_size_with_stored(q);
unlink(filename->str); unlink(filename->str);
g_string_free(filename, TRUE); g_string_free(filename, TRUE);
_save_diskqueue(q); _save_diskqueue(q);
} }
static void static void
_feed_one_large_message(LogQueue *q)
{
LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
path_options.ack_needed = q->use_backlog;
path_options.flow_control_requested = TRUE;
LogMessage *msg = log_msg_new_empty();
log_msg_set_value(msg, LM_V_MESSAGE, "almafalmafalmafalmafalmafalmafalmafalmaf
almafalmafalmafa", -1);
log_msg_add_ack(msg, &path_options);
msg->ack_func = test_ack;
log_queue_push_tail(q, msg, &path_options);
}
Test(diskq_truncate, test_diskq_no_truncate_wrap)
{
const gint just_under_max_size_message_number = 8239; // measured for disk_buf
_size TEST_DISKQ_SIZE
const gint some_messages_to_let_write_head_wrap = 500;
gint unprocessed_messages_in_buffer = 0;
LogQueue *q;
GString *filename = g_string_new("test_dq_no_truncate_wrap.rqf");
DiskQueueOptions options;
q = _create_reliable_diskqueue(filename->str, &options, TRUE, 1);
cr_assert_eq(log_queue_get_length(q), 0, "No messages should be in a newly cre
ated disk-queue file!");
QDisk *qdisk = ((LogQueueDisk *)q)->qdisk;
// 1. feed to full
feed_some_messages(q, just_under_max_size_message_number);
cr_assert_eq(log_queue_get_length(q), just_under_max_size_message_number,
"Not all messages have been queued!");
unprocessed_messages_in_buffer += just_under_max_size_message_number;
// 2. process some messages, so write_head can wrap
send_some_messages(q, some_messages_to_let_write_head_wrap);
log_queue_ack_backlog(q, some_messages_to_let_write_head_wrap);
unprocessed_messages_in_buffer -= some_messages_to_let_write_head_wrap;
// 3. feed 1 large msg to make file_size big
_feed_one_large_message(q);
cr_assert(qdisk_get_writer_head(qdisk) < qdisk_get_reader_head(qdisk), "write_
head should have wrapped");
cr_assert(qdisk->file_size > TEST_DISKQ_SIZE, "file_size should be bigger than
max size");
unprocessed_messages_in_buffer += 1;
// 4. send and ack all messages
send_some_messages(q, unprocessed_messages_in_buffer);
log_queue_ack_backlog(q, unprocessed_messages_in_buffer);
cr_assert(qdisk_get_length(qdisk) == 0, "qdisk len should be 0");
unprocessed_messages_in_buffer = 0;
unprocessed_messages_in_buffer = 0;
// 5. feed to full
feed_some_messages(q, just_under_max_size_message_number);
cr_assert_eq(log_queue_get_length(q), just_under_max_size_message_number,
"Not all messages have been queued!");
unprocessed_messages_in_buffer += just_under_max_size_message_number;
// 6. process some messages, so write_head can wrap
send_some_messages(q, some_messages_to_let_write_head_wrap);
log_queue_ack_backlog(q, some_messages_to_let_write_head_wrap);
unprocessed_messages_in_buffer -= some_messages_to_let_write_head_wrap;
// 7. feed 2 to wrap write_head and to add one message to the beginning
feed_some_messages(q, 1);
cr_assert(qdisk_get_writer_head(qdisk) == QDISK_RESERVED_SPACE, "write_head sh
ould have wrapped");
feed_some_messages(q, 1);
unprocessed_messages_in_buffer += 2;
// 8. process all messages
send_some_messages(q, unprocessed_messages_in_buffer);
log_queue_ack_backlog(q, unprocessed_messages_in_buffer);
cr_assert(qdisk_get_length(qdisk) == 0, "qdisk len should be 0");
cr_assert(qdisk_get_writer_head(qdisk) == qdisk_get_reader_head(qdisk), "read_
head should be at write_head's position");
unprocessed_messages_in_buffer = 0;
_assert_diskq_actual_file_size_with_stored(q);
unlink(filename->str);
g_string_free(filename, TRUE);
_save_diskqueue(q);
}
Test(diskq_truncate, test_non_reliable_diskq_restart_with_truncation_disabled)
{
const gint qout_size = 128;
const gint just_under_max_size_message_number = 8239 + qout_size; // measured
for disk_buf_size TEST_DISKQ_SIZE
const gint some_messages_to_let_write_head_wrap = 500;
GString *filename = g_string_new("test_dq_non_reliable_restart.rqf");
DiskQueueOptions options;
LogQueue *q = _create_non_reliable_diskqueue(filename->str, &options, qout_siz
e, 1);
cr_assert_eq(log_queue_get_length(q), 0, "No messages should be in a newly cre
ated disk-queue file!");
QDisk *qdisk = ((LogQueueDisk *)q)->qdisk;
// 1. feed to full
feed_some_messages(q, just_under_max_size_message_number);
gint unprocessed_messages_in_buffer = just_under_max_size_message_number;
// 2. process some messages, so write_head can wrap
send_some_messages(q, some_messages_to_let_write_head_wrap);
log_queue_ack_backlog(q, some_messages_to_let_write_head_wrap);
unprocessed_messages_in_buffer -= some_messages_to_let_write_head_wrap;
// 3. feed additional messages, so write_head wraps
feed_some_messages(q, 300);
unprocessed_messages_in_buffer += 300;
cr_assert(qdisk_get_writer_head(qdisk) < qdisk_get_reader_head(qdisk), "write_
head should have wrapped");
// Restart
_save_diskqueue(q);
q = _get_non_reliable_diskqueue(filename->str, &options);
qdisk = ((LogQueueDisk *)q)->qdisk;
// 4. send and ack all messages
send_some_messages(q, unprocessed_messages_in_buffer);
cr_assert(qdisk_get_length(qdisk) == 0, "qdisk len should be 0");
_assert_diskq_actual_file_size_with_stored(q);
unlink(filename->str);
g_string_free(filename, TRUE);
_save_diskqueue(q);
}
static void
setup(void) setup(void)
{ {
msg_init(TRUE); // internal messages will go to stderr(), msg_init() is idempo tent msg_init(TRUE); // internal messages will go to stderr(), msg_init() is idempo tent
app_startup(); app_startup();
configuration = cfg_new_snippet(); configuration = cfg_new_snippet();
} }
static void static void
teardown(void) teardown(void)
{ {
 End of changes. 12 change blocks. 
17 lines changed or deleted 165 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)