"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "mod_http2/h2_workers.c" between
mod_http2-1.15.16.tar.gz and mod_http2-1.15.17.tar.gz

About: mod_h[ttp]2 is an Apache httpd module implementing the HTTP/2 protocol. It uses nghttp2 as its base engine and connects it with the Apache infrastructure. Hint: the module (previously named "mod_h2") is now integrated into Apache httpd (since v2.4.17). Experimental release.

h2_workers.c  (mod_http2-1.15.16):h2_workers.c  (mod_http2-1.15.17)
skipping to change at line 37 skipping to change at line 37
#include "h2.h" #include "h2.h"
#include "h2_private.h" #include "h2_private.h"
#include "h2_mplx.h" #include "h2_mplx.h"
#include "h2_task.h" #include "h2_task.h"
#include "h2_workers.h" #include "h2_workers.h"
#include "h2_util.h" #include "h2_util.h"
typedef struct h2_slot h2_slot; typedef struct h2_slot h2_slot;
struct h2_slot { struct h2_slot {
int id; int id;
int sticks;
h2_slot *next; h2_slot *next;
h2_workers *workers; h2_workers *workers;
int aborted;
int sticks;
h2_task *task; h2_task *task;
apr_thread_t *thread; apr_thread_t *thread;
apr_thread_mutex_t *lock; apr_thread_mutex_t *lock;
apr_thread_cond_t *not_idle; apr_thread_cond_t *not_idle;
}; };
static h2_slot *pop_slot(h2_slot **phead) static h2_slot *pop_slot(h2_slot *volatile *phead)
{ {
/* Atomically pop a slot from the list */ /* Atomically pop a slot from the list */
for (;;) { for (;;) {
h2_slot *first = *phead; h2_slot *first = *phead;
if (first == NULL) { if (first == NULL) {
return NULL; return NULL;
} }
if (apr_atomic_casptr((void*)phead, first->next, first) == first) { if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
first->next = NULL; first->next = NULL;
return first; return first;
} }
} }
} }
static void push_slot(h2_slot **phead, h2_slot *slot) static void push_slot(h2_slot *volatile *phead, h2_slot *slot)
{ {
/* Atomically push a slot to the list */ /* Atomically push a slot to the list */
ap_assert(!slot->next); ap_assert(!slot->next);
for (;;) { for (;;) {
h2_slot *next = slot->next = *phead; h2_slot *next = slot->next = *phead;
if (apr_atomic_casptr((void*)phead, slot, next) == next) { if (apr_atomic_casptr((void*)phead, slot, next) == next) {
return; return;
} }
} }
} }
static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx); static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
{ {
apr_status_t status; apr_status_t status;
slot->workers = workers; slot->workers = workers;
slot->aborted = 0;
slot->task = NULL; slot->task = NULL;
if (!slot->lock) { if (!slot->lock) {
status = apr_thread_mutex_create(&slot->lock, status = apr_thread_mutex_create(&slot->lock,
APR_THREAD_MUTEX_DEFAULT, APR_THREAD_MUTEX_DEFAULT,
workers->pool); workers->pool);
if (status != APR_SUCCESS) { if (status != APR_SUCCESS) {
push_slot(&workers->free, slot); push_slot(&workers->free, slot);
return status; return status;
} }
skipping to change at line 104 skipping to change at line 102
if (!slot->not_idle) { if (!slot->not_idle) {
status = apr_thread_cond_create(&slot->not_idle, workers->pool); status = apr_thread_cond_create(&slot->not_idle, workers->pool);
if (status != APR_SUCCESS) { if (status != APR_SUCCESS) {
push_slot(&workers->free, slot); push_slot(&workers->free, slot);
return status; return status;
} }
} }
ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s, ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
"h2_workers: new thread for slot %d", slot->id); "h2_workers: new thread for slot %d", slot->id);
/* thread will either immediately start work or add itself /* thread will either immediately start work or add itself
* to the idle queue */ * to the idle queue */
apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, apr_atomic_inc32(&workers->worker_count);
workers->pool); status = apr_thread_create(&slot->thread, workers->thread_attr,
if (!slot->thread) { slot_run, slot, workers->pool);
if (status != APR_SUCCESS) {
apr_atomic_dec32(&workers->worker_count);
push_slot(&workers->free, slot); push_slot(&workers->free, slot);
return APR_ENOMEM; return status;
} }
apr_atomic_inc32(&workers->worker_count);
return APR_SUCCESS; return APR_SUCCESS;
} }
static apr_status_t add_worker(h2_workers *workers) static apr_status_t add_worker(h2_workers *workers)
{ {
h2_slot *slot = pop_slot(&workers->free); h2_slot *slot = pop_slot(&workers->free);
if (slot) { if (slot) {
return activate_slot(workers, slot); return activate_slot(workers, slot);
} }
return APR_EAGAIN; return APR_EAGAIN;
skipping to change at line 139 skipping to change at line 139
if (slot) { if (slot) {
apr_thread_mutex_lock(slot->lock); apr_thread_mutex_lock(slot->lock);
apr_thread_cond_signal(slot->not_idle); apr_thread_cond_signal(slot->not_idle);
apr_thread_mutex_unlock(slot->lock); apr_thread_mutex_unlock(slot->lock);
} }
else if (workers->dynamic) { else if (workers->dynamic) {
add_worker(workers); add_worker(workers);
} }
} }
static void cleanup_zombies(h2_workers *workers) static void join_zombies(h2_workers *workers)
{ {
h2_slot *slot; h2_slot *slot;
while ((slot = pop_slot(&workers->zombies))) { while ((slot = pop_slot(&workers->zombies))) {
if (slot->thread) { apr_status_t status;
apr_status_t status; ap_assert(slot->thread != NULL);
apr_thread_join(&status, slot->thread); apr_thread_join(&status, slot->thread);
slot->thread = NULL; slot->thread = NULL;
}
apr_atomic_dec32(&workers->worker_count);
slot->next = NULL;
push_slot(&workers->free, slot); push_slot(&workers->free, slot);
} }
} }
static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m) static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
{ {
apr_status_t rv; apr_status_t rv;
rv = h2_mplx_s_pop_task(m, &slot->task); rv = h2_mplx_s_pop_task(m, &slot->task);
if (slot->task) { if (slot->task) {
skipping to change at line 187 skipping to change at line 185
wake_idle_worker(slot->workers); wake_idle_worker(slot->workers);
return H2_FIFO_OP_REPUSH; return H2_FIFO_OP_REPUSH;
} }
return H2_FIFO_OP_PULL; return H2_FIFO_OP_PULL;
} }
/** /**
* Get the next task for the given worker. Will block until a task arrives * Get the next task for the given worker. Will block until a task arrives
* or the max_wait timer expires and more than min workers exist. * or the max_wait timer expires and more than min workers exist.
*/ */
static apr_status_t get_next(h2_slot *slot) static int get_next(h2_slot *slot)
{ {
h2_workers *workers = slot->workers; h2_workers *workers = slot->workers;
apr_status_t status;
slot->task = NULL; while (!workers->aborted) {
while (!slot->aborted) { ap_assert(slot->task == NULL);
if (!slot->task) { if (h2_fifo_try_peek(workers->mplxs, mplx_peek, slot) == APR_EOF) {
status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot); /* The queue is terminated with the MPM child being cleaned up,
if (status == APR_EOF) { * just leave.
return status; */
} break;
} }
if (slot->task) { if (slot->task) {
return APR_SUCCESS; return 1;
} }
cleanup_zombies(workers); join_zombies(workers);
apr_thread_mutex_lock(slot->lock); apr_thread_mutex_lock(slot->lock);
push_slot(&workers->idle, slot); if (!workers->aborted) {
apr_thread_cond_wait(slot->not_idle, slot->lock); push_slot(&workers->idle, slot);
apr_thread_cond_wait(slot->not_idle, slot->lock);
}
apr_thread_mutex_unlock(slot->lock); apr_thread_mutex_unlock(slot->lock);
} }
return APR_EOF;
return 0;
} }
static void slot_done(h2_slot *slot) static void slot_done(h2_slot *slot)
{ {
push_slot(&(slot->workers->zombies), slot); h2_workers *workers = slot->workers;
push_slot(&workers->zombies, slot);
/* If this worker is the last one exiting and the MPM child is stopping,
* unblock workers_pool_cleanup().
*/
if (!apr_atomic_dec32(&workers->worker_count) && workers->aborted) {
apr_thread_mutex_lock(workers->lock);
apr_thread_cond_signal(workers->all_done);
apr_thread_mutex_unlock(workers->lock);
}
} }
static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx) static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
{ {
h2_slot *slot = wctx; h2_slot *slot = wctx;
while (!slot->aborted) { /* Get the h2_task(s) from the ->mplxs queue. */
while (get_next(slot)) {
/* Get a h2_task from the mplxs queue. */ ap_assert(slot->task != NULL);
get_next(slot); do {
while (slot->task) {
h2_task_do(slot->task, thread, slot->id); h2_task_do(slot->task, thread, slot->id);
/* Report the task as done. If stickyness is left, offer the /* Report the task as done. If stickyness is left, offer the
* mplx the opportunity to give us back a new task right away. * mplx the opportunity to give us back a new task right away.
*/ */
if (!slot->aborted && (--slot->sticks > 0)) { if (!slot->workers->aborted && --slot->sticks > 0) {
h2_mplx_s_task_done(slot->task->mplx, slot->task, &slot->task); h2_mplx_s_task_done(slot->task->mplx, slot->task, &slot->task);
} }
else { else {
h2_mplx_s_task_done(slot->task->mplx, slot->task, NULL); h2_mplx_s_task_done(slot->task->mplx, slot->task, NULL);
slot->task = NULL; slot->task = NULL;
} }
} } while (slot->task);
} }
slot_done(slot); slot_done(slot);
apr_thread_exit(thread, APR_SUCCESS);
return NULL; return NULL;
} }
static apr_status_t workers_pool_cleanup(void *data) static apr_status_t workers_pool_cleanup(void *data)
{ {
h2_workers *workers = data; h2_workers *workers = data;
h2_slot *slot; h2_slot *slot;
if (!workers->aborted) { workers->aborted = 1;
workers->aborted = 1; h2_fifo_term(workers->mplxs);
/* abort all idle slots */
for (;;) {
slot = pop_slot(&workers->idle);
if (slot) {
apr_thread_mutex_lock(slot->lock);
slot->aborted = 1;
apr_thread_cond_signal(slot->not_idle);
apr_thread_mutex_unlock(slot->lock);
}
else {
break;
}
}
h2_fifo_term(workers->mplxs); /* abort all idle slots */
while ((slot = pop_slot(&workers->idle))) {
apr_thread_mutex_lock(slot->lock);
apr_thread_cond_signal(slot->not_idle);
apr_thread_mutex_unlock(slot->lock);
}
cleanup_zombies(workers); /* wait for all the workers to become zombies and join them */
apr_thread_mutex_lock(workers->lock);
if (apr_atomic_read32(&workers->worker_count)) {
apr_thread_cond_wait(workers->all_done, workers->lock);
} }
apr_thread_mutex_unlock(workers->lock);
join_zombies(workers);
return APR_SUCCESS; return APR_SUCCESS;
} }
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
int min_workers, int max_workers, int min_workers, int max_workers,
int idle_secs) int idle_secs)
{ {
apr_status_t status; apr_status_t status;
h2_workers *workers; h2_workers *workers;
apr_pool_t *pool; apr_pool_t *pool;
int i, n; int i, n;
ap_assert(s); ap_assert(s);
ap_assert(server_pool); ap_assert(pchild);
/* let's have our own pool that will be parent to all h2_worker /* let's have our own pool that will be parent to all h2_worker
* instances we create. This happens in various threads, but always * instances we create. This happens in various threads, but always
* guarded by our lock. Without this pool, all subpool creations would * guarded by our lock. Without this pool, all subpool creations would
* happen on the pool handed to us, which we do not guard. * happen on the pool handed to us, which we do not guard.
*/ */
apr_pool_create(&pool, server_pool); apr_pool_create(&pool, pchild);
apr_pool_tag(pool, "h2_workers"); apr_pool_tag(pool, "h2_workers");
workers = apr_pcalloc(pool, sizeof(h2_workers)); workers = apr_pcalloc(pool, sizeof(h2_workers));
if (!workers) { if (!workers) {
return NULL; return NULL;
} }
workers->s = s; workers->s = s;
workers->pool = pool; workers->pool = pool;
workers->min_workers = min_workers; workers->min_workers = min_workers;
workers->max_workers = max_workers; workers->max_workers = max_workers;
skipping to change at line 340 skipping to change at line 348
ap_thread_stacksize); ap_thread_stacksize);
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s, ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
"h2_workers: using stacksize=%ld", "h2_workers: using stacksize=%ld",
(long)ap_thread_stacksize); (long)ap_thread_stacksize);
} }
status = apr_thread_mutex_create(&workers->lock, status = apr_thread_mutex_create(&workers->lock,
APR_THREAD_MUTEX_DEFAULT, APR_THREAD_MUTEX_DEFAULT,
workers->pool); workers->pool);
if (status == APR_SUCCESS) { if (status == APR_SUCCESS) {
status = apr_thread_cond_create(&workers->all_done, workers->pool);
}
if (status == APR_SUCCESS) {
n = workers->nslots = workers->max_workers; n = workers->nslots = workers->max_workers;
workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot)); workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
if (workers->slots == NULL) { if (workers->slots == NULL) {
workers->nslots = 0; n = workers->nslots = 0;
status = APR_ENOMEM; status = APR_ENOMEM;
} }
for (i = 0; i < n; ++i) { for (i = 0; i < n; ++i) {
workers->slots[i].id = i; workers->slots[i].id = i;
} }
} }
if (status == APR_SUCCESS) { if (status == APR_SUCCESS) {
/* we activate all for now, TODO: support min_workers again. /* we activate all for now, TODO: support min_workers again.
* do this in reverse for vanity reasons so slot 0 will most * do this in reverse for vanity reasons so slot 0 will most
* likely be at head of idle queue. */ * likely be at head of idle queue. */
skipping to change at line 365 skipping to change at line 376
for (i = n-1; i >= 0; --i) { for (i = n-1; i >= 0; --i) {
status = activate_slot(workers, &workers->slots[i]); status = activate_slot(workers, &workers->slots[i]);
} }
/* the rest of the slots go on the free list */ /* the rest of the slots go on the free list */
for(i = n; i < workers->nslots; ++i) { for(i = n; i < workers->nslots; ++i) {
push_slot(&workers->free, &workers->slots[i]); push_slot(&workers->free, &workers->slots[i]);
} }
workers->dynamic = (workers->worker_count < workers->max_workers); workers->dynamic = (workers->worker_count < workers->max_workers);
} }
if (status == APR_SUCCESS) { if (status == APR_SUCCESS) {
apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup); /* Stop/join the workers threads when the MPM child exits (pchild is
* destroyed), and as a pre_cleanup of pchild thus before the threads
* pools (children of workers->pool) so that they are not destroyed
* before/under us.
*/
apr_pool_pre_cleanup_register(pchild, workers, workers_pool_cleanup);
return workers; return workers;
} }
return NULL; return NULL;
} }
apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m) apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
{ {
apr_status_t status = h2_fifo_push(workers->mplxs, m); apr_status_t status = h2_fifo_push(workers->mplxs, m);
wake_idle_worker(workers); wake_idle_worker(workers);
return status; return status;
 End of changes. 34 change blocks. 
64 lines changed or deleted; 80 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)