"Fossies" - the Fresh Open Source Software Archive

Member "ngpt-2.2.1/pth_sync.c" (19 Mar 2003, 36021 Bytes) of package /linux/privat/old/ngpt-2.2.1.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ syntax highlighting (style: standard) with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "pth_sync.c", see the Fossies "Dox" file reference documentation.

    1 /*
    2 **  NGPT - Next Generation POSIX Threading
    3 **  Copyright (c) 2001 IBM Corporation <babt@us.ibm.com>
    4 **  Portions Copyright (c) 1999-2000 Ralf S. Engelschall <rse@engelschall.com>
    5 **
    6 **  This file is part of NGPT, a non-preemptive thread scheduling
    7 **  library which can be found at http://www.ibm.com/developer.
    8 **
    9 **  This library is free software; you can redistribute it and/or
   10 **  modify it under the terms of the GNU Lesser General Public
   11 **  License as published by the Free Software Foundation; either
   12 **  version 2.1 of the License, or (at your option) any later version.
   13 **
   14 **  This library is distributed in the hope that it will be useful,
   15 **  but WITHOUT ANY WARRANTY; without even the implied warranty of
   16 **  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   17 **  Lesser General Public License for more details.
   18 **
   19 **  You should have received a copy of the GNU Lesser General Public
   20 **  License along with this library; if not, write to the Free Software
   21 **  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
   22 **  USA.
   23 **
   24 **  pth_sync.c: Pth synchronization facilities
   25 */
   26                              /* ``It is hard to fly with
   27                                   the eagles when you work
   28                                   with the turkeys.''
   29                                           -- Unknown  */
   30 
   31 /************************************************************************************
   32 **   WARNING   WARNING   WARNING   WARNING   WARNING   WARNING   WARNING   WARNING          
   33 ************************************************************************************
   34 **
   35 **   DO NOT PUT PTH_DEBUG or PRINTF STATEMENTS IN THIS FILE.
   36 **
   37 **  The debug/print routines and their variations use the pthread_mutex routines
   38 **  which subsequently find their way into the routines in this source file.  This
   39 **  can and will cause recursive calling and eventual stack overflow.
   40 **
   41 ************************************************************************************
   42 **   WARNING   WARNING   WARNING   WARNING   WARNING   WARNING   WARNING   WARNING          
   43 ************************************************************************************/
   44 #ifdef PTH_DEBUG             /* Protection if warning is not heeded... */
   45 #undef PTH_DEBUG
   46 #endif
   47 
   48 #include "pth_p.h"
   49 #include "allocation.h"
   50 #include "schedule.h"
   51 #include <sys/stat.h>
   52 #include <errno.h>
   53 
   54   /* Debug switches [0/1 disable/enable] */
   55 
   56 #define SYNC_DEBUG_SHARED_INIT 1
   57 
   58 
#if cpp    /* NOTE(review): Pth's self-preprocessing idiom — this section is
            * extracted into the generated header, not compiled directly. */
/* Name of the file that backs the process-shared synchronization area. */
#define PTH_SHARED_FILENAME     "/ngpt"
/* Maximum number of shared mutex/cond objects the area can hold. */
#ifndef PTH_DEFAULT_MAX_SHARED_OBJECTS
#define PTH_DEFAULT_MAX_SHARED_OBJECTS      256
#endif
/* Preferred mmap address for the shared area (0 = let the kernel choose). */
#ifndef PTH_DEFAULT_SHARED_ADDRESS
#define PTH_DEFAULT_SHARED_ADDRESS  0
#endif
/* Discriminators for the union inside pth_shared_sync_st. */
#define PTH_SHARED_TYPE_MUTEX       0
#define PTH_SHARED_TYPE_COND        1
#define PTH_VERSION_CURRENT    /* NOTE(review): expands to nothing — confirm intent */
/* One slot in the shared area: either a shared mutex or a shared cond. */
typedef struct pth_shared_sync_st   pth_shared_sync_t;
struct pth_shared_sync_st {
    int     used;              /* TRUE while the slot is allocated */
    union {
    pth_mutex_t mx;            /* valid when slot holds a mutex */
    pth_cond_t  cn;            /* valid when slot holds a cond variable */
    } u;
};
/* Header of the mmap'ed shared area, followed by num_objs slots. */
typedef struct pth_shared_area_st   pth_shared_area_t;
struct pth_shared_area_st {
    int         verid;         /* internal version id; must match on open */
    size_t      addr;          /* address the area expects to be mapped at */
    size_t      num_objs;      /* number of o[] slots that follow */
    pth_qlock_t     lock;      /* guards allocation/search over o[] */
    pth_shared_sync_t   o[0];  /* pre-C99 flexible array member idiom */
};

/* Total byte size of the area header plus nobj trailing slots. */
#define COMPUTE_PTH_SHARED_SIZE(nobj)  ((size_t)(sizeof(pth_shared_area_t) + (nobj) * sizeof(pth_shared_sync_t)))

