"Fossies" - the Fresh Open Source Software Archive

Member "berkeley_upc-2019.4.2/gasnet/other/amudp/amudp_reqrep.cpp" (27 May 2019, 66409 Bytes) of package /linux/misc/berkeley_upc-2019.4.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "amudp_reqrep.cpp", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 2.26.0_vs_2.28.0.

    1 /*   $Source: bitbucket.org:berkeleylab/gasnet.git/other/amudp/amudp_reqrep.cpp $
    2  * Description: AMUDP Implementations of request/reply operations
    3  * Copyright 2000, Dan Bonachea <bonachea@cs.berkeley.edu>
    4  */
    5 
    6 #include <errno.h>
    7 #include <stdarg.h>
    8 #include <math.h>
    9 #include <time.h>
   10 #include <sys/time.h>
   11 #include <unistd.h>
   12 #include <fcntl.h>
   13 
   14 #include "amudp_internal.h" // must come after any other headers
   15 
   16 /* forward decls */
   17 static int AMUDP_RequestGeneric(amudp_category_t category, 
   18                           ep_t ep, amudp_node_t reply_endpoint, handler_t handler, 
   19                           void *source_addr, size_t nbytes, uintptr_t dest_offset, 
   20                           int numargs, va_list argptr,
   21                           uint8_t systemType, uint8_t systemArg);
   22 static int AMUDP_ReplyGeneric(amudp_category_t category, 
   23                           amudp_buf_t *requestbuf, handler_t handler, 
   24                           void *source_addr, size_t nbytes, uintptr_t dest_offset, 
   25                           int numargs, va_list argptr,
   26                           uint8_t systemType, uint8_t systemArg);
   27 
   28 #if AMUDP_EXTRA_CHECKSUM
   29   static void AMUDP_SetChecksum(amudp_msg_t *m, size_t len);
   30   static void AMUDP_ValidateChecksum(amudp_msg_t const *m, size_t len);
   31 #endif
   32 
   33 /*------------------------------------------------------------------------------------
   34  * Private helpers
   35  *------------------------------------------------------------------------------------ */
   /* Integer exponentiation by repeated multiplication.
    * Returns val raised to the power exp; the result is 1 when exp is 0.
    * Arithmetic is unsigned 64-bit, so an oversized product wraps modulo 2^64. */
   static uint64_t intpow(uint64_t val, uint64_t exp) {
     uint64_t product = 1;
     while (exp > 0) {
       product *= val;
       --exp;
     }
     return product;
   }
   41 #ifndef STATIC_RETRIES
   42 #define STATIC_RETRIES 30
   43 #endif
      // retryToticks[i] holds the precomputed timeout (in ticks) for retry number i.
      // It is filled in by AMUDP_InitRetryCache() and is all-zero before that call
      // (and remains so when the initial timeout is AMUDP_TIMEOUT_INFINITE).
   44 static amx_tick_t retryToticks[STATIC_RETRIES];
      // REQUEST_TIMEOUT_TICKS(retrycnt): timeout in ticks to use for the given retry count.
      // Asserts that the cache was initialized (retryToticks[0] != 0), then returns the
      // cached entry when retrycnt < STATIC_RETRIES; otherwise it computes
      // retryToticks[0] * AMUDP_RequestTimeoutBackoff^retrycnt on the fly.
      // Note: retrycnt is expanded more than once, so pass a side-effect-free expression.
      // NOTE(review): the cached entries are clamped to the max timeout by
      // AMUDP_InitRetryCache(), but this fallback computation is not clamped and can
      // wrap in 64 bits - confirm that callers never reach it with large retry counts.
   45 #define REQUEST_TIMEOUT_TICKS(retrycnt) (                              \
   46   AMX_assert(retryToticks[0]),                                         \
   47   (AMX_PREDICT_TRUE((retrycnt) < STATIC_RETRIES) ?                     \
   48     retryToticks[(retrycnt)] :                                         \
   49     retryToticks[0] * intpow(AMUDP_RequestTimeoutBackoff,(retrycnt)))  \
   50   )
   51 extern void AMUDP_InitRetryCache() {
   52   AMX_assert(!retryToticks[0]);
   53   if (AMUDP_InitialRequestTimeout_us == AMUDP_TIMEOUT_INFINITE) return;
   54   amx_tick_t tickout = AMX_us2ticks(AMUDP_InitialRequestTimeout_us);
   55   amx_tick_t maxticks = AMX_us2ticks(AMUDP_MaxRequestTimeout_us);
   56   for (int i=0; i < STATIC_RETRIES; i++) {
   57     AMX_assert(tickout > 0 && tickout <= maxticks);
   58     retryToticks[i] = tickout;
   59     tickout = MIN(tickout * AMUDP_RequestTimeoutBackoff, maxticks);
   60   }
   61   #if 0  // for debugging retry calc
   62     for (int i=0; i < STATIC_RETRIES*2; i++) {
   63       amx_tick_t tick = REQUEST_TIMEOUT_TICKS(i);
   64       printf("Timeout %2i: %9" PRIu64 " us, %9" PRIu64 " ticks\n",i,tick/AMX_us2ticks(1),tick);
   65     }
   66   #endif
   67 }
   68 /* ------------------------------------------------------------------------------------ */
   69 typedef enum { REQUESTREPLY_PACKET, RETRANSMISSION_PACKET, REFUSAL_PACKET } packet_type;
   70 static int sendPacket(ep_t ep, amudp_msg_t *msg, size_t msgsz, en_t destaddress, packet_type type) {
   71   AMX_assert(ep && msg && msgsz > 0);
   72   AMX_assert(msgsz <= AMUDP_MAX_MSG);
   73   AMX_assert(!enEqual(destaddress, ep->name)); // should never be called for loopback
   74 
   75   #if AMX_DEBUG_VERBOSE
   76     { static int firsttime = 1;
   77       static int verbosesend = 0;
   78       if_pf (firsttime) { verbosesend = !!AMUDP_getenv_prefixed("VERBOSE_SEND"); firsttime = 0; }
   79       if (verbosesend) { 
   80         AMX_VERBOSE_INFO(("sending %i-byte packet to (%s)", (int)msgsz, AMUDP_enStr(destaddress, 0)));
   81       }
   82     }
   83   #endif
   84 
   85   #if AMUDP_EXTRA_CHECKSUM
   86     AMUDP_SetChecksum(msg, msgsz);
   87   #endif
   88 
   89   int retry = 0;
   90   while (1) { 
   91     if_pt (sendto(ep->s, (char *)msg, msgsz, /* Solaris requires cast to char* */
   92                    0, (struct sockaddr *)&destaddress, sizeof(en_t)) > 0 ) { 
   93       // success
   94       AMUDP_STATS(ep->stats.TotalBytesSent += msgsz);
   95       return AM_OK;
   96     }
   97     int err = errno;
   98     if (err == EPERM) {
   99        /* Linux intermittently gets EPERM failures here at startup for no apparent reason -
  100           so allow a retry */
  101       if (retry++ < 5) {
  102         AMX_VERBOSE_INFO(("Got a '%s'(%i) on sendto(), retrying...", strerror(err), err)); 
  103         sleep(1);
  104       } else { // something more serious appears to be wrong..
  105         AMX_RETURN_ERRFR(RESOURCE, sendPacket, strerror(err));
  106       }
  107     } else if (err == ENOBUFS || err == ENOMEM) {
  108       /* some linuxes also generate ENOBUFS for localhost backpressure - 
  109          ignore it and treat it as a drop, let retransmisison handle if necessary */
  110       AMX_DEBUG_WARN(("Got a '%s'(%i) on sendto(%i), ignoring...", strerror(err), err, (int)msgsz)); 
  111       return AM_OK;
  112     } else AMX_RETURN_ERRFR(RESOURCE, sendPacket, strerror(err));
  113   }
  114 
  115 }
  116 /* ------------------------------------------------------------------------------------ */
  117 static int AMUDP_GetOpcode(int isrequest, amudp_category_t cat) {
  118   switch (cat) {
  119     case amudp_Short:
  120       if (isrequest) return AM_REQUEST_M;
  121       else return AM_REPLY_M;
  122     case amudp_Medium:
  123       if (isrequest) return AM_REQUEST_IM;
  124       else return AM_REPLY_IM;
  125     case amudp_Long:
  126       if (isrequest) return AM_REQUEST_XFER_M;
  127       else return AM_REPLY_XFER_M; 
  128     default: AMX_FatalErr("unrecognized opcode in AMUDP_GetOpcode");
  129       return -1;
  130   }
  131 }
  132 /* ------------------------------------------------------------------------------------ */
  133 #define INVALID_NODE ((amudp_node_t)-1)
  134 //  return source id in ep perproc table of this remote addr, or INVALID_NODE for not found 
  135 //  optional hint optimizes lookup
  136 static amudp_node_t sourceAddrToId(ep_t ep, en_t sourceAddr, amudp_node_t hint) {
  137   amudp_perproc_info_t * const pinfo = ep->perProcInfo;
  138   // hint values are 8-bit, try all the matching entries
  139   for (amudp_node_t i = hint; i < ep->P; i += 256) {
  140     en_t const name = pinfo[i].remoteName;
  141     if (enEqual(name, sourceAddr)) return i;
  142   }
  143   AMX_VERBOSE_INFO(("sourceAddrToId hint missed: hint=%i",(int)hint));
  144   // hint may be wrong with non-uniform translation tables, brute-force scan
  145   for (amudp_node_t i = 0; i < ep->P; i++) {
  146     en_t const name = pinfo[i].remoteName;
  147     if (enEqual(name, sourceAddr)) return i;
  148   }
  149   return INVALID_NODE;
  150 }
  151 /* ------------------------------------------------------------------------------------ */
  152 /* ioctl UDP fiasco:
  153  * According to POSIX, ioctl(I_NREAD) on a SOCK_DGRAM should report the EXACT size of
  154  * the next message waiting (or 0), not the number of bytes available on the socket. 
  155  * We can use this as an optimization in choosing the recv buffer size.
  156  * Linux (FIONREAD) and Solaris (I_NREAD) get this right, 
  157  * but all other systems seem to get it wrong, one way or another.
  158  * Cygwin: (bug 3284) not implemented
  159  * WSL (4/8/17) returns raw byte count, which can over or under-report
  160  * FreeBSD: (bug 2827) returns raw byte count, which can over or under-report
  161  * others: over-report by returning total bytes in all messages waiting
  162  */
  163 #ifndef IOCTL_WORKS
      // IOCTL_WORKS == 1 means the pending-size query made in AMUDP_DrainNetwork is
      // trusted to report the exact size of the next datagram. A definition supplied
      // from outside (e.g. on the compiler command line) takes precedence.
      // Darwin is included because AMUDP_DrainNetwork uses getsockopt(SO_NREAD) on
      // that platform rather than the ioctl.
  164  #if PLATFORM_OS_LINUX || PLATFORM_OS_SOLARIS || PLATFORM_OS_DARWIN
  165   #define IOCTL_WORKS 1
  166  #else
  167   #define IOCTL_WORKS 0
  168  #endif
  169 #endif
  170 
  171 /* ------------------------------------------------------------------------------------ */
  172 /*  AMUDP_DrainNetwork - read anything outstanding from hardware/kernel buffers into app space
       *  Loops while a datagram is waiting on ep->s: sizes a receive buffer from the
       *  reported pending byte count (AMUDP_MAX_MSG when !IOCTL_WORKS), receives into it,
       *  records the sender's address and node id, and appends the buffer to the
       *  ep->rxHead/ep->rxTail FIFO.
       *  Stops early (with a debug warning) once ep->rxCnt reaches ep->recvDepth.
       *  Returns AM_OK, or leaves through AMX_RETURN_ERRFR(RESOURCE, ...) on a socket-level failure.
       *  NOTE(review): the error exits taken after AMUDP_AcquireBuffer do not visibly
       *  release destbuf - confirm whether that is intentional.
       */
  173 static int AMUDP_DrainNetwork(ep_t ep) {
  174     int totalBytesDrained = 0;
  175     while (1) {
  176       IOCTL_FIONREAD_ARG_T bytesAvail = 0;
  177       #if IOCTL_WORKS
  178         #if PLATFORM_OS_DARWIN // Apple-specific getsockopt(SO_NREAD) returns what we need
  179           GETSOCKOPT_LENGTH_T junk = sizeof(bytesAvail);
  180           if_pf (SOCK_getsockopt(ep->s, SOL_SOCKET, SO_NREAD, &bytesAvail, &junk) == SOCKET_ERROR)
  181             AMX_RETURN_ERRFR(RESOURCE, "getsockopt(SO_NREAD)", strerror(errno));
  182         #else
  183           if_pf (SOCK_ioctlsocket(ep->s, _FIONREAD, &bytesAvail) == SOCKET_ERROR)
  184             AMX_RETURN_ERRFR(RESOURCE, "ioctl(FIONREAD)", strerror(errno));
  185         #endif
  186
  187         // sanity check
  188         if_pf ((size_t)bytesAvail > AMUDP_MAX_MSG) {
  189           char x;
  190           int retval = recvfrom(ep->s, (char *)&x, 1, MSG_PEEK, NULL, NULL);
  191           AMX_Err("bytesAvail=%lu  recvfrom(MSG_PEEK)=%i", (unsigned long)bytesAvail, retval);
  192           AMX_RETURN_ERRFR(RESOURCE, "AMUDP_DrainNetwork: received message that was too long", strerror(errno));
  193         }
  194       #else
  195         if (inputWaiting(ep->s, false)) bytesAvail = AMUDP_MAX_MSG; // conservative assumption
  196       #endif
  197       if (bytesAvail == 0) break; 
  198
  199         /* TODO: another possible workaround for !IOCTL_WORKS:
  200          * Use a MSG_PEEK of the header to retrieve the header and GET_MSG_SZ
  201          * to allocate an exact-sized buffer. 
  202          * Probably not worth the overhead for a short-lived Rx buffer, 
  203          * especially since some OSs will buffer overrun on MSG_PEEK of a partial datagram.
  204          * However this same strategy could be used (possibly on a dedicated socket) on any OS
  205          * to scatter-recv AMLong payloads directly into their final destination, saving a copy.
  206          */
  207
  208       /* something waiting, acquire a buffer for it */
  209       size_t const msgsz = bytesAvail;
  210       if (ep->rxCnt >= ep->recvDepth) { /* out of buffers - postpone draining */
  211         AMX_DEBUG_WARN_TH("Receive buffer full - unable to drain network. Consider raising RECVDEPTH or polling more often.");
  212         break;
  213       }
  214       amudp_buf_t *destbuf = AMUDP_AcquireBuffer(ep, MSGSZ_TO_BUFFERSZ(msgsz));
  215
  216       #if AMUDP_EXTRA_CHECKSUM && AMX_DEBUG
  217         memset((char *)&destbuf->msg, 0xCC, msgsz); // init recv buffer to a known value
  218       #endif
  219
  220       /* perform the receive */
            // sa receives the sender's address; it is reinterpreted as an en_t further down
  221       struct sockaddr sa;
  222       int sz = sizeof(en_t);
  223       int retval = myrecvfrom(ep->s, (char *)&destbuf->msg, msgsz, 0, &sa, &sz);
  224
            // With a trusted size query the datagram must be exactly msgsz bytes;
            // otherwise anything that fits in the buffer is accepted.
            // Every other outcome falls into the error/diagnostic chain below.
  225       #if IOCTL_WORKS
  226         if_pt (retval == (int)msgsz) ; // success
  227       #else
  228         if_pt (retval <= (int)msgsz) ; // success
  229       #endif
  230         else if_pf (retval == SOCKET_ERROR)
  231           AMX_RETURN_ERRFR(RESOURCE, "AMUDP_DrainNetwork: recvfrom()", strerror(errno));
  232         else if_pf (retval == 0)
  233           AMX_RETURN_ERRFR(RESOURCE, "AMUDP_DrainNetwork: recvfrom() returned zero", strerror(errno));
  234         else if_pf ((size_t)retval < AMUDP_MIN_MSG) 
  235           AMX_RETURN_ERRFR(RESOURCE, "AMUDP_DrainNetwork: incomplete message received in recvfrom()", strerror(errno));
  236         else if_pf ((size_t)retval > msgsz) 
  237             AMX_RETURN_ERRFR(RESOURCE, "AMUDP_DrainNetwork: buffer overrun in recvfrom()", strerror(errno));
  238         else { /* detect broken ioctl implementations */
  239           AMX_assert(IOCTL_WORKS && retval != (int)bytesAvail);
  240           AMX_Warn("ioctl() is probably broken: bytesAvail=%i  recvfrom returned=%i", (int)bytesAvail, retval);
  241         }
  242       #if AMX_DEBUG
  243         if_pf (sz != sizeof(en_t)) // should never happen
  244           AMX_RETURN_ERRFR(RESOURCE, "AMUDP_DrainNetwork: recvfrom() returned wrong sockaddr size", strerror(errno));
  245       #endif
  246
  247       #if AMUDP_EXTRA_CHECKSUM
  248         // the following lines can be uncommented to inject errors and verify the checksum support is working
  249         //memset(((char*)destbuf)+retval-8, 0, 8);
  250         //destbuf->msg.chk2 = 4;
  251         //destbuf->msg.packetlen = 4;
  252         AMUDP_ValidateChecksum(&(destbuf->msg), retval);
  253       #endif
  254
  255       destbuf->status.rx.sourceAddr = *(en_t *)&sa;
  256       destbuf->status.rx.dest = ep; /* remember which ep recvd this message */
            // the message's systemMessageArg byte is passed as the lookup hint;
            // sourceId ends up as INVALID_NODE when the sender is not in ep's table
  257       destbuf->status.rx.sourceId = sourceAddrToId(ep, *(en_t *)&sa, destbuf->msg.systemMessageArg);
  258
  259       // add it to the recv queue
  260       destbuf->status.rx.next = NULL;
  261       if (!ep->rxCnt) { // first element
  262         AMX_assert(!ep->rxHead && !ep->rxTail);
  263         ep->rxTail = ep->rxHead = destbuf;
  264       } else { // append to FIFO
  265         AMX_assert(ep->rxHead && ep->rxTail);
  266         AMX_assert(ep->rxHead != ep->rxTail || ep->rxCnt == 1);
  267         ep->rxTail->status.rx.next = destbuf;
  268         ep->rxTail = destbuf;
  269       }
  270       ep->rxCnt++;
  271
  272       totalBytesDrained += retval;
  273     } // drain recv loop
  274
  275     #if USE_SOCKET_RECVBUFFER_GROW
  276       /* heuristically decide whether we should expand the OS socket recv buffers */
  277       if (totalBytesDrained + AMUDP_MAX_MSG > ep->socketRecvBufferSize) {
  278         /* it's possible we dropped something due to insufficient OS socket buffer space */
  279         if (!ep->socketRecvBufferMaxedOut) { /* try to do something about it */
  280           /* TODO: we may want to add some hysteresis here to prevent artificial inflation
  281            * due to retransmits after a long period of no polling 
  282            */
  283           int newsize = 2 * ep->socketRecvBufferSize;
  284
  285           if (newsize > AMUDP_SOCKETBUFFER_MAX) { /* create a semi-sane upper bound */
  286             newsize = AMUDP_SOCKETBUFFER_MAX;
  287             ep->socketRecvBufferMaxedOut = 1;
  288           }
                // NOTE(review): presumably AMUDP_growSocketBufferSize returns nonzero when the
                // buffer cannot be grown any further - confirm against its definition
  289           ep->socketRecvBufferMaxedOut += AMUDP_growSocketBufferSize(ep, newsize, SO_RCVBUF, "SO_RCVBUF");
  290         }
  291       }
  292     #endif
  293     return AM_OK; /* done */
  294 }
  295 static int AMUDP_WaitForEndpointActivity(eb_t eb, struct timeval *tv) {
  296     /* drain network and block up to tv time for endpoint recv buffers to become non-empty (NULL to block)
  297      * return AM_OK for activity, AM_ERR_ for other error, -1 for timeout 
  298      * wakeupOnControlActivity controls whether we return on control socket activity (for blocking)
  299      */
  300 
  301     /* drain network and see if some receive buffer already non-empty */
  302     for (int i = 0; i < eb->n_endpoints; i++) {
  303       ep_t ep = eb->endpoints[i];
  304       int retval = AMUDP_DrainNetwork(ep);
  305       if (retval != AM_OK) AMX_RETURN(retval);
  306       if (ep->rxCnt) return AM_OK;
  307     }
  308 
  309     while (1) {
  310       fd_set sockset;
  311       fd_set* psockset = &sockset;
  312       int maxfd = 0;
  313 
  314       FD_ZERO(psockset);
  315       for (int i = 0; i < eb->n_endpoints; i++) {
  316         SOCKET s = eb->endpoints[i]->s;
  317         FD_SET(s, psockset);
  318         if ((int)s > maxfd) maxfd = s;
  319       }
  320       if (AMUDP_SPMDControlSocket != INVALID_SOCKET) {
  321         ASYNC_TCP_DISABLE();
  322         FD_SET(AMUDP_SPMDControlSocket, psockset);
  323         if ((int)AMUDP_SPMDControlSocket > maxfd) maxfd = AMUDP_SPMDControlSocket;
  324       }
  325       /* wait for activity */
  326       amx_tick_t starttime = AMX_getCPUTicks();
  327       int retval = select(maxfd+1, psockset, NULL, NULL, tv);
  328       if (AMUDP_SPMDControlSocket != INVALID_SOCKET) ASYNC_TCP_ENABLE();
  329       if_pf (retval == SOCKET_ERROR) { 
  330         AMX_RETURN_ERRFR(RESOURCE, "AMUDP_Block: select()", strerror(errno));
  331       }
  332       else if (retval == 0) return -1; /* time limit expired */
  333       else if_pf (FD_ISSET(AMUDP_SPMDControlSocket, psockset)) {
  334         AMUDP_SPMDIsActiveControlSocket = TRUE; /* we may have missed a signal */
  335         AMUDP_SPMDHandleControlTraffic(NULL);
  336         if (AMUDP_SPMDwakeupOnControlActivity) return AM_OK;
  337       }
  338       else return AM_OK; /* activity on some endpoint in bundle */
  339       amx_tick_t endtime = AMX_getCPUTicks();
  340 
  341       if (tv) { /* readjust remaining time */
  342         int64_t elapsedtime = AMX_ticks2us(endtime - starttime);
  343         if (elapsedtime < tv->tv_usec) tv->tv_usec -= elapsedtime;
  344         else {
  345           int64_t remainingtime = ((int64_t)tv->tv_sec) * 1000000 + tv->tv_usec;
  346           remainingtime -= elapsedtime;
  347           if (remainingtime <= 0) return -1; /* time limit expired */
  348           tv->tv_sec = (long)(remainingtime / 1000000);
  349           tv->tv_usec = (long)(remainingtime % 1000000);
  350         }
  351       }
  352     }
  353 }
  354 /* ------------------------------------------------------------------------------------ */
  355 // Manage the doubly-linked tx ring
  356 static void AMUDP_EnqueueTxBuffer(ep_t ep, amudp_buf_t *buf) {
  357   if (!ep->timeoutCheckPosn) { // empty ring
  358     AMX_assert(ep->outstandingRequests == 0);
  359     ep->timeoutCheckPosn = buf;
  360     buf->status.tx.next = buf;
  361     buf->status.tx.prev = buf;
  362     ep->outstandingRequests = 1;
  363   } else { // insert "behind" current check posn
  364     AMX_assert(ep->outstandingRequests >= 1);
  365     buf->status.tx.next = ep->timeoutCheckPosn;
  366     buf->status.tx.prev = ep->timeoutCheckPosn->status.tx.prev;
  367     ep->timeoutCheckPosn->status.tx.prev = buf;
  368     buf->status.tx.prev->status.tx.next = buf;
  369     ep->outstandingRequests++;
  370     AMX_assert(ep->outstandingRequests <= ep->sendDepth);
  371   }
  372 }
  373 static void AMUDP_DequeueTxBuffer(ep_t ep, amudp_buf_t *buf) {
  374   AMX_assert(buf->status.tx.next);
  375   AMX_assert(buf->status.tx.prev);
  376   AMX_assert(ep->timeoutCheckPosn);
  377   if (buf->status.tx.next == buf) { // removing last element
  378     AMX_assert(ep->outstandingRequests == 1);
  379     AMX_assert(buf->status.tx.prev == buf);
  380     AMX_assert(ep->timeoutCheckPosn == buf);
  381     ep->timeoutCheckPosn = NULL;
  382     ep->outstandingRequests = 0;
  383   } else { // extract from ring
  384     AMX_assert(ep->outstandingRequests > 1);
  385     if (ep->timeoutCheckPosn == buf) // advance posn
  386       ep->timeoutCheckPosn = buf->status.tx.next;
  387     buf->status.tx.prev->status.tx.next = buf->status.tx.next;
  388     buf->status.tx.next->status.tx.prev = buf->status.tx.prev;
  389     ep->outstandingRequests--;
  390   }
  391   #if AMX_DEBUG
  392     buf->status.tx.next = NULL;
  393     buf->status.tx.prev = NULL;
  394   #endif
  395 }
  396 /* ------------------------------------------------------------------------------------ */
  397 static int AMUDP_HandleRequestTimeouts(ep_t ep, int numtocheck) {
  398   /* check the next numtocheck requests for timeout (or -1 for all)
  399    * and retransmit as necessary. return AM_OK or AM_ERR_XXX
  400    */
  401   amudp_buf_t *buf = ep->timeoutCheckPosn;
  402 
  403   if (!buf) { // tx ring empty
  404     AMX_assert(ep->outstandingRequests == 0);
  405     return AM_OK; 
  406   }
  407 
  408   amx_tick_t now = AMX_getCPUTicks();
  409 
  410   AMX_assert(ep->outstandingRequests > 0);
  411   AMX_assert(ep->outstandingRequests <= ep->PD); // sanity: weak test b/c ignores loopback
  412   if (numtocheck == -1) numtocheck = ep->outstandingRequests;
  413   else numtocheck = MIN(numtocheck, ep->outstandingRequests);
  414   for (int i = 0; i < numtocheck; i++) {
  415     if_pf (buf->status.tx.timestamp <= now) {
  416       AMX_assert(AMUDP_InitialRequestTimeout_us != AMUDP_TIMEOUT_INFINITE);
  417 
  418       static uint32_t max_retryCount = 0;
  419       if_pf (!max_retryCount) { // init precomputed values
  420         if (AMUDP_MaxRequestTimeout_us == AMUDP_TIMEOUT_INFINITE) {
  421           max_retryCount = (uint32_t)-1;
  422         } else {
  423           uint64_t temp = AMUDP_InitialRequestTimeout_us;
  424           while (temp <= AMUDP_MaxRequestTimeout_us) {
  425             temp *= AMUDP_RequestTimeoutBackoff;
  426             max_retryCount++;
  427           }
  428         }
  429       }
  430 
  431       amudp_msg_t * const msg = &buf->msg;
  432       amudp_category_t const cat = AMUDP_MSG_CATEGORY(msg);
  433       AMX_assert(AMUDP_MSG_ISREQUEST(msg));
  434       amudp_node_t const destP = buf->status.tx.destId;
  435 
  436       if_pf (buf->status.tx.retryCount >= max_retryCount) {
  437         /* we already waited too long - request is undeliverable */
  438         amx_returned_handler_fn_t handlerfn = (amx_returned_handler_fn_t)ep->handler[0];
  439         int opcode = AMUDP_GetOpcode(1, cat);
  440 
  441         AMUDP_DequeueTxBuffer(ep, buf);
  442         amudp_bufdesc_t *txdesc = GET_REQ_DESC(ep, destP, AMUDP_MSG_INSTANCE(msg));
  443         txdesc->buffer = NULL; // free tx descriptor
  444 
  445         /* pretend this is a bounced recv buffer */
  446         /* note that source/dest for returned mesgs reflect the virtual "message denied" packet 
  447          * although it doesn't really matter because the AM2 spec is too vague
  448          * about the argblock returned message argument for it to be of any use to anyone
  449          */
  450         buf->status.rx.sourceId = destP; 
  451         buf->status.rx.sourceAddr = ep->perProcInfo[destP].remoteName;
  452         buf->status.rx.dest = ep;
  453 
  454         buf->status.rx.replyIssued = TRUE; /* prevent any reply */
  455         buf->status.rx.handlerRunning = TRUE;
  456         AMX_assert(handlerfn != NULL);
  457         (*handlerfn)(ECONGESTION, opcode, (void *)buf);
  458         buf->status.rx.handlerRunning = FALSE;
  459 
  460         AMUDP_ReleaseBuffer(ep, buf);
  461         AMUDP_STATS(ep->stats.ReturnedMessages++);
  462       } else {
  463         /* retransmit */
  464         size_t msgsz = GET_MSG_SZ(msg);
  465         en_t destaddress = ep->perProcInfo[destP].remoteName;
  466         /* tag should NOT be changed for retransmit */
  467         AMX_VERBOSE_INFO(("Retransmitting a request..."));
  468         int retval = sendPacket(ep, msg, msgsz, destaddress, RETRANSMISSION_PACKET);
  469         if (retval != AM_OK) AMX_RETURN(retval);        
  470 
  471         uint32_t const retry = buf->status.tx.retryCount + 1;
  472         buf->status.tx.retryCount = retry;
  473 
  474         now = AMX_getCPUTicks(); // may have blocked in send
  475         buf->status.tx.timestamp = now + REQUEST_TIMEOUT_TICKS(retry);
  476 
  477         AMUDP_STATS(ep->stats.RequestsRetransmitted[cat]++);
  478         AMUDP_STATS(ep->stats.RequestTotalBytesSent[cat] += msgsz);
  479       }
  480     } // time expired
  481 
  482     buf = buf->status.tx.next; // advance
  483     AMX_assert(buf);
  484   }
  485   
  486   /* advance checked posn */
  487   ep->timeoutCheckPosn = buf;
  488 
  489   return AM_OK;
  490 }
  491 /* ------------------------------------------------------------------------------------ */
  492 #define MAXINT64    ((((uint64_t)1) << 63) - 1)
  493 static amx_tick_t AMUDP_FindEarliestRequestTimeout(eb_t eb) {
  494   /* return the soonest timeout value for an active request
  495    * (which may have already passed)
  496    * return 0 for no outstanding requests
  497    */
  498   amx_tick_t earliesttime = (amx_tick_t)MAXINT64;
  499   for (int i = 0; i < eb->n_endpoints; i++) {
  500     ep_t ep = eb->endpoints[i];
  501     amudp_buf_t * const startpos = ep->timeoutCheckPosn;
  502     if (!startpos) continue;
  503     amudp_buf_t *buf = startpos;
  504     do { 
  505       amx_tick_t timestamp = buf->status.tx.timestamp;
  506       if (timestamp < earliesttime) earliesttime = timestamp;
  507       buf = buf->status.tx.next;
  508     } while (buf != startpos);
  509   }
  510   if (earliesttime == MAXINT64) return 0;
  511   else return earliesttime;
  512 }
  513 /* ------------------------------------------------------------------------------------ */
  514 extern int AMUDP_Block(eb_t eb) {
  515   /* block until some endpoint receive buffer becomes non-empty
  516    * does not poll, but does handle SPMD control socket events
  517    */
  518 
  519   /* first, quickly determine if something is already waiting */
  520   { struct timeval tv = {0,0};
  521     int retval = AMUDP_WaitForEndpointActivity(eb, &tv);
  522     if (retval != -1) AMX_RETURN(retval); /* error or something waiting */
  523   }
  524 
  525   while (1) {
  526     /* we need to be careful we don't sleep longer than the next packet timeout */
  527     amx_tick_t nexttimeout = AMUDP_FindEarliestRequestTimeout(eb);
  528     int retval;
  529     if (nexttimeout) {
  530       struct timeval tv;
  531       amx_tick_t now = AMX_getCPUTicks();
  532       if (nexttimeout < now) goto timeout; /* already have a request timeout */
  533       uint32_t const uspause = (uint32_t)AMX_ticks2us(nexttimeout - now);
  534       tv.tv_sec = (long)(uspause / 1000000);
  535       tv.tv_usec = (long)(uspause % 1000000);
  536       retval = AMUDP_WaitForEndpointActivity(eb, &tv);
  537     } else /* no outstanding requests, so just block */
  538       retval = AMUDP_WaitForEndpointActivity(eb, NULL); 
  539     if (retval != -1) AMX_RETURN(retval); /* error or something waiting */
  540      
  541     /* some request has timed out - handle it */
  542     timeout:
  543     { int i;
  544       for (i = 0; i < eb->n_endpoints; i++) {
  545         ep_t ep = eb->endpoints[i];
  546         if (ep->depth != -1) {
  547           int retval = AMUDP_HandleRequestTimeouts(ep, -1);
  548           if (retval != AM_OK) AMX_RETURN(retval);
  549         }
  550       }
  551     }
  552   }
  553 
  554 }
  555 /* ------------------------------------------------------------------------------------ */
  556 #if AMX_DEBUG
  557   #define REFUSE_NOTICE(reason) AMX_Err("I just refused a message and returned to sender. Reason: %s", reason)
  558 #else
  559   #define REFUSE_NOTICE(reason) (void)0
  560 #endif
  561
  562 /* this is a local-use-only macro for AMUDP_processPacket */
      /* AMUDP_REFUSEMESSAGE(errcode): bounce the message currently being processed.
       * Expects the locals msg, buf, ep and isloopback of the enclosing function.
       * It overwrites the message's system fields to mark it as a returned message
       * carrying errcode, then either re-enters AMUDP_processPacket(buf, 1) for
       * loopback, or sends the message back to buf->status.rx.sourceAddr.
       * A send failure is only reported through AMX_Err; a successful refusal is
       * reported through REFUSE_NOTICE (active in AMX_DEBUG builds only).
       * The macro ends with 'return;', so it exits the enclosing void function.
       */
  563 #define AMUDP_REFUSEMESSAGE(errcode) do {                                       \
  564     msg->systemMessageType = (uint8_t)amudp_system_returnedmessage;             \
  565     msg->systemMessageArg = (uint8_t)errcode;                                   \
  566     if (isloopback) {                                                           \
  567       AMUDP_processPacket(buf, 1);                                              \
  568     } else {                                                                    \
  569       int retval = sendPacket(ep, msg, GET_MSG_SZ(msg),                         \
  570                         buf->status.rx.sourceAddr, REFUSAL_PACKET);             \
  571        /* ignore errors sending this */                                         \
  572       if (retval != AM_OK) AMX_Err("failed to sendPacket to refuse message");   \
  573       else REFUSE_NOTICE(#errcode);                                             \
  574     }                                                                           \
  575     return;                                                                     \
  576   } while(0)
  577 
  578 // Process an incoming buffer from any source, and return when complete
  579 // Does NOT release the buffer
  580 void AMUDP_processPacket(amudp_buf_t * const buf, int isloopback) {
  581   amudp_msg_t * const msg = &buf->msg;
  582   ep_t const ep = buf->status.rx.dest;
  583   amudp_node_t const sourceID = buf->status.rx.sourceId;
  584   int const numargs = AMUDP_MSG_NUMARGS(msg);
  585   uint8_t const seqnum = AMUDP_MSG_SEQNUM(msg);
  586   uint16_t const instance = AMUDP_MSG_INSTANCE(msg);
  587   int const isrequest = AMUDP_MSG_ISREQUEST(msg);
  588   amudp_category_t const cat = AMUDP_MSG_CATEGORY(msg);
  589   int const issystemmsg = ((amudp_system_messagetype_t)msg->systemMessageType) != amudp_system_user;
  590 
  591   /* handle returned messages */
  592   if_pf (issystemmsg) { 
  593     amudp_system_messagetype_t type = ((amudp_system_messagetype_t)msg->systemMessageType);
  594     if_pf (type == amudp_system_returnedmessage) { 
  595       amx_returned_handler_fn_t handlerfn = (amx_returned_handler_fn_t)ep->handler[0];
  596       if (sourceID == INVALID_NODE) return; /*  unknown source, ignore message */
  597       if (isrequest && !isloopback) { /*  the returned message is a request, so free that request buffer */
  598         amudp_bufdesc_t * const desc = GET_REQ_DESC(ep, sourceID, instance);
  599         if (desc->buffer && desc->seqNum == seqnum) {
  600           AMUDP_DequeueTxBuffer(ep, desc->buffer);
  601           AMUDP_ReleaseBuffer(ep, desc->buffer);
  602           desc->seqNum = AMUDP_SEQNUM_INC(desc->seqNum);
  603           desc->buffer = NULL;
  604           ep->perProcInfo[sourceID].instanceHint = instance;
  605         }
  606       }
  607       op_t opcode = AMUDP_GetOpcode(isrequest, cat);
  608 
  609       /* note that source/dest for returned mesgs reflect the virtual "message denied" packet 
  610        * although it doesn't really matter because the AM2 spec is too vague
  611        * about the argblock returned message argument for it to be of any use to anyone
  612        */
  613       buf->status.rx.replyIssued = TRUE; /* prevent any reply */
  614       buf->status.rx.handlerRunning = TRUE;
  615         AMX_assert(handlerfn != NULL);
  616         (*handlerfn)(msg->systemMessageArg, opcode, (void *)buf);
  617       buf->status.rx.handlerRunning = FALSE;
  618       AMUDP_STATS(ep->stats.ReturnedMessages++);
  619       return;
  620     }
  621   }
  622 
  623   if (!isloopback) {
  624     if (isrequest) AMUDP_STATS(ep->stats.RequestsReceived[cat]++);
  625     else AMUDP_STATS(ep->stats.RepliesReceived[cat]++);
  626   }
  627 
  628   /* perform acceptance checks */
  629 
  630   if_pf (ep->tag == AM_NONE || 
  631      (ep->tag != msg->tag && ep->tag != AM_ALL))
  632       AMUDP_REFUSEMESSAGE(EBADTAG);
  633   if_pf (instance >= ep->depth)
  634       AMUDP_REFUSEMESSAGE(EUNREACHABLE);
  635   if_pf (ep->handler[msg->handlerId] == amx_unused_handler &&
  636       !issystemmsg && msg->handlerId != 0)
  637       AMUDP_REFUSEMESSAGE(EBADHANDLER);
  638 
  639   switch (cat) {
  640     case amudp_Short:
  641       if_pf (msg->nBytes > 0 || msg->destOffset > 0)
  642         AMUDP_REFUSEMESSAGE(EBADLENGTH);
  643       break;
  644     case amudp_Medium:
  645       if_pf (msg->nBytes > AMUDP_MAX_MEDIUM || msg->destOffset > 0)
  646         AMUDP_REFUSEMESSAGE(EBADLENGTH);
  647       break;
  648     case amudp_Long: 
  649       /* check segment limits */
  650       if_pf (msg->nBytes > AMUDP_MAX_LONG)
  651         AMUDP_REFUSEMESSAGE(EBADLENGTH);
  652       if_pf ( ep->segLength == 0 || /* empty seg */
  653               ((uintptr_t)ep->segAddr + msg->destOffset) == 0) /* NULL target */
  654         AMUDP_REFUSEMESSAGE(EBADSEGOFF);
  655       if_pf (msg->destOffset + msg->nBytes > ep->segLength)
  656         AMUDP_REFUSEMESSAGE(EBADLENGTH);
  657       break;
  658     default: AMX_unreachable();
  659   }
  660 
  661   /*  check the source id */
  662   if_pf (sourceID == INVALID_NODE) AMUDP_REFUSEMESSAGE(EBADENDPOINT);
  663 
  664   // fetch the descriptor relevant to this network message
  665   amudp_bufdesc_t * const desc = (isloopback ? NULL :
  666                        AMUDP_get_desc(ep, sourceID, instance, 
  667                                       !isrequest,  // the alternate descriptor is the relevant one
  668                                       isrequest)); // should only need to allocate if this is a request
  669 
  670   if (!isloopback) {
  671     static const char *OOOwarn = "Detected arrival of out-of-order %s!\n"
  672       " It appears your system is delivering IP packets out-of-order between worker nodes,\n"
  673       " most likely due to striping over multiple adapters or links.\n"
  674       " This might (rarely) lead to corruption of AMUDP traffic.";
  675     /* check sequence number to see if this is a new request/reply or a duplicate */
  676     if (isrequest) {
  677       if_pf (seqnum != desc->seqNum) { 
  678         if_pf (AMUDP_SEQNUM_INC(seqnum) != desc->seqNum) {
  679           if (OOOwarn) {
  680             AMX_Warn(OOOwarn, "request");
  681             OOOwarn = NULL;
  682           }
  683           // Out-of-order message can only be a "slow" copy from a previously-completed instance 
  684           // that included retransmits. Hence, should always be discarded.
  685           AMUDP_STATS(ep->stats.OutOfOrderRequests++);
  686           AMX_VERBOSE_INFO(("Ignoring an Out-of-order request."));
  687           return;
  688         }
  689         /* request resent or reply got dropped - resend reply */
  690         amudp_buf_t * const replybuf = desc->buffer;
  691         AMX_assert(replybuf);
  692         amudp_msg_t * const replymsg = &replybuf->msg;
  693         int cat = AMUDP_MSG_CATEGORY(replymsg);
  694        
  695         if (!ep->replyEpoch) ep->replyEpoch = AMX_getCPUTicks();
  696         if (replybuf->status.tx.timestamp == ep->replyEpoch) {
  697           // optimization: don't retransmit a reply more than once per epoch 
  698           // This prevents request retransmit storms that built up while we were inattentive
  699           // from being exacerbated into reply retransmit storms
  700           AMX_VERBOSE_INFO(("Got a same-epoch duplicate request - squashing reply retransmit."));
  701           AMUDP_STATS(ep->stats.RepliesSquashed[cat]++);
  702           return;
  703         }
  704         replybuf->status.tx.timestamp = ep->replyEpoch;
  705 
  706         size_t msgsz = GET_MSG_SZ(replymsg);
  707         AMX_VERBOSE_INFO(("Got a duplicate request - resending previous reply."));
  708         int retval = sendPacket(ep, replymsg, msgsz,
  709             ep->perProcInfo[sourceID].remoteName, RETRANSMISSION_PACKET);
  710         if (retval != AM_OK) AMX_Err("sendPacket failed while resending a reply");
  711         AMUDP_STATS(ep->stats.RepliesRetransmitted[cat]++);
  712         AMUDP_STATS(ep->stats.ReplyTotalBytesSent[cat] += msgsz);
  713         return;
  714       }
  715     } else {
  716       if (seqnum != desc->seqNum) { /*  duplicate reply, we already ran handler - ignore it */
  717         if_pf (AMUDP_SEQNUM_INC(seqnum) != desc->seqNum) {
  718           if (OOOwarn) {
  719             AMX_Warn(OOOwarn, "reply");
  720             OOOwarn = NULL;
  721           }
  722           // Out-of-order message can only be a "slow" copy from a previously-completed instance 
  723           // that included retransmits. Hence, should always be discarded.
  724           AMUDP_STATS(ep->stats.OutOfOrderReplies++);
  725           AMX_VERBOSE_INFO(("Ignoring an Out-of-order reply."));
  726           return;
  727         }
  728         AMX_VERBOSE_INFO(("Ignoring a duplicate reply."));
  729         return;
  730       }
  731     }
  732 
  733     /* --- message accepted --- */
  734 
  735     if (isrequest) { //  alternate the reply sequence number so duplicates of this request get ignored
  736         desc->seqNum = AMUDP_SEQNUM_INC(desc->seqNum);
  737     } else { /* it's a reply, free the corresponding request */
  738       amudp_buf_t * const reqbuf = desc->buffer;
  739       if_pt (reqbuf) { 
  740         #if AMUDP_COLLECT_LATENCY_STATS && AMUDP_COLLECT_STATS
  741           { /* gather some latency statistics */
  742             amx_tick_t now = AMX_getCPUTicks();
  743             amx_tick_t latency = (now - reqbuf->status.tx.firstSendTime);
  744             ep->stats.RequestSumLatency += latency;
  745             if (latency < ep->stats.RequestMinLatency) ep->stats.RequestMinLatency = latency;
  746             if (latency > ep->stats.RequestMaxLatency) ep->stats.RequestMaxLatency = latency;
  747           }
  748         #endif
  749         AMUDP_DequeueTxBuffer(ep, reqbuf);
  750         AMUDP_ReleaseBuffer(ep, reqbuf);
  751         desc->seqNum = AMUDP_SEQNUM_INC(desc->seqNum);
  752         desc->buffer = NULL;
  753         ep->perProcInfo[sourceID].instanceHint = instance;
  754       } else { /* request timed out and we decided it was undeliverable, then a reply arrived */
  755         desc->seqNum = AMUDP_SEQNUM_INC(desc->seqNum);
  756         /* TODO: seq numbers may get out of sync on timeout 
  757          * if request got through but replies got lost 
  758          * we also may do bad things if a reply to an undeliverable message 
  759          * arrives after we've reused the request buffer (very unlikely)
  760          * possible soln: add an epoch number
  761          */
  762         return; /* reply handler should NOT be run in this situation */
  763       }
  764     }
  765   }
  766 
  767   { /*  run the handler */
  768     buf->status.rx.replyIssued = FALSE;
  769     buf->status.rx.handlerRunning = TRUE;
  770     if (issystemmsg) { /* an AMUDP system message */
  771       amudp_system_messagetype_t type = ((amudp_system_messagetype_t)(msg->systemMessageType & 0xF));
  772       switch (type) {
  773         case amudp_system_autoreply:
  774           AMX_assert(!isloopback);
  775           return; /*  already taken care of */
  776         default: AMX_unreachable();
  777       }
  778     } else { /* a user message */
  779       uint32_t * const pargs = GET_MSG_ARGS(msg);
  780       handler_t const hid = msg->handlerId;
  781       switch (cat) {
  782         case amudp_Short: 
  783           if (ep->preHandlerCallback) 
  784             ep->preHandlerCallback(amudp_Short, isrequest, hid, buf, 
  785                                    NULL, 0, numargs, pargs);
  786           AMX_RUN_HANDLER_SHORT(ep->handler[hid], buf, pargs, numargs);
  787           if (ep->postHandlerCallback) ep->postHandlerCallback(cat, isrequest);
  788           break;
  789         case amudp_Medium: {
  790           uint8_t * const pData = GET_MSG_DATA(msg);
  791           if (ep->preHandlerCallback) 
  792             ep->preHandlerCallback(amudp_Medium, isrequest, hid, buf, 
  793                                    pData, msg->nBytes, numargs, pargs);
  794           AMX_RUN_HANDLER_MEDIUM(ep->handler[hid], buf, pargs, numargs, pData, msg->nBytes);
  795           if (ep->postHandlerCallback) ep->postHandlerCallback(cat, isrequest);
  796           break;
  797         }
  798         case amudp_Long: {
  799           uint8_t * const pData = ((uint8_t *)ep->segAddr) + msg->destOffset;
  800           /*  a single-message bulk transfer. do the copy */
  801           if (!isloopback) memcpy(pData, GET_MSG_DATA(msg), msg->nBytes);
  802           if (ep->preHandlerCallback) 
  803             ep->preHandlerCallback(amudp_Long, isrequest, hid, buf, 
  804                                    pData, msg->nBytes, numargs, pargs);
  805           AMX_RUN_HANDLER_LONG(ep->handler[hid], buf, pargs, numargs, pData, msg->nBytes);
  806           if (ep->postHandlerCallback) ep->postHandlerCallback(cat, isrequest);
  807           break;
  808         }
  809         default: AMX_unreachable();
  810       }
  811     }
  812     buf->status.rx.handlerRunning = FALSE;
  813     if (!isloopback && isrequest && !buf->status.rx.replyIssued) {
  814         static va_list va_dummy; /* dummy value - static to prevent uninit warnings */
  815         /*  user didn't reply, so issue an auto-reply */
  816         if_pf (AMUDP_ReplyGeneric(amudp_Short, buf, 0, 0, 0, 0, 0, va_dummy, amudp_system_autoreply, 0) 
  817             != AM_OK) /*  should never happen - don't return here to prevent leaking buffer */
  818           AMX_Err("Failed to issue auto reply in AMUDP_ServiceIncomingMessages");
  819     }
  820   }
  821 }
  822 #undef AMUDP_REFUSEMESSAGE  /* this is a local-use-only macro */
  823 /* ------------------------------------------------------------------------------------ */
  824 /* main message receive workhorse - 
  825  * drain network once and service available incoming messages, up to AMUDP_MAX_RECVMSGS_PER_POLL
  826  */
  827 static int AMUDP_ServiceIncomingMessages(ep_t ep) {
  828   /* drain network */
  829   int retval = AMUDP_DrainNetwork(ep);
  830   if (retval != AM_OK) AMX_RETURN(retval);
  831 
  832   ep->replyEpoch = 0;
  833 
  834   for (int i = 0; AMUDP_MAX_RECVMSGS_PER_POLL == 0 || i < MAX(AMUDP_MAX_RECVMSGS_PER_POLL, ep->depth); i++) {
  835       amudp_buf_t * const buf = ep->rxHead;
  836 
  837       if (!buf) return AM_OK; /* nothing else waiting */
  838 
  839       /* we have a real message waiting - dequeue it */
  840       ep->rxHead = buf->status.rx.next;
  841       AMX_assert(ep->rxCnt > 0);
  842       ep->rxCnt--;
  843       if (ep->rxCnt == 0) {
  844         AMX_assert(!ep->rxHead);
  845         ep->rxTail = NULL;
  846       }
  847 
  848       if (AMUDP_FaultInjectionEnabled) { /* allow fault injection to drop some revcd messages */
  849         double randval = rand() / (double)RAND_MAX;
  850         AMX_assert(randval >= 0.0 && AMUDP_FaultInjectionRate >= 0.0);
  851         if (randval < AMUDP_FaultInjectionRate) {
  852           AMX_VERBOSE_INFO(("Fault injection dropping a packet.."));
  853           goto donewithmessage;
  854         }
  855       }
  856   
  857       AMUDP_processPacket(buf, 0);
  858       donewithmessage: /* message handled - continue to next one */
  859 
  860       /* free the handled buffer */
  861       AMUDP_ReleaseBuffer(ep, buf);
  862 
  863   }  /*  for */
  864   return AM_OK;
  865 } /*  AMUDP_ServiceIncomingMessages */
  866 /*------------------------------------------------------------------------------------
  867  * Poll
  868  *------------------------------------------------------------------------------------ */
  869 extern int AM_Poll(eb_t eb) {
  870   AMX_CHECKINIT();
  871   AMX_CHECK_ERR(!eb, BAD_ARG);
  872 
  873   for (int i = 0; i < eb->n_endpoints; i++) {
  874     int retval;
  875     ep_t ep = eb->endpoints[i];
  876 
  877     if_pt (ep->depth != -1) { /* only poll endpoints which have buffers */
  878 
  879       #if USE_ASYNC_TCP_CONTROL
  880         if_pf (AMUDP_SPMDIsActiveControlSocket) /*  async check */
  881       #endif
  882       { retval = AMUDP_SPMDHandleControlTraffic(NULL);
  883         if_pf (retval != AM_OK) AMX_RETURN(retval);
  884       }
  885 
  886       retval = AMUDP_ServiceIncomingMessages(ep); /* drain network and check for activity */
  887       if_pf (retval != AM_OK) AMX_RETURN(retval);
  888 
  889       retval = AMUDP_HandleRequestTimeouts(ep, AMUDP_TIMEOUTS_CHECKED_EACH_POLL);
  890       if_pf (retval != AM_OK) AMX_RETURN(retval);
  891     }
  892   }
  893 
  894   return AM_OK;
  895 }
  896 // poll/block eb while awaiting resource cond
  897 // upon error, execute cleanup and return it
  898 #define BLOCKUNTIL(eb, cond, cleanup) while (!(cond)) { \
  899    int _retval = AM_OK;                                 \
  900    if (AMUDP_PoliteSync) {                              \
  901       _retval = AMUDP_Block(eb);                        \
  902    }                                                    \
  903    if_pt (_retval == AM_OK) _retval = AM_Poll(eb);      \
  904    if_pf (_retval != AM_OK) {                           \
  905      cleanup;                                           \
  906      AMX_RETURN(_retval);                               \
  907    }                                                    \
  908   }
  909 #define TRANSID_TO_NODEID(ep, transid) (                       \
  910   AMX_PREDICT_TRUE(!(ep)->translation) ? (amudp_node_t)(transid) : \
  911     (AMX_assert((transid) < (ep)->translationsz),              \
  912      (ep)->translation[transid].id)                            \
  913   )
  914 
  915 /*------------------------------------------------------------------------------------
  916  * Generic Request/Reply
  917  *------------------------------------------------------------------------------------ */
  918 static int AMUDP_RequestGeneric(amudp_category_t category, 
  919                           ep_t ep, amudp_node_t reply_endpoint, handler_t handler, 
  920                           void *source_addr, size_t nbytes, uintptr_t dest_offset, 
  921                           int numargs, va_list argptr, 
  922                           uint8_t systemType, uint8_t systemArg) {
  923 
  924   amudp_node_t const destP = TRANSID_TO_NODEID(ep, reply_endpoint);
  925   amudp_perproc_info_t * const perProcInfo = &ep->perProcInfo[destP];
  926   en_t const destaddress = perProcInfo->remoteName;
  927   const int isloopback = enEqual(destaddress, ep->name);
  928 
  929   uint16_t instance;
  930   amudp_bufdesc_t *outgoingdesc = NULL;
  931 
  932   /*  always poll before sending a request */
  933   int retval = AM_Poll(ep->eb);
  934   if_pf (retval != AM_OK) AMX_RETURN(retval);
  935 
  936   size_t const msgsz = COMPUTE_MSG_SZ(numargs, nbytes);
  937   size_t const buffersz = MSGSZ_TO_BUFFERSZ(msgsz);
  938   amudp_buf_t * const outgoingbuf = AMUDP_AcquireBuffer(ep, buffersz);
  939 
  940   if (isloopback) {
  941     instance = 0; /* not used */
  942   } else { /*  acquire a free request buffer */
  943     int const depth = ep->depth;
  944     amudp_bufdesc_t * const descs = GET_REQ_DESC_ALLOC(ep, destP, 0);
  945 
  946     while(1) { // send descriptor acquisition loop
  947       uint16_t const hint = perProcInfo->instanceHint;
  948       AMX_assert(hint <= depth);
  949       amudp_bufdesc_t * const hintdesc = &descs[hint];
  950 
  951       if_pt (!hintdesc->buffer) { /*  hint is right */
  952         instance = hint;
  953         outgoingdesc = hintdesc;
  954         perProcInfo->instanceHint = (hint+1==depth?0:hint+1);
  955         goto gotinstance;
  956       } else { /*  hint is wrong */
  957         /*  search for a free instance */
  958         instance = hint; 
  959         do {
  960           instance = ((instance+1)==depth?0:instance+1);
  961           amudp_bufdesc_t * const tdesc = &descs[hint];
  962           if (!tdesc->buffer) {
  963             outgoingdesc = tdesc;
  964             goto gotinstance;
  965           }
  966         } while (instance != hint);
  967 
  968         /*  no buffers available - wait until one is open 
  969          *  (hint will point to a free buffer) 
  970          */
  971         BLOCKUNTIL(ep->eb, descs[perProcInfo->instanceHint].buffer == NULL,
  972                   AMUDP_ReleaseBuffer(ep, outgoingbuf)); // prevent leak on error return
  973       }
  974     } 
  975 
  976   gotinstance:
  977     AMX_assert(outgoingdesc);
  978     AMX_assert(!outgoingdesc->buffer);
  979 
  980     // wait for sendDepth, if necessary
  981     BLOCKUNTIL(ep->eb, ep->outstandingRequests < ep->sendDepth, 
  982                   AMUDP_ReleaseBuffer(ep, outgoingbuf)); // prevent leak on error return
  983 
  984     AMX_assert(!outgoingdesc->buffer);
  985     outgoingdesc->buffer = outgoingbuf; // claim desc
  986   }
  987 
  988   /*  setup message meta-data */
  989   amudp_msg_t * const msg = &outgoingbuf->msg;
  990   if (isloopback) AMUDP_MSG_SETFLAGS(msg, TRUE, category, numargs, 0, 0);
  991   else AMUDP_MSG_SETFLAGS(msg, TRUE, category, numargs, outgoingdesc->seqNum, instance);
  992   msg->destOffset = dest_offset;
  993   msg->handlerId = handler;
  994   msg->nBytes = (uint16_t)nbytes;
  995   AMX_assert(systemType == amudp_system_user);
  996   AMX_assert(systemArg == 0);
  997   msg->systemMessageType = systemType;
  998   msg->systemMessageArg = (uint8_t)ep->idHint;
  999   msg->tag = perProcInfo->tag;
 1000   AMX_assert(GET_MSG_SZ(msg) == msgsz);
 1001 
 1002   { /*  setup args */
 1003     int i;
 1004     uint32_t *args = GET_MSG_ARGS(msg);
 1005     for (i = 0; i < numargs; i++) {
 1006       args[i] = (uint32_t)va_arg(argptr, int); /* must be int due to default argument promotion */
 1007     }
 1008     #if USE_CLEAR_UNUSED_SPACE
 1009       if (i < AMUDP_MAX_SHORT) args[i] = 0;
 1010     #endif
 1011   }
 1012 
 1013   if (isloopback) { /* run handler synchronously */
 1014     if (nbytes > 0) { /* setup data */
 1015       if (category == amudp_Long) { /* one-copy: buffer was overallocated, could be reduced with more complexity */
 1016         AMX_CHECK_ERRFRC(dest_offset + nbytes > ep->segLength, BAD_ARG, 
 1017                            "AMRequestXfer", "segment overflow", 
 1018                            AMUDP_ReleaseBuffer(ep, outgoingbuf));
 1019         memmove(((int8_t *)ep->segAddr) + dest_offset, 
 1020                 source_addr, nbytes);
 1021       } else { /* mediums still need data copy */
 1022         memcpy(GET_MSG_DATA(msg), source_addr, nbytes);
 1023       }
 1024     }
 1025     /* pretend its a recv buffer */
 1026     outgoingbuf->status.rx.dest = ep;
 1027     outgoingbuf->status.rx.sourceId = reply_endpoint;
 1028     outgoingbuf->status.rx.sourceAddr = destaddress;
 1029 
 1030     AMUDP_processPacket(outgoingbuf, 1);
 1031 
 1032     AMUDP_ReleaseBuffer(ep, outgoingbuf);
 1033   } else { /* perform the send */
 1034 
 1035     /*  setup data */
 1036     if (nbytes > 0) {
 1037       memcpy(GET_MSG_DATA(msg), source_addr, nbytes);
 1038     }
 1039 
 1040     int retval = sendPacket(ep, msg, msgsz, destaddress, REQUESTREPLY_PACKET);
 1041     if_pf (retval != AM_OK) {
 1042       outgoingdesc->buffer = NULL; /*  send failed, so message rejected - release buffer */
 1043       AMUDP_ReleaseBuffer(ep, outgoingbuf);
 1044       perProcInfo->instanceHint = instance;
 1045       AMX_RETURN(retval);
 1046     }
 1047 
 1048     amx_tick_t now = AMX_getCPUTicks();
 1049     if (AMUDP_InitialRequestTimeout_us == AMUDP_TIMEOUT_INFINITE) { // never timeout
 1050       outgoingbuf->status.tx.timestamp = (amx_tick_t)-1;
 1051     } else {
 1052       outgoingbuf->status.tx.timestamp = now + REQUEST_TIMEOUT_TICKS(0);
 1053     }
 1054     #if AMUDP_COLLECT_LATENCY_STATS
 1055       outgoingbuf->status.tx.firstSendTime = now;
 1056     #endif
 1057 
 1058     outgoingbuf->status.tx.retryCount = 0;
 1059     outgoingbuf->status.tx.destId = destP;
 1060     AMUDP_EnqueueTxBuffer(ep, outgoingbuf);
 1061 
 1062     AMUDP_STATS(ep->stats.RequestsSent[category]++);
 1063     AMUDP_STATS(ep->stats.RequestDataBytesSent[category] += sizeof(int) * numargs + nbytes);
 1064     AMUDP_STATS(ep->stats.RequestTotalBytesSent[category] += msgsz);
 1065   }
 1066 
 1067   return AM_OK;
 1068 }
 1069 /* ------------------------------------------------------------------------------------ */
 1070 static int AMUDP_ReplyGeneric(amudp_category_t category, 
 1071                           amudp_buf_t *requestbuf, handler_t handler, 
 1072                           void *source_addr, size_t nbytes, uintptr_t dest_offset, 
 1073                           int numargs, va_list argptr,
 1074                           uint8_t systemType, uint8_t systemArg) {
 1075   ep_t const ep = requestbuf->status.rx.dest;
 1076   amudp_node_t const destP = requestbuf->status.rx.sourceId;
 1077   const int isloopback = enEqual(requestbuf->status.rx.sourceAddr, ep->name);
 1078   amudp_perproc_info_t const * const perProcInfo = &ep->perProcInfo[destP];
 1079 
 1080   /*  we don't poll within a reply because by definition we are already polling somewhere in the call chain */
 1081 
 1082   size_t const msgsz = COMPUTE_MSG_SZ(numargs, nbytes);
 1083   size_t const buffersz = MSGSZ_TO_BUFFERSZ(msgsz);
 1084   amudp_buf_t * const outgoingbuf = AMUDP_AcquireBuffer(ep, buffersz);
 1085   amudp_bufdesc_t *outgoingdesc;
 1086   uint16_t instance;
 1087 
 1088   if (isloopback) {
 1089     #if AMX_DEBUG
 1090       outgoingdesc = NULL; /* not used */
 1091       instance = 0; /* not used */
 1092     #endif
 1093   } else {
 1094     /*  acquire a free descriptor  */
 1095     /*  trivial because replies always overwrite previous reply in request instance */
 1096     instance = AMUDP_MSG_INSTANCE(&(requestbuf->msg)); 
 1097     outgoingdesc = GET_REP_DESC(ep, destP, instance); // reply desc alloc in processPacket
 1098 
 1099     if (outgoingdesc->buffer) { /* free buffer of previous reply */
 1100       AMUDP_ReleaseBuffer(ep, outgoingdesc->buffer);
 1101     }
 1102     outgoingdesc->buffer = outgoingbuf;
 1103   }
 1104 
 1105   /*  setup message meta-data */
 1106   amudp_msg_t * const msg = &outgoingbuf->msg;
 1107   if (isloopback) AMUDP_MSG_SETFLAGS(msg, FALSE, category, numargs, 0, 0);
 1108   else AMUDP_MSG_SETFLAGS(msg, FALSE, category, numargs, 
 1109                           AMUDP_MSG_SEQNUM(&requestbuf->msg), // clone request seqnum, as rep_desc already inc
 1110                           instance);
 1111   msg->destOffset = dest_offset;
 1112   msg->handlerId = handler;
 1113   msg->nBytes = (uint16_t)nbytes;
 1114   AMX_assert(systemType == amudp_system_user || systemType == amudp_system_autoreply);
 1115   AMX_assert(systemArg == 0);
 1116   msg->systemMessageType = systemType;
 1117   msg->systemMessageArg = (uint8_t)ep->idHint;
 1118   msg->tag = perProcInfo->tag;
 1119   AMX_assert(GET_MSG_SZ(msg) == msgsz);
 1120 
 1121   { /*  setup args */
 1122     int i;
 1123     uint32_t *args = GET_MSG_ARGS(msg);
 1124     for (i = 0; i < numargs; i++) {
 1125       args[i] = (uint32_t)va_arg(argptr, int); /* must be int due to default argument promotion */
 1126     }
 1127     #if USE_CLEAR_UNUSED_SPACE
 1128       if (i < AMUDP_MAX_SHORT) args[i] = 0;
 1129     #endif
 1130   }
 1131 
 1132   en_t const destaddress = perProcInfo->remoteName;
 1133   if (isloopback) { /* run handler synchronously */
 1134     if (nbytes > 0) { /* setup data */
 1135       if (category == amudp_Long) { /* one-copy */
 1136         AMX_CHECK_ERRFRC(dest_offset + nbytes > ep->segLength, BAD_ARG, 
 1137                            "AMRequestXfer", "segment overflow",
 1138                            AMUDP_ReleaseBuffer(ep, outgoingbuf));
 1139         memmove(((int8_t *)ep->segAddr) + dest_offset, 
 1140                 source_addr, nbytes);
 1141       } else { /* mediums still need data copy */
 1142         memcpy(GET_MSG_DATA(msg), source_addr, nbytes);
 1143       }
 1144     }
 1145 
 1146     /* pretend its a recv buffer */
 1147     outgoingbuf->status.rx.dest = ep;
 1148     outgoingbuf->status.rx.sourceId = destP;
 1149     outgoingbuf->status.rx.sourceAddr = destaddress;
 1150 
 1151     AMUDP_processPacket(outgoingbuf, 1);
 1152 
 1153     AMUDP_ReleaseBuffer(ep, outgoingbuf);
 1154   } else { /* perform the send */
 1155     /*  setup data */
 1156     memcpy(GET_MSG_DATA(msg), source_addr, nbytes);
 1157 
 1158     int retval = sendPacket(ep, msg, msgsz, destaddress, REQUESTREPLY_PACKET);
 1159     if_pf (retval != AM_OK) AMX_RETURN(retval);
 1160 
 1161     if (!ep->replyEpoch) ep->replyEpoch = AMX_getCPUTicks();
 1162     outgoingbuf->status.tx.timestamp = ep->replyEpoch;
 1163     AMUDP_STATS(ep->stats.RepliesSent[category]++);
 1164     AMUDP_STATS(ep->stats.ReplyDataBytesSent[category] += sizeof(int) * numargs + nbytes);
 1165     AMUDP_STATS(ep->stats.ReplyTotalBytesSent[category] += msgsz);
 1166   }
 1167 
 1168   requestbuf->status.rx.replyIssued = TRUE;
 1169   return AM_OK;
 1170 }
 1171 
 1172 /*------------------------------------------------------------------------------------
 1173  * Request
 1174  *------------------------------------------------------------------------------------ */
 1175 extern int AMUDP_RequestVA(ep_t request_endpoint, amudp_node_t reply_endpoint, handler_t handler, 
 1176                          int numargs, va_list argptr) {
 1177   AMX_CHECKINIT();
 1178   AMX_CHECK_ERR(!request_endpoint, BAD_ARG);
 1179   AMX_CHECK_ERR(AMUDP_BADHANDLERVAL(handler), BAD_ARG);
 1180   AMX_CHECK_ERR(request_endpoint->depth == -1, NOT_INIT); /* it's an error to call before AM_SetExpectedResources */
 1181   AMX_CHECK_ERR(reply_endpoint >= request_endpoint->translationsz, BAD_ARG);
 1182   AMX_CHECK_ERR(request_endpoint->translation && !request_endpoint->translation[reply_endpoint].inuse, BAD_ARG);
 1183   AMX_CHECK_ERR(!request_endpoint->translation && reply_endpoint >= request_endpoint->P, BAD_ARG);
 1184   AMX_assert(numargs >= 0 && numargs <= AMUDP_MAX_SHORT);
 1185 
 1186   return AMUDP_RequestGeneric(amudp_Short, 
 1187                                   request_endpoint, reply_endpoint, handler, 
 1188                                   NULL, 0, 0,
 1189                                   numargs, argptr,
 1190                                   amudp_system_user, 0);
 1191 
 1192 }
 1193 extern int AMUDP_Request(ep_t request_endpoint, amudp_node_t reply_endpoint, handler_t handler, 
 1194                          int numargs, ...) {
 1195     int retval;
 1196     va_list argptr;
 1197     va_start(argptr, numargs); /*  pass in last argument */
 1198     retval = AMUDP_RequestVA(request_endpoint, reply_endpoint, handler, 
 1199                            numargs, argptr);
 1200     va_end(argptr);
 1201     return retval;
 1202 }
 1203 /* ------------------------------------------------------------------------------------ */
 1204 extern int AMUDP_RequestIVA(ep_t request_endpoint, amudp_node_t reply_endpoint, handler_t handler, 
 1205                           void *source_addr, size_t nbytes,
 1206                           int numargs, va_list argptr) {
 1207   AMX_CHECKINIT();
 1208   AMX_CHECK_ERR(!request_endpoint, BAD_ARG);
 1209   AMX_CHECK_ERR(AMUDP_BADHANDLERVAL(handler), BAD_ARG);
 1210   AMX_CHECK_ERR(request_endpoint->depth == -1, NOT_INIT); /* it's an error to call before AM_SetExpectedResources */
 1211   AMX_CHECK_ERR(reply_endpoint >= request_endpoint->translationsz, BAD_ARG);
 1212   AMX_CHECK_ERR(request_endpoint->translation && !request_endpoint->translation[reply_endpoint].inuse, BAD_ARG);
 1213   AMX_CHECK_ERR(!request_endpoint->translation && reply_endpoint >= request_endpoint->P, BAD_ARG);
 1214   AMX_CHECK_ERR(!source_addr && nbytes > 0, BAD_ARG);
 1215   AMX_CHECK_ERR(nbytes > AMUDP_MAX_MEDIUM, BAD_ARG);
 1216   AMX_assert(numargs >= 0 && numargs <= AMUDP_MAX_SHORT);
 1217 
 1218   return AMUDP_RequestGeneric(amudp_Medium, 
 1219                                   request_endpoint, reply_endpoint, handler, 
 1220                                   source_addr, nbytes, 0,
 1221                                   numargs, argptr,
 1222                                   amudp_system_user, 0);
 1223 }
 1224 extern int AMUDP_RequestI(ep_t request_endpoint, amudp_node_t reply_endpoint, handler_t handler, 
 1225                           void *source_addr, size_t nbytes,
 1226                           int numargs, ...) {
 1227     int retval;
 1228     va_list argptr;
 1229     va_start(argptr, numargs); /*  pass in last argument */
 1230     retval = AMUDP_RequestIVA(request_endpoint, reply_endpoint, handler, 
 1231                               source_addr, nbytes,
 1232                               numargs, argptr);
 1233     va_end(argptr);
 1234     return retval; 
 1235 }
 1236 /* ------------------------------------------------------------------------------------ */
 1237 extern int AMUDP_RequestXferVA(ep_t request_endpoint, amudp_node_t reply_endpoint, handler_t handler, 
 1238                           void *source_addr, size_t nbytes, uintptr_t dest_offset, 
 1239                           int async, 
 1240                           int numargs, va_list argptr) {
 1241   AMX_CHECKINIT();
 1242   AMX_CHECK_ERR(!request_endpoint, BAD_ARG);
 1243   AMX_CHECK_ERR(AMUDP_BADHANDLERVAL(handler), BAD_ARG);
 1244   AMX_CHECK_ERR(request_endpoint->depth == -1, NOT_INIT); /* it's an error to call before AM_SetExpectedResources */
 1245   AMX_CHECK_ERR(reply_endpoint >= request_endpoint->translationsz, BAD_ARG);
 1246   AMX_CHECK_ERR(request_endpoint->translation && !request_endpoint->translation[reply_endpoint].inuse, BAD_ARG);
 1247   AMX_CHECK_ERR(!request_endpoint->translation && reply_endpoint >= request_endpoint->P, BAD_ARG);
 1248   AMX_CHECK_ERR(!source_addr && nbytes > 0, BAD_ARG);
 1249   AMX_CHECK_ERR(nbytes > AMUDP_MAX_LONG, BAD_ARG);
 1250   AMX_CHECK_ERR(dest_offset > AMUDP_MAX_SEGLENGTH, BAD_ARG);
 1251   AMX_assert(numargs >= 0 && numargs <= AMUDP_MAX_SHORT);
 1252 
 1253   amudp_node_t const destP = TRANSID_TO_NODEID(request_endpoint, reply_endpoint);
 1254   amudp_perproc_info_t const * const perProcInfo = &request_endpoint->perProcInfo[destP];
 1255   const int isloopback = enEqual(perProcInfo->remoteName, request_endpoint->name);
 1256 
 1257   if (async && !isloopback) { /*  decide if we can satisfy request without blocking */
 1258       /* it's unclear from the spec whether we should poll before an async failure,
 1259        * but by definition the app must be prepared for handlers to run when calling this 
 1260        * request, so it shouldn't cause anything to break, and the async request is more likely
 1261        * to succeed if we do. so:
 1262        */
 1263       AM_Poll(request_endpoint->eb);
 1264 
 1265       /* check senddepth */
 1266       if (request_endpoint->outstandingRequests >= request_endpoint->sendDepth)
 1267         AMX_RETURN_ERRFR(IN_USE, AMUDP_RequestXferAsync, "Request can't be satisfied without blocking right now");
 1268 
 1269       /* see if there's a free buffer */
 1270       amudp_bufdesc_t * const desc = GET_REQ_DESC_ALLOC(request_endpoint, destP, 0);
 1271       uint16_t const hint = perProcInfo->instanceHint;
 1272       int const depth = request_endpoint->depth;
 1273       int i = hint;
 1274       AMX_assert(i >= 0 && i < depth);
 1275       while (1) {
 1276         if (!desc[i].buffer) break;
 1277         i = (i+1==depth ? 0 : i+1);
 1278         if (i == hint) AMX_RETURN_ERRFR(IN_USE, AMUDP_RequestXferAsync, 
 1279                                          "Request can't be satisfied without blocking right now");
 1280       }
 1281   }
 1282 
 1283   /* perform the send */
 1284   return AMUDP_RequestGeneric(amudp_Long, 
 1285                                   request_endpoint, reply_endpoint, handler, 
 1286                                   source_addr, nbytes, dest_offset,
 1287                                   numargs, argptr,
 1288                                   amudp_system_user, 0);
 1289 }
 1290 extern int AMUDP_RequestXfer(ep_t request_endpoint, amudp_node_t reply_endpoint, handler_t handler, 
 1291                           void *source_addr, size_t nbytes, uintptr_t dest_offset, 
 1292                           int async, 
 1293                           int numargs, ...) {
 1294       int retval;
 1295       va_list argptr;
 1296       va_start(argptr, numargs); /*  pass in last argument */
 1297       retval = AMUDP_RequestXferVA(request_endpoint, reply_endpoint, handler, 
 1298                                 source_addr, nbytes, dest_offset,
 1299                                 async,
 1300                                 numargs, argptr);
 1301       va_end(argptr);
 1302       return retval; 
 1303 }
 1304 /*------------------------------------------------------------------------------------
 1305  * Reply
 1306  *------------------------------------------------------------------------------------ */
 1307 extern int AMUDP_ReplyVA(void *token, handler_t handler, 
 1308                        int numargs, va_list argptr) {
 1309   AMX_CHECKINIT();
 1310   AMX_CHECK_ERR(!token, BAD_ARG);
 1311   AMX_CHECK_ERR(AMUDP_BADHANDLERVAL(handler), BAD_ARG);
 1312   AMX_assert(numargs >= 0 && numargs <= AMUDP_MAX_SHORT);
 1313 
 1314   amudp_buf_t * const buf = (amudp_buf_t *)token;
 1315   amudp_msg_t * const msg = &buf->msg;
 1316 
 1317   //  semantic checking on reply
 1318   AMX_CHECK_ERR(!AMUDP_MSG_ISREQUEST(msg), RESOURCE);       /* token is not a request */
 1319   AMX_CHECK_ERR(!buf->status.rx.handlerRunning, RESOURCE); /* token is not for an active request */
 1320   AMX_CHECK_ERR(buf->status.rx.replyIssued, RESOURCE);     /* already issued a reply */
 1321   AMX_CHECK_ERR(((amudp_system_messagetype_t)msg->systemMessageType) != amudp_system_user,
 1322                     RESOURCE); /* can't reply to a system message (returned message) */
 1323 
 1324   return AMUDP_ReplyGeneric(amudp_Short, 
 1325                                   buf, handler, 
 1326                                   NULL, 0, 0,
 1327                                   numargs, argptr,
 1328                                   amudp_system_user, 0);
 1329 }
 1330 extern int AMUDP_Reply(void *token, handler_t handler, 
 1331                        int numargs, ...) {
 1332     int retval;
 1333     va_list argptr;
 1334     va_start(argptr, numargs); /*  pass in last argument */
 1335     retval = AMUDP_ReplyVA(token, handler,
 1336                                   numargs, argptr);
 1337     va_end(argptr);
 1338     return retval; 
 1339 }
 1340 /* ------------------------------------------------------------------------------------ */
 1341 extern int AMUDP_ReplyIVA(void *token, handler_t handler, 
 1342                           void *source_addr, size_t nbytes,
 1343                           int numargs, va_list argptr) {
 1344   AMX_CHECKINIT();
 1345   AMX_CHECK_ERR(!token, BAD_ARG);
 1346   AMX_CHECK_ERR(AMUDP_BADHANDLERVAL(handler), BAD_ARG);
 1347   AMX_CHECK_ERR(!source_addr && nbytes > 0, BAD_ARG);
 1348   AMX_CHECK_ERR(nbytes > AMUDP_MAX_MEDIUM, BAD_ARG);
 1349   AMX_assert(numargs >= 0 && numargs <= AMUDP_MAX_SHORT);
 1350 
 1351   amudp_buf_t * const buf = (amudp_buf_t *)token;
 1352   amudp_msg_t * const msg = &buf->msg;
 1353 
 1354   //  semantic checking on reply
 1355   AMX_CHECK_ERR(!AMUDP_MSG_ISREQUEST(msg), RESOURCE);       /* token is not a request */
 1356   AMX_CHECK_ERR(!buf->status.rx.handlerRunning, RESOURCE); /* token is not for an active request */
 1357   AMX_CHECK_ERR(buf->status.rx.replyIssued, RESOURCE);     /* already issued a reply */
 1358   AMX_CHECK_ERR(((amudp_system_messagetype_t)msg->systemMessageType) != amudp_system_user,
 1359                     RESOURCE); /* can't reply to a system message (returned message) */
 1360 
 1361   return AMUDP_ReplyGeneric(amudp_Medium, 
 1362                                   buf, handler, 
 1363                                   source_addr, nbytes, 0,
 1364                                   numargs, argptr,
 1365                                   amudp_system_user, 0);
 1366 }
 1367 extern int AMUDP_ReplyI(void *token, handler_t handler, 
 1368                           void *source_addr, size_t nbytes,
 1369                           int numargs, ...) {
 1370     int retval;
 1371     va_list argptr;
 1372     va_start(argptr, numargs); /*  pass in last argument */
 1373     retval = AMUDP_ReplyIVA(token, handler,
 1374                                   source_addr, nbytes,
 1375                                   numargs, argptr);
 1376     va_end(argptr);
 1377     return retval; 
 1378 }
 1379 /* ------------------------------------------------------------------------------------ */
 1380 extern int AMUDP_ReplyXferVA(void *token, handler_t handler, 
 1381                           void *source_addr, size_t nbytes, uintptr_t dest_offset, 
 1382                           int numargs, va_list argptr) {
 1383   AMX_CHECKINIT();
 1384   AMX_CHECK_ERR(!token, BAD_ARG);
 1385   AMX_CHECK_ERR(AMUDP_BADHANDLERVAL(handler), BAD_ARG);
 1386   AMX_CHECK_ERR(!source_addr && nbytes > 0, BAD_ARG);
 1387   AMX_CHECK_ERR(nbytes > AMUDP_MAX_LONG, BAD_ARG);
 1388   AMX_CHECK_ERR(dest_offset > AMUDP_MAX_SEGLENGTH, BAD_ARG);
 1389   AMX_assert(numargs >= 0 && numargs <= AMUDP_MAX_SHORT);
 1390 
 1391   amudp_buf_t * const buf = (amudp_buf_t *)token;
 1392   amudp_msg_t * const msg = &buf->msg;
 1393 
 1394   //  semantic checking on reply
 1395   AMX_CHECK_ERR(!AMUDP_MSG_ISREQUEST(msg), RESOURCE);       /* token is not a request */
 1396   AMX_CHECK_ERR(!buf->status.rx.handlerRunning, RESOURCE); /* token is not for an active request */
 1397   AMX_CHECK_ERR(buf->status.rx.replyIssued, RESOURCE);     /* already issued a reply */
 1398   AMX_CHECK_ERR(((amudp_system_messagetype_t)msg->systemMessageType) != amudp_system_user,
 1399                     RESOURCE); /* can't reply to a system message (returned message) */
 1400 
 1401   return AMUDP_ReplyGeneric(amudp_Long, 
 1402                                   buf, handler, 
 1403                                   source_addr, nbytes, dest_offset,
 1404                                   numargs, argptr,
 1405                                   amudp_system_user, 0);
 1406 }
 1407 extern int AMUDP_ReplyXfer(void *token, handler_t handler, 
 1408                           void *source_addr, size_t nbytes, uintptr_t dest_offset, 
 1409                           int numargs, ...) {
 1410     int retval;
 1411     va_list argptr;
 1412     va_start(argptr, numargs); /*  pass in last argument */
 1413     retval = AMUDP_ReplyXferVA(token, handler,
 1414                                   source_addr, nbytes, dest_offset,
 1415                                   numargs, argptr);
 1416     va_end(argptr);
 1417     return retval; 
 1418 }
 1419 /* ------------------------------------------------------------------------------------ */
 1420 extern void AMUDP_DefaultReturnedMsg_Handler(int status, op_t opcode, void *token) {
 1421   const char *statusStr = "*unknown*";
 1422   const char *opcodeStr = "*unknown*";
 1423   amudp_buf_t * const buf = (amudp_buf_t *)token;
 1424   amudp_msg_t * const msg = &buf->msg;
 1425   int numArgs = AMUDP_MSG_NUMARGS(msg);
 1426   uint32_t const * const args = GET_MSG_ARGS(msg);
 1427   char argStr[255];
 1428 
 1429   #define STATCASE(name, desc) case name: statusStr = #name ": " desc; break;
 1430   switch (status) {
 1431     STATCASE(EBADARGS      , "Arguments to request or reply function invalid    ");
 1432     STATCASE(EBADENTRY     , "X-lation table index selected unbound table entry ");
 1433     STATCASE(EBADTAG       , "Sender's tag did not match the receiver's EP tag  "); 
 1434     STATCASE(EBADHANDLER   , "Invalid index into the recv.'s handler table      "); 
 1435     STATCASE(EBADSEGOFF    , "Offset into the dest-memory VM segment invalid    ");
 1436     STATCASE(EBADLENGTH    , "Bulk xfer length goes beyond a segment's end      ");
 1437     STATCASE(EBADENDPOINT  , "Destination endpoint does not exist               ");
 1438     STATCASE(ECONGESTION   , "Congestion at destination endpoint                ");
 1439     STATCASE(EUNREACHABLE  , "Destination endpoint unreachable                  ");
 1440     STATCASE(EREPLYREJECTED, "Destination endpoint refused reply message        ");
 1441     }
 1442   #define OPCASE(name) case name: opcodeStr = #name; break;
 1443   switch (opcode) {
 1444     OPCASE(AM_REQUEST_M);
 1445     OPCASE(AM_REQUEST_IM);
 1446     OPCASE(AM_REQUEST_XFER_M);
 1447     OPCASE(AM_REPLY_M);
 1448     OPCASE(AM_REPLY_IM);
 1449     OPCASE(AM_REPLY_XFER_M);
 1450   }
 1451 
 1452   argStr[0] = '\0';
 1453   for (int i=0; i < numArgs; i++) {
 1454     char tmp[20];
 1455     sprintf(tmp, "0x%08x  ", (int)args[i]);
 1456     strcat(argStr, tmp);
 1457   }
 1458   AMX_FatalErr("An active message was returned to sender,\n"
 1459              "    and trapped by the default returned message handler (handler 0):\n"
 1460              "Error Code: %s\n"
 1461              "Message type: %s\n"
 1462              "Destination: %s (%i)\n"
 1463              "Handler: %i\n"
 1464              "Tag: %s\n"
 1465              "Arguments(%i): %s\n"
 1466              "Aborting...",
 1467              statusStr, opcodeStr, 
 1468              AMUDP_enStr(buf->status.rx.sourceAddr, 0), buf->status.rx.sourceId,
 1469              msg->handlerId, AMUDP_tagStr(msg->tag, 0),
 1470              numArgs, argStr);
 1471 }
 1472 /* ------------------------------------------------------------------------------------ */
 1473 #if AMUDP_EXTRA_CHECKSUM
 1474 static uint16_t checksum(uint8_t const * const data, size_t len) {
 1475   uint16_t val = 0;
 1476   for (size_t i=0; i < len; i++) { // a simple, fast, non-secure checksum
 1477     uint8_t stir = (uint8_t)(i & 0xFF);
 1478     val = (val << 8) | 
 1479           ( ((val >> 8) & 0xFF) ^ data[i] ^ stir );
 1480   }
 1481   return val;
 1482 }
 1483 static void AMUDP_SetChecksum(amudp_msg_t * const m, size_t len) {
 1484   AMX_assert(len > 0 && len <= AMUDP_MAX_MSG);
 1485   m->packetlen = (uint32_t)len;
 1486   uint8_t *data = (uint8_t *)&(m->packetlen); 
 1487   uint16_t chk = checksum(data, len - 4); // checksum includes chk* fields
 1488   m->chk1 = chk;
 1489   m->chk2 = chk;
 1490 }
 1491 static void AMUDP_ValidateChecksum(amudp_msg_t const * const m, size_t len) {
 1492   static char report[512];
 1493   int failed = 0;
 1494 
 1495   { static int firstcall = 1;
 1496     if (firstcall) AMX_Warn("AMUDP_EXTRA_CHECKSUM is enabled. This mode is ONLY intended for debugging system problems.");
 1497     firstcall = 0;
 1498   }
 1499 
 1500   if_pf (m->chk1 != m->chk2) {
 1501     strcat(report, " : Checksum field corrupted");
 1502     failed = 1;
 1503   }
 1504   if_pf (len != m->packetlen) {
 1505     strcat(report, " : Length mismatch");
 1506     failed = 1;
 1507   }
 1508   if_pf (len < AMUDP_MIN_MSG || len > AMUDP_MAX_MSG) {
 1509     strcat(report, " : Packet length illegal");
 1510     failed = 1;
 1511   }
 1512 
 1513   uint8_t const * const data = (uint8_t const *)&(m->packetlen); 
 1514   size_t datalen = len-4;
 1515   uint16_t recvchk = checksum(data, datalen);
 1516 
 1517   if_pf (recvchk != m->chk1) {
 1518     strcat(report, " : Checksum mismatch on data");
 1519     failed = 1;
 1520   }
 1521 
 1522   if_pf (failed) {
 1523     // further analysis
 1524     uint8_t val = data[datalen-1];
 1525     int rep = 0;
 1526     for (int i=datalen-1; i >= 0; i--) {
 1527       if (data[i] == val) rep++;
 1528       else break;
 1529     }
 1530     if (rep > 1) {
 1531       char tmp[80];
 1532       sprintf(tmp," : Final %d bytes are 0x%02x",rep,val);
 1533       strcat(report,tmp);
 1534     }
 1535     AMX_FatalErr("UDP packet failed checksum!\n  recvLen: %d  packetlen: %d\n  chk1:0x%04x  chk2:0x%04x  recvchk:0x%04x\n  Analysis%s",
 1536                     (int)len, (int)m->packetlen, m->chk1, m->chk2, recvchk, report);
 1537   }
 1538 }
 1539 #endif