"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/xlators/features/sdfs/src/sdfs.c" (16 Sep 2020, 38621 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "sdfs.c", see the Fossies "Dox" file reference documentation.

    1 /*
    2    Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
    3    This file is part of GlusterFS.
    4 
    5    This file is licensed to you under your choice of the GNU Lesser
    6    General Public License, version 3 or any later version (LGPLv3 or
    7    later), or the GNU General Public License, version 2 (GPLv2), in all
    8    cases as published by the Free Software Foundation.
    9 */
   10 #include <libgen.h>
   11 #include "sdfs.h"
   12 
   13 static int
   14 sdfs_frame_return(call_frame_t *frame)
   15 {
   16     sdfs_local_t *local = NULL;
   17 
   18     if (!frame)
   19         return -1;
   20 
   21     local = frame->local;
   22 
   23     return GF_ATOMIC_DEC(local->call_cnt);
   24 }
   25 
   26 static void
   27 sdfs_lock_free(sdfs_entry_lock_t *entrylk)
   28 {
   29     if (entrylk == NULL)
   30         goto out;
   31 
   32     loc_wipe(&entrylk->parent_loc);
   33     GF_FREE(entrylk->basename);
   34 
   35 out:
   36     return;
   37 }
   38 
   39 static void
   40 sdfs_lock_array_free(sdfs_lock_t *lock)
   41 {
   42     sdfs_entry_lock_t *entrylk = NULL;
   43     int i = 0;
   44 
   45     if (lock == NULL)
   46         goto out;
   47 
   48     for (i = 0; i < lock->lock_count; i++) {
   49         entrylk = &lock->entrylk[i];
   50         sdfs_lock_free(entrylk);
   51     }
   52 
   53 out:
   54     return;
   55 }
   56 
   57 static void
   58 sdfs_local_cleanup(sdfs_local_t *local)
   59 {
   60     if (!local)
   61         return;
   62 
   63     loc_wipe(&local->loc);
   64     loc_wipe(&local->parent_loc);
   65 
   66     if (local->stub) {
   67         call_stub_destroy(local->stub);
   68         local->stub = NULL;
   69     }
   70 
   71     sdfs_lock_array_free(local->lock);
   72     GF_FREE(local->lock);
   73 
   74     mem_put(local);
   75 }
   76 
   77 static int
   78 sdfs_build_parent_loc(loc_t *parent, loc_t *child)
   79 {
   80     int ret = -1;
   81     char *path = NULL;
   82 
   83     if (!child->parent) {
   84         goto out;
   85     }
   86     parent->inode = inode_ref(child->parent);
   87     path = gf_strdup(child->path);
   88     if (!path) {
   89         ret = -ENOMEM;
   90         goto out;
   91     }
   92 
   93     parent->path = dirname(path);
   94     if (!parent->path) {
   95         goto out;
   96     }
   97 
   98     gf_uuid_copy(parent->gfid, child->pargfid);
   99     return 0;
  100 
  101 out:
  102     GF_FREE(path);
  103     return ret;
  104 }
  105 
  106 static sdfs_local_t *
  107 sdfs_local_init(call_frame_t *frame, xlator_t *this)
  108 {
  109     sdfs_local_t *local = NULL;
  110 
  111     local = mem_get0(this->local_pool);
  112     if (!local)
  113         goto out;
  114 
  115     frame->local = local;
  116 out:
  117     return local;
  118 }
  119 
  120 static int
  121 sdfs_get_new_frame_common(call_frame_t *frame, call_frame_t **new_frame)
  122 {
  123     int ret = -1;
  124     sdfs_local_t *local = NULL;
  125     client_t *client = NULL;
  126 
  127     *new_frame = copy_frame(frame);
  128     if (!*new_frame) {
  129         goto err;
  130     }
  131 
  132     client = frame->root->client;
  133     gf_client_ref(client);
  134     (*new_frame)->root->client = client;
  135 
  136     local = sdfs_local_init(*new_frame, THIS);
  137     if (!local) {
  138         goto err;
  139     }
  140 
  141     local->main_frame = frame;
  142     /*Set unique lk-owner for the fop*/
  143     set_lk_owner_from_ptr(&(*new_frame)->root->lk_owner, (*new_frame)->root);
  144 
  145     ret = 0;
  146 err:
  147     if ((ret == -1) && (*new_frame)) {
  148         SDFS_STACK_DESTROY((*new_frame));
  149         *new_frame = NULL;
  150     }
  151 
  152     return ret;
  153 }
  154 
  155 static int
  156 sdfs_get_new_frame(call_frame_t *frame, loc_t *loc, call_frame_t **new_frame)
  157 {
  158     int ret = -1;
  159     sdfs_local_t *local = NULL;
  160 
  161     ret = sdfs_get_new_frame_common(frame, new_frame);
  162     if (ret < 0) {
  163         goto err;
  164     }
  165 
  166     local = (*new_frame)->local;
  167 
  168     ret = sdfs_build_parent_loc(&local->parent_loc, loc);
  169     if (ret) {
  170         goto err;
  171     }
  172 
  173     ret = loc_copy(&local->loc, loc);
  174     if (ret == -1) {
  175         goto err;
  176     }
  177 
  178     ret = 0;
  179 err:
  180     if (ret && (*new_frame)) {
  181         SDFS_STACK_DESTROY((*new_frame));
  182         *new_frame = NULL;
  183         ret = -1;
  184     }
  185 
  186     return ret;
  187 }
  188 
  189 static int
  190 sdfs_get_new_frame_readdirp(call_frame_t *frame, fd_t *fd,
  191                             call_frame_t **new_frame)
  192 {
  193     int ret = -1;
  194     sdfs_local_t *local = NULL;
  195 
  196     ret = sdfs_get_new_frame_common(frame, new_frame);
  197     if (ret < 0) {
  198         goto err;
  199     }
  200 
  201     local = (*new_frame)->local;
  202     local->parent_loc.inode = inode_ref(fd->inode);
  203     gf_uuid_copy(local->parent_loc.gfid, fd->inode->gfid);
  204 
  205     ret = 0;
  206 err:
  207     return ret;
  208 }
  209 
  210 int
  211 sdfs_entrylk_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
  212                  int32_t op_ret, int32_t op_errno, dict_t *xdata)
  213 {
  214     sdfs_local_t *local = NULL;
  215     call_stub_t *stub = NULL;
  216 
  217     local = frame->local;
  218 
  219     local->op_ret = op_ret;
  220     local->op_errno = op_errno;
  221 
  222     if (local->stub) {
  223         stub = local->stub;
  224         local->stub = NULL;
  225         call_resume(stub);
  226     } else {
  227         if (op_ret < 0)
  228             gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
  229                    "Unlocking entry lock failed for %s", local->loc.name);
  230 
  231         SDFS_STACK_DESTROY(frame);
  232     }
  233 
  234     return 0;
  235 }
  236 
  237 int
  238 sdfs_mkdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
  239                int32_t op_ret, int32_t op_errno, inode_t *inode,
  240                struct iatt *stbuf, struct iatt *preparent,
  241                struct iatt *postparent, dict_t *xdata)
  242 {
  243     sdfs_local_t *local = NULL;
  244 
  245     local = frame->local;
  246 
  247     STACK_UNWIND_STRICT(mkdir, local->main_frame, op_ret, op_errno, inode,
  248                         stbuf, preparent, postparent, xdata);
  249 
  250     local->main_frame = NULL;
  251     STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
  252                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
  253                local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
  254     return 0;
  255 }
  256 
  257 int
  258 sdfs_mkdir_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
  259                   mode_t umask, dict_t *xdata)
  260 {
  261     sdfs_local_t *local = NULL;
  262     char gfid[GF_UUID_BUF_SIZE] = {0};
  263     int op_errno = -1;
  264 
  265     local = frame->local;
  266 
  267     gf_uuid_unparse(loc->pargfid, gfid);
  268 
  269     if (local->op_ret < 0) {
  270         gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
  271                "Acquiring entry lock failed for directory %s "
  272                "with parent gfid %s",
  273                local->loc.name, gfid);
  274         op_errno = local->op_errno;
  275         goto err;
  276     }
  277 
  278     STACK_WIND(frame, sdfs_mkdir_cbk, FIRST_CHILD(this),
  279                FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata);
  280 
  281     return 0;
  282 err:
  283     STACK_UNWIND_STRICT(mkdir, local->main_frame, -1, op_errno, NULL, NULL,
  284                         NULL, NULL, NULL);
  285 
  286     local->main_frame = NULL;
  287     SDFS_STACK_DESTROY(frame);
  288     return 0;
  289 }
  290 
  291 int
  292 sdfs_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
  293            mode_t umask, dict_t *xdata)
  294 {
  295     sdfs_local_t *local = NULL;
  296     call_frame_t *new_frame = NULL;
  297     call_stub_t *stub = NULL;
  298     int op_errno = 0;
  299 
  300     if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
  301         op_errno = ENOMEM;
  302         goto err;
  303     }
  304 
  305     stub = fop_mkdir_stub(new_frame, sdfs_mkdir_helper, loc, mode, umask,
  306                           xdata);
  307     if (!stub) {
  308         op_errno = ENOMEM;
  309         goto err;
  310     }
  311 
  312     local = new_frame->local;
  313     local->stub = stub;
  314 
  315     STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
  316                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
  317                local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
  318 
  319     return 0;
  320 err:
  321     STACK_UNWIND_STRICT(mkdir, frame, -1, op_errno, NULL, NULL, NULL, NULL,
  322                         NULL);
  323 
  324     if (new_frame)
  325         SDFS_STACK_DESTROY(new_frame);
  326 
  327     return 0;
  328 }
  329 
  330 int
  331 sdfs_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
  332                int32_t op_ret, int32_t op_errno, struct iatt *preparent,
  333                struct iatt *postparent, dict_t *xdata)
  334 {
  335     sdfs_local_t *local = NULL;
  336 
  337     local = frame->local;
  338 
  339     STACK_UNWIND_STRICT(rmdir, local->main_frame, op_ret, op_errno, preparent,
  340                         postparent, xdata);
  341 
  342     local->main_frame = NULL;
  343     STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
  344                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
  345                local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
  346     return 0;
  347 }
  348 
  349 int
  350 sdfs_rmdir_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
  351                   dict_t *xdata)
  352 {
  353     sdfs_local_t *local = NULL;
  354     char gfid[GF_UUID_BUF_SIZE] = {0};
  355 
  356     local = frame->local;
  357 
  358     gf_uuid_unparse(loc->pargfid, gfid);
  359 
  360     if (local->op_ret < 0) {
  361         gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
  362                "Acquiring entry lock failed for directory %s "
  363                "with parent gfid %s",
  364                local->loc.name, gfid);
  365         goto err;
  366     }
  367 
  368     STACK_WIND(frame, sdfs_rmdir_cbk, FIRST_CHILD(this),
  369                FIRST_CHILD(this)->fops->rmdir, loc, flags, xdata);
  370 
  371     return 0;
  372 err:
  373     STACK_UNWIND_STRICT(rmdir, local->main_frame, -1, local->op_errno, NULL,
  374                         NULL, NULL);
  375 
  376     local->main_frame = NULL;
  377     SDFS_STACK_DESTROY(frame);
  378     return 0;
  379 }
  380 
  381 int
  382 sdfs_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
  383            dict_t *xdata)
  384 {
  385     sdfs_local_t *local = NULL;
  386     call_frame_t *new_frame = NULL;
  387     call_stub_t *stub = NULL;
  388     int op_errno = 0;
  389 
  390     if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
  391         op_errno = ENOMEM;
  392         goto err;
  393     }
  394 
  395     stub = fop_rmdir_stub(new_frame, sdfs_rmdir_helper, loc, flags, xdata);
  396     if (!stub) {
  397         op_errno = ENOMEM;
  398         goto err;
  399     }
  400 
  401     local = new_frame->local;
  402     local->stub = stub;
  403 
  404     STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
  405                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
  406                local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
  407 
  408     return 0;
  409 err:
  410     STACK_UNWIND_STRICT(rmdir, frame, -1, op_errno, NULL, NULL, NULL);
  411 
  412     if (new_frame)
  413         SDFS_STACK_DESTROY(new_frame);
  414 
  415     return 0;
  416 }
  417 
  418 int
  419 sdfs_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
  420                 int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode,
  421                 struct iatt *stbuf, struct iatt *preparent,
  422                 struct iatt *postparent, dict_t *xdata)
  423 {
  424     sdfs_local_t *local = NULL;
  425 
  426     local = frame->local;
  427 
  428     STACK_UNWIND_STRICT(create, local->main_frame, op_ret, op_errno, fd, inode,
  429                         stbuf, preparent, postparent, xdata);
  430 
  431     local->main_frame = NULL;
  432     STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
  433                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
  434                local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
  435     return 0;
  436 }
  437 
  438 int
  439 sdfs_create_helper(call_frame_t *frame, xlator_t *this, loc_t *loc,
  440                    int32_t flags, mode_t mode, mode_t umask, fd_t *fd,
  441                    dict_t *xdata)
  442 {
  443     sdfs_local_t *local = NULL;
  444     char gfid[GF_UUID_BUF_SIZE] = {0};
  445 
  446     local = frame->local;
  447 
  448     gf_uuid_unparse(loc->pargfid, gfid);
  449 
  450     if (local->op_ret < 0) {
  451         gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
  452                "Acquiring entry lock failed for directory %s "
  453                "with parent gfid %s",
  454                local->loc.name, gfid);
  455         goto err;
  456     }
  457 
  458     STACK_WIND(frame, sdfs_create_cbk, FIRST_CHILD(this),
  459                FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd,
  460                xdata);
  461 
  462     return 0;
  463 err:
  464     STACK_UNWIND_STRICT(create, local->main_frame, -1, local->op_errno, NULL,
  465                         NULL, NULL, NULL, NULL, NULL);
  466 
  467     local->main_frame = NULL;
  468     SDFS_STACK_DESTROY(frame);
  469     return 0;
  470 }
  471 
  472 int
  473 sdfs_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
  474             mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)
  475 {
  476     sdfs_local_t *local = NULL;
  477     call_frame_t *new_frame = NULL;
  478     call_stub_t *stub = NULL;
  479     int op_errno = 0;
  480 
  481     if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
  482         op_errno = ENOMEM;
  483         goto err;
  484     }
  485 
  486     stub = fop_create_stub(new_frame, sdfs_create_helper, loc, flags, mode,
  487                            umask, fd, xdata);
  488     if (!stub) {
  489         op_errno = ENOMEM;
  490         goto err;
  491     }
  492 
  493     local = new_frame->local;
  494     local->stub = stub;
  495 
  496     STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
  497                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
  498                local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
  499 
  500     return 0;
  501 err:
  502     STACK_UNWIND_STRICT(create, frame, -1, op_errno, NULL, NULL, NULL, NULL,
  503                         NULL, NULL);
  504 
  505     if (new_frame)
  506         SDFS_STACK_DESTROY(new_frame);
  507 
  508     return 0;
  509 }
  510 
  511 int
  512 sdfs_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
  513                 int32_t op_ret, int32_t op_errno, struct iatt *preparent,
  514                 struct iatt *postparent, dict_t *xdata)
  515 {
  516     sdfs_local_t *local = NULL;
  517 
  518     local = frame->local;
  519 
  520     STACK_UNWIND_STRICT(unlink, local->main_frame, op_ret, op_errno, preparent,
  521                         postparent, xdata);
  522 
  523     local->main_frame = NULL;
  524     STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
  525                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
  526                local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
  527     return 0;
  528 }
  529 
  530 int
  531 sdfs_unlink_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
  532                    dict_t *xdata)
  533 {
  534     sdfs_local_t *local = NULL;
  535     char gfid[GF_UUID_BUF_SIZE] = {0};
  536 
  537     local = frame->local;
  538 
  539     gf_uuid_unparse(loc->pargfid, gfid);
  540 
  541     if (local->op_ret < 0) {
  542         gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
  543                "Acquiring entry lock failed for directory %s "
  544                "with parent gfid %s",
  545                local->loc.name, gfid);
  546         goto err;
  547     }
  548 
  549     STACK_WIND(frame, sdfs_unlink_cbk, FIRST_CHILD(this),
  550                FIRST_CHILD(this)->fops->unlink, loc, flags, xdata);
  551 
  552     return 0;
  553 err:
  554     STACK_UNWIND_STRICT(unlink, local->main_frame, -1, local->op_errno, NULL,
  555                         NULL, NULL);
  556 
  557     local->main_frame = NULL;
  558     SDFS_STACK_DESTROY(frame);
  559     return 0;
  560 }
  561 
  562 int
  563 sdfs_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
  564             dict_t *xdata)
  565 {
  566     sdfs_local_t *local = NULL;
  567     call_frame_t *new_frame = NULL;
  568     call_stub_t *stub = NULL;
  569     int op_errno = 0;
  570 
  571     if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
  572         op_errno = ENOMEM;
  573         goto err;
  574     }
  575 
  576     stub = fop_unlink_stub(new_frame, sdfs_unlink_helper, loc, flags, xdata);
  577     if (!stub) {
  578         op_errno = ENOMEM;
  579         goto err;
  580     }
  581 
  582     local = new_frame->local;
  583     local->stub = stub;
  584 
  585     STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
  586                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
  587                local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
  588 
  589     return 0;
  590 err:
  591     STACK_UNWIND_STRICT(unlink, frame, -1, op_errno, NULL, NULL, NULL);
  592 
  593     if (new_frame)
  594         SDFS_STACK_DESTROY(new_frame);
  595 
  596     return 0;
  597 }
  598 
  599 int
  600 sdfs_symlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
  601                  int32_t op_ret, int32_t op_errno, inode_t *inode,
  602                  struct iatt *stbuf, struct iatt *preparent,
  603                  struct iatt *postparent, dict_t *xdata)
  604 {
  605     sdfs_local_t *local = NULL;
  606 
  607     local = frame->local;
  608 
  609     STACK_UNWIND_STRICT(link, local->main_frame, op_ret, op_errno, inode, stbuf,
  610                         preparent, postparent, xdata);
  611 
  612     local->main_frame = NULL;
  613     STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
  614                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
  615                local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
  616     return 0;
  617 }
  618 
  619 int
  620 sdfs_symlink_helper(call_frame_t *frame, xlator_t *this, const char *linkname,
  621                     loc_t *loc, mode_t umask, dict_t *xdata)
  622 {
  623     sdfs_local_t *local = NULL;
  624     char gfid[GF_UUID_BUF_SIZE] = {0};
  625 
  626     local = frame->local;
  627 
  628     gf_uuid_unparse(loc->pargfid, gfid);
  629 
  630     if (local->op_ret < 0) {
  631         gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
  632                "Acquiring entry lock failed for directory %s "
  633                "with parent gfid %s",
  634                local->loc.name, gfid);
  635         goto err;
  636     }
  637 
  638     STACK_WIND(frame, sdfs_symlink_cbk, FIRST_CHILD(this),
  639                FIRST_CHILD(this)->fops->symlink, linkname, loc, umask, xdata);
  640 
  641     return 0;
  642 err:
  643     STACK_UNWIND_STRICT(link, local->main_frame, -1, local->op_errno, NULL,
  644                         NULL, NULL, NULL, NULL);
  645 
  646     local->main_frame = NULL;
  647     SDFS_STACK_DESTROY(frame);
  648     return 0;
  649 }
  650 
  651 int
  652 sdfs_symlink(call_frame_t *frame, xlator_t *this, const char *linkname,
  653              loc_t *loc, mode_t umask, dict_t *xdata)
  654 {
  655     sdfs_local_t *local = NULL;
  656     call_frame_t *new_frame = NULL;
  657     call_stub_t *stub = NULL;
  658     int op_errno = 0;
  659 
  660     if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
  661         op_errno = ENOMEM;
  662         goto err;
  663     }
  664 
  665     stub = fop_symlink_stub(new_frame, sdfs_symlink_helper, linkname, loc,
  666                             umask, xdata);
  667     if (!stub) {
  668         op_errno = ENOMEM;
  669         goto err;
  670     }
  671 
  672     local = new_frame->local;
  673     local->stub = stub;
  674 
  675     STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
  676                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
  677                local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
  678 
  679     return 0;
  680 err:
  681     STACK_UNWIND_STRICT(link, frame, -1, op_errno, NULL, NULL, NULL, NULL,
  682                         NULL);
  683 
  684     if (new_frame)
  685         SDFS_STACK_DESTROY(new_frame);
  686 
  687     return 0;
  688 }
  689 
  690 int
  691 sdfs_common_entrylk_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
  692                         int32_t op_ret, int32_t op_errno, dict_t *xdata)
  693 {
  694     sdfs_local_t *local = NULL;
  695     int this_call_cnt = 0;
  696     int lk_index = 0;
  697     sdfs_lock_t *locks = NULL;
  698     call_stub_t *stub = NULL;
  699 
  700     local = frame->local;
  701     locks = local->lock;
  702     lk_index = (long)cookie;
  703 
  704     if (op_ret < 0) {
  705         local->op_ret = op_ret;
  706         local->op_errno = op_errno;
  707     } else {
  708         locks->entrylk->locked[lk_index] = _gf_true;
  709     }
  710 
  711     this_call_cnt = sdfs_frame_return(frame);
  712     if (this_call_cnt > 0) {
  713         gf_log(this->name, GF_LOG_DEBUG,
  714                "As there are more callcnt (%d) returning without WIND",
  715                this_call_cnt);
  716         return 0;
  717     }
  718 
  719     if (local->stub) {
  720         stub = local->stub;
  721         local->stub = NULL;
  722         call_resume(stub);
  723     } else {
  724         if (local->op_ret < 0)
  725             gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
  726                    "unlocking entry lock failed ");
  727         SDFS_STACK_DESTROY(frame);
  728     }
  729 
  730     return 0;
  731 }
  732 
  733 int
  734 sdfs_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
  735               int32_t op_errno, inode_t *inode, struct iatt *stbuf,
  736               struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
  737 {
  738     sdfs_local_t *local = NULL;
  739     sdfs_lock_t *lock = NULL;
  740     int i = 0;
  741     int lock_count = 0;
  742 
  743     local = frame->local;
  744     lock = local->lock;
  745 
  746     STACK_UNWIND_STRICT(link, local->main_frame, op_ret, op_errno, inode, stbuf,
  747                         preparent, postparent, xdata);
  748 
  749     local->main_frame = NULL;
  750     lock_count = lock->lock_count;
  751     for (i = 0; i < lock_count; i++) {
  752         STACK_WIND_COOKIE(frame, sdfs_common_entrylk_cbk, (void *)(long)i,
  753                           FIRST_CHILD(this), FIRST_CHILD(this)->fops->entrylk,
  754                           this->name, &lock->entrylk[i].parent_loc,
  755                           lock->entrylk[i].basename, ENTRYLK_UNLOCK,
  756                           ENTRYLK_WRLCK, xdata);
  757     }
  758 
  759     return 0;
  760 }
  761 
  762 int
  763 sdfs_link_helper(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
  764                  loc_t *newloc, dict_t *xdata)
  765 {
  766     sdfs_local_t *local = NULL;
  767     sdfs_lock_t *locks = NULL;
  768     gf_boolean_t stack_destroy = _gf_true;
  769     int lock_count = 0;
  770     int i = 0;
  771 
  772     local = frame->local;
  773     locks = local->lock;
  774 
  775     if (local->op_ret < 0) {
  776         gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
  777                "Acquiring entry lock failed");
  778         goto err;
  779     }
  780 
  781     STACK_WIND(frame, sdfs_link_cbk, FIRST_CHILD(this),
  782                FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
  783 
  784     return 0;
  785 err:
  786     STACK_UNWIND_STRICT(link, local->main_frame, -1, local->op_errno, NULL,
  787                         NULL, NULL, NULL, NULL);
  788 
  789     local->main_frame = NULL;
  790     for (i = 0; i < locks->lock_count && locks->entrylk->locked[i]; i++) {
  791         lock_count++;
  792     }
  793     GF_ATOMIC_INIT(local->call_cnt, lock_count);
  794 
  795     for (i = 0; i < lock_count; i++) {
  796         if (!locks->entrylk->locked[i]) {
  797             lock_count++;
  798             continue;
  799         }
  800 
  801         stack_destroy = _gf_false;
  802         STACK_WIND(frame, sdfs_common_entrylk_cbk, FIRST_CHILD(this),
  803                    FIRST_CHILD(this)->fops->entrylk, this->name,
  804                    &locks->entrylk[i].parent_loc, locks->entrylk[i].basename,
  805                    ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
  806     }
  807 
  808     if (stack_destroy)
  809         SDFS_STACK_DESTROY(frame);
  810 
  811     return 0;
  812 }
  813 
  814 static int
  815 sdfs_init_entry_lock(sdfs_entry_lock_t *lock, loc_t *loc)
  816 {
  817     int ret = 0;
  818 
  819     ret = sdfs_build_parent_loc(&lock->parent_loc, loc);
  820     if (ret)
  821         return -1;
  822 
  823     lock->basename = gf_strdup(loc->name);
  824     if (!lock->basename)
  825         return -1;
  826 
  827     return 0;
  828 }
  829 
  830 int
  831 sdfs_entry_lock_cmp(const void *l1, const void *l2)
  832 {
  833     const sdfs_entry_lock_t *r1 = l1;
  834     const sdfs_entry_lock_t *r2 = l2;
  835     int ret = 0;
  836     uuid_t gfid1 = {0};
  837     uuid_t gfid2 = {0};
  838 
  839     loc_gfid((loc_t *)&r1->parent_loc, gfid1);
  840     loc_gfid((loc_t *)&r2->parent_loc, gfid2);
  841     ret = gf_uuid_compare(gfid1, gfid2);
  842     /*Entrylks with NULL basename are the 'smallest'*/
  843     if (ret == 0) {
  844         if (!r1->basename)
  845             return -1;
  846         if (!r2->basename)
  847             return 1;
  848         ret = strcmp(r1->basename, r2->basename);
  849     }
  850 
  851     if (ret <= 0)
  852         return -1;
  853     else
  854         return 1;
  855 }
  856 
  857 int
  858 sdfs_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
  859           dict_t *xdata)
  860 {
  861     sdfs_local_t *local = NULL;
  862     call_frame_t *new_frame = NULL;
  863     call_stub_t *stub = NULL;
  864     sdfs_lock_t *lock = NULL;
  865     client_t *client = NULL;
  866     int ret = 0;
  867     int op_errno = ENOMEM;
  868 
  869     new_frame = copy_frame(frame);
  870     if (!new_frame) {
  871         op_errno = ENOMEM;
  872         goto err;
  873     }
  874     /*Set unique lk-owner for the fop*/
  875     set_lk_owner_from_ptr(&new_frame->root->lk_owner, new_frame->root);
  876 
  877     gf_client_ref(client);
  878     new_frame->root->client = client;
  879     local = sdfs_local_init(new_frame, this);
  880     if (!local) {
  881         op_errno = ENOMEM;
  882         goto err;
  883     }
  884 
  885     local->main_frame = frame;
  886 
  887     lock = GF_CALLOC(1, sizeof(*lock), gf_common_mt_char);
  888     if (!lock)
  889         goto err;
  890 
  891     local->lock = lock;
  892 
  893     ret = sdfs_init_entry_lock(&lock->entrylk[0], newloc);
  894     if (ret)
  895         goto err;
  896 
  897     ++lock->lock_count;
  898 
  899     local->lock = lock;
  900     GF_ATOMIC_INIT(local->call_cnt, lock->lock_count);
  901 
  902     ret = loc_copy(&local->loc, newloc);
  903     if (ret == -1) {
  904         op_errno = ENOMEM;
  905         goto err;
  906     }
  907 
  908     stub = fop_link_stub(new_frame, sdfs_link_helper, oldloc, newloc, xdata);
  909     if (!stub) {
  910         op_errno = ENOMEM;
  911         goto err;
  912     }
  913 
  914     local->stub = stub;
  915 
  916     STACK_WIND_COOKIE(new_frame, sdfs_common_entrylk_cbk, 0, FIRST_CHILD(this),
  917                       FIRST_CHILD(this)->fops->entrylk, this->name,
  918                       &lock->entrylk[0].parent_loc, lock->entrylk[0].basename,
  919                       ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
  920 
  921     return 0;
  922 err:
  923 
  924     STACK_UNWIND_STRICT(link, frame, -1, op_errno, NULL, NULL, NULL, NULL,
  925                         NULL);
  926 
  927     if (new_frame)
  928         SDFS_STACK_DESTROY(new_frame);
  929 
  930     return 0;
  931 }
  932 
  933 int
  934 sdfs_mknod_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
  935                int32_t op_ret, int32_t op_errno, inode_t *inode,
  936                struct iatt *stbuf, struct iatt *preparent,
  937                struct iatt *postparent, dict_t *xdata)
  938 {
  939     sdfs_local_t *local = NULL;
  940 
  941     local = frame->local;
  942 
  943     STACK_UNWIND_STRICT(mknod, local->main_frame, op_ret, op_errno, inode,
  944                         stbuf, preparent, postparent, xdata);
  945 
  946     local->main_frame = NULL;
  947     STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
  948                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
  949                local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
  950     return 0;
  951 }
  952 
  953 int
  954 sdfs_mknod_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
  955                   dev_t rdev, mode_t umask, dict_t *xdata)
  956 {
  957     sdfs_local_t *local = NULL;
  958     char gfid[GF_UUID_BUF_SIZE] = {0};
  959 
  960     local = frame->local;
  961 
  962     gf_uuid_unparse(loc->pargfid, gfid);
  963 
  964     if (local->op_ret < 0) {
  965         gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
  966                "Acquiring entry lock failed for directory %s "
  967                "with parent gfid %s",
  968                local->loc.name, gfid);
  969         goto err;
  970     }
  971 
  972     STACK_WIND(frame, sdfs_mknod_cbk, FIRST_CHILD(this),
  973                FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, xdata);
  974 
  975     return 0;
  976 err:
  977     STACK_UNWIND_STRICT(mknod, local->main_frame, -1, local->op_errno, NULL,
  978                         NULL, NULL, NULL, NULL);
  979 
  980     local->main_frame = NULL;
  981     SDFS_STACK_DESTROY(frame);
  982     return 0;
  983 }
  984 
  985 int
  986 sdfs_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
  987            dev_t rdev, mode_t umask, dict_t *xdata)
  988 {
  989     sdfs_local_t *local = NULL;
  990     call_frame_t *new_frame = NULL;
  991     call_stub_t *stub = NULL;
  992     int op_errno = 0;
  993 
  994     if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
  995         op_errno = ENOMEM;
  996         goto err;
  997     }
  998 
  999     stub = fop_mknod_stub(new_frame, sdfs_mknod_helper, loc, mode, rdev, umask,
 1000                           xdata);
 1001     if (!stub) {
 1002         op_errno = ENOMEM;
 1003         goto err;
 1004     }
 1005 
 1006     local = new_frame->local;
 1007     local->stub = stub;
 1008 
 1009     STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
 1010                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
 1011                local->loc.name, ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
 1012 
 1013     return 0;
 1014 err:
 1015     STACK_UNWIND_STRICT(mknod, frame, -1, op_errno, NULL, NULL, NULL, NULL,
 1016                         NULL);
 1017 
 1018     if (new_frame)
 1019         SDFS_STACK_DESTROY(new_frame);
 1020 
 1021     return 0;
 1022 }
 1023 
 1024 int
 1025 sdfs_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
 1026                 int32_t op_ret, int32_t op_errno, struct iatt *stbuf,
 1027                 struct iatt *preoldparent, struct iatt *postoldparent,
 1028                 struct iatt *prenewparent, struct iatt *postnewparent,
 1029                 dict_t *xdata)
 1030 {
 1031     sdfs_local_t *local = NULL;
 1032     sdfs_lock_t *lock = NULL;
 1033     int i = 0;
 1034     int call_cnt = 0;
 1035 
 1036     local = frame->local;
 1037     lock = local->lock;
 1038     GF_ATOMIC_INIT(local->call_cnt, lock->lock_count);
 1039 
 1040     STACK_UNWIND_STRICT(rename, local->main_frame, op_ret, op_errno, stbuf,
 1041                         preoldparent, postoldparent, prenewparent,
 1042                         postnewparent, xdata);
 1043 
 1044     local->main_frame = NULL;
 1045     call_cnt = GF_ATOMIC_GET(local->call_cnt);
 1046 
 1047     for (i = 0; i < call_cnt; i++) {
 1048         STACK_WIND_COOKIE(frame, sdfs_common_entrylk_cbk, (void *)(long)i,
 1049                           FIRST_CHILD(this), FIRST_CHILD(this)->fops->entrylk,
 1050                           this->name, &lock->entrylk[i].parent_loc,
 1051                           lock->entrylk[i].basename, ENTRYLK_UNLOCK,
 1052                           ENTRYLK_WRLCK, xdata);
 1053     }
 1054 
 1055     return 0;
 1056 }
 1057 
 1058 int
 1059 sdfs_rename_helper(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
 1060                    loc_t *newloc, dict_t *xdata)
 1061 {
 1062     sdfs_local_t *local = NULL;
 1063     sdfs_lock_t *lock = NULL;
 1064     gf_boolean_t stack_destroy = _gf_true;
 1065     int lock_count = 0;
 1066     int i = 0;
 1067 
 1068     local = frame->local;
 1069     lock = local->lock;
 1070 
 1071     if (local->op_ret < 0) {
 1072         gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
 1073                "Acquiring entry lock failed ");
 1074         goto err;
 1075     }
 1076 
 1077     STACK_WIND(frame, sdfs_rename_cbk, FIRST_CHILD(this),
 1078                FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
 1079 
 1080     return 0;
 1081 
 1082 err:
 1083     STACK_UNWIND_STRICT(rename, local->main_frame, -1, local->op_errno, NULL,
 1084                         NULL, NULL, NULL, NULL, NULL);
 1085 
 1086     local->main_frame = NULL;
 1087     for (i = 0; i < lock->lock_count && lock->entrylk->locked[i]; i++) {
 1088         lock_count++;
 1089     }
 1090     GF_ATOMIC_INIT(local->call_cnt, lock_count);
 1091 
 1092     for (i = 0; i < lock_count; i++) {
 1093         if (!lock->entrylk->locked[i]) {
 1094             lock_count++;
 1095             continue;
 1096         }
 1097         stack_destroy = _gf_false;
 1098         STACK_WIND(frame, sdfs_common_entrylk_cbk, FIRST_CHILD(this),
 1099                    FIRST_CHILD(this)->fops->entrylk, this->name,
 1100                    &lock->entrylk[i].parent_loc, lock->entrylk[i].basename,
 1101                    ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
 1102     }
 1103 
 1104     if (stack_destroy)
 1105         SDFS_STACK_DESTROY(frame);
 1106 
 1107     return 0;
 1108 }
 1109 
 1110 int
 1111 sdfs_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
 1112             dict_t *xdata)
 1113 {
 1114     sdfs_local_t *local = NULL;
 1115     sdfs_lock_t *lock = NULL;
 1116     call_frame_t *new_frame = NULL;
 1117     call_stub_t *stub = NULL;
 1118     client_t *client = NULL;
 1119     int ret = 0;
 1120     int op_errno = ENOMEM;
 1121     int i = 0;
 1122     int call_cnt = 0;
 1123 
 1124     new_frame = copy_frame(frame);
 1125     if (!new_frame) {
 1126         op_errno = ENOMEM;
 1127         goto err;
 1128     }
 1129     /*Set unique lk-owner for the fop*/
 1130     set_lk_owner_from_ptr(&new_frame->root->lk_owner, new_frame->root);
 1131 
 1132     gf_client_ref(client);
 1133     new_frame->root->client = client;
 1134     local = sdfs_local_init(new_frame, this);
 1135     if (!local) {
 1136         op_errno = ENOMEM;
 1137         goto err;
 1138     }
 1139 
 1140     local->main_frame = frame;
 1141 
 1142     lock = GF_CALLOC(1, sizeof(*lock), gf_common_mt_char);
 1143     if (!lock)
 1144         goto err;
 1145 
 1146     local->lock = lock;
 1147 
 1148     ret = sdfs_init_entry_lock(&lock->entrylk[0], oldloc);
 1149     if (ret)
 1150         goto err;
 1151     lock->entrylk->locked[0] = _gf_false;
 1152 
 1153     ++lock->lock_count;
 1154 
 1155     ret = sdfs_init_entry_lock(&lock->entrylk[1], newloc);
 1156     if (ret)
 1157         goto err;
 1158     lock->entrylk->locked[1] = _gf_false;
 1159 
 1160     ++lock->lock_count;
 1161 
 1162     qsort(lock->entrylk, lock->lock_count, sizeof(*lock->entrylk),
 1163           sdfs_entry_lock_cmp);
 1164 
 1165     local->lock = lock;
 1166     GF_ATOMIC_INIT(local->call_cnt, lock->lock_count);
 1167 
 1168     stub = fop_rename_stub(new_frame, sdfs_rename_helper, oldloc, newloc,
 1169                            xdata);
 1170     if (!stub) {
 1171         op_errno = ENOMEM;
 1172         goto err;
 1173     }
 1174 
 1175     local->stub = stub;
 1176     call_cnt = GF_ATOMIC_GET(local->call_cnt);
 1177     for (i = 0; i < call_cnt; i++) {
 1178         STACK_WIND_COOKIE(new_frame, sdfs_common_entrylk_cbk, (void *)(long)i,
 1179                           FIRST_CHILD(this), FIRST_CHILD(this)->fops->entrylk,
 1180                           this->name, &lock->entrylk[i].parent_loc,
 1181                           lock->entrylk[i].basename, ENTRYLK_LOCK,
 1182                           ENTRYLK_WRLCK, xdata);
 1183     }
 1184 
 1185     return 0;
 1186 err:
 1187 
 1188     STACK_UNWIND_STRICT(rename, frame, -1, op_errno, NULL, NULL, NULL, NULL,
 1189                         NULL, NULL);
 1190 
 1191     if (new_frame)
 1192         SDFS_STACK_DESTROY(new_frame);
 1193 
 1194     return 0;
 1195 }
 1196 
 1197 int
 1198 sdfs_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
 1199                 int32_t op_ret, int32_t op_errno, inode_t *inode,
 1200                 struct iatt *stbuf, dict_t *xdata, struct iatt *postparent)
 1201 {
 1202     sdfs_local_t *local = NULL;
 1203 
 1204     local = frame->local;
 1205 
 1206     if (!local->loc.parent) {
 1207         sdfs_local_cleanup(local);
 1208         frame->local = NULL;
 1209         STACK_UNWIND_STRICT(lookup, frame, op_ret, op_errno, inode, stbuf,
 1210                             xdata, postparent);
 1211         return 0;
 1212     }
 1213 
 1214     STACK_UNWIND_STRICT(lookup, local->main_frame, op_ret, op_errno, inode,
 1215                         stbuf, xdata, postparent);
 1216 
 1217     local->main_frame = NULL;
 1218     STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
 1219                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
 1220                local->loc.name, ENTRYLK_UNLOCK, ENTRYLK_RDLCK, xdata);
 1221     return 0;
 1222 }
 1223 
 1224 int
 1225 sdfs_lookup_helper(call_frame_t *frame, xlator_t *this, loc_t *loc,
 1226                    dict_t *xdata)
 1227 {
 1228     sdfs_local_t *local = NULL;
 1229     char gfid[GF_UUID_BUF_SIZE] = {0};
 1230 
 1231     local = frame->local;
 1232 
 1233     gf_uuid_unparse(loc->pargfid, gfid);
 1234 
 1235     if (local->op_ret < 0) {
 1236         gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
 1237                "Acquiring entry lock failed for directory %s "
 1238                "with parent gfid %s",
 1239                local->loc.name, gfid);
 1240         goto err;
 1241     }
 1242 
 1243     STACK_WIND(frame, sdfs_lookup_cbk, FIRST_CHILD(this),
 1244                FIRST_CHILD(this)->fops->lookup, loc, xdata);
 1245 
 1246     return 0;
 1247 err:
 1248     STACK_UNWIND_STRICT(lookup, local->main_frame, -1, local->op_errno, NULL,
 1249                         NULL, NULL, NULL);
 1250     local->main_frame = NULL;
 1251 
 1252     SDFS_STACK_DESTROY(frame);
 1253     return 0;
 1254 }
 1255 
 1256 int
 1257 sdfs_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
 1258 {
 1259     sdfs_local_t *local = NULL;
 1260     call_frame_t *new_frame = NULL;
 1261     call_stub_t *stub = NULL;
 1262     int op_errno = 0;
 1263 
 1264     if (!loc->parent) {
 1265         local = sdfs_local_init(frame, this);
 1266         if (!local) {
 1267             op_errno = ENOMEM;
 1268             goto err;
 1269         }
 1270 
 1271         STACK_WIND_TAIL(frame, FIRST_CHILD(this),
 1272                         FIRST_CHILD(this)->fops->lookup, loc, xdata);
 1273         return 0;
 1274     }
 1275 
 1276     if (-1 == sdfs_get_new_frame(frame, loc, &new_frame)) {
 1277         op_errno = ENOMEM;
 1278         goto err;
 1279     }
 1280 
 1281     stub = fop_lookup_stub(new_frame, sdfs_lookup_helper, loc, xdata);
 1282     if (!stub) {
 1283         op_errno = ENOMEM;
 1284         goto err;
 1285     }
 1286 
 1287     local = new_frame->local;
 1288     local->stub = stub;
 1289 
 1290     STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
 1291                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
 1292                local->loc.name, ENTRYLK_LOCK, ENTRYLK_RDLCK, xdata);
 1293 
 1294     return 0;
 1295 
 1296 err:
 1297     STACK_UNWIND_STRICT(lookup, frame, -1, op_errno, NULL, NULL, NULL, NULL);
 1298 
 1299     if (new_frame)
 1300         SDFS_STACK_DESTROY(new_frame);
 1301 
 1302     return 0;
 1303 }
 1304 
 1305 int32_t
 1306 sdfs_readdirp_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
 1307                   int32_t op_ret, int32_t op_errno, gf_dirent_t *entries,
 1308                   dict_t *xdata)
 1309 {
 1310     sdfs_local_t *local = NULL;
 1311 
 1312     local = frame->local;
 1313     STACK_UNWIND_STRICT(readdirp, local->main_frame, op_ret, op_errno, entries,
 1314                         xdata);
 1315 
 1316     local->main_frame = NULL;
 1317     STACK_WIND(frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
 1318                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
 1319                NULL, ENTRYLK_UNLOCK, ENTRYLK_RDLCK, xdata);
 1320     return 0;
 1321 }
 1322 
 1323 int32_t
 1324 sdfs_readdirp_helper(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
 1325                      off_t off, dict_t *xdata)
 1326 {
 1327     sdfs_local_t *local = NULL;
 1328     char gfid[GF_UUID_BUF_SIZE] = {0};
 1329 
 1330     local = frame->local;
 1331 
 1332     gf_uuid_unparse(fd->inode->gfid, gfid);
 1333 
 1334     if (local->op_ret < 0) {
 1335         gf_msg(this->name, GF_LOG_ERROR, 0, SDFS_MSG_ENTRYLK_ERROR,
 1336                "Acquiring entry lock failed for directory %s "
 1337                "with parent gfid %s",
 1338                local->loc.name, gfid);
 1339         goto err;
 1340     }
 1341 
 1342     STACK_WIND(frame, sdfs_readdirp_cbk, FIRST_CHILD(this),
 1343                FIRST_CHILD(this)->fops->readdirp, fd, size, off, xdata);
 1344 
 1345     return 0;
 1346 err:
 1347     STACK_UNWIND_STRICT(readdirp, local->main_frame, -1, local->op_errno, NULL,
 1348                         NULL);
 1349 
 1350     local->main_frame = NULL;
 1351 
 1352     SDFS_STACK_DESTROY(frame);
 1353     return 0;
 1354 }
 1355 
 1356 int32_t
 1357 sdfs_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
 1358               off_t off, dict_t *xdata)
 1359 {
 1360     sdfs_local_t *local = NULL;
 1361     call_frame_t *new_frame = NULL;
 1362     call_stub_t *stub = NULL;
 1363     int op_errno = 0;
 1364 
 1365     if (-1 == sdfs_get_new_frame_readdirp(frame, fd, &new_frame)) {
 1366         op_errno = ENOMEM;
 1367         goto err;
 1368     }
 1369 
 1370     stub = fop_readdirp_stub(new_frame, sdfs_readdirp_helper, fd, size, off,
 1371                              xdata);
 1372     if (!stub) {
 1373         op_errno = ENOMEM;
 1374         goto err;
 1375     }
 1376 
 1377     local = new_frame->local;
 1378     local->stub = stub;
 1379 
 1380     STACK_WIND(new_frame, sdfs_entrylk_cbk, FIRST_CHILD(this),
 1381                FIRST_CHILD(this)->fops->entrylk, this->name, &local->parent_loc,
 1382                NULL, ENTRYLK_LOCK, ENTRYLK_RDLCK, xdata);
 1383 
 1384     return 0;
 1385 
 1386 err:
 1387     STACK_UNWIND_STRICT(readdirp, frame, -1, op_errno, NULL, NULL);
 1388 
 1389     if (new_frame)
 1390         SDFS_STACK_DESTROY(new_frame);
 1391 
 1392     return 0;
 1393 }
 1394 
 1395 int
 1396 init(xlator_t *this)
 1397 {
 1398     int ret = -1;
 1399 
 1400     if (!this->children || this->children->next) {
 1401         gf_log(this->name, GF_LOG_ERROR,
 1402                "'dentry-fop-serializer' not configured with exactly one child");
 1403         goto out;
 1404     }
 1405 
 1406     if (!this->parents) {
 1407         gf_log(this->name, GF_LOG_WARNING, "dangling volume. check volfile ");
 1408     }
 1409 
 1410     this->local_pool = mem_pool_new(sdfs_local_t, 512);
 1411     if (!this->local_pool) {
 1412         goto out;
 1413     }
 1414 
 1415     GF_OPTION_INIT("pass-through", this->pass_through, bool, out);
 1416 
 1417     ret = 0;
 1418 
 1419 out:
 1420     return ret;
 1421 }
 1422 
 1423 int
 1424 reconfigure(xlator_t *this, dict_t *options)
 1425 {
 1426     int ret = -1;
 1427 
 1428     GF_OPTION_RECONF("pass-through", this->pass_through, options, bool, out);
 1429 
 1430     ret = 0;
 1431 out:
 1432     return ret;
 1433 }
 1434 
 1435 void
 1436 fini(xlator_t *this)
 1437 {
 1438     mem_pool_destroy(this->local_pool);
 1439     this->local_pool = NULL;
 1440     return;
 1441 }
 1442 
 1443 struct xlator_fops fops = {
 1444     .mkdir = sdfs_mkdir,
 1445     .rmdir = sdfs_rmdir,
 1446     .create = sdfs_create,
 1447     .unlink = sdfs_unlink,
 1448     .symlink = sdfs_symlink,
 1449     .link = sdfs_link,
 1450     .mknod = sdfs_mknod,
 1451     .rename = sdfs_rename,
 1452     .lookup = sdfs_lookup,
 1453     .readdirp = sdfs_readdirp,
 1454 };
 1455 
/*
 * Callback table for this translator. No initialiser is given, so as a
 * file-scope object every member is zero-initialised.
 */
struct xlator_cbks cbks;
 1457 
 1458 struct volume_options options[] = {
 1459     {.key = {"pass-through"},
 1460      .type = GF_OPTION_TYPE_BOOL,
 1461      .default_value = "true",
 1462      .op_version = {GD_OP_VERSION_4_1_0},
 1463      .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT,
 1464      .tags = {"sdfs"},
 1465      .description = "Enable/Disable dentry serialize functionality"},
 1466     {.key = {NULL}},
 1467 };
 1468 
 1469 xlator_api_t xlator_api = {
 1470     .init = init,
 1471     .fini = fini,
 1472     .reconfigure = reconfigure,
 1473     .op_version = {GD_OP_VERSION_4_0_0},
 1474     .fops = &fops,
 1475     .cbks = &cbks,
 1476     .options = options,
 1477     .identifier = "sdfs",
 1478     .category = GF_TECH_PREVIEW,
 1479 };