#endif
   90 
   91 intern int          pth_shared_fd = 0;
   92 intern pth_shared_area_t    *pth_shared_area;
   93 intern size_t           PTH_SHARED_SIZE;
   94 intern size_t           PTH_MAX_SHARED_OBJECTS;
   95 
   96 /*
   97 **  Mutual Exclusion Locks
   98 */
   99 
  100 intern int pth_initialize_shared(void)
  101 {
  102     struct pth_shared_area_st file_shared_area;
  103     unsigned long pth_shared_address;
  104 
  105     if (pth_shared_fd == 0) {
  106     /* Open the shared area file that we'll mmap */
  107     pth_shared_fd = shared_area_file_open(O_RDWR, (S_IRWXU | S_IRWXG | S_IRWXO));
  108     if (pth_shared_fd == -1) {
  109             __fdebugmsg (SYNC_DEBUG_SHARED_INIT,
  110                          "%s(): cannot open shared area: %s\n",
  111                          __FUNCTION__, strerror (errno));
  112             return FALSE;
  113         }
  114 
  115     /* read in the shared file so that we can get init params */
  116     if (pth_sc(read)(pth_shared_fd, &file_shared_area,
  117                       sizeof(file_shared_area)) == -1) {
  118             __fdebugmsg (SYNC_DEBUG_SHARED_INIT,
  119                          "%s(): cannot read shared area file: %s\n",
  120                          __FUNCTION__, strerror (errno));
  121             return FALSE;
  122         }
  123 
  124     if (file_shared_area.verid != PTH_INTERNAL_VERSION) {
  125             __fdebugmsg (SYNC_DEBUG_SHARED_INIT,
  126                          "%s(): shared area file version wrong\n", __FUNCTION__);
  127             return FALSE;
  128         }
  129 
  130     /* access the shared area address and number of lock objects */
  131     pth_shared_address = file_shared_area.addr;
  132     PTH_MAX_SHARED_OBJECTS = file_shared_area.num_objs;
  133     PTH_SHARED_SIZE = COMPUTE_PTH_SHARED_SIZE(PTH_MAX_SHARED_OBJECTS);
  134 
  135     /* Do the mmap of the shared area */
  136     pth_shared_area = (pth_shared_area_t *)mmap((void *)pth_shared_address,
  137                             PTH_SHARED_SIZE,
  138                             PROT_READ | PROT_WRITE | PROT_EXEC, MAP_SHARED, 
  139                             pth_shared_fd, 0);
  140     if (pth_shared_area == MAP_FAILED) {
  141             __fdebugmsg (SYNC_DEBUG_SHARED_INIT,
  142                          "%s(): cannot map shared area: %s\n",
  143                          __FUNCTION__, strerror (errno));
  144         return FALSE;
  145         }
  146         futex_region(pth_shared_area, PTH_SHARED_SIZE);
  147     pth_lock_init(pth_shared_area->lock);
  148     }
  149     return TRUE;
  150 }
  151 
  152 intern int pth_find_shared_mutex(pth_mutex_t *mutex)
  153 {
  154     int indx;
  155     int retval;
  156 
  157     pth_acquire_lock(&(pth_shared_area->lock));
  158     /* search the in-use objects in the pth_shared_area for mutex addr */
  159     for (indx = 0, retval = FALSE; indx < PTH_MAX_SHARED_OBJECTS; ++indx) {
  160     /* if not in use, skip to next */
  161     if (pth_shared_area->o[indx].used == FALSE)
  162         continue;
  163     /* if in use and equal to value passed we found it, break out */
  164     if (mutex == &pth_shared_area->o[indx].u.mx) {
  165         retval = TRUE;
  166         break;
  167     }
  168     }
  169     /* release shared area lock and return with search status */
  170     pth_release_lock(&(pth_shared_area->lock));
  171     return retval;
  172 }
  173 
  174 intern pth_mutex_t *pth_alloc_shared_mutex(void)
  175 {
  176     int indx;
  177     pth_mutex_t *rmx;
  178 
  179     pth_acquire_lock(&(pth_shared_area->lock));
  180     
  181     for (indx = 0, rmx = NULL; indx < PTH_MAX_SHARED_OBJECTS; ++indx) {
  182     /* If slot is in use skip over it */
  183     if (pth_shared_area->o[indx].used == TRUE)
  184         continue;
  185     /* Found available slot, allocate it */
  186     rmx = &(pth_shared_area->o[indx].u.mx);
  187     rmx->mx_index = indx;
  188     rmx->mx_shared.type = PTH_SHARED_TYPE_MUTEX;
  189     pth_shared_area->o[indx].used = TRUE;
  190     msync(pth_shared_area, PTH_SHARED_SIZE, MS_SYNC | MS_INVALIDATE);
  191     break;
  192     }
  193 
  194     pth_release_lock(&(pth_shared_area->lock));
  195     return rmx;
  196 }
  197 
  198 intern pth_cond_t *pth_alloc_shared_cond(void)
  199 {
  200     int indx;
  201     pth_cond_t *rcn;
  202     
  203     pth_acquire_lock(&(pth_shared_area->lock));
  204 
  205     for (indx = 0, rcn = NULL; indx < PTH_MAX_SHARED_OBJECTS; ++indx) {
  206     /* If slot is in use skip over it */
  207     if (pth_shared_area->o[indx].used == TRUE)
  208         continue;
  209     /* Found available slot, allocate it */
  210     rcn = &(pth_shared_area->o[indx].u.cn);
  211     rcn->cn_index = indx;
  212     rcn->cn_shared.type = PTH_SHARED_TYPE_COND;
  213     pth_shared_area->o[indx].used = TRUE;
  214     msync(pth_shared_area, PTH_SHARED_SIZE, MS_SYNC | MS_INVALIDATE);
  215     break;
  216     }
  217 
  218     pth_release_lock(&(pth_shared_area->lock));
  219     return rcn;
  220 }
  221 
  222 int pth_mutex_init(pth_mutex_t *mutex, pth_mutexattr_t *pattr)
  223 {
  224     int pshared = (pattr != NULL) ? pattr->pshared : FALSE;
  225     if (mutex == NULL)
  226     return FALSE;
  227     rfutex_init(&mutex->mx_shared, mutex, pshared);
  228     if (pshared == FALSE)
  229     mutex->mx_index = -1;
  230     else {
  231     if ((mutex->mx_count > 0) && (mutex->mx_owner != pth_get_current()))
  232         return FALSE;
  233     else if (pattr && (pattr->robustness == PTH_MUTEX_ROBUST_NP))
  234         mutex->mx_shared.type |= PTH_MUTEX_ROBUST_NP;
  235     }
  236 
  237     mutex->mx_type = (pattr != NULL) ? pattr->type : PTH_MUTEX_NORMAL;
  238     mutex->mx_node.rn_next = NULL;
  239     mutex->mx_node.rn_prev = NULL;
  240     mutex->mx_state = PTH_MUTEX_INITIALIZED;
  241     mutex->mx_owner = NULL;
  242     mutex->mx_count = 0;
  243     mutex->mx_owner_pid = 0; 
  244     pth_qlock_init(&mutex->mx_lock);
  245     mutex->mx_waitlist.th_next = &mutex->mx_waitlist;
  246     mutex->mx_waitlist.th_prev = &mutex->mx_waitlist;
  247     mutex->mx_waitcount = 0;
  248     return TRUE;
  249 }
  250 
  251 static inline void _pth_acquire(pth_qlock_t *spinlock, pid_t tid)
  252 {
  253     spin_lock(&spinlock->spinlock, (void *) tid, NULL);
  254 }
  255  
  256 static inline void _pth_release(pth_qlock_t *spinlock, pid_t tid)
  257 {
  258     spin_unlock(&spinlock->spinlock, (void *) tid);
  259 }
  260 
/*
 * Acquire (or try-acquire) a non-shared mutex.
 *
 * mutex     - the mutex to lock
 * tryonly   - if non-zero, never block: return EBUSY instead of waiting
 * ev_extra  - optional extra event; if it fires first, returns EINTR
 *
 * Returns 0 on success, EINVAL/EBUSY/EDEADLK/EINTR on failure.
 * Process-shared mutexes are delegated to pth_shared_mutex_acquire().
 *
 * NOTE(review): the statement order around mx_lock is load-bearing
 * throughout — the per-mutex spinlock is always dropped before any call
 * that may block (pth_wait/select) and re-taken afterwards.
 */
