"Fossies" - the Fresh Open Source Software Archive

Member "bind-9.17.5/lib/isc/rwlock.c" (4 Sep 2020, 19056 Bytes) of package /linux/misc/dns/bind9/9.17.5/bind-9.17.5.tar.xz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "rwlock.c" see the Fossies "Dox" file reference documentation.

/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

/*! \file */

#include <inttypes.h>
#include <stdbool.h>
#include <stddef.h>

#if defined(sun) && (defined(__sparc) || defined(__sparc__))
#include <synch.h> /* for smt_pause(3c) */
#endif /* if defined(sun) && (defined(__sparc) || defined(__sparc__)) */

#include <isc/atomic.h>
#include <isc/magic.h>
#include <isc/platform.h>
#include <isc/print.h>
#include <isc/rwlock.h>
#include <isc/util.h>

#if USE_PTHREAD_RWLOCK

#include <errno.h>
#include <pthread.h>

isc_result_t
isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
        unsigned int write_quota) {
    UNUSED(read_quota);
    UNUSED(write_quota);
    REQUIRE(pthread_rwlock_init(&rwl->rwlock, NULL) == 0);
    atomic_init(&rwl->downgrade, false);
    return (ISC_R_SUCCESS);
}

isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
    switch (type) {
    case isc_rwlocktype_read:
        REQUIRE(pthread_rwlock_rdlock(&rwl->rwlock) == 0);
        break;
    case isc_rwlocktype_write:
        while (true) {
            REQUIRE(pthread_rwlock_wrlock(&rwl->rwlock) == 0);
            /* Unlock if in middle of downgrade operation */
            if (atomic_load_acquire(&rwl->downgrade)) {
                REQUIRE(pthread_rwlock_unlock(&rwl->rwlock) == 0);
                while (atomic_load_acquire(&rwl->downgrade)) {
                }
                continue;
            }
            break;
        }
        break;
    default:
        INSIST(0);
        ISC_UNREACHABLE();
    }
    return (ISC_R_SUCCESS);
}

isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
    int ret = 0;
    switch (type) {
    case isc_rwlocktype_read:
        ret = pthread_rwlock_tryrdlock(&rwl->rwlock);
        break;
    case isc_rwlocktype_write:
        ret = pthread_rwlock_trywrlock(&rwl->rwlock);
        if ((ret == 0) && atomic_load_acquire(&rwl->downgrade)) {
            isc_rwlock_unlock(rwl, type);
            return (ISC_R_LOCKBUSY);
        }
        break;
    default:
        INSIST(0);
    }

    switch (ret) {
    case 0:
        return (ISC_R_SUCCESS);
    case EBUSY:
        return (ISC_R_LOCKBUSY);
    case EAGAIN:
        return (ISC_R_LOCKBUSY);
    default:
        INSIST(0);
        ISC_UNREACHABLE();
    }
}

isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
    UNUSED(type);
    REQUIRE(pthread_rwlock_unlock(&rwl->rwlock) == 0);
    return (ISC_R_SUCCESS);
}

isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
    UNUSED(rwl);
    return (ISC_R_LOCKBUSY);
}

void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {
    atomic_store_release(&rwl->downgrade, true);
    isc_rwlock_unlock(rwl, isc_rwlocktype_write);
    isc_rwlock_lock(rwl, isc_rwlocktype_read);
    atomic_store_release(&rwl->downgrade, false);
}

void
isc_rwlock_destroy(isc_rwlock_t *rwl) {
    pthread_rwlock_destroy(&rwl->rwlock);
}
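
/*
 * Editorial sketch (not part of the original file): a minimal usage example
 * of the pthread-backed API above.  The function example_downgrade_usage()
 * and the variable shared_counter are hypothetical; read/write quotas are
 * ignored in this build, so both are passed as 0.
 */
#if 0 /* illustration only */
static uint64_t shared_counter;

static void
example_downgrade_usage(void) {
    isc_rwlock_t rwl;

    isc_rwlock_init(&rwl, 0, 0);

    isc_rwlock_lock(&rwl, isc_rwlocktype_write);
    shared_counter++; /* exclusive access */

    /*
     * Downgrade to a read lock.  The write lock is briefly released
     * before the read lock is reacquired; the "downgrade" atomic flag
     * makes any writer that slips in during that window back off (see
     * isc_rwlock_lock above), so no write can complete in between.
     */
    isc_rwlock_downgrade(&rwl);
    (void)shared_counter; /* shared read access */
    isc_rwlock_unlock(&rwl, isc_rwlocktype_read);

    isc_rwlock_destroy(&rwl);
}
#endif /* illustration only */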

#else /* if USE_PTHREAD_RWLOCK */

#define RWLOCK_MAGIC      ISC_MAGIC('R', 'W', 'L', 'k')
#define VALID_RWLOCK(rwl) ISC_MAGIC_VALID(rwl, RWLOCK_MAGIC)

#ifndef RWLOCK_DEFAULT_READ_QUOTA
#define RWLOCK_DEFAULT_READ_QUOTA 4
#endif /* ifndef RWLOCK_DEFAULT_READ_QUOTA */

#ifndef RWLOCK_DEFAULT_WRITE_QUOTA
#define RWLOCK_DEFAULT_WRITE_QUOTA 4
#endif /* ifndef RWLOCK_DEFAULT_WRITE_QUOTA */

#ifndef RWLOCK_MAX_ADAPTIVE_COUNT
#define RWLOCK_MAX_ADAPTIVE_COUNT 100
#endif /* ifndef RWLOCK_MAX_ADAPTIVE_COUNT */

#if defined(_MSC_VER)
#include <intrin.h>
#define isc_rwlock_pause() YieldProcessor()
#elif defined(__x86_64__)
#include <immintrin.h>
#define isc_rwlock_pause() _mm_pause()
#elif defined(__i386__)
#define isc_rwlock_pause() __asm__ __volatile__("rep; nop")
#elif defined(__ia64__)
#define isc_rwlock_pause() __asm__ __volatile__("hint @pause")
#elif defined(__arm__) && HAVE_ARM_YIELD
#define isc_rwlock_pause() __asm__ __volatile__("yield")
#elif defined(sun) && (defined(__sparc) || defined(__sparc__))
#define isc_rwlock_pause() smt_pause()
#elif (defined(__sparc) || defined(__sparc__)) && HAVE_SPARC_PAUSE
#define isc_rwlock_pause() __asm__ __volatile__("pause")
#elif defined(__ppc__) || defined(_ARCH_PPC) || defined(_ARCH_PWR) || \
    defined(_ARCH_PWR2) || defined(_POWER)
#define isc_rwlock_pause() __asm__ volatile("or 27,27,27")
#else /* if defined(_MSC_VER) */
#define isc_rwlock_pause()
#endif /* if defined(_MSC_VER) */

static isc_result_t
isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type);

#ifdef ISC_RWLOCK_TRACE
#include <stdio.h> /* Required for fprintf/stderr. */

#include <isc/thread.h> /* Required for isc_thread_self(). */

static void
print_lock(const char *operation, isc_rwlock_t *rwl, isc_rwlocktype_t type) {
    fprintf(stderr,
            "rwlock %p thread %lu %s(%s): "
            "write_requests=%u, write_completions=%u, "
            "cnt_and_flag=0x%x, readers_waiting=%u, "
            "write_granted=%u, write_quota=%u\n",
            rwl, isc_thread_self(), operation,
            (type == isc_rwlocktype_read ? "read" : "write"),
            atomic_load_acquire(&rwl->write_requests),
            atomic_load_acquire(&rwl->write_completions),
            atomic_load_acquire(&rwl->cnt_and_flag), rwl->readers_waiting,
            atomic_load_acquire(&rwl->write_granted), rwl->write_quota);
}
#endif /* ISC_RWLOCK_TRACE */

isc_result_t
isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
        unsigned int write_quota) {
    REQUIRE(rwl != NULL);

    /*
     * In case there's trouble initializing, we zero magic now.  If all
     * goes well, we'll set it to RWLOCK_MAGIC.
     */
    rwl->magic = 0;

    atomic_init(&rwl->spins, 0);
    atomic_init(&rwl->write_requests, 0);
    atomic_init(&rwl->write_completions, 0);
    atomic_init(&rwl->cnt_and_flag, 0);
    rwl->readers_waiting = 0;
    atomic_init(&rwl->write_granted, 0);
    if (read_quota != 0) {
        UNEXPECTED_ERROR(__FILE__, __LINE__,
                         "read quota is not supported");
    }
    if (write_quota == 0) {
        write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
    }
    rwl->write_quota = write_quota;

    isc_mutex_init(&rwl->lock);

    isc_condition_init(&rwl->readable);
    isc_condition_init(&rwl->writeable);

    rwl->magic = RWLOCK_MAGIC;

    return (ISC_R_SUCCESS);
}

void
isc_rwlock_destroy(isc_rwlock_t *rwl) {
    REQUIRE(VALID_RWLOCK(rwl));

    REQUIRE(atomic_load_acquire(&rwl->write_requests) ==
                atomic_load_acquire(&rwl->write_completions) &&
            atomic_load_acquire(&rwl->cnt_and_flag) == 0 &&
            rwl->readers_waiting == 0);

    rwl->magic = 0;
    (void)isc_condition_destroy(&rwl->readable);
    (void)isc_condition_destroy(&rwl->writeable);
    isc_mutex_destroy(&rwl->lock);
}

/*
 * When some architecture-dependent atomic operations are available,
 * rwlock can be more efficient than the generic algorithm defined below.
 * The basic algorithm is described in the following URL:
 *   http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/rw.html
 *
 * The key is to use the following integer variables modified atomically:
 *   write_requests, write_completions, and cnt_and_flag.
 *
 * write_requests and write_completions act as a waiting queue for writers
 * in order to ensure the FIFO order.  Both variables begin with the initial
 * value of 0.  When a new writer tries to get a write lock, it increments
 * write_requests and gets the previous value of the variable as a "ticket".
 * When write_completions reaches the ticket number, the new writer can start
 * writing.  When the writer completes its work, it increments
 * write_completions so that another new writer can start working.  If
 * write_requests is not equal to write_completions, a writer is currently
 * working or waiting.  In this case, new readers cannot start reading; in
 * other words, this algorithm basically prefers writers.
 *
 * cnt_and_flag is a "lock" shared by all readers and writers.  This integer
 * variable is a kind of structure with two members: writer_flag (1 bit) and
 * reader_count (31 bits).  The writer_flag shows whether a writer is working,
 * and the reader_count shows the number of readers currently working or
 * almost ready to work.  A writer who has the current "ticket" tries to get
 * the lock by exclusively setting the writer_flag to 1, provided that the
 * whole 32-bit value is 0 (meaning no readers or writers are working).  On
 * the other hand, a new reader tries to increment the "reader_count" field,
 * provided that the writer_flag is 0 (meaning there is no writer working).
 *
 * If one of the above operations fails, the reader or the writer sleeps
 * until the related condition changes.  When a working reader or writer
 * completes its work, and some readers or writers are sleeping and the
 * condition that suspended them has changed, it wakes up the sleeping
 * readers or writers.
 *
 * As already noted, this algorithm basically prefers writers.  In order to
 * prevent readers from starving, however, the algorithm also introduces the
 * "writer quota" (Q).  When Q consecutive writers have completed their work
 * while readers are waiting, the last writer will wake up the readers, even
 * if a new writer is waiting.
 *
 * Implementation-specific note: due to the combination of atomic operations
 * and a mutex lock, ordering between the atomic operation and locks can be
 * very sensitive in some cases.  In particular, it is generally very
 * important to check the atomic variable that requires a reader or writer to
 * sleep after locking the mutex and before actually sleeping; otherwise, it
 * could easily cause a deadlock.  For example, assume "var" is a variable
 * atomically modified; then the corresponding code would be:
 *  if (var == need_sleep) {
 *      LOCK(lock);
 *      if (var == need_sleep)
 *          WAIT(cond, lock);
 *      UNLOCK(lock);
 *  }
 * The second check is important, since "var" is protected by the atomic
 * operation, not by the mutex, and can be changed just before sleeping.
 * (The first "if" could be omitted, but it is also important in order to
 * make the code efficient by avoiding the use of the mutex unless it is
 * really necessary.)
 */

#define WRITER_ACTIVE 0x1
#define READER_INCR   0x2
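
/*
 * Editorial illustration (not part of the original file): cnt_and_flag packs
 * the writer flag into bit 0 and the reader count into the remaining bits,
 * which is why readers add and subtract READER_INCR (0x2) rather than 1.
 * Some sample values under this encoding:
 *
 *   0x0 - free: no writer and no readers
 *   0x1 - WRITER_ACTIVE: a writer holds the lock
 *   0x6 - 3 * READER_INCR: three readers hold the lock
 *   0x7 - a writer is active while three readers have already incremented
 *         the count and are waiting for the flag to clear (the transient
 *         state handled by the retry loop in isc__rwlock_lock() below)
 */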

static isc_result_t
isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
    int32_t cntflag;

    REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
    print_lock("prelock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

    if (type == isc_rwlocktype_read) {
        if (atomic_load_acquire(&rwl->write_requests) !=
            atomic_load_acquire(&rwl->write_completions))
        {
            /* there is a waiting or active writer */
            LOCK(&rwl->lock);
            if (atomic_load_acquire(&rwl->write_requests) !=
                atomic_load_acquire(&rwl->write_completions))
            {
                rwl->readers_waiting++;
                WAIT(&rwl->readable, &rwl->lock);
                rwl->readers_waiting--;
            }
            UNLOCK(&rwl->lock);
        }

        cntflag = atomic_fetch_add_release(&rwl->cnt_and_flag,
                                           READER_INCR);
        POST(cntflag);
        while (1) {
            if ((atomic_load_acquire(&rwl->cnt_and_flag) &
                 WRITER_ACTIVE) == 0) {
                break;
            }

            /* A writer is still working */
            LOCK(&rwl->lock);
            rwl->readers_waiting++;
            if ((atomic_load_acquire(&rwl->cnt_and_flag) &
                 WRITER_ACTIVE) != 0) {
                WAIT(&rwl->readable, &rwl->lock);
            }
            rwl->readers_waiting--;
            UNLOCK(&rwl->lock);

            /*
             * Typically, the reader should be able to get a lock
             * at this stage:
             *   (1) there should have been no pending writer when
             *       the reader was trying to increment the
             *       counter; otherwise, the writer should be in
             *       the waiting queue, preventing the reader from
             *       proceeding to this point.
             *   (2) once the reader increments the counter, no
             *       more writers can get a lock.
             * Still, it is possible another writer can work at
             * this point, e.g. in the following scenario:
             *   A previous writer unlocks the writer lock.
             *   This reader proceeds to point (1).
             *   A new writer appears, and gets a new lock before
             *   the reader increments the counter.
             *   The reader then increments the counter.
             *   The previous writer notices there is a waiting
             *   reader who is almost ready, and wakes it up.
             * So, the reader needs to confirm explicitly whether
             * it can now read (thus we loop).  Note that this is
             * not an infinite process, since the reader has
             * incremented the counter at this point.
             */
        }

        /*
         * If we are temporarily preferred over writers due to the
         * writer quota, reset the condition (a race among readers
         * doesn't matter).
         */
        atomic_store_release(&rwl->write_granted, 0);
    } else {
        int32_t prev_writer;

        /* enter the waiting queue, and wait for our turn */
        prev_writer = atomic_fetch_add_release(&rwl->write_requests, 1);
        while (atomic_load_acquire(&rwl->write_completions) !=
               prev_writer) {
            LOCK(&rwl->lock);
            if (atomic_load_acquire(&rwl->write_completions) !=
                prev_writer) {
                WAIT(&rwl->writeable, &rwl->lock);
                UNLOCK(&rwl->lock);
                continue;
            }
            UNLOCK(&rwl->lock);
            break;
        }

        while (!atomic_compare_exchange_weak_acq_rel(
            &rwl->cnt_and_flag, &(int_fast32_t){ 0 },
            WRITER_ACTIVE))
        {
            /* Another active reader or writer is working. */
            LOCK(&rwl->lock);
            if (atomic_load_acquire(&rwl->cnt_and_flag) != 0) {
                WAIT(&rwl->writeable, &rwl->lock);
            }
            UNLOCK(&rwl->lock);
        }

        INSIST((atomic_load_acquire(&rwl->cnt_and_flag) &
                WRITER_ACTIVE));
        atomic_fetch_add_release(&rwl->write_granted, 1);
    }

#ifdef ISC_RWLOCK_TRACE
    print_lock("postlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

    return (ISC_R_SUCCESS);
}

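/*
 * Editorial note (not part of the original file): isc_rwlock_lock() below
 * first spins on isc_rwlock_trylock(), calling isc_rwlock_pause() between
 * attempts, and only falls back to the blocking isc__rwlock_lock() when the
 * spin budget is exhausted.  The budget adapts: it is the larger of
 * RWLOCK_MAX_ADAPTIVE_COUNT and twice the running "spins" estimate plus 10,
 * and after each acquisition the estimate is nudged by (cnt - spins) / 8
 * toward the number of iterations actually used.
 */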
isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
    int32_t cnt = 0;
    int32_t spins = atomic_load_acquire(&rwl->spins) * 2 + 10;
    int32_t max_cnt = ISC_MAX(spins, RWLOCK_MAX_ADAPTIVE_COUNT);
    isc_result_t result = ISC_R_SUCCESS;

    do {
        if (cnt++ >= max_cnt) {
            result = isc__rwlock_lock(rwl, type);
            break;
        }
        isc_rwlock_pause();
    } while (isc_rwlock_trylock(rwl, type) != ISC_R_SUCCESS);

    atomic_fetch_add_release(&rwl->spins, (cnt - spins) / 8);

    return (result);
}

isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
    int32_t cntflag;

    REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
    print_lock("prelock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

    if (type == isc_rwlocktype_read) {
        /* If a writer is waiting or working, we fail. */
        if (atomic_load_acquire(&rwl->write_requests) !=
            atomic_load_acquire(&rwl->write_completions))
        {
            return (ISC_R_LOCKBUSY);
        }

        /* Otherwise, be ready for reading. */
        cntflag = atomic_fetch_add_release(&rwl->cnt_and_flag,
                                           READER_INCR);
        if ((cntflag & WRITER_ACTIVE) != 0) {
            /*
             * A writer is working.  We lose, and cancel the read
             * request.
             */
            cntflag = atomic_fetch_sub_release(&rwl->cnt_and_flag,
                                               READER_INCR);
            /*
             * If no other readers are waiting and we've suspended
             * new writers in this short period, wake them up.
             */
            if (cntflag == READER_INCR &&
                atomic_load_acquire(&rwl->write_completions) !=
                    atomic_load_acquire(&rwl->write_requests))
            {
                LOCK(&rwl->lock);
                BROADCAST(&rwl->writeable);
                UNLOCK(&rwl->lock);
            }

            return (ISC_R_LOCKBUSY);
        }
    } else {
        /* Try locking without entering the waiting queue. */
        int_fast32_t zero = 0;
        if (!atomic_compare_exchange_strong_acq_rel(
                &rwl->cnt_and_flag, &zero, WRITER_ACTIVE))
        {
            return (ISC_R_LOCKBUSY);
        }

        /*
         * XXXJT: jump into the queue, possibly breaking the writer
         * order.
         */
        atomic_fetch_sub_release(&rwl->write_completions, 1);
        atomic_fetch_add_release(&rwl->write_granted, 1);
    }

#ifdef ISC_RWLOCK_TRACE
    print_lock("postlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

    return (ISC_R_SUCCESS);
}

isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
    REQUIRE(VALID_RWLOCK(rwl));

    int_fast32_t reader_incr = READER_INCR;

    /* Try to acquire write access. */
    atomic_compare_exchange_strong_acq_rel(&rwl->cnt_and_flag, &reader_incr,
                                           WRITER_ACTIVE);
    /*
     * There must have been no writer, and there must have
     * been at least one reader.
     */
    INSIST((reader_incr & WRITER_ACTIVE) == 0 &&
           (reader_incr & ~WRITER_ACTIVE) != 0);

    if (reader_incr == READER_INCR) {
        /*
         * We are the only reader and have been upgraded.
         * Now jump into the head of the writer waiting queue.
         */
        atomic_fetch_sub_release(&rwl->write_completions, 1);
    } else {
        return (ISC_R_LOCKBUSY);
    }

    return (ISC_R_SUCCESS);
}

void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {
    int32_t prev_readers;

    REQUIRE(VALID_RWLOCK(rwl));

    /* Become an active reader. */
    prev_readers = atomic_fetch_add_release(&rwl->cnt_and_flag,
                                            READER_INCR);
    /* We must have been a writer. */
    INSIST((prev_readers & WRITER_ACTIVE) != 0);

    /* Complete write */
    atomic_fetch_sub_release(&rwl->cnt_and_flag, WRITER_ACTIVE);
    atomic_fetch_add_release(&rwl->write_completions, 1);

    /* Resume other readers */
    LOCK(&rwl->lock);
    if (rwl->readers_waiting > 0) {
        BROADCAST(&rwl->readable);
    }
    UNLOCK(&rwl->lock);
}

isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
    int32_t prev_cnt;

    REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
    print_lock("preunlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

    if (type == isc_rwlocktype_read) {
        prev_cnt = atomic_fetch_sub_release(&rwl->cnt_and_flag,
                                            READER_INCR);
        /*
         * If we're the last reader and any writers are waiting, wake
         * them up.  We need to wake up all of them to ensure the
         * FIFO order.
         */
        if (prev_cnt == READER_INCR &&
            atomic_load_acquire(&rwl->write_completions) !=
                atomic_load_acquire(&rwl->write_requests))
        {
            LOCK(&rwl->lock);
            BROADCAST(&rwl->writeable);
            UNLOCK(&rwl->lock);
        }
    } else {
        bool wakeup_writers = true;

        /*
         * Reset the flag, and (implicitly) tell other writers
         * we are done.
         */
        atomic_fetch_sub_release(&rwl->cnt_and_flag, WRITER_ACTIVE);
        atomic_fetch_add_release(&rwl->write_completions, 1);

        if ((atomic_load_acquire(&rwl->write_granted) >=
             rwl->write_quota) ||
            (atomic_load_acquire(&rwl->write_requests) ==
             atomic_load_acquire(&rwl->write_completions)) ||
            (atomic_load_acquire(&rwl->cnt_and_flag) & ~WRITER_ACTIVE))
        {
            /*
             * We have passed the write quota, no writer is
             * waiting, or some readers are almost ready, pending
             * possible writers.  Note that the last case can
             * happen even if write_requests != write_completions
             * (which means a new writer is in the queue), so we
             * need to catch the case explicitly.
             */
            LOCK(&rwl->lock);
            if (rwl->readers_waiting > 0) {
                wakeup_writers = false;
                BROADCAST(&rwl->readable);
            }
            UNLOCK(&rwl->lock);
        }

        if ((atomic_load_acquire(&rwl->write_requests) !=
             atomic_load_acquire(&rwl->write_completions)) &&
            wakeup_writers)
        {
            LOCK(&rwl->lock);
            BROADCAST(&rwl->writeable);
            UNLOCK(&rwl->lock);
        }
    }

#ifdef ISC_RWLOCK_TRACE
    print_lock("postunlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

    return (ISC_R_SUCCESS);
}

#endif /* USE_PTHREAD_RWLOCK */