"Fossies" - the Fresh Open Source Software Archive

Member "xxHash-0.8.0/tests/collisions/pool.c" (27 Jul 2020, 10475 Bytes) of package /linux/misc/xxHash-0.8.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. See also the last Fossies "Diffs" side-by-side code changes report for "pool.c": 0.7.3_vs_0.7.4.

    1 /*
    2  * Copyright (C) 2016-2020 Yann Collet, Facebook, Inc.
    3  * All rights reserved.
    4  *
    5  * This source code is licensed under both the BSD-style license (found in the
    6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
    7  * in the COPYING file in the root directory of this source tree).
    8  * You may select, at your option, one of the above-listed licenses.
    9  */
   10 
   11 
   12 /* ======   Dependencies   ======= */
   13 #include <stddef.h>    /* size_t */
   14 #include <stdlib.h>    /* malloc, calloc, free */
   15 #include <string.h>    /* memcpy */
   16 #include <assert.h>
   17 
   18 #include "pool.h"
   19 
   20 
   21 /* ======   Compiler specifics   ====== */
   22 #if defined(_MSC_VER)
   23 #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
   24 #endif
   25 
   26 
   27 /* ===  Build Macro  === */
   28 
   29 #ifndef POOL_MT   // can be defined on command line
   30 #  define POOL_MT 1
   31 #endif
   32 
   33 
   34 /* ===  Implementation  === */
   35 
   36 #if POOL_MT
   37 
   38 #include "threading.h"   /* pthread adaptation */
   39 
   40 /* A job is a function and an opaque argument */
   41 typedef struct POOL_job_s {
   42     POOL_function function;
   43     void *opaque;
   44 } POOL_job;
   45 
   46 struct POOL_ctx_s {
   47     /* Keep track of the threads */
   48     ZSTD_pthread_t* threads;
   49     size_t threadCapacity;
   50     size_t threadLimit;
   51 
   52     /* The queue is a circular buffer */
   53     POOL_job *queue;
   54     size_t queueHead;
   55     size_t queueTail;
   56     size_t queueSize;
   57 
   58     /* The number of threads working on jobs */
   59     size_t numThreadsBusy;
   60     /* Indicates if the queue is empty */
   61     int queueEmpty;
   62 
   63     /* The mutex protects the queue */
   64     ZSTD_pthread_mutex_t queueMutex;
   65     /* Condition variable for pushers to wait on when the queue is full */
   66     ZSTD_pthread_cond_t queuePushCond;
   67     /* Condition variables for poppers to wait on when the queue is empty */
   68     ZSTD_pthread_cond_t queuePopCond;
   69     /* Indicates if the queue is shutting down */
   70     int shutdown;
   71 };
   72 
   73 /* POOL_thread() :
   74  * Work thread for the thread pool.
   75  * Waits for jobs and executes them.
   76  * @returns : NULL on failure else non-null.
   77  */
   78 static void* POOL_thread(void* opaque)
   79 {
   80     POOL_ctx* const ctx = (POOL_ctx*)opaque;
   81     if (!ctx) { return NULL; }
   82     for (;;) {
   83         /* Lock the mutex and wait for a non-empty queue or until shutdown */
   84         ZSTD_pthread_mutex_lock(&ctx->queueMutex);
   85 
   86         while ( ctx->queueEmpty
   87             || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
   88             if (ctx->shutdown) {
   89                 /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
   90                  * a few threads will be shutdown while !queueEmpty,
   91                  * but enough threads will remain active to finish the queue */
   92                 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
   93                 return opaque;
   94             }
   95             ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
   96         }
   97         /* Pop a job off the queue */
   98         {   POOL_job const job = ctx->queue[ctx->queueHead];
   99             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
  100             ctx->numThreadsBusy++;
  101             ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
  102             /* Unlock the mutex, signal a pusher, and run the job */
  103             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
  104             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  105 
  106             job.function(job.opaque);
  107 
  108             /* If the intended queue size was 0, signal after finishing job */
  109             ZSTD_pthread_mutex_lock(&ctx->queueMutex);
  110             ctx->numThreadsBusy--;
  111             if (ctx->queueSize == 1) {
  112                 ZSTD_pthread_cond_signal(&ctx->queuePushCond);
  113             }
  114             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  115         }
  116     }  /* for (;;) */
  117     assert(0);  /* Unreachable */
  118 }
  119 
  120 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize)
  121 {
  122     POOL_ctx* ctx;
  123     /* Check parameters */
  124     if (!numThreads) { return NULL; }
  125     /* Allocate the context and zero initialize */
  126     ctx = (POOL_ctx*)calloc(1, sizeof(POOL_ctx));
  127     if (!ctx) { return NULL; }
  128     /* Initialize the job queue.
  129      * It needs one extra space since one space is wasted to differentiate
  130      * empty and full queues.
  131      */
  132     ctx->queueSize = queueSize + 1;
  133     ctx->queue = (POOL_job*)malloc(ctx->queueSize * sizeof(POOL_job));
  134     ctx->queueHead = 0;
  135     ctx->queueTail = 0;
  136     ctx->numThreadsBusy = 0;
  137     ctx->queueEmpty = 1;
  138     (void)ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
  139     (void)ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
  140     (void)ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
  141     ctx->shutdown = 0;
  142     /* Allocate space for the thread handles */
  143     ctx->threads = (ZSTD_pthread_t*)malloc(numThreads * sizeof(ZSTD_pthread_t));
  144     ctx->threadCapacity = 0;
  145     /* Check for errors */
  146     if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
  147     /* Initialize the threads */
  148     {   size_t i;
  149         for (i = 0; i < numThreads; ++i) {
  150             if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
  151                 ctx->threadCapacity = i;
  152                 POOL_free(ctx);
  153                 return NULL;
  154         }   }
  155         ctx->threadCapacity = numThreads;
  156         ctx->threadLimit = numThreads;
  157     }
  158     return ctx;
  159 }
  160 
  161 /*! POOL_join() :
  162     Shutdown the queue, wake any sleeping threads, and join all of the threads.
  163 */
  164 static void POOL_join(POOL_ctx* ctx) {
  165     /* Shut down the queue */
  166     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
  167     ctx->shutdown = 1;
  168     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  169 
  170     /* Wake up sleeping threads */
  171     ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
  172     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
  173 
  174     /* Join all of the threads */
  175     {   size_t i;
  176         for (i = 0; i < ctx->threadCapacity; ++i) {
  177             ZSTD_pthread_join(ctx->threads[i], NULL);  /* note : could fail */
  178     }   }
  179 }
  180 
  181 void POOL_free(POOL_ctx *ctx) {
  182     if (!ctx) { return; }
  183     POOL_join(ctx);
  184     ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
  185     ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
  186     ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
  187     free(ctx->queue);
  188     free(ctx->threads);
  189     free(ctx);
  190 }
  191 
  192 
  193 
  194 size_t POOL_sizeof(POOL_ctx *ctx) {
  195     if (ctx==NULL) return 0;  /* supports sizeof NULL */
  196     return sizeof(*ctx)
  197         + ctx->queueSize * sizeof(POOL_job)
  198         + ctx->threadCapacity * sizeof(ZSTD_pthread_t);
  199 }
  200 
  201 
  202 /* @return : 0 on success, 1 on error */
  203 static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
  204 {
  205     if (numThreads <= ctx->threadCapacity) {
  206         if (!numThreads) return 1;
  207         ctx->threadLimit = numThreads;
  208         return 0;
  209     }
  210     /* numThreads > threadCapacity */
  211     {   ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)malloc(numThreads * sizeof(ZSTD_pthread_t));
  212         if (!threadPool) return 1;
  213         /* replace existing thread pool */
  214         memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
  215         free(ctx->threads);
  216         ctx->threads = threadPool;
  217         /* Initialize additional threads */
  218         {   size_t threadId;
  219             for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
  220                 if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
  221                     ctx->threadCapacity = threadId;
  222                     return 1;
  223             }   }
  224     }   }
  225     /* successfully expanded */
  226     ctx->threadCapacity = numThreads;
  227     ctx->threadLimit = numThreads;
  228     return 0;
  229 }
  230 
  231 /* @return : 0 on success, 1 on error */
  232 int POOL_resize(POOL_ctx* ctx, size_t numThreads)
  233 {
  234     int result;
  235     if (ctx==NULL) return 1;
  236     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
  237     result = POOL_resize_internal(ctx, numThreads);
  238     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
  239     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  240     return result;
  241 }
  242 
  243 /**
  244  * Returns 1 if the queue is full and 0 otherwise.
  245  *
  246  * When queueSize is 1 (pool was created with an intended queueSize of 0),
  247  * then a queue is empty if there is a thread free _and_ no job is waiting.
  248  */
  249 static int isQueueFull(POOL_ctx const* ctx) {
  250     if (ctx->queueSize > 1) {
  251         return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
  252     } else {
  253         return (ctx->numThreadsBusy == ctx->threadLimit) ||
  254                !ctx->queueEmpty;
  255     }
  256 }
  257 
  258 
  259 static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
  260 {
  261     POOL_job const job = {function, opaque};
  262     assert(ctx != NULL);
  263     if (ctx->shutdown) return;
  264 
  265     ctx->queueEmpty = 0;
  266     ctx->queue[ctx->queueTail] = job;
  267     ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
  268     ZSTD_pthread_cond_signal(&ctx->queuePopCond);
  269 }
  270 
  271 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
  272 {
  273     assert(ctx != NULL);
  274     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
  275     /* Wait until there is space in the queue for the new job */
  276     while (isQueueFull(ctx) && (!ctx->shutdown)) {
  277         ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
  278     }
  279     POOL_add_internal(ctx, function, opaque);
  280     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  281 }
  282 
  283 
  284 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
  285 {
  286     assert(ctx != NULL);
  287     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
  288     if (isQueueFull(ctx)) {
  289         ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  290         return 0;
  291     }
  292     POOL_add_internal(ctx, function, opaque);
  293     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  294     return 1;
  295 }
  296 
  297 
  298 #else  /* POOL_MT  not defined */
  299 
  300 /* ========================== */
  301 /* No multi-threading support */
  302 /* ========================== */
  303 
  304 
  305 /* We don't need any data, but if it is empty, malloc() might return NULL. */
  306 struct POOL_ctx_s {
  307     int dummy;
  308 };
  309 static POOL_ctx g_ctx;
  310 
  311 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
  312     (void)numThreads;
  313     (void)queueSize;
  314     return &g_ctx;
  315 }
  316 
  317 void POOL_free(POOL_ctx* ctx) {
  318     assert(!ctx || ctx == &g_ctx);
  319     (void)ctx;
  320 }
  321 
  322 int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
  323     (void)ctx; (void)numThreads;
  324     return 0;
  325 }
  326 
  327 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
  328     (void)ctx;
  329     function(opaque);
  330 }
  331 
  332 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
  333     (void)ctx;
  334     function(opaque);
  335     return 1;
  336 }
  337 
  338 size_t POOL_sizeof(POOL_ctx* ctx) {
  339     if (ctx==NULL) return 0;  /* supports sizeof NULL */
  340     assert(ctx == &g_ctx);
  341     return sizeof(*ctx);
  342 }
  343 
  344 #endif  /* ZSTD_MULTITHREAD */