int pth_mutex_acquire(pth_mutex_t *mutex, int tryonly, pth_event_t ev_extra)
{
    pth_descr_t descr = pth_get_native_descr();
    pth_t current;
    static pth_key_t ev_key = PTH_KEY_INIT;
    pth_event_t ev;
    int retcode = 0;

    /* internal-locked mutexes bypass the normal protocol: just take the
     * underlying qlock directly */
    if (mutex->mx_state & PTH_MUTEX_INTERNAL_LOCKED) {
        _pth_acquire_lock(&(mutex->mx_lock), 0);
        return 0;
    }
    if (unlikely (descr == NULL))
        return EINVAL;

    /* process-shared mutexes use the futex-based path */
    if (mutex->mx_shared.pshared == TRUE)
        return pth_shared_mutex_acquire(mutex, tryonly, ev_extra, descr);

    current = descr->current;
    _pth_acquire(&(mutex->mx_lock), descr->tid);

    /* still not locked, so simply acquire mutex? */
    if ((!(mutex->mx_state & PTH_MUTEX_LOCKED)) && !mutex->mx_waitcount) {

        /*
         * Should be no contention here...
         *  But only try to acquire it if we don't already have it...
         */

        mutex->mx_state |= PTH_MUTEX_LOCKED;
        mutex->mx_owner = current;
        mutex->mx_owner_pid = descr->pid;
        mutex->mx_count = 1;
        /* first owned mutex is cached in mutex_owned; extras go on the ring */
        if (!current->mutex_owned)
            current->mutex_owned = mutex;
        else
            pth_ring_append(&(current->mutexring), &(mutex->mx_node));
        _pth_release(&(mutex->mx_lock), descr->tid);
        return 0;
    }

    /* already locked by caller? */
    if (mutex->mx_owner == current && mutex->mx_count >= 1) {
        if (mutex->mx_type == PTH_MUTEX_RECURSIVE_NP) {
            /* recursive lock */
            mutex->mx_count++;
        } else
            retcode = tryonly ? EBUSY : EDEADLK;
        _pth_release(&(mutex->mx_lock), descr->tid);
        return retcode;
    }

    /* should we just tryonly?            ibm*/
    if (tryonly) {              /*ibm*/
        _pth_release(&(mutex->mx_lock), descr->tid);
        return EBUSY;               /*ibm*/
    }

    /* with multiple natives another native may need to wake us; register
     * on the mutex's wait list so the releaser can find this thread */
    if (pth_number_of_natives > 1)
        PTH_ELEMENT_INSERT(&current->mutex_cond_wait, &mutex->mx_waitlist);

    if (!descr->is_bounded) {

        /* unbounded thread: wait via the scheduler's event machinery */
        /* wait for mutex to become unlocked.. */
        for (;;) {

            _pth_release(&(mutex->mx_lock), descr->tid);
            /* Set up the event handling...waiting for mutex */
            ev = pth_event(PTH_EVENT_MUTEX|PTH_MODE_STATIC, &ev_key, mutex);
            if (ev_extra != NULL)
                pth_event_concat(ev, ev_extra, NULL);

            pth_wait(ev);
            if (ev_extra != NULL) {
                pth_event_isolate(ev);
                /* extra event fired before the mutex event: abandon */
                if (!PTH_EVENT_OCCURRED(ev))
                    return EINTR;
            }
            /* may have migrated natives while blocked; refresh descr */
            descr = pth_get_native_descr();
            _pth_acquire(&(mutex->mx_lock), descr->tid);

            if (!(mutex->mx_state & PTH_MUTEX_LOCKED)) {
                if (pth_number_of_natives > 1)
                    PTH_ELEMENT_DELETE(&current->mutex_cond_wait);
                break;   /* for non-shared mutex, process original way */
            }
        }
    } else {
        /* bounded thread: block the native directly on its sigpipe via
         * select() instead of yielding to the scheduler */
        fd_set rfds;
        char minibuf[64];
        int rc, fdmax;

        for (;;) {
            FD_ZERO(&rfds);
            FD_SET(descr->sigpipe[0], &rfds);
            fdmax = descr->sigpipe[0];

            mutex->mx_waitcount++;
            descr->is_bound = 0;
            current->state = PTH_STATE_WAITING;
            _pth_release(&(mutex->mx_lock), descr->tid);

            if (mutex->mx_state & PTH_MUTEX_LOCKED) {
                /* sleep until the releaser writes to our sigpipe */
                while ((rc = pth_sc(select)(fdmax+1, &rfds, NULL, NULL, NULL)) < 0
                       && errno == EINTR) ;
                if (rc > 0 && FD_ISSET(descr->sigpipe[0], &rfds))
                    FD_CLR(descr->sigpipe[0], &rfds);
                /* drain the sigpipe so stale bytes don't cause spurious wakeups */
                while (pth_sc(read)(descr->sigpipe[0], minibuf, sizeof(minibuf)) > 0) ;
            }

            _pth_acquire(&(mutex->mx_lock), descr->tid);
            mutex->mx_waitcount--;
            descr->is_bound = 1;
            current->state = PTH_STATE_READY;
            if (!(mutex->mx_state & PTH_MUTEX_LOCKED)) {
                if (pth_number_of_natives > 1)
                    PTH_ELEMENT_DELETE(&current->mutex_cond_wait);
                break;   /* for non-shared mutex, process original way */
            }
        }
    }
    /* now it's again unlocked, so acquire mutex */
    mutex->mx_state |= PTH_MUTEX_LOCKED;
    mutex->mx_owner = current;
    mutex->mx_owner_pid = descr->pid;
    mutex->mx_count = 1;
    if (!current->mutex_owned)
        current->mutex_owned = mutex;
    else
        pth_ring_append(&(current->mutexring), &(mutex->mx_node));
    _pth_release(&(mutex->mx_lock), descr->tid);
    return retcode;
}
  394 
/*
 * Acquire (or try-acquire) a process-shared mutex backed by a futex.
 *
 * mutex     - the shared mutex to lock
 * tryonly   - if non-zero, never block: return EBUSY instead of waiting
 * ev_extra  - optional extra event; if it fires first, returns EINTR
 * descr     - the caller's native-thread descriptor
 *
 * Returns 0 on success; EINVAL/EBUSY/EDEADLK/EINTR on failure;
 * EOWNERDEAD when a robust mutex's owner died (caller now holds it);
 * ENOTRECOVERABLE when the mutex was marked inconsistent.
 *
 * NOTE(review): futex_acquire() is always called with mx_lock dropped,
 * and its tri-state return drives the flow: 0 = futex obtained,
 * >0 = an fd to wait on for the futex, -1 = error.  Do not reorder.
 */
