"Fossies" - the Fresh Open Source Software Archive

Member "NetPIPE-3.7.2/src/ibv.c" (19 Aug 2010, 42214 Bytes) of package /linux/privat/old/NetPIPE-3.7.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "ibv.c", see the Fossies "Dox" file reference documentation.

    1 /*****************************************************************************/
    2 /* "NetPIPE" -- Network Protocol Independent Performance Evaluator.          */
    3 /* Copyright 1997, 1998 Iowa State University Research Foundation, Inc.      */
    4 /*                                                                           */
    5 /* This program is free software; you can redistribute it and/or modify      */
    6 /* it under the terms of the GNU General Public License as published by      */
    7 /* the Free Software Foundation.  You should have received a copy of the     */
    8 /* GNU General Public License along with this program; if not, write to the  */
    9 /* Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */
   10 /*                                                                           */
   11 /*       ibv.c             ---- Infiniband module for OpenFabrics verbs      */
   12 /*                                                                           */
   13 /* Contributions Copyright (c) 2007 Cisco, Inc.                              */
   14 /*****************************************************************************/
   15 
   16 #define USE_VOLATILE_RPTR /* needed for polling on last byte of recv buffer */
   17 #include    "netpipe.h"
   18 #include    <stdio.h>
   19 #include    <getopt.h>
   20 #include    <pthread.h>
   21 #include    <stdarg.h>
   22 
   23 #define WANT_DEBUG 0
   24 
   25 FILE* logfile = NULL;
   26 
   27 /* With WANT_DEBUG set, LOGPRINTF((fmt, ...)) writes a file:line:function
   28    prefix, the message and a newline to logfile; otherwise it expands to nothing. */
   29 #if WANT_DEBUG
   30 #define LOGPRINTF(args)                                           \
   31   do {                                                            \
   32     va_list arglist;                                              \
   33     fprintf(logfile, "%s:%d:%s: ", __FILE__, __LINE__, __func__); \
   34     logprintf args;                                               \
   35     fprintf(logfile, "\n");                                       \
   36   } while (0)
   37 #else
   38 #define LOGPRINTF(a)
   39 #endif
   40 
   41 /* Pre-release versions of libibverbs do not have ibv_get_device_list() */
   42 #define HAVE_IBV_DEVICE_LIST 1
   43 
   44 /* Header files needed for Infiniband */
   45 
   46 #include    <infiniband/verbs.h>
   47 
   48 /* Global vars */
   49 
   50 static struct ibv_device      *hca; /* Selected InfiniBand adapter (set by initIB) */
   51 static struct ibv_context     *ctx; /* Open device context for the selected adapter */
   52 static struct ibv_port_attr    hca_port;/* Attributes of the last queried port */
   53 static int                     port_num;/* IB port to use (-1 until one is chosen) */
   54 static uint16_t                lid; /* Local ID (LID) taken from hca_port */
   55 static uint16_t                d_lid;   /* Destination LID read from the peer */
   56 static struct ibv_pd          *pd_hndl; /* Protection Domain handle */
   57 static int                     num_cqe; /* Requested # Completion Queue Entries */
   58 static int                     act_num_cqe; /* Actual # CQEs reported by the CQ */
   59 static struct ibv_cq          *s_cq_hndl; /* Send Completion Queue */
   60 static struct ibv_cq          *r_cq_hndl; /* Recv Completion Queue */
   61 static struct ibv_mr          *s_mr_hndl; /* Send Mem. Region */
   62 static struct ibv_mr          *r_mr_hndl; /* Recv Mem. Region */
   63 static struct ibv_qp_init_attr qp_init_attr; /* Initial QP attributes */
   64 static struct ibv_qp          *qp_hndl; /* Handle to QP */
   65 static uint32_t                d_qp_num; /* Dest. QP Number read from the peer */
   66 static struct ibv_qp_attr      qp_attr; /* QP attributes used for state changes */
   67 static struct ibv_wc           wc;      /* Most recently polled work completion */
   68 static int                     max_wq=50000;    /* max send/recv work requests asked for */   
   69 static void*                   remote_address;  /* remote address */
   70 static uint32_t                remote_key;  /* Remote Key */
   71 static volatile int            receive_complete; /* 1 = set by event_handler, 0 = reset by PrepareToReceive */
   72 static pthread_t               thread;      /* thread running EventThread */
   73 
   74 static int initIB(ArgStruct *p);
   75 static void logprintf(const char *format,  ...);
   76 
   77 /* Function definitions */
   78 
   79 void Init(ArgStruct *p, int* pargc, char*** pargv)
   80 {
   81    /* Setup Infiniband specific defaults
   82     */
   83    p->prot.ib_mtu = IBV_MTU_1024;        /* 1024 Byte MTU                    */
   84    p->prot.commtype = NP_COMM_RDMAWRITE; /* Use RDMA write communications    */
   85    p->prot.comptype = NP_COMP_LOCALPOLL; /* Use local polling for completion */
   86    p->prot.device_and_port = NULL;       /* Use first available port         */
   87    p->tr = 0;                            /* I am not the transmitter         */
   88    p->rcv = 1;                           /* I am the receiver                */
   89 }
   90 /* Setup(..) function is simply used to 'setup' the standard features of 
   91  * the netpipe modules.  tcp,netpipe-related stuff.  This does no actual
   92  * 'setup' of any InfiniBand stuff, other than passing/storing
   93  * the parameters from the command line.... the 'initIB' function
   94  * is called from here though to do IB initialization.
   95  */
   96 void Setup(ArgStruct *p)
   97 {
   98 
   99  int one = 1;
  100  int sockfd;
  101  struct sockaddr_in *lsin1, *lsin2;      /* ptr to sockaddr_in in ArgStruct */
  102  char *host;
  103  struct hostent *addr;
  104  struct protoent *proto;        /* protocol entry */
  105  int send_size, recv_size, sizeofint = sizeof(int);
  106  struct sigaction sigact1;
  107 #if WANT_DEBUG
  108  char logfilename[80];
  109 #endif
  110 
  111  /* Sanity check */
  112  if( p->prot.commtype == NP_COMM_RDMAWRITE && 
  113      p->prot.comptype != NP_COMP_LOCALPOLL ) {
  114    fprintf(stderr, "Error, RDMA Write may only be used with local polling.\n");
  115    fprintf(stderr, "Try using RDMA Write With Immediate Data with vapi polling\n"); /* vapi polling? */
  116    fprintf(stderr, "or event completion\n");
  117    exit(-1);
  118  }
  119  
  120  if( p->prot.commtype != NP_COMM_RDMAWRITE && 
  121      p->prot.comptype == NP_COMP_LOCALPOLL ) {
  122    fprintf(stderr, "Error, local polling may only be used with RDMA Write.\n");
  123    fprintf(stderr, "Try using vapi polling or event completion\n");
  124    exit(-1);
  125  }
  126 
  127 #if WANT_DEBUG
  128  /* Open log file */
  129  sprintf(logfilename, ".iblog%d", 1 - p->tr);
  130  logfile = fopen(logfilename, "w");
  131 #endif
  132 
  133  host = p->host;                           /* copy ptr to hostname */ 
  134 
  135  lsin1 = &(p->prot.sin1);         /* setup the socket structure #1 */
  136  lsin2 = &(p->prot.sin2);        /* setup socket structure #2 */
  137                     /* more setup stuff */
  138  bzero((char *) lsin1, sizeof(*lsin1));
  139  bzero((char *) lsin2, sizeof(*lsin2));
  140                     /* tcp checks */
  141  if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
  142    printf("NetPIPE: can't open stream socket! errno=%d\n", errno);
  143    exit(-4);
  144  }
  145 
  146                     /* another tcp check */
  147  if(!(proto = getprotobyname("tcp"))){
  148    printf("NetPIPE: protocol 'tcp' unknown!\n");
  149    exit(555);
  150  }
  151 
  152  if (p->tr){                                  /* if client i.e., Sender */
  153 
  154 
  155    if (atoi(host) > 0) {                   /* Numerical IP address */
  156      lsin1->sin_family = AF_INET;
  157      lsin1->sin_addr.s_addr = inet_addr(host);
  158 
  159    } else {
  160       
  161      if ((addr = gethostbyname(host)) == NULL){     /* get the hostname */
  162        printf("NetPIPE: invalid hostname '%s'\n", host);
  163        exit(-5);
  164      }
  165 
  166      lsin1->sin_family = addr->h_addrtype;
  167      bcopy(addr->h_addr, (char*) &(lsin1->sin_addr.s_addr), addr->h_length);
  168    }
  169 
  170    lsin1->sin_port = htons(p->port);
  171 
  172  } else {                                 /* we are the receiver (server) */
  173 
  174    bzero((char *) lsin1, sizeof(*lsin1));
  175    lsin1->sin_family      = AF_INET;
  176    lsin1->sin_addr.s_addr = htonl(INADDR_ANY);
  177    lsin1->sin_port        = htons(p->port);
  178          
  179    /* re-use socket, common if netpipe aborts due to busted networks */
  180    one = 1;
  181    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int))) {
  182        printf("NetPIPE: server: unable to setsockopt -- errno %d\n", errno);
  183        exit(-7);
  184    }
  185 
  186    if (bind(sockfd, (struct sockaddr *) lsin1, sizeof(*lsin1)) < 0){
  187      printf("NetPIPE: server: bind on local address failed! errno=%d\n", errno);
  188      exit(-6);
  189    }
  190 
  191  }
  192 
  193  if(p->tr)
  194    p->commfd = sockfd;
  195  else
  196    p->servicefd = sockfd;
  197 /* ********** This is where the IB specific stuff begins ******** */
  198  
  199 
  200  /* Establish tcp connections */
  201  /* Connection management for IB is handled over tcp/ip connection */
  202  establish(p);
  203 
  204  /* Initialize OpenIB -> Mellanox Infiniband */
  205 
  206  if(initIB(p) == -1) {
  207    CleanUp(p);
  208    exit(-1);
  209  }
  210 }   
  211 /* Event Handler:
  212  * Receives events from the EventThread, and notifies other functions
  213  * of their arrivals.
  214  */
  215 void event_handler(struct ibv_cq *cq);
  216 
  217 
  218 /* EventThread:
  219  * Continuously polls the Command Queue for events, and registers them with
  220  * the event_handler(..) function.
  221  */
  222 void *EventThread(void *unused)
  223 {
  224   struct ibv_cq *cq;
  225   void *ev_ctx;
  226 
  227   while (1) {
  228     if (ibv_get_cq_event(0, &cq, &ev_ctx)) {
  229       fprintf(stderr, "Failed to get CQ event\n");
  230       return NULL;
  231     }
  232     event_handler(cq);
  233   }
  234 }
  235 /* Initialize the actual IB device */
  236 int initIB(ArgStruct *p)
  237 {
  238   int i, j, ret;
  239   char *tmp;
  240   int num_devices = 0;
  241   struct ibv_device **hca_list, **filtered_hca_list;
  242   struct ibv_device_attr hca_attr;
  243 #if !HAVE_IBV_DEVICE_LIST
  244   struct dlist *hca_dlist; 
  245   struct ibv_device* hca_device; 
  246 #endif
  247 
  248   /* Find all the devices on this host */
  249 #if HAVE_IBV_DEVICE_LIST
  250   hca_list = ibv_get_device_list(&num_devices);
  251 #else
  252   hca_dlist = ibv_get_devices();
  253   dlist_start(hca_dlist); 
  254   dlist_for_each_data(hca_dlist, hca_device, struct ibv_device)
  255     ++num_devices;
  256 #endif
  257 
  258   /* If we didn't find any, return an error */
  259   if (0 == num_devices) {
  260       fprintf(stderr, "Couldn't find any IBV devices\n");
  261       return -1;
  262   }
  263   
  264 #if !HAVE_IBV_DEVICE_LIST
  265   /* If we have the old version (ibv_get_devices()), convert it to
  266      the new form */
  267   hca_list = (struct ibv_device**) malloc(num_devices * 
  268                                           sizeof(struct ibv_device*));
  269   if (NULL == hca_list) {
  270       fprintf(stderr, "%s:%s:%d: malloc failed\n", __FILE__,
  271               __func__, __LINE__);
  272       return -1;
  273   }
  274   
  275   i = 0; 
  276   dlist_start(hca_dlist); 
  277   dlist_for_each_data(hca_dlist, hca_device, struct ibv_device)
  278       hca_list[i++] = hca_device;
  279 #endif    
  280 
  281   /* Possible values for p->prot.device_and_port:
  282 
  283      1. <device>:<port> -- use only this device and only this port
  284      2. <device> -- use the first active port on this device
  285      3. :<port> -- use only this port, but on any device
  286 
  287      <device> names are matched exactly.
  288   */
  289 
  290   /* If a device name was specified on the command line, see if we can
  291      find it */
  292   tmp = NULL;
  293   port_num = -1;
  294   filtered_hca_list = hca_list;
  295   if (NULL != p->prot.device_and_port) {
  296       /* If there's a : in the string, then we have a port */
  297       tmp = strchr(p->prot.device_and_port, ':');
  298       if (NULL != tmp) {
  299           *tmp = '\0';
  300           ++tmp;
  301           port_num = atoi(tmp);
  302       }
  303       LOGPRINTF(("Pre-filter: looking for target device \"%s\", port %d",
  304                  p->prot.device_and_port, port_num));
  305 
  306       /* If the length of the device string left is >0, then there's a
  307          device specification */
  308       if (strlen(p->prot.device_and_port) > 0) {
  309           int found = 0;
  310 
  311           /* Loop through all the devices and find a matching
  312              name */
  313           for (i = 0; i < num_devices; ++i) {
  314               LOGPRINTF(("Pre-filter: found device: %s",
  315                          ibv_get_device_name(hca_list[i])));
  316               if (0 == strcmp(p->prot.device_and_port, 
  317                               ibv_get_device_name(hca_list[i]))) {
  318                   LOGPRINTF(("Pre-filter: found target device: %s (%d of %d)",
  319                              p->prot.device_and_port, i, num_devices));
  320                   filtered_hca_list = &(hca_list[i]);
  321                   num_devices = 1;
  322                   found = 1;
  323                   break;
  324               }
  325           }
  326 
  327           /* If we didn't find it, abort */
  328           if (!found) {
  329               fprintf(stderr, "Unable to find device \"%s\", aborting\n",
  330                       p->prot.device_and_port);
  331               return -1;
  332           }
  333       }
  334   }
  335 
  336   /* Traverse the filtered HCA list and find a good port */
  337   for (hca = NULL, i = 0; NULL == hca && i < num_devices; ++i) {
  338 
  339       /* Get a ibv_context from the ibv_device  */
  340       ctx = ibv_open_device(filtered_hca_list[i]);
  341       if (!ctx) {
  342           fprintf(stderr, "Couldn't create IBV context\n");
  343           return -1;
  344       } else {
  345           LOGPRINTF(("Found HCA %s",
  346                      ibv_get_device_name(filtered_hca_list[i])));
  347       }
  348       
  349       /* Get the device attributes */
  350       if (0 != ibv_query_device(ctx, &hca_attr)) {
  351           fprintf(stderr, "Could not get device context for %s, aborting\n",
  352                   ibv_get_device_name(hca));
  353           return -1;
  354       }
  355 
  356       for (j = 1; j <= hca_attr.phys_port_cnt; ++j) {
  357           /* If a specific port was asked for, *only* look at that port */
  358           if (port_num >= 0 && port_num != j) {
  359               continue;
  360           }
  361           LOGPRINTF(("Checking %s:%d...", 
  362                      ibv_get_device_name(filtered_hca_list[i]), j));
  363 
  364           /* Query this port and see if it's active */
  365           if (0 != ibv_query_port(ctx, j, &hca_port)) {
  366               fprintf(stderr, "Unable to query port %s:%d, aborting\n",
  367                       ibv_get_device_name(filtered_hca_list[i]), j);
  368               return -1;
  369           }
  370 
  371           /* If this port is active, we have a winner! */
  372           if (IBV_PORT_ACTIVE == hca_port.state) {
  373               LOGPRINTF(("%s:%d is ACTIVE", 
  374                          ibv_get_device_name(filtered_hca_list[i]), j));
  375               port_num = j;
  376               hca = filtered_hca_list[i];
  377               break;
  378           }
  379       }
  380 
  381       /* If we found one, we're done */
  382       if (hca) {
  383           break;
  384       }
  385 
  386       /* Otherwise, close the device (ignore any errors) */
  387       ibv_close_device(ctx);
  388       ctx = NULL;
  389   }
  390 
  391   /* If we didn't find a good device/port combo, abort */
  392   if (NULL == hca) {
  393       fprintf(stderr, "Could not find an active device and port, aborting\n");
  394       return -1;
  395   }
  396 
  397   /* free up the other devices in the event we would have multiple ib
  398      devices. if this isnt done, the device pointers will still be
  399      around in space somewhere -> bad */
  400 
  401 #if HAVE_IBV_DEVICE_LIST
  402   ibv_free_device_list(hca_list); 
  403 #endif
  404   
  405   /* Get HCA properties */
  406   
  407   lid = hca_port.lid;       /* local id, used to ref back to the device */
  408   LOGPRINTF(("  lid = %d", lid));
  409 
  410 
  411   /* Allocate Protection Domain */
  412     /* need a Protection domain to handle/register memory over the card */
  413   pd_hndl = ibv_alloc_pd(ctx);  
  414   if(!pd_hndl) {
  415     fprintf(stderr, "Error allocating PD\n");
  416     return -1;
  417   } else {
  418     LOGPRINTF(("Allocated Protection Domain"));
  419   }
  420 
  421 
  422   /* Create send completion queue */
  423   
  424   num_cqe = 30000; /* Requested number of completion q elements */
  425   s_cq_hndl = ibv_create_cq(ctx, num_cqe, NULL, NULL, 0);
  426   if(!s_cq_hndl) {
  427     fprintf(stderr, "Error creating send CQ\n");
  428     return -1;
  429   } else {
  430     act_num_cqe = s_cq_hndl->cqe;
  431     LOGPRINTF(("Created Send Completion Queue with %d elements", act_num_cqe));
  432   }
  433 
  434 
  435   /* Create recv completion queue */
  436   
  437   num_cqe = 20000; /* Requested number of completion q elements */
  438   r_cq_hndl = ibv_create_cq(ctx, num_cqe, NULL, NULL, 0);
  439   if(!r_cq_hndl) {
  440     fprintf(stderr, "Error creating send CQ\n");
  441     return -1;
  442   } else {
  443     act_num_cqe = r_cq_hndl->cqe;
  444     LOGPRINTF(("Created Recv Completion Queue with %d elements", act_num_cqe));
  445   }
  446 
  447 
  448   /* Placeholder for MR */
  449     /* We dont actually setup the Memory Regions here, instead
  450      * this is done in the 'MyMalloc(..)' helper function.
  451      * You could however, set them up here.
  452      */
  453 
  454   /* Create Queue Pair */
  455     /* To setup a Queue Pair, the following qp initial attributes must be
  456      * specified and passed to the create_qp(..) function:
  457      * max send/recv write requests.  (max_recv/send_wr)
  458      * max scatter/gather entries. (max_recv/send_sge)
  459      * Command queues to associate the qp with.  (recv/send_cq)
  460      * Signalling type:  1-> signal all events.  0-> dont, event handler will
  461      *   deal with this.
  462      * QP type.  (RC=reliable connection, UC=unreliable.. etc.) defined 
  463      *   in the verbs header.
  464      */
  465 
  466   memset(&qp_init_attr, 0, sizeof(struct ibv_qp_init_attr)); 
  467 /*
  468  * Set the Max outstanding QP WRs, can be set via max_wr global
  469  * or if this is greater than the max reported by the HCA, set
  470  * to max reported value.
  471  */
  472 if (hca_attr.max_qp_wr < max_wq ) {
  473   LOGPRINTF(("Reduced max_wq to hca reported max of %d", hca_attr.max_qp_wr));
  474   qp_init_attr.cap.max_send_wr = hca_attr.max_qp_wr;
  475   qp_init_attr.cap.max_recv_wr = hca_attr.max_qp_wr;
  476 } else {
  477   qp_init_attr.cap.max_recv_wr    = max_wq;
  478   qp_init_attr.cap.max_send_wr    = max_wq;
  479 }
  480   qp_init_attr.cap.max_recv_sge   = 1; /* Max scatter/gather entries on RQ */
  481   qp_init_attr.cap.max_send_sge   = 1; /* Max scatter/gather entries on SQ */
  482   qp_init_attr.recv_cq            = r_cq_hndl; /* CQ handle for RQ         */
  483   qp_init_attr.send_cq            = s_cq_hndl; /* CQ handle for SQ         */
  484   qp_init_attr.sq_sig_all         = 0; /* Signalling type */
  485   qp_init_attr.qp_type            = IBV_QPT_RC; /* Transmission type         */
  486 
  487   /* ibv_create_qp( ibv_pd *pd, ibv_qp_init_attr * attr) */  
  488   qp_hndl = ibv_create_qp(pd_hndl, &qp_init_attr);
  489   if(!qp_hndl) {
  490     fprintf(stderr, "Error creating Queue Pair: %s\n", strerror(errno));
  491     return -1;
  492   } else {
  493     LOGPRINTF(("Created Queue Pair"));
  494   }
  495 
  496     /* Using the tcp connection, exchange necesary data needed to map
  497      *  the remote memory:
  498      *  (local: lid, qp_hndl->qp_num ), (remote: d_lid, d_qp_num)
  499      */
  500 
  501   /* Exchange lid and qp_num with other node */
  502   
  503   if( write(p->commfd, &lid, sizeof(lid) ) != sizeof(lid) ) {
  504     fprintf(stderr, "Failed to send lid over socket\n");
  505     return -1;
  506   }
  507   if( write(p->commfd, &qp_hndl->qp_num, sizeof(qp_hndl->qp_num) ) != sizeof(qp_hndl->qp_num) ) {
  508     fprintf(stderr, "Failed to send qpnum over socket\n");
  509     return -1;
  510   }
  511   if( read(p->commfd, &d_lid, sizeof(d_lid) ) != sizeof(d_lid) ) {
  512     fprintf(stderr, "Failed to read lid from socket\n");
  513     return -1;
  514   }
  515   if( read(p->commfd, &d_qp_num, sizeof(d_qp_num) ) != sizeof(d_qp_num) ) {
  516     fprintf(stderr, "Failed to read qpnum from socket\n");
  517     return -1;
  518   }
  519   
  520   LOGPRINTF(("Local: lid=%d qp_num=%d Remote: lid=%d qp_num=%d",
  521              lid, qp_hndl->qp_num, d_lid, d_qp_num));
  522     /* Further setup must be done to finalize the QP 'connection'.
  523      * First set the State of the qp to initialization by making a seperate
  524      * ibv_qp_attr* variable, giving it the initial values, and calling
  525      * ibv_qp_modify(..) to merge these settings into the QP.
  526      */
  527 /* NOTE: According to openIB, ib_mthca's QP modify does not set alternate path
  528  *  fields in QP context, so you'll have to do this manually if necessary
  529  */
  530 
  531     /* Bring up Queue Pair */
  532   
  533   /******* INIT state ******/
  534 
  535   /* qp_attr is seperately allocated per qp/connection */
  536   qp_attr.qp_state = IBV_QPS_INIT;
  537   qp_attr.pkey_index = 0;
  538   qp_attr.port_num = port_num;
  539   qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
  540   /* merge the qp_attributes into the queue pair */
  541   ret = ibv_modify_qp(qp_hndl, &qp_attr,
  542               IBV_QP_STATE              |
  543               IBV_QP_PKEY_INDEX         |
  544               IBV_QP_PORT               |
  545               IBV_QP_ACCESS_FLAGS);
  546   if(ret) {
  547     fprintf(stderr, "Error modifying QP to INIT\n");
  548     return -1;
  549   }
  550 
  551   LOGPRINTF(("Modified QP to INIT"));
  552 
  553 /* To enable the Queue Pair to finally receive data, it must be 
  554  * put into the 'RTR' (Ready-To-Receive) state.  The Queue Pair will NOT
  555  * function properly until it has been setup, and manually put through
  556  * the init and rtr states.
  557  */
  558   
  559   /******* RTR (Ready-To-Receive) state *******/
  560 
  561   qp_attr.qp_state = IBV_QPS_RTR;
  562   qp_attr.max_dest_rd_atomic = 1;
  563   qp_attr.dest_qp_num = d_qp_num;
  564   qp_attr.ah_attr.sl = 0;
  565   qp_attr.ah_attr.is_global = 0;
  566   qp_attr.ah_attr.dlid = d_lid;
  567   qp_attr.ah_attr.static_rate = 0;
  568   qp_attr.ah_attr.src_path_bits = 0;
  569   qp_attr.ah_attr.port_num = port_num;
  570   qp_attr.path_mtu = p->prot.ib_mtu;
  571   qp_attr.rq_psn = 0;
  572   qp_attr.pkey_index = 0;
  573   qp_attr.min_rnr_timer = 5;
  574   /* merge these settings into the qp */
  575   ret = ibv_modify_qp(qp_hndl, &qp_attr,
  576               IBV_QP_STATE              |
  577               IBV_QP_AV                 |
  578               IBV_QP_PATH_MTU           |
  579               IBV_QP_DEST_QPN           |
  580               IBV_QP_RQ_PSN             |
  581               IBV_QP_MAX_DEST_RD_ATOMIC |
  582               IBV_QP_MIN_RNR_TIMER);
  583 
  584   if(ret) {
  585     fprintf(stderr, "Error modifying QP to RTR\n");
  586     return -1;
  587   }
  588 
  589   LOGPRINTF(("Modified QP to RTR"));
  590 
  591   /* Sync before going to RTS state */
  592   Sync(p);
  593 
  594   /* In the same manner, 'enable' sending on the queue pair */
  595   
  596   /******* RTS (Ready-to-Send) state *******/
  597 
  598   qp_attr.qp_state = IBV_QPS_RTS;
  599   qp_attr.sq_psn = 0;
  600   qp_attr.timeout = 31;
  601   qp_attr.retry_cnt = 1;
  602   qp_attr.rnr_retry = 1;
  603   qp_attr.max_rd_atomic = 1;
  604 
  605   ret = ibv_modify_qp(qp_hndl, &qp_attr,
  606               IBV_QP_STATE              |
  607               IBV_QP_TIMEOUT            |
  608               IBV_QP_RETRY_CNT          |
  609               IBV_QP_RNR_RETRY          |
  610               IBV_QP_SQ_PSN             |
  611               IBV_QP_MAX_QP_RD_ATOMIC);
  612 
  613   if(ret) {
  614     fprintf(stderr, "Error modifying QP to RTS\n");
  615     return -1;
  616   }
  617   
  618   LOGPRINTF(("Modified QP to RTS"));
  619 
  620   /* If using event completion, request the initial notification */
  621   /* This spawns a seperate thread to do the event handling and
  622    * notification.
  623    * NOTE:  This may have problems in systems with Weak Memory Consistency
  624    * since there are no mutex(*) calls to preserve coherancy??
  625    */ 
  626   if( p->prot.comptype == NP_COMP_EVENT ) {
  627     if (pthread_create(&thread, NULL, EventThread, NULL)) {
  628       fprintf(stderr, "Couldn't start event thread\n");
  629       return -1;
  630     }
  631     ibv_req_notify_cq(r_cq_hndl, 0);    /* request completion notification  */
  632   }                 /* for the receive cq.  2nd argument 
  633                        specifies if ONLY 'solicited'
  634                        completions will be 'noticed' */
  635   
  636  
  637   return 0; /* if we get here, the connection is setup correctly */
  638 }
  639 
  640 
  641 /* Deallocate everything properly */
  642 int finalizeIB(ArgStruct *p)
  643 {
  644   int ret;
  645 
  646   LOGPRINTF(("Finalizing IB stuff"));
  647     /* NOTE: This implementation only has created one of each type of queue.
  648      * In other implementations it may be necessary to create arrays of 
  649      * these queues.  If this is the case, you need to loop and get them all */
  650   if(qp_hndl) {     
  651     LOGPRINTF(("Destroying QP"));
  652     ret = ibv_destroy_qp(qp_hndl);
  653     if(ret) {
  654       fprintf(stderr, "Error destroying Queue Pair\n");
  655     }
  656   }
  657 
  658   if(r_cq_hndl) {
  659     LOGPRINTF(("Destroying Recv CQ"));
  660     ret = ibv_destroy_cq(r_cq_hndl);
  661     if(ret) {
  662       fprintf(stderr, "Error destroying recv CQ\n");
  663     }
  664   }
  665 
  666   if(s_cq_hndl) {
  667     LOGPRINTF(("Destroying Send CQ"));
  668     ret = ibv_destroy_cq(s_cq_hndl);
  669     if(ret) {
  670       fprintf(stderr, "Error destroying send CQ\n");
  671     }
  672   }
  673 
  674   /* Check memory registrations just in case user bailed out */
  675   if(s_mr_hndl) {
  676     LOGPRINTF(("Deregistering send buffer"));
  677     ret = ibv_dereg_mr(s_mr_hndl);
  678     if(ret) {
  679       fprintf(stderr, "Error deregistering send mr\n");
  680     }
  681   }
  682 
  683   if(r_mr_hndl) {
  684     LOGPRINTF(("Deregistering recv buffer"));
  685     ret = ibv_dereg_mr(r_mr_hndl);
  686     if(ret) {
  687       fprintf(stderr, "Error deregistering recv mr\n");
  688     }
  689   }
  690 
  691   if(pd_hndl) {
  692     LOGPRINTF(("Deallocating PD"));
  693     ret = ibv_dealloc_pd(pd_hndl);
  694     if(ret) {
  695       fprintf(stderr, "Error deallocating PD\n");
  696     }
  697   }
  698 
  699   /* Application code should not close HCA, just release handle */
  700 
  701   if(ctx) {
  702     LOGPRINTF(("Releasing HCA"));
  703     ret = ibv_close_device(ctx);
  704     if(ret) {
  705       fprintf(stderr, "Error releasing HCA\n");
  706     }
  707   }
  708 
  709   return 0;
  710 }
  711 
  712 void event_handler(struct ibv_cq *cq)
  713 {
  714   int ret;
  715  
  716   while(1) {
  717      /* int ibv_poll_cq(a,b,c):
  718       *     a: command queue to poll
  719       *     b: max number of completions to return
  720       *     c: array of at least (b) entries of ibv_wc where these
  721       *     completion events will be returned.
  722       */
  723     ret = ibv_poll_cq(cq, 1, &wc);
  724 
  725      if(ret == 0) {
  726         LOGPRINTF(("Empty completion queue, requesting next notification"));
  727         ibv_req_notify_cq(r_cq_hndl, 0);  /* ... explained in prev line.. */
  728         return;
  729      } else if(ret < 0) {
  730         fprintf(stderr, "Error in event_handler (polling cq)\n");
  731         exit(-1);
  732      } else if(wc.status != IBV_WC_SUCCESS) {
  733         fprintf(stderr, "Error in event_handler, on returned work completion "
  734         "status: %d\n", wc.status);
  735         exit(-1);
  736      }
  737      
  738      LOGPRINTF(("Retrieved work completion"));
  739 
  740      /* For ping-pong mode at least, this check shouldn't be needed for
  741       * normal operation, but it will help catch any bugs with multiple
  742       * sends coming through when we're only expecting one.
  743       */
  744      if(receive_complete == 1) {
  745 
  746         while(receive_complete != 0) sched_yield();
  747 
  748      }
  749 
  750      receive_complete = 1;
  751 
  752   }
  753   
  754 }
  755 
  /* Read exactly len bytes from fd into obuf, looping over short reads.
   *
   *   fd    descriptor to read from
   *   obuf  destination buffer, at least len bytes
   *   len   number of bytes wanted
   *
   * Returns len once everything has been read, 0 if len is not positive or
   * if end-of-file is reached before len bytes arrived, and a negative
   * value if read() fails.  A read() interrupted by a signal (EINTR) is
   * retried instead of being reported as a failure.
   */
  static int
  readFully(int fd, void *obuf, int len)
  {
    char *cursor = (char *) obuf;
    int remaining = len;

    if (len <= 0)
      return 0;

    while (remaining > 0)
      {
        int got = (int) read(fd, (void *) cursor, remaining);

        if (got < 0)
          {
            if (errno == EINTR)
              continue;           /* interrupted before any data: try again */
            return got;
          }
        if (got == 0)
          return 0;               /* peer closed the stream early */

        remaining -= got;
        cursor += got;
      }
    return len;
  }
  774 
  775 
  776 /* sync up the tcp connection */
  777 void Sync(ArgStruct *p)
  778 {
  779     char s[] = "SyncMe";
  780     char response[7];
  781 
  782     if (write(p->commfd, s, strlen(s)) < 0 ||
  783         readFully(p->commfd, response, strlen(s)) < 0)
  784       {
  785         perror("NetPIPE: error writing or reading synchronization string");
  786         exit(3);
  787       }
  788     if (strncmp(s, response, strlen(s)))
  789       {
  790         fprintf(stderr, "NetPIPE: Synchronization string incorrect!\n");
  791         exit(3);
  792       }
  793 }
  794 
  795 
  796 
  797 
  798 void PrepareToReceive(ArgStruct *p)
  799 {
  800   int                ret;       /* Return code */
  801   struct ibv_recv_wr rr;        /* Receive request */
  802   struct ibv_recv_wr *bad_wr;   /* Handle to any incomplete requests */
  803   struct ibv_sge     sg_entry;  /* Scatter/Gather list - holds buff addr */
  804 
  805   /* We don't need to post a receive if doing RDMA write with local polling */
  806 
  807   if( p->prot.commtype == NP_COMM_RDMAWRITE &&
  808       p->prot.comptype == NP_COMP_LOCALPOLL )
  809      return;
  810   /* setup the receive request, specify which list to use and # entries */
  811   rr.num_sge = 1;           /* # of entries in this list */ 
  812   rr.sg_list = &sg_entry;       /* the list of entries */
  813   rr.next = NULL;           /* the next entry (if more than one */
  814 
  815   sg_entry.lkey = r_mr_hndl->lkey;  /* link the entries lkey to our remote mr */
  816   sg_entry.length = p->bufflen;     /* provide a buffer length */
  817   sg_entry.addr = (uintptr_t)p->r_ptr; /* address/context of sg_entry */
  818 
  819   /* technically if we have problems, the return is < 0,
  820    * but this works as well
  821    */
  822 
  823   /* if we get a change in bad_wr value, it is because the Receive request
  824    * couldnt be posted to the command queue for some reason.  
  825    * (This may be because the queue is full) 
  826    * You should probably do something with the bad_wr if your request 
  827    * needs to actuall get posted.
  828    */
  829   ret = ibv_post_recv(qp_hndl, &rr, &bad_wr);
  830   if(ret) {
  831     fprintf(stderr, "Error posting recv request\n");
  832     CleanUp(p);
  833     exit(-1);
  834   } else {
  835     LOGPRINTF(("Posted recv request"));
  836   }
  837 
  838   /* Set receive flag to zero and request event completion 
  839    * notification for this receive so the event handler will 
  840    * be triggered when the receive completes.
  841    */
  842   if( p->prot.comptype == NP_COMP_EVENT ) {
  843     receive_complete = 0;
  844   }
  845 }
  846 
  847 /* SendData == Post a 'send' request to the (send)command queue */
  848 void SendData(ArgStruct *p)
  849 {
  850   int                ret;       /* Return code */
  851   struct ibv_send_wr sr;        /* Send request */
  852   struct ibv_send_wr *bad_wr;   /* Handle to any incomplete wr returned by ibv*/
  853   struct ibv_sge     sg_entry;  /* Scatter/Gather list - holds buff addr */
  854 
  855   /* Fill in send request struct */
  856     /* Set the send request's opcode based on run-time options */
  857   if(p->prot.commtype == NP_COMM_SENDRECV) {
  858      sr.opcode = IBV_WR_SEND;
  859      LOGPRINTF(("Doing regular send"));
  860   } else if(p->prot.commtype == NP_COMM_SENDRECV_WITH_IMM) {
  861      sr.opcode = IBV_WR_SEND_WITH_IMM;
  862      LOGPRINTF(("Doing regular send with imm"));
  863   } else if(p->prot.commtype == NP_COMM_RDMAWRITE) {
  864      sr.opcode = IBV_WR_RDMA_WRITE; /* if RDMA, need to give more info */
  865      sr.wr.rdma.remote_addr = (uintptr_t)(((char *)remote_address) + (p->s_ptr - p->s_buff));
  866      sr.wr.rdma.rkey = remote_key;
  867      LOGPRINTF(("Doing RDMA write (raddr=%p)", sr.wr.rdma.remote_addr));
  868   } else if(p->prot.commtype == NP_COMM_RDMAWRITE_WITH_IMM) {
  869      sr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;    /* more info if RDMA */
  870      sr.wr.rdma.remote_addr = (uintptr_t)(((char *)remote_address) + (p->s_ptr - p->s_buff));
  871      sr.wr.rdma.rkey = remote_key;
  872      LOGPRINTF(("Doing RDMA write with imm (raddr=%p)", sr.wr.rdma.remote_addr));
  873   } else {
  874      fprintf(stderr, "Error, invalid communication type in SendData\n");
  875      exit(-1);
  876   }
  877   
  878   sr.send_flags = 0;    /* This needed due to a bug in Mellanox HW rel a-0 */
  879 
  880   sr.num_sge = 1;           /* # entries in this request */
  881   sr.sg_list = &sg_entry;       /* the list of other requests */
  882   sr.next = NULL;           /* the next request in the list */
  883 
  884   sg_entry.lkey = s_mr_hndl->lkey;  /* Local memory region key */
  885   sg_entry.length = p->bufflen;    /* buffer's size */
  886   sg_entry.addr = (uintptr_t)p->s_ptr;  /* buffer's location */
  887 
  888 
  889   
  890   /* Post the send request to the (send)command queue */
  891 
  892   /* ibv_post_send(...) is handled in same fashion ibv_post_recv(..) */
  893   ret = ibv_post_send(qp_hndl, &sr, &bad_wr);
  894   if(ret) {
  895     fprintf(stderr, "Error posting send request\n");
  896   } else {
  897     LOGPRINTF(("Posted send request"));
  898   }
  899 
  900 }
  901 
  902 /* Post a receive request to the (receive)command queue */
  903 void RecvData(ArgStruct *p)
  904 {
  905   int ret;
  906 
  907   /* Busy wait for incoming data */
  908 
  909   LOGPRINTF(("Receiving at buffer address %p", p->r_ptr));
  910 
  911   /*
  912    * Unsignaled receives are not supported, so we must always poll the
  913    * CQ, except when using RDMA writes.
  914    */
  915   if( p->prot.commtype == NP_COMM_RDMAWRITE ) {
  916        
  917     /* Poll for receive completion locally on the receive data */
  918 
  919     LOGPRINTF(("Waiting for last byte of data to arrive"));
  920      
  921     while(p->r_ptr[p->bufflen-1] != 'a' + (p->cache ? 1 - p->tr : 1) ) 
  922     {
  923        /* BUSY WAIT -- this should be fine since we 
  924         * declared r_ptr with volatile qualifier */ 
  925     }
  926 
  927     /* Reset last byte */
  928     p->r_ptr[p->bufflen-1] = 'a' + (p->cache ? p->tr : 0);
  929 
  930     LOGPRINTF(("Received all of data"));
  931 
  932   } else if( p->prot.comptype != NP_COMP_EVENT ) {
  933      
  934      /* Poll for receive completion using poll function */
  935 
  936      LOGPRINTF(("Polling completion queue for work completion"));
  937      
  938      ret = 0;
  939      while(ret == 0)
  940         ret = ibv_poll_cq(r_cq_hndl, 1, &wc);   /* poll & grab 1 completion */
  941      /* ret = # of completions polled by the function */
  942 
  943      if(ret < 0) {
  944         fprintf(stderr, "Error in RecvData, polling for completion\n");
  945         exit(-1);
  946      }
  947 
  948      if(wc.status != IBV_WC_SUCCESS) {
  949         fprintf(stderr, "Error in status of returned completion: %d\n",
  950                 wc.status);
  951         exit(-1);
  952      }
  953 
  954      LOGPRINTF(("Retrieved successful completion"));
  955      
  956   } else if( p->prot.comptype == NP_COMP_EVENT ) {
  957 
  958      /* Instead of polling directly on data or the completion queue,
  959       * let the event completion handler set a flag when the receive
  960       * completes, and poll on that instead. Could try using semaphore here
  961       * as well to eliminate busy polling
  962       */
  963 
  964      LOGPRINTF(("Polling receive flag"));
  965      
  966      while( receive_complete == 0 ) /* this is set by the event hanlr */
  967      {
  968         /* BUSY WAIT */
  969      }
  970 
  971      /* If in prepost-burst mode, we won't be calling PrepareToReceive
  972       * between ping-pongs, so we need to reset the receive_complete
  973       * flag here.
  974       */
  975      if( p->preburst ) receive_complete = 0;
  976 
  977      LOGPRINTF(("Receive completed"));
  978   }
  979 }
  980 
  981 /* Reset is used after a trial to empty the work request queues so we
  982    have enough room for the next trial to run */
  983 void Reset(ArgStruct *p)
  984 {
  985 
  986   int                ret;       /* Return code */
  987   struct ibv_send_wr sr;        /* Send request */
  988   struct ibv_send_wr *bad_sr;   /* handle to your reqeust if it fails */
  989   struct ibv_recv_wr rr;        /* Recv request */
  990   struct ibv_recv_wr *bad_rr;  /* handle to your request if it fails */
  991 
  992   /* If comptype is event, then we'll use event handler to detect receive,
  993    * so initialize receive_complete flag
  994    */
  995   if(p->prot.comptype == NP_COMP_EVENT) receive_complete = 0;
  996 
  997   /* Prepost receive */
  998   rr.num_sge = 0;   /* there are no entries in this request */
  999   rr.next = NULL;
 1000 
 1001   LOGPRINTF(("Posting recv request in Reset"));
 1002   ret = ibv_post_recv(qp_hndl, &rr, &bad_rr);
 1003   if(ret) {
 1004     fprintf(stderr, "  Error posting recv request\n");
 1005     CleanUp(p);
 1006     exit(-1);
 1007   }
 1008 
 1009   /* Make sure both nodes have preposted receives */
 1010   Sync(p);
 1011 
 1012   /* Post Send */
 1013   sr.opcode = IBV_WR_SEND;
 1014   sr.send_flags = IBV_SEND_SIGNALED;
 1015   sr.num_sge = 0;       /* no entires in this request */
 1016   sr.next = NULL;
 1017 
 1018   LOGPRINTF(("Posting send request "));
 1019   ret = ibv_post_send(qp_hndl, &sr, &bad_sr);
 1020   if(ret) {
 1021     fprintf(stderr, "  Error posting send request in Reset\n");
 1022     exit(-1);
 1023   }
 1024   if(wc.status != IBV_WC_SUCCESS) {
 1025      fprintf(stderr, "  Error in completion status: %d\n",
 1026              wc.status);
 1027      exit(-1);
 1028   }
 1029 
 1030   LOGPRINTF(("Polling for completion of send request"));
 1031   ret = 0;
 1032   while(ret == 0)
 1033     ret = ibv_poll_cq(s_cq_hndl, 1, &wc);   /* grab the request */
 1034 
 1035   if(ret < 0) {
 1036     fprintf(stderr, "Error polling CQ for send in Reset\n");
 1037     exit(-1);
 1038   }
 1039   if(wc.status != IBV_WC_SUCCESS) {
 1040      fprintf(stderr, "  Error in completion status: %d\n",
 1041              wc.status);
 1042      exit(-1);
 1043   }          
 1044   
 1045   LOGPRINTF(("Status of send completion: %d", wc.status));
 1046 
 1047   if(p->prot.comptype == NP_COMP_EVENT) { 
 1048      /* If using event completion, the event handler will set receive_complete
 1049       * when it gets the completion event.
 1050       */
 1051      LOGPRINTF(("Waiting for receive_complete flag"));
 1052      while(receive_complete == 0) { /* BUSY WAIT */ }
 1053   } else {
 1054      LOGPRINTF(("Polling for completion of recv request"));
 1055      ret = 0;
 1056      while(ret == 0)
 1057        ret = ibv_poll_cq(r_cq_hndl, 1, &wc);
 1058      
 1059      if(ret < 0) {
 1060        fprintf(stderr, "Error polling CQ for recv in Reset");
 1061        exit(-1);
 1062      }
 1063      if(wc.status != IBV_WC_SUCCESS) {
 1064         fprintf(stderr, "  Error in completion status: %d\n",
 1065                 wc.status);
 1066         exit(-1);
 1067      }
 1068 
 1069      LOGPRINTF(("Status of recv completion: %d", wc.status));
 1070   }
 1071   LOGPRINTF(("Done with reset"));
 1072 }
 1073 
 1074 
 1075 /* ********** NetPipe stuff ********* */
 1076 void SendTime(ArgStruct *p, double *t)
 1077 {
 1078     uint32_t ltime, ntime;
 1079 
 1080     /*
 1081       Multiply the number of seconds by 1e6 to get time in microseconds
 1082       and convert value to an unsigned 32-bit integer.
 1083       */
 1084     ltime = (uint32_t)(*t * 1.e6);
 1085 
 1086     /* Send time in network order */
 1087     ntime = htonl(ltime);
 1088     if (write(p->commfd, (char *)&ntime, sizeof(uint32_t)) < 0)
 1089       {
 1090         printf("NetPIPE: write failed in SendTime: errno=%d\n", errno);
 1091         exit(301);
 1092       }
 1093 }
 1094 
 1095 void RecvTime(ArgStruct *p, double *t)
 1096 {
 1097     uint32_t ltime, ntime;
 1098     int bytesRead;
 1099 
 1100     bytesRead = readFully(p->commfd, (void *)&ntime, sizeof(uint32_t));
 1101     if (bytesRead < 0)
 1102       {
 1103         printf("NetPIPE: read failed in RecvTime: errno=%d\n", errno);
 1104         exit(302);
 1105       }
 1106     else if (bytesRead != sizeof(uint32_t))
 1107       {
 1108         fprintf(stderr, "NetPIPE: partial read in RecvTime of %d bytes\n",
 1109                 bytesRead);
 1110         exit(303);
 1111       }
 1112     ltime = ntohl(ntime);
 1113 
 1114     /* Result is ltime (in microseconds) divided by 1.0e6 to get seconds */
 1115     *t = (double)ltime / 1.0e6;
 1116 }
 1117 
 1118 /* in the event of a send failure, re-send (tcp)*/
 1119 void SendRepeat(ArgStruct *p, int rpt)
 1120 {
 1121   uint32_t lrpt, nrpt;
 1122 
 1123   lrpt = rpt;
 1124   /* Send repeat count as a long in network order */
 1125   nrpt = htonl(lrpt);
 1126   if (write(p->commfd, (void *) &nrpt, sizeof(uint32_t)) < 0)
 1127     {
 1128       printf("NetPIPE: write failed in SendRepeat: errno=%d\n", errno);
 1129       exit(304);
 1130     }
 1131 }
 1132 
 1133 
 1134 /* in the event of a recv failure, resend (tcp)*/
 1135 void RecvRepeat(ArgStruct *p, int *rpt)
 1136 {
 1137   uint32_t lrpt, nrpt;
 1138   int bytesRead;
 1139 
 1140   bytesRead = readFully(p->commfd, (void *)&nrpt, sizeof(uint32_t));
 1141   if (bytesRead < 0)
 1142     {
 1143       printf("NetPIPE: read failed in RecvRepeat: errno=%d\n", errno);
 1144       exit(305);
 1145     }
 1146   else if (bytesRead != sizeof(uint32_t))
 1147     {
 1148       fprintf(stderr, "NetPIPE: partial read in RecvRepeat of %d bytes\n",
 1149               bytesRead);
 1150       exit(306);
 1151     }
 1152   lrpt = ntohl(nrpt);
 1153 
 1154   *rpt = lrpt;
 1155 }
 1156 
 1157 
 1158 /* establish the tcp connection */
 1159 void establish(ArgStruct *p)
 1160 {
 1161  unsigned int clen;
 1162  int one = 1;
 1163  struct protoent;
 1164 
 1165  clen = sizeof(p->prot.sin2);
 1166  if(p->tr){
 1167    if(connect(p->commfd, (struct sockaddr *) &(p->prot.sin1),
 1168               sizeof(p->prot.sin1)) < 0){
 1169      printf("Client: Cannot Connect! errno=%d\n",errno);
 1170      exit(-10);
 1171    }
 1172   }
 1173   else {
 1174     /* SERVER */
 1175     listen(p->servicefd, 5);
 1176     p->commfd = accept(p->servicefd, (struct sockaddr *) &(p->prot.sin2),
 1177                        &clen);
 1178 
 1179     if(p->commfd < 0){
 1180       printf("Server: Accept Failed! errno=%d\n",errno);
 1181       exit(-12);
 1182     }
 1183   }
 1184 }
 1185 
 1186 void CleanUp(ArgStruct *p)
 1187 {
 1188    char *quit="QUIT";
 1189    if (p->tr)
 1190    {
 1191       write(p->commfd,quit, 5);
 1192       read(p->commfd, quit, 5);
 1193       close(p->commfd);
 1194    }
 1195    else
 1196    {
 1197       read(p->commfd,quit, 5);
 1198       write(p->commfd,quit,5);
 1199       close(p->commfd);
 1200       close(p->servicefd);
 1201    }
 1202 
 1203    finalizeIB(p);   /* finally, deallocate all the IB stuff */
 1204 }
 1205 
 1206 
 1207 /* Exchange IB connection info via the tcp connection */
 1208 void AfterAlignmentInit(ArgStruct *p)
 1209 {
 1210   int bytesRead;
 1211 
 1212   /* Exchange buffer pointers and remote infiniband keys if doing rdma. Do
 1213    * the exchange in this function because this will happen after any
 1214    * memory alignment is done, which is important for getting the 
 1215    * correct remote address.
 1216   */
 1217   if( p->prot.commtype == NP_COMM_RDMAWRITE || 
 1218       p->prot.commtype == NP_COMM_RDMAWRITE_WITH_IMM ) {
 1219      
 1220      /* Send my receive buffer address
 1221       */
 1222      if(write(p->commfd, (void *)&p->r_buff, sizeof(void*)) < 0) {
 1223         perror("NetPIPE: write of buffer address failed in AfterAlignmentInit");
 1224         exit(-1);
 1225      }
 1226      
 1227      LOGPRINTF(("Sent buffer address: %p", p->r_buff));
 1228      
 1229      /* Send my remote key for accessing
 1230       * my remote buffer via IB RDMA
 1231       */
 1232      if(write(p->commfd, (void *)&r_mr_hndl->rkey, sizeof(uint32_t)) < 0) {
 1233         perror("NetPIPE: write of remote key failed in AfterAlignmentInit");
 1234         exit(-1);
 1235      }
 1236   
 1237      LOGPRINTF(("Sent remote key: %d", r_mr_hndl->rkey));
 1238      
 1239      /* Read the sent data
 1240       */
 1241      bytesRead = readFully(p->commfd, (void *)&remote_address, sizeof(void*));
 1242      if (bytesRead < 0) {
 1243         perror("NetPIPE: read of buffer address failed in AfterAlignmentInit");
 1244         exit(-1);
 1245      } else if (bytesRead != sizeof(void*)) {
 1246         perror("NetPIPE: partial read of buffer address in AfterAlignmentInit");
 1247         exit(-1);
 1248      }
 1249      
 1250      LOGPRINTF(("Received remote address from other node: %p", remote_address));
 1251      
 1252      bytesRead = readFully(p->commfd, (void *)&remote_key, sizeof(uint32_t));
 1253      if (bytesRead < 0) {
 1254         perror("NetPIPE: read of remote key failed in AfterAlignmentInit");
 1255         exit(-1);
 1256      } else if (bytesRead != sizeof(uint32_t)) {
 1257         perror("NetPIPE: partial read of remote key in AfterAlignmentInit");
 1258         exit(-1);
 1259      }
 1260      
 1261      LOGPRINTF(("Received remote key from other node: %d", remote_key));
 1262 
 1263   }
 1264 }
 1265 
 1266 
 1267 void MyMalloc(ArgStruct *p, int bufflen, int soffset, int roffset)
 1268 {
 1269   /* Allocate buffers */
 1270 
 1271   p->r_buff = malloc(bufflen+MAX(soffset,roffset));
 1272   if(p->r_buff == NULL) {
 1273     fprintf(stderr, "Error malloc'ing buffer\n");
 1274     exit(-1);
 1275   }
 1276 
 1277   if(p->cache) { /* run-time option ? */
 1278 
 1279     /* Infiniband spec says we can register same memory region
 1280      * more than once, so just copy buffer address. We will register
 1281      * the same buffer twice with Infiniband.
 1282      */
 1283     p->s_buff = p->r_buff;
 1284 
 1285   } else {
 1286     
 1287     p->s_buff = malloc(bufflen+soffset);
 1288     if(p->s_buff == NULL) {
 1289       fprintf(stderr, "Error malloc'ing buffer\n");
 1290       exit(-1);
 1291     }
 1292 
 1293   }
 1294 
 1295   /* Register buffers with Infiniband */
 1296 
 1297   /* Associate our newly allocated buffers with an IB memory region
 1298    *   If the reg fails, the function will return NULL for your region ptr
 1299    *   Else it will return a ptr to an allocated mem region 
 1300    */
 1301 
 1302   /* Register the local recv mem region handle:
 1303    * ibv_mem_register( 
 1304    *        local protection domain,
 1305    *        remote buffer address,
 1306    *        size of the remote buffer,
 1307    *        access rights to this memory
 1308    *        )
 1309    */
 1310   r_mr_hndl = ibv_reg_mr(pd_hndl, p->r_buff, bufflen + MAX(soffset, roffset),
 1311              IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
 1312 
 1313   if(!r_mr_hndl)
 1314         {
 1315     fprintf(stderr, "Error registering recv buffer\n");
 1316     exit(-1);
 1317         }
 1318         else
 1319         {
 1320          LOGPRINTF(("Registered Recv Buffer"));
 1321         }
 1322 
 1323     /* Register the send mem region handle */
 1324   s_mr_hndl = ibv_reg_mr(pd_hndl, p->s_buff, bufflen+soffset, IBV_ACCESS_LOCAL_WRITE);
 1325   if(!s_mr_hndl) {
 1326     fprintf(stderr, "Error registering send buffer\n");
 1327     exit(-1);
 1328   } else {
 1329     LOGPRINTF(("Registered Send Buffer"));
 1330   }
 1331 
 1332 }
 1333 
 1334 
 1335 /* De_register the allocated memory regions before exiting */
 1336 void FreeBuff(char *buff1, char *buff2)
 1337 {
 1338   int ret;
 1339 
 1340   if(s_mr_hndl) {
 1341     LOGPRINTF(("Deregistering send buffer"));
 1342     ret = ibv_dereg_mr(s_mr_hndl);
 1343     if(ret) {
 1344       fprintf(stderr, "Error deregistering send mr\n");
 1345     } else {
 1346       s_mr_hndl = NULL;
 1347     }
 1348   }
 1349 
 1350   if(r_mr_hndl) {
 1351     LOGPRINTF(("Deregistering recv buffer"));
 1352     ret = ibv_dereg_mr(r_mr_hndl);
 1353     if(ret) {
 1354       fprintf(stderr, "Error deregistering recv mr\n");
 1355     } else {
 1356       r_mr_hndl = NULL;
 1357     }
 1358   }
 1359 
 1360   if(buff1 != NULL)
 1361     free(buff1);
 1362 
 1363   if(buff2 != NULL)
 1364     free(buff2);
 1365 }
 1366 
 1367 
 1368 static void logprintf(const char *format, ...)
 1369 {
 1370     va_list arglist;
 1371     va_start(arglist, format);
 1372     vfprintf(logfile, format, arglist);
 1373     va_end(arglist);
 1374 }