"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/rpc/rpc-lib/src/rpc-clnt.c" (16 Sep 2020, 55251 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively you can view or download the uninterpreted source code file here. For more information about "rpc-clnt.c" see the Fossies "Dox" file reference documentation.

    1 /*
    2   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
    3   This file is part of GlusterFS.
    4 
    5   This file is licensed to you under your choice of the GNU Lesser
    6   General Public License, version 3 or any later version (LGPLv3 or
    7   later), or the GNU General Public License, version 2 (GPLv2), in all
    8   cases as published by the Free Software Foundation.
    9 */
   10 
   11 #define RPC_CLNT_DEFAULT_REQUEST_COUNT 512
   12 
   13 #include "rpc-clnt.h"
   14 #include "rpc-clnt-ping.h"
   15 #include <glusterfs/byte-order.h>
   16 #include "xdr-rpcclnt.h"
   17 #include "rpc-transport.h"
   18 #include "protocol-common.h"
   19 #include <glusterfs/mem-pool.h>
   20 #include "xdr-rpc.h"
   21 #include "rpc-common-xdr.h"
   22 
   23 void
   24 rpc_clnt_reply_deinit(struct rpc_req *req, struct mem_pool *pool);
   25 
   26 struct saved_frame *
   27 __saved_frames_get_timedout(struct saved_frames *frames, uint32_t timeout,
   28                             struct timeval *current)
   29 {
   30     struct saved_frame *bailout_frame = NULL, *tmp = NULL;
   31 
   32     if (!list_empty(&frames->sf.list)) {
   33         tmp = list_entry(frames->sf.list.next, typeof(*tmp), list);
   34         if ((tmp->saved_at.tv_sec + timeout) <= current->tv_sec) {
   35             bailout_frame = tmp;
   36             list_del_init(&bailout_frame->list);
   37             frames->count--;
   38         }
   39     }
   40 
   41     return bailout_frame;
   42 }
   43 
   44 static int
   45 _is_lock_fop(struct saved_frame *sframe)
   46 {
   47     int fop = 0;
   48 
   49     if (SFRAME_GET_PROGNUM(sframe) == GLUSTER_FOP_PROGRAM &&
   50         SFRAME_GET_PROGVER(sframe) == GLUSTER_FOP_VERSION)
   51         fop = SFRAME_GET_PROCNUM(sframe);
   52 
   53     return ((fop == GFS3_OP_LK) || (fop == GFS3_OP_INODELK) ||
   54             (fop == GFS3_OP_FINODELK) || (fop == GFS3_OP_ENTRYLK) ||
   55             (fop == GFS3_OP_FENTRYLK));
   56 }
   57 
   58 static struct saved_frame *
   59 __saved_frames_put(struct saved_frames *frames, void *frame,
   60                    struct rpc_req *rpcreq)
   61 {
   62     struct saved_frame *saved_frame = mem_get(
   63         rpcreq->conn->rpc_clnt->saved_frames_pool);
   64 
   65     if (!saved_frame) {
   66         goto out;
   67     }
   68     /* THIS should be saved and set back */
   69 
   70     INIT_LIST_HEAD(&saved_frame->list);
   71 
   72     saved_frame->capital_this = THIS;
   73     saved_frame->frame = frame;
   74     saved_frame->rpcreq = rpcreq;
   75     gettimeofday(&saved_frame->saved_at, NULL);
   76     memset(&saved_frame->rsp, 0, sizeof(rpc_transport_rsp_t));
   77 
   78     if (_is_lock_fop(saved_frame))
   79         list_add_tail(&saved_frame->list, &frames->lk_sf.list);
   80     else
   81         list_add_tail(&saved_frame->list, &frames->sf.list);
   82 
   83     frames->count++;
   84 
   85 out:
   86     return saved_frame;
   87 }
   88 
/* Bail-out timer callback: scan the connection's saved frames for requests
 * that have been pending longer than conn->frame_timeout, unwind each of
 * them with rpc_status == -1, and re-arm itself so the scan repeats every
 * 10 seconds while the connection lives.  @data is the rpc_clnt; the timer
 * event that invoked us holds one reference on it, dropped at 'out'. */
static void
call_bail(void *data)
{
    rpc_transport_t *trans = NULL;
    struct rpc_clnt *clnt = NULL;
    rpc_clnt_connection_t *conn = NULL;
    struct timeval current;
    struct list_head list;
    struct saved_frame *saved_frame = NULL;
    struct saved_frame *trav = NULL;
    struct saved_frame *tmp = NULL;
    char frame_sent[256] = {
        0,
    };
    struct timespec timeout = {
        0,
    };
    char peerid[UNIX_PATH_MAX] = {0};
    gf_boolean_t need_unref = _gf_false; /* set when re-arming the timer fails */
    int len;

    /* NOTE(review): on validation failure we jump to 'out' with clnt == NULL
     * and call rpc_clnt_unref(NULL) — presumably a safe no-op; confirm. */
    GF_VALIDATE_OR_GOTO("client", data, out);

    clnt = data;

    conn = &clnt->conn;
    /* Snapshot the peer identifier under the lock for later log messages. */
    pthread_mutex_lock(&conn->lock);
    {
        trans = conn->trans;
        if (trans) {
            (void)snprintf(peerid, sizeof(peerid), "%s",
                           conn->trans->peerinfo.identifier);
        }
    }
    pthread_mutex_unlock(&conn->lock);
    /*rpc_clnt_connection_cleanup will be unwinding all saved frames,
     * bailed or otherwise*/
    if (!trans)
        goto out;

    gettimeofday(&current, NULL);
    INIT_LIST_HEAD(&list);

    pthread_mutex_lock(&conn->lock);
    {
        /* Chaining to get call-always functionality from
           call-once timer */
        if (conn->timer) {
            timeout.tv_sec = 10;
            timeout.tv_nsec = 0;

            /* Ref rpc as it's added to timer event queue */
            rpc_clnt_ref(clnt);
            gf_timer_call_cancel(clnt->ctx, conn->timer);
            conn->timer = gf_timer_call_after(clnt->ctx, timeout, call_bail,
                                              (void *)clnt);

            if (conn->timer == NULL) {
                gf_log(conn->name, GF_LOG_WARNING,
                       "Cannot create bailout timer for %s", peerid);
                /* Re-arming failed: the reference taken above must be
                 * dropped after we leave the lock. */
                need_unref = _gf_true;
            }
        }

        /* Detach every timed-out frame onto a private list so the
         * callbacks can run without holding conn->lock. */
        do {
            saved_frame = __saved_frames_get_timedout(
                conn->saved_frames, conn->frame_timeout, &current);
            if (saved_frame)
                list_add(&saved_frame->list, &list);

        } while (saved_frame);
    }
    pthread_mutex_unlock(&conn->lock);

    if (list_empty(&list))
        goto out;

    list_for_each_entry_safe(trav, tmp, &list, list)
    {
        /* Format "send time" as date-time plus microseconds for the log. */
        gf_time_fmt(frame_sent, sizeof frame_sent, trav->saved_at.tv_sec,
                    gf_timefmt_FT);
        len = strlen(frame_sent);
        snprintf(frame_sent + len, sizeof(frame_sent) - len,
                 ".%" GF_PRI_SUSECONDS, trav->saved_at.tv_usec);

        gf_log(conn->name, GF_LOG_ERROR,
               "bailing out frame type(%s), op(%s(%d)), xid = 0x%x, "
               "unique = %" PRIu64 ", sent = %s, timeout = %d for %s",
               trav->rpcreq->prog->progname,
               (trav->rpcreq->prog->procnames)
                   ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
                   : "--",
               trav->rpcreq->procnum, trav->rpcreq->xid,
               ((call_frame_t *)(trav->frame))->root->unique, frame_sent,
               conn->frame_timeout, peerid);

        /* Hold the client across the callback, which may drop refs. */
        clnt = rpc_clnt_ref(clnt);
        trav->rpcreq->rpc_status = -1;
        trav->rpcreq->cbkfn(trav->rpcreq, NULL, 0, trav->frame);

        rpc_clnt_reply_deinit(trav->rpcreq, clnt->reqpool);
        clnt = rpc_clnt_unref(clnt);
        list_del_init(&trav->list);
        mem_put(trav);
    }
out:
    /* Drop the reference held by the timer event that invoked us. */
    rpc_clnt_unref(clnt);
    if (need_unref)
        rpc_clnt_unref(clnt);
    return;
}
  200 
  201 /* to be called with conn->lock held */
  202 static struct saved_frame *
  203 __save_frame(struct rpc_clnt *rpc_clnt, call_frame_t *frame,
  204              struct rpc_req *rpcreq)
  205 {
  206     rpc_clnt_connection_t *conn = &rpc_clnt->conn;
  207     struct timespec timeout = {
  208         0,
  209     };
  210     struct saved_frame *saved_frame = __saved_frames_put(conn->saved_frames,
  211                                                          frame, rpcreq);
  212 
  213     if (saved_frame == NULL) {
  214         goto out;
  215     }
  216 
  217     /* TODO: make timeout configurable */
  218     if (conn->timer == NULL) {
  219         timeout.tv_sec = 10;
  220         timeout.tv_nsec = 0;
  221         rpc_clnt_ref(rpc_clnt);
  222         conn->timer = gf_timer_call_after(rpc_clnt->ctx, timeout, call_bail,
  223                                           (void *)rpc_clnt);
  224     }
  225 
  226 out:
  227     return saved_frame;
  228 }
  229 
  230 struct saved_frames *
  231 saved_frames_new(void)
  232 {
  233     struct saved_frames *saved_frames = NULL;
  234 
  235     saved_frames = GF_CALLOC(1, sizeof(*saved_frames),
  236                              gf_common_mt_rpcclnt_savedframe_t);
  237     if (!saved_frames) {
  238         return NULL;
  239     }
  240 
  241     INIT_LIST_HEAD(&saved_frames->sf.list);
  242     INIT_LIST_HEAD(&saved_frames->lk_sf.list);
  243 
  244     return saved_frames;
  245 }
  246 
  247 int
  248 __saved_frame_copy(struct saved_frames *frames, int64_t callid,
  249                    struct saved_frame *saved_frame)
  250 {
  251     struct saved_frame *tmp = NULL;
  252     int ret = -1;
  253 
  254     if (!saved_frame) {
  255         ret = 0;
  256         goto out;
  257     }
  258 
  259     list_for_each_entry(tmp, &frames->sf.list, list)
  260     {
  261         if (tmp->rpcreq->xid == callid) {
  262             *saved_frame = *tmp;
  263             ret = 0;
  264             goto out;
  265         }
  266     }
  267 
  268     list_for_each_entry(tmp, &frames->lk_sf.list, list)
  269     {
  270         if (tmp->rpcreq->xid == callid) {
  271             *saved_frame = *tmp;
  272             ret = 0;
  273             goto out;
  274         }
  275     }
  276 
  277 out:
  278     return ret;
  279 }
  280 
  281 struct saved_frame *
  282 __saved_frame_get(struct saved_frames *frames, int64_t callid)
  283 {
  284     struct saved_frame *saved_frame = NULL;
  285     struct saved_frame *tmp = NULL;
  286 
  287     list_for_each_entry(tmp, &frames->sf.list, list)
  288     {
  289         if (tmp->rpcreq->xid == callid) {
  290             list_del_init(&tmp->list);
  291             frames->count--;
  292             saved_frame = tmp;
  293             goto out;
  294         }
  295     }
  296 
  297     list_for_each_entry(tmp, &frames->lk_sf.list, list)
  298     {
  299         if (tmp->rpcreq->xid == callid) {
  300             list_del_init(&tmp->list);
  301             frames->count--;
  302             saved_frame = tmp;
  303             goto out;
  304         }
  305     }
  306 
  307 out:
  308     if (saved_frame) {
  309         THIS = saved_frame->capital_this;
  310     }
  311 
  312     return saved_frame;
  313 }
  314 
  315 void
  316 saved_frames_unwind(struct saved_frames *saved_frames)
  317 {
  318     struct saved_frame *trav = NULL;
  319     struct saved_frame *tmp = NULL;
  320     char timestr[1024] = {
  321         0,
  322     };
  323     int len;
  324 
  325     list_splice_init(&saved_frames->lk_sf.list, &saved_frames->sf.list);
  326 
  327     list_for_each_entry_safe(trav, tmp, &saved_frames->sf.list, list)
  328     {
  329         gf_time_fmt(timestr, sizeof timestr, trav->saved_at.tv_sec,
  330                     gf_timefmt_FT);
  331         len = strlen(timestr);
  332         snprintf(timestr + len, sizeof(timestr) - len, ".%" GF_PRI_SUSECONDS,
  333                  trav->saved_at.tv_usec);
  334 
  335         if (!trav->rpcreq || !trav->rpcreq->prog)
  336             continue;
  337 
  338         gf_log_callingfn(
  339             trav->rpcreq->conn->name, GF_LOG_ERROR,
  340             "forced unwinding frame type(%s) op(%s(%d)) "
  341             "called at %s (xid=0x%x)",
  342             trav->rpcreq->prog->progname,
  343             ((trav->rpcreq->prog->procnames)
  344                  ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
  345                  : "--"),
  346             trav->rpcreq->procnum, timestr, trav->rpcreq->xid);
  347         saved_frames->count--;
  348 
  349         trav->rpcreq->rpc_status = -1;
  350         trav->rpcreq->cbkfn(trav->rpcreq, NULL, 0, trav->frame);
  351 
  352         rpc_clnt_reply_deinit(trav->rpcreq,
  353                               trav->rpcreq->conn->rpc_clnt->reqpool);
  354 
  355         list_del_init(&trav->list);
  356         mem_put(trav);
  357     }
  358 }
  359 
  360 void
  361 saved_frames_destroy(struct saved_frames *frames)
  362 {
  363     if (!frames)
  364         return;
  365 
  366     saved_frames_unwind(frames);
  367 
  368     GF_FREE(frames);
  369 }
  370 
/* Reconnect timer callback: attempt to re-establish the transport and arm
 * the next attempt 3 seconds out, forming a chain that runs until the
 * connection succeeds or the client is disabled.  Every queued timer holds
 * a reference on the rpc_clnt; the one for the invocation that brought us
 * here is dropped at the bottom. */
void
rpc_clnt_reconnect(void *conn_ptr)
{
    rpc_transport_t *trans = NULL;
    rpc_clnt_connection_t *conn = NULL;
    struct timespec ts = {0, 0};
    struct rpc_clnt *clnt = NULL;
    gf_boolean_t need_unref = _gf_false;     /* set when re-arming failed */
    gf_boolean_t canceled_unref = _gf_false; /* set when a pending timer was cancelled */

    conn = conn_ptr;
    clnt = conn->rpc_clnt;
    pthread_mutex_lock(&conn->lock);
    {
        trans = conn->trans;
        if (!trans)
            goto out_unlock;

        /* A reconnect may already be queued (e.g. armed concurrently by
         * the disconnect handler); cancel it so only one chain runs.  A
         * successful cancel means its reference must be released too. */
        if (conn->reconnect) {
            if (!gf_timer_call_cancel(clnt->ctx, conn->reconnect))
                canceled_unref = _gf_true;
        }
        conn->reconnect = 0;

        if ((conn->connected == 0) && !clnt->disabled) {
            ts.tv_sec = 3;
            ts.tv_nsec = 0;

            gf_log(conn->name, GF_LOG_TRACE, "attempting reconnect");
            (void)rpc_transport_connect(trans, conn->config.remote_port);
            /* Reference for the newly queued timer event. */
            rpc_clnt_ref(clnt);
            conn->reconnect = gf_timer_call_after(clnt->ctx, ts,
                                                  rpc_clnt_reconnect, conn);
            if (!conn->reconnect) {
                need_unref = _gf_true;
                gf_log(conn->name, GF_LOG_ERROR,
                       "Error adding to timer event queue");
            }
        } else {
            gf_log(conn->name, GF_LOG_TRACE, "breaking reconnect chain");
        }
    }
out_unlock:
    pthread_mutex_unlock(&conn->lock);

    /* Drop the reference held by the timer event that invoked us, plus
     * any owed for a failed re-arm or a cancelled pending timer. */
    rpc_clnt_unref(clnt);
    if (need_unref)
        rpc_clnt_unref(clnt);
    if (canceled_unref)
        rpc_clnt_unref(clnt);
    return;
}
  423 
  424 int
  425 rpc_clnt_fill_request_info(struct rpc_clnt *clnt, rpc_request_info_t *info)
  426 {
  427     struct saved_frame saved_frame;
  428     int ret = -1;
  429 
  430     pthread_mutex_lock(&clnt->conn.lock);
  431     {
  432         ret = __saved_frame_copy(clnt->conn.saved_frames, info->xid,
  433                                  &saved_frame);
  434     }
  435     pthread_mutex_unlock(&clnt->conn.lock);
  436 
  437     if (ret == -1) {
  438         gf_log(clnt->conn.name, GF_LOG_CRITICAL,
  439                "cannot lookup the saved "
  440                "frame corresponding to xid (%d)",
  441                info->xid);
  442         goto out;
  443     }
  444 
  445     info->prognum = saved_frame.rpcreq->prog->prognum;
  446     info->procnum = saved_frame.rpcreq->procnum;
  447     info->progver = saved_frame.rpcreq->prog->progver;
  448     info->rpc_req = saved_frame.rpcreq;
  449     info->rsp = saved_frame.rsp;
  450 
  451     ret = 0;
  452 out:
  453     return ret;
  454 }
  455 
  456 int
  457 rpc_clnt_reconnect_cleanup(rpc_clnt_connection_t *conn)
  458 {
  459     struct rpc_clnt *clnt = NULL;
  460     int ret = 0;
  461     gf_boolean_t reconnect_unref = _gf_false;
  462 
  463     if (!conn) {
  464         goto out;
  465     }
  466 
  467     clnt = conn->rpc_clnt;
  468 
  469     pthread_mutex_lock(&conn->lock);
  470     {
  471         if (conn->reconnect) {
  472             ret = gf_timer_call_cancel(clnt->ctx, conn->reconnect);
  473             if (!ret) {
  474                 reconnect_unref = _gf_true;
  475                 conn->cleanup_gen++;
  476             }
  477             conn->reconnect = NULL;
  478         }
  479     }
  480     pthread_mutex_unlock(&conn->lock);
  481 
  482     if (reconnect_unref)
  483         rpc_clnt_unref(clnt);
  484 
  485 out:
  486     return 0;
  487 }
  488 
/*
 * client_protocol_cleanup - cleanup function
 * @trans: transport object
 *
 */
/* Tear down all per-connection request state: swap in a fresh saved-frames
 * table (the old one is force-unwound outside the lock), cancel the
 * bail-out, reconnect and ping timers, mark the connection disconnected,
 * and bump cleanup_gen.  Each successfully cancelled timer's reference on
 * the client is dropped after the lock is released.  Always returns 0. */
int
rpc_clnt_connection_cleanup(rpc_clnt_connection_t *conn)
{
    struct saved_frames *saved_frames = NULL;
    struct rpc_clnt *clnt = NULL;
    int unref = 0; /* ping-timer references to drop, per remove_ping_timer_locked */
    int ret = 0;
    gf_boolean_t timer_unref = _gf_false;
    gf_boolean_t reconnect_unref = _gf_false;

    if (!conn) {
        goto out;
    }

    clnt = conn->rpc_clnt;

    pthread_mutex_lock(&conn->lock);
    {
        /* Detach the pending frames; they are unwound after unlock so
         * callbacks never run under conn->lock. */
        saved_frames = conn->saved_frames;
        conn->saved_frames = saved_frames_new();

        /* bailout logic cleanup */
        if (conn->timer) {
            ret = gf_timer_call_cancel(clnt->ctx, conn->timer);
            if (!ret)
                timer_unref = _gf_true;
            conn->timer = NULL;
        }
        if (conn->reconnect) {
            ret = gf_timer_call_cancel(clnt->ctx, conn->reconnect);
            if (!ret)
                reconnect_unref = _gf_true;
            conn->reconnect = NULL;
        }

        conn->connected = 0;
        conn->disconnected = 1;

        unref = rpc_clnt_remove_ping_timer_locked(clnt);
        /*reset rpc msgs stats*/
        conn->pingcnt = 0;
        conn->msgcnt = 0;
        /* Signal to rpc_clnt_handle_disconnect that cleanup ran. */
        conn->cleanup_gen++;
    }
    pthread_mutex_unlock(&conn->lock);

    /* Unwind callbacks and drop timer references outside the lock. */
    saved_frames_destroy(saved_frames);
    if (unref)
        rpc_clnt_unref(clnt);

    if (timer_unref)
        rpc_clnt_unref(clnt);

    if (reconnect_unref)
        rpc_clnt_unref(clnt);
out:
    return 0;
}
  552 
  553 /*
  554  * lookup_frame - lookup call frame corresponding to a given callid
  555  * @trans: transport object
  556  * @callid: call id of the frame
  557  *
  558  * not for external reference
  559  */
  560 
  561 static struct saved_frame *
  562 lookup_frame(rpc_clnt_connection_t *conn, int64_t callid)
  563 {
  564     struct saved_frame *frame = NULL;
  565 
  566     pthread_mutex_lock(&conn->lock);
  567     {
  568         frame = __saved_frame_get(conn->saved_frames, callid);
  569     }
  570     pthread_mutex_unlock(&conn->lock);
  571 
  572     return frame;
  573 }
  574 
  575 int
  576 rpc_clnt_reply_fill(rpc_transport_pollin_t *msg, rpc_clnt_connection_t *conn,
  577                     struct rpc_msg *replymsg, struct iovec progmsg,
  578                     struct rpc_req *req, struct saved_frame *saved_frame)
  579 {
  580     int ret = -1;
  581 
  582     if ((!conn) || (!replymsg) || (!req) || (!saved_frame) || (!msg)) {
  583         goto out;
  584     }
  585 
  586     req->rpc_status = 0;
  587     if ((rpc_reply_status(replymsg) == MSG_DENIED) ||
  588         (rpc_accepted_reply_status(replymsg) != SUCCESS)) {
  589         req->rpc_status = -1;
  590     }
  591 
  592     req->rsp[0] = progmsg;
  593     req->rsp_iobref = iobref_ref(msg->iobref);
  594 
  595     if (msg->vectored) {
  596         req->rsp[1] = msg->vector[1];
  597         req->rspcnt = 2;
  598     } else {
  599         req->rspcnt = 1;
  600     }
  601 
  602     /* By this time, the data bytes for the auth scheme would have already
  603      * been copied into the required sections of the req structure,
  604      * we just need to fill in the meta-data about it now.
  605      */
  606     if (req->rpc_status == 0) {
  607         /*
  608          * req->verf.flavour = rpc_reply_verf_flavour (replymsg);
  609          * req->verf.datalen = rpc_reply_verf_len (replymsg);
  610          */
  611     }
  612 
  613     ret = 0;
  614 
  615 out:
  616     return ret;
  617 }
  618 
  619 void
  620 rpc_clnt_reply_deinit(struct rpc_req *req, struct mem_pool *pool)
  621 {
  622     if (!req) {
  623         goto out;
  624     }
  625 
  626     if (req->rsp_iobref) {
  627         iobref_unref(req->rsp_iobref);
  628     }
  629 
  630     mem_put(req);
  631 out:
  632     return;
  633 }
  634 
  635 /* TODO: use mem-pool for allocating requests */
  636 int
  637 rpc_clnt_reply_init(rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
  638                     struct rpc_req *req, struct saved_frame *saved_frame)
  639 {
  640     char *msgbuf = NULL;
  641     struct rpc_msg rpcmsg;
  642     struct iovec progmsg; /* RPC Program payload */
  643     size_t msglen = 0;
  644     int ret = -1;
  645 
  646     msgbuf = msg->vector[0].iov_base;
  647     msglen = msg->vector[0].iov_len;
  648 
  649     ret = xdr_to_rpc_reply(msgbuf, msglen, &rpcmsg, &progmsg,
  650                            req->verf.authdata);
  651     if (ret != 0) {
  652         gf_log(conn->name, GF_LOG_WARNING, "RPC reply decoding failed");
  653         goto out;
  654     }
  655 
  656     ret = rpc_clnt_reply_fill(msg, conn, &rpcmsg, progmsg, req, saved_frame);
  657     if (ret != 0) {
  658         goto out;
  659     }
  660 
  661     gf_log(conn->name, GF_LOG_TRACE,
  662            "received rpc message (RPC XID: 0x%x"
  663            " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)",
  664            saved_frame->rpcreq->xid, saved_frame->rpcreq->prog->progname,
  665            saved_frame->rpcreq->prog->progver, saved_frame->rpcreq->procnum,
  666            conn->name);
  667 
  668 out:
  669     if (ret != 0) {
  670         req->rpc_status = -1;
  671     }
  672 
  673     return ret;
  674 }
  675 
/* Handle an unsolicited callback (a CALL from the server): decode it,
 * find the registered callback program matching its prognum/progver, and
 * invoke the actor for its procedure number.  Holds a client reference
 * for the duration.  Returns the XDR decode result (-1 on decode error). */
int
rpc_clnt_handle_cbk(struct rpc_clnt *clnt, rpc_transport_pollin_t *msg)
{
    char *msgbuf = NULL;
    rpcclnt_cb_program_t *program = NULL;
    struct rpc_msg rpcmsg;
    struct iovec progmsg; /* RPC Program payload */
    size_t msglen = 0;
    int found = 0;
    int ret = -1;
    int procnum = 0;

    msgbuf = msg->vector[0].iov_base;
    msglen = msg->vector[0].iov_len;

    clnt = rpc_clnt_ref(clnt);
    ret = xdr_to_rpc_call(msgbuf, msglen, &rpcmsg, &progmsg, NULL, NULL);
    if (ret == -1) {
        gf_log(clnt->conn.name, GF_LOG_WARNING, "RPC call decoding failed");
        goto out;
    }

    gf_log(clnt->conn.name, GF_LOG_TRACE,
           "receivd rpc message (XID: 0x%" GF_PRI_RPC_XID
           ", "
           "Ver: %" GF_PRI_RPC_VERSION ", Program: %" GF_PRI_RPC_PROG_ID
           ", "
           "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC
           ") "
           "from rpc-transport (%s)",
           rpc_call_xid(&rpcmsg), rpc_call_rpcvers(&rpcmsg),
           rpc_call_program(&rpcmsg), rpc_call_progver(&rpcmsg),
           rpc_call_progproc(&rpcmsg), clnt->conn.name);

    procnum = rpc_call_progproc(&rpcmsg);

    /* Search the registered callback programs under clnt->lock. */
    pthread_mutex_lock(&clnt->lock);
    {
        list_for_each_entry(program, &clnt->programs, program)
        {
            if ((program->prognum == rpc_call_program(&rpcmsg)) &&
                (program->progver == rpc_call_progver(&rpcmsg))) {
                found = 1;
                break;
            }
        }
    }
    pthread_mutex_unlock(&clnt->lock);

    /* NOTE(review): 'program' is dereferenced after the lock is dropped;
     * presumably programs are never unregistered concurrently — confirm. */
    if (found && (procnum < program->numactors) &&
        (program->actors[procnum].actor)) {
        program->actors[procnum].actor(clnt, program->mydata, &progmsg);
    }

out:
    rpc_clnt_unref(clnt);
    return ret;
}
  734 
  735 int
  736 rpc_clnt_handle_reply(struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
  737 {
  738     rpc_clnt_connection_t *conn = NULL;
  739     struct saved_frame *saved_frame = NULL;
  740     int ret = -1;
  741     struct rpc_req *req = NULL;
  742     uint32_t xid = 0;
  743 
  744     clnt = rpc_clnt_ref(clnt);
  745     conn = &clnt->conn;
  746 
  747     xid = ntoh32(*((uint32_t *)pollin->vector[0].iov_base));
  748     saved_frame = lookup_frame(conn, xid);
  749     if (saved_frame == NULL) {
  750         gf_log(conn->name, GF_LOG_ERROR,
  751                "cannot lookup the saved frame for reply with xid (%u)", xid);
  752         goto out;
  753     }
  754 
  755     req = saved_frame->rpcreq;
  756     if (req == NULL) {
  757         gf_log(conn->name, GF_LOG_ERROR, "no request with frame for xid (%u)",
  758                xid);
  759         goto out;
  760     }
  761 
  762     ret = rpc_clnt_reply_init(conn, pollin, req, saved_frame);
  763     if (ret != 0) {
  764         req->rpc_status = -1;
  765         gf_log(conn->name, GF_LOG_WARNING, "initialising rpc reply failed");
  766     }
  767 
  768     req->cbkfn(req, req->rsp, req->rspcnt, saved_frame->frame);
  769 
  770     if (req) {
  771         rpc_clnt_reply_deinit(req, conn->rpc_clnt->reqpool);
  772     }
  773 out:
  774 
  775     if (saved_frame) {
  776         mem_put(saved_frame);
  777     }
  778 
  779     rpc_clnt_unref(clnt);
  780     return ret;
  781 }
  782 
  783 gf_boolean_t
  784 is_rpc_clnt_disconnected(rpc_clnt_connection_t *conn)
  785 {
  786     gf_boolean_t disconnected = _gf_true;
  787 
  788     if (!conn)
  789         return disconnected;
  790 
  791     pthread_mutex_lock(&conn->lock);
  792     {
  793         disconnected = conn->disconnected;
  794     }
  795     pthread_mutex_unlock(&conn->lock);
  796 
  797     return disconnected;
  798 }
  799 
static void
rpc_clnt_destroy(struct rpc_clnt *rpc);

/* Save the current xlator context (THIS) into the caller's 'old_THIS'
 * (declared via DECLARE_OLD_THIS) and switch THIS to @xl, so work done in
 * notification handlers is attributed to the owning translator.  Logs a
 * CRITICAL message if THIS was never initialised.  Pair with
 * RPC_THIS_RESTORE. */
#define RPC_THIS_SAVE(xl)                                                      \
    do {                                                                       \
        old_THIS = THIS;                                                       \
        if (!old_THIS)                                                         \
            gf_log_callingfn("rpc", GF_LOG_CRITICAL,                           \
                             "THIS is not initialised.");                      \
        THIS = xl;                                                             \
    } while (0)

/* Restore the xlator context saved by RPC_THIS_SAVE. */
#define RPC_THIS_RESTORE (THIS = old_THIS)
  813 
/* React to a transport disconnect: notify the program (RPC_CLNT_DISCONNECT),
 * run connection cleanup unless the program's notify already did so, and
 * arm a reconnect timer (3s) if the client is still enabled.  Returns 0. */
static int
rpc_clnt_handle_disconnect(struct rpc_clnt *clnt, rpc_clnt_connection_t *conn)
{
    struct timespec ts = {
        0,
    };
    gf_boolean_t unref_clnt = _gf_false;
    uint64_t pre_notify_gen = 0, post_notify_gen = 0;

    /* cleanup_gen is bumped by rpc_clnt_connection_cleanup (and by a
     * successful reconnect-timer cancel); comparing it before and after
     * the notify tells us whether the program already cleaned up. */
    pthread_mutex_lock(&conn->lock);
    {
        pre_notify_gen = conn->cleanup_gen;
    }
    pthread_mutex_unlock(&conn->lock);

    if (clnt->notifyfn)
        clnt->notifyfn(clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL);

    pthread_mutex_lock(&conn->lock);
    {
        post_notify_gen = conn->cleanup_gen;
    }
    pthread_mutex_unlock(&conn->lock);

    if (pre_notify_gen == post_notify_gen) {
        /* program didn't invoke cleanup, so rpc has to do it */
        rpc_clnt_connection_cleanup(conn);
    }

    pthread_mutex_lock(&conn->lock);
    {
        if (!conn->rpc_clnt->disabled && (conn->reconnect == NULL)) {
            ts.tv_sec = 3;
            ts.tv_nsec = 0;

            /* The queued reconnect timer holds a client reference. */
            rpc_clnt_ref(clnt);
            conn->reconnect = gf_timer_call_after(clnt->ctx, ts,
                                                  rpc_clnt_reconnect, conn);
            if (conn->reconnect == NULL) {
                gf_log(conn->name, GF_LOG_WARNING,
                       "Cannot create rpc_clnt_reconnect timer");
                /* Arming failed: give back the reference taken above. */
                unref_clnt = _gf_true;
            }
        }
    }
    pthread_mutex_unlock(&conn->lock);

    if (unref_clnt)
        rpc_clnt_unref(clnt);

    return 0;
}
  866 
  867 int
  868 rpc_clnt_notify(rpc_transport_t *trans, void *mydata,
  869                 rpc_transport_event_t event, void *data, ...)
  870 {
  871     rpc_clnt_connection_t *conn = NULL;
  872     struct rpc_clnt *clnt = NULL;
  873     int ret = -1;
  874     rpc_request_info_t *req_info = NULL;
  875     rpc_transport_pollin_t *pollin = NULL;
  876     void *clnt_mydata = NULL;
  877     DECLARE_OLD_THIS;
  878 
  879     conn = mydata;
  880     if (conn == NULL) {
  881         goto out;
  882     }
  883     clnt = conn->rpc_clnt;
  884     if (!clnt)
  885         goto out;
  886 
  887     RPC_THIS_SAVE(clnt->owner);
  888 
  889     switch (event) {
  890         case RPC_TRANSPORT_DISCONNECT: {
  891             rpc_clnt_handle_disconnect(clnt, conn);
  892             /* The auth_value was being reset to AUTH_GLUSTERFS_v2.
  893              *    if (clnt->auth_value)
  894              *           clnt->auth_value = AUTH_GLUSTERFS_v2;
  895              * It should not be reset here. The disconnect during
  896              * portmap request can race with handshake. If handshake
  897              * happens first and disconnect later, auth_value would set
  898              * to default value and it never sets back to actual auth_value
  899              * supported by server. But it's important to set to lower
  900              * version supported in the case where the server downgrades.
  901              * So moving this code to RPC_TRANSPORT_CONNECT. Note that
  902              * CONNECT cannot race with handshake as by nature it is
  903              * serialized with handhake. An handshake can happen only
  904              * on a connected transport and hence its strictly serialized.
  905              */
  906             break;
  907         }
  908 
  909         case RPC_TRANSPORT_CLEANUP:
  910             if (clnt->notifyfn) {
  911                 clnt_mydata = clnt->mydata;
  912                 clnt->mydata = NULL;
  913                 ret = clnt->notifyfn(clnt, clnt_mydata, RPC_CLNT_DESTROY, NULL);
  914                 if (ret < 0) {
  915                     gf_log(trans->name, GF_LOG_WARNING,
  916                            "client notify handler returned error "
  917                            "while handling RPC_CLNT_DESTROY");
  918                 }
  919             }
  920             rpc_clnt_destroy(clnt);
  921             ret = 0;
  922             break;
  923 
  924         case RPC_TRANSPORT_MAP_XID_REQUEST: {
  925             req_info = data;
  926             ret = rpc_clnt_fill_request_info(clnt, req_info);
  927             break;
  928         }
  929 
  930         case RPC_TRANSPORT_MSG_RECEIVED: {
  931             clock_gettime(CLOCK_REALTIME, &conn->last_received);
  932 
  933             pollin = data;
  934             if (pollin->is_reply)
  935                 ret = rpc_clnt_handle_reply(clnt, pollin);
  936             else
  937                 ret = rpc_clnt_handle_cbk(clnt, pollin);
  938             /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG,
  939              * data);
  940              */
  941             break;
  942         }
  943 
  944         case RPC_TRANSPORT_MSG_SENT: {
  945             clock_gettime(CLOCK_REALTIME, &conn->last_sent);
  946 
  947             ret = 0;
  948             break;
  949         }
  950 
  951         case RPC_TRANSPORT_CONNECT: {
  952             pthread_mutex_lock(&conn->lock);
  953             {
  954                 /* Every time there is a disconnection, processes
  955                  * should try to connect to 'glusterd' (ie, default
  956                  * port) or whichever port given as 'option remote-port'
  957                  * in volume file. */
  958                 /* Below code makes sure the (re-)configured port lasts
  959                  * for just one successful attempt */
  960                 conn->config.remote_port = 0;
  961                 conn->connected = 1;
  962                 conn->disconnected = 0;
  963             }
  964             pthread_mutex_unlock(&conn->lock);
  965 
  966             /* auth value should be set to lower version available
  967              * and will be set to appropriate version supported by
  968              * server after the handshake.
  969              */
  970             if (clnt->auth_value)
  971                 clnt->auth_value = AUTH_GLUSTERFS_v2;
  972             if (clnt->notifyfn)
  973                 ret = clnt->notifyfn(clnt, clnt->mydata, RPC_CLNT_CONNECT,
  974                                      NULL);
  975 
  976             break;
  977         }
  978 
  979         case RPC_TRANSPORT_ACCEPT:
  980             /* only meaningful on a server, no need of handling this event
  981              * in a client.
  982              */
  983             ret = 0;
  984             break;
  985 
  986         case RPC_TRANSPORT_EVENT_THREAD_DIED:
  987             /* only meaningful on a server, no need of handling this event on a
  988              * client */
  989             ret = 0;
  990             break;
  991     }
  992 
  993 out:
  994     RPC_THIS_RESTORE;
  995     return ret;
  996 }
  997 
/* Initialise the connection object embedded in 'clnt': read the frame and
 * ping timeouts from 'options', load and take a ref on the rpc transport,
 * register rpc_clnt_notify for transport events, and allocate the
 * saved-frames queue used to track in-flight requests.
 *
 * Returns 0 on success; on any failure returns -1 after unwinding the
 * partially-initialised state via rpc_clnt_connection_cleanup(). */
static int
rpc_clnt_connection_init(struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
                         dict_t *options, char *name)
{
    int ret = -1;
    rpc_clnt_connection_t *conn = NULL;
    rpc_transport_t *trans = NULL;

    conn = &clnt->conn;
    pthread_mutex_init(&clnt->conn.lock, NULL);

    conn->name = gf_strdup(name);
    if (!conn->name) {
        ret = -1;
        goto out;
    }

    ret = dict_get_int32(options, "frame-timeout", &conn->frame_timeout);
    if (ret >= 0) {
        gf_log(name, GF_LOG_INFO, "setting frame-timeout to %d",
               conn->frame_timeout);
    } else {
        /* Option absent: default to 30 minutes (1800s). */
        gf_log(name, GF_LOG_DEBUG, "defaulting frame-timeout to 30mins");
        conn->frame_timeout = 1800;
    }
    conn->rpc_clnt = clnt;

    ret = dict_get_int32(options, "ping-timeout", &conn->ping_timeout);
    if (ret >= 0) {
        gf_log(name, GF_LOG_DEBUG, "setting ping-timeout to %d",
               conn->ping_timeout);
    } else {
        /*TODO: Once the epoll thread model is fixed,
          change the default ping-timeout to 30sec */
        gf_log(name, GF_LOG_DEBUG, "disable ping-timeout");
        conn->ping_timeout = 0;
    }

    trans = rpc_transport_load(ctx, options, name);
    if (!trans) {
        gf_log(name, GF_LOG_WARNING,
               "loading of new rpc-transport"
               " failed");
        ret = -1;
        goto out;
    }
    /* Hold a ref on the transport for the lifetime of this connection. */
    rpc_transport_ref(trans);

    /* Publish the transport under conn->lock; 'trans' is handed over so
     * the error path below can tell whether ownership was transferred. */
    pthread_mutex_lock(&conn->lock);
    {
        conn->trans = trans;
        trans = NULL;
    }
    pthread_mutex_unlock(&conn->lock);

    ret = rpc_transport_register_notify(conn->trans, rpc_clnt_notify, conn);
    if (ret == -1) {
        gf_log(name, GF_LOG_WARNING, "registering notify failed");
        goto out;
    }

    conn->saved_frames = saved_frames_new();
    if (!conn->saved_frames) {
        gf_log(name, GF_LOG_WARNING,
               "creation of saved_frames "
               "failed");
        ret = -1;
        goto out;
    }

    ret = 0;

out:
    if (ret) {
        /* Detach the transport under the lock before unreffing it, since
         * rpc_transport_unref can potentially tear things down. */
        pthread_mutex_lock(&conn->lock);
        {
            trans = conn->trans;
            conn->trans = NULL;
        }
        pthread_mutex_unlock(&conn->lock);
        if (trans)
            rpc_transport_unref(trans);
        // conn cleanup needs to be done since we might have failed to
        // register notification.
        rpc_clnt_connection_cleanup(conn);
    }
    return ret;
}
 1086 
 1087 struct rpc_clnt *
 1088 rpc_clnt_new(dict_t *options, xlator_t *owner, char *name,
 1089              uint32_t reqpool_size)
 1090 {
 1091     int ret = -1;
 1092     struct rpc_clnt *rpc = NULL;
 1093     glusterfs_ctx_t *ctx = owner->ctx;
 1094 
 1095     rpc = GF_CALLOC(1, sizeof(*rpc), gf_common_mt_rpcclnt_t);
 1096     if (!rpc) {
 1097         goto out;
 1098     }
 1099 
 1100     pthread_mutex_init(&rpc->lock, NULL);
 1101     rpc->ctx = ctx;
 1102     rpc->owner = owner;
 1103     GF_ATOMIC_INIT(rpc->xid, 1);
 1104 
 1105     if (!reqpool_size)
 1106         reqpool_size = RPC_CLNT_DEFAULT_REQUEST_COUNT;
 1107 
 1108     rpc->reqpool = mem_pool_new(struct rpc_req, reqpool_size);
 1109     if (rpc->reqpool == NULL) {
 1110         pthread_mutex_destroy(&rpc->lock);
 1111         GF_FREE(rpc);
 1112         rpc = NULL;
 1113         goto out;
 1114     }
 1115 
 1116     rpc->saved_frames_pool = mem_pool_new(struct saved_frame, reqpool_size);
 1117     if (rpc->saved_frames_pool == NULL) {
 1118         pthread_mutex_destroy(&rpc->lock);
 1119         mem_pool_destroy(rpc->reqpool);
 1120         GF_FREE(rpc);
 1121         rpc = NULL;
 1122         goto out;
 1123     }
 1124 
 1125     ret = rpc_clnt_connection_init(rpc, ctx, options, name);
 1126     if (ret == -1) {
 1127         pthread_mutex_destroy(&rpc->lock);
 1128         mem_pool_destroy(rpc->reqpool);
 1129         mem_pool_destroy(rpc->saved_frames_pool);
 1130         GF_FREE(rpc);
 1131         rpc = NULL;
 1132         goto out;
 1133     }
 1134 
 1135     /* This is handled to make sure we have modularity in getting the
 1136        auth data changed */
 1137     gf_boolean_t auth_null = dict_get_str_boolean(options, "auth-null", 0);
 1138 
 1139     rpc->auth_value = (auth_null) ? 0 : AUTH_GLUSTERFS_v2;
 1140 
 1141     rpc = rpc_clnt_ref(rpc);
 1142     INIT_LIST_HEAD(&rpc->programs);
 1143 
 1144 out:
 1145     return rpc;
 1146 }
 1147 
 1148 int
 1149 rpc_clnt_start(struct rpc_clnt *rpc)
 1150 {
 1151     struct rpc_clnt_connection *conn = NULL;
 1152 
 1153     if (!rpc)
 1154         return -1;
 1155 
 1156     conn = &rpc->conn;
 1157 
 1158     pthread_mutex_lock(&conn->lock);
 1159     {
 1160         rpc->disabled = 0;
 1161     }
 1162     pthread_mutex_unlock(&conn->lock);
 1163     /* Corresponding unref will be either on successful timer cancel or last
 1164      * rpc_clnt_reconnect fire event.
 1165      */
 1166     rpc_clnt_ref(rpc);
 1167     rpc_clnt_reconnect(conn);
 1168 
 1169     return 0;
 1170 }
 1171 
 1172 int
 1173 rpc_clnt_cleanup_and_start(struct rpc_clnt *rpc)
 1174 {
 1175     struct rpc_clnt_connection *conn = NULL;
 1176 
 1177     if (!rpc)
 1178         return -1;
 1179 
 1180     conn = &rpc->conn;
 1181 
 1182     rpc_clnt_connection_cleanup(conn);
 1183 
 1184     pthread_mutex_lock(&conn->lock);
 1185     {
 1186         rpc->disabled = 0;
 1187     }
 1188     pthread_mutex_unlock(&conn->lock);
 1189     /* Corresponding unref will be either on successful timer cancel or last
 1190      * rpc_clnt_reconnect fire event.
 1191      */
 1192     rpc_clnt_ref(rpc);
 1193     rpc_clnt_reconnect(conn);
 1194 
 1195     return 0;
 1196 }
 1197 
 1198 int
 1199 rpc_clnt_register_notify(struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
 1200                          void *mydata)
 1201 {
 1202     rpc->mydata = mydata;
 1203     rpc->notifyfn = fn;
 1204 
 1205     return 0;
 1206 }
 1207 
 1208 /* used for GF_LOG_OCCASIONALLY() */
 1209 static int gf_auth_max_groups_log = 0;
 1210 
 1211 static inline int
 1212 setup_glusterfs_auth_param_v3(call_frame_t *frame, auth_glusterfs_params_v3 *au,
 1213                               int lk_owner_len, char *owner_data)
 1214 {
 1215     int ret = -1;
 1216     unsigned int max_groups = 0;
 1217     int max_lkowner_len = 0;
 1218 
 1219     au->pid = frame->root->pid;
 1220     au->uid = frame->root->uid;
 1221     au->gid = frame->root->gid;
 1222 
 1223     au->flags = frame->root->flags;
 1224     au->ctime_sec = frame->root->ctime.tv_sec;
 1225     au->ctime_nsec = frame->root->ctime.tv_nsec;
 1226 
 1227     au->lk_owner.lk_owner_val = owner_data;
 1228     au->lk_owner.lk_owner_len = lk_owner_len;
 1229     au->groups.groups_val = frame->root->groups;
 1230     au->groups.groups_len = frame->root->ngrps;
 1231 
 1232     /* The number of groups and the size of lk_owner depend on oneother.
 1233      * We can truncate the groups, but should not touch the lk_owner. */
 1234     max_groups = GF_AUTH_GLUSTERFS_MAX_GROUPS(lk_owner_len, AUTH_GLUSTERFS_v3);
 1235     if (au->groups.groups_len > max_groups) {
 1236         GF_LOG_OCCASIONALLY(gf_auth_max_groups_log, "rpc-auth", GF_LOG_WARNING,
 1237                             "truncating grouplist "
 1238                             "from %d to %d",
 1239                             au->groups.groups_len, max_groups);
 1240 
 1241         au->groups.groups_len = max_groups;
 1242     }
 1243 
 1244     max_lkowner_len = GF_AUTH_GLUSTERFS_MAX_LKOWNER(au->groups.groups_len,
 1245                                                     AUTH_GLUSTERFS_v3);
 1246     if (lk_owner_len > max_lkowner_len) {
 1247         gf_log("rpc-clnt", GF_LOG_ERROR,
 1248                "lkowner field is too "
 1249                "big (%d), it does not fit in the rpc-header",
 1250                au->lk_owner.lk_owner_len);
 1251         errno = E2BIG;
 1252         goto out;
 1253     }
 1254 
 1255     ret = 0;
 1256 out:
 1257     return ret;
 1258 }
 1259 
 1260 static inline int
 1261 setup_glusterfs_auth_param_v2(call_frame_t *frame, auth_glusterfs_parms_v2 *au,
 1262                               int lk_owner_len, char *owner_data)
 1263 {
 1264     unsigned int max_groups = 0;
 1265     int max_lkowner_len = 0;
 1266     int ret = -1;
 1267 
 1268     au->pid = frame->root->pid;
 1269     au->uid = frame->root->uid;
 1270     au->gid = frame->root->gid;
 1271 
 1272     au->lk_owner.lk_owner_val = owner_data;
 1273     au->lk_owner.lk_owner_len = lk_owner_len;
 1274     au->groups.groups_val = frame->root->groups;
 1275     au->groups.groups_len = frame->root->ngrps;
 1276 
 1277     /* The number of groups and the size of lk_owner depend on oneother.
 1278      * We can truncate the groups, but should not touch the lk_owner. */
 1279     max_groups = GF_AUTH_GLUSTERFS_MAX_GROUPS(lk_owner_len, AUTH_GLUSTERFS_v2);
 1280     if (au->groups.groups_len > max_groups) {
 1281         GF_LOG_OCCASIONALLY(gf_auth_max_groups_log, "rpc-auth", GF_LOG_WARNING,
 1282                             "truncating grouplist "
 1283                             "from %d to %d",
 1284                             au->groups.groups_len, max_groups);
 1285 
 1286         au->groups.groups_len = max_groups;
 1287     }
 1288 
 1289     max_lkowner_len = GF_AUTH_GLUSTERFS_MAX_LKOWNER(au->groups.groups_len,
 1290                                                     AUTH_GLUSTERFS_v2);
 1291     if (lk_owner_len > max_lkowner_len) {
 1292         gf_log("rpc-auth", GF_LOG_ERROR,
 1293                "lkowner field is too "
 1294                "big (%d), it does not fit in the rpc-header",
 1295                au->lk_owner.lk_owner_len);
 1296         errno = E2BIG;
 1297         goto out;
 1298     }
 1299 
 1300     ret = 0;
 1301 out:
 1302     return ret;
 1303 }
 1304 
/* XDR-encode the client's glusterfs auth credentials (v2 or v3, chosen by
 * clnt->auth_value) for 'frame' into the buffer 'dest' (which must hold at
 * least GF_MAX_AUTH_BYTES).
 * Returns the number of bytes encoded, or -1 on failure. */
static ssize_t
xdr_serialize_glusterfs_auth(struct rpc_clnt *clnt, call_frame_t *frame,
                             char *dest)
{
    ssize_t ret = -1;
    XDR xdr;
    char owner[4] = {
        0,
    };
    int32_t pid = 0;
    char *lk_owner_data = NULL;
    int lk_owner_len = 0;

    if ((!dest))
        return -1;

    xdrmem_create(&xdr, dest, GF_MAX_AUTH_BYTES, XDR_ENCODE);

    if (frame->root->lk_owner.len) {
        lk_owner_data = frame->root->lk_owner.data;
        lk_owner_len = frame->root->lk_owner.len;
    } else {
        /* No explicit lock owner: synthesize a 4-byte owner from the
         * frame's pid, least-significant byte first. */
        pid = frame->root->pid;
        owner[0] = (char)(pid & 0xff);
        owner[1] = (char)((pid >> 8) & 0xff);
        owner[2] = (char)((pid >> 16) & 0xff);
        owner[3] = (char)((pid >> 24) & 0xff);

        lk_owner_data = owner;
        lk_owner_len = 4;
    }

    if (clnt->auth_value == AUTH_GLUSTERFS_v2) {
        auth_glusterfs_parms_v2 au_v2 = {
            0,
        };

        ret = setup_glusterfs_auth_param_v2(frame, &au_v2, lk_owner_len,
                                            lk_owner_data);
        if (ret)
            goto out;
        if (!xdr_auth_glusterfs_parms_v2(&xdr, &au_v2)) {
            gf_log(THIS->name, GF_LOG_WARNING,
                   "failed to encode auth glusterfs elements");
            ret = -1;
            goto out;
        }
    } else if (clnt->auth_value == AUTH_GLUSTERFS_v3) {
        auth_glusterfs_params_v3 au_v3 = {
            0,
        };

        ret = setup_glusterfs_auth_param_v3(frame, &au_v3, lk_owner_len,
                                            lk_owner_data);
        if (ret)
            goto out;

        if (!xdr_auth_glusterfs_params_v3(&xdr, &au_v3)) {
            gf_log(THIS->name, GF_LOG_WARNING,
                   "failed to encode auth glusterfs elements");
            ret = -1;
            goto out;
        }
    } else {
        /* Unknown/unsupported auth flavour (0 means AUTH_NULL and the
         * caller should not have come here). */
        gf_log(THIS->name, GF_LOG_WARNING,
               "failed to encode auth glusterfs elements");
        ret = -1;
        goto out;
    }

    /* Encoded length = distance the XDR cursor advanced from the base of
     * the buffer. */
    ret = (((size_t)(&xdr)->x_private) - ((size_t)(&xdr)->x_base));

out:
    return ret;
}
 1380 
 1381 int
 1382 rpc_clnt_fill_request(struct rpc_clnt *clnt, int prognum, int progver,
 1383                       int procnum, uint64_t xid, call_frame_t *fr,
 1384                       struct rpc_msg *request, char *auth_data)
 1385 {
 1386     int ret = -1;
 1387 
 1388     if (!request) {
 1389         goto out;
 1390     }
 1391 
 1392     memset(request, 0, sizeof(*request));
 1393 
 1394     request->rm_xid = xid;
 1395     request->rm_direction = CALL;
 1396 
 1397     request->rm_call.cb_rpcvers = 2;
 1398     request->rm_call.cb_prog = prognum;
 1399     request->rm_call.cb_vers = progver;
 1400     request->rm_call.cb_proc = procnum;
 1401 
 1402     if (!clnt->auth_value) {
 1403         request->rm_call.cb_cred.oa_flavor = AUTH_NULL;
 1404         request->rm_call.cb_cred.oa_base = NULL;
 1405         request->rm_call.cb_cred.oa_length = 0;
 1406     } else {
 1407         ret = xdr_serialize_glusterfs_auth(clnt, fr, auth_data);
 1408         if (ret == -1) {
 1409             gf_log("rpc-clnt", GF_LOG_WARNING,
 1410                    "cannot encode auth credentials");
 1411             goto out;
 1412         }
 1413 
 1414         request->rm_call.cb_cred.oa_flavor = clnt->auth_value;
 1415         request->rm_call.cb_cred.oa_base = auth_data;
 1416         request->rm_call.cb_cred.oa_length = ret;
 1417     }
 1418     request->rm_call.cb_verf.oa_flavor = AUTH_NONE;
 1419     request->rm_call.cb_verf.oa_base = NULL;
 1420     request->rm_call.cb_verf.oa_length = 0;
 1421 
 1422     ret = 0;
 1423 out:
 1424     return ret;
 1425 }
 1426 
 1427 struct iovec
 1428 rpc_clnt_record_build_header(char *recordstart, size_t rlen,
 1429                              struct rpc_msg *request, size_t payload)
 1430 {
 1431     struct iovec requesthdr = {
 1432         0,
 1433     };
 1434     struct iovec txrecord = {0, 0};
 1435     int ret = -1;
 1436     size_t fraglen = 0;
 1437 
 1438     ret = rpc_request_to_xdr(request, recordstart, rlen, &requesthdr);
 1439     if (ret == -1) {
 1440         gf_log("rpc-clnt", GF_LOG_DEBUG, "Failed to create RPC request");
 1441         goto out;
 1442     }
 1443 
 1444     fraglen = payload + requesthdr.iov_len;
 1445     gf_log("rpc-clnt", GF_LOG_TRACE,
 1446            "Request fraglen %zu, payload: %zu, "
 1447            "rpc hdr: %zu",
 1448            fraglen, payload, requesthdr.iov_len);
 1449 
 1450     txrecord.iov_base = recordstart;
 1451 
 1452     /* Remember, this is only the vec for the RPC header and does not
 1453      * include the payload above. We needed the payload only to calculate
 1454      * the size of the full fragment. This size is sent in the fragment
 1455      * header.
 1456      */
 1457     txrecord.iov_len = requesthdr.iov_len;
 1458 
 1459 out:
 1460     return txrecord;
 1461 }
 1462 
/* Allocate an iobuf and serialise a complete rpc CALL record (header +
 * credentials) for the given program/procedure into it.  '*recbuf' is set
 * to the header's base/length within the iobuf.  'hdrsize' is the size of
 * the program header that will follow (needed to size the buffer and the
 * fragment length).  Returns the iobuf (caller unrefs) or NULL on error. */
struct iobuf *
rpc_clnt_record_build_record(struct rpc_clnt *clnt, call_frame_t *fr,
                             int prognum, int progver, int procnum,
                             size_t hdrsize, uint64_t xid, struct iovec *recbuf)
{
    struct rpc_msg request = {
        0,
    };
    struct iobuf *request_iob = NULL;
    char *record = NULL;
    struct iovec recordhdr = {
        0,
    };
    size_t pagesize = 0;
    int ret = -1;
    size_t xdr_size = 0;
    char auth_data[GF_MAX_AUTH_BYTES] = {
        0,
    };

    if ((!clnt) || (!recbuf)) {
        goto out;
    }

    /* Fill the rpc structure and XDR it into the buffer got above. */
    ret = rpc_clnt_fill_request(clnt, prognum, progver, procnum, xid, fr,
                                &request, auth_data);

    if (ret == -1) {
        gf_log(clnt->conn.name, GF_LOG_WARNING,
               "cannot build a rpc-request xid (%" PRIu64 ")", xid);
        goto out;
    }

    /* Size of the XDR-encoded call header, used to size the iobuf. */
    xdr_size = xdr_sizeof((xdrproc_t)xdr_callmsg, &request);

    /* First, try to get a pointer into the buffer which the RPC
     * layer can use.
     */
    request_iob = iobuf_get2(clnt->ctx->iobuf_pool, (xdr_size + hdrsize));
    if (!request_iob) {
        goto out;
    }

    pagesize = iobuf_pagesize(request_iob);

    record = iobuf_ptr(request_iob); /* Now we have it. */

    recordhdr = rpc_clnt_record_build_header(record, pagesize, &request,
                                             hdrsize);

    if (!recordhdr.iov_base) {
        gf_log(clnt->conn.name, GF_LOG_ERROR, "Failed to build record header");
        /* Drop the iobuf and signal failure through a NULL recbuf base. */
        iobuf_unref(request_iob);
        request_iob = NULL;
        recbuf->iov_base = NULL;
        goto out;
    }

    recbuf->iov_base = recordhdr.iov_base;
    recbuf->iov_len = recordhdr.iov_len;

out:
    return request_iob;
}
 1528 
 1529 static inline struct iobuf *
 1530 rpc_clnt_record(struct rpc_clnt *clnt, call_frame_t *call_frame,
 1531                 rpc_clnt_prog_t *prog, int procnum, size_t hdrlen,
 1532                 struct iovec *rpchdr, uint64_t callid)
 1533 {
 1534     if (!prog || !rpchdr || !call_frame) {
 1535         return NULL;
 1536     }
 1537 
 1538     return rpc_clnt_record_build_record(clnt, call_frame, prog->prognum,
 1539                                         prog->progver, procnum, hdrlen, callid,
 1540                                         rpchdr);
 1541 }
 1542 
/* Register a callback program (server-initiated upcalls) on 'clnt'.  A
 * private copy of 'program' is stored, with 'mydata' attached, on the
 * clnt->programs list.  Re-registering the same prognum/progver pair is a
 * silent no-op.  Returns 0 on success, -1 on failure.
 *
 * NOTE(review): the duplicate check and the list insertion take
 * conn->lock separately, so two concurrent callers could in principle
 * both insert the same program — presumably registration is serialized by
 * callers; verify before relying on this from multiple threads. */
int
rpcclnt_cbk_program_register(struct rpc_clnt *clnt,
                             rpcclnt_cb_program_t *program, void *mydata)
{
    int ret = -1;
    char already_registered = 0;
    rpcclnt_cb_program_t *tmp = NULL;

    if (!clnt)
        goto out;

    if (program->actors == NULL)
        goto out;

    /* Scan for an existing registration of the same program/version. */
    pthread_mutex_lock(&clnt->lock);
    {
        list_for_each_entry(tmp, &clnt->programs, program)
        {
            if ((program->prognum == tmp->prognum) &&
                (program->progver == tmp->progver)) {
                already_registered = 1;
                break;
            }
        }
    }
    pthread_mutex_unlock(&clnt->lock);

    if (already_registered) {
        gf_log_callingfn(clnt->conn.name, GF_LOG_DEBUG, "already registered");
        ret = 0;
        goto out;
    }

    /* Keep a private copy: the caller's 'program' may be static/shared. */
    tmp = GF_MALLOC(sizeof(*tmp), gf_common_mt_rpcclnt_cb_program_t);
    if (tmp == NULL) {
        goto out;
    }

    memcpy(tmp, program, sizeof(*tmp));
    INIT_LIST_HEAD(&tmp->program);

    tmp->mydata = mydata;

    pthread_mutex_lock(&clnt->lock);
    {
        list_add_tail(&tmp->program, &clnt->programs);
    }
    pthread_mutex_unlock(&clnt->lock);

    ret = 0;
    gf_log(clnt->conn.name, GF_LOG_DEBUG,
           "New program registered: %s, Num: %d, Ver: %d", program->progname,
           program->prognum, program->progver);

out:
    if (ret == -1 && clnt) {
        gf_log(clnt->conn.name, GF_LOG_ERROR,
               "Program registration failed:"
               " %s, Num: %d, Ver: %d",
               program->progname, program->prognum, program->progver);
    }

    return ret;
}
 1607 
/* Submit an rpc request: build the call record, hand it to the transport
 * (connecting first if needed) and, on success, queue 'frame' on the
 * saved-frames list to await the reply, which is delivered via 'cbkfn'.
 * On any failure 'cbkfn' is invoked immediately with rpc_status == -1.
 * Returns 0 on success, -1 on failure. */
int
rpc_clnt_submit(struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum,
                fop_cbk_fn_t cbkfn, struct iovec *proghdr, int proghdrcount,
                struct iovec *progpayload, int progpayloadcount,
                struct iobref *iobref, void *frame, struct iovec *rsphdr,
                int rsphdr_count, struct iovec *rsp_payload,
                int rsp_payload_count, struct iobref *rsp_iobref)
{
    rpc_clnt_connection_t *conn = NULL;
    struct iobuf *request_iob = NULL;
    struct iovec rpchdr = {
        0,
    };
    struct rpc_req *rpcreq = NULL;
    rpc_transport_req_t req;
    int ret = -1;
    int proglen = 0;
    char new_iobref = 0;
    uint64_t callid = 0;
    gf_boolean_t need_unref = _gf_false;
    call_frame_t *cframe = frame;

    if (!rpc || !prog || !frame) {
        goto out;
    }

    conn = &rpc->conn;

    rpcreq = mem_get(rpc->reqpool);
    if (rpcreq == NULL) {
        goto out;
    }

    memset(rpcreq, 0, sizeof(*rpcreq));
    memset(&req, 0, sizeof(req));

    /* If the caller supplied no iobref, create one to hold the request
     * iobuf; remember so that only our own ref is dropped at 'out'. */
    if (!iobref) {
        iobref = iobref_new();
        if (!iobref) {
            goto out;
        }

        new_iobref = 1;
    }

    /* Allocate a transaction id unique to this client. */
    callid = GF_ATOMIC_INC(rpc->xid);

    rpcreq->prog = prog;
    rpcreq->procnum = procnum;
    rpcreq->conn = conn;
    rpcreq->xid = callid;
    rpcreq->cbkfn = cbkfn;

    ret = -1;

    if (proghdr) {
        proglen += iov_length(proghdr, proghdrcount);
    }

    /* Build the serialized rpc header record. */
    request_iob = rpc_clnt_record(rpc, frame, prog, procnum, proglen, &rpchdr,
                                  callid);
    if (!request_iob) {
        gf_log(conn->name, GF_LOG_WARNING, "cannot build rpc-record");
        goto out;
    }

    iobref_add(iobref, request_iob);

    req.msg.rpchdr = &rpchdr;
    req.msg.rpchdrcount = 1;
    req.msg.proghdr = proghdr;
    req.msg.proghdrcount = proghdrcount;
    req.msg.progpayload = progpayload;
    req.msg.progpayloadcount = progpayloadcount;
    req.msg.iobref = iobref;

    req.rsp.rsphdr = rsphdr;
    req.rsp.rsphdr_count = rsphdr_count;
    req.rsp.rsp_payload = rsp_payload;
    req.rsp.rsp_payload_count = rsp_payload_count;
    req.rsp.rsp_iobref = rsp_iobref;
    req.rpc_req = rpcreq;

    pthread_mutex_lock(&conn->lock);
    {
        /* Not connected: attempt a (re)connect unless the client has been
         * explicitly disabled. */
        if (conn->connected == 0) {
            if (rpc->disabled)
                goto unlock;
            ret = rpc_transport_connect(conn->trans, conn->config.remote_port);
            if (ret < 0) {
                gf_log(conn->name,
                       (errno == EINPROGRESS) ? GF_LOG_DEBUG : GF_LOG_WARNING,
                       "error returned while attempting to "
                       "connect to host:%s, port:%d",
                       conn->config.remote_host, conn->config.remote_port);
                goto unlock;
            }
        }

        ret = rpc_transport_submit_request(conn->trans, &req);
        if (ret == -1) {
            gf_log(conn->name, GF_LOG_WARNING,
                   "failed to submit rpc-request "
                   "(unique: %" PRIu64
                   ", XID: 0x%x Program: %s, "
                   "ProgVers: %d, Proc: %d) to rpc-transport (%s)",
                   cframe->root->unique, rpcreq->xid, rpcreq->prog->progname,
                   rpcreq->prog->progver, rpcreq->procnum, conn->name);
        } else if ((ret >= 0) && frame) {
            /* Save the frame in queue */
            __save_frame(rpc, frame, rpcreq);

            /* A ref on rpc-clnt object is taken while registering
             * call_bail to timer in __save_frame. If it fails to
             * register, it needs an unref and should happen outside
             * conn->lock which otherwise leads to deadlocks */
            if (conn->timer == NULL)
                need_unref = _gf_true;

            conn->msgcnt++;

            gf_log("rpc-clnt", GF_LOG_TRACE,
                   "submitted request "
                   "(unique: %" PRIu64
                   ", XID: 0x%x, Program: %s, "
                   "ProgVers: %d, Proc: %d) to rpc-transport (%s)",
                   cframe->root->unique, rpcreq->xid, rpcreq->prog->progname,
                   rpcreq->prog->progver, rpcreq->procnum, conn->name);
        }
    }
unlock:
    pthread_mutex_unlock(&conn->lock);

    /* Deferred unref (see comment above): must happen outside conn->lock. */
    if (need_unref)
        rpc_clnt_unref(rpc);

    if (ret == -1) {
        goto out;
    }

    rpc_clnt_check_and_start_ping(rpc);
    ret = 0;

out:
    /* The iobref (ours or the caller's) holds the iobuf now; drop the
     * local refs taken in this function. */
    if (request_iob) {
        iobuf_unref(request_iob);
    }

    if (new_iobref && iobref) {
        iobref_unref(iobref);
    }

    /* On failure, deliver the error to the caller through its callback
     * and recycle the request object. */
    if (frame && (ret == -1)) {
        if (rpcreq) {
            rpcreq->rpc_status = -1;
            cbkfn(rpcreq, NULL, 0, frame);
            mem_put(rpcreq);
        }
    }
    return ret;
}
 1769 
 1770 struct rpc_clnt *
 1771 rpc_clnt_ref(struct rpc_clnt *rpc)
 1772 {
 1773     if (!rpc)
 1774         return NULL;
 1775 
 1776     GF_ATOMIC_INC(rpc->refcount);
 1777     return rpc;
 1778 }
 1779 
/* Last-reference teardown path: disable the client (disconnecting the
 * transport and cancelling timers) and drop the connection's transport
 * ref.  Called only from rpc_clnt_unref() when the refcount hits zero. */
static void
rpc_clnt_trigger_destroy(struct rpc_clnt *rpc)
{
    rpc_clnt_connection_t *conn = NULL;
    rpc_transport_t *trans = NULL;

    if (!rpc)
        return;

    /* reading conn->trans outside conn->lock is OK, since this is the last
     * ref*/
    conn = &rpc->conn;
    trans = conn->trans;
    rpc_clnt_disable(rpc);

    /* This is to account for rpc_clnt_disable that might have been called
     * before rpc_clnt_unref */
    if (trans) {
        /* set conn->trans to NULL before rpc_transport_unref
         * as rpc_transport_unref can potentially free conn
         */
        conn->trans = NULL;
        rpc_transport_unref(trans);
    }
}
 1805 
/* Free the rpc client and everything it owns: connection name, saved
 * frames, mutexes, mem pools and registered callback programs.  Invoked
 * from the RPC_TRANSPORT_CLEANUP path once the transport is gone. */
static void
rpc_clnt_destroy(struct rpc_clnt *rpc)
{
    rpcclnt_cb_program_t *program = NULL;
    rpcclnt_cb_program_t *tmp = NULL;
    struct saved_frames *saved_frames = NULL;
    rpc_clnt_connection_t *conn = NULL;

    if (!rpc)
        return;

    conn = &rpc->conn;
    GF_FREE(rpc->conn.name);
    /* Access saved_frames in critical-section to avoid
       crash in rpc_clnt_connection_cleanup at the time
       of destroying saved frames
    */
    pthread_mutex_lock(&conn->lock);
    {
        saved_frames = conn->saved_frames;
        conn->saved_frames = NULL;
    }
    pthread_mutex_unlock(&conn->lock);

    saved_frames_destroy(saved_frames);
    pthread_mutex_destroy(&rpc->lock);
    pthread_mutex_destroy(&rpc->conn.lock);

    /* mem-pool should be destroyed, otherwise,
       it will cause huge memory leaks */
    mem_pool_destroy(rpc->reqpool);
    mem_pool_destroy(rpc->saved_frames_pool);

    /* Free the private copies made by rpcclnt_cbk_program_register(). */
    list_for_each_entry_safe(program, tmp, &rpc->programs, program)
    {
        GF_FREE(program);
    }

    GF_FREE(rpc);
    return;
}
 1847 
 1848 struct rpc_clnt *
 1849 rpc_clnt_unref(struct rpc_clnt *rpc)
 1850 {
 1851     int count = 0;
 1852 
 1853     if (!rpc)
 1854         return NULL;
 1855 
 1856     count = GF_ATOMIC_DEC(rpc->refcount);
 1857 
 1858     if (!count) {
 1859         rpc_clnt_trigger_destroy(rpc);
 1860         return NULL;
 1861     }
 1862     return rpc;
 1863 }
 1864 
/* Disable the client: mark it disabled, cancel the frame-bail and
 * reconnect timers, stop the ping timer, mark the connection down and
 * disconnect the transport.  Each successfully cancelled timer owed a ref
 * on 'rpc', which is dropped here outside conn->lock.
 * Returns the result of rpc_transport_disconnect(), or -1 when there is
 * no transport / 'rpc' is NULL. */
int
rpc_clnt_disable(struct rpc_clnt *rpc)
{
    rpc_clnt_connection_t *conn = NULL;
    rpc_transport_t *trans = NULL;
    int unref = 0;
    int ret = 0;
    gf_boolean_t timer_unref = _gf_false;
    gf_boolean_t reconnect_unref = _gf_false;

    if (!rpc) {
        goto out;
    }

    conn = &rpc->conn;

    pthread_mutex_lock(&conn->lock);
    {
        rpc->disabled = 1;

        if (conn->timer) {
            ret = gf_timer_call_cancel(rpc->ctx, conn->timer);
            /* If the event is not fired and it actually cancelled
             * the timer, do the unref else registered call back
             * function will take care of it.
             */
            if (!ret)
                timer_unref = _gf_true;
            conn->timer = NULL;
        }

        if (conn->reconnect) {
            /* Same ownership rule as conn->timer above. */
            ret = gf_timer_call_cancel(rpc->ctx, conn->reconnect);
            if (!ret)
                reconnect_unref = _gf_true;
            conn->reconnect = NULL;
        }
        conn->connected = 0;

        unref = rpc_clnt_remove_ping_timer_locked(rpc);
        trans = conn->trans;
    }
    pthread_mutex_unlock(&conn->lock);

    ret = -1;
    if (trans) {
        ret = rpc_transport_disconnect(trans, _gf_true);
        /* The auth_value was being reset to AUTH_GLUSTERFS_v2.
         *    if (clnt->auth_value)
         *           clnt->auth_value = AUTH_GLUSTERFS_v2;
         * It should not be reset here. The disconnect during
         * portmap request can race with handshake. If handshake
         * happens first and disconnect later, auth_value would set
         * to default value and it never sets back to actual auth_value
         * supported by server. But it's important to set to lower
         * version supported in the case where the server downgrades.
         * So moving this code to RPC_TRANSPORT_CONNECT. Note that
         * CONNECT cannot race with handshake as by nature it is
         * serialized with handhake. An handshake can happen only
         * on a connected transport and hence its strictly serialized.
         */
    }
    /* Unrefs owed by the cancelled timers; must run outside conn->lock. */
    if (unref)
        rpc_clnt_unref(rpc);

    if (timer_unref)
        rpc_clnt_unref(rpc);

    if (reconnect_unref)
        rpc_clnt_unref(rpc);

out:
    return ret;
}
 1939 
 1940 void
 1941 rpc_clnt_reconfig(struct rpc_clnt *rpc, struct rpc_clnt_config *config)
 1942 {
 1943     if (config->ping_timeout) {
 1944         if (config->ping_timeout != rpc->conn.ping_timeout)
 1945             gf_log(rpc->conn.name, GF_LOG_INFO,
 1946                    "changing ping timeout to %d (from %d)",
 1947                    config->ping_timeout, rpc->conn.ping_timeout);
 1948 
 1949         pthread_mutex_lock(&rpc->conn.lock);
 1950         {
 1951             rpc->conn.ping_timeout = config->ping_timeout;
 1952         }
 1953         pthread_mutex_unlock(&rpc->conn.lock);
 1954     }
 1955 
 1956     if (config->rpc_timeout) {
 1957         if (config->rpc_timeout != rpc->conn.config.rpc_timeout)
 1958             gf_log(rpc->conn.name, GF_LOG_INFO,
 1959                    "changing timeout to %d (from %d)", config->rpc_timeout,
 1960                    rpc->conn.config.rpc_timeout);
 1961         rpc->conn.config.rpc_timeout = config->rpc_timeout;
 1962     }
 1963 
 1964     if (config->remote_port) {
 1965         if (config->remote_port != rpc->conn.config.remote_port)
 1966             gf_log(rpc->conn.name, GF_LOG_INFO, "changing port to %d (from %d)",
 1967                    config->remote_port, rpc->conn.config.remote_port);
 1968 
 1969         rpc->conn.config.remote_port = config->remote_port;
 1970     }
 1971 
 1972     if (config->remote_host) {
 1973         if (rpc->conn.config.remote_host) {
 1974             if (strcmp(rpc->conn.config.remote_host, config->remote_host))
 1975                 gf_log(rpc->conn.name, GF_LOG_INFO,
 1976                        "changing hostname to %s (from %s)", config->remote_host,
 1977                        rpc->conn.config.remote_host);
 1978             GF_FREE(rpc->conn.config.remote_host);
 1979         } else {
 1980             gf_log(rpc->conn.name, GF_LOG_INFO, "setting hostname to %s",
 1981                    config->remote_host);
 1982         }
 1983 
 1984         rpc->conn.config.remote_host = gf_strdup(config->remote_host);
 1985     }
 1986 }