int pth_shared_mutex_acquire(pth_mutex_t *mutex, int tryonly, pth_event_t ev_extra,
                pth_descr_t descr)
{
    pth_t current;
    static pth_key_t ev_key = PTH_KEY_INIT;
    pth_event_t ev;
    int need_futex = TRUE;   /* must we (re)issue futex_acquire this pass? */
    int futx_fd = 0;         /* fd returned when the futex must be waited on */
    int retcode = 0;

    current = descr->current;
    _pth_acquire(&(mutex->mx_lock), descr->tid);

    /* still not locked, so simply acquire mutex? */
    if ((!(mutex->mx_state & PTH_MUTEX_LOCKED)) && !mutex->mx_waitcount) {

        /*
         * Should be no contention here...
         *  But only try to acquire it if we don't already have it...
         */
        /* check the return value of futex_acquire and lock the mutex
         * only if it return success.
         * no need to hold mx_lock for futex calls ....
         */
        _pth_release(&(mutex->mx_lock), descr->tid);
        futx_fd = futex_acquire(&mutex->mx_shared.futex, FALSE);
        _pth_acquire(&(mutex->mx_lock), descr->tid);
        /* if new request for acquire and waiting for futex.... */
        if (futx_fd > 0) {
            /* don't acquire the futex again, it's already waiting! */
            need_futex = FALSE;
            goto wait;
        }
        if (futx_fd == -1) {
            _pth_release(&(mutex->mx_lock), descr->tid);
            return EINVAL;
        }

        /* futex obtained (futx_fd == 0): record ownership */
        mutex->mx_state |= PTH_MUTEX_LOCKED;
        mutex->mx_owner = current;
        mutex->mx_owner_pid = descr->pid;
        mutex->mx_count = 1;
        if (!current->mutex_owned)
            current->mutex_owned = mutex;
        else
            pth_ring_append(&(current->mutexring), &(mutex->mx_node));
        _pth_release(&(mutex->mx_lock), descr->tid);
        return 0;
    }

wait:
    /* already locked by caller?  (owner pid must match too, since shared
     * mutexes are visible across processes) */
    if (mutex->mx_owner == current && mutex->mx_count >= 1 && mutex->mx_owner_pid==descr->pid) {
        if (mutex->mx_type == PTH_MUTEX_RECURSIVE_NP) {
            /* recursive lock */
            mutex->mx_count++;
        } else
            retcode = tryonly ? EBUSY : EDEADLK;
        _pth_release(&(mutex->mx_lock), descr->tid);
        return retcode;
    }

    /* should we just tryonly?            ibm*/
    if (tryonly) {              /*ibm*/
        _pth_release(&(mutex->mx_lock), descr->tid);
        return EBUSY;               /*ibm*/
    }

    /* Check whether the current owner is dead or alive for robust mutex */
    if ((mutex->mx_shared.type & PTH_MUTEX_ROBUST_NP) &&
        (kill(mutex->mx_owner_pid, 0) == -1)) {
        /* if current owner is dead, acquire the mutex and return EOWNERDEAD */
        mutex->mx_node.rn_next = NULL;
        mutex->mx_node.rn_prev = NULL;
        mutex->mx_state |= PTH_MUTEX_NOT_CONSISTENT;
        retcode = EOWNERDEAD;
        goto done;
    }

    if (!descr->is_bounded) {

        /* unbounded thread: wait on the futex fd via the event machinery */
        /* wait for mutex to become unlocked.. */
        for (;;) {

            _pth_release(&(mutex->mx_lock), descr->tid);
            /*
             * Before we set up event handling, set up the async wait for the futex
             * if this is a shared mutex...
             * This may give a us the mutex, in which case, we'll acquire mutex...
             */
            if (need_futex) {
                futx_fd = futex_acquire(&mutex->mx_shared.futex, FALSE);
                if (futx_fd == 0) { /* got the futex...now acquire mutex */
                    _pth_acquire(&(mutex->mx_lock), descr->tid);
                    break;
                }
                if (futx_fd == -1)  /* got an error? */
                    return EINVAL;  /* TODO: check what we should return!!! */
            }
            /* Set up the event handling...waiting for futex... */
            ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_READABLE|PTH_MODE_STATIC, &ev_key, futx_fd);
            if (ev_extra != NULL)
                pth_event_concat(ev, ev_extra, NULL);

            pth_wait(ev);
            if (ev_extra != NULL) {
                pth_event_isolate(ev);
                /* extra event fired before the futex fd became readable */
                if (!PTH_EVENT_OCCURRED(ev))
                    return EINTR;
            }
            /* the futex wait fd is single-use; close it each iteration */
            if (futx_fd > 0)
                close(futx_fd);
            /* may have migrated natives while blocked; refresh descr */
            descr = pth_get_native_descr();
            _pth_acquire(&(mutex->mx_lock), descr->tid);

            /* for shared-mutex, futex acquire is needed */
            need_futex = TRUE;
            if (mutex->mx_state & PTH_MUTEX_NOT_CONSISTENT) {
                /* if mutex is not consistent, return error */
                _pth_release(&(mutex->mx_lock), descr->tid);
                return ENOTRECOVERABLE;
            }
        }
    } else {
        /* bounded thread: block the native directly with select() on the
         * futex fd and the native's sigpipe */
        fd_set rfds;
        char minibuf[64];
        int rc, fdmax;

        for (;;) {
            /*
             * Before we set up event handling, set up the async wait for the futex
             * if this is a shared mutex...
             * This may give a us the mutex, in which case, we'll acquire mutex....
             */
            if (need_futex) {
                _pth_release(&(mutex->mx_lock), descr->tid);
                futx_fd = futex_acquire(&mutex->mx_shared.futex, FALSE);
                _pth_acquire(&(mutex->mx_lock), descr->tid);
                if (futx_fd == 0)   /* got the futex...now acquire mutex */
                   break;
                if (futx_fd == -1) {    /* got an error? */
                   _pth_release(&(mutex->mx_lock), descr->tid);
                   return EINVAL;  /* TODO: check what we should return!!! */
                }
            }
            FD_ZERO(&rfds);
            FD_SET(futx_fd, &rfds);
            fdmax = futx_fd;
            FD_SET(descr->sigpipe[0], &rfds);
            if (fdmax < descr->sigpipe[0])
                fdmax = descr->sigpipe[0];

            mutex->mx_waitcount++;
            descr->is_bound = 0;
            current->state = PTH_STATE_WAITING;
            _pth_release(&(mutex->mx_lock), descr->tid);

            if (futx_fd > 0 || mutex->mx_state & PTH_MUTEX_LOCKED) {
                /* sleep until the futex fd or our sigpipe becomes readable */
                while ((rc = pth_sc(select)(fdmax+1, &rfds, NULL, NULL, NULL)) < 0
                       && errno == EINTR) ;
                if (rc > 0 && FD_ISSET(descr->sigpipe[0], &rfds))
                    FD_CLR(descr->sigpipe[0], &rfds);
                /* drain the sigpipe so stale bytes don't cause spurious wakeups */
                while (pth_sc(read)(descr->sigpipe[0], minibuf, sizeof(minibuf)) > 0) ;
            }
            /* the futex wait fd is single-use; close it each iteration */
            if (futx_fd > 0)
                close(futx_fd);

            _pth_acquire(&(mutex->mx_lock), descr->tid);
            mutex->mx_waitcount--;
            descr->is_bound = 1;
            current->state = PTH_STATE_READY;
            /* for shared-mutex, futex acquire is needed */
            need_futex = TRUE;
            if (mutex->mx_state & PTH_MUTEX_NOT_CONSISTENT) {
                /* if mutex is not consistent, return error */
                _pth_release(&(mutex->mx_lock), descr->tid);
                return ENOTRECOVERABLE;
            }
        }
    }
done:
    /* now it's again unlocked, so acquire mutex */
    mutex->mx_state |= PTH_MUTEX_LOCKED;
    mutex->mx_owner = current;
    mutex->mx_owner_pid = descr->pid;
    mutex->mx_count = 1;
    if (!current->mutex_owned)
        current->mutex_owned = mutex;
    else
        pth_ring_append(&(current->mutexring), &(mutex->mx_node));
    _pth_release(&(mutex->mx_lock), descr->tid);
    return retcode;
}
  588 
/*
 * Release 'mutex' held by the calling thread.
 *
 * Returns 0 on success, EPERM if the mutex is not locked or the caller
 * is not the owner.  For recursive mutexes only the final release drops
 * ownership.  On final release of a non-shared mutex with multiple
 * natives, the oldest waiter is woken either via its bound native's
 * sigpipe or by moving it from its native's wait queue to the global
 * ready queue; shared mutexes are released through the futex instead.
 */
