"Fossies" - the Fresh Open Source Software Archive

Member "NetPIPE-3.7.2/src/ib.c" (19 Aug 2010, 32182 Bytes) of package /linux/privat/old/NetPIPE-3.7.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "ib.c", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 4.x_vs_3.7.2.

    1 /*****************************************************************************/
    2 /* "NetPIPE" -- Network Protocol Independent Performance Evaluator.          */
    3 /* Copyright 1997, 1998 Iowa State University Research Foundation, Inc.      */
    4 /*                                                                           */
    5 /* This program is free software; you can redistribute it and/or modify      */
    6 /* it under the terms of the GNU General Public License as published by      */
    7 /* the Free Software Foundation.  You should have received a copy of the     */
    8 /* GNU General Public License along with this program; if not, write to the  */
    9 /* Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */
   10 /*                                                                           */
   11 /*       ib.c              ---- Infiniband module for the Mellanox VAPI      */
   12 /*****************************************************************************/
   13 
   14 #define USE_VOLATILE_RPTR /* needed for polling on last byte of recv buffer */
   15 #include    "netpipe.h"
   16 #include    <stdio.h>
   17 #include    <getopt.h>
   18 
   19 /* Debugging output macro */
   20 
   21 FILE* logfile;
   22 
   23 #if 0
   24 #define LOGPRINTF(_format, _aa...) fprintf(logfile, __FUNCTION__": " _format, ##_aa); fflush(logfile)
   25 #else
   26 #define LOGPRINTF(_format, _aa...)
   27 #endif
   28 
   29 /* Header files needed for Infiniband */
   30 
   31 #include    "vapi.h"        /* Mellanox Verbs API */
   32 #include    "evapi.h"       /* Mellanox Verbs API extension */
   33 #include    "vapi_common.h" /* Mellanox VIP layer of HCA Verbs */
   34 
/* Global vars
 *
 * Module-wide Infiniband state.  Handles start out as VAPI_INVAL_HNDL so that
 * finalizeIB() can tell which resources were actually acquired.
 */

static VAPI_hca_hndl_t     hca_hndl=VAPI_INVAL_HNDL; /* HCA handle, obtained in initIB() */
static VAPI_hca_port_t     hca_port;  /* Port properties filled in by VAPI_query_hca_port_prop */
static int                 port_num;  /* HCA port in use; initIB() sets it to 1 */
static IB_lid_t            lid;       /* Local LID, copied from hca_port.lid */
static IB_lid_t            d_lid;     /* Peer's LID, read from the TCP socket in initIB() */
static VAPI_pd_hndl_t      pd_hndl=VAPI_INVAL_HNDL;  /* Protection domain */
static VAPI_cqe_num_t      num_cqe;     /* Requested number of completion queue elements */
static VAPI_cqe_num_t      act_num_cqe; /* Element count returned by VAPI_create_cq */
static VAPI_cq_hndl_t      s_cq_hndl=VAPI_INVAL_HNDL; /* Send completion queue */
static VAPI_cq_hndl_t      r_cq_hndl=VAPI_INVAL_HNDL; /* Receive completion queue */
static EVAPI_compl_handler_hndl_t ceh_hndl=VAPI_INVAL_HNDL; /* Completion event handler, set by EVAPI_set_comp_eventh */
static VAPI_mrw_t          mr_in;     /* NOTE(review): not used in the code visible here; presumably the memory registration request - confirm */
static VAPI_mrw_t          s_mr_out;  /* Send buffer registration; SendData() uses its l_key */
static VAPI_mrw_t          r_mr_out;  /* Recv buffer registration; PrepareToReceive() uses its l_key */
static VAPI_mr_hndl_t      s_mr_hndl=VAPI_INVAL_HNDL; /* Send memory region, deregistered in finalizeIB() */
static VAPI_mr_hndl_t      r_mr_hndl=VAPI_INVAL_HNDL; /* Recv memory region, deregistered in finalizeIB() */
static VAPI_qp_init_attr_t qp_init_attr; /* Attributes passed to VAPI_create_qp */
static VAPI_qp_prop_t      qp_prop;   /* Properties returned by VAPI_create_qp (includes qp_num) */
static VAPI_qp_hndl_t      qp_hndl=VAPI_INVAL_HNDL; /* Queue pair */
static VAPI_qp_num_t       d_qp_num;  /* Peer's QP number, read from the TCP socket in initIB() */
static VAPI_qp_attr_mask_t qp_attr_mask; /* Selects which qp_attr fields VAPI_modify_qp applies */
static VAPI_qp_attr_t      qp_attr;   /* Attributes for the INIT/RTR/RTS transitions */
static VAPI_qp_cap_t       qp_cap;    /* Capabilities returned by VAPI_modify_qp */
static VAPI_wc_desc_t      wc;        /* Work completion filled in by VAPI_poll_cq */
static int                 max_wq=50000; /* Max outstanding work requests requested per queue */
static void*               remote_address; /* Base of the RDMA-write target in SendData(); assigned outside the code visible here */
static VAPI_rkey_t         remote_key;     /* Remote key for RDMA writes in SendData(); assigned outside the code visible here */
static volatile int        receive_complete; /* Set to 1 by event_handler(); polled by RecvData() and Reset() */

/* Local prototypes */

