"Fossies" - the Fresh Open Source Software Archive

Member "wrk-4.2.0/src/ae.c" (7 Feb 2021, 15529 Bytes) of package /linux/www/wrk-4.2.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "ae.c" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 4.0.2_vs_4.1.0.

    1 /* A simple event-driven programming library. Originally I wrote this code
    2  * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
    3  * it in form of a library for easy reuse.
    4  *
    5  * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
    6  * All rights reserved.
    7  *
    8  * Redistribution and use in source and binary forms, with or without
    9  * modification, are permitted provided that the following conditions are met:
   10  *
   11  *   * Redistributions of source code must retain the above copyright notice,
   12  *     this list of conditions and the following disclaimer.
   13  *   * Redistributions in binary form must reproduce the above copyright
   14  *     notice, this list of conditions and the following disclaimer in the
   15  *     documentation and/or other materials provided with the distribution.
   16  *   * Neither the name of Redis nor the names of its contributors may be used
   17  *     to endorse or promote products derived from this software without
   18  *     specific prior written permission.
   19  *
   20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
   21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
   24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   30  * POSSIBILITY OF SUCH DAMAGE.
   31  */
   32 
   33 #include <stdio.h>
   34 #include <sys/time.h>
   35 #include <sys/types.h>
   36 #include <unistd.h>
   37 #include <stdlib.h>
   38 #include <poll.h>
   39 #include <string.h>
   40 #include <time.h>
   41 #include <errno.h>
   42 
   43 #include "ae.h"
   44 #include "zmalloc.h"
   45 #include "config.h"
   46 
   47 /* Include the best multiplexing layer supported by this system.
   48  * The following should be ordered by performances, descending. */
   49 #ifdef HAVE_EVPORT
   50 #include "ae_evport.c"
   51 #else
   52     #ifdef HAVE_EPOLL
   53     #include "ae_epoll.c"
   54     #else
   55         #ifdef HAVE_KQUEUE
   56         #include "ae_kqueue.c"
   57         #else
   58         #include "ae_select.c"
   59         #endif
   60     #endif
   61 #endif
   62 
   63 aeEventLoop *aeCreateEventLoop(int setsize) {
   64     aeEventLoop *eventLoop;
   65     int i;
   66 
   67     if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
   68     eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
   69     eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
   70     if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
   71     eventLoop->setsize = setsize;
   72     eventLoop->lastTime = time(NULL);
   73     eventLoop->timeEventHead = NULL;
   74     eventLoop->timeEventNextId = 0;
   75     eventLoop->stop = 0;
   76     eventLoop->maxfd = -1;
   77     eventLoop->beforesleep = NULL;
   78     if (aeApiCreate(eventLoop) == -1) goto err;
   79     /* Events with mask == AE_NONE are not set. So let's initialize the
   80      * vector with it. */
   81     for (i = 0; i < setsize; i++)
   82         eventLoop->events[i].mask = AE_NONE;
   83     return eventLoop;
   84 
   85 err:
   86     if (eventLoop) {
   87         zfree(eventLoop->events);
   88         zfree(eventLoop->fired);
   89         zfree(eventLoop);
   90     }
   91     return NULL;
   92 }
   93 
   94 /* Return the current set size. */
   95 int aeGetSetSize(aeEventLoop *eventLoop) {
   96     return eventLoop->setsize;
   97 }
   98 
   99 /* Resize the maximum set size of the event loop.
  100  * If the requested set size is smaller than the current set size, but
  101  * there is already a file descriptor in use that is >= the requested
  102  * set size minus one, AE_ERR is returned and the operation is not
  103  * performed at all.
  104  *
  105  * Otherwise AE_OK is returned and the operation is successful. */
  106 int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
  107     int i;
  108 
  109     if (setsize == eventLoop->setsize) return AE_OK;
  110     if (eventLoop->maxfd >= setsize) return AE_ERR;
  111     if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;
  112 
  113     eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
  114     eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
  115     eventLoop->setsize = setsize;
  116 
  117     /* Make sure that if we created new slots, they are initialized with
  118      * an AE_NONE mask. */
  119     for (i = eventLoop->maxfd+1; i < setsize; i++)
  120         eventLoop->events[i].mask = AE_NONE;
  121     return AE_OK;
  122 }
  123 
  124 void aeDeleteEventLoop(aeEventLoop *eventLoop) {
  125     aeApiFree(eventLoop);
  126     zfree(eventLoop->events);
  127     zfree(eventLoop->fired);
  128     zfree(eventLoop);
  129 }
  130 
  131 void aeStop(aeEventLoop *eventLoop) {
  132     eventLoop->stop = 1;
  133 }
  134 
  135 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
  136         aeFileProc *proc, void *clientData)
  137 {
  138     if (fd >= eventLoop->setsize) {
  139         errno = ERANGE;
  140         return AE_ERR;
  141     }
  142     aeFileEvent *fe = &eventLoop->events[fd];
  143 
  144     if (aeApiAddEvent(eventLoop, fd, mask) == -1)
  145         return AE_ERR;
  146     fe->mask |= mask;
  147     if (mask & AE_READABLE) fe->rfileProc = proc;
  148     if (mask & AE_WRITABLE) fe->wfileProc = proc;
  149     fe->clientData = clientData;
  150     if (fd > eventLoop->maxfd)
  151         eventLoop->maxfd = fd;
  152     return AE_OK;
  153 }
  154 
  155 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
  156 {
  157     if (fd >= eventLoop->setsize) return;
  158     aeFileEvent *fe = &eventLoop->events[fd];
  159     if (fe->mask == AE_NONE) return;
  160 
  161     aeApiDelEvent(eventLoop, fd, mask);
  162     fe->mask = fe->mask & (~mask);
  163     if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
  164         /* Update the max fd */
  165         int j;
  166 
  167         for (j = eventLoop->maxfd-1; j >= 0; j--)
  168             if (eventLoop->events[j].mask != AE_NONE) break;
  169         eventLoop->maxfd = j;
  170     }
  171 }
  172 
  173 int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
  174     if (fd >= eventLoop->setsize) return 0;
  175     aeFileEvent *fe = &eventLoop->events[fd];
  176 
  177     return fe->mask;
  178 }
  179 
  180 static void aeGetTime(long *seconds, long *milliseconds)
  181 {
  182     struct timeval tv;
  183 
  184     gettimeofday(&tv, NULL);
  185     *seconds = tv.tv_sec;
  186     *milliseconds = tv.tv_usec/1000;
  187 }
  188 
  189 static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
  190     long cur_sec, cur_ms, when_sec, when_ms;
  191 
  192     aeGetTime(&cur_sec, &cur_ms);
  193     when_sec = cur_sec + milliseconds/1000;
  194     when_ms = cur_ms + milliseconds%1000;
  195     if (when_ms >= 1000) {
  196         when_sec ++;
  197         when_ms -= 1000;
  198     }
  199     *sec = when_sec;
  200     *ms = when_ms;
  201 }
  202 
  203 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
  204         aeTimeProc *proc, void *clientData,
  205         aeEventFinalizerProc *finalizerProc)
  206 {
  207     long long id = eventLoop->timeEventNextId++;
  208     aeTimeEvent *te;
  209 
  210     te = zmalloc(sizeof(*te));
  211     if (te == NULL) return AE_ERR;
  212     te->id = id;
  213     aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
  214     te->timeProc = proc;
  215     te->finalizerProc = finalizerProc;
  216     te->clientData = clientData;
  217     te->next = eventLoop->timeEventHead;
  218     eventLoop->timeEventHead = te;
  219     return id;
  220 }
  221 
  222 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
  223 {
  224     aeTimeEvent *te = eventLoop->timeEventHead;
  225     while(te) {
  226         if (te->id == id) {
  227             te->id = AE_DELETED_EVENT_ID;
  228             return AE_OK;
  229         }
  230         te = te->next;
  231     }
  232     return AE_ERR; /* NO event with the specified ID found */
  233 }
  234 
  235 /* Search the first timer to fire.
  236  * This operation is useful to know how many time the select can be
  237  * put in sleep without to delay any event.
  238  * If there are no timers NULL is returned.
  239  *
  240  * Note that's O(N) since time events are unsorted.
  241  * Possible optimizations (not needed by Redis so far, but...):
  242  * 1) Insert the event in order, so that the nearest is just the head.
  243  *    Much better but still insertion or deletion of timers is O(N).
  244  * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
  245  */
  246 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
  247 {
  248     aeTimeEvent *te = eventLoop->timeEventHead;
  249     aeTimeEvent *nearest = NULL;
  250 
  251     while(te) {
  252         if (!nearest || te->when_sec < nearest->when_sec ||
  253                 (te->when_sec == nearest->when_sec &&
  254                  te->when_ms < nearest->when_ms))
  255             nearest = te;
  256         te = te->next;
  257     }
  258     return nearest;
  259 }
  260 
  261 /* Process time events */
  262 static int processTimeEvents(aeEventLoop *eventLoop) {
  263     int processed = 0;
  264     aeTimeEvent *te, *prev;
  265     long long maxId;
  266     time_t now = time(NULL);
  267 
  268     /* If the system clock is moved to the future, and then set back to the
  269      * right value, time events may be delayed in a random way. Often this
  270      * means that scheduled operations will not be performed soon enough.
  271      *
  272      * Here we try to detect system clock skews, and force all the time
  273      * events to be processed ASAP when this happens: the idea is that
  274      * processing events earlier is less dangerous than delaying them
  275      * indefinitely, and practice suggests it is. */
  276     if (now < eventLoop->lastTime) {
  277         te = eventLoop->timeEventHead;
  278         while(te) {
  279             te->when_sec = 0;
  280             te = te->next;
  281         }
  282     }
  283     eventLoop->lastTime = now;
  284 
  285     prev = NULL;
  286     te = eventLoop->timeEventHead;
  287     maxId = eventLoop->timeEventNextId-1;
  288     while(te) {
  289         long now_sec, now_ms;
  290         long long id;
  291 
  292         /* Remove events scheduled for deletion. */
  293         if (te->id == AE_DELETED_EVENT_ID) {
  294             aeTimeEvent *next = te->next;
  295             if (prev == NULL)
  296                 eventLoop->timeEventHead = te->next;
  297             else
  298                 prev->next = te->next;
  299             if (te->finalizerProc)
  300                 te->finalizerProc(eventLoop, te->clientData);
  301             zfree(te);
  302             te = next;
  303             continue;
  304         }
  305 
  306         /* Make sure we don't process time events created by time events in
  307          * this iteration. Note that this check is currently useless: we always
  308          * add new timers on the head, however if we change the implementation
  309          * detail, this check may be useful again: we keep it here for future
  310          * defense. */
  311         if (te->id > maxId) {
  312             te = te->next;
  313             continue;
  314         }
  315         aeGetTime(&now_sec, &now_ms);
  316         if (now_sec > te->when_sec ||
  317             (now_sec == te->when_sec && now_ms >= te->when_ms))
  318         {
  319             int retval;
  320 
  321             id = te->id;
  322             retval = te->timeProc(eventLoop, id, te->clientData);
  323             processed++;
  324             if (retval != AE_NOMORE) {
  325                 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
  326             } else {
  327                 te->id = AE_DELETED_EVENT_ID;
  328             }
  329         }
  330         prev = te;
  331         te = te->next;
  332     }
  333     return processed;
  334 }
  335 
  336 /* Process every pending time event, then every pending file event
  337  * (that may be registered by time event callbacks just processed).
  338  * Without special flags the function sleeps until some file event
  339  * fires, or when the next time event occurs (if any).
  340  *
  341  * If flags is 0, the function does nothing and returns.
  342  * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
  343  * if flags has AE_FILE_EVENTS set, file events are processed.
  344  * if flags has AE_TIME_EVENTS set, time events are processed.
  345  * if flags has AE_DONT_WAIT set the function returns ASAP until all
  346  * the events that's possible to process without to wait are processed.
  347  *
  348  * The function returns the number of events processed. */
  349 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
  350 {
  351     int processed = 0, numevents;
  352 
  353     /* Nothing to do? return ASAP */
  354     if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
  355 
  356     /* Note that we want call select() even if there are no
  357      * file events to process as long as we want to process time
  358      * events, in order to sleep until the next time event is ready
  359      * to fire. */
  360     if (eventLoop->maxfd != -1 ||
  361         ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
  362         int j;
  363         aeTimeEvent *shortest = NULL;
  364         struct timeval tv, *tvp;
  365 
  366         if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
  367             shortest = aeSearchNearestTimer(eventLoop);
  368         if (shortest) {
  369             long now_sec, now_ms;
  370 
  371             aeGetTime(&now_sec, &now_ms);
  372             tvp = &tv;
  373 
  374             /* How many milliseconds we need to wait for the next
  375              * time event to fire? */
  376             long long ms =
  377                 (shortest->when_sec - now_sec)*1000 +
  378                 shortest->when_ms - now_ms;
  379 
  380             if (ms > 0) {
  381                 tvp->tv_sec = ms/1000;
  382                 tvp->tv_usec = (ms % 1000)*1000;
  383             } else {
  384                 tvp->tv_sec = 0;
  385                 tvp->tv_usec = 0;
  386             }
  387         } else {
  388             /* If we have to check for events but need to return
  389              * ASAP because of AE_DONT_WAIT we need to set the timeout
  390              * to zero */
  391             if (flags & AE_DONT_WAIT) {
  392                 tv.tv_sec = tv.tv_usec = 0;
  393                 tvp = &tv;
  394             } else {
  395                 /* Otherwise we can block */
  396                 tvp = NULL; /* wait forever */
  397             }
  398         }
  399 
  400         numevents = aeApiPoll(eventLoop, tvp);
  401         for (j = 0; j < numevents; j++) {
  402             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
  403             int mask = eventLoop->fired[j].mask;
  404             int fd = eventLoop->fired[j].fd;
  405             int rfired = 0;
  406 
  407         /* note the fe->mask & mask & ... code: maybe an already processed
  408              * event removed an element that fired and we still didn't
  409              * processed, so we check if the event is still valid. */
  410             if (fe->mask & mask & AE_READABLE) {
  411                 rfired = 1;
  412                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
  413             }
  414             if (fe->mask & mask & AE_WRITABLE) {
  415                 if (!rfired || fe->wfileProc != fe->rfileProc)
  416                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);
  417             }
  418             processed++;
  419         }
  420     }
  421     /* Check time events */
  422     if (flags & AE_TIME_EVENTS)
  423         processed += processTimeEvents(eventLoop);
  424 
  425     return processed; /* return the number of processed file/time events */
  426 }
  427 
  428 /* Wait for milliseconds until the given file descriptor becomes
  429  * writable/readable/exception */
  430 int aeWait(int fd, int mask, long long milliseconds) {
  431     struct pollfd pfd;
  432     int retmask = 0, retval;
  433 
  434     memset(&pfd, 0, sizeof(pfd));
  435     pfd.fd = fd;
  436     if (mask & AE_READABLE) pfd.events |= POLLIN;
  437     if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
  438 
  439     if ((retval = poll(&pfd, 1, milliseconds))== 1) {
  440         if (pfd.revents & POLLIN) retmask |= AE_READABLE;
  441         if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
  442     if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
  443         if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
  444         return retmask;
  445     } else {
  446         return retval;
  447     }
  448 }
  449 
  450 void aeMain(aeEventLoop *eventLoop) {
  451     eventLoop->stop = 0;
  452     while (!eventLoop->stop) {
  453         if (eventLoop->beforesleep != NULL)
  454             eventLoop->beforesleep(eventLoop);
  455         aeProcessEvents(eventLoop, AE_ALL_EVENTS);
  456     }
  457 }
  458 
  459 char *aeGetApiName(void) {
  460     return aeApiName();
  461 }
  462 
  463 void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
  464     eventLoop->beforesleep = beforesleep;
  465 }