int pth_mutex_release(pth_mutex_t *mutex)
{
    pth_t current, tmp;
    pth_descr_t descr = pth_get_native_descr();
    pth_list_t *mq;

    if (!(mutex->mx_state & PTH_MUTEX_LOCKED)) {
        /* internal-locked mutexes bypass the normal protocol */
        if (mutex->mx_state & PTH_MUTEX_INTERNAL_LOCKED) {
            _pth_release_lock(&(mutex->mx_lock), 0);
            return 0;
        }
        return EPERM;
    }
    /* only the owning thread may release */
    if ((current = descr->current) != mutex->mx_owner)
        return EPERM;

    _pth_acquire(&(mutex->mx_lock), descr->tid);
    /* decrement recursion counter and release mutex */
    mutex->mx_count--;
    if (mutex->mx_count <= 0) {
        mutex->mx_state &= ~(PTH_MUTEX_LOCKED);
        mutex->mx_owner = NULL;
        mutex->mx_owner_pid = 0;
        mutex->mx_count = 0;
        /* drop from the owner's bookkeeping (cached slot or ring) */
        if (current->mutex_owned == mutex)
            current->mutex_owned = 0;
        else
            pth_ring_delete(&(current->mutexring), &(mutex->mx_node));
        if (!mutex->mx_shared.pshared) {
            if (pth_number_of_natives > 1) {
                /* wake the oldest waiter (tail of the wait list), if any */
                mq = mutex->mx_waitlist.th_prev;
                if (mq != &mutex->mx_waitlist) {
                    char c = (int)1;
                    pth_descr_t ds;
                    /* container-of: recover the pth_t from its embedded
                     * mutex_cond_wait list node.
                     * NOTE(review): the (int) cast of the offset truncates
                     * on LP64 — works only while the offset fits in int;
                     * consider offsetof() — TODO confirm */
                    tmp = (pth_t)((char *)mq - (int)&((pth_t)0)->mutex_cond_wait);
                    if ((ds = tmp->boundnative)) {
                        /* waiter is bound to a native: poke its sigpipe */
                        _pth_release(&(mutex->mx_lock), descr->tid);
                        pth_sc(write)(ds->sigpipe[1], &c, sizeof(char));
                        return 0;
                    } else {
                        ds = tmp->waited_native;
                        if (ds != NULL) {
                            /* move waiting thread from wait-queue to ready-queue */
                            spin_lock (&ds->wait_queue.lock, descr, NULL);
                            if (tmp->waited_native) { /* to avoid race with eventmgr */
                                __pqueue_delete_node(&tmp->node);
                                tmp->waited_native = NULL;
                                spin_unlock(&ds->wait_queue.lock, descr);

                                spin_lock(&pth_RQ.lock, descr, NULL);
                                tmp->state = PTH_STATE_READY;
                                __thread_eprio_recalculate (tmp);
                                __pqueue_append_node(&pth_RQ, &tmp->node);
                                spin_unlock(&pth_RQ.lock, descr);
                            } else
                                spin_unlock(&ds->wait_queue.lock, descr);
                        }
                        _pth_release(&(mutex->mx_lock), descr->tid);
                    }
                    if (descr->current->boundnative)
                        pth_wakeup_anative();
                    return 0;
                }
            }
            _pth_release(&(mutex->mx_lock), descr->tid);
        } else {
            /* shared mutex: hand the lock back through the futex */
            _pth_release(&(mutex->mx_lock), descr->tid);
            futex_release(&mutex->mx_shared.futex);
        }
        return 0;
    }
    /* recursive release: still held by caller */
    _pth_release(&(mutex->mx_lock), descr->tid);
    return 0;
}
  663 
/*
 * Release every mutex still held by 'thread' (used at thread exit).
 * Releases the cached mutex_owned slot first, then walks the mutex ring.
 * A recursively held mutex is released repeatedly until its count
 * reaches zero — that is what the 'continue' without advancing 'rn'
 * accomplishes.
 */
intern void pth_mutex_releaseall(pth_t thread)
{
    pth_ringnode_t *rn, *rnf;
    pth_mutex_t *m;

    if (thread == NULL)
        return;
    /* iterate over all mutexes of thread */
    pth_acquire_lock(&(thread->lock));
    if (thread->mutex_owned)
        pth_mutex_release(thread->mutex_owned);
    rn = rnf = pth_ring_first(&(thread->mutexring));
    while (rn != NULL) {
        /* container-of: recover the mutex from its embedded ring node.
         * NOTE(review): the (int) cast of the offset truncates on LP64 —
         * works only while the offset fits in int — TODO confirm */
        m = (pth_mutex_t *)((char *)rn - (int)&((pth_mutex_t *)0)->mx_node);
        pth_mutex_release(m);

        /*
         * make sure that if mutex was grabbed recursively,
         * it's released recursively...
         */
        if (m->mx_count > 0)
            continue;           /* release the same mutex again */

        rn = pth_ring_next(&(thread->mutexring), rn);
        if (rn == rnf)          /* wrapped around the ring: done */
            break;
    }
    pth_release_lock(&(thread->lock));
    return;
}
  694 
  695 
  696 int pth_mutex_destroy(pth_mutex_t *mutex)
  697 {
  698     /* 
  699      * Sanity check... 
  700      *  This only valid for shared mutexes.
  701      */
  702     if (mutex->mx_shared.pshared != TRUE)
  703     return EINVAL;
  704 
  705     /* Clean up the area in shared memory for this mutex... */
  706     pth_shared_area->o[mutex->mx_index].used = FALSE;
  707     mutex->mx_state = 0;
  708     mutex->mx_index = 0;
  709     mutex->mx_owner = 0;
  710     mutex->mx_owner_pid = 0;
  711     mutex->mx_count = 0;
  712     mutex->mx_waitcount = 0;
  713     pth_lock_init(mutex->mx_lock);
  714 
  715     /* Clean up the futex as well... */
  716     futex_destroy(&mutex->mx_shared.futex);
  717 
  718     return 0;
  719 }
  720     
  721 /*
  722 **  Read-Write Locks
  723 */
  724 
  725 int pth_rwlock_init(pth_rwlock_t *rwlock, int pshared)
  726 {
  727     rwlock->rw_state = PTH_RWLOCK_INITIALIZED;
  728     rwlock->rw_readers = 0;
  729 
  730     if (pshared == TRUE) {
  731     pth_mutexattr_t a;
  732     a.type = 0;
  733     a.priority = 0;
  734     a.protocol = 0;
  735     a.pshared = TRUE;
  736     a.robustness = 0;
  737     if ((rwlock->rw_mutex_rd = pth_alloc_shared_mutex()) == NULL)
  738         return ENOMEM;
  739     pth_mutex_init(rwlock->rw_mutex_rd, &a);
  740     if ((rwlock->rw_mutex_rw = pth_alloc_shared_mutex()) == NULL) {
  741         pth_mutex_destroy(rwlock->rw_mutex_rd);
  742         return ENOMEM;
  743     }
  744     pth_mutex_init(rwlock->rw_mutex_rw, &a);
  745     rwlock->rw_pshared = TRUE;
  746     } else {
  747     /* we allocate two pth_mutex_t at once to save a page */
  748     if ((rwlock->rw_mutex_rd = (pth_mutex_t *)pth_malloc(sizeof(pth_mutex_t)<<1)) == NULL)
  749         return ENOMEM;
  750     rwlock->rw_mutex_rw = &rwlock->rw_mutex_rd[1];
  751     pth_mutex_init(rwlock->rw_mutex_rd, NULL);
  752     pth_mutex_init(rwlock->rw_mutex_rw, NULL);
  753     rwlock->rw_pshared = FALSE;
  754     }
  755     return 0;
  756 }
  757 
  758 int pth_rwlock_destroy(pth_rwlock_t *rwlock)
  759 {
  760 
  761     if ((rwlock->rw_mutex_rd->mx_state & PTH_MUTEX_LOCKED) ||
  762     (rwlock->rw_mutex_rw->mx_state & PTH_MUTEX_LOCKED))
  763     return EBUSY;
  764 
  765     if (rwlock->rw_pshared == FALSE){
  766     pth_free_mem(rwlock->rw_mutex_rd, sizeof(pth_rwlock_t)<<1);
  767     } else {
  768     pth_mutex_destroy(rwlock->rw_mutex_rd);
  769     pth_mutex_destroy(rwlock->rw_mutex_rw);
  770     }
  771     return 0;
  772 }
  773 