void event_handler(VAPI_hca_hndl_t, VAPI_cq_hndl_t, void*);
   69 
   70 /* Function definitions */
   71 
   72 void Init(ArgStruct *p, int* pargc, char*** pargv)
   73 {
   74    /* Set defaults
   75     */
   76    p->prot.ib_mtu = MTU1024;             /* 1024 Byte MTU                    */
   77    p->prot.commtype = NP_COMM_SENDRECV;  /* Use Send/Receive communications  */
   78    p->prot.comptype = NP_COMP_LOCALPOLL; /* Use local polling for completion */
   79    p->tr = 0;                            /* I am not the transmitter         */
   80    p->rcv = 1;                           /* I am the receiver                */      
   81 }
   82 
   83 void Setup(ArgStruct *p)
   84 {
   85 
   86  int one = 1;
   87  int sockfd;
   88  struct sockaddr_in *lsin1, *lsin2;      /* ptr to sockaddr_in in ArgStruct */
   89  char *host;
   90  struct hostent *addr;
   91  struct protoent *proto;
   92  int send_size, recv_size, sizeofint = sizeof(int);
   93  struct sigaction sigact1;
   94  char logfilename[80];
   95 
   96  /* Sanity check */
   97  if( p->prot.commtype == NP_COMM_RDMAWRITE && 
   98      p->prot.comptype != NP_COMP_LOCALPOLL ) {
   99    fprintf(stderr, "Error, RDMA Write may only be used with local polling.\n");
  100    fprintf(stderr, "Try using RDMA Write With Immediate Data with vapi polling\n");
  101    fprintf(stderr, "or event completion\n");
  102    exit(-1);
  103  }
  104  
  105  /* Open log file */
  106  sprintf(logfilename, ".iblog%d", 1 - p->tr);
  107  logfile = fopen(logfilename, "w");
  108 
  109  host = p->host;                           /* copy ptr to hostname */ 
  110 
  111  lsin1 = &(p->prot.sin1);
  112  lsin2 = &(p->prot.sin2);
  113 
  114  bzero((char *) lsin1, sizeof(*lsin1));
  115  bzero((char *) lsin2, sizeof(*lsin2));
  116 
  117  if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
  118    printf("NetPIPE: can't open stream socket! errno=%d\n", errno);
  119    exit(-4);
  120  }
  121 
  122  if(!(proto = getprotobyname("tcp"))){
  123    printf("NetPIPE: protocol 'tcp' unknown!\n");
  124    exit(555);
  125  }
  126 
  127  if (p->tr){                                  /* if client i.e., Sender */
  128 
  129 
  130    if (atoi(host) > 0) {                   /* Numerical IP address */
  131      lsin1->sin_family = AF_INET;
  132      lsin1->sin_addr.s_addr = inet_addr(host);
  133 
  134    } else {
  135       
  136      if ((addr = gethostbyname(host)) == NULL){
  137        printf("NetPIPE: invalid hostname '%s'\n", host);
  138        exit(-5);
  139      }
  140 
  141      lsin1->sin_family = addr->h_addrtype;
  142      bcopy(addr->h_addr, (char*) &(lsin1->sin_addr.s_addr), addr->h_length);
  143    }
  144 
  145    lsin1->sin_port = htons(p->port);
  146 
  147  } else {                                 /* we are the receiver (server) */
  148    
  149    bzero((char *) lsin1, sizeof(*lsin1));
  150    lsin1->sin_family      = AF_INET;
  151    lsin1->sin_addr.s_addr = htonl(INADDR_ANY);
  152    lsin1->sin_port        = htons(p->port);
  153 
  154    /* re-use socket, common if netpipe aborts due to busted networks */
  155    one = 1;
  156    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int))) {
  157        printf("NetPIPE: server: unable to setsockopt -- errno %d\n", errno);
  158        exit(-7);
  159    }
  160    
  161    if (bind(sockfd, (struct sockaddr *) lsin1, sizeof(*lsin1)) < 0){
  162      printf("NetPIPE: server: bind on local address failed! errno=%d", errno);
  163      exit(-6);
  164    }
  165 
  166  }
  167 
  168  if(p->tr)
  169    p->commfd = sockfd;
  170  else
  171    p->servicefd = sockfd;
  172 
  173  
  174 
  175  /* Establish tcp connections */
  176 
  177  establish(p);
  178 
  179  /* Initialize Mellanox Infiniband */
  180 
  181  if(initIB(p) == -1) {
  182    CleanUp(p);
  183    exit(-1);
  184  }
  185 }   
  186 
  187 int initIB(ArgStruct *p)
  188 {
  189   VAPI_ret_t          ret;
  190 
  191   /* Open HCA */
  192 
  193   /* open hca just in case it was not opened by system earlier */
  194   ret = VAPI_open_hca("InfiniHost0", &hca_hndl); 
  195 
  196   ret = EVAPI_get_hca_hndl("InfiniHost0", &hca_hndl);
  197   if(ret != VAPI_OK) {
  198     fprintf(stderr, "Error opening Infiniband HCA: %s\n", VAPI_strerror(ret));
  199     return -1;
  200   } else {
  201     LOGPRINTF("Opened Infiniband HCA\n");
  202   }
  203 
  204   /* Get HCA properties */
  205 
  206   port_num=1;
  207   ret = VAPI_query_hca_port_prop(hca_hndl, (IB_port_t)port_num, 
  208                                  (VAPI_hca_port_t *)&hca_port);
  209   if(ret != VAPI_OK) {
  210     fprintf(stderr, "Error querying Infiniband HCA: %s\n", VAPI_strerror(ret));
  211     return -1;
  212   } else {
  213     LOGPRINTF("Queried Infiniband HCA\n");
  214   }
  215   lid = hca_port.lid;
  216   LOGPRINTF("  lid = %d\n", lid);
  217 
  218 
  219   /* Allocate Protection Domain */
  220 
  221   ret = VAPI_alloc_pd(hca_hndl, &pd_hndl);
  222   if(ret != VAPI_OK) {
  223     fprintf(stderr, "Error allocating PD: %s\n", VAPI_strerror(ret));
  224     return -1;
  225   } else {
  226     LOGPRINTF("Allocated Protection Domain\n");
  227   }
  228 
  229 
  230   /* Create send completion queue */
  231   
  232   num_cqe = 30000; /* Requested number of completion q elements */
  233   ret = VAPI_create_cq(hca_hndl, num_cqe, &s_cq_hndl, &act_num_cqe);
  234   if(ret != VAPI_OK) {
  235     fprintf(stderr, "Error creating send CQ: %s\n", VAPI_strerror(ret));
  236     return -1;
  237   } else {
  238     LOGPRINTF("Created Send Completion Queue with %d elements\n", act_num_cqe);
  239   }
  240 
  241 
  242   /* Create recv completion queue */
  243   
  244   num_cqe = 20000; /* Requested number of completion q elements */
  245   ret = VAPI_create_cq(hca_hndl, num_cqe, &r_cq_hndl, &act_num_cqe);
  246   if(ret != VAPI_OK) {
  247     fprintf(stderr, "Error creating recv CQ: %s\n", VAPI_strerror(ret));
  248     return -1;
  249   } else {
  250     LOGPRINTF("Created Recv Completion Queue with %d elements\n", act_num_cqe);
  251   }
  252 
  253 
  254   /* Placeholder for MR */
  255 
  256 
  257   /* Create Queue Pair */
  258 
  259   qp_init_attr.cap.max_oust_wr_rq = max_wq; /* Max outstanding WR on RQ      */
  260   qp_init_attr.cap.max_oust_wr_sq = max_wq; /* Max outstanding WR on SQ      */
  261   qp_init_attr.cap.max_sg_size_rq = 1; /* Max scatter/gather entries on RQ */
  262   qp_init_attr.cap.max_sg_size_sq = 1; /* Max scatter/gather entries on SQ */
  263   qp_init_attr.pd_hndl            = pd_hndl; /* Protection domain handle   */
  264   qp_init_attr.rdd_hndl           = 0; /* Reliable datagram domain handle  */
  265   qp_init_attr.rq_cq_hndl         = r_cq_hndl; /* CQ handle for RQ         */
  266   qp_init_attr.rq_sig_type        = VAPI_SIGNAL_REQ_WR; /* Signalling type */
  267   qp_init_attr.sq_cq_hndl         = s_cq_hndl; /* CQ handle for RQ         */
  268   qp_init_attr.sq_sig_type        = VAPI_SIGNAL_REQ_WR; /* Signalling type */
  269   qp_init_attr.ts_type            = IB_TS_RC; /* Transmission type         */
  270   
  271   ret = VAPI_create_qp(hca_hndl, &qp_init_attr, &qp_hndl, &qp_prop);
  272   if(ret != VAPI_OK) {
  273     fprintf(stderr, "Error creating Queue Pair: %s\n", VAPI_strerror(ret));
  274     return -1;
  275   } else {
  276     LOGPRINTF("Created Queue Pair, max outstanding WR on RQ: %d, on SQ: %d\n",
  277               qp_prop.cap.max_oust_wr_rq, qp_prop.cap.max_oust_wr_sq);
  278   }
  279 
  280 
  281   /* Exchange lid and qp_num with other node */
  282   
  283   if( write(p->commfd, &lid, sizeof(lid) ) != sizeof(lid) ) {
  284     fprintf(stderr, "Failed to send lid over socket\n");
  285     return -1;
  286   }
  287   if( write(p->commfd, &qp_prop.qp_num, sizeof(qp_prop.qp_num) ) != sizeof(qp_prop.qp_num) ) {
  288     fprintf(stderr, "Failed to send qpnum over socket\n");
  289     return -1;
  290   }
  291   if( read(p->commfd, &d_lid, sizeof(d_lid) ) != sizeof(d_lid) ) {
  292     fprintf(stderr, "Failed to read lid from socket\n");
  293     return -1;
  294   }
  295   if( read(p->commfd, &d_qp_num, sizeof(d_qp_num) ) != sizeof(d_qp_num) ) {
  296     fprintf(stderr, "Failed to read qpnum from socket\n");
  297     return -1;
  298   }
  299   
  300   LOGPRINTF("Local: lid=%d qp_num=%d Remote: lid=%d qp_num=%d\n",
  301          lid, qp_prop.qp_num, d_lid, d_qp_num);
  302 
  303 
  304   /* Bring up Queue Pair */
  305   
  306   /******* INIT state ******/
  307 
  308   QP_ATTR_MASK_CLR_ALL(qp_attr_mask);
  309 
  310   qp_attr.qp_state = VAPI_INIT;
  311   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE);
  312 
  313   qp_attr.pkey_ix = 0;
  314   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PKEY_IX);
  315 
  316   qp_attr.port = port_num;
  317   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PORT);
  318 
  319   qp_attr.remote_atomic_flags = VAPI_EN_REM_WRITE | VAPI_EN_REM_READ;
  320   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_REMOTE_ATOMIC_FLAGS);
  321 
  322   ret = VAPI_modify_qp(hca_hndl, qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap);
  323   if(ret != VAPI_OK) {
  324     fprintf(stderr, "Error modifying QP to INIT: %s\n", VAPI_strerror(ret));
  325     return -1;
  326   }
  327 
  328   LOGPRINTF("Modified QP to INIT\n");
  329 
  330   /******* RTR (Ready-To-Receive) state *******/
  331 
  332   QP_ATTR_MASK_CLR_ALL(qp_attr_mask);
  333 
  334   qp_attr.qp_state = VAPI_RTR;
  335   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE);
  336 
  337   qp_attr.qp_ous_rd_atom = 1;
  338   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_OUS_RD_ATOM);
  339 
  340   qp_attr.dest_qp_num = d_qp_num;
  341   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_DEST_QP_NUM);
  342 
  343   qp_attr.av.sl = 0;
  344   qp_attr.av.grh_flag = FALSE;
  345   qp_attr.av.dlid = d_lid;
  346   qp_attr.av.static_rate = 0;
  347   qp_attr.av.src_path_bits = 0;
  348   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_AV);
  349 
  350   qp_attr.path_mtu = p->prot.ib_mtu;
  351   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PATH_MTU);
  352 
  353   qp_attr.rq_psn = 0;
  354   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RQ_PSN);
  355 
  356   qp_attr.pkey_ix = 0;
  357   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PKEY_IX);
  358 
  359   qp_attr.min_rnr_timer = 5;
  360   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_MIN_RNR_TIMER);
  361   
  362   ret = VAPI_modify_qp(hca_hndl, qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap);
  363   if(ret != VAPI_OK) {
  364     fprintf(stderr, "Error modifying QP to RTR: %s\n", VAPI_strerror(ret));
  365     return -1;
  366   }
  367 
  368   LOGPRINTF("Modified QP to RTR\n");
  369 
  370   /* Sync before going to RTS state */
  371   Sync(p);
  372 
  373   /******* RTS (Ready-to-Send) state *******/
  374 
  375   QP_ATTR_MASK_CLR_ALL(qp_attr_mask);
  376 
  377   qp_attr.qp_state = VAPI_RTS;
  378   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE);
  379 
  380   qp_attr.sq_psn = 0;
  381   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_SQ_PSN);
  382 
  383   qp_attr.timeout = 31;
  384   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_TIMEOUT);
  385 
  386   qp_attr.retry_count = 1;
  387   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RETRY_COUNT);
  388 
  389   qp_attr.rnr_retry = 1;
  390   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RNR_RETRY);
  391 
  392   qp_attr.ous_dst_rd_atom = 1;
  393   QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_OUS_DST_RD_ATOM);
  394 
  395   ret = VAPI_modify_qp(hca_hndl, qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap);
  396   if(ret != VAPI_OK) {
  397     fprintf(stderr, "Error modifying QP to RTS: %s\n", VAPI_strerror(ret));
  398     return -1;
  399   }
  400   
  401   LOGPRINTF("Modified QP to RTS\n");
  402 
  403   /* If using event completion, register event completion handler and request
  404    * the initial notification
  405    */
  406   if( p->prot.comptype == NP_COMP_EVENT ) {
  407 
  408     EVAPI_set_comp_eventh(hca_hndl, r_cq_hndl, event_handler, p, &ceh_hndl);
  409     VAPI_req_comp_notif(hca_hndl, r_cq_hndl, VAPI_NEXT_COMP);
  410 
  411   }
  412  
  413   return 0;
  414 }
  415 
  416 int finalizeIB(ArgStruct *p)
  417 {
  418   VAPI_ret_t ret;
  419 
  420   LOGPRINTF("Finalizing IB stuff\n");
  421 
  422   /* Clear completion event handler */
  423 
  424   if(p->prot.comptype == NP_COMP_EVENT ) {
  425      LOGPRINTF("Clearing comp handler\n");
  426      ret = EVAPI_clear_comp_eventh(hca_hndl, ceh_hndl);
  427      if(ret != VAPI_OK) {
  428         fprintf(stderr, "Error clearing event handler: %s\n",
  429                 VAPI_strerror(ret));
  430      }
  431   }
  432 
  433   if(qp_hndl != VAPI_INVAL_HNDL) {
  434     LOGPRINTF("Destroying QP\n");
  435     ret = VAPI_destroy_qp(hca_hndl, qp_hndl);
  436     if(ret != VAPI_OK) {
  437       fprintf(stderr, "Error destroying Queue Pair: %s\n", VAPI_strerror(ret));
  438     }
  439   }
  440 
  441   if(r_cq_hndl != VAPI_INVAL_HNDL) {
  442     LOGPRINTF("Destroying Recv CQ\n");
  443     ret = VAPI_destroy_cq(hca_hndl, r_cq_hndl);
  444     if(ret != VAPI_OK) {
  445       fprintf(stderr, "Error destroying recv CQ: %s\n", VAPI_strerror(ret));
  446     }
  447   }
  448 
  449   if(s_cq_hndl != VAPI_INVAL_HNDL) {
  450     LOGPRINTF("Destroying Send CQ\n");
  451     ret = VAPI_destroy_cq(hca_hndl, s_cq_hndl);
  452     if(ret != VAPI_OK) {
  453       fprintf(stderr, "Error destroying send CQ: %s\n", VAPI_strerror(ret));
  454     }
  455   }
  456 
  457   /* Check memory registrations just in case user bailed out */
  458   if(s_mr_hndl != VAPI_INVAL_HNDL) {
  459     LOGPRINTF("Deregistering send buffer\n");
  460     ret = VAPI_deregister_mr(hca_hndl, s_mr_hndl);
  461     if(ret != VAPI_OK) {
  462       fprintf(stderr, "Error deregistering send mr: %s\n", VAPI_strerror(ret));
  463     }
  464   }
  465 
  466   if(r_mr_hndl != VAPI_INVAL_HNDL) {
  467     LOGPRINTF("Deregistering recv buffer\n");
  468     ret = VAPI_deregister_mr(hca_hndl, r_mr_hndl);
  469     if(ret != VAPI_OK) {
  470       fprintf(stderr, "Error deregistering recv mr: %s\n", VAPI_strerror(ret));
  471     }
  472   }
  473 
  474   if(pd_hndl != VAPI_INVAL_HNDL) {
  475     LOGPRINTF("Deallocating PD\n");
  476     ret = VAPI_dealloc_pd(hca_hndl, pd_hndl);
  477     if(ret != VAPI_OK) {
  478       fprintf(stderr, "Error deallocating PD: %s\n", VAPI_strerror(ret));
  479     }
  480   }
  481 
  482   /* Application code should not close HCA, just release handle */
  483 
  484   if(hca_hndl != VAPI_INVAL_HNDL) {
  485     LOGPRINTF("Releasing HCA\n");
  486     ret = EVAPI_release_hca_hndl(hca_hndl);
  487     if(ret != VAPI_OK) {
  488       fprintf(stderr, "Error releasing HCA: %s\n", VAPI_strerror(ret));
  489     }
  490   }
  491 
  492   return 0;
  493 }
  494 
  495 void event_handler(VAPI_hca_hndl_t hca, VAPI_cq_hndl_t cq, void* data)
  496 {
  497   VAPI_ret_t    ret;
  498  
  499   while(1) {
  500      
  501      ret = VAPI_poll_cq(hca, cq, &wc);
  502 
  503      if(ret == VAPI_CQ_EMPTY) {
  504         LOGPRINTF("Empty completion queue, requesting next notification\n");
  505         VAPI_req_comp_notif(hca_hndl, r_cq_hndl, VAPI_NEXT_COMP);
  506         return;
  507      } else if(ret != VAPI_OK) {
  508         fprintf(stderr, "Error in event_handler, polling cq: %s\n",
  509                 VAPI_strerror(ret));
  510         exit(-1);
  511      } else if(wc.status != VAPI_SUCCESS) {
  512         fprintf(stderr, "Error in event_handler, on returned work completion "
  513                         "status: %s\n", VAPI_wc_status_sym(wc.status));
  514         exit(-1);
  515      }
  516      
  517      LOGPRINTF("Retrieved work completion\n");
  518 
  519      /* For ping-pong mode at least, this check shouldn't be needed for
  520       * normal operation, but it will help catch any bugs with multiple
  521       * sends coming through when we're only expecting one.
  522       */
  523      if(receive_complete == 1) {
  524 
  525         while(receive_complete != 0) sched_yield();
  526 
  527      }
  528 
  529      receive_complete = 1;
  530 
  531   }
  532   
  533 }
  534 
  535 static int
  536 readFully(int fd, void *obuf, int len)
  537 {
  538   int bytesLeft = len;
  539   char *buf = (char *) obuf;
  540   int bytesRead = 0;
  541 
  542   while (bytesLeft > 0 &&
  543         (bytesRead = read(fd, (void *) buf, bytesLeft)) > 0)
  544     {
  545       bytesLeft -= bytesRead;
  546       buf += bytesRead;
  547     }
  548   if (bytesRead <= 0)
  549     return bytesRead;
  550   return len;
  551 }
  552 
  553 void Sync(ArgStruct *p)
  554 {
  555     char s[] = "SyncMe";
  556     char response[7];
  557 
  558     if (write(p->commfd, s, strlen(s)) < 0 ||
  559         readFully(p->commfd, response, strlen(s)) < 0)
  560       {
  561         perror("NetPIPE: error writing or reading synchronization string");
  562         exit(3);
  563       }
  564     if (strncmp(s, response, strlen(s)))
  565       {
  566         fprintf(stderr, "NetPIPE: Synchronization string incorrect!\n");
  567         exit(3);
  568       }
  569 }
  570 
  571 void PrepareToReceive(ArgStruct *p)
  572 {
  573   VAPI_ret_t          ret;       /* Return code */
  574   VAPI_rr_desc_t      rr;        /* Receive request */
  575   VAPI_sg_lst_entry_t sg_entry;  /* Scatter/Gather list - holds buff addr */
  576 
  577   /* We don't need to post a receive if doing RDMA write with local polling */
  578 
  579   if( p->prot.commtype == NP_COMM_RDMAWRITE &&
  580       p->prot.comptype == NP_COMP_LOCALPOLL )
  581      return;
  582   
  583   rr.opcode = VAPI_RECEIVE;
  584 
  585   /* We only need signaled completions if using VAPI
  586    * completion methods.
  587    */
  588   if( p->prot.comptype == NP_COMP_LOCALPOLL )
  589      rr.comp_type = VAPI_UNSIGNALED;
  590   else
  591      rr.comp_type = VAPI_SIGNALED;
  592 
  593   rr.sg_lst_len = 1;
  594   rr.sg_lst_p = &sg_entry;
  595 
  596   sg_entry.lkey = r_mr_out.l_key;
  597   sg_entry.len = p->bufflen;
  598   sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t)p->r_ptr;
  599 
  600   ret = VAPI_post_rr(hca_hndl, qp_hndl, &rr);
  601   if(ret != VAPI_OK) {
  602     fprintf(stderr, "Error posting recv request: %s\n", VAPI_strerror(ret));
  603     CleanUp(p);
  604     exit(-1);
  605   } else {
  606     LOGPRINTF("Posted recv request\n");
  607   }
  608 
  609   /* Set receive flag to zero and request event completion 
  610    * notification for this receive so the event handler will 
  611    * be triggered when the receive completes.
  612    */
  613   if( p->prot.comptype == NP_COMP_EVENT ) {
  614     receive_complete = 0;
  615   }
  616 }
  617 
  618 void SendData(ArgStruct *p)
  619 {
  620   VAPI_ret_t          ret;       /* Return code */
  621   VAPI_sr_desc_t      sr;        /* Send request */
  622   VAPI_sg_lst_entry_t sg_entry;  /* Scatter/Gather list - holds buff addr */
  623 
  624   /* Fill in send request struct */
  625 
  626   if(p->prot.commtype == NP_COMM_SENDRECV) {
  627      sr.opcode = VAPI_SEND;
  628      LOGPRINTF("Doing regular send\n");
  629   } else if(p->prot.commtype == NP_COMM_SENDRECV_WITH_IMM) {
  630      sr.opcode = VAPI_SEND_WITH_IMM;
  631      LOGPRINTF("Doing regular send with imm\n");
  632   } else if(p->prot.commtype == NP_COMM_RDMAWRITE) {
  633      sr.opcode = VAPI_RDMA_WRITE;
  634      sr.remote_addr = (VAPI_virt_addr_t)(MT_virt_addr_t)(remote_address + (p->s_ptr - p->s_buff));
  635      sr.r_key = remote_key;
  636      LOGPRINTF("Doing RDMA write (raddr=%p)\n", sr.remote_addr);
  637   } else if(p->prot.commtype == NP_COMM_RDMAWRITE_WITH_IMM) {
  638      sr.opcode = VAPI_RDMA_WRITE_WITH_IMM;
  639      sr.remote_addr = (VAPI_virt_addr_t)(MT_virt_addr_t)(remote_address + (p->s_ptr - p->s_buff));
  640      sr.r_key = remote_key;
  641      LOGPRINTF("Doing RDMA write with imm (raddr=%p)\n", sr.remote_addr);
  642   } else {
  643      fprintf(stderr, "Error, invalid communication type in SendData\n");
  644      exit(-1);
  645   }
  646   
  647   sr.comp_type = VAPI_UNSIGNALED;
  648   sr.set_se = FALSE; /* This needed due to a bug in Mellanox HW rel a-0 */
  649 
  650   sr.sg_lst_len = 1;
  651   sr.sg_lst_p = &sg_entry;
  652 
  653   sg_entry.lkey = s_mr_out.l_key; /* Local memory region key */
  654   sg_entry.len = p->bufflen;
  655   sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t)p->s_ptr;
  656 
  657   ret = VAPI_post_sr(hca_hndl, qp_hndl, &sr);
  658   if(ret != VAPI_OK) {
  659     fprintf(stderr, "Error posting send request: %s\n", VAPI_strerror(ret));
  660   } else {
  661     LOGPRINTF("Posted send request\n");
  662   }
  663 
  664 }
  665 
  666 void RecvData(ArgStruct *p)
  667 {
  668   VAPI_ret_t ret;
  669 
  670   /* Busy wait for incoming data */
  671 
  672   LOGPRINTF("Receiving at buffer address %p\n", p->r_ptr);
  673 
  674   if( p->prot.comptype == NP_COMP_LOCALPOLL ) {
  675        
  676     /* Poll for receive completion locally on the receive data */
  677 
  678     LOGPRINTF("Waiting for last byte of data to arrive\n");
  679      
  680     while(p->r_ptr[p->bufflen-1] != 'a' + (p->cache ? 1 - p->tr : 1) ) 
  681     {
  682        /* BUSY WAIT -- this should be fine since we 
  683         * declared r_ptr with volatile qualifier */ 
  684     }
  685 
  686     /* Reset last byte */
  687     p->r_ptr[p->bufflen-1] = 'a' + (p->cache ? p->tr : 0);
  688 
  689     LOGPRINTF("Received all of data\n");
  690 
  691   } else if( p->prot.comptype == NP_COMP_VAPIPOLL ) {
  692      
  693      /* Poll for receive completion using VAPI poll function */
  694 
  695      LOGPRINTF("Polling completion queue for VAPI work completion\n");
  696      
  697      ret = VAPI_CQ_EMPTY;
  698      while(ret == VAPI_CQ_EMPTY)
  699         ret = VAPI_poll_cq(hca_hndl, r_cq_hndl, &wc);
  700 
  701      if(ret != VAPI_OK) {
  702         fprintf(stderr, "Error in RecvData, polling for completion: %s\n",
  703                 VAPI_strerror(ret));
  704         exit(-1);
  705      }
  706 
  707      if(wc.status != VAPI_SUCCESS) {
  708         fprintf(stderr, "Error in status of returned completion: %s\n",
  709               VAPI_wc_status_sym(wc.status));
  710         exit(-1);
  711      }
  712 
  713      LOGPRINTF("Retrieved successful completion\n");
  714      
  715   } else if( p->prot.comptype == NP_COMP_EVENT ) {
  716 
  717      /* Instead of polling directly on data or VAPI completion queue,
  718       * let the VAPI event completion handler set a flag when the receive
  719       * completes, and poll on that instead. Could try using semaphore here
  720       * as well to eliminate busy polling
  721       */
  722 
  723      LOGPRINTF("Polling receive flag\n");
  724      
  725      while( receive_complete == 0 )
  726      {
  727         /* BUSY WAIT */
  728      }
  729 
  730      /* If in prepost-burst mode, we won't be calling PrepareToReceive
  731       * between ping-pongs, so we need to reset the receive_complete
  732       * flag here.
  733       */
  734      if( p->preburst ) receive_complete = 0;
  735 
  736      LOGPRINTF("Receive completed\n");
  737   }
  738 }
  739 
  740 /* Reset is used after a trial to empty the work request queues so we
  741    have enough room for the next trial to run */
  742 void Reset(ArgStruct *p)
  743 {
  744 
  745   VAPI_ret_t          ret;       /* Return code */
  746   VAPI_sr_desc_t      sr;        /* Send request */
  747   VAPI_rr_desc_t      rr;        /* Recv request */
  748 
  749   /* If comptype is event, then we'll use event handler to detect receive,
  750    * so initialize receive_complete flag
  751    */
  752   if(p->prot.comptype == NP_COMP_EVENT) receive_complete = 0;
  753 
  754   /* Prepost receive */
  755   rr.opcode = VAPI_RECEIVE;
  756   rr.comp_type = VAPI_SIGNALED;
  757   rr.sg_lst_len = 0;
  758 
  759   LOGPRINTF("Posting recv request in Reset\n");
  760   ret = VAPI_post_rr(hca_hndl, qp_hndl, &rr);
  761   if(ret != VAPI_OK) {
  762     fprintf(stderr, "  Error posting recv request: %s\n", VAPI_strerror(ret));
  763     CleanUp(p);
  764     exit(-1);
  765   }
  766 
  767   /* Make sure both nodes have preposted receives */
  768   Sync(p);
  769 
  770   /* Post Send */
  771   sr.opcode = VAPI_SEND;
  772   sr.comp_type = VAPI_SIGNALED;
  773   sr.set_se = FALSE; /* This needed due to a bug in Mellanox HW rel a-0 */
  774   sr.sg_lst_len = 0;
  775 
  776   LOGPRINTF("Posting send request \n");
  777   ret = VAPI_post_sr(hca_hndl, qp_hndl, &sr);
  778   if(ret != VAPI_OK) {
  779     fprintf(stderr, "  Error posting send request in Reset: %s\n", 
  780             VAPI_strerror(ret));
  781     exit(-1);
  782   }
  783   if(wc.status != VAPI_SUCCESS) {
  784      fprintf(stderr, "  Error in completion status: %s\n",
  785              VAPI_wc_status_sym(wc.status));
  786      exit(-1);
  787   }
  788 
  789   LOGPRINTF("Polling for completion of send request\n");
  790   ret = VAPI_CQ_EMPTY;
  791   while(ret == VAPI_CQ_EMPTY)
  792     ret = VAPI_poll_cq(hca_hndl, s_cq_hndl, &wc);
  793 
  794   if(ret != VAPI_OK) {
  795     fprintf(stderr, "Error polling CQ for send in Reset: %s\n", 
  796             VAPI_strerror(ret));
  797     exit(-1);
  798   }
  799   if(wc.status != VAPI_SUCCESS) {
  800      fprintf(stderr, "  Error in completion status: %s\n",
  801              VAPI_wc_status_sym(wc.status));
  802      exit(-1);
  803   }          
  804   
  805   LOGPRINTF("Status of send completion: %s\n", VAPI_wc_status_sym(wc.status));
  806 
  807   if(p->prot.comptype == NP_COMP_EVENT) { 
  808      /* If using event completion, the event handler will set receive_complete
  809       * when it gets the completion event.
  810       */
  811      LOGPRINTF("Waiting for receive_complete flag\n");
  812      while(receive_complete == 0) { /* BUSY WAIT */ }
  813   } else {
  814      LOGPRINTF("Polling for completion of recv request\n");
  815      ret = VAPI_CQ_EMPTY;
  816      while(ret == VAPI_CQ_EMPTY)
  817        ret = VAPI_poll_cq(hca_hndl, r_cq_hndl, &wc);
  818      
  819      if(ret != VAPI_OK) {
  820        fprintf(stderr, "Error polling CQ for recv in Reset: %s\n", 
  821                VAPI_strerror(ret));
  822        exit(-1);
  823      }
  824      if(wc.status != VAPI_SUCCESS) {
  825         fprintf(stderr, "  Error in completion status: %s\n",
  826                 VAPI_wc_status_sym(wc.status));
  827         exit(-1);
  828      }
  829 
  830      LOGPRINTF("Status of recv completion: %s\n", VAPI_wc_status_sym(wc.status));
  831   }
  832   LOGPRINTF("Done with reset\n");
  833 }
  834 
  835 void SendTime(ArgStruct *p, double *t)
  836 {
  837     uint32_t ltime, ntime;
  838 
  839     /*
  840       Multiply the number of seconds by 1e6 to get time in microseconds
  841       and convert value to an unsigned 32-bit integer.
  842       */
  843     ltime = (uint32_t)(*t * 1.e6);
  844 
  845     /* Send time in network order */
  846     ntime = htonl(ltime);
  847     if (write(p->commfd, (char *)&ntime, sizeof(uint32_t)) < 0)
  848       {
  849         printf("NetPIPE: write failed in SendTime: errno=%d\n", errno);
  850         exit(301);
  851       }
  852 }
  853 
  854 void RecvTime(ArgStruct *p, double *t)
  855 {
  856     uint32_t ltime, ntime;
  857     int bytesRead;
  858 
  859     bytesRead = readFully(p->commfd, (void *)&ntime, sizeof(uint32_t));
  860     if (bytesRead < 0)
  861       {
  862         printf("NetPIPE: read failed in RecvTime: errno=%d\n", errno);
  863         exit(302);
  864       }
  865     else if (bytesRead != sizeof(uint32_t))
  866       {
  867         fprintf(stderr, "NetPIPE: partial read in RecvTime of %d bytes\n",
  868                 bytesRead);
  869         exit(303);
  870       }
  871     ltime = ntohl(ntime);
  872 
  873     /* Result is ltime (in microseconds) divided by 1.0e6 to get seconds */
  874     *t = (double)ltime / 1.0e6;
  875 }
  876 
  877 void SendRepeat(ArgStruct *p, int rpt)
  878 {
  879   uint32_t lrpt, nrpt;
  880 
  881   lrpt = rpt;
  882   /* Send repeat count as a long in network order */
  883   nrpt = htonl(lrpt);
  884   if (write(p->commfd, (void *) &nrpt, sizeof(uint32_t)) < 0)
  885     {
  886       printf("NetPIPE: write failed in SendRepeat: errno=%d\n", errno);
  887       exit(304);
  888     }
  889 }
  890 
  891 void RecvRepeat(ArgStruct *p, int *rpt)
  892 {
  893   uint32_t lrpt, nrpt;
  894   int bytesRead;
  895 
  896   bytesRead = readFully(p->commfd, (void *)&nrpt, sizeof(uint32_t));
  897   if (bytesRead < 0)
  898     {
  899       printf("NetPIPE: read failed in RecvRepeat: errno=%d\n", errno);
  900       exit(305);
  901     }
  902   else if (bytesRead != sizeof(uint32_t))
  903     {
  904       fprintf(stderr, "NetPIPE: partial read in RecvRepeat of %d bytes\n",
  905               bytesRead);
  906       exit(306);
  907     }
  908   lrpt = ntohl(nrpt);
  909 
  910   *rpt = lrpt;
  911 }
  912 
  913 void establish(ArgStruct *p)
  914 {
  915  int clen;
  916  int one = 1;
  917  struct protoent;
  918 
  919  clen = sizeof(p->prot.sin2);
  920  if(p->tr){
  921    if(connect(p->commfd, (struct sockaddr *) &(p->prot.sin1),
  922               sizeof(p->prot.sin1)) < 0){
  923      printf("Client: Cannot Connect! errno=%d\n",errno);
  924      exit(-10);
  925    }
  926   }
  927   else {
  928     /* SERVER */
  929     listen(p->servicefd, 5);
  930     p->commfd = accept(p->servicefd, (struct sockaddr *) &(p->prot.sin2),
  931                        &clen);
  932 
  933     if(p->commfd < 0){
  934       printf("Server: Accept Failed! errno=%d\n",errno);
  935       exit(-12);
  936     }
  937   }
  938 }
  939 
  940 void CleanUp(ArgStruct *p)
  941 {
  942    char *quit="QUIT";
  943    if (p->tr)
  944    {
  945       write(p->commfd,quit, 5);
  946       read(p->commfd, quit, 5);
  947       close(p->commfd);
  948    }
  949    else
  950    {
  951       read(p->commfd,quit, 5);
  952       write(p->commfd,quit,5);
  953       close(p->commfd);
  954       close(p->servicefd);
  955    }
  956 
  957    finalizeIB(p);
  958 }
  959 
  960 
  961 void AfterAlignmentInit(ArgStruct *p)
  962 {
  963   int bytesRead;
  964 
  965   /* Exchange buffer pointers and remote infiniband keys if doing rdma. Do
  966    * the exchange in this function because this will happen after any
  967    * memory alignment is done, which is important for getting the 
  968    * correct remote address.
  969   */
  970   if( p->prot.commtype == NP_COMM_RDMAWRITE || 
  971       p->prot.commtype == NP_COMM_RDMAWRITE_WITH_IMM ) {
  972      
  973      /* Send my receive buffer address
  974       */
  975      if(write(p->commfd, (void *)&p->r_buff, sizeof(void*)) < 0) {
  976         perror("NetPIPE: write of buffer address failed in AfterAlignmentInit");
  977         exit(-1);
  978      }
  979      
  980      LOGPRINTF("Sent buffer address: %p\n", p->r_buff);
  981      
  982      /* Send my remote key for accessing
  983       * my remote buffer via IB RDMA
  984       */
  985      if(write(p->commfd, (void *)&r_mr_out.r_key, sizeof(VAPI_rkey_t)) < 0) {
  986         perror("NetPIPE: write of remote key failed in AfterAlignmentInit");
  987         exit(-1);
  988      }
  989   
  990      LOGPRINTF("Sent remote key: %d\n", r_mr_out.r_key);
  991      
  992      /* Read the sent data
  993       */
  994      bytesRead = readFully(p->commfd, (void *)&remote_address, sizeof(void*));
  995      if (bytesRead < 0) {
  996         perror("NetPIPE: read of buffer address failed in AfterAlignmentInit");
  997         exit(-1);
  998      } else if (bytesRead != sizeof(void*)) {
  999         perror("NetPIPE: partial read of buffer address in AfterAlignmentInit");
 1000         exit(-1);
 1001      }
 1002      
 1003      LOGPRINTF("Received remote address from other node: %p\n", remote_address);
 1004      
 1005      bytesRead = readFully(p->commfd, (void *)&remote_key, sizeof(VAPI_rkey_t));
 1006      if (bytesRead < 0) {
 1007         perror("NetPIPE: read of remote key failed in AfterAlignmentInit");
 1008         exit(-1);
 1009      } else if (bytesRead != sizeof(VAPI_rkey_t)) {
 1010         perror("NetPIPE: partial read of remote key in AfterAlignmentInit");
 1011         exit(-1);
 1012      }
 1013      
 1014      LOGPRINTF("Received remote key from other node: %d\n", remote_key);
 1015 
 1016   }
 1017 }
 1018 
 1019 
 1020 void MyMalloc(ArgStruct *p, int bufflen, int soffset, int roffset)
 1021 {
 1022   VAPI_ret_t ret;
 1023 
 1024   /* Allocate buffers */
 1025 
 1026   p->r_buff = malloc(bufflen+MAX(soffset,roffset));
 1027   if(p->r_buff == NULL) {
 1028     fprintf(stderr, "Error malloc'ing buffer\n");
 1029     exit(-1);
 1030   }
 1031 
 1032   if(p->cache) {
 1033 
 1034     /* Infiniband spec says we can register same memory region
 1035      * more than once, so just copy buffer address. We will register
 1036      * the same buffer twice with Infiniband.
 1037      */
 1038     p->s_buff = p->r_buff;
 1039 
 1040   } else {
 1041 
 1042     p->s_buff = malloc(bufflen+soffset);
 1043     if(p->s_buff == NULL) {
 1044       fprintf(stderr, "Error malloc'ing buffer\n");
 1045       exit(-1);
 1046     }
 1047 
 1048   }
 1049 
 1050   /* Register buffers with Infiniband */
 1051 
 1052   mr_in.acl = VAPI_EN_LOCAL_WRITE | VAPI_EN_REMOTE_WRITE;
 1053   mr_in.l_key = 0;
 1054   mr_in.pd_hndl = pd_hndl;
 1055   mr_in.r_key = 0;
 1056   mr_in.size = bufflen+MAX(soffset,roffset);
 1057   mr_in.start = (VAPI_virt_addr_t)(MT_virt_addr_t)p->r_buff;
 1058   mr_in.type = VAPI_MR;
 1059 
 1060   ret = VAPI_register_mr(hca_hndl, &mr_in, &r_mr_hndl, &r_mr_out);
 1061   if(ret != VAPI_OK)
 1062         {
 1063     fprintf(stderr, "Error registering recv buffer: %s\n", VAPI_strerror(ret));
 1064     exit(-1);
 1065         }
 1066         else
 1067         {
 1068          LOGPRINTF("Registered Recv Buffer\n");
 1069         }
 1070 
 1071   mr_in.acl = VAPI_EN_LOCAL_WRITE;
 1072   mr_in.l_key = 0;
 1073   mr_in.pd_hndl = pd_hndl;
 1074   mr_in.r_key = 0;
 1075   mr_in.size = bufflen+soffset;
 1076   mr_in.start = (VAPI_virt_addr_t)(MT_virt_addr_t)p->s_buff;
 1077   mr_in.type = VAPI_MR;
 1078 
 1079   ret = VAPI_register_mr(hca_hndl, &mr_in, &s_mr_hndl, &s_mr_out);
 1080   if(ret != VAPI_OK) {
 1081     fprintf(stderr, "Error registering send buffer: %s\n", VAPI_strerror(ret));
 1082     exit(-1);
 1083   } else {
 1084     LOGPRINTF("Registered Send Buffer\n");
 1085   }
 1086 
 1087 }
 1088 void FreeBuff(char *buff1, char *buff2)
 1089 {
 1090   VAPI_ret_t ret;
 1091 
 1092   if(s_mr_hndl != VAPI_INVAL_HNDL) {
 1093     LOGPRINTF("Deregistering send buffer\n");
 1094     ret = VAPI_deregister_mr(hca_hndl, s_mr_hndl);
 1095     if(ret != VAPI_OK) {
 1096       fprintf(stderr, "Error deregistering send mr: %s\n", VAPI_strerror(ret));
 1097     } else {
 1098       s_mr_hndl = VAPI_INVAL_HNDL;
 1099     }
 1100   }
 1101 
 1102   if(r_mr_hndl != VAPI_INVAL_HNDL) {
 1103     LOGPRINTF("Deregistering recv buffer\n");
 1104     ret = VAPI_deregister_mr(hca_hndl, r_mr_hndl);
 1105     if(ret != VAPI_OK) {
 1106       fprintf(stderr, "Error deregistering recv mr: %s\n", VAPI_strerror(ret));
 1107     } else {
 1108       r_mr_hndl = VAPI_INVAL_HNDL;
 1109     }
 1110   }
 1111 
 1112   if(buff1 != NULL)
 1113     free(buff1);
 1114 
 1115   if(buff2 != NULL)
 1116     free(buff2);
 1117 }
 1118