"Fossies" - the Fresh Open Source Software Archive

Member "nanomsg-1.1.5/src/aio/usock_posix.inc" (15 Oct 2018, 42742 Bytes) of package /linux/misc/nanomsg-1.1.5.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) fasm source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "usock_posix.inc" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.1.4_vs_1.1.5.

    1 /*
    2     Copyright (c) 2013 Martin Sustrik  All rights reserved.
    3     Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
    4     Copyright 2017 Garrett D'Amore <garrett@damore.org>
    5 
    6     Permission is hereby granted, free of charge, to any person obtaining a copy
    7     of this software and associated documentation files (the "Software"),
    8     to deal in the Software without restriction, including without limitation
    9     the rights to use, copy, modify, merge, publish, distribute, sublicense,
   10     and/or sell copies of the Software, and to permit persons to whom
   11     the Software is furnished to do so, subject to the following conditions:
   12 
   13     The above copyright notice and this permission notice shall be included
   14     in all copies or substantial portions of the Software.
   15 
   16     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   17     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
   18     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
   19     THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
   20     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
   21     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
   22     IN THE SOFTWARE.
   23 */
   24 
   25 #include "../utils/alloc.h"
   26 #include "../utils/closefd.h"
   27 #include "../utils/cont.h"
   28 #include "../utils/fast.h"
   29 #include "../utils/err.h"
   30 #include "../utils/attr.h"
   31 
   32 #include <string.h>
   33 #include <unistd.h>
   34 #include <fcntl.h>
   35 #include <sys/uio.h>
   36 
   37 #define NN_USOCK_STATE_IDLE 1
   38 #define NN_USOCK_STATE_STARTING 2
   39 #define NN_USOCK_STATE_BEING_ACCEPTED 3
   40 #define NN_USOCK_STATE_ACCEPTED 4
   41 #define NN_USOCK_STATE_CONNECTING 5
   42 #define NN_USOCK_STATE_ACTIVE 6
   43 #define NN_USOCK_STATE_REMOVING_FD 7
   44 #define NN_USOCK_STATE_DONE 8
   45 #define NN_USOCK_STATE_LISTENING 9
   46 #define NN_USOCK_STATE_ACCEPTING 10
   47 #define NN_USOCK_STATE_CANCELLING 11
   48 #define NN_USOCK_STATE_STOPPING 12
   49 #define NN_USOCK_STATE_STOPPING_ACCEPT 13
   50 #define NN_USOCK_STATE_ACCEPTING_ERROR 14
   51 
   52 #define NN_USOCK_ACTION_ACCEPT 1
   53 #define NN_USOCK_ACTION_BEING_ACCEPTED 2
   54 #define NN_USOCK_ACTION_CANCEL 3
   55 #define NN_USOCK_ACTION_LISTEN 4
   56 #define NN_USOCK_ACTION_CONNECT 5
   57 #define NN_USOCK_ACTION_ACTIVATE 6
   58 #define NN_USOCK_ACTION_DONE 7
   59 #define NN_USOCK_ACTION_ERROR 8
   60 #define NN_USOCK_ACTION_STARTED 9
   61 
   62 #define NN_USOCK_SRC_FD 1
   63 #define NN_USOCK_SRC_TASK_CONNECTING 2
   64 #define NN_USOCK_SRC_TASK_CONNECTED 3
   65 #define NN_USOCK_SRC_TASK_ACCEPT 4
   66 #define NN_USOCK_SRC_TASK_SEND 5
   67 #define NN_USOCK_SRC_TASK_RECV 6
   68 #define NN_USOCK_SRC_TASK_STOP 7
   69 
   70 /*  Private functions. */
   71 static void nn_usock_init_from_fd (struct nn_usock *self, int s);
   72 static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr);
   73 static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len);
   74 static int nn_usock_geterr (struct nn_usock *self);
   75 static void nn_usock_handler (struct nn_fsm *self, int src, int type,
   76     void *srcptr);
   77 static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
   78     void *srcptr);
   79 
   80 void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner)
   81 {
   82     /*  Initalise the state machine. */
   83     nn_fsm_init (&self->fsm, nn_usock_handler, nn_usock_shutdown,
   84         src, self, owner);
   85     self->state = NN_USOCK_STATE_IDLE;
   86 
   87     /*  Choose a worker thread to handle this socket. */
   88     self->worker = nn_fsm_choose_worker (&self->fsm);
   89 
   90     /*  Actual file descriptor will be generated during 'start' step. */
   91     self->s = -1;
   92     self->errnum = 0;
   93 
   94     self->in.buf = NULL;
   95     self->in.len = 0;
   96     self->in.batch = NULL;
   97     self->in.batch_len = 0;
   98     self->in.batch_pos = 0;
   99     self->in.pfd = NULL;
  100 
  101     memset (&self->out.hdr, 0, sizeof (struct msghdr));
  102 
  103     /*  Initialise tasks for the worker thread. */
  104     nn_worker_fd_init (&self->wfd, NN_USOCK_SRC_FD, &self->fsm);
  105     nn_worker_task_init (&self->task_connecting, NN_USOCK_SRC_TASK_CONNECTING,
  106         &self->fsm);
  107     nn_worker_task_init (&self->task_connected, NN_USOCK_SRC_TASK_CONNECTED,
  108         &self->fsm);
  109     nn_worker_task_init (&self->task_accept, NN_USOCK_SRC_TASK_ACCEPT,
  110         &self->fsm);
  111     nn_worker_task_init (&self->task_send, NN_USOCK_SRC_TASK_SEND, &self->fsm);
  112     nn_worker_task_init (&self->task_recv, NN_USOCK_SRC_TASK_RECV, &self->fsm);
  113     nn_worker_task_init (&self->task_stop, NN_USOCK_SRC_TASK_STOP, &self->fsm);
  114 
  115     /*  Intialise events raised by usock. */
  116     nn_fsm_event_init (&self->event_established);
  117     nn_fsm_event_init (&self->event_sent);
  118     nn_fsm_event_init (&self->event_received);
  119     nn_fsm_event_init (&self->event_error);
  120 
  121     /*  accepting is not going on at the moment. */
  122     self->asock = NULL;
  123 }
  124 
  125 void nn_usock_term (struct nn_usock *self)
  126 {
  127     nn_assert_state (self, NN_USOCK_STATE_IDLE);
  128 
  129     if (self->in.batch)
  130         nn_free (self->in.batch);
  131 
  132     nn_fsm_event_term (&self->event_error);
  133     nn_fsm_event_term (&self->event_received);
  134     nn_fsm_event_term (&self->event_sent);
  135     nn_fsm_event_term (&self->event_established);
  136 
  137     nn_worker_cancel (self->worker, &self->task_stop);
  138     nn_worker_cancel (self->worker, &self->task_recv);
  139     nn_worker_cancel (self->worker, &self->task_send);
  140     nn_worker_cancel (self->worker, &self->task_accept);
  141     nn_worker_cancel (self->worker, &self->task_connected);
  142     nn_worker_cancel (self->worker, &self->task_connecting);
  143 
  144     nn_worker_task_term (&self->task_stop);
  145     nn_worker_task_term (&self->task_recv);
  146     nn_worker_task_term (&self->task_send);
  147     nn_worker_task_term (&self->task_accept);
  148     nn_worker_task_term (&self->task_connected);
  149     nn_worker_task_term (&self->task_connecting);
  150     nn_worker_fd_term (&self->wfd);
  151 
  152     nn_fsm_term (&self->fsm);
  153 }
  154 
  155 int nn_usock_isidle (struct nn_usock *self)
  156 {
  157     return nn_fsm_isidle (&self->fsm);
  158 }
  159 
  160 int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol)
  161 {
  162     int s;
  163 
  164     /*  If the operating system allows to directly open the socket with CLOEXEC
  165         flag, do so. That way there are no race conditions. */
  166 #ifdef SOCK_CLOEXEC
  167     type |= SOCK_CLOEXEC;
  168 #endif
  169 
  170     /* Open the underlying socket. */
  171     s = socket (domain, type, protocol);
  172     if (nn_slow (s < 0))
  173        return -errno;
  174 
  175     nn_usock_init_from_fd (self, s);
  176 
  177     /*  Start the state machine. */
  178     nn_fsm_start (&self->fsm);
  179 
  180     return 0;
  181 }
  182 
  183 void nn_usock_start_fd (struct nn_usock *self, int fd)
  184 {
  185     nn_usock_init_from_fd (self, fd);
  186     nn_fsm_start (&self->fsm);
  187     nn_fsm_action (&self->fsm, NN_USOCK_ACTION_STARTED);
  188 }
  189 
  190 static void nn_usock_init_from_fd (struct nn_usock *self, int s)
  191 {
  192     int rc;
  193     int opt;
  194 
  195     nn_assert (self->state == NN_USOCK_STATE_IDLE ||
  196         self->state == NN_USOCK_STATE_BEING_ACCEPTED);
  197 
  198     /*  Store the file descriptor. */
  199     nn_assert (self->s == -1);
  200     self->s = s;
  201 
  202     /* Setting FD_CLOEXEC option immediately after socket creation is the
  203         second best option after using SOCK_CLOEXEC. There is a race condition
  204         here (if process is forked between socket creation and setting
  205         the option) but the problem is pretty unlikely to happen. */
  206 #if defined FD_CLOEXEC
  207     rc = fcntl (self->s, F_SETFD, FD_CLOEXEC);
  208 #if defined NN_HAVE_OSX
  209     errno_assert (rc != -1 || errno == EINVAL);
  210 #else
  211     errno_assert (rc != -1);
  212 #endif
  213 #endif
  214 
  215     /* If applicable, prevent SIGPIPE signal when writing to the connection
  216         already closed by the peer. */
  217 #ifdef SO_NOSIGPIPE
  218     opt = 1;
  219     rc = setsockopt (self->s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof (opt));
  220 #if defined NN_HAVE_OSX
  221     errno_assert (rc == 0 || errno == EINVAL);
  222 #else
  223     errno_assert (rc == 0);
  224 #endif
  225 #endif
  226 
  227     /* Switch the socket to the non-blocking mode. All underlying sockets
  228         are always used in the callbackhronous mode. */
  229     opt = fcntl (self->s, F_GETFL, 0);
  230     if (opt == -1)
  231         opt = 0;
  232     if (!(opt & O_NONBLOCK)) {
  233         rc = fcntl (self->s, F_SETFL, opt | O_NONBLOCK);
  234 #if defined NN_HAVE_OSX
  235         errno_assert (rc != -1 || errno == EINVAL);
  236 #else
  237         errno_assert (rc != -1);
  238 #endif
  239     }
  240 }
  241 
  242 void nn_usock_stop (struct nn_usock *self)
  243 {
  244     nn_fsm_stop (&self->fsm);
  245 }
  246 
  247 void nn_usock_async_stop (struct nn_usock *self)
  248 {
  249     nn_worker_execute (self->worker, &self->task_stop);
  250     nn_fsm_raise (&self->fsm, &self->event_error, NN_USOCK_SHUTDOWN);
  251 }
  252 
  253 void nn_usock_swap_owner (struct nn_usock *self, struct nn_fsm_owner *owner)
  254 {
  255     nn_fsm_swap_owner (&self->fsm, owner);
  256 }
  257 
  258 int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
  259     const void *optval, size_t optlen)
  260 {
  261     int rc;
  262 
  263     /*  The socket can be modified only before it's active. */
  264     nn_assert (self->state == NN_USOCK_STATE_STARTING ||
  265         self->state == NN_USOCK_STATE_ACCEPTED);
  266 
  267     /*  EINVAL errors are ignored on OSX platform. The reason for that is buggy
  268         OSX behaviour where setsockopt returns EINVAL if the peer have already
  269         disconnected. Thus, nn_usock_setsockopt() can succeed on OSX even though
  270         the option value was invalid, but the peer have already closed the
  271         connection. This behaviour should be relatively harmless. */
  272     rc = setsockopt (self->s, level, optname, optval, (socklen_t) optlen);
  273 #if defined NN_HAVE_OSX
  274     if (nn_slow (rc != 0 && errno != EINVAL))
  275         return -errno;
  276 #else
  277     if (nn_slow (rc != 0))
  278         return -errno;
  279 #endif
  280 
  281     return 0;
  282 }
  283 
  284 int nn_usock_bind (struct nn_usock *self, const struct sockaddr *addr,
  285     size_t addrlen)
  286 {
  287     int rc;
  288     int opt;
  289 
  290     /*  The socket can be bound only before it's connected. */
  291     nn_assert_state (self, NN_USOCK_STATE_STARTING);
  292 
  293     /*  Windows Subsystem for Linux - SO_REUSEADDR is different,
  294         and the Windows semantics are very wrong for us. */
  295 #ifndef NN_HAVE_WSL
  296     /*  Allow re-using the address. */
  297     opt = 1;
  298     rc = setsockopt (self->s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));
  299     errno_assert (rc == 0);
  300 #endif
  301 
  302     rc = bind (self->s, addr, (socklen_t) addrlen);
  303     if (nn_slow (rc != 0))
  304         return -errno;
  305 
  306     return 0;
  307 }
  308 
  309 int nn_usock_listen (struct nn_usock *self, int backlog)
  310 {
  311     int rc;
  312 
  313     /*  You can start listening only before the socket is connected. */
  314     nn_assert_state (self, NN_USOCK_STATE_STARTING);
  315 
  316     /*  Start listening for incoming connections. */
  317     rc = listen (self->s, backlog);
  318     if (nn_slow (rc != 0))
  319         return -errno;
  320 
  321     /*  Notify the state machine. */
  322     nn_fsm_action (&self->fsm, NN_USOCK_ACTION_LISTEN);
  323 
  324     return 0;
  325 }
  326 
  327 void nn_usock_accept (struct nn_usock *self, struct nn_usock *listener)
  328 {
  329     int s;
  330 
  331     /*  Start the actual accepting. */
  332     if (nn_fsm_isidle(&self->fsm)) {
  333         nn_fsm_start (&self->fsm);
  334         nn_fsm_action (&self->fsm, NN_USOCK_ACTION_BEING_ACCEPTED);
  335     }
  336     nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_ACCEPT);
  337 
  338     /*  Try to accept new connection in synchronous manner. */
  339 #if NN_HAVE_ACCEPT4
  340     s = accept4 (listener->s, NULL, NULL, SOCK_CLOEXEC);
  341     if ((s < 0) && (errno == ENOTSUP)) {
  342         /*  Apparently some old versions of Linux have a stub for this in libc,
  343             without any of the underlying kernel support. */
  344         s = accept (listener->s, NULL, NULL);
  345     }
  346 #else
  347     s = accept (listener->s, NULL, NULL);
  348 #endif
  349 
  350     /*  Immediate success. */
  351     if (nn_fast (s >= 0)) {
  352         /*  Disassociate the listener socket from the accepted
  353             socket. Is useful if we restart accepting on ACCEPT_ERROR  */
  354         listener->asock = NULL;
  355         self->asock = NULL;
  356 
  357         nn_usock_init_from_fd (self, s);
  358         nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_DONE);
  359         nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
  360         return;
  361     }
  362 
  363     /*  Detect a failure. Note that in ECONNABORTED case we simply ignore
  364         the error and wait for next connection in asynchronous manner. */
  365     errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
  366         errno == ECONNABORTED || errno == ENFILE || errno == EMFILE ||
  367         errno == ENOBUFS || errno == ENOMEM);
  368 
  369     /*  Pair the two sockets.  They are already paired in case
  370         previous attempt failed on ACCEPT_ERROR  */
  371     nn_assert (!self->asock || self->asock == listener);
  372     self->asock = listener;
  373     nn_assert (!listener->asock || listener->asock == self);
  374     listener->asock = self;
  375 
  376     /*  Some errors are just ok to ignore for now.  We also stop repeating
  377         any errors until next IN_FD event so that we are not in a tight loop
  378         and allow processing other events in the meantime  */
  379     if (nn_slow (errno != EAGAIN && errno != EWOULDBLOCK
  380         && errno != ECONNABORTED && errno != listener->errnum))
  381     {
  382         listener->errnum = errno;
  383         listener->state = NN_USOCK_STATE_ACCEPTING_ERROR;
  384         nn_fsm_raise (&listener->fsm,
  385             &listener->event_error, NN_USOCK_ACCEPT_ERROR);
  386         return;
  387     }
  388 
  389     /*  Ask the worker thread to wait for the new connection. */
  390     nn_worker_execute (listener->worker, &listener->task_accept);
  391 }
  392 
  393 void nn_usock_activate (struct nn_usock *self)
  394 {
  395     nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ACTIVATE);
  396 }
  397 
  398 void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
  399     size_t addrlen)
  400 {
  401     int rc;
  402 
  403     /*  Notify the state machine that we've started connecting. */
  404     nn_fsm_action (&self->fsm, NN_USOCK_ACTION_CONNECT);
  405 
  406     /* Do the connect itself. */
  407     rc = connect (self->s, addr, (socklen_t) addrlen);
  408 
  409     /* Immediate success. */
  410     if (nn_fast (rc == 0)) {
  411         nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
  412         return;
  413     }
  414 
  415     /*  Immediate error. */
  416     if (nn_slow (errno != EINPROGRESS)) {
  417         self->errnum = errno;
  418         nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
  419         return;
  420     }
  421 
  422     /*  Start asynchronous connect. */
  423     nn_worker_execute (self->worker, &self->task_connecting);
  424 }
  425 
  426 void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
  427     int iovcnt)
  428 {
  429     int rc;
  430     int i;
  431     int out;
  432 
  433     /*  Make sure that the socket is actually alive. */
  434     if (self->state != NN_USOCK_STATE_ACTIVE) {
  435         nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
  436         return;
  437     }
  438 
  439     /*  Copy the iovecs to the socket. */
  440     nn_assert (iovcnt <= NN_USOCK_MAX_IOVCNT);
  441     self->out.hdr.msg_iov = self->out.iov;
  442     out = 0;
  443     for (i = 0; i != iovcnt; ++i) {
  444         if (iov [i].iov_len == 0)
  445             continue;
  446         self->out.iov [out].iov_base = iov [i].iov_base;
  447         self->out.iov [out].iov_len = iov [i].iov_len;
  448         out++;
  449     }
  450     self->out.hdr.msg_iovlen = out;
  451 
  452     /*  Try to send the data immediately. */
  453     rc = nn_usock_send_raw (self, &self->out.hdr);
  454 
  455     /*  Success. */
  456     if (nn_fast (rc == 0)) {
  457         nn_fsm_raise (&self->fsm, &self->event_sent, NN_USOCK_SENT);
  458         return;
  459     }
  460 
  461     /*  Errors. */
  462     if (nn_slow (rc != -EAGAIN)) {
  463         errnum_assert (rc == -ECONNRESET, -rc);
  464         nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
  465         return;
  466     }
  467 
  468     /*  Ask the worker thread to send the remaining data. */
  469     nn_worker_execute (self->worker, &self->task_send);
  470 }
  471 
  472 void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
  473 {
  474     int rc;
  475     size_t nbytes;
  476 
  477     /*  Make sure that the socket is actually alive. */
  478     if (self->state != NN_USOCK_STATE_ACTIVE) {
  479         nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
  480         return;
  481     }
  482 
  483     /*  Try to receive the data immediately. */
  484     nbytes = len;
  485     self->in.pfd = fd;
  486     rc = nn_usock_recv_raw (self, buf, &nbytes);
  487     if (nn_slow (rc < 0)) {
  488         errnum_assert (rc == -ECONNRESET, -rc);
  489         nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
  490         return;
  491     }
  492 
  493     /*  Success. */
  494     if (nn_fast (nbytes == len)) {
  495         nn_fsm_raise (&self->fsm, &self->event_received, NN_USOCK_RECEIVED);
  496         return;
  497     }
  498 
  499     /*  There are still data to receive in the background. */
  500     self->in.buf = ((uint8_t*) buf) + nbytes;
  501     self->in.len = len - nbytes;
  502 
  503     /*  Ask the worker thread to receive the remaining data. */
  504     nn_worker_execute (self->worker, &self->task_recv);
  505 }
  506 
  507 static int nn_internal_tasks (struct nn_usock *usock, int src, int type)
  508 {
  509 
  510 /******************************************************************************/
  511 /*  Internal tasks sent from the user thread to the worker thread.            */
  512 /******************************************************************************/
  513     switch (src) {
  514     case NN_USOCK_SRC_TASK_SEND:
  515         nn_assert (type == NN_WORKER_TASK_EXECUTE);
  516         nn_worker_set_out (usock->worker, &usock->wfd);
  517         return 1;
  518     case NN_USOCK_SRC_TASK_RECV:
  519         nn_assert (type == NN_WORKER_TASK_EXECUTE);
  520         nn_worker_set_in (usock->worker, &usock->wfd);
  521         return 1;
  522     case NN_USOCK_SRC_TASK_CONNECTED:
  523         nn_assert (type == NN_WORKER_TASK_EXECUTE);
  524         nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
  525         return 1;
  526     case NN_USOCK_SRC_TASK_CONNECTING:
  527         nn_assert (type == NN_WORKER_TASK_EXECUTE);
  528         nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
  529         nn_worker_set_out (usock->worker, &usock->wfd);
  530         return 1;
  531     case NN_USOCK_SRC_TASK_ACCEPT:
  532         nn_assert (type == NN_WORKER_TASK_EXECUTE);
  533         nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
  534         nn_worker_set_in (usock->worker, &usock->wfd);
  535         return 1;
  536     }
  537 
  538     return 0;
  539 }
  540 
  541 static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
  542     NN_UNUSED void *srcptr)
  543 {
  544     struct nn_usock *usock;
  545 
  546     usock = nn_cont (self, struct nn_usock, fsm);
  547 
  548     if (nn_internal_tasks (usock, src, type))
  549         return;
  550 
  551     if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
  552 
  553         /*  Socket in ACCEPTING or CANCELLING state cannot be closed.
  554             Stop the socket being accepted first. */
  555         nn_assert (usock->state != NN_USOCK_STATE_ACCEPTING &&
  556             usock->state != NN_USOCK_STATE_CANCELLING);
  557 
  558         usock->errnum = 0;
  559 
  560         /*  Synchronous stop. */
  561         if (usock->state == NN_USOCK_STATE_IDLE)
  562             goto finish3;
  563         if (usock->state == NN_USOCK_STATE_DONE)
  564             goto finish2;
  565         if (usock->state == NN_USOCK_STATE_STARTING ||
  566               usock->state == NN_USOCK_STATE_ACCEPTED ||
  567               usock->state == NN_USOCK_STATE_ACCEPTING_ERROR ||
  568               usock->state == NN_USOCK_STATE_LISTENING)
  569             goto finish1;
  570 
  571         /*  When socket that's being accepted is asked to stop, we have to
  572             ask the listener socket to stop accepting first. */
  573         if (usock->state == NN_USOCK_STATE_BEING_ACCEPTED) {
  574             nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_CANCEL);
  575             usock->state = NN_USOCK_STATE_STOPPING_ACCEPT;
  576             return;
  577         }
  578 
  579         /*  Asynchronous stop. */
  580         if (usock->state != NN_USOCK_STATE_REMOVING_FD)
  581             nn_usock_async_stop (usock);
  582         usock->state = NN_USOCK_STATE_STOPPING;
  583         return;
  584     }
  585     if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING_ACCEPT)) {
  586         nn_assert (src == NN_FSM_ACTION && type == NN_USOCK_ACTION_DONE);
  587         goto finish2;
  588     }
  589     if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING)) {
  590         if (src != NN_USOCK_SRC_TASK_STOP)
  591             return;
  592         nn_assert (type == NN_WORKER_TASK_EXECUTE);
  593         nn_worker_rm_fd (usock->worker, &usock->wfd);
  594 finish1:
  595         nn_closefd (usock->s);
  596         usock->s = -1;
  597 finish2:
  598         usock->state = NN_USOCK_STATE_IDLE;
  599         nn_fsm_stopped (&usock->fsm, NN_USOCK_STOPPED);
  600 finish3:
  601         return;
  602     }
  603 
  604     nn_fsm_bad_state(usock->state, src, type);
  605 }
  606 
  607 static void nn_usock_handler (struct nn_fsm *self, int src, int type,
  608     NN_UNUSED void *srcptr)
  609 {
  610     int rc;
  611     struct nn_usock *usock;
  612     int s;
  613     size_t sz;
  614     int sockerr;
  615 
  616     usock = nn_cont (self, struct nn_usock, fsm);
  617 
  618     if(nn_internal_tasks(usock, src, type))
  619         return;
  620 
  621     switch (usock->state) {
  622 
  623 /******************************************************************************/
  624 /*  IDLE state.                                                               */
  625 /*  nn_usock object is initialised, but underlying OS socket is not yet       */
  626 /*  created.                                                                  */
  627 /******************************************************************************/
  628     case NN_USOCK_STATE_IDLE:
  629         switch (src) {
  630         case NN_FSM_ACTION:
  631             switch (type) {
  632             case NN_FSM_START:
  633                 usock->state = NN_USOCK_STATE_STARTING;
  634                 return;
  635             default:
  636                 nn_fsm_bad_action (usock->state, src, type);
  637             }
  638         default:
  639             nn_fsm_bad_source (usock->state, src, type);
  640         }
  641 
  642 /******************************************************************************/
  643 /*  STARTING state.                                                           */
  644 /*  Underlying OS socket is created, but it's not yet passed to the worker    */
  645 /*  thread. In this state we can set socket options, local and remote         */
  646 /*  address etc.                                                              */
  647 /******************************************************************************/
  648     case NN_USOCK_STATE_STARTING:
  649 
  650         /*  Events from the owner of the usock. */
  651         switch (src) {
  652         case NN_FSM_ACTION:
  653             switch (type) {
  654             case NN_USOCK_ACTION_LISTEN:
  655                 usock->state = NN_USOCK_STATE_LISTENING;
  656                 return;
  657             case NN_USOCK_ACTION_CONNECT:
  658                 usock->state = NN_USOCK_STATE_CONNECTING;
  659                 return;
  660             case NN_USOCK_ACTION_BEING_ACCEPTED:
  661                 usock->state = NN_USOCK_STATE_BEING_ACCEPTED;
  662                 return;
  663             case NN_USOCK_ACTION_STARTED:
  664                 nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
  665                 usock->state = NN_USOCK_STATE_ACTIVE;
  666                 return;
  667             default:
  668                 nn_fsm_bad_action (usock->state, src, type);
  669             }
  670         default:
  671             nn_fsm_bad_source (usock->state, src, type);
  672         }
  673 
  674 /******************************************************************************/
  675 /*  BEING_ACCEPTED state.                                                     */
  676 /*  accept() was called on the usock. Now the socket is waiting for a new     */
  677 /*  connection to arrive.                                                     */
  678 /******************************************************************************/
  679     case NN_USOCK_STATE_BEING_ACCEPTED:
  680         switch (src) {
  681         case NN_FSM_ACTION:
  682             switch (type) {
  683             case NN_USOCK_ACTION_DONE:
  684                 usock->state = NN_USOCK_STATE_ACCEPTED;
  685                 nn_fsm_raise (&usock->fsm, &usock->event_established,
  686                     NN_USOCK_ACCEPTED);
  687                 return;
  688             default:
  689                 nn_fsm_bad_action (usock->state, src, type);
  690             }
  691         default:
  692             nn_fsm_bad_source (usock->state, src, type);
  693         }
  694 
  695 /******************************************************************************/
  696 /*  ACCEPTED state.                                                           */
  697 /*  Connection was accepted, now it can be tuned. Afterwards, it'll move to   */
  698 /*  the active state.                                                         */
  699 /******************************************************************************/
  700     case NN_USOCK_STATE_ACCEPTED:
  701         switch (src) {
  702         case NN_FSM_ACTION:
  703             switch (type) {
  704             case NN_USOCK_ACTION_ACTIVATE:
  705                 nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
  706                 usock->state = NN_USOCK_STATE_ACTIVE;
  707                 return;
  708             default:
  709                 nn_fsm_bad_action (usock->state, src, type);
  710             }
  711         default:
  712             nn_fsm_bad_source (usock->state, src, type);
  713         }
  714 
  715 /******************************************************************************/
  716 /*  CONNECTING state.                                                         */
  717 /*  Asynchronous connecting is going on.                                      */
  718 /******************************************************************************/
  719     case NN_USOCK_STATE_CONNECTING:
  720         switch (src) {
  721         case NN_FSM_ACTION:
  722             switch (type) {
  723             case NN_USOCK_ACTION_DONE:
  724                 usock->state = NN_USOCK_STATE_ACTIVE;
  725                 nn_worker_execute (usock->worker, &usock->task_connected);
  726                 nn_fsm_raise (&usock->fsm, &usock->event_established,
  727                     NN_USOCK_CONNECTED);
  728                 return;
  729             case NN_USOCK_ACTION_ERROR:
  730                 nn_closefd (usock->s);
  731                 usock->s = -1;
  732                 usock->state = NN_USOCK_STATE_DONE;
  733                 nn_fsm_raise (&usock->fsm, &usock->event_error,
  734                     NN_USOCK_ERROR);
  735                 return;
  736             default:
  737                 nn_fsm_bad_action (usock->state, src, type);
  738             }
  739         case NN_USOCK_SRC_FD:
  740             switch (type) {
  741             case NN_WORKER_FD_OUT:
  742                 nn_worker_reset_out (usock->worker, &usock->wfd);
  743                 usock->state = NN_USOCK_STATE_ACTIVE;
  744                 sockerr = nn_usock_geterr(usock);
  745                 if (sockerr == 0) {
  746                     nn_fsm_raise (&usock->fsm, &usock->event_established,
  747                         NN_USOCK_CONNECTED);
  748                 } else {
  749                     usock->errnum = sockerr;
  750                     nn_worker_rm_fd (usock->worker, &usock->wfd);
  751                     rc = close (usock->s);
  752                     errno_assert (rc == 0);
  753                     usock->s = -1;
  754                     usock->state = NN_USOCK_STATE_DONE;
  755                     nn_fsm_raise (&usock->fsm,
  756                         &usock->event_error, NN_USOCK_ERROR);
  757                 }
  758                 return;
  759             case NN_WORKER_FD_ERR:
  760                 nn_worker_rm_fd (usock->worker, &usock->wfd);
  761                 nn_closefd (usock->s);
  762                 usock->s = -1;
  763                 usock->state = NN_USOCK_STATE_DONE;
  764                 nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
  765                 return;
  766             default:
  767                 nn_fsm_bad_action (usock->state, src, type);
  768             }
  769         default:
  770             nn_fsm_bad_source (usock->state, src, type);
  771         }
  772 
  773 /******************************************************************************/
  774 /*  ACTIVE state.                                                             */
  775 /*  Socket is connected. It can be used for sending and receiving data.       */
  776 /******************************************************************************/
  777     case NN_USOCK_STATE_ACTIVE:
  778         switch (src) {
  779         case NN_USOCK_SRC_FD:
  780             switch (type) {
  781             case NN_WORKER_FD_IN:
  782                 sz = usock->in.len;
  783                 rc = nn_usock_recv_raw (usock, usock->in.buf, &sz);
  784                 if (nn_fast (rc == 0)) {
  785                     usock->in.len -= sz;
  786                     usock->in.buf += sz;
  787                     if (!usock->in.len) {
  788                         nn_worker_reset_in (usock->worker, &usock->wfd);
  789                         nn_fsm_raise (&usock->fsm, &usock->event_received,
  790                             NN_USOCK_RECEIVED);
  791                     }
  792                     return;
  793                 }
  794                 errnum_assert (rc == -ECONNRESET, -rc);
  795                 goto error;
  796             case NN_WORKER_FD_OUT:
  797                 rc = nn_usock_send_raw (usock, &usock->out.hdr);
  798                 if (nn_fast (rc == 0)) {
  799                     nn_worker_reset_out (usock->worker, &usock->wfd);
  800                     nn_fsm_raise (&usock->fsm, &usock->event_sent,
  801                         NN_USOCK_SENT);
  802                     return;
  803                 }
  804                 if (nn_fast (rc == -EAGAIN))
  805                     return;
  806                 errnum_assert (rc == -ECONNRESET, -rc);
  807                 goto error;
  808             case NN_WORKER_FD_ERR:
  809 error:
  810                 nn_worker_rm_fd (usock->worker, &usock->wfd);
  811                 nn_closefd (usock->s);
  812                 usock->s = -1;
  813                 usock->state = NN_USOCK_STATE_DONE;
  814                 nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
  815                 return;
  816             default:
  817                 nn_fsm_bad_action (usock->state, src, type);
  818             }
  819         case NN_FSM_ACTION:
  820             switch (type) {
  821             case NN_USOCK_ACTION_ERROR:
  822                 usock->state = NN_USOCK_STATE_REMOVING_FD;
  823                 nn_usock_async_stop (usock);
  824                 return;
  825             default:
  826                 nn_fsm_bad_action (usock->state, src, type);
  827             }
  828         default:
  829             nn_fsm_bad_source(usock->state, src, type);
  830         }
  831 
  832 /******************************************************************************/
  833 /*  REMOVING_FD state.                                                        */
  834 /******************************************************************************/
  835     case NN_USOCK_STATE_REMOVING_FD:
  836         switch (src) {
  837         case NN_USOCK_SRC_TASK_STOP:
  838             switch (type) {
  839             case NN_WORKER_TASK_EXECUTE:
  840                 nn_worker_rm_fd (usock->worker, &usock->wfd);
  841                 nn_closefd (usock->s);
  842                 usock->s = -1;
  843                 usock->state = NN_USOCK_STATE_DONE;
  844                 nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
  845                 return;
  846             default:
  847                 nn_fsm_bad_action (usock->state, src, type);
  848             }
  849 
  850         /*  Events from the file descriptor are ignored while it is being
  851             removed. */
  852         case NN_USOCK_SRC_FD:
  853             return;
  854 
  855         case NN_FSM_ACTION:
  856             switch (type) {
  857             case NN_USOCK_ACTION_ERROR:
  858                 return;
  859             default:
  860                 nn_fsm_bad_action (usock->state, src, type);
  861             }
  862         default:
  863             nn_fsm_bad_source (usock->state, src, type);
  864         }
  865 
  866 /******************************************************************************/
  867 /*  DONE state.                                                               */
  868 /*  Socket is closed. The only thing that can be done in this state is        */
  869 /*  stopping the usock.                                                       */
  870 /******************************************************************************/
  871     case NN_USOCK_STATE_DONE:
  872         return;
  873 
  874 /******************************************************************************/
  875 /*  LISTENING state.                                                          */
  876 /*  Socket is listening for new incoming connections, however, user is not    */
  877 /*  accepting a new connection.                                               */
  878 /******************************************************************************/
  879     case NN_USOCK_STATE_LISTENING:
  880         switch (src) {
  881         case NN_FSM_ACTION:
  882             switch (type) {
  883             case NN_USOCK_ACTION_ACCEPT:
  884                 usock->state = NN_USOCK_STATE_ACCEPTING;
  885                 return;
  886             default:
  887                 nn_fsm_bad_action (usock->state, src, type);
  888             }
  889         default:
  890             nn_fsm_bad_source (usock->state, src, type);
  891         }
  892 
  893 /******************************************************************************/
  894 /*  ACCEPTING state.                                                          */
  895 /*  User is waiting asynchronouslyfor a new inbound connection                */
  896 /*  to be accepted.                                                           */
  897 /******************************************************************************/
  898     case NN_USOCK_STATE_ACCEPTING:
  899         switch (src) {
  900         case NN_FSM_ACTION:
  901             switch (type) {
  902             case NN_USOCK_ACTION_DONE:
  903                 usock->state = NN_USOCK_STATE_LISTENING;
  904                 return;
  905             case NN_USOCK_ACTION_CANCEL:
  906                 usock->state = NN_USOCK_STATE_CANCELLING;
  907                 nn_worker_execute (usock->worker, &usock->task_stop);
  908                 return;
  909             default:
  910                 nn_fsm_bad_action (usock->state, src, type);
  911             }
  912         case NN_USOCK_SRC_FD:
  913             switch (type) {
  914             case NN_WORKER_FD_IN:
  915 
  916                 /*  New connection arrived in asynchronous manner. */
  917 #if NN_HAVE_ACCEPT4
  918                 s = accept4 (usock->s, NULL, NULL, SOCK_CLOEXEC);
  919 #else
  920                 s = accept (usock->s, NULL, NULL);
  921 #endif
  922 
  923                 /*  ECONNABORTED is an valid error. New connection was closed
  924                     by the peer before we were able to accept it. If it happens
  925                     do nothing and wait for next incoming connection. */
  926                 if (nn_slow (s < 0 && errno == ECONNABORTED))
  927                     return;
  928 
  929                 /*  Resource allocation errors. It's not clear from POSIX
  930                     specification whether the new connection is closed in this
  931                     case or whether it remains in the backlog. In the latter
  932                     case it would be wise to wait here for a while to prevent
  933                     busy looping. */
  934                 if (nn_slow (s < 0 && (errno == ENFILE || errno == EMFILE ||
  935                       errno == ENOBUFS || errno == ENOMEM))) {
  936                     usock->errnum = errno;
  937                     usock->state = NN_USOCK_STATE_ACCEPTING_ERROR;
  938 
  939                     /*  Wait till the user starts accepting once again. */
  940                     nn_worker_rm_fd (usock->worker, &usock->wfd);
  941 
  942                     nn_fsm_raise (&usock->fsm,
  943                         &usock->event_error, NN_USOCK_ACCEPT_ERROR);
  944                     return;
  945                 }
  946 
  947                 /* Any other error is unexpected. */
  948                 errno_assert (s >= 0);
  949 
  950                 /*  Initialise the new usock object. */
  951                 nn_usock_init_from_fd (usock->asock, s);
  952                 usock->asock->state = NN_USOCK_STATE_ACCEPTED;
  953 
  954                 /*  Notify the user that connection was accepted. */
  955                 nn_fsm_raise (&usock->asock->fsm,
  956                     &usock->asock->event_established, NN_USOCK_ACCEPTED);
  957 
  958                 /*  Disassociate the listener socket from the accepted
  959                     socket. */
  960                 usock->asock->asock = NULL;
  961                 usock->asock = NULL;
  962 
  963                 /*  Wait till the user starts accepting once again. */
  964                 nn_worker_rm_fd (usock->worker, &usock->wfd);
  965                 usock->state = NN_USOCK_STATE_LISTENING;
  966 
  967                 return;
  968 
  969             default:
  970                 nn_fsm_bad_action (usock->state, src, type);
  971             }
  972         default:
  973             nn_fsm_bad_source (usock->state, src, type);
  974         }
  975 
  976 /******************************************************************************/
  977 /*  ACCEPTING_ERROR state.                                                    */
  978 /*  Waiting the socket to accept the error and restart                        */
  979 /******************************************************************************/
  980     case NN_USOCK_STATE_ACCEPTING_ERROR:
  981         switch (src) {
  982         case NN_FSM_ACTION:
  983             switch (type) {
  984             case NN_USOCK_ACTION_ACCEPT:
  985                 usock->state = NN_USOCK_STATE_ACCEPTING;
  986                 return;
  987             default:
  988                 nn_fsm_bad_action (usock->state, src, type);
  989             }
  990         default:
  991             nn_fsm_bad_source (usock->state, src, type);
  992         }
  993 
  994 /******************************************************************************/
  995 /*  CANCELLING state.                                                         */
  996 /******************************************************************************/
  997     case NN_USOCK_STATE_CANCELLING:
  998         switch (src) {
  999         case NN_USOCK_SRC_TASK_STOP:
 1000             switch (type) {
 1001             case NN_WORKER_TASK_EXECUTE:
 1002                 nn_worker_rm_fd (usock->worker, &usock->wfd);
 1003                 usock->state = NN_USOCK_STATE_LISTENING;
 1004 
 1005                 /*  Notify the accepted socket that it was stopped. */
 1006                 nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_DONE);
 1007 
 1008                 return;
 1009             default:
 1010                 nn_fsm_bad_action (usock->state, src, type);
 1011             }
 1012         case NN_USOCK_SRC_FD:
 1013             switch (type) {
 1014             case NN_WORKER_FD_IN:
 1015                 return;
 1016             default:
 1017                 nn_fsm_bad_action (usock->state, src, type);
 1018             }
 1019         default:
 1020             nn_fsm_bad_source (usock->state, src, type);
 1021         }
 1022 
 1023 /******************************************************************************/
 1024 /*  Invalid state                                                             */
 1025 /******************************************************************************/
 1026     default:
 1027         nn_fsm_bad_state (usock->state, src, type);
 1028     }
 1029 }
 1030 
 1031 static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr)
 1032 {
 1033     ssize_t nbytes;
 1034 
 1035     /*  Try to send the data. */
 1036 #if defined MSG_NOSIGNAL
 1037     nbytes = sendmsg (self->s, hdr, MSG_NOSIGNAL);
 1038 #else
 1039     nbytes = sendmsg (self->s, hdr, 0);
 1040 #endif
 1041 
 1042     /*  Handle errors. */
 1043     if (nn_slow (nbytes < 0)) {
 1044         if (nn_fast (errno == EAGAIN || errno == EWOULDBLOCK))
 1045             nbytes = 0;
 1046         else {
 1047 
 1048             /*  If the connection fails, return ECONNRESET. */
 1049             return -ECONNRESET;
 1050         }
 1051     }
 1052 
 1053     /*  Some bytes were sent. Adjust the iovecs accordingly. */
 1054     while (nbytes) {
 1055         if (nbytes >= (ssize_t)hdr->msg_iov->iov_len) {
 1056             --hdr->msg_iovlen;
 1057             if (!hdr->msg_iovlen) {
 1058                 nn_assert (nbytes == (ssize_t)hdr->msg_iov->iov_len);
 1059                 return 0;
 1060             }
 1061             nbytes -= hdr->msg_iov->iov_len;
 1062             ++hdr->msg_iov;
 1063         }
 1064         else {
 1065             *((uint8_t**) &(hdr->msg_iov->iov_base)) += nbytes;
 1066             hdr->msg_iov->iov_len -= nbytes;
 1067             return -EAGAIN;
 1068         }
 1069     }
 1070 
 1071     if (hdr->msg_iovlen > 0)
 1072         return -EAGAIN;
 1073 
 1074     return 0;
 1075 }
 1076 
 1077 static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len)
 1078 {
 1079     size_t sz;
 1080     size_t length;
 1081     ssize_t nbytes;
 1082     struct iovec iov;
 1083     struct msghdr hdr;
 1084     unsigned char ctrl [256];
 1085 #if defined NN_HAVE_MSG_CONTROL
 1086     struct cmsghdr *cmsg;
 1087 #endif
 1088     int fd;
 1089 
 1090     /*  If batch buffer doesn't exist, allocate it. The point of delayed
 1091         deallocation to allow non-receiving sockets, such as TCP listening
 1092         sockets, to do without the batch buffer. */
 1093     if (nn_slow (!self->in.batch)) {
 1094         self->in.batch = nn_alloc (NN_USOCK_BATCH_SIZE, "AIO batch buffer");
 1095         alloc_assert (self->in.batch);
 1096     }
 1097 
 1098     /*  Try to satisfy the recv request by data from the batch buffer. */
 1099     length = *len;
 1100     sz = self->in.batch_len - self->in.batch_pos;
 1101     if (sz) {
 1102         if (sz > length)
 1103             sz = length;
 1104         memcpy (buf, self->in.batch + self->in.batch_pos, sz);
 1105         self->in.batch_pos += sz;
 1106         buf = ((char*) buf) + sz;
 1107         length -= sz;
 1108         if (!length)
 1109             return 0;
 1110     }
 1111 
 1112     /*  If recv request is greater than the batch buffer, get the data directly
 1113         into the place. Otherwise, read data to the batch buffer. */
 1114     if (length > NN_USOCK_BATCH_SIZE) {
 1115         iov.iov_base = buf;
 1116         iov.iov_len = length;
 1117     }
 1118     else {
 1119         iov.iov_base = self->in.batch;
 1120         iov.iov_len = NN_USOCK_BATCH_SIZE;
 1121     }
 1122     memset (&hdr, 0, sizeof (hdr));
 1123     hdr.msg_iov = &iov;
 1124     hdr.msg_iovlen = 1;
 1125 #if defined NN_HAVE_MSG_CONTROL
 1126     hdr.msg_control = ctrl;
 1127     hdr.msg_controllen = sizeof (ctrl);
 1128 #else
 1129     *((int*) ctrl) = -1;
 1130     hdr.msg_accrights = ctrl;
 1131     hdr.msg_accrightslen = sizeof (int);
 1132 #endif
 1133     nbytes = recvmsg (self->s, &hdr, 0);
 1134 
 1135     /*  Handle any possible errors. */
 1136     if (nn_slow (nbytes <= 0)) {
 1137 
 1138         if (nn_slow (nbytes == 0))
 1139             return -ECONNRESET;
 1140 
 1141         /*  Zero bytes received. */
 1142         if (nn_fast (errno == EAGAIN || errno == EWOULDBLOCK))
 1143             nbytes = 0;
 1144         else {
 1145 
 1146             /*  If the peer closes the connection, return ECONNRESET. */
 1147             return -ECONNRESET;
 1148         }
 1149     }
 1150 
 1151     /*  Extract the associated file descriptor, if any. */
 1152     if (nbytes > 0) {
 1153 #if defined NN_HAVE_MSG_CONTROL
 1154         cmsg = CMSG_FIRSTHDR (&hdr);
 1155         while (cmsg) {
 1156             if (cmsg->cmsg_level == SOL_SOCKET &&
 1157                   cmsg->cmsg_type == SCM_RIGHTS) {
 1158                 if (self->in.pfd) {
 1159                     memcpy (self->in.pfd, CMSG_DATA (cmsg),
 1160                         sizeof (*self->in.pfd));
 1161                     self->in.pfd = NULL;
 1162                 }
 1163                 else {
 1164                     memcpy (&fd, CMSG_DATA (cmsg), sizeof (fd));
 1165                     nn_closefd (fd);
 1166                 }
 1167                 break;
 1168             }
 1169             cmsg = CMSG_NXTHDR (&hdr, cmsg);
 1170         }
 1171 #else
 1172         if (hdr.msg_accrightslen > 0) {
 1173             nn_assert (hdr.msg_accrightslen == sizeof (int));
 1174             if (self->in.pfd) {
 1175                 memcpy (self->in.pfd, hdr.msg_accrights,
 1176                     sizeof (*self->in.pfd));
 1177                 self->in.pfd = NULL;
 1178             }
 1179             else {
 1180                 memcpy (&fd, hdr.msg_accrights, sizeof (fd));
 1181                 nn_closefd (fd);
 1182             }
 1183         }
 1184 #endif
 1185     }
 1186 
 1187     /*  If the data were received directly into the place we can return
 1188         straight away. */
 1189     if (length > NN_USOCK_BATCH_SIZE) {
 1190         length -= nbytes;
 1191         *len -= length;
 1192         return 0;
 1193     }
 1194 
 1195     /*  New data were read to the batch buffer. Copy the requested amount of it
 1196         to the user-supplied buffer. */
 1197     self->in.batch_len = nbytes;
 1198     self->in.batch_pos = 0;
 1199     if (nbytes) {
 1200         sz = nbytes > (ssize_t)length ? length : (size_t)nbytes;
 1201         memcpy (buf, self->in.batch, sz);
 1202         length -= sz;
 1203         self->in.batch_pos += sz;
 1204     }
 1205 
 1206     *len -= length;
 1207     return 0;
 1208 }
 1209 
 1210 static int nn_usock_geterr (struct nn_usock *self)
 1211 {
 1212     int rc;
 1213     int opt;
 1214 #if defined NN_HAVE_HPUX
 1215     int optsz;
 1216 #else
 1217     socklen_t optsz;
 1218 #endif
 1219 
 1220     opt = 0;
 1221     optsz = sizeof (opt);
 1222     rc = getsockopt (self->s, SOL_SOCKET, SO_ERROR, &opt, &optsz);
 1223 
 1224     /*  The following should handle both Solaris and UNIXes derived from BSD. */
 1225     if (rc == -1)
 1226         return errno;
 1227     errno_assert (rc == 0);
 1228     nn_assert (optsz == sizeof (opt));
 1229     return opt;
 1230 }