/*
 * Acquire a readers-writer lock.
 *
 * op        - PTH_RWLOCK_RW for a writer lock, otherwise a reader lock
 * tryonly   - non-blocking attempt when non-zero
 * ev_extra  - optional extra event passed through to pth_mutex_acquire
 *
 * Returns 0 on success or the error from the underlying mutex acquire.
 * Classic two-mutex scheme: the first reader takes rw_mutex_rw on behalf
 * of all readers; rw_mutex_rd only guards the rw_readers counter.
 */
int pth_rwlock_acquire(pth_rwlock_t *rwlock, int op, int tryonly, pth_event_t ev_extra)
{
    int rc;
    if (!(rwlock->rw_state & PTH_RWLOCK_INITIALIZED))
        return EINVAL;

    /* acquire lock */
    if (op == PTH_RWLOCK_RW) {
        /* read-write lock is simple */
        if ((rc = pth_mutex_acquire(rwlock->rw_mutex_rw, tryonly, ev_extra)))
            return rc;
        rwlock->rw_mode = PTH_RWLOCK_RW;
    }
    else {
        /* read-only lock is more complicated to get right */
        if ((rc = pth_mutex_acquire(rwlock->rw_mutex_rd, tryonly, ev_extra)))
            return rc;
        rwlock->rw_readers++;
        if (rwlock->rw_readers == 1) {
            /* first reader locks out writers for the whole reader group */
            if ((rc = pth_mutex_acquire(rwlock->rw_mutex_rw, tryonly, ev_extra))) {
                /* roll back the reader count before failing */
                rwlock->rw_readers--;
                pth_mutex_release(rwlock->rw_mutex_rd);
                return rc;
            }
        }
        rwlock->rw_mode = PTH_RWLOCK_RD;
        pth_mutex_release(rwlock->rw_mutex_rd);
    }
    return 0;
}
  804 
/*
 * Release a readers-writer lock previously taken by pth_rwlock_acquire.
 * Returns 0 on success or the error from the underlying mutex calls.
 * Mirrors the acquire protocol: the last reader releases rw_mutex_rw on
 * behalf of the whole reader group.
 */
int pth_rwlock_release(pth_rwlock_t *rwlock)
{
    int rc;

    if (!(rwlock->rw_state & PTH_RWLOCK_INITIALIZED))
        return EINVAL;

    /* release lock */
    if (rwlock->rw_mode == PTH_RWLOCK_RW) {
        /* read-write unlock is simple */
        if ((rc = pth_mutex_release(rwlock->rw_mutex_rw)))
            return rc;
    }
    else {
        /* read-only unlock is more complicated to get right */
        if ((rc = pth_mutex_acquire(rwlock->rw_mutex_rd, FALSE, NULL)))
            return rc;
        rwlock->rw_readers--;
        if (rwlock->rw_readers == 0) {
            /* last reader lets writers back in */
            if ((rc = pth_mutex_release(rwlock->rw_mutex_rw))) {
                /* roll back the reader count before failing */
                rwlock->rw_readers++;
                pth_mutex_release(rwlock->rw_mutex_rd);
                return rc;
            }
        }
        rwlock->rw_mode = PTH_RWLOCK_RD;
        pth_mutex_release(rwlock->rw_mutex_rd);
    }
    return 0;
}
  835 
  836 /*
  837 **  Condition Variables
  838 */
  839 
  840 #define PTH_COND_BOUNDED _BIT(4)
  841 
  842 int _pth_cond_init(pth_cond_t *cond, int pshared)
  843 {
  844     rfutex_init(&cond->cn_shared, cond, pshared);
  845     if (pshared != TRUE)
  846     cond->cn_index = -1;
  847 
  848     cond->cn_state   = PTH_COND_INITIALIZED;
  849     cond->cn_waiters = 0;
  850     cond->cn_wakecnt = 0;
  851     pth_lock_init(cond->cn_lock);
  852     cond->cn_waitlist.th_next = &cond->cn_waitlist;
  853     cond->cn_waitlist.th_prev = &cond->cn_waitlist;
  854     return TRUE;
  855 }
  856 
  857 int pth_cond_destroy(pth_cond_t *cond)
  858 {
  859     /*
  860      * Sanity check...
  861      * This only valid for shared cond variables.
  862      */
  863     if (cond->cn_shared.pshared != TRUE)
  864        return EINVAL;
  865 
  866     /* Clean up the area in shared memory for this cond var... */
  867     pth_shared_area->o[cond->cn_index].used = FALSE;
  868     cond->cn_state     = 0;
  869     cond->cn_index     = 0;
  870     cond->cn_waiters   = 0;
  871     cond->cn_wakecnt = 0;
  872     pth_lock_init(cond->cn_lock);
  873 
  874     /* Clean up the futex as well... */
  875     rfutex_destroy(&cond->cn_shared);
  876 
  877     return 0;
  878 }
  879 
  880 static void pth_cond_cleanup_handler(void *_cleanvec) __attribute__ ((unused));
  881 static void pth_cond_cleanup_handler(void *_cleanvec)
  882 {
  883     pth_mutex_t *mutex = (pth_mutex_t *)(((void **)_cleanvec)[0]);
  884     pth_cond_t  *cond  = (pth_cond_t  *)(((void **)_cleanvec)[1]);
  885     pth_t current = pth_get_current();
  886 
  887     /* fix number of waiters */
  888     pth_acquire_lock(&(cond->cn_lock));
  889     cond->cn_waiters--;
  890     if (pth_number_of_natives > 1 && !cond->cn_shared.pshared)
  891     PTH_ELEMENT_DELETE(&current->mutex_cond_wait);
  892     if (cond->cn_waiters == 0 || (cond->cn_state & PTH_COND_HANDLED)) {
  893     /* clean signal */
  894     if (cond->cn_state & PTH_COND_SIGNALED) {
  895         cond->cn_state &= ~(PTH_COND_SIGNALED);
  896         cond->cn_state &= ~(PTH_COND_BROADCAST);
  897         cond->cn_state &= ~(PTH_COND_HANDLED);
  898     }
  899     }
  900     pth_release_lock(&(cond->cn_lock));
  901 
  902     /* re-acquire mutex when pth_cond_await() is cancelled
  903        in order to restore the condition variable semantics */
  904     pth_mutex_acquire(mutex, FALSE, NULL);
  905 
  906     return;
  907 }
  908 
