"Fossies" - the Fresh Open Source Software Archive

Member "openpa-1.0.4/test/test_queue.c" (5 Dec 2012, 16120 Bytes) of package /linux/misc/openpa-1.0.4.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here.

    1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
    2 /*
    3  *  (C) 2010 by Argonne National Laboratory.
    4  *      See COPYRIGHT in top-level directory.
    5  */
    6 
    7 /*
    8  * Uncomment this definition to disable the OPA library and instead use naive
    9  * (non-atomic) operations.  This should cause failures.
   10  */
   11 /*
   12  * Note: The NAIVE implementation is likely to cause an infinite loop because of
   13  * the busy wait in OPA_Queue_dequeue that might never exit if an element was
   14  * enqueued improperly.
   15  */
   16 /*
   17 #define OPA_TEST_NAIVE
   18 */
   19 
   20 #include "opa_test.h"
   21 #include "opa_queue.h"
   22 
   23 /* Definitions for test_queue_sanity */
   24 typedef struct {
   25     OPA_Queue_element_hdr_t hdr;        /* Queue element header */
   26     int         val;                    /* Value of element */
   27 } sanity_t;
   28 
   29 /* Definitions for test_queue_threaded */
   30 #define THREADED_NITER 500000
   31 typedef struct {
   32     OPA_Queue_element_hdr_t hdr;        /* Queue element header */
   33     int         threadno;               /* Unique thread number */
   34     int         sequenceno;             /* Unique sequence number within thread */
   35 } threaded_element_t;
   36 typedef enum {
   37     THREADED_MODE_DEFAULT,              /* Test does not try to influence scheduling */
   38     THREADED_MODE_EMPTY,                /* Test tries to keep queue empty */
   39     THREADED_MODE_FULL,                 /* Test tries to keep queue full */
   40     THREADED_MODE_NMODES                /* Number of modes (must be last) */
   41 } threaded_mode_t;
   42 typedef struct {
   43     OPA_Queue_info_t *qhead;            /* Head for shared queue */
   44     int         threadno;               /* Unique thread number */
   45     threaded_mode_t mode;               /* Test mode (empty or full) */
   46 } threaded_t;
   47 
   48 
   49 /*-------------------------------------------------------------------------
   50  * Function: test_queue_sanity
   51  *
   52  * Purpose: Tests basic functionality of the OPA Queue in a single
   53  *          threaded environment.
   54  *
   55  * Return: Success: 0
   56  *         Failure: 1
   57  *
   58  * Programmer: Neil Fortner
   59  *             Wednesday, April 1, 2009
   60  *
   61  * Modifications:
   62  *
   63  *-------------------------------------------------------------------------
   64  */
   65 static int test_queue_sanity(void)
   66 {
   67     OPA_Queue_info_t    qhead;
   68     sanity_t            elements[4];
   69     sanity_t            *element;
   70     int                 i;
   71 
   72     TESTING("queue sanity", 0);
   73 
   74     /* Initialize queue */
   75     OPA_Queue_init(&qhead);
   76 
   77     /* Make sure the queue is empty */
   78     if(!OPA_Queue_is_empty(&qhead)) TEST_ERROR;
   79     if(NULL != OPA_Queue_peek_head(&qhead)) TEST_ERROR;
   80 
   81     /* Initialize elements */
   82     for(i=0; i<4; i++) {
   83         OPA_Queue_header_init(&elements[i].hdr);
   84         elements[i].val = i;
   85     } /* end for */
   86 
   87     /* Enqueue element 0 */
   88     OPA_Queue_enqueue(&qhead, &elements[0], sanity_t, hdr);
   89 
   90     /* Peek and make sure we get element 0 */
   91     if(OPA_Queue_is_empty(&qhead)) TEST_ERROR;
   92     if(NULL == (element = OPA_Queue_peek_head(&qhead))) TEST_ERROR;
   93     if(element->val != 0) TEST_ERROR;
   94 
   95     /* Dequeue and make sure we get element 0 */
   96     if(OPA_Queue_is_empty(&qhead)) TEST_ERROR;
   97     OPA_Queue_dequeue(&qhead, element, sanity_t, hdr);
   98     if(element->val != 0) TEST_ERROR;
   99 
  100     /* Make sure the queue is now empty */
  101     if(!OPA_Queue_is_empty(&qhead)) TEST_ERROR;
  102 
  103     /* Enqueue elements 1 and 2 */
  104     OPA_Queue_enqueue(&qhead, &elements[1], sanity_t, hdr);
  105     OPA_Queue_enqueue(&qhead, &elements[2], sanity_t, hdr);
  106 
  107     /* Peek and make sure we get element 1 */
  108     if(OPA_Queue_is_empty(&qhead)) TEST_ERROR;
  109     if(NULL == (element = OPA_Queue_peek_head(&qhead))) TEST_ERROR;
  110     if(element->val != 1) TEST_ERROR;
  111 
  112     /* Dequeue and make sure we get element 1 */
  113     if(OPA_Queue_is_empty(&qhead)) TEST_ERROR;
  114     OPA_Queue_dequeue(&qhead, element, sanity_t, hdr);
  115     if(element->val != 1) TEST_ERROR;
  116 
  117     /* Enqueue element 3 */
  118     OPA_Queue_enqueue(&qhead, &elements[3], sanity_t, hdr);
  119 
  120     /* Dequeue and make sure we get element 2 */
  121     if(OPA_Queue_is_empty(&qhead)) TEST_ERROR;
  122     OPA_Queue_dequeue(&qhead, element, sanity_t, hdr);
  123     if(element->val != 2) TEST_ERROR;
  124 
  125     /* Peek and make sure we get element 3 */
  126     if(OPA_Queue_is_empty(&qhead)) TEST_ERROR;
  127     if(NULL == (element = OPA_Queue_peek_head(&qhead))) TEST_ERROR;
  128     if(element->val != 3) TEST_ERROR;
  129 
  130     /* Dequeue and make sure we get element 3 */
  131     if(OPA_Queue_is_empty(&qhead)) TEST_ERROR;
  132     OPA_Queue_dequeue(&qhead, element, sanity_t, hdr);
  133     if(element->val != 3) TEST_ERROR;
  134 
  135     /* Make sure the queue is now empty */
  136     if(!OPA_Queue_is_empty(&qhead)) TEST_ERROR;
  137 
  138     PASSED();
  139     return 0;
  140 
  141 error:
  142     return 1;
  143 } /* end test_queue_sanity() */
  144 
  145 
  146 #if defined(OPA_HAVE_PTHREAD_H)
  147 /*-------------------------------------------------------------------------
  148  * Function: test_queue_threaded_enqueue
  149  *
  150  * Purpose: Helper (enqueue) routine for test_queue_threaded.  Enqueues
  151  *          elements to the shared queue with increasing sequence number
  152  *          and constant thread number.
  153  *
  154  * Return: Success: NULL
  155  *         Failure: non-NULL
  156  *
  157  * Programmer: Neil Fortner
  158  *             Tuesday, June 8, 2010
  159  *
  160  * Modifications:
  161  *
  162  *-------------------------------------------------------------------------
  163  */
  164 static void *test_queue_threaded_enqueue(void *_udata)
  165 {
  166     threaded_t  *udata = (threaded_t *)_udata;
  167     threaded_element_t *element = NULL;         /* Element to enqueue */
  168     int         niter = THREADED_NITER / iter_reduction[curr_test];
  169     int         i;
  170 
  171     /* Main loop */
  172     for(i=0; i<niter; i++) {
  173         /* If we're trying to keep the queue empty, yield here so the dequeuer
  174          * has a chance to empty the queue */
  175         if(udata->mode == THREADED_MODE_EMPTY)
  176             OPA_TEST_YIELD();
  177 
  178         /* Create an element to enqueue */
  179         if(NULL == (element = (threaded_element_t *) malloc(sizeof(threaded_element_t))))
  180             pthread_exit((void *) 1);
  181         OPA_Queue_header_init(&element->hdr);
  182         element->threadno = udata->threadno;
  183         element->sequenceno = i;
  184 
  185         /* Enqueue the element */
  186         OPA_Queue_enqueue(udata->qhead, element, threaded_element_t, hdr);
  187     } /* end for */
  188 
  189     /* Exit */
  190     pthread_exit(NULL);
  191 } /* end test_queue_threaded_enqueue() */
  192 
  193 
  194 /*-------------------------------------------------------------------------
  195  * Function: test_queue_threaded_dequeue
  196  *
  197  * Purpose: Helper (dequeue) routine for test_queue_threaded.  Dequeues
  198  *          elements from the shared queue and verifies that each thread
  199  *          added each sequence number exactly once and in the right
  200  *          order, and that order was preserved by the queue.
  201  *
  202  * Return: Success: 0
  203  *         Failure: 1
  204  *
  205  * Programmer: Neil Fortner
  206  *             Tuesday, June 8, 2010
  207  *
  208  * Modifications:
  209  *
  210  *-------------------------------------------------------------------------
  211  */
  212 static int test_queue_threaded_dequeue(void *_udata)
  213 {
  214     threaded_t  *udata = (threaded_t *)_udata;
  215     threaded_element_t *element = NULL;         /* Dequeued element */
  216     threaded_element_t *peek_element = NULL;    /* Peeked element */
  217     int         *expect_sn = NULL;              /* Expected sequence numbers */
  218     const int   nenqueuers = num_threads[curr_test] - 1;
  219     int         ndequeues = 0;  /* Number of times we have dequeued */
  220     int         peek_counter = 0; /* Iterations since last peek */
  221     const int   niter = (THREADED_NITER / iter_reduction[curr_test])
  222                         * nenqueuers;
  223     int         nerrors = 0;    /* Number of errors */
  224     int         i;
  225 
  226     /* Allocate expect_sn array */
  227     if(NULL == (expect_sn = (int *) calloc(nenqueuers, sizeof(int))))
  228         return(1);
  229 
  230     /* Main loop.  Terminates when either the expected number of elements have
  231      * been dequeued or the loop has continued too many times. */
  232     for(i=0; ndequeues < niter && i < (1000 * niter); i++) {
  233         /* If we're trying to keep the queue full, yield here so the enqueuers
  234          * have a chance to fill the queue */
  235         if(udata->mode == THREADED_MODE_FULL)
  236             OPA_TEST_YIELD();
  237 
  238         /* Check if the queue is nonempty */
  239         if(!OPA_Queue_is_empty(udata->qhead)) {
  240             /* Increment peek counter, and peek if the counter is 100 */
  241             peek_counter++;
  242             if(peek_counter == 100) {
  243                 if(NULL == (peek_element = OPA_Queue_peek_head(udata->qhead)))
  244                     nerrors++;
  245                 peek_counter = 0;
  246                 if(OPA_Queue_is_empty(udata->qhead))
  247                     return(1);
  248             } /* end if */
  249 
  250             /* Dequeue the head element */
  251             OPA_Queue_dequeue(udata->qhead, element, threaded_element_t, hdr);
  252 
  253             /* Increment the counter */
  254             ndequeues++;
  255 
  256             /* Make sure the dequeued element is the same as the peeked element,
  257              * if applicable */
  258             if(peek_element) {
  259                 if(peek_element->threadno != element->threadno
  260                         || peek_element->sequenceno != element->sequenceno) {
  261                     printf("    Dequeued element does not match peeked element\n");
  262                     nerrors++;
  263                 } /* end if */
  264                 peek_element = NULL;
  265             } /* end if */
  266 
  267             /* Check if the thread number is valid */
  268             if(element->threadno < 0 || element->threadno >= nenqueuers) {
  269                 printf("    Unexpected thread number: %d\n", element->threadno);
  270                 nerrors++;
  271             } else {
  272                 /* Check if the sequence number is equal to that expected, and
  273                  * update the expected sequence number */
  274                 if(element->sequenceno != expect_sn[element->threadno]) {
  275                     printf("    Unexpected sequence number: %d Expected: %d\n",
  276                             element->sequenceno, expect_sn[element->threadno]);
  277                     nerrors++;
  278                 } /* end if */
  279                 expect_sn[element->threadno] = element->sequenceno + 1;
  280             } /* end else */
  281 
  282             /* Free the element */
  283             free(element);
  284         } else
  285             OPA_TEST_YIELD();
  286     } /* end for */
  287 
  288     /* Check if the expected number of elements were dequeued */
  289     if(ndequeues != niter) {
  290         printf("    Incorrect number of elements dequeued: %d Expected: %d\n",
  291                 ndequeues, niter);
  292         nerrors++;
  293     } /* end if */
  294 
  295     /* Verify that the queue is now empty */
  296     if(!OPA_Queue_is_empty(udata->qhead)) {
  297         printf("    Queue is not empty at end of test!\n");
  298         nerrors++;
  299     } /* end if */
  300 
  301     /* Sanity checking that expect_sn is consistent if no errors */
  302     if(!nerrors)
  303         for(i=0; i<nenqueuers; i++)
  304             assert(expect_sn[i] == THREADED_NITER / iter_reduction[curr_test]);
  305 
  306     /* Free expect_sn */
  307     free(expect_sn);
  308 
  309     /* Any non-NULL exit value indicates an error, we use (void *) 1 here */
  310     return(nerrors ? 1 : 0);
  311 } /* end test_queue_threaded_dequeue() */
  312 #endif /* OPA_HAVE_PTHREAD_H */
  313 
  314 
  315 /*-------------------------------------------------------------------------
  316  * Function: test_queue_threaded
  317  *
  318  * Purpose: Tests that the queue behaves correctly in a multithreaded
  319  *          environment.  Creates many enqueuers and one dequeuer, and
  320  *          makes sure that elements are dequeued in the expected order.
  321  *
  322  * Return: Success: 0
  323  *         Failure: 1
  324  *
  325  * Programmer: Neil Fortner
  326  *             Wednesday, April 1, 2009
  327  *
  328  * Modifications:
  329  *
  330  *-------------------------------------------------------------------------
  331  */
  332 static int test_queue_threaded(threaded_mode_t mode)
  333 {
  334 #if defined(OPA_HAVE_PTHREAD_H)
  335     pthread_t           *threads = NULL; /* Threads */
  336     pthread_attr_t      ptattr;         /* Thread attributes */
  337     static threaded_t   *thread_data = NULL; /* User data structs for threads */
  338     OPA_Queue_info_t    qhead;          /* The queue */
  339     void                *ret;           /* Thread return value */
  340     unsigned            nthreads = num_threads[curr_test];
  341     int                 nerrors = 0;    /* number of errors */
  342     int                 i;
  343 
  344     switch(mode) {
  345         case THREADED_MODE_DEFAULT:
  346             TESTING("multithreaded queue", nthreads);
  347             break;
  348         case THREADED_MODE_EMPTY:
  349             TESTING("multithreaded queue (empty queue)", nthreads);
  350             break;
  351         case THREADED_MODE_FULL:
  352             TESTING("multithreaded queue (full queue)", nthreads);
  353             break;
  354         default:
  355             FAIL_OP_ERROR(printf("    Unknown mode\n"));
  356     } /* end switch */
  357 
  358     /* Allocate array of threads */
  359     if(NULL == (threads = (pthread_t *) malloc(nthreads * sizeof(pthread_t))))
  360         TEST_ERROR;
  361 
  362     /* Set threads to be joinable */
  363     pthread_attr_init(&ptattr);
  364     pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_JOINABLE);
  365 
  366     /* Allocate array of thread data */
  367     if(NULL == (thread_data = (threaded_t *) calloc(nthreads, sizeof(threaded_t))))
  368         TEST_ERROR;
  369 
  370     /* Initialize queue */
  371     OPA_Queue_init(&qhead);
  372 
  373     /* Initialize thread data structs */
  374     for(i=0; i<nthreads; i++) {
  375         thread_data[i].qhead = &qhead;
  376         thread_data[i].threadno = i;
  377         thread_data[i].mode = mode;
  378     } /* end for */
  379 
  380     /* Create the threads. */
  381     for(i=0; i<(nthreads - 1); i++)
  382         if(pthread_create(&threads[i], &ptattr, test_queue_threaded_enqueue,
  383                 &thread_data[i])) TEST_ERROR;
  384     nerrors = test_queue_threaded_dequeue(&thread_data[i]);
  385 
  386     /* Free the attribute */
  387     if(pthread_attr_destroy(&ptattr)) TEST_ERROR;
  388 
  389     /* Join the threads */
  390     for (i=0; i<(nthreads - 1); i++) {
  391         if(pthread_join(threads[i], &ret)) TEST_ERROR;
  392         if(ret)
  393             nerrors++;
  394     } /* end for */
  395 
  396     /* Check for errors */
  397     if(nerrors)
  398         FAIL_OP_ERROR(printf("    Unexpected return from %d thread%s\n", nerrors,
  399                 nerrors == 1 ? "" : "s"));
  400 
  401     /* Free memory */
  402     free(threads);
  403     free(thread_data);
  404 
  405     PASSED();
  406 
  407 #else /* OPA_HAVE_PTHREAD_H */
  408     TESTING("multithreaded queue", 0);
  409     SKIPPED();
  410     puts("    pthread.h not available");
  411 #endif /* OPA_HAVE_PTHREAD_H */
  412 
  413     return 0;
  414 
  415 #if defined(OPA_HAVE_PTHREAD_H)
  416 error:
  417     if(threads) free(threads);
  418     if(thread_data) free(thread_data);
  419     return 1;
  420 #endif /* OPA_HAVE_PTHREAD_H */
  421 } /* end test_barriers_linear_array() */
  422 
  423 
  424 /*-------------------------------------------------------------------------
  425  * Function:    main
  426  *
  427  * Purpose:     Tests the opa queue
  428  *
  429  * Return:      Success:        exit(0)
  430  *
  431  *              Failure:        exit(1)
  432  *
  433  * Programmer:  Neil Fortner
  434  *              Tuesday, June 8, 2010
  435  *
  436  * Modifications:
  437  *
  438  *-------------------------------------------------------------------------
  439  */
  440 int main(int argc, char **argv)
  441 {
  442     threaded_mode_t mode;       /* Mode for threaded test */
  443     unsigned nerrors = 0;
  444 #if defined(OPA_USE_LOCK_BASED_PRIMITIVES)
  445     OPA_emulation_ipl_t shm_lock;
  446     OPA_Interprocess_lock_init(&shm_lock, 1/*isLeader*/);
  447 #endif
  448 
  449     /* Initialize shared memory.  Because we are just running threads in one
  450      * process, the memory will always be symmetric.  Just set the base address
  451      * to 0. */
  452     if(OPA_Shm_asymm_init((char *)0) != 0) {
  453         printf("Unable to initialize shared memory\n");
  454         goto error;
  455     } /* end if */
  456 
  457     /* Simple tests */
  458     nerrors += test_queue_sanity();
  459 
  460     /* Loop over test configurations */
  461     for(curr_test=0; curr_test<num_thread_tests; curr_test++) {
  462         /* Don't test with only 1 thread */
  463         if(num_threads[curr_test] == 1)
  464             continue;
  465 
  466         /* Threaded tests */
  467         for(mode = THREADED_MODE_DEFAULT; mode < THREADED_MODE_NMODES; mode++)
  468             nerrors += test_queue_threaded(mode);
  469     } /* end for */
  470 
  471     if(nerrors)
  472         goto error;
  473     printf("All queue tests passed.\n");
  474 
  475     return 0;
  476 
  477 error:
  478     if(!nerrors)
  479         nerrors = 1;
  480     printf("***** %d QUEUE TEST%s FAILED! *****\n",
  481             nerrors, 1 == nerrors ? "" : "S");
  482     return 1;
  483 } /* end main() */
  484