"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/xlators/features/changelog/src/changelog-rpc-common.c" (16 Sep 2020, 8725 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "changelog-rpc-common.c" see the Fossies "Dox" file reference documentation.

    1 /*
    2    Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
    3    This file is part of GlusterFS.
    4 
    5    This file is licensed to you under your choice of the GNU Lesser
    6    General Public License, version 3 or any later version (LGPLv3 or
    7    later), or the GNU General Public License, version 2 (GPLv2), in all
    8    cases as published by the Free Software Foundation.
    9 */
   10 
   11 #include "changelog-rpc-common.h"
   12 #include "changelog-messages.h"
   13 
   14 #include <glusterfs/syscall.h>
   15 /**
   16 *****************************************************
   17                   Client Interface
   18 *****************************************************
   19 */
   20 
   21 /**
   22  * Initialize and return an RPC client object for a given unix
   23  * domain socket.
   24  */
   25 
   26 void *
   27 changelog_rpc_poller(void *arg)
   28 {
   29     xlator_t *this = arg;
   30 
   31     (void)gf_event_dispatch(this->ctx->event_pool);
   32     return NULL;
   33 }
   34 
   35 struct rpc_clnt *
   36 changelog_rpc_client_init(xlator_t *this, void *cbkdata, char *sockfile,
   37                           rpc_clnt_notify_t fn)
   38 {
   39     int ret = 0;
   40     struct rpc_clnt *rpc = NULL;
   41     dict_t *options = NULL;
   42 
   43     if (!cbkdata)
   44         cbkdata = this;
   45 
   46     options = dict_new();
   47     if (!options)
   48         goto error_return;
   49 
   50     ret = rpc_transport_unix_options_build(options, sockfile, 0);
   51     if (ret) {
   52         gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_BUILD_ERROR,
   53                 NULL);
   54         goto dealloc_dict;
   55     }
   56 
   57     rpc = rpc_clnt_new(options, this, this->name, 16);
   58     if (!rpc)
   59         goto dealloc_dict;
   60 
   61     ret = rpc_clnt_register_notify(rpc, fn, cbkdata);
   62     if (ret) {
   63         gf_smsg(this->name, GF_LOG_ERROR, 0,
   64                 CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, NULL);
   65         goto dealloc_rpc_clnt;
   66     }
   67 
   68     ret = rpc_clnt_start(rpc);
   69     if (ret) {
   70         gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR,
   71                 NULL);
   72         goto dealloc_rpc_clnt;
   73     }
   74 
   75     dict_unref(options);
   76     return rpc;
   77 
   78 dealloc_rpc_clnt:
   79     rpc_clnt_unref(rpc);
   80 dealloc_dict:
   81     dict_unref(options);
   82 error_return:
   83     return NULL;
   84 }
   85 
   86 /**
   87  * Generic RPC client routine to dispatch a request to an
   88  * RPC server.
   89  */
   90 int
   91 changelog_rpc_sumbit_req(struct rpc_clnt *rpc, void *req, call_frame_t *frame,
   92                          rpc_clnt_prog_t *prog, int procnum,
   93                          struct iovec *payload, int payloadcnt,
   94                          struct iobref *iobref, xlator_t *this,
   95                          fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
   96 {
   97     int ret = 0;
   98     int count = 0;
   99     struct iovec iov = {
  100         0,
  101     };
  102     struct iobuf *iobuf = NULL;
  103     char new_iobref = 0;
  104     ssize_t xdr_size = 0;
  105 
  106     GF_ASSERT(this);
  107 
  108     if (req) {
  109         xdr_size = xdr_sizeof(xdrproc, req);
  110 
  111         iobuf = iobuf_get2(this->ctx->iobuf_pool, xdr_size);
  112         if (!iobuf) {
  113             goto out;
  114         };
  115 
  116         if (!iobref) {
  117             iobref = iobref_new();
  118             if (!iobref) {
  119                 goto out;
  120             }
  121 
  122             new_iobref = 1;
  123         }
  124 
  125         iobref_add(iobref, iobuf);
  126 
  127         iov.iov_base = iobuf->ptr;
  128         iov.iov_len = iobuf_size(iobuf);
  129 
  130         /* Create the xdr payload */
  131         ret = xdr_serialize_generic(iov, req, xdrproc);
  132         if (ret == -1) {
  133             goto out;
  134         }
  135 
  136         iov.iov_len = ret;
  137         count = 1;
  138     }
  139 
  140     ret = rpc_clnt_submit(rpc, prog, procnum, cbkfn, &iov, count, payload,
  141                           payloadcnt, iobref, frame, NULL, 0, NULL, 0, NULL);
  142 
  143 out:
  144     if (new_iobref)
  145         iobref_unref(iobref);
  146     if (iobuf)
  147         iobuf_unref(iobuf);
  148     return ret;
  149 }
  150 
  151 /**
  152  * Entry point to perform a remote procedure call
  153  */
  154 int
  155 changelog_invoke_rpc(xlator_t *this, struct rpc_clnt *rpc,
  156                      rpc_clnt_prog_t *prog, int procidx, void *arg)
  157 {
  158     int ret = 0;
  159     call_frame_t *frame = NULL;
  160     rpc_clnt_procedure_t *proc = NULL;
  161 
  162     if (!this || !prog)
  163         goto error_return;
  164 
  165     frame = create_frame(this, this->ctx->pool);
  166     if (!frame) {
  167         gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CREATE_FRAME_FAILED,
  168                 NULL);
  169         goto error_return;
  170     }
  171 
  172     proc = &prog->proctable[procidx];
  173     if (proc->fn)
  174         ret = proc->fn(frame, this, arg);
  175 
  176     STACK_DESTROY(frame->root);
  177     return ret;
  178 
  179 error_return:
  180     return -1;
  181 }
  182 
  183 /**
  184 *****************************************************
  185                   Server Interface
  186 *****************************************************
  187 */
  188 
  189 struct iobuf *
  190 __changelog_rpc_serialize_reply(rpcsvc_request_t *req, void *arg,
  191                                 struct iovec *outmsg, xdrproc_t xdrproc)
  192 {
  193     struct iobuf *iob = NULL;
  194     ssize_t retlen = 0;
  195     ssize_t rsp_size = 0;
  196 
  197     rsp_size = xdr_sizeof(xdrproc, arg);
  198     iob = iobuf_get2(req->svc->ctx->iobuf_pool, rsp_size);
  199     if (!iob)
  200         goto error_return;
  201 
  202     iobuf_to_iovec(iob, outmsg);
  203 
  204     retlen = xdr_serialize_generic(*outmsg, arg, xdrproc);
  205     if (retlen == -1)
  206         goto unref_iob;
  207 
  208     outmsg->iov_len = retlen;
  209     return iob;
  210 
  211 unref_iob:
  212     iobuf_unref(iob);
  213 error_return:
  214     return NULL;
  215 }
  216 
  217 int
  218 changelog_rpc_sumbit_reply(rpcsvc_request_t *req, void *arg,
  219                            struct iovec *payload, int payloadcount,
  220                            struct iobref *iobref, xdrproc_t xdrproc)
  221 {
  222     int ret = -1;
  223     struct iobuf *iob = NULL;
  224     struct iovec iov = {
  225         0,
  226     };
  227     char new_iobref = 0;
  228 
  229     if (!req)
  230         goto return_ret;
  231 
  232     if (!iobref) {
  233         iobref = iobref_new();
  234         if (!iobref)
  235             goto return_ret;
  236         new_iobref = 1;
  237     }
  238 
  239     iob = __changelog_rpc_serialize_reply(req, arg, &iov, xdrproc);
  240     if (!iob)
  241         gf_smsg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED,
  242                 NULL);
  243     else
  244         iobref_add(iobref, iob);
  245 
  246     ret = rpcsvc_submit_generic(req, &iov, 1, payload, payloadcount, iobref);
  247 
  248     if (new_iobref)
  249         iobref_unref(iobref);
  250     if (iob)
  251         iobuf_unref(iob);
  252 return_ret:
  253     return ret;
  254 }
  255 
  256 void
  257 changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile,
  258                              rpcsvc_notify_t fn, struct rpcsvc_program **progs)
  259 {
  260     rpcsvc_listener_t *listener = NULL;
  261     rpcsvc_listener_t *next = NULL;
  262     struct rpcsvc_program *prog = NULL;
  263     rpc_transport_t *trans = NULL;
  264 
  265     if (!rpc)
  266         return;
  267 
  268     while (*progs) {
  269         prog = *progs;
  270         (void)rpcsvc_program_unregister(rpc, prog);
  271         progs++;
  272     }
  273 
  274     list_for_each_entry_safe(listener, next, &rpc->listeners, list)
  275     {
  276         if (listener->trans) {
  277             trans = listener->trans;
  278             rpc_transport_disconnect(trans, _gf_false);
  279         }
  280     }
  281 
  282     (void)rpcsvc_unregister_notify(rpc, fn, this);
  283 
  284     /* TODO Avoid freeing rpc object in case of brick multiplex
  285        after freeing rpc object svc->rpclock corrupted and it takes
  286        more time to detach a brick
  287     */
  288     if (!this->cleanup_starting) {
  289         if (rpc->rxpool) {
  290             mem_pool_destroy(rpc->rxpool);
  291             rpc->rxpool = NULL;
  292         }
  293         GF_FREE(rpc);
  294     }
  295 }
  296 
  297 rpcsvc_t *
  298 changelog_rpc_server_init(xlator_t *this, char *sockfile, void *cbkdata,
  299                           rpcsvc_notify_t fn, struct rpcsvc_program **progs)
  300 {
  301     int ret = 0;
  302     rpcsvc_t *rpc = NULL;
  303     dict_t *options = NULL;
  304     struct rpcsvc_program *prog = NULL;
  305 
  306     if (!cbkdata)
  307         cbkdata = this;
  308 
  309     options = dict_new();
  310     if (!options)
  311         return NULL;
  312 
  313     ret = rpcsvc_transport_unix_options_build(options, sockfile);
  314     if (ret)
  315         goto dealloc_dict;
  316 
  317     rpc = rpcsvc_init(this, this->ctx, options, 8);
  318     if (rpc == NULL) {
  319         gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR,
  320                 NULL);
  321         goto dealloc_dict;
  322     }
  323 
  324     ret = rpcsvc_register_notify(rpc, fn, cbkdata);
  325     if (ret) {
  326         gf_smsg(this->name, GF_LOG_ERROR, 0,
  327                 CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, NULL);
  328         goto dealloc_rpc;
  329     }
  330 
  331     ret = rpcsvc_create_listeners(rpc, options, this->name);
  332     if (ret != 1) {
  333         gf_msg_debug(this->name, 0, "failed to create listeners");
  334         goto dealloc_rpc;
  335     }
  336 
  337     while (*progs) {
  338         prog = *progs;
  339         ret = rpcsvc_program_register(rpc, prog, _gf_false);
  340         if (ret) {
  341             gf_smsg(this->name, GF_LOG_ERROR, 0,
  342                     CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED, "name%s",
  343                     prog->progname, "prognum=%d", prog->prognum, "pogver=%d",
  344                     prog->progver, NULL);
  345             goto dealloc_rpc;
  346         }
  347 
  348         progs++;
  349     }
  350 
  351     dict_unref(options);
  352     return rpc;
  353 
  354 dealloc_rpc:
  355     GF_FREE(rpc);
  356 dealloc_dict:
  357     dict_unref(options);
  358     return NULL;
  359 }