"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/rpc/rpc-lib/src/rpc-drc.c" (16 Sep 2020, 22765 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "rpc-drc.c" see the Fossies "Dox" file reference documentation.

    1 /*
    2   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
    3   This file is part of GlusterFS.
    4 
    5   This file is licensed to you under your choice of the GNU Lesser
    6   General Public License, version 3 or any later version (LGPLv3 or
    7   later), or the GNU General Public License, version 2 (GPLv2), in all
    8   cases as published by the Free Software Foundation.
    9 */
   10 
   11 #include "rpcsvc.h"
   12 #ifndef RPC_DRC_H
   13 #include "rpc-drc.h"
   14 #endif
   15 #include <glusterfs/locking.h>
   16 #include <glusterfs/statedump.h>
   17 #include <glusterfs/mem-pool.h>
   18 
   19 #include <netinet/in.h>
   20 #include <unistd.h>
   21 
   22 /**
   23  * rpcsvc_drc_op_destroy - Destroys the cached reply
   24  *
   25  * @param drc - the main drc structure
   26  * @param reply - the cached reply to destroy
   27  * @return NULL if reply is destroyed, reply otherwise
   28  */
   29 static drc_cached_op_t *
   30 rpcsvc_drc_op_destroy(rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply)
   31 {
   32     GF_ASSERT(drc);
   33     GF_ASSERT(reply);
   34 
   35     if (reply->state == DRC_OP_IN_TRANSIT)
   36         return reply;
   37 
   38     iobref_unref(reply->msg.iobref);
   39     if (reply->msg.rpchdr)
   40         GF_FREE(reply->msg.rpchdr);
   41     if (reply->msg.proghdr)
   42         GF_FREE(reply->msg.proghdr);
   43     if (reply->msg.progpayload)
   44         GF_FREE(reply->msg.progpayload);
   45 
   46     list_del(&reply->global_list);
   47     reply->client->op_count--;
   48     drc->op_count--;
   49     mem_put(reply);
   50     reply = NULL;
   51 
   52     return reply;
   53 }
   54 
   55 /**
   56  * rpcsvc_drc_op_rb_unref - This function is used in rb tree cleanup only
   57  *
   58  * @param reply - the cached reply to unref
   59  * @param drc - the main drc structure
   60  * @return void
   61  */
   62 static void
   63 rpcsvc_drc_rb_op_destroy(void *reply, void *drc)
   64 {
   65     rpcsvc_drc_op_destroy(drc, (drc_cached_op_t *)reply);
   66 }
   67 
   68 /**
   69  * rpcsvc_remove_drc_client - Cleanup the drc client
   70  *
   71  * @param client - the drc client to be removed
   72  * @return void
   73  */
   74 static void
   75 rpcsvc_remove_drc_client(drc_client_t *client)
   76 {
   77     rb_destroy(client->rbtree, rpcsvc_drc_rb_op_destroy);
   78     list_del(&client->client_list);
   79     GF_FREE(client);
   80 }
   81 
   82 /**
   83  * rpcsvc_client_lookup - Given a sockaddr_storage, find the client if it exists
   84  *
   85  * @param drc - the main drc structure
   86  * @param sockaddr - the network address of the client to be looked up
   87  * @return drc client if it exists, NULL otherwise
   88  */
   89 static drc_client_t *
   90 rpcsvc_client_lookup(rpcsvc_drc_globals_t *drc,
   91                      struct sockaddr_storage *sockaddr)
   92 {
   93     drc_client_t *client = NULL;
   94 
   95     GF_ASSERT(drc);
   96     GF_ASSERT(sockaddr);
   97 
   98     if (list_empty(&drc->clients_head))
   99         return NULL;
  100 
  101     list_for_each_entry(client, &drc->clients_head, client_list)
  102     {
  103         if (gf_sock_union_equal_addr(&client->sock_union,
  104                                      (union gf_sock_union *)sockaddr))
  105             return client;
  106     }
  107 
  108     return NULL;
  109 }
  110 
  111 /**
  112  * drc_compare_reqs - Used by rbtree to determine if incoming req matches with
  113  *                    an existing node(cached reply) in rbtree
  114  *
  115  * @param item - pointer to the incoming req
  116  * @param rb_node_data - pointer to an rbtree node (cached reply)
  117  * @param param - drc pointer - unused here, but used in *op_destroy
  118  * @return 0 if req matches reply, else (req->xid - reply->xid)
  119  */
  120 int
  121 drc_compare_reqs(const void *item, const void *rb_node_data, void *param)
  122 {
  123     int ret = -1;
  124     drc_cached_op_t *req = NULL;
  125     drc_cached_op_t *reply = NULL;
  126 
  127     GF_ASSERT(item);
  128     GF_ASSERT(rb_node_data);
  129     GF_ASSERT(param);
  130 
  131     req = (drc_cached_op_t *)item;
  132     reply = (drc_cached_op_t *)rb_node_data;
  133 
  134     ret = req->xid - reply->xid;
  135     if (ret != 0)
  136         return ret;
  137 
  138     if (req->prognum == reply->prognum && req->procnum == reply->procnum &&
  139         req->progversion == reply->progversion)
  140         return 0;
  141 
  142     return 1;
  143 }
  144 
  145 /**
  146  * drc_init_client_cache - initialize a drc client and its rb tree
  147  *
  148  * @param drc - the main drc structure
  149  * @param client - the drc client to be initialized
  150  * @return 0 on success, -1 on failure
  151  */
  152 static int
  153 drc_init_client_cache(rpcsvc_drc_globals_t *drc, drc_client_t *client)
  154 {
  155     GF_ASSERT(drc);
  156     GF_ASSERT(client);
  157 
  158     client->rbtree = rb_create(drc_compare_reqs, drc, NULL);
  159     if (!client->rbtree) {
  160         gf_log(GF_RPCSVC, GF_LOG_DEBUG, "rb tree creation failed");
  161         return -1;
  162     }
  163 
  164     return 0;
  165 }
  166 
  167 /**
  168  * rpcsvc_get_drc_client - find the drc client with given sockaddr, else
  169  *                         allocate and initialize a new drc client
  170  *
  171  * @param drc - the main drc structure
  172  * @param sockaddr - network address of client
  173  * @return drc client on success, NULL on failure
  174  */
  175 static drc_client_t *
  176 rpcsvc_get_drc_client(rpcsvc_drc_globals_t *drc,
  177                       struct sockaddr_storage *sockaddr)
  178 {
  179     drc_client_t *client = NULL;
  180 
  181     GF_ASSERT(drc);
  182     GF_ASSERT(sockaddr);
  183 
  184     client = rpcsvc_client_lookup(drc, sockaddr);
  185     if (client)
  186         goto out;
  187 
  188     /* if lookup fails, allocate cache for the new client */
  189     client = GF_CALLOC(1, sizeof(drc_client_t), gf_common_mt_drc_client_t);
  190     if (!client)
  191         goto out;
  192 
  193     GF_ATOMIC_INIT(client->ref, 0);
  194     client->sock_union = (union gf_sock_union) * sockaddr;
  195     client->op_count = 0;
  196     INIT_LIST_HEAD(&client->client_list);
  197 
  198     if (drc_init_client_cache(drc, client)) {
  199         gf_log(GF_RPCSVC, GF_LOG_DEBUG, "initialization of drc client failed");
  200         GF_FREE(client);
  201         client = NULL;
  202         goto out;
  203     }
  204     drc->client_count++;
  205 
  206     list_add(&client->client_list, &drc->clients_head);
  207 
  208 out:
  209     return client;
  210 }
  211 
  212 /**
  213  * rpcsvc_need_drc - Determine if a request needs DRC service
  214  *
  215  * @param req - incoming request
  216  * @return 1 if DRC is needed for req, 0 otherwise
  217  */
  218 int
  219 rpcsvc_need_drc(rpcsvc_request_t *req)
  220 {
  221     rpcsvc_actor_t *actor = NULL;
  222     rpcsvc_drc_globals_t *drc = NULL;
  223 
  224     GF_ASSERT(req);
  225     GF_ASSERT(req->svc);
  226 
  227     drc = req->svc->drc;
  228 
  229     if (!drc || drc->status == DRC_UNINITIATED)
  230         return 0;
  231 
  232     actor = rpcsvc_program_actor(req);
  233     if (!actor)
  234         return 0;
  235 
  236     return (actor->op_type == DRC_NON_IDEMPOTENT && drc->type != DRC_TYPE_NONE);
  237 }
  238 
  239 /**
  240  * rpcsvc_drc_client_ref - ref the drc client
  241  *
  242  * @param client - the drc client to ref
  243  * @return client
  244  */
  245 static drc_client_t *
  246 rpcsvc_drc_client_ref(drc_client_t *client)
  247 {
  248     GF_ASSERT(client);
  249     GF_ATOMIC_INC(client->ref);
  250     return client;
  251 }
  252 
  253 /**
  254  * rpcsvc_drc_client_unref - unref the drc client, and destroy
  255  *                           the client on last unref
  256  *
  257  * @param drc - the main drc structure
  258  * @param client - the drc client to unref
  259  * @return NULL if it is the last unref, client otherwise
  260  */
  261 static drc_client_t *
  262 rpcsvc_drc_client_unref(rpcsvc_drc_globals_t *drc, drc_client_t *client)
  263 {
  264     uint32_t refcount;
  265 
  266     GF_ASSERT(drc);
  267 
  268     refcount = GF_ATOMIC_DEC(client->ref);
  269     if (!refcount) {
  270         drc->client_count--;
  271         rpcsvc_remove_drc_client(client);
  272         client = NULL;
  273     }
  274 
  275     return client;
  276 }
  277 
  278 /**
  279  * rpcsvc_drc_lookup - lookup a request to see if it is already cached
  280  *
  281  * @param req - incoming request
  282  * @return cached reply of req if found, NULL otherwise
  283  */
  284 drc_cached_op_t *
  285 rpcsvc_drc_lookup(rpcsvc_request_t *req)
  286 {
  287     drc_client_t *client = NULL;
  288     drc_cached_op_t *reply = NULL;
  289     drc_cached_op_t new = {
  290         .xid = req->xid,
  291         .prognum = req->prognum,
  292         .progversion = req->progver,
  293         .procnum = req->procnum,
  294     };
  295 
  296     GF_ASSERT(req);
  297 
  298     if (!req->trans->drc_client) {
  299         client = rpcsvc_get_drc_client(req->svc->drc,
  300                                        &req->trans->peerinfo.sockaddr);
  301         if (!client)
  302             goto out;
  303 
  304         req->trans->drc_client = rpcsvc_drc_client_ref(client);
  305     }
  306 
  307     client = req->trans->drc_client;
  308 
  309     if (client->op_count == 0)
  310         goto out;
  311 
  312     reply = rb_find(client->rbtree, &new);
  313 
  314 out:
  315     return reply;
  316 }
  317 
  318 /**
  319  * rpcsvc_send_cached_reply - send the cached reply for the incoming request
  320  *
  321  * @param req - incoming request (which is a duplicate in this case)
  322  * @param reply - the cached reply for req
  323  * @return 0 on successful reply submission, -1 or other non-zero value
  324  * otherwise
  325  */
  326 int
  327 rpcsvc_send_cached_reply(rpcsvc_request_t *req, drc_cached_op_t *reply)
  328 {
  329     int ret = 0;
  330 
  331     GF_ASSERT(req);
  332     GF_ASSERT(reply);
  333 
  334     gf_log(GF_RPCSVC, GF_LOG_DEBUG,
  335            "sending cached reply: xid: %d, "
  336            "client: %s",
  337            req->xid, req->trans->peerinfo.identifier);
  338 
  339     rpcsvc_drc_client_ref(reply->client);
  340     ret = rpcsvc_transport_submit(
  341         req->trans, reply->msg.rpchdr, reply->msg.rpchdrcount,
  342         reply->msg.proghdr, reply->msg.proghdrcount, reply->msg.progpayload,
  343         reply->msg.progpayloadcount, reply->msg.iobref, req->trans_private);
  344     rpcsvc_drc_client_unref(req->svc->drc, reply->client);
  345 
  346     return ret;
  347 }
  348 
  349 /**
  350  * rpcsvc_cache_reply - cache the reply for the processed request 'req'
  351  *
  352  * @param req - processed request
  353  * @param iobref - iobref structure of the reply
  354  * @param rpchdr - rpc header of the reply
  355  * @param rpchdrcount - size of rpchdr
  356  * @param proghdr - program header of the reply
  357  * @param proghdrcount - size of proghdr
  358  * @param payload - payload of the reply if any
  359  * @param payloadcount - size of payload
  360  * @return 0 on success, -1 on failure
  361  */
  362 int
  363 rpcsvc_cache_reply(rpcsvc_request_t *req, struct iobref *iobref,
  364                    struct iovec *rpchdr, int rpchdrcount, struct iovec *proghdr,
  365                    int proghdrcount, struct iovec *payload, int payloadcount)
  366 {
  367     int ret = -1;
  368     drc_cached_op_t *reply = NULL;
  369 
  370     GF_ASSERT(req);
  371     GF_ASSERT(req->reply);
  372 
  373     reply = req->reply;
  374 
  375     reply->state = DRC_OP_CACHED;
  376 
  377     reply->msg.iobref = iobref_ref(iobref);
  378 
  379     reply->msg.rpchdrcount = rpchdrcount;
  380     reply->msg.rpchdr = iov_dup(rpchdr, rpchdrcount);
  381 
  382     reply->msg.proghdrcount = proghdrcount;
  383     reply->msg.proghdr = iov_dup(proghdr, proghdrcount);
  384 
  385     reply->msg.progpayloadcount = payloadcount;
  386     if (payloadcount)
  387         reply->msg.progpayload = iov_dup(payload, payloadcount);
  388 
  389     //        rpcsvc_drc_client_unref (req->svc->drc, req->trans->drc_client);
  390     //        rpcsvc_drc_op_unref (req->svc->drc, reply);
  391     ret = 0;
  392 
  393     return ret;
  394 }
  395 
  396 /**
  397  * rpcsvc_vacate_drc_entries - free up some percentage of drc cache
  398  *                             based on the lru factor
  399  *
  400  * @param drc - the main drc structure
  401  * @return void
  402  */
  403 static void
  404 rpcsvc_vacate_drc_entries(rpcsvc_drc_globals_t *drc)
  405 {
  406     uint32_t i = 0;
  407     uint32_t n = 0;
  408     drc_cached_op_t *reply = NULL;
  409     drc_cached_op_t *tmp = NULL;
  410     drc_client_t *client = NULL;
  411 
  412     GF_ASSERT(drc);
  413 
  414     n = drc->global_cache_size / drc->lru_factor;
  415 
  416     list_for_each_entry_safe_reverse(reply, tmp, &drc->cache_head, global_list)
  417     {
  418         /* Don't delete ops that are in transit */
  419         if (reply->state == DRC_OP_IN_TRANSIT)
  420             continue;
  421 
  422         client = reply->client;
  423 
  424         rb_delete(client->rbtree, reply);
  425 
  426         rpcsvc_drc_op_destroy(drc, reply);
  427         rpcsvc_drc_client_unref(drc, client);
  428         i++;
  429         if (i >= n)
  430             break;
  431     }
  432 }
  433 
  434 /**
  435  * rpcsvc_add_op_to_cache - insert the cached op into the client rbtree and drc
  436  * list
  437  *
  438  * @param drc - the main drc structure
  439  * @param reply - the op to be inserted
  440  * @return 0 on success, -1 on failure
  441  */
  442 static int
  443 rpcsvc_add_op_to_cache(rpcsvc_drc_globals_t *drc, drc_cached_op_t *reply)
  444 {
  445     drc_client_t *client = NULL;
  446     drc_cached_op_t **tmp_reply = NULL;
  447 
  448     GF_ASSERT(drc);
  449     GF_ASSERT(reply);
  450 
  451     client = reply->client;
  452 
  453     /* cache is full, free up some space */
  454     if (drc->op_count >= drc->global_cache_size)
  455         rpcsvc_vacate_drc_entries(drc);
  456 
  457     tmp_reply = (drc_cached_op_t **)rb_probe(client->rbtree, reply);
  458     if (!tmp_reply) {
  459         /* mem alloc failed */
  460         return -1;
  461     } else if (*tmp_reply != reply) {
  462         /* should never happen */
  463         gf_log(GF_RPCSVC, GF_LOG_ERROR, "DRC failed to detect duplicates");
  464         return -1;
  465     }
  466 
  467     client->op_count++;
  468     list_add(&reply->global_list, &drc->cache_head);
  469     drc->op_count++;
  470 
  471     return 0;
  472 }
  473 
  474 /**
  475  * rpcsvc_cache_request - cache the in-transition incoming request
  476  *
  477  * @param req - incoming request
  478  * @return 0 on success, -1 on failure
  479  */
  480 int
  481 rpcsvc_cache_request(rpcsvc_request_t *req)
  482 {
  483     int ret = -1;
  484     drc_client_t *client = NULL;
  485     drc_cached_op_t *reply = NULL;
  486     rpcsvc_drc_globals_t *drc = NULL;
  487 
  488     GF_ASSERT(req);
  489 
  490     drc = req->svc->drc;
  491 
  492     client = req->trans->drc_client;
  493     if (!client) {
  494         gf_log(GF_RPCSVC, GF_LOG_DEBUG, "drc client is NULL");
  495         goto out;
  496     }
  497 
  498     reply = mem_get0(drc->mempool);
  499     if (!reply)
  500         goto out;
  501 
  502     reply->client = rpcsvc_drc_client_ref(client);
  503     reply->xid = req->xid;
  504     reply->prognum = req->prognum;
  505     reply->progversion = req->progver;
  506     reply->procnum = req->procnum;
  507     reply->state = DRC_OP_IN_TRANSIT;
  508     req->reply = reply;
  509     INIT_LIST_HEAD(&reply->global_list);
  510 
  511     ret = rpcsvc_add_op_to_cache(drc, reply);
  512     if (ret) {
  513         req->reply = NULL;
  514         rpcsvc_drc_op_destroy(drc, reply);
  515         rpcsvc_drc_client_unref(drc, client);
  516         gf_log(GF_RPCSVC, GF_LOG_DEBUG, "Failed to add op to drc cache");
  517     }
  518 
  519 out:
  520     return ret;
  521 }
  522 
  523 /**
  524  *
  525  * rpcsvc_drc_priv - function which dumps the drc state
  526  *
  527  * @param drc - the main drc structure
  528  * @return 0 on success, -1 on failure
  529  */
  530 int32_t
  531 rpcsvc_drc_priv(rpcsvc_drc_globals_t *drc)
  532 {
  533     int i = 0;
  534     char key[GF_DUMP_MAX_BUF_LEN] = {0};
  535     drc_client_t *client = NULL;
  536     char ip[INET6_ADDRSTRLEN] = {0};
  537 
  538     if (!drc || drc->status == DRC_UNINITIATED) {
  539         gf_log(GF_RPCSVC, GF_LOG_DEBUG,
  540                "DRC is "
  541                "uninitialized, not dumping its state");
  542         return 0;
  543     }
  544 
  545     gf_proc_dump_add_section("rpc.drc");
  546 
  547     if (TRY_LOCK(&drc->lock))
  548         return -1;
  549 
  550     gf_proc_dump_build_key(key, "drc", "type");
  551     gf_proc_dump_write(key, "%d", drc->type);
  552 
  553     gf_proc_dump_build_key(key, "drc", "client_count");
  554     gf_proc_dump_write(key, "%d", drc->client_count);
  555 
  556     gf_proc_dump_build_key(key, "drc", "current_cache_size");
  557     gf_proc_dump_write(key, "%d", drc->op_count);
  558 
  559     gf_proc_dump_build_key(key, "drc", "max_cache_size");
  560     gf_proc_dump_write(key, "%d", drc->global_cache_size);
  561 
  562     gf_proc_dump_build_key(key, "drc", "lru_factor");
  563     gf_proc_dump_write(key, "%d", drc->lru_factor);
  564 
  565     gf_proc_dump_build_key(key, "drc", "duplicate_request_count");
  566     gf_proc_dump_write(key, "%" PRIu64, drc->cache_hits);
  567 
  568     gf_proc_dump_build_key(key, "drc", "in_transit_duplicate_requests");
  569     gf_proc_dump_write(key, "%" PRIu64, drc->intransit_hits);
  570 
  571     list_for_each_entry(client, &drc->clients_head, client_list)
  572     {
  573         gf_proc_dump_build_key(key, "client", "%d.ip-address", i);
  574         memset(ip, 0, INET6_ADDRSTRLEN);
  575         switch (client->sock_union.storage.ss_family) {
  576             case AF_INET:
  577                 gf_proc_dump_write(
  578                     key, "%s",
  579                     inet_ntop(AF_INET, &client->sock_union.sin.sin_addr.s_addr,
  580                               ip, INET_ADDRSTRLEN));
  581                 break;
  582             case AF_INET6:
  583                 gf_proc_dump_write(
  584                     key, "%s",
  585                     inet_ntop(AF_INET6, &client->sock_union.sin6.sin6_addr, ip,
  586                               INET6_ADDRSTRLEN));
  587                 break;
  588             default:
  589                 gf_proc_dump_write(key, "%s", "N/A");
  590         }
  591 
  592         gf_proc_dump_build_key(key, "client", "%d.ref_count", i);
  593         gf_proc_dump_write(key, "%" PRIu32, GF_ATOMIC_GET(client->ref));
  594         gf_proc_dump_build_key(key, "client", "%d.op_count", i);
  595         gf_proc_dump_write(key, "%d", client->op_count);
  596         i++;
  597     }
  598 
  599     UNLOCK(&drc->lock);
  600     return 0;
  601 }
  602 
  603 /**
  604  * rpcsvc_drc_notify - function which is notified of RPC transport events
  605  *
  606  * @param svc - pointer to rpcsvc_t structure of the rpc
  607  * @param xl - pointer to the xlator
  608  * @param event - the event which triggered this notify
  609  * @param data - the transport structure
  610  * @return 0 on success, -1 on failure
  611  */
  612 int
  613 rpcsvc_drc_notify(rpcsvc_t *svc, void *xl, rpcsvc_event_t event, void *data)
  614 {
  615     int ret = -1;
  616     rpc_transport_t *trans = NULL;
  617     drc_client_t *client = NULL;
  618     rpcsvc_drc_globals_t *drc = NULL;
  619 
  620     GF_ASSERT(svc);
  621     GF_ASSERT(svc->drc);
  622     GF_ASSERT(data);
  623 
  624     drc = svc->drc;
  625 
  626     if (drc->status == DRC_UNINITIATED || drc->type == DRC_TYPE_NONE)
  627         return 0;
  628 
  629     LOCK(&drc->lock);
  630     {
  631         trans = (rpc_transport_t *)data;
  632         client = rpcsvc_get_drc_client(drc, &trans->peerinfo.sockaddr);
  633         if (!client)
  634             goto unlock;
  635 
  636         switch (event) {
  637             case RPCSVC_EVENT_ACCEPT:
  638                 trans->drc_client = rpcsvc_drc_client_ref(client);
  639                 ret = 0;
  640                 break;
  641 
  642             case RPCSVC_EVENT_DISCONNECT:
  643                 ret = 0;
  644                 if (list_empty(&drc->clients_head))
  645                     break;
  646                 /* should be the last unref */
  647                 trans->drc_client = NULL;
  648                 rpcsvc_drc_client_unref(drc, client);
  649                 break;
  650 
  651             default:
  652                 break;
  653         }
  654     }
  655 unlock:
  656     UNLOCK(&drc->lock);
  657     return ret;
  658 }
  659 
  660 /**
  661  * rpcsvc_drc_init - Initialize the duplicate request cache service
  662  *
  663  * @param svc - pointer to rpcsvc_t structure of the rpc
  664  * @param options - the options dictionary which configures drc
  665  * @return 0 on success, non-zero integer on failure
  666  */
  667 int
  668 rpcsvc_drc_init(rpcsvc_t *svc, dict_t *options)
  669 {
  670     int ret = 0;
  671     uint32_t drc_type = 0;
  672     uint32_t drc_size = 0;
  673     uint32_t drc_factor = 0;
  674     rpcsvc_drc_globals_t *drc = NULL;
  675 
  676     GF_ASSERT(svc);
  677     GF_ASSERT(options);
  678 
  679     /* Toggle DRC on/off, when more drc types(persistent/cluster)
  680      * are added, we shouldn't treat this as boolean. */
  681     ret = dict_get_str_boolean(options, "nfs.drc", _gf_false);
  682     if (ret == -1) {
  683         gf_log(GF_RPCSVC, GF_LOG_INFO, "drc user options need second look");
  684         ret = _gf_false;
  685     }
  686 
  687     gf_log(GF_RPCSVC, GF_LOG_INFO, "DRC is turned %s", (ret ? "ON" : "OFF"));
  688 
  689     /*DRC off, nothing to do */
  690     if (ret == _gf_false)
  691         return (0);
  692 
  693     drc = GF_CALLOC(1, sizeof(rpcsvc_drc_globals_t),
  694                     gf_common_mt_drc_globals_t);
  695     if (!drc)
  696         return (-1);
  697 
  698     LOCK_INIT(&drc->lock);
  699     svc->drc = drc;
  700 
  701     /* Specify type of DRC to be used */
  702     ret = dict_get_uint32(options, "nfs.drc-type", &drc_type);
  703     if (ret) {
  704         gf_log(GF_RPCSVC, GF_LOG_DEBUG,
  705                "drc type not set. Continuing with default");
  706         drc_type = DRC_DEFAULT_TYPE;
  707     }
  708 
  709     /* Set the global cache size (no. of ops to cache) */
  710     ret = dict_get_uint32(options, "nfs.drc-size", &drc_size);
  711     if (ret) {
  712         gf_log(GF_RPCSVC, GF_LOG_DEBUG,
  713                "drc size not set. Continuing with default size");
  714         drc_size = DRC_DEFAULT_CACHE_SIZE;
  715     }
  716 
  717     LOCK(&drc->lock);
  718 
  719     drc->type = drc_type;
  720     drc->global_cache_size = drc_size;
  721 
  722     /* Mempool for cached ops */
  723     drc->mempool = mem_pool_new(drc_cached_op_t, drc->global_cache_size);
  724     if (!drc->mempool) {
  725         UNLOCK(&drc->lock);
  726         gf_log(GF_RPCSVC, GF_LOG_ERROR,
  727                "Failed to get mempool for DRC, drc-size: %d", drc_size);
  728         ret = -1;
  729         goto post_unlock;
  730     }
  731 
  732     /* What percent of cache to be evicted whenever it fills up */
  733     ret = dict_get_uint32(options, "nfs.drc-lru-factor", &drc_factor);
  734     if (ret) {
  735         gf_log(GF_RPCSVC, GF_LOG_DEBUG,
  736                "drc lru factor not set. Continuing with policy default");
  737         drc_factor = DRC_DEFAULT_LRU_FACTOR;
  738     }
  739 
  740     drc->lru_factor = (drc_lru_factor_t)drc_factor;
  741 
  742     INIT_LIST_HEAD(&drc->clients_head);
  743     INIT_LIST_HEAD(&drc->cache_head);
  744 
  745     ret = rpcsvc_register_notify(svc, rpcsvc_drc_notify, THIS);
  746     if (ret) {
  747         UNLOCK(&drc->lock);
  748         gf_log(GF_RPCSVC, GF_LOG_ERROR,
  749                "registration of drc_notify function failed");
  750         goto post_unlock;
  751     }
  752 
  753     drc->status = DRC_INITIATED;
  754     UNLOCK(&drc->lock);
  755     gf_log(GF_RPCSVC, GF_LOG_DEBUG, "drc init successful");
  756 post_unlock:
  757     if (ret == -1) {
  758         if (drc->mempool) {
  759             mem_pool_destroy(drc->mempool);
  760             drc->mempool = NULL;
  761         }
  762         GF_FREE(drc);
  763         svc->drc = NULL;
  764     }
  765     return ret;
  766 }
  767 
  768 int
  769 rpcsvc_drc_deinit(rpcsvc_t *svc)
  770 {
  771     rpcsvc_drc_globals_t *drc = NULL;
  772 
  773     if (!svc)
  774         return (-1);
  775 
  776     drc = svc->drc;
  777     if (!drc)
  778         return (0);
  779 
  780     LOCK(&drc->lock);
  781     (void)rpcsvc_unregister_notify(svc, rpcsvc_drc_notify, THIS);
  782     if (drc->mempool) {
  783         mem_pool_destroy(drc->mempool);
  784         drc->mempool = NULL;
  785     }
  786     UNLOCK(&drc->lock);
  787 
  788     GF_FREE(drc);
  789     svc->drc = NULL;
  790 
  791     return (0);
  792 }
  793 
  794 int
  795 rpcsvc_drc_reconfigure(rpcsvc_t *svc, dict_t *options)
  796 {
  797     int ret = -1;
  798     gf_boolean_t enable_drc = _gf_false;
  799     rpcsvc_drc_globals_t *drc = NULL;
  800     uint32_t drc_size = 0;
  801 
  802     /* Input sanitization */
  803     if ((!svc) || (!options))
  804         return (-1);
  805 
  806     /* If DRC was not enabled before, Let rpcsvc_drc_init() to
  807      * take care of DRC initialization part.
  808      */
  809     drc = svc->drc;
  810     if (!drc) {
  811         return rpcsvc_drc_init(svc, options);
  812     }
  813 
  814     /* DRC was already enabled before. Going to be reconfigured. Check
  815      * if reconfigured options contain "nfs.drc" and "nfs.drc-size".
  816      *
  817      * NB: If DRC is "OFF", "drc-size" has no role to play.
  818      *     So, "drc-size" gets evaluated IFF DRC is "ON".
  819      *
  820      * If DRC is reconfigured,
  821      *     case 1: DRC is "ON"
  822      *         sub-case 1: drc-size remains same
  823      *              ACTION: Nothing to do.
  824      *         sub-case 2: drc-size just changed
  825      *              ACTION: rpcsvc_drc_deinit() followed by
  826      *                      rpcsvc_drc_init().
  827      *
  828      *     case 2: DRC is "OFF"
  829      *         ACTION: rpcsvc_drc_deinit()
  830      */
  831     ret = dict_get_str_boolean(options, "nfs.drc", _gf_false);
  832     if (ret < 0)
  833         ret = _gf_false;
  834 
  835     enable_drc = ret;
  836     gf_log(GF_RPCSVC, GF_LOG_INFO, "DRC is turned %s", (ret ? "ON" : "OFF"));
  837 
  838     /* case 1: DRC is "ON"*/
  839     if (enable_drc) {
  840         /* Fetch drc-size if reconfigured */
  841         if (dict_get_uint32(options, "nfs.drc-size", &drc_size))
  842             drc_size = DRC_DEFAULT_CACHE_SIZE;
  843 
  844         /* case 1: sub-case 1*/
  845         if (drc->global_cache_size == drc_size)
  846             return (0);
  847 
  848         /* case 1: sub-case 2*/
  849         (void)rpcsvc_drc_deinit(svc);
  850         return rpcsvc_drc_init(svc, options);
  851     }
  852 
  853     /* case 2: DRC is "OFF" */
  854     return rpcsvc_drc_deinit(svc);
  855 }