"Fossies" - the Fresh Open Source Software Archive

Member "musl-1.1.23/src/aio/aio.c" (16 Jul 2019, 10160 Bytes) of package /linux/misc/musl-1.1.23.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "aio.c" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 1.1.20_vs_1.1.21.

#include <aio.h>
#include <pthread.h>
#include <semaphore.h>
#include <limits.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/auxv.h>
#include "syscall.h"
#include "atomic.h"
#include "pthread_impl.h"

/* The following is a threads-based implementation of AIO with minimal
 * dependence on implementation details. Most synchronization is
 * performed with pthread primitives, but atomics and futex operations
 * are used for notification in a couple of places where the pthread
 * primitives would be inefficient or impractical.
 *
 * For each fd with outstanding aio operations, an aio_queue structure
 * is maintained. These are reference-counted and destroyed by the last
 * aio worker thread to exit. Accessing any member of the aio_queue
 * structure requires a lock on the aio_queue. Adding and removing aio
 * queues themselves requires a write lock on the global map object,
 * a 4-level table mapping file descriptor numbers to aio queues. A
 * read lock on the map is used to obtain locks on existing queues by
 * excluding destruction of the queue by a different thread while it is
 * being locked.
 *
 * Each aio queue has a list of active threads/operations. Presently
 * there is a one-to-one relationship between threads and operations.
 * The only members of the aio_thread structure which are accessed by
 * other threads are the linked list pointers, op (which is immutable),
 * running (which is updated atomically), and err (which is
 * synchronized via running), so no locking is necessary. Most of the
 * other members are used for sharing data between the main flow of
 * execution and the cancellation cleanup handler.
 *
 * Taking any aio locks requires having all signals blocked. This is
 * necessary because aio_cancel is needed by close, and close is
 * required to be async-signal safe. All aio worker threads run with
 * all signals blocked permanently.
 */
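
/* Illustration (not part of the original source): the 4-level map
 * described above splits a nonnegative fd into one 7-bit index and
 * three 8-bit indices, so e.g. fd 0x01020304 resolves to
 * map[0x01][0x02][0x03][0x04]. A hypothetical lookup: */
#if 0
struct aio_queue *lookup_sketch(int fd)
{
	int a = fd >> 24;            /* top level: 128 slots cover fd < 2^31 */
	unsigned char b = fd >> 16;  /* second level: 256 slots */
	unsigned char c = fd >> 8;   /* third level: 256 slots */
	unsigned char d = fd;        /* leaf holding the aio_queue pointer */
	return map[a][b][c][d];
}
#endif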

struct aio_thread {
	pthread_t td;
	struct aiocb *cb;
	struct aio_thread *next, *prev;
	struct aio_queue *q;
	volatile int running; /* 1 = running, -1 = running with cancel waiters, 0 = finished */
	int err, op;
	ssize_t ret;
};

struct aio_queue {
	int fd, seekable, append, ref, init;
	pthread_mutex_t lock;
	pthread_cond_t cond;
	struct aio_thread *head;
};

/* Stack-based argument block handed from submit() to the worker
 * thread; the semaphore signals that the worker is done reading it. */
struct aio_args {
	struct aiocb *cb;
	struct aio_queue *q;
	int op;
	sem_t sem;
};

static pthread_rwlock_t maplock = PTHREAD_RWLOCK_INITIALIZER;
/* 4-level fd-to-queue map: 128 x 256 x 256 x 256 slots, allocated lazily */
static struct aio_queue *****map;
static volatile int aio_fd_cnt;
volatile int __aio_fut;

/* Returns the queue for fd with its lock held, creating it if need is
 * set; returns 0 on failure (with errno EBADF for an invalid fd). */
static struct aio_queue *__aio_get_queue(int fd, int need)
{
	if (fd < 0) {
		errno = EBADF;
		return 0;
	}
	int a=fd>>24;
	unsigned char b=fd>>16, c=fd>>8, d=fd;
	struct aio_queue *q = 0;
	pthread_rwlock_rdlock(&maplock);
	if ((!map || !map[a] || !map[a][b] || !map[a][b][c] || !(q=map[a][b][c][d])) && need) {
		pthread_rwlock_unlock(&maplock);
		/* Refuse to allocate for fds that are not actually open. */
		if (fcntl(fd, F_GETFD) < 0) return 0;
		pthread_rwlock_wrlock(&maplock);
		/* Re-test each level after reacquiring the lock; another
		 * thread may have created it while the lock was dropped. */
		if (!map) map = calloc(sizeof *map, (-1U/2+1)>>24);
		if (!map) goto out;
		if (!map[a]) map[a] = calloc(sizeof **map, 256);
		if (!map[a]) goto out;
		if (!map[a][b]) map[a][b] = calloc(sizeof ***map, 256);
		if (!map[a][b]) goto out;
		if (!map[a][b][c]) map[a][b][c] = calloc(sizeof ****map, 256);
		if (!map[a][b][c]) goto out;
		if (!(q = map[a][b][c][d])) {
			map[a][b][c][d] = q = calloc(sizeof *****map, 1);
			if (q) {
				q->fd = fd;
				pthread_mutex_init(&q->lock, 0);
				pthread_cond_init(&q->cond, 0);
				a_inc(&aio_fd_cnt);
			}
		}
	}
	if (q) pthread_mutex_lock(&q->lock);
out:
	pthread_rwlock_unlock(&maplock);
	return q;
}
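
/* Sketch (not from musl; lookup/insert are hypothetical helpers): the
 * function above uses a read-locked fast path and takes the write lock
 * only when levels are missing. Since an rwlock cannot be upgraded
 * atomically, every level is re-tested after reacquisition, because
 * another thread may have populated it between the two locks: */
#if 0
pthread_rwlock_rdlock(&lock);
v = lookup(table, key);
pthread_rwlock_unlock(&lock);
if (!v) {
	pthread_rwlock_wrlock(&lock);
	if (!(v = lookup(table, key)))  /* re-check: creation may have raced */
		v = insert(table, key);
	pthread_rwlock_unlock(&lock);
}
#endif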

static void __aio_unref_queue(struct aio_queue *q)
{
	if (q->ref > 1) {
		q->ref--;
		pthread_mutex_unlock(&q->lock);
		return;
	}

	/* This is potentially the last reference, but a new reference
	 * may arrive since we cannot free the queue object without first
	 * taking the maplock, which requires releasing the queue lock. */
	pthread_mutex_unlock(&q->lock);
	pthread_rwlock_wrlock(&maplock);
	pthread_mutex_lock(&q->lock);
	if (q->ref == 1) {
		int fd=q->fd;
		int a=fd>>24;
		unsigned char b=fd>>16, c=fd>>8, d=fd;
		map[a][b][c][d] = 0;
		a_dec(&aio_fd_cnt);
		pthread_rwlock_unlock(&maplock);
		pthread_mutex_unlock(&q->lock);
		free(q);
	} else {
		q->ref--;
		pthread_rwlock_unlock(&maplock);
		pthread_mutex_unlock(&q->lock);
	}
}

static void cleanup(void *ctx)
{
	struct aio_thread *at = ctx;
	struct aio_queue *q = at->q;
	struct aiocb *cb = at->cb;
	struct sigevent sev = cb->aio_sigevent;

	/* There are four potential types of waiters we could need to wake:
	 *   1. Callers of aio_cancel/close.
	 *   2. Callers of aio_suspend with a single aiocb.
	 *   3. Callers of aio_suspend with a list.
	 *   4. AIO worker threads waiting for sequenced operations.
	 * Types 1-3 are notified via atomics/futexes, mainly for AS-safety
	 * considerations. Type 4 is notified later via a cond var. */

	cb->__ret = at->ret;
	if (a_swap(&at->running, 0) < 0)
		__wake(&at->running, -1, 1);   /* type 1 */
	if (a_swap(&cb->__err, at->err) != EINPROGRESS)
		__wake(&cb->__err, -1, 1);     /* type 2 */
	if (a_swap(&__aio_fut, 0))
		__wake(&__aio_fut, -1, 1);     /* type 3 */

	pthread_mutex_lock(&q->lock);

	/* Unlink this operation from the queue's active list. */
	if (at->next) at->next->prev = at->prev;
	if (at->prev) at->prev->next = at->next;
	else q->head = at->next;

	/* Signal aio worker threads waiting for sequenced operations. */
	pthread_cond_broadcast(&q->cond);

	__aio_unref_queue(q);

	if (sev.sigev_notify == SIGEV_SIGNAL) {
		siginfo_t si = {
			.si_signo = sev.sigev_signo,
			.si_value = sev.sigev_value,
			.si_code = SI_ASYNCIO,
			.si_pid = getpid(),
			.si_uid = getuid()
		};
		__syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si);
	}
	if (sev.sigev_notify == SIGEV_THREAD) {
		a_store(&__pthread_self()->cancel, 0);
		sev.sigev_notify_function(sev.sigev_value);
	}
}
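
/* Sketch (not from musl): a "type 2" waiter from the list above blocks
 * on cb->__err with a futex wait and is released by the
 * __wake(&cb->__err, ...) call once the stored error leaves
 * EINPROGRESS; roughly: */
#if 0
while (cb->__err == EINPROGRESS)
	__wait(&cb->__err, 0, EINPROGRESS, 1);
#endif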

static void *io_thread_func(void *ctx)
{
	struct aio_thread at, *p;

	struct aio_args *args = ctx;
	struct aiocb *cb = args->cb;
	int fd = cb->aio_fildes;
	int op = args->op;
	void *buf = (void *)cb->aio_buf;
	size_t len = cb->aio_nbytes;
	off_t off = cb->aio_offset;

	struct aio_queue *q = args->q;
	ssize_t ret;

	pthread_mutex_lock(&q->lock);
	sem_post(&args->sem);

	at.op = op;
	at.running = 1;
	at.ret = -1;
	at.err = ECANCELED; /* reported if cancelled before the op runs */
	at.q = q;
	at.td = __pthread_self();
	at.cb = cb;
	at.prev = 0;
	if ((at.next = q->head)) at.next->prev = &at;
	q->head = &at;

	if (!q->init) {
		int seekable = lseek(fd, 0, SEEK_CUR) >= 0;
		q->seekable = seekable;
		q->append = !seekable || (fcntl(fd, F_GETFL) & O_APPEND);
		q->init = 1;
	}

	pthread_cleanup_push(cleanup, &at);

	/* Wait for sequenced operations: append writes and fsync/fdatasync
	 * must not start until all previously queued writes have finished.
	 * Reads never wait, nor do plain pwrite operations on a seekable,
	 * non-append fd. */
	if (op!=LIO_READ && (op!=LIO_WRITE || q->append)) {
		for (;;) {
			for (p=at.next; p && p->op!=LIO_WRITE; p=p->next);
			if (!p) break;
			pthread_cond_wait(&q->cond, &q->lock);
		}
	}

	pthread_mutex_unlock(&q->lock);

	switch (op) {
	case LIO_WRITE:
		ret = q->append ? write(fd, buf, len) : pwrite(fd, buf, len, off);
		break;
	case LIO_READ:
		ret = !q->seekable ? read(fd, buf, len) : pread(fd, buf, len, off);
		break;
	case O_SYNC:
		ret = fsync(fd);
		break;
	case O_DSYNC:
		ret = fdatasync(fd);
		break;
	}
	at.ret = ret;
	at.err = ret<0 ? errno : 0;

	pthread_cleanup_pop(1);

	return 0;
}
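
/* Illustration (not part of musl; names are hypothetical): under the
 * sequencing rule above, two writes submitted to an append-mode fd are
 * serialized in submission order, while reads proceed concurrently: */
#if 0
static void example(int append_fd, char *a, char *b)
{
	struct aiocb w1 = { .aio_fildes = append_fd, .aio_buf = a, .aio_nbytes = 1 };
	struct aiocb w2 = { .aio_fildes = append_fd, .aio_buf = b, .aio_nbytes = 1 };
	aio_write(&w1); /* its worker issues write() immediately */
	aio_write(&w2); /* its worker waits on q->cond until w1's worker exits */
}
#endif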

static size_t io_thread_stack_size = MINSIGSTKSZ+2048;
static pthread_once_t init_stack_size_once;

static void init_stack_size()
{
	unsigned long val = __getauxval(AT_MINSIGSTKSZ);
	if (val > MINSIGSTKSZ) io_thread_stack_size = val + 512;
}

static int submit(struct aiocb *cb, int op)
{
	int ret = 0;
	pthread_attr_t a;
	sigset_t allmask, origmask;
	pthread_t td;
	struct aio_queue *q = __aio_get_queue(cb->aio_fildes, 1);
	struct aio_args args = { .cb = cb, .op = op, .q = q };
	sem_init(&args.sem, 0, 0);

	if (!q) {
		if (errno != EBADF) errno = EAGAIN;
		cb->__ret = -1;
		cb->__err = errno;
		return -1;
	}
	q->ref++;
	pthread_mutex_unlock(&q->lock);

	if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		if (cb->aio_sigevent.sigev_notify_attributes)
			a = *cb->aio_sigevent.sigev_notify_attributes;
		else
			pthread_attr_init(&a);
	} else {
		pthread_once(&init_stack_size_once, init_stack_size);
		pthread_attr_init(&a);
		pthread_attr_setstacksize(&a, io_thread_stack_size);
		pthread_attr_setguardsize(&a, 0);
	}
	pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);
	sigfillset(&allmask);
	pthread_sigmask(SIG_BLOCK, &allmask, &origmask);
	cb->__err = EINPROGRESS;
	if (pthread_create(&td, &a, io_thread_func, &args)) {
		pthread_mutex_lock(&q->lock);
		__aio_unref_queue(q);
		cb->__err = errno = EAGAIN;
		cb->__ret = ret = -1;
	}
	pthread_sigmask(SIG_SETMASK, &origmask, 0);

	if (!ret) {
		while (sem_wait(&args.sem));
	}

	return ret;
}
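
/* Note (not part of the original source): the sem_wait loop above keeps
 * submit() from returning while io_thread_func() may still be reading
 * args, which lives on submit()'s stack. The worker posts the semaphore
 * only after copying everything out of args and taking q->lock, and it
 * links the operation onto the queue before releasing that lock, so the
 * operation is visible to anyone who subsequently takes q->lock. */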

int aio_read(struct aiocb *cb)
{
	return submit(cb, LIO_READ);
}

int aio_write(struct aiocb *cb)
{
	return submit(cb, LIO_WRITE);
}

int aio_fsync(int op, struct aiocb *cb)
{
	if (op != O_SYNC && op != O_DSYNC) {
		errno = EINVAL;
		return -1;
	}
	return submit(cb, op);
}
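
/* Usage sketch (not from musl; the helper name is hypothetical): a
 * typical caller submits a read, polls aio_error() until the operation
 * leaves EINPROGRESS, then collects the result with aio_return(): */
#if 0
#include <aio.h>
#include <errno.h>
#include <string.h>

static ssize_t read_async_blocking(int fd, void *buf, size_t n, off_t off)
{
	struct aiocb cb;
	int err;
	memset(&cb, 0, sizeof cb);
	cb.aio_fildes = fd;
	cb.aio_buf = buf;
	cb.aio_nbytes = n;
	cb.aio_offset = off;
	if (aio_read(&cb) < 0) return -1;      /* submission failed */
	while (aio_error(&cb) == EINPROGRESS); /* busy-poll; real code would use aio_suspend */
	if ((err = aio_error(&cb))) {
		errno = err;
		return -1;
	}
	return aio_return(&cb);                /* number of bytes read */
}
#endif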

ssize_t aio_return(struct aiocb *cb)
{
	return cb->__ret;
}

int aio_error(const struct aiocb *cb)
{
	a_barrier();
	return cb->__err & 0x7fffffff;
}

int aio_cancel(int fd, struct aiocb *cb)
{
	sigset_t allmask, origmask;
	int ret = AIO_ALLDONE;
	struct aio_thread *p;
	struct aio_queue *q;

	/* Unspecified behavior case. Report an error. */
	if (cb && fd != cb->aio_fildes) {
		errno = EINVAL;
		return -1;
	}

	sigfillset(&allmask);
	pthread_sigmask(SIG_BLOCK, &allmask, &origmask);

	errno = ENOENT;
	if (!(q = __aio_get_queue(fd, 0))) {
		if (errno == EBADF) ret = -1;
		goto done;
	}

	for (p = q->head; p; p = p->next) {
		if (cb && cb != p->cb) continue;
		/* Transition target from running to running-with-waiters */
		if (a_cas(&p->running, 1, -1)) {
			pthread_cancel(p->td);
			__wait(&p->running, 0, -1, 1);
			if (p->err == ECANCELED) ret = AIO_CANCELED;
		}
	}

	pthread_mutex_unlock(&q->lock);
done:
	pthread_sigmask(SIG_SETMASK, &origmask, 0);
	return ret;
}
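
/* Usage sketch (not from musl): a null aiocb pointer requests
 * cancellation of every outstanding operation on the descriptor, which
 * is how __aio_close() below uses aio_cancel on behalf of close(): */
#if 0
int r = aio_cancel(fd, 0);
/* r is AIO_CANCELED, AIO_ALLDONE, or -1 with errno set; per POSIX,
 * AIO_NOTCANCELED is also possible in general. */
#endif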

/* Called by close() so that closing an fd implicitly cancels any
 * outstanding aio operations on it (see the header comment above). */
int __aio_close(int fd)
{
	a_barrier();
	if (aio_fd_cnt) aio_cancel(fd, 0);
	return fd;
}

/* LFS64 aliases: musl's off_t is always 64-bit, so the *64 variants
 * are the same functions. */
weak_alias(aio_cancel, aio_cancel64);
weak_alias(aio_error, aio_error64);
weak_alias(aio_fsync, aio_fsync64);
weak_alias(aio_read, aio_read64);
weak_alias(aio_write, aio_write64);
weak_alias(aio_return, aio_return64);