/*
 * pth_cond_await - block on a condition variable.
 *
 * The caller must hold `mutex` (enforced below via mx_owner).  The mutex
 * is released while waiting and re-acquired before returning.  An
 * optional `ev_extra` event (e.g. a timeout) terminates the wait early;
 * its ev_occurred flag is set when it fired.
 *
 * Returns TRUE on success, FALSE on a consistency error (uninitialized
 * cond var, mutex not owned, or futex registration failure).
 */
int pth_cond_await(pth_cond_t *cond, pth_mutex_t *mutex, pth_event_t ev_extra)
{
    static pth_key_t ev_key = PTH_KEY_INIT;
    pth_event_t ev;
    pth_descr_t descr = pth_get_native_descr();
    pid_t tid = descr->tid;
    pth_t current= descr->current;
    int futx_fd = 0;

    /* consistency checks: cond var must be initialized and the caller
       must actually own the mutex it claims to hold */
    if ((!(cond->cn_state & PTH_COND_INITIALIZED)) || (mutex->mx_owner != current))
        return FALSE;

    _pth_acquire(&(cond->cn_lock), tid);

    /* add us to the number of waiters */
    cond->cn_waiters++;
    if (cond->cn_shared.pshared == TRUE) {
        /* process-shared: register with the futex and get back an fd
           that becomes readable on wakeup */
        futx_fd = futex_add_waiter(&cond->cn_shared.futex);
        if (futx_fd < 0) {
            _pth_release(&(cond->cn_lock), tid);
            return FALSE;
        }
    } else if (pth_number_of_natives > 1)
        /* private cond var on multiple natives: link us onto the cond
           var's wait list so pth_cond_notify() can find us */
        PTH_ELEMENT_INSERT(&current->mutex_cond_wait, &cond->cn_waitlist);

    if (descr->is_bounded) {
        /* bounded (system-scope) thread: wait directly in select() on the
           futex fd and/or this native's sigpipe instead of the scheduler */
        fd_set rfds;
        char minibuf[64];
        int rc, fdmax;
        pth_time_t delay;
        struct timeval delay_timeval;
        struct timeval *pdelay = NULL;
        pth_time_t now;

        FD_ZERO(&rfds);
        if (futx_fd > 0) {
            FD_SET(futx_fd, &rfds);
            fdmax = futx_fd;
            FD_SET(descr->sigpipe[0], &rfds);
            if (fdmax < descr->sigpipe[0])
                fdmax = descr->sigpipe[0];
        } else {
            FD_SET(descr->sigpipe[0], &rfds);
            fdmax = descr->sigpipe[0];
        }
        if (ev_extra != NULL) {
            /* convert the event's absolute time into a relative delay for
               select(); NOTE(review): assumes ev_extra is a TIME event —
               confirm callers never pass another event type here */
            pth_time_set_now(&now);
            delay = ev_extra->ev_args.TIME.tv;
            pth_time_sub(&delay, &now);
            delay_timeval = pth_time_to_timeval(&delay);
            pdelay = &delay_timeval;
        }

        descr->is_bound = 0;
        current->state = PTH_STATE_WAITING;
        _pth_release(&(cond->cn_lock), tid);
        pth_mutex_release(mutex);

        /* restart select() on signal interruption */
        while ((rc = pth_sc(select)(fdmax+1, &rfds, NULL, NULL, pdelay)) < 0
            && errno == EINTR) ;

        /* rc == 0 means the timeout expired */
        if (rc == 0 && ev_extra != NULL)
            ev_extra->ev_occurred = TRUE;
        if (rc > 0 && FD_ISSET(descr->sigpipe[0], &rfds))
            FD_CLR(descr->sigpipe[0], &rfds);
        /* drain the sigpipe so stale wakeup bytes don't accumulate */
        while (pth_sc(read)(descr->sigpipe[0], minibuf, sizeof(minibuf)) > 0) ;
        if (futx_fd > 0)
            close(futx_fd);

        _pth_acquire(&(cond->cn_lock), tid);
        descr->is_bound = 1;
        current->state = PTH_STATE_READY;
    } else {
        _pth_release(&(cond->cn_lock), tid);

        /* unbounded thread: wait until the condition is signaled via the
           scheduler's event mechanism */
        if (futx_fd > 0)
            ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_READABLE|PTH_MODE_STATIC, &ev_key, futx_fd);
        else
            ev = pth_event(PTH_EVENT_COND|PTH_MODE_STATIC, &ev_key, cond);
        if (ev_extra != NULL)
            pth_event_concat(ev, ev_extra, NULL);

        /* release mutex (caller had to acquire it first) */
        pth_mutex_release(mutex);

        wait:
        /* now wait... */
        pth_wait(ev);
        /* the native running us may have changed across the wait */
        tid = current_tid();
        if (ev_extra != NULL)
            pth_event_isolate(ev);
        if (futx_fd > 0)
            close(futx_fd);

        _pth_acquire(&(cond->cn_lock), tid);

        /* spurious wakeup of a private cond var (event fired but no
           signal pending): go back to waiting */
        if ((!(cond->cn_state & PTH_COND_SIGNALED)) &&
            PTH_EVENT_OCCURRED(ev) && !cond->cn_shared.pshared) {
            _pth_release(&(cond->cn_lock), tid);
            goto wait;
        }
    }

    if (pth_number_of_natives > 1 && !cond->cn_shared.pshared)
        PTH_ELEMENT_DELETE(&current->mutex_cond_wait);

    /* remove us from the number of waiters */
    cond->cn_waiters--;

    if (cond->cn_waiters == 0 || (cond->cn_state & PTH_COND_HANDLED)) {
        /* clean signal: last waiter (or the handled one) clears the
           broadcast/handled flags; SIGNALED is cleared only when the
           wakeup count drains to zero */
        if (cond->cn_state & PTH_COND_SIGNALED) {
            cond->cn_state &= ~(PTH_COND_BROADCAST);
            cond->cn_state &= ~(PTH_COND_HANDLED);
            if (--cond->cn_wakecnt == 0)
                cond->cn_state &= ~(PTH_COND_SIGNALED);
        }
    }

    _pth_release(&(cond->cn_lock), tid);

    /* reacquire mutex before returning, per cond-var semantics */
    pth_mutex_acquire(mutex, FALSE, NULL);

    /* honor a pending cancellation request */
    if (current->cancelreq)
        pth_cancel_point(FALSE);
    return TRUE;
}
 1039 
 1040 static int pth_check_waiters(void *arg)
 1041 {
 1042     pth_cond_t *cond = (pth_cond_t *)arg;
 1043 
 1044     if (!cond->cn_waiters)
 1045     return TRUE;
 1046     else
 1047     return FALSE;
 1048 }
 1049 
 1050 int pth_cond_notify(pth_cond_t *cond, int broadcast)
 1051 {
 1052     pth_event_t ev;
 1053     static pth_key_t ev_key = PTH_KEY_INIT; 
 1054     pth_descr_t descr = pth_get_native_descr();
 1055     char c = (int)1;
 1056     pth_descr_t ds = NULL;
 1057     
 1058     /* consistency checks */
 1059     if (!(cond->cn_state & PTH_COND_INITIALIZED))
 1060         return FALSE;
 1061 
 1062     _pth_acquire(&(cond->cn_lock), descr->tid);
 1063     /* do something only if there is at least one waiters (POSIX semantics) */
 1064     if (cond->cn_waiters > 0) {
 1065     /* signal the condition */
 1066     cond->cn_state |= PTH_COND_SIGNALED;
 1067     if (broadcast)
 1068         cond->cn_state |= PTH_COND_BROADCAST;
 1069     else
 1070         cond->cn_state &= ~(PTH_COND_BROADCAST);
 1071     cond->cn_state &= ~(PTH_COND_HANDLED);
 1072 
 1073     if (pth_number_of_natives > 1 && !cond->cn_shared.pshared) {
 1074         pth_t tmp;
 1075         pth_list_t *cq;
 1076         cq = cond->cn_waitlist.th_prev;
 1077         while (cq != &cond->cn_waitlist) {
 1078         tmp = (pth_t)((char *)cq - (int)&((pth_t)0)->mutex_cond_wait);
 1079         if ((ds = tmp->boundnative)) {
 1080             if (!broadcast) {
 1081             cond->cn_wakecnt++;
 1082             _pth_release(&(cond->cn_lock), descr->tid);
 1083             pth_sc(write)(ds->sigpipe[1], &c, sizeof(char));
 1084             return TRUE;
 1085             }
 1086             pth_sc(write)(ds->sigpipe[1], &c, sizeof(char));
 1087             ds = NULL;
 1088         } else {
 1089             ds = tmp->waited_native;
 1090             if (ds != NULL) {
 1091             /* move waiting thread from wait-queue to ready-queue */
 1092             spin_lock (&ds->wait_queue.lock, descr, NULL);
 1093             if (tmp->waited_native) { /* to avoid race with eventmgr */
 1094                 if (!broadcast) cond->cn_state |= PTH_COND_HANDLED;
 1095                 __pqueue_delete_node(&tmp->node);
 1096                 tmp->waited_native = NULL;
 1097                 spin_unlock(&ds->wait_queue.lock, descr);
 1098 
 1099                 spin_lock(&pth_RQ.lock, descr, NULL);
 1100                 tmp->state = PTH_STATE_READY;
 1101                 __thread_eprio_recalculate (tmp);
 1102                 __pqueue_append_node(&pth_RQ, &tmp->node);
 1103                 spin_unlock(&pth_RQ.lock, descr);
 1104             } else
 1105                 spin_unlock(&ds->wait_queue.lock, descr);
 1106                     }
 1107             if (!broadcast) break;
 1108         }
 1109         cq = cq->th_prev;
 1110         }
 1111     }
 1112     cond->cn_wakecnt++;
 1113     _pth_release(&(cond->cn_lock), descr->tid);
 1114     if (cond->cn_shared.pshared == TRUE) {
 1115         /* wakeup the waiters */
 1116         if (broadcast) {
 1117                 futex_notify_all(&cond->cn_shared.futex);
 1118         } else {
 1119                 futex_notify(&cond->cn_shared.futex);
 1120         }
 1121         if (broadcast && (cond->cn_waiters > 0)) {
 1122         pth_wakeup_anative();
 1123         while (cond->cn_waiters > 0)
 1124             pth_yield(NULL);
 1125         }
 1126     } else {
 1127         /* and give other threads a chance to awake */
 1128         if (!broadcast) {
 1129         if (descr->is_bounded)
 1130             pth_wakeup_anative();
 1131         else if (ds != NULL && !ds->is_bound)
 1132             pth_sc(write)(ds->sigpipe[1], &c, sizeof(char));
 1133         } else if (cond->cn_waiters > 0) {
 1134             if (!descr->current->boundnative) {
 1135             /* wait until all waiters are awake... */
 1136             ev = pth_event(PTH_EVENT_FUNC|PTH_MODE_STATIC, &ev_key, pth_check_waiters, 
 1137                    (void *)cond, pth_time_zero);
 1138             pth_wait(ev);
 1139         } else {
 1140             pth_wakeup_anative();
 1141             while (cond->cn_waiters > 0)
 1142             pth_yield(NULL);
 1143         }
 1144         }
 1145     }
 1146     } else {
 1147         _pth_release(&(cond->cn_lock), descr->tid);
 1148     }
 1149 
 1150     /* return to caller */
 1151     return TRUE;
 1152 }
 1153 
 1154 /*
 1155 **  Barriers
 1156 */
 1157 
 1158 int pth_barrier_init(pth_barrier_t *barrier, int threshold)
 1159 {
 1160     if (barrier == NULL || threshold <= 0)
 1161         return FALSE;
 1162     if (!pth_mutex_init(&(barrier->br_mutex), NULL))
 1163         return FALSE;
 1164     if (!pth_cond_init(&(barrier->br_cond)))
 1165         return FALSE;
 1166     barrier->br_state     = PTH_BARRIER_INITIALIZED;
 1167     barrier->br_threshold = threshold;
 1168     barrier->br_count     = threshold;
 1169     barrier->br_cycle     = FALSE;
 1170     return TRUE;
 1171 }
 1172 
/*
 * pth_barrier_reach - block until br_threshold threads have arrived.
 *
 * Returns PTH_BARRIER_HEADLIGHT to the first arriver of a cycle,
 * PTH_BARRIER_TAILLIGHT to the last (which releases everyone), TRUE to
 * the others, and FALSE on error.
 */
int pth_barrier_reach(pth_barrier_t *barrier)
{
    int cancel, cycle;
    int rv;

    if (barrier == NULL)
        return FALSE;
    if (!(barrier->br_state & PTH_BARRIER_INITIALIZED))
        return FALSE;

    /* NOTE(review): pth_mutex_acquire() is treated as boolean here
       (FALSE on failure) but as a 0/errno return in the rwlock code
       above — confirm which convention this library's mutex API uses */
    if (!pth_mutex_acquire(&(barrier->br_mutex), FALSE, NULL))
        return FALSE;
    /* remember which cycle we entered on, so broadcast wakeups from a
       later cycle cannot release us early */
    cycle = barrier->br_cycle;
    if (--(barrier->br_count) == 0) {
        /* last thread reached the barrier: flip the cycle, reset the
           count, and release all waiters */
        barrier->br_cycle   = !(barrier->br_cycle);
        barrier->br_count   = barrier->br_threshold;
        if ((rv = pth_cond_notify(&(barrier->br_cond), TRUE)))
            rv = PTH_BARRIER_TAILLIGHT;
    }
    else {
        /* wait until remaining threads have reached the barrier, too;
           cancellation is disabled so a cancel cannot corrupt the count */
        pth_cancel_state(PTH_CANCEL_DISABLE, &cancel);
        if (barrier->br_threshold == barrier->br_count)
            rv = PTH_BARRIER_HEADLIGHT;
        else
            rv = TRUE;
        /* loop guards against spurious cond-var wakeups */
        while (cycle == barrier->br_cycle) {
            if (!(rv = pth_cond_await(&(barrier->br_cond), &(barrier->br_mutex), NULL)))
                break;
        }
        pth_cancel_state(cancel, NULL);
    }
    pth_mutex_release(&(barrier->br_mutex));
    return rv;
}
 1209