"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/xlators/features/marker/src/marker-quota.c" (16 Sep 2020, 59328 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "marker-quota.c", see the Fossies "Dox" file reference documentation.

    1 /*
    2    Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
    3    This file is part of GlusterFS.
    4 
    5    This file is licensed to you under your choice of the GNU Lesser
    6    General Public License, version 3 or any later version (LGPLv3 or
    7    later), or the GNU General Public License, version 2 (GPLv2), in all
    8    cases as published by the Free Software Foundation.
    9 */
   10 #include <glusterfs/dict.h>
   11 #include <glusterfs/xlator.h>
   12 #include <glusterfs/defaults.h>
   13 #include "libxlator.h"
   14 #include <glusterfs/common-utils.h>
   15 #include <glusterfs/byte-order.h>
   16 #include "marker-quota.h"
   17 #include "marker-quota-helper.h"
   18 #include <glusterfs/syncop.h>
   19 #include <glusterfs/quota-common-utils.h>
   20 
   21 int
   22 mq_loc_copy(loc_t *dst, loc_t *src)
   23 {
   24     int ret = -1;
   25 
   26     GF_VALIDATE_OR_GOTO("marker", dst, out);
   27     GF_VALIDATE_OR_GOTO("marker", src, out);
   28 
   29     if (src->inode == NULL ||
   30         ((src->parent == NULL) && (gf_uuid_is_null(src->pargfid)) &&
   31          !__is_root_gfid(src->inode->gfid))) {
   32         gf_log("marker", GF_LOG_WARNING, "src loc is not valid");
   33         goto out;
   34     }
   35 
   36     ret = loc_copy(dst, src);
   37 out:
   38     return ret;
   39 }
   40 
   41 static void
   42 mq_set_ctx_status(quota_inode_ctx_t *ctx, gf_boolean_t *flag,
   43                   gf_boolean_t status)
   44 {
   45     LOCK(&ctx->lock);
   46     {
   47         *flag = status;
   48     }
   49     UNLOCK(&ctx->lock);
   50 }
   51 
   52 static void
   53 mq_test_and_set_ctx_status(quota_inode_ctx_t *ctx, gf_boolean_t *flag,
   54                            gf_boolean_t *status)
   55 {
   56     gf_boolean_t temp = _gf_false;
   57 
   58     LOCK(&ctx->lock);
   59     {
   60         temp = *status;
   61         *status = *flag;
   62         *flag = temp;
   63     }
   64     UNLOCK(&ctx->lock);
   65 }
   66 
   67 static void
   68 mq_get_ctx_status(quota_inode_ctx_t *ctx, gf_boolean_t *flag,
   69                   gf_boolean_t *status)
   70 {
   71     LOCK(&ctx->lock);
   72     {
   73         *status = *flag;
   74     }
   75     UNLOCK(&ctx->lock);
   76 }
   77 
   78 int32_t
   79 mq_get_ctx_updation_status(quota_inode_ctx_t *ctx, gf_boolean_t *status)
   80 {
   81     GF_VALIDATE_OR_GOTO("marker", ctx, out);
   82     GF_VALIDATE_OR_GOTO("marker", status, out);
   83 
   84     mq_get_ctx_status(ctx, &ctx->updation_status, status);
   85     return 0;
   86 out:
   87     return -1;
   88 }
   89 
   90 int32_t
   91 mq_set_ctx_updation_status(quota_inode_ctx_t *ctx, gf_boolean_t status)
   92 {
   93     GF_VALIDATE_OR_GOTO("marker", ctx, out);
   94 
   95     mq_set_ctx_status(ctx, &ctx->updation_status, status);
   96     return 0;
   97 out:
   98     return -1;
   99 }
  100 
  101 int32_t
  102 mq_test_and_set_ctx_updation_status(quota_inode_ctx_t *ctx,
  103                                     gf_boolean_t *status)
  104 {
  105     GF_VALIDATE_OR_GOTO("marker", ctx, out);
  106     GF_VALIDATE_OR_GOTO("marker", status, out);
  107 
  108     mq_test_and_set_ctx_status(ctx, &ctx->updation_status, status);
  109     return 0;
  110 out:
  111     return -1;
  112 }
  113 
  114 int32_t
  115 mq_set_ctx_create_status(quota_inode_ctx_t *ctx, gf_boolean_t status)
  116 {
  117     GF_VALIDATE_OR_GOTO("marker", ctx, out);
  118 
  119     mq_set_ctx_status(ctx, &ctx->create_status, status);
  120     return 0;
  121 out:
  122     return -1;
  123 }
  124 
  125 int32_t
  126 mq_test_and_set_ctx_create_status(quota_inode_ctx_t *ctx, gf_boolean_t *status)
  127 {
  128     GF_VALIDATE_OR_GOTO("marker", ctx, out);
  129     GF_VALIDATE_OR_GOTO("marker", status, out);
  130 
  131     mq_test_and_set_ctx_status(ctx, &ctx->create_status, status);
  132     return 0;
  133 out:
  134     return -1;
  135 }
  136 
  137 static void
  138 mq_set_ctx_dirty_status(quota_inode_ctx_t *ctx, gf_boolean_t status)
  139 {
  140     GF_VALIDATE_OR_GOTO("marker", ctx, out);
  141 
  142     mq_set_ctx_status(ctx, &ctx->dirty_status, status);
  143 out:
  144     return;
  145 }
  146 
  147 int
  148 mq_build_ancestry(xlator_t *this, loc_t *loc)
  149 {
  150     int32_t ret = -1;
  151     fd_t *fd = NULL;
  152     gf_dirent_t entries;
  153     gf_dirent_t *entry = NULL;
  154     dict_t *xdata = NULL;
  155     inode_t *tmp_parent = NULL;
  156     inode_t *tmp_inode = NULL;
  157     inode_t *linked_inode = NULL;
  158     quota_inode_ctx_t *ctx = NULL;
  159 
  160     INIT_LIST_HEAD(&entries.list);
  161 
  162     xdata = dict_new();
  163     if (xdata == NULL) {
  164         gf_log(this->name, GF_LOG_ERROR, "dict_new failed");
  165         ret = -ENOMEM;
  166         goto out;
  167     }
  168 
  169     ret = dict_set_int8(xdata, GET_ANCESTRY_DENTRY_KEY, 1);
  170     if (ret < 0)
  171         goto out;
  172 
  173     fd = fd_anonymous(loc->inode);
  174     if (fd == NULL) {
  175         gf_log(this->name, GF_LOG_ERROR, "fd creation failed");
  176         ret = -ENOMEM;
  177         goto out;
  178     }
  179 
  180     fd_bind(fd);
  181 
  182     ret = syncop_readdirp(this, fd, 131072, 0, &entries, xdata, NULL);
  183     if (ret < 0) {
  184         gf_log(this->name,
  185                (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG : GF_LOG_ERROR,
  186                "readdirp failed "
  187                "for %s: %s",
  188                loc->path, strerror(-ret));
  189         goto out;
  190     }
  191 
  192     if (list_empty(&entries.list)) {
  193         ret = -1;
  194         goto out;
  195     }
  196 
  197     list_for_each_entry(entry, &entries.list, list)
  198     {
  199         if (__is_root_gfid(entry->inode->gfid)) {
  200             /* The list contains a sub-list for each possible path
  201              * to the target inode. Each sub-list starts with the
  202              * root entry of the tree and is followed by the child
  203              * entries for a particular path to the target entry.
  204              * The root entry is an implied sub-list delimiter,
  205              * as it denotes we have started processing a new path.
  206              * Reset the parent pointer and continue
  207              */
  208 
  209             tmp_parent = NULL;
  210         } else {
  211             linked_inode = inode_link(entry->inode, tmp_parent, entry->d_name,
  212                                       &entry->d_stat);
  213             if (linked_inode) {
  214                 tmp_inode = entry->inode;
  215                 entry->inode = linked_inode;
  216                 inode_unref(tmp_inode);
  217             } else {
  218                 gf_log(this->name, GF_LOG_ERROR, "inode link failed");
  219                 ret = -EINVAL;
  220                 goto out;
  221             }
  222         }
  223 
  224         ctx = mq_inode_ctx_new(entry->inode, this);
  225         if (ctx == NULL) {
  226             gf_log(this->name, GF_LOG_WARNING,
  227                    "mq_inode_ctx_new "
  228                    "failed for %s",
  229                    uuid_utoa(entry->inode->gfid));
  230             ret = -ENOMEM;
  231             goto out;
  232         }
  233 
  234         /* For non-directory, posix_get_ancestry_non_directory returns
  235          * all hard-links that are represented by nodes adjacent to
  236          * each other in the dentry-list.
  237          * (Unlike the directory case where adjacent nodes either have
  238          *  a parent/child relationship or belong to different paths).
  239          */
  240         if (entry->inode->ia_type == IA_IFDIR)
  241             tmp_parent = entry->inode;
  242     }
  243 
  244     if (loc->parent)
  245         inode_unref(loc->parent);
  246 
  247     loc->parent = inode_parent(loc->inode, 0, NULL);
  248     if (loc->parent == NULL) {
  249         ret = -1;
  250         goto out;
  251     }
  252 
  253     ret = 0;
  254 
  255 out:
  256     gf_dirent_free(&entries);
  257 
  258     if (fd)
  259         fd_unref(fd);
  260 
  261     if (xdata)
  262         dict_unref(xdata);
  263 
  264     return ret;
  265 }
  266 
  267 /* This function should be used only in inspect_directory and inspect_file
  268  * function to heal quota xattrs.
  269  * Inode quota feature is introduced in 3.7.
  270  * If gluster setup is upgraded from 3.6 to 3.7, there can be a
  271  * getxattr and setxattr spikes with quota heal as inode quota is missing.
  272  * So this wrapper function is to avoid xattrs spikes during upgrade.
  273  * This function returns success even is inode-quota xattrs are missing and
  274  * hence no healing performed.
  275  */
  276 static int32_t
  277 _quota_dict_get_meta(xlator_t *this, dict_t *dict, char *key, const int keylen,
  278                      quota_meta_t *meta, ia_type_t ia_type,
  279                      gf_boolean_t add_delta)
  280 {
  281     int32_t ret = 0;
  282     marker_conf_t *priv = NULL;
  283 
  284     priv = this->private;
  285 
  286     ret = quota_dict_get_inode_meta(dict, key, keylen, meta);
  287     if (ret == -2 && (priv->feature_enabled & GF_INODE_QUOTA) == 0) {
  288         /* quota_dict_get_inode_meta returns -2 if
  289          * inode quota xattrs are not present.
  290          * if inode quota self heal is turned off,
  291          * then we should skip healing inode quotas
  292          */
  293 
  294         gf_log(this->name, GF_LOG_DEBUG,
  295                "inode quota disabled. "
  296                "inode quota self heal will not be performed");
  297         ret = 0;
  298         if (add_delta) {
  299             if (ia_type == IA_IFDIR)
  300                 meta->dir_count = 1;
  301             else
  302                 meta->file_count = 1;
  303         }
  304     }
  305 
  306     return ret;
  307 }
  308 
  309 int32_t
  310 quota_dict_set_size_meta(xlator_t *this, dict_t *dict, const quota_meta_t *meta)
  311 {
  312     int32_t ret = -ENOMEM;
  313     quota_meta_t *value = NULL;
  314     char size_key[QUOTA_KEY_MAX] = {
  315         0,
  316     };
  317 
  318     value = GF_MALLOC(2 * sizeof(quota_meta_t), gf_common_quota_meta_t);
  319     if (value == NULL) {
  320         goto out;
  321     }
  322     value[0].size = hton64(meta->size);
  323     value[0].file_count = hton64(meta->file_count);
  324     value[0].dir_count = hton64(meta->dir_count);
  325 
  326     value[1].size = 0;
  327     value[1].file_count = 0;
  328     value[1].dir_count = hton64(1);
  329 
  330     GET_SIZE_KEY(this, size_key, ret);
  331     if (ret < 0)
  332         goto out;
  333     ret = dict_set_bin(dict, size_key, value, (sizeof(quota_meta_t) * 2));
  334     if (ret < 0) {
  335         gf_log_callingfn("quota", GF_LOG_ERROR, "dict set failed");
  336         GF_FREE(value);
  337     }
  338 out:
  339     return ret;
  340 }
  341 
  342 void
  343 mq_compute_delta(quota_meta_t *delta, const quota_meta_t *op1,
  344                  const quota_meta_t *op2)
  345 {
  346     delta->size = op1->size - op2->size;
  347     delta->file_count = op1->file_count - op2->file_count;
  348     delta->dir_count = op1->dir_count - op2->dir_count;
  349 }
  350 
  351 void
  352 mq_add_meta(quota_meta_t *dst, const quota_meta_t *src)
  353 {
  354     dst->size += src->size;
  355     dst->file_count += src->file_count;
  356     dst->dir_count += src->dir_count;
  357 }
  358 
  359 void
  360 mq_sub_meta(quota_meta_t *dst, const quota_meta_t *src)
  361 {
  362     if (src == NULL) {
  363         dst->size = -dst->size;
  364         dst->file_count = -dst->file_count;
  365         dst->dir_count = -dst->dir_count;
  366     } else {
  367         dst->size = src->size - dst->size;
  368         dst->file_count = src->file_count - dst->file_count;
  369         dst->dir_count = src->dir_count - dst->dir_count;
  370     }
  371 }
  372 
  373 int32_t
  374 mq_are_xattrs_set(xlator_t *this, loc_t *loc, gf_boolean_t *contri_set,
  375                   gf_boolean_t *size_set)
  376 {
  377     int32_t ret = -1;
  378     char contri_key[QUOTA_KEY_MAX] = {
  379         0,
  380     };
  381     char size_key[QUOTA_KEY_MAX] = {
  382         0,
  383     };
  384     quota_meta_t meta = {
  385         0,
  386     };
  387     struct iatt stbuf = {
  388         0,
  389     };
  390     dict_t *dict = NULL;
  391     dict_t *rsp_dict = NULL;
  392 
  393     dict = dict_new();
  394     if (dict == NULL) {
  395         gf_log(this->name, GF_LOG_ERROR, "dict_new failed");
  396         goto out;
  397     }
  398 
  399     ret = mq_req_xattr(this, loc, dict, contri_key, size_key);
  400     if (ret < 0)
  401         goto out;
  402 
  403     ret = syncop_lookup(FIRST_CHILD(this), loc, &stbuf, NULL, dict, &rsp_dict);
  404     if (ret < 0) {
  405         gf_log_callingfn(
  406             this->name,
  407             (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG : GF_LOG_ERROR,
  408             "lookup failed "
  409             "for %s: %s",
  410             loc->path, strerror(-ret));
  411         goto out;
  412     }
  413 
  414     if (rsp_dict == NULL)
  415         goto out;
  416 
  417     *contri_set = _gf_true;
  418     *size_set = _gf_true;
  419     if (loc->inode->ia_type == IA_IFDIR) {
  420         ret = quota_dict_get_inode_meta(rsp_dict, size_key, strlen(size_key),
  421                                         &meta);
  422         if (ret < 0 || meta.dir_count == 0)
  423             *size_set = _gf_false;
  424     }
  425 
  426     if (!loc_is_root(loc)) {
  427         ret = quota_dict_get_inode_meta(rsp_dict, contri_key,
  428                                         strlen(contri_key), &meta);
  429         if (ret < 0)
  430             *contri_set = _gf_false;
  431     }
  432 
  433     ret = 0;
  434 out:
  435     if (dict)
  436         dict_unref(dict);
  437 
  438     if (rsp_dict)
  439         dict_unref(rsp_dict);
  440 
  441     return ret;
  442 }
  443 
  444 int32_t
  445 mq_create_size_xattrs(xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc)
  446 {
  447     int32_t ret = -1;
  448     quota_meta_t size = {
  449         0,
  450     };
  451     dict_t *dict = NULL;
  452 
  453     GF_VALIDATE_OR_GOTO("marker", loc, out);
  454     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
  455 
  456     if (loc->inode->ia_type != IA_IFDIR) {
  457         ret = 0;
  458         goto out;
  459     }
  460 
  461     dict = dict_new();
  462     if (!dict) {
  463         gf_log(this->name, GF_LOG_ERROR, "dict_new failed");
  464         ret = -1;
  465         goto out;
  466     }
  467 
  468     ret = quota_dict_set_size_meta(this, dict, &size);
  469     if (ret < 0)
  470         goto out;
  471 
  472     ret = syncop_xattrop(FIRST_CHILD(this), loc,
  473                          GF_XATTROP_ADD_ARRAY64_WITH_DEFAULT, dict, NULL, NULL,
  474                          NULL);
  475 
  476     if (ret < 0) {
  477         gf_log_callingfn(
  478             this->name,
  479             (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG : GF_LOG_ERROR,
  480             "xattrop failed "
  481             "for %s: %s",
  482             loc->path, strerror(-ret));
  483         goto out;
  484     }
  485 
  486 out:
  487     if (dict)
  488         dict_unref(dict);
  489 
  490     return ret;
  491 }
  492 
  493 int32_t
  494 mq_lock(xlator_t *this, loc_t *loc, short l_type)
  495 {
  496     struct gf_flock lock = {
  497         0,
  498     };
  499     int32_t ret = -1;
  500 
  501     GF_VALIDATE_OR_GOTO("marker", loc, out);
  502     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
  503 
  504     gf_log(this->name, GF_LOG_DEBUG, "set lock type %d on %s", l_type,
  505            loc->path);
  506 
  507     lock.l_len = 0;
  508     lock.l_start = 0;
  509     lock.l_type = l_type;
  510     lock.l_whence = SEEK_SET;
  511 
  512     ret = syncop_inodelk(FIRST_CHILD(this), this->name, loc, F_SETLKW, &lock,
  513                          NULL, NULL);
  514     if (ret < 0)
  515         gf_log_callingfn(
  516             this->name,
  517             (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG : GF_LOG_ERROR,
  518             "inodelk failed "
  519             "for %s: %s",
  520             loc->path, strerror(-ret));
  521 
  522 out:
  523 
  524     return ret;
  525 }
  526 
  527 int32_t
  528 mq_get_dirty(xlator_t *this, loc_t *loc, int32_t *dirty)
  529 {
  530     int32_t ret = -1;
  531     int8_t value = 0;
  532     dict_t *dict = NULL;
  533     dict_t *rsp_dict = NULL;
  534     struct iatt stbuf = {
  535         0,
  536     };
  537 
  538     dict = dict_new();
  539     if (dict == NULL) {
  540         gf_log(this->name, GF_LOG_ERROR, "dict_new failed");
  541         goto out;
  542     }
  543 
  544     ret = dict_set_int64(dict, QUOTA_DIRTY_KEY, 0);
  545     if (ret < 0) {
  546         gf_log(this->name, GF_LOG_WARNING, "dict set failed");
  547         goto out;
  548     }
  549 
  550     ret = syncop_lookup(FIRST_CHILD(this), loc, &stbuf, NULL, dict, &rsp_dict);
  551     if (ret < 0) {
  552         gf_log_callingfn(
  553             this->name,
  554             (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG : GF_LOG_ERROR,
  555             "lookup failed "
  556             "for %s: %s",
  557             loc->path, strerror(-ret));
  558         goto out;
  559     }
  560 
  561     ret = dict_get_int8(rsp_dict, QUOTA_DIRTY_KEY, &value);
  562     if (ret < 0)
  563         goto out;
  564 
  565     *dirty = value;
  566 
  567 out:
  568     if (dict)
  569         dict_unref(dict);
  570 
  571     if (rsp_dict)
  572         dict_unref(rsp_dict);
  573 
  574     return ret;
  575 }
  576 
  577 int32_t
  578 mq_get_set_dirty(xlator_t *this, loc_t *loc, int32_t dirty, int32_t *prev_dirty)
  579 {
  580     int32_t ret = -1;
  581     int8_t value = 0;
  582     quota_inode_ctx_t *ctx = NULL;
  583     dict_t *dict = NULL;
  584     dict_t *rsp_dict = NULL;
  585 
  586     GF_VALIDATE_OR_GOTO("marker", loc, out);
  587     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
  588     GF_VALIDATE_OR_GOTO("marker", prev_dirty, out);
  589 
  590     ret = mq_inode_ctx_get(loc->inode, this, &ctx);
  591     if (ret < 0) {
  592         gf_log(this->name, GF_LOG_ERROR,
  593                "failed to get inode ctx for "
  594                "%s",
  595                loc->path);
  596         goto out;
  597     }
  598 
  599     dict = dict_new();
  600     if (!dict) {
  601         gf_log(this->name, GF_LOG_ERROR, "dict_new failed");
  602         ret = -1;
  603         goto out;
  604     }
  605 
  606     ret = dict_set_int8(dict, QUOTA_DIRTY_KEY, dirty);
  607     if (ret < 0) {
  608         gf_log(this->name, GF_LOG_ERROR, "dict_set failed");
  609         goto out;
  610     }
  611 
  612     ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_GET_AND_SET, dict,
  613                          NULL, NULL, &rsp_dict);
  614     if (ret < 0) {
  615         gf_log_callingfn(
  616             this->name,
  617             (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG : GF_LOG_ERROR,
  618             "xattrop failed "
  619             "for %s: %s",
  620             loc->path, strerror(-ret));
  621         goto out;
  622     }
  623 
  624     *prev_dirty = 0;
  625     if (rsp_dict) {
  626         ret = dict_get_int8(rsp_dict, QUOTA_DIRTY_KEY, &value);
  627         if (ret == 0)
  628             *prev_dirty = value;
  629     }
  630 
  631     LOCK(&ctx->lock);
  632     {
  633         ctx->dirty = dirty;
  634     }
  635     UNLOCK(&ctx->lock);
  636     ret = 0;
  637 out:
  638     if (dict)
  639         dict_unref(dict);
  640 
  641     if (rsp_dict)
  642         dict_unref(rsp_dict);
  643 
  644     return ret;
  645 }
  646 
  647 int32_t
  648 mq_mark_dirty(xlator_t *this, loc_t *loc, int32_t dirty)
  649 {
  650     int32_t ret = -1;
  651     dict_t *dict = NULL;
  652     quota_inode_ctx_t *ctx = NULL;
  653 
  654     GF_VALIDATE_OR_GOTO("marker", loc, out);
  655     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
  656 
  657     ret = mq_inode_ctx_get(loc->inode, this, &ctx);
  658     if (ret < 0) {
  659         gf_log(this->name, GF_LOG_ERROR,
  660                "failed to get inode ctx for "
  661                "%s",
  662                loc->path);
  663         ret = 0;
  664         goto out;
  665     }
  666 
  667     dict = dict_new();
  668     if (!dict) {
  669         ret = -1;
  670         gf_log(this->name, GF_LOG_ERROR, "dict_new failed");
  671         goto out;
  672     }
  673 
  674     ret = dict_set_int8(dict, QUOTA_DIRTY_KEY, dirty);
  675     if (ret < 0) {
  676         gf_log(this->name, GF_LOG_ERROR, "dict_set failed");
  677         goto out;
  678     }
  679 
  680     ret = syncop_setxattr(FIRST_CHILD(this), loc, dict, 0, NULL, NULL);
  681     if (ret < 0) {
  682         gf_log_callingfn(
  683             this->name,
  684             (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG : GF_LOG_ERROR,
  685             "setxattr dirty = %d "
  686             "failed for %s: %s",
  687             dirty, loc->path, strerror(-ret));
  688         goto out;
  689     }
  690 
  691     LOCK(&ctx->lock);
  692     {
  693         ctx->dirty = dirty;
  694     }
  695     UNLOCK(&ctx->lock);
  696 
  697 out:
  698     if (dict)
  699         dict_unref(dict);
  700 
  701     return ret;
  702 }
  703 
  704 int32_t
  705 _mq_get_metadata(xlator_t *this, loc_t *loc, quota_meta_t *contri,
  706                  quota_meta_t *size, uuid_t contri_gfid)
  707 {
  708     int32_t ret = -1;
  709     quota_meta_t meta = {
  710         0,
  711     };
  712     char contri_key[QUOTA_KEY_MAX] = {
  713         0,
  714     };
  715     char size_key[QUOTA_KEY_MAX] = {
  716         0,
  717     };
  718     int keylen = 0;
  719     dict_t *dict = NULL;
  720     dict_t *rsp_dict = NULL;
  721     struct iatt stbuf = {
  722         0,
  723     };
  724 
  725     GF_VALIDATE_OR_GOTO("marker", loc, out);
  726     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
  727 
  728     if (size == NULL && contri == NULL)
  729         goto out;
  730 
  731     dict = dict_new();
  732     if (dict == NULL) {
  733         gf_log(this->name, GF_LOG_ERROR, "dict_new failed");
  734         goto out;
  735     }
  736 
  737     if (size && loc->inode->ia_type == IA_IFDIR) {
  738         GET_SIZE_KEY(this, size_key, keylen);
  739         if (keylen < 0)
  740             goto out;
  741         ret = dict_set_int64(dict, size_key, 0);
  742         if (ret < 0) {
  743             gf_log(this->name, GF_LOG_ERROR, "dict_set failed.");
  744             goto out;
  745         }
  746     }
  747 
  748     if (contri && !loc_is_root(loc)) {
  749         ret = mq_dict_set_contribution(this, dict, loc, contri_gfid,
  750                                        contri_key);
  751         if (ret < 0)
  752             goto out;
  753     }
  754 
  755     ret = syncop_lookup(FIRST_CHILD(this), loc, &stbuf, NULL, dict, &rsp_dict);
  756     if (ret < 0) {
  757         gf_log_callingfn(
  758             this->name,
  759             (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG : GF_LOG_ERROR,
  760             "lookup failed "
  761             "for %s: %s",
  762             loc->path, strerror(-ret));
  763         goto out;
  764     }
  765 
  766     if (size) {
  767         if (loc->inode->ia_type == IA_IFDIR) {
  768             ret = quota_dict_get_meta(rsp_dict, size_key, keylen, &meta);
  769             if (ret < 0) {
  770                 gf_log(this->name, GF_LOG_ERROR, "dict_get failed.");
  771                 goto out;
  772             }
  773 
  774             size->size = meta.size;
  775             size->file_count = meta.file_count;
  776             size->dir_count = meta.dir_count;
  777         } else {
  778             size->size = stbuf.ia_blocks * 512;
  779             size->file_count = 1;
  780             size->dir_count = 0;
  781         }
  782     }
  783 
  784     if (contri && !loc_is_root(loc)) {
  785         ret = quota_dict_get_meta(rsp_dict, contri_key, strlen(contri_key),
  786                                   &meta);
  787         if (ret < 0) {
  788             contri->size = 0;
  789             contri->file_count = 0;
  790             contri->dir_count = 0;
  791         } else {
  792             contri->size = meta.size;
  793             contri->file_count = meta.file_count;
  794             contri->dir_count = meta.dir_count;
  795         }
  796     }
  797 
  798     ret = 0;
  799 
  800 out:
  801     if (dict)
  802         dict_unref(dict);
  803 
  804     if (rsp_dict)
  805         dict_unref(rsp_dict);
  806 
  807     return ret;
  808 }
  809 
  810 int32_t
  811 mq_get_metadata(xlator_t *this, loc_t *loc, quota_meta_t *contri,
  812                 quota_meta_t *size, quota_inode_ctx_t *ctx,
  813                 inode_contribution_t *contribution)
  814 {
  815     int32_t ret = -1;
  816 
  817     GF_VALIDATE_OR_GOTO("marker", loc, out);
  818     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
  819     GF_VALIDATE_OR_GOTO("marker", ctx, out);
  820     GF_VALIDATE_OR_GOTO("marker", contribution, out);
  821 
  822     if (size == NULL && contri == NULL) {
  823         ret = 0;
  824         goto out;
  825     }
  826 
  827     ret = _mq_get_metadata(this, loc, contri, size, contribution->gfid);
  828     if (ret < 0)
  829         goto out;
  830 
  831     if (size) {
  832         LOCK(&ctx->lock);
  833         {
  834             ctx->size = size->size;
  835             ctx->file_count = size->file_count;
  836             ctx->dir_count = size->dir_count;
  837         }
  838         UNLOCK(&ctx->lock);
  839     }
  840 
  841     if (contri) {
  842         LOCK(&contribution->lock);
  843         {
  844             contribution->contribution = contri->size;
  845             contribution->file_count = contri->file_count;
  846             contribution->dir_count = contri->dir_count;
  847         }
  848         UNLOCK(&contribution->lock);
  849     }
  850 
  851 out:
  852     return ret;
  853 }
  854 
  855 int32_t
  856 mq_get_delta(xlator_t *this, loc_t *loc, quota_meta_t *delta,
  857              quota_inode_ctx_t *ctx, inode_contribution_t *contribution)
  858 {
  859     int32_t ret = -1;
  860     quota_meta_t size = {
  861         0,
  862     };
  863     quota_meta_t contri = {
  864         0,
  865     };
  866 
  867     GF_VALIDATE_OR_GOTO("marker", loc, out);
  868     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
  869     GF_VALIDATE_OR_GOTO("marker", ctx, out);
  870     GF_VALIDATE_OR_GOTO("marker", contribution, out);
  871 
  872     ret = mq_get_metadata(this, loc, &contri, &size, ctx, contribution);
  873     if (ret < 0)
  874         goto out;
  875 
  876     mq_compute_delta(delta, &size, &contri);
  877 
  878 out:
  879     return ret;
  880 }
  881 
  882 int32_t
  883 mq_remove_contri(xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
  884                  inode_contribution_t *contri, quota_meta_t *delta,
  885                  uint32_t nlink)
  886 {
  887     int32_t ret = -1;
  888     char contri_key[QUOTA_KEY_MAX] = {
  889         0,
  890     };
  891 
  892     if (nlink == 1) {
  893         /*File was a last link and has been deleted */
  894         ret = 0;
  895         goto done;
  896     }
  897 
  898     GET_CONTRI_KEY(this, contri_key, contri->gfid, ret);
  899     if (ret < 0) {
  900         gf_log(this->name, GF_LOG_ERROR,
  901                "get contri_key "
  902                "failed for %s",
  903                uuid_utoa(contri->gfid));
  904         goto out;
  905     }
  906 
  907     ret = syncop_removexattr(FIRST_CHILD(this), loc, contri_key, 0, NULL);
  908     if (ret < 0) {
  909         if (-ret == ENOENT || -ret == ESTALE || -ret == ENODATA ||
  910             -ret == ENOATTR) {
  911             /* Remove contri in done when unlink operation is
  912              * performed, so return success on ENOENT/ESTSLE
  913              * rename operation removes xattr earlier,
  914              * so return success on ENODATA
  915              */
  916             ret = 0;
  917         } else {
  918             gf_log_callingfn(this->name, GF_LOG_ERROR,
  919                              "removexattr %s failed for %s: %s", contri_key,
  920                              loc->path, strerror(-ret));
  921             goto out;
  922         }
  923     }
  924 
  925 done:
  926     LOCK(&contri->lock);
  927     {
  928         contri->contribution += delta->size;
  929         contri->file_count += delta->file_count;
  930         contri->dir_count += delta->dir_count;
  931     }
  932     UNLOCK(&contri->lock);
  933 
  934     ret = 0;
  935 
  936 out:
  937     QUOTA_FREE_CONTRIBUTION_NODE(ctx, contri);
  938 
  939     return ret;
  940 }
  941 
  942 int32_t
  943 mq_update_contri(xlator_t *this, loc_t *loc, inode_contribution_t *contri,
  944                  quota_meta_t *delta)
  945 {
  946     int32_t ret = -1;
  947     char contri_key[QUOTA_KEY_MAX] = {
  948         0,
  949     };
  950     dict_t *dict = NULL;
  951 
  952     GF_VALIDATE_OR_GOTO("marker", loc, out);
  953     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
  954     GF_VALIDATE_OR_GOTO("marker", delta, out);
  955     GF_VALIDATE_OR_GOTO("marker", contri, out);
  956 
  957     if (quota_meta_is_null(delta)) {
  958         ret = 0;
  959         goto out;
  960     }
  961 
  962     dict = dict_new();
  963     if (!dict) {
  964         gf_log(this->name, GF_LOG_ERROR, "dict_new failed");
  965         ret = -1;
  966         goto out;
  967     }
  968 
  969     GET_CONTRI_KEY(this, contri_key, contri->gfid, ret);
  970     if (ret < 0) {
  971         gf_log(this->name, GF_LOG_ERROR,
  972                "get contri_key "
  973                "failed for %s",
  974                uuid_utoa(contri->gfid));
  975         goto out;
  976     }
  977 
  978     ret = quota_dict_set_meta(dict, contri_key, delta, loc->inode->ia_type);
  979     if (ret < 0)
  980         goto out;
  981 
  982     ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64, dict,
  983                          NULL, NULL, NULL);
  984     if (ret < 0) {
  985         gf_log_callingfn(
  986             this->name,
  987             (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG : GF_LOG_ERROR,
  988             "xattrop failed "
  989             "for %s: %s",
  990             loc->path, strerror(-ret));
  991         goto out;
  992     }
  993 
  994     LOCK(&contri->lock);
  995     {
  996         contri->contribution += delta->size;
  997         contri->file_count += delta->file_count;
  998         contri->dir_count += delta->dir_count;
  999     }
 1000     UNLOCK(&contri->lock);
 1001 
 1002 out:
 1003     if (dict)
 1004         dict_unref(dict);
 1005 
 1006     return ret;
 1007 }
 1008 
 1009 int32_t
 1010 mq_update_size(xlator_t *this, loc_t *loc, quota_meta_t *delta)
 1011 {
 1012     int32_t ret = -1;
 1013     quota_inode_ctx_t *ctx = NULL;
 1014     dict_t *dict = NULL;
 1015 
 1016     GF_VALIDATE_OR_GOTO("marker", loc, out);
 1017     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
 1018     GF_VALIDATE_OR_GOTO("marker", delta, out);
 1019 
 1020     if (quota_meta_is_null(delta)) {
 1021         ret = 0;
 1022         goto out;
 1023     }
 1024 
 1025     ret = mq_inode_ctx_get(loc->inode, this, &ctx);
 1026     if (ret < 0) {
 1027         gf_log(this->name, GF_LOG_ERROR,
 1028                "failed to get inode ctx for "
 1029                "%s",
 1030                loc->path);
 1031         goto out;
 1032     }
 1033 
 1034     dict = dict_new();
 1035     if (!dict) {
 1036         gf_log(this->name, GF_LOG_ERROR, "dict_new failed");
 1037         ret = -1;
 1038         goto out;
 1039     }
 1040 
 1041     ret = quota_dict_set_size_meta(this, dict, delta);
 1042     if (ret < 0)
 1043         goto out;
 1044 
 1045     ret = syncop_xattrop(FIRST_CHILD(this), loc,
 1046                          GF_XATTROP_ADD_ARRAY64_WITH_DEFAULT, dict, NULL, NULL,
 1047                          NULL);
 1048     if (ret < 0) {
 1049         gf_log_callingfn(
 1050             this->name,
 1051             (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG : GF_LOG_ERROR,
 1052             "xattrop failed "
 1053             "for %s: %s",
 1054             loc->path, strerror(-ret));
 1055         goto out;
 1056     }
 1057 
 1058     LOCK(&ctx->lock);
 1059     {
 1060         ctx->size += delta->size;
 1061         ctx->file_count += delta->file_count;
 1062         if (ctx->dir_count == 0)
 1063             ctx->dir_count += delta->dir_count + 1;
 1064         else
 1065             ctx->dir_count += delta->dir_count;
 1066     }
 1067     UNLOCK(&ctx->lock);
 1068 
 1069 out:
 1070     if (dict)
 1071         dict_unref(dict);
 1072 
 1073     return ret;
 1074 }
 1075 
 1076 int
 1077 mq_synctask_cleanup(int ret, call_frame_t *frame, void *opaque)
 1078 {
 1079     quota_synctask_t *args = NULL;
 1080 
 1081     GF_ASSERT(opaque);
 1082 
 1083     args = (quota_synctask_t *)opaque;
 1084     loc_wipe(&args->loc);
 1085 
 1086     if (args->stub)
 1087         call_resume(args->stub);
 1088 
 1089     if (!args->is_static)
 1090         GF_FREE(args);
 1091 
 1092     return 0;
 1093 }
 1094 
 1095 int
 1096 mq_synctask1(xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc,
 1097              quota_meta_t *contri, uint32_t nlink, call_stub_t *stub)
 1098 {
 1099     int32_t ret = -1;
 1100     quota_synctask_t *args = NULL;
 1101     quota_synctask_t static_args = {
 1102         0,
 1103     };
 1104 
 1105     if (spawn) {
 1106         QUOTA_ALLOC_OR_GOTO(args, quota_synctask_t, ret, out);
 1107         args->is_static = _gf_false;
 1108     } else {
 1109         args = &static_args;
 1110         args->is_static = _gf_true;
 1111     }
 1112 
 1113     args->this = this;
 1114     args->stub = stub;
 1115     loc_copy(&args->loc, loc);
 1116     args->ia_nlink = nlink;
 1117 
 1118     if (contri) {
 1119         args->contri = *contri;
 1120     } else {
 1121         args->contri.size = -1;
 1122         args->contri.file_count = -1;
 1123         args->contri.dir_count = -1;
 1124     }
 1125 
 1126     if (spawn) {
 1127         ret = synctask_new1(this->ctx->env, 1024 * 16, task,
 1128                             mq_synctask_cleanup, NULL, args);
 1129         if (ret) {
 1130             gf_log(this->name, GF_LOG_ERROR,
 1131                    "Failed to spawn "
 1132                    "new synctask");
 1133             mq_synctask_cleanup(ret, NULL, args);
 1134         }
 1135     } else {
 1136         ret = task(args);
 1137         mq_synctask_cleanup(ret, NULL, args);
 1138     }
 1139 
 1140 out:
 1141     return ret;
 1142 }
 1143 
 1144 int
 1145 mq_synctask(xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc)
 1146 {
 1147     return mq_synctask1(this, task, spawn, loc, NULL, -1, NULL);
 1148 }
 1149 
 1150 int32_t
 1151 mq_prevalidate_txn(xlator_t *this, loc_t *origin_loc, loc_t *loc,
 1152                    quota_inode_ctx_t **ctx, struct iatt *buf)
 1153 {
 1154     int32_t ret = -1;
 1155     quota_inode_ctx_t *ctxtmp = NULL;
 1156 
 1157     if (buf) {
 1158         if (buf->ia_type == IA_IFREG && IS_DHT_LINKFILE_MODE(buf))
 1159             goto out;
 1160 
 1161         if (buf->ia_type != IA_IFREG && buf->ia_type != IA_IFLNK &&
 1162             buf->ia_type != IA_IFDIR)
 1163             goto out;
 1164     }
 1165 
 1166     if (origin_loc == NULL || origin_loc->inode == NULL ||
 1167         gf_uuid_is_null(origin_loc->inode->gfid))
 1168         goto out;
 1169 
 1170     loc_copy(loc, origin_loc);
 1171 
 1172     if (gf_uuid_is_null(loc->gfid))
 1173         gf_uuid_copy(loc->gfid, loc->inode->gfid);
 1174 
 1175     if (!loc_is_root(loc) && loc->parent == NULL)
 1176         loc->parent = inode_parent(loc->inode, 0, NULL);
 1177 
 1178     ret = mq_inode_ctx_get(loc->inode, this, &ctxtmp);
 1179     if (ret < 0) {
 1180         gf_log_callingfn(this->name, GF_LOG_WARNING,
 1181                          "inode ctx for "
 1182                          "is NULL for %s",
 1183                          loc->path);
 1184         goto out;
 1185     }
 1186     if (ctx)
 1187         *ctx = ctxtmp;
 1188 
 1189     ret = 0;
 1190 out:
 1191     return ret;
 1192 }
 1193 
 1194 int
 1195 mq_create_xattrs_task(void *opaque)
 1196 {
 1197     int32_t ret = -1;
 1198     gf_boolean_t locked = _gf_false;
 1199     gf_boolean_t contri_set = _gf_false;
 1200     gf_boolean_t size_set = _gf_false;
 1201     gf_boolean_t need_txn = _gf_false;
 1202     quota_synctask_t *args = NULL;
 1203     quota_inode_ctx_t *ctx = NULL;
 1204     xlator_t *this = NULL;
 1205     loc_t *loc = NULL;
 1206     gf_boolean_t status = _gf_false;
 1207 
 1208     GF_ASSERT(opaque);
 1209 
 1210     args = (quota_synctask_t *)opaque;
 1211     loc = &args->loc;
 1212     this = args->this;
 1213     THIS = this;
 1214 
 1215     ret = mq_inode_ctx_get(loc->inode, this, &ctx);
 1216     if (ret < 0) {
 1217         gf_log(this->name, GF_LOG_WARNING,
 1218                "Failed to"
 1219                "get inode ctx, aborting quota create txn");
 1220         goto out;
 1221     }
 1222 
 1223     if (loc->inode->ia_type == IA_IFDIR) {
 1224         /* lock not required for files */
 1225         ret = mq_lock(this, loc, F_WRLCK);
 1226         if (ret < 0)
 1227             goto out;
 1228         locked = _gf_true;
 1229     }
 1230 
 1231     ret = mq_are_xattrs_set(this, loc, &contri_set, &size_set);
 1232     if (ret < 0 || (contri_set && size_set))
 1233         goto out;
 1234 
 1235     mq_set_ctx_create_status(ctx, _gf_false);
 1236     status = _gf_true;
 1237 
 1238     if (loc->inode->ia_type == IA_IFDIR && size_set == _gf_false) {
 1239         ret = mq_create_size_xattrs(this, ctx, loc);
 1240         if (ret < 0)
 1241             goto out;
 1242     }
 1243 
 1244     need_txn = _gf_true;
 1245 out:
 1246     if (locked)
 1247         ret = mq_lock(this, loc, F_UNLCK);
 1248 
 1249     if (status == _gf_false)
 1250         mq_set_ctx_create_status(ctx, _gf_false);
 1251 
 1252     if (need_txn)
 1253         ret = mq_initiate_quota_blocking_txn(this, loc, NULL);
 1254 
 1255     return ret;
 1256 }
 1257 
 1258 static int
 1259 _mq_create_xattrs_txn(xlator_t *this, loc_t *origin_loc, struct iatt *buf,
 1260                       gf_boolean_t spawn)
 1261 {
 1262     int32_t ret = -1;
 1263     quota_inode_ctx_t *ctx = NULL;
 1264     gf_boolean_t status = _gf_true;
 1265     loc_t loc = {
 1266         0,
 1267     };
 1268     inode_contribution_t *contribution = NULL;
 1269 
 1270     ret = mq_prevalidate_txn(this, origin_loc, &loc, &ctx, buf);
 1271     if (ret < 0)
 1272         goto out;
 1273 
 1274     ret = mq_test_and_set_ctx_create_status(ctx, &status);
 1275     if (ret < 0 || status == _gf_true)
 1276         goto out;
 1277 
 1278     if (!loc_is_root(&loc) && loc.parent) {
 1279         contribution = mq_add_new_contribution_node(this, ctx, &loc);
 1280         if (contribution == NULL) {
 1281             gf_log(this->name, GF_LOG_WARNING,
 1282                    "cannot add a new contribution node "
 1283                    "(%s)",
 1284                    uuid_utoa(loc.gfid));
 1285             ret = -1;
 1286             goto out;
 1287         } else {
 1288             GF_REF_PUT(contribution);
 1289         }
 1290     }
 1291 
 1292     ret = mq_synctask(this, mq_create_xattrs_task, spawn, &loc);
 1293 out:
 1294     if (ret < 0 && status == _gf_false)
 1295         mq_set_ctx_create_status(ctx, _gf_false);
 1296 
 1297     loc_wipe(&loc);
 1298     return ret;
 1299 }
 1300 
 1301 int
 1302 mq_create_xattrs_txn(xlator_t *this, loc_t *loc, struct iatt *buf)
 1303 {
 1304     int32_t ret = -1;
 1305 
 1306     GF_VALIDATE_OR_GOTO("marker", loc, out);
 1307     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
 1308 
 1309     ret = _mq_create_xattrs_txn(this, loc, buf, _gf_true);
 1310 out:
 1311     return ret;
 1312 }
 1313 
/*
 * mq_reduce_parent_size_task - synctask body that takes a child's
 * contribution out of its parent's accounted size.
 *
 * @opaque: quota_synctask_t carrying the xlator, the child loc, an optional
 *          contribution (args->contri) and the link count (args->ia_nlink)
 *
 * The parent is write-locked and marked dirty for the duration of the
 * update.  The delta comes either from args->contri (when its size is
 * >= 0) or from the contribution node cached for (parent, child).
 * After the lock is released a blocking quota update is started on the
 * parent, provided the last cleanup step did not fail.
 *
 * NOTE(review): 'ret' is overwritten by the cleanup calls in the 'out'
 * section, so the value returned reflects the last cleanup / follow-up
 * call rather than necessarily the first failure.
 */
int32_t
mq_reduce_parent_size_task(void *opaque)
{
    int32_t ret = -1;
    int32_t prev_dirty = 0;
    quota_inode_ctx_t *ctx = NULL;
    quota_inode_ctx_t *parent_ctx = NULL;
    inode_contribution_t *contribution = NULL;
    quota_meta_t delta = {
        0,
    };
    quota_meta_t contri = {
        0,
    };
    loc_t parent_loc = {
        0,
    };
    gf_boolean_t locked = _gf_false;
    gf_boolean_t dirty = _gf_false;
    quota_synctask_t *args = NULL;
    xlator_t *this = NULL;
    loc_t *loc = NULL;
    gf_boolean_t remove_xattr = _gf_true;
    uint32_t nlink = 0;

    GF_ASSERT(opaque);

    args = (quota_synctask_t *)opaque;
    loc = &args->loc;
    contri = args->contri;
    nlink = args->ia_nlink;
    this = args->this;
    THIS = this;

    /* Build a loc for the parent directory of the child. */
    ret = mq_inode_loc_fill(NULL, loc->parent, &parent_loc);
    if (ret < 0) {
        gf_log(this->name, GF_LOG_ERROR,
               "parent_loc fill failed for "
               "child inode %s: ",
               uuid_utoa(loc->inode->gfid));
        goto out;
    }

    ret = mq_lock(this, &parent_loc, F_WRLCK);
    if (ret < 0)
        goto out;
    locked = _gf_true;

    if (contri.size >= 0) {
        /* contri parameter is supplied only for rename operation.
         * remove xattr is already performed, we need to skip
         * removexattr for rename operation
         */
        remove_xattr = _gf_false;
        delta.size = contri.size;
        delta.file_count = contri.file_count;
        delta.dir_count = contri.dir_count;
    } else {
        remove_xattr = _gf_true;

        ret = mq_inode_ctx_get(loc->inode, this, &ctx);
        if (ret < 0) {
            gf_log_callingfn(this->name, GF_LOG_WARNING,
                             "ctx for"
                             " the node %s is NULL",
                             loc->path);
            goto out;
        }

        /* The returned node is released with GF_REF_PUT at the end. */
        contribution = mq_get_contribution_node(loc->parent, ctx);
        if (contribution == NULL) {
            ret = -1;
            gf_log(this->name, GF_LOG_DEBUG,
                   "contribution for the node %s is NULL", loc->path);
            goto out;
        }

        /* Snapshot the cached contribution under its lock. */
        LOCK(&contribution->lock);
        {
            delta.size = contribution->contribution;
            delta.file_count = contribution->file_count;
            delta.dir_count = contribution->dir_count;
        }
        UNLOCK(&contribution->lock);
    }

    /* Mark the parent dirty; prev_dirty receives the previous value. */
    ret = mq_get_set_dirty(this, &parent_loc, 1, &prev_dirty);
    if (ret < 0)
        goto out;
    dirty = _gf_true;

    /* Presumably negates delta so that adding it reduces the parent
     * (the same call is used to roll back a contribution update in
     * mq_initiate_quota_task) - TODO confirm against mq_sub_meta.
     */
    mq_sub_meta(&delta, NULL);

    if (remove_xattr) {
        ret = mq_remove_contri(this, loc, ctx, contribution, &delta, nlink);
        if (ret < 0)
            goto out;
    }

    if (quota_meta_is_null(&delta))
        goto out;

    ret = mq_update_size(this, &parent_loc, &delta);
    if (ret < 0)
        goto out;

out:
    if (dirty) {
        if (ret < 0 || prev_dirty) {
            /* On failure clear dirty status flag.
             * In the next lookup inspect_directory_xattr
             * can set the status flag and fix the
             * dirty directory.
             * Do the same if dir was dirty before
             * the txn
             */
            ret = mq_inode_ctx_get(parent_loc.inode, this, &parent_ctx);
            if (ret == 0)
                mq_set_ctx_dirty_status(parent_ctx, _gf_false);
        } else {
            /* Clean run on a previously clean dir: clear the dirty mark. */
            ret = mq_mark_dirty(this, &parent_loc, 0);
        }
    }

    if (locked)
        ret = mq_lock(this, &parent_loc, F_UNLCK);

    /* Propagate the change further via a blocking update on the parent. */
    if (ret >= 0)
        ret = mq_initiate_quota_blocking_txn(this, &parent_loc, NULL);

    loc_wipe(&parent_loc);

    if (contribution)
        GF_REF_PUT(contribution);

    return ret;
}
 1451 
 1452 int32_t
 1453 mq_reduce_parent_size_txn(xlator_t *this, loc_t *origin_loc,
 1454                           quota_meta_t *contri, uint32_t nlink,
 1455                           call_stub_t *stub)
 1456 {
 1457     int32_t ret = -1;
 1458     loc_t loc = {
 1459         0,
 1460     };
 1461     gf_boolean_t resume_stub = _gf_true;
 1462 
 1463     GF_VALIDATE_OR_GOTO("marker", this, out);
 1464     GF_VALIDATE_OR_GOTO("marker", origin_loc, out);
 1465 
 1466     ret = mq_prevalidate_txn(this, origin_loc, &loc, NULL, NULL);
 1467     if (ret < 0)
 1468         goto out;
 1469 
 1470     if (loc_is_root(&loc)) {
 1471         ret = 0;
 1472         goto out;
 1473     }
 1474 
 1475     resume_stub = _gf_false;
 1476     ret = mq_synctask1(this, mq_reduce_parent_size_task, _gf_true, &loc, contri,
 1477                        nlink, stub);
 1478 out:
 1479     loc_wipe(&loc);
 1480 
 1481     if (resume_stub && stub)
 1482         call_resume(stub);
 1483 
 1484     if (ret)
 1485         gf_log_callingfn(this ? this->name : "Marker", GF_LOG_ERROR,
 1486                          "mq_reduce_parent_size_txn failed");
 1487 
 1488     return ret;
 1489 }
 1490 
/*
 * mq_initiate_quota_task - synctask body that propagates a size change
 * from an inode up through its ancestors.
 *
 * @opaque: quota_synctask_t carrying the xlator and the starting loc
 *
 * Starting at args->loc, each iteration write-locks the parent, computes
 * the delta for the child (mq_get_delta), marks the parent dirty, updates
 * the child's contribution and then the parent's size, clears the dirty
 * mark, unlocks, and moves one level up.  The walk stops when the delta is
 * null, when the parent just processed is the root, when a transaction is
 * already flagged on an ancestor, or on error.
 *
 * Always returns 0; failures are only logged.
 */
int
mq_initiate_quota_task(void *opaque)
{
    int32_t ret = -1;
    int32_t prev_dirty = 0;
    loc_t child_loc = {
        0,
    };
    loc_t parent_loc = {
        0,
    };
    gf_boolean_t locked = _gf_false;
    gf_boolean_t dirty = _gf_false;
    gf_boolean_t status = _gf_false;
    quota_meta_t delta = {
        0,
    };
    quota_synctask_t *args = NULL;
    xlator_t *this = NULL;
    loc_t *loc = NULL;
    inode_contribution_t *contri = NULL;
    quota_inode_ctx_t *ctx = NULL;
    quota_inode_ctx_t *parent_ctx = NULL;
    inode_t *tmp_parent = NULL;

    GF_VALIDATE_OR_GOTO("marker", opaque, out);

    args = (quota_synctask_t *)opaque;
    loc = &args->loc;
    this = args->this;

    GF_VALIDATE_OR_GOTO("marker", this, out);
    THIS = this;

    GF_VALIDATE_OR_GOTO(this->name, loc, out);
    GF_VALIDATE_OR_GOTO(this->name, loc->inode, out);

    ret = mq_loc_copy(&child_loc, loc);
    if (ret < 0) {
        gf_log(this->name, GF_LOG_ERROR, "loc copy failed");
        goto out;
    }

    while (!__is_root_gfid(child_loc.gfid)) {
        ret = mq_inode_ctx_get(child_loc.inode, this, &ctx);
        if (ret < 0) {
            gf_log(this->name, GF_LOG_WARNING,
                   "inode ctx get failed for %s, "
                   "aborting update txn",
                   child_loc.path);
            goto out;
        }

        /* To improve performance, abort current transaction
         * if one is already in progress for same inode
         */
        if (status == _gf_true) {
            /* status will already set before txn start,
             * so it should not be set in first
             * loop iteration
             */
            ret = mq_test_and_set_ctx_updation_status(ctx, &status);
            if (ret < 0 || status == _gf_true)
                goto out;
        }

        if (child_loc.parent == NULL) {
            ret = mq_build_ancestry(this, &child_loc);
            if (ret < 0 || child_loc.parent == NULL) {
                /* If application performs parallel remove
                 * operations on same set of files/directories
                 * then we may get ENOENT/ESTALE
                 */
                gf_log(this->name,
                       (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG
                                                          : GF_LOG_ERROR,
                       "build ancestry failed for inode %s",
                       uuid_utoa(child_loc.inode->gfid));
                ret = -1;
                goto out;
            }
        }

        ret = mq_inode_loc_fill(NULL, child_loc.parent, &parent_loc);
        if (ret < 0) {
            gf_log(this->name, GF_LOG_ERROR,
                   "parent_loc fill "
                   "failed for child inode %s: ",
                   uuid_utoa(child_loc.inode->gfid));
            goto out;
        }

        ret = mq_lock(this, &parent_loc, F_WRLCK);
        if (ret < 0)
            goto out;
        locked = _gf_true;

        /* The child's updation flag is cleared once the parent lock is
         * held; 'status' records that so the cleanup at 'out' does not
         * clear it again.
         */
        mq_set_ctx_updation_status(ctx, _gf_false);
        status = _gf_true;

        /* Contribution node can be NULL in below scenarios and
           create if needed:

           Scenario 1)
           In this case create a new contribution node
           Suppose hard link for a file f1 present in a directory d1 is
           created in the directory d2 (as f2). Now, since d2's
           contribution is not there in f1's inode ctx, d2's
           contribution xattr won't be created and will create problems
           for quota operations.

           Don't create contribution if parent has been changed after
           taking a lock, this can happen when rename is performed
           and writes is still in-progress for the same file

           Scenario 2)
           When a rename operation is performed, contribution node
           for old path will be removed.

           Create contribution node only if oldparent is same as
           newparent.
           Consider below example
           1) rename FOP invoked on file 'x'
           2) write is still in progress for file 'x'
           3) rename takes a lock on old-parent
           4) write-update txn blocked on old-parent to acquire lock
           5) in rename_cbk, contri xattrs are removed and contribution
              is deleted and lock is released
           6) now write-update txn gets the lock and updates the
              wrong parent as it was holding lock on old parent
              so validate parent once the lock is acquired

             For more information on this problem, please see
             doc for marker_rename in file marker.c
        */
        contri = mq_get_contribution_node(child_loc.parent, ctx);
        if (contri == NULL) {
            tmp_parent = inode_parent(child_loc.inode, 0, NULL);
            if (tmp_parent == NULL) {
                /* This can happen if application performs
                 * parallel remove operations on same set
                 * of files/directories
                 */
                gf_log(this->name, GF_LOG_WARNING,
                       "parent is "
                       "NULL for inode %s",
                       uuid_utoa(child_loc.inode->gfid));
                ret = -1;
                goto out;
            }
            if (gf_uuid_compare(tmp_parent->gfid, parent_loc.gfid)) {
                /* abort txn if parent has changed */
                ret = 0;
                goto out;
            }

            inode_unref(tmp_parent);
            tmp_parent = NULL;

            contri = mq_add_new_contribution_node(this, ctx, &child_loc);
            if (contri == NULL) {
                gf_log(this->name, GF_LOG_ERROR,
                       "Failed to "
                       "create contribution node for %s, "
                       "abort update txn",
                       child_loc.path);
                ret = -1;
                goto out;
            }
        }

        ret = mq_get_delta(this, &child_loc, &delta, ctx, contri);
        if (ret < 0)
            goto out;

        /* Nothing changed at this level, so nothing to propagate. */
        if (quota_meta_is_null(&delta))
            goto out;

        /* Mark the parent dirty; prev_dirty receives the previous value. */
        ret = mq_get_set_dirty(this, &parent_loc, 1, &prev_dirty);
        if (ret < 0)
            goto out;
        dirty = _gf_true;

        ret = mq_update_contri(this, &child_loc, contri, &delta);
        if (ret < 0)
            goto out;

        ret = mq_update_size(this, &parent_loc, &delta);
        if (ret < 0) {
            /* Undo the contribution update made just above; the result
             * of the rollback itself is not checked.
             */
            gf_log(this->name, GF_LOG_DEBUG,
                   "rollback "
                   "contri updation");
            mq_sub_meta(&delta, NULL);
            mq_update_contri(this, &child_loc, contri, &delta);
            goto out;
        }

        if (prev_dirty == 0) {
            ret = mq_mark_dirty(this, &parent_loc, 0);
        } else {
            /* Parent was dirty before this txn: leave the dirty mark in
             * place and only clear the in-memory dirty status.
             */
            ret = mq_inode_ctx_get(parent_loc.inode, this, &parent_ctx);
            if (ret == 0)
                mq_set_ctx_dirty_status(parent_ctx, _gf_false);
        }
        dirty = _gf_false;
        prev_dirty = 0;

        ret = mq_lock(this, &parent_loc, F_UNLCK);
        locked = _gf_false;

        if (__is_root_gfid(parent_loc.gfid))
            break;

        /* Repeat above steps upwards till the root */
        loc_wipe(&child_loc);
        ret = mq_loc_copy(&child_loc, &parent_loc);
        if (ret < 0)
            goto out;

        loc_wipe(&parent_loc);
        GF_REF_PUT(contri);
        contri = NULL;
    }

out:
    if ((dirty) && (ret < 0)) {
        /* On failure clear dirty status flag.
         * In the next lookup inspect_directory_xattr
         * can set the status flag and fix the
         * dirty directory.
         * Do the same if the dir was dirty before
         * txn
         */
        ret = mq_inode_ctx_get(parent_loc.inode, this, &parent_ctx);
        if (ret == 0)
            mq_set_ctx_dirty_status(parent_ctx, _gf_false);
    }

    if (locked)
        ret = mq_lock(this, &parent_loc, F_UNLCK);

    /* Clear the updation flag only if the loop has not already done so. */
    if (ctx && status == _gf_false)
        mq_set_ctx_updation_status(ctx, _gf_false);

    loc_wipe(&child_loc);
    loc_wipe(&parent_loc);

    if (tmp_parent)
        inode_unref(tmp_parent);

    if (contri)
        GF_REF_PUT(contri);

    /* The task result is deliberately not propagated. */
    return 0;
}
 1746 
 1747 int
 1748 _mq_initiate_quota_txn(xlator_t *this, loc_t *origin_loc, struct iatt *buf,
 1749                        gf_boolean_t spawn)
 1750 {
 1751     int32_t ret = -1;
 1752     quota_inode_ctx_t *ctx = NULL;
 1753     gf_boolean_t status = _gf_true;
 1754     loc_t loc = {
 1755         0,
 1756     };
 1757 
 1758     ret = mq_prevalidate_txn(this, origin_loc, &loc, &ctx, buf);
 1759     if (ret < 0)
 1760         goto out;
 1761 
 1762     if (loc_is_root(&loc)) {
 1763         ret = 0;
 1764         goto out;
 1765     }
 1766 
 1767     ret = mq_test_and_set_ctx_updation_status(ctx, &status);
 1768     if (ret < 0 || status == _gf_true)
 1769         goto out;
 1770 
 1771     ret = mq_synctask(this, mq_initiate_quota_task, spawn, &loc);
 1772 
 1773 out:
 1774     if (ret < 0 && status == _gf_false)
 1775         mq_set_ctx_updation_status(ctx, _gf_false);
 1776 
 1777     loc_wipe(&loc);
 1778     return ret;
 1779 }
 1780 
 1781 int
 1782 mq_initiate_quota_txn(xlator_t *this, loc_t *loc, struct iatt *buf)
 1783 {
 1784     int32_t ret = -1;
 1785 
 1786     GF_VALIDATE_OR_GOTO("marker", this, out);
 1787     GF_VALIDATE_OR_GOTO("marker", loc, out);
 1788     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
 1789 
 1790     ret = _mq_initiate_quota_txn(this, loc, buf, _gf_true);
 1791 out:
 1792     return ret;
 1793 }
 1794 
 1795 int
 1796 mq_initiate_quota_blocking_txn(xlator_t *this, loc_t *loc, struct iatt *buf)
 1797 {
 1798     int32_t ret = -1;
 1799 
 1800     GF_VALIDATE_OR_GOTO("marker", this, out);
 1801     GF_VALIDATE_OR_GOTO("marker", loc, out);
 1802     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
 1803 
 1804     ret = _mq_initiate_quota_txn(this, loc, buf, _gf_false);
 1805 out:
 1806     return ret;
 1807 }
 1808 
 1809 int
 1810 mq_update_dirty_inode_task(void *opaque)
 1811 {
 1812     int32_t ret = -1;
 1813     fd_t *fd = NULL;
 1814     off_t offset = 0;
 1815     gf_dirent_t entries;
 1816     gf_dirent_t *entry = NULL;
 1817     gf_boolean_t locked = _gf_false;
 1818     gf_boolean_t updated = _gf_false;
 1819     int32_t dirty = 0;
 1820     quota_meta_t contri = {
 1821         0,
 1822     };
 1823     quota_meta_t size = {
 1824         0,
 1825     };
 1826     quota_meta_t contri_sum = {
 1827         0,
 1828     };
 1829     quota_meta_t delta = {
 1830         0,
 1831     };
 1832     quota_synctask_t *args = NULL;
 1833     xlator_t *this = NULL;
 1834     loc_t *loc = NULL;
 1835     quota_inode_ctx_t *ctx = NULL;
 1836     dict_t *xdata = NULL;
 1837     char contri_key[QUOTA_KEY_MAX] = {
 1838         0,
 1839     };
 1840     int keylen = 0;
 1841 
 1842     GF_ASSERT(opaque);
 1843 
 1844     args = (quota_synctask_t *)opaque;
 1845     loc = &args->loc;
 1846     this = args->this;
 1847     THIS = this;
 1848     INIT_LIST_HEAD(&entries.list);
 1849 
 1850     ret = mq_inode_ctx_get(loc->inode, this, &ctx);
 1851     if (ret < 0)
 1852         goto out;
 1853 
 1854     GET_CONTRI_KEY(this, contri_key, loc->gfid, keylen);
 1855     if (keylen < 0) {
 1856         ret = keylen;
 1857         goto out;
 1858     }
 1859 
 1860     xdata = dict_new();
 1861     if (xdata == NULL) {
 1862         gf_log(this->name, GF_LOG_ERROR, "dict_new failed");
 1863         ret = -1;
 1864         goto out;
 1865     }
 1866 
 1867     ret = dict_set_int64(xdata, contri_key, 0);
 1868     if (ret < 0) {
 1869         gf_log(this->name, GF_LOG_ERROR, "dict_set failed");
 1870         goto out;
 1871     }
 1872 
 1873     ret = mq_lock(this, loc, F_WRLCK);
 1874     if (ret < 0)
 1875         goto out;
 1876     locked = _gf_true;
 1877 
 1878     ret = mq_get_dirty(this, loc, &dirty);
 1879     if (ret < 0 || dirty == 0) {
 1880         ret = 0;
 1881         goto out;
 1882     }
 1883 
 1884     fd = fd_create(loc->inode, 0);
 1885     if (!fd) {
 1886         gf_log(this->name, GF_LOG_ERROR, "Failed to create fd");
 1887         ret = -1;
 1888         goto out;
 1889     }
 1890 
 1891     ret = syncop_opendir(this, loc, fd, NULL, NULL);
 1892     if (ret < 0) {
 1893         gf_log(this->name,
 1894                (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG : GF_LOG_ERROR,
 1895                "opendir failed "
 1896                "for %s: %s",
 1897                loc->path, strerror(-ret));
 1898         goto out;
 1899     }
 1900 
 1901     fd_bind(fd);
 1902     while ((ret = syncop_readdirp(this, fd, 131072, offset, &entries, xdata,
 1903                                   NULL)) != 0) {
 1904         if (ret < 0) {
 1905             gf_log(this->name,
 1906                    (-ret == ENOENT || -ret == ESTALE) ? GF_LOG_DEBUG
 1907                                                       : GF_LOG_ERROR,
 1908                    "readdirp failed "
 1909                    "for %s: %s",
 1910                    loc->path, strerror(-ret));
 1911             goto out;
 1912         }
 1913 
 1914         if (list_empty(&entries.list))
 1915             break;
 1916 
 1917         list_for_each_entry(entry, &entries.list, list)
 1918         {
 1919             offset = entry->d_off;
 1920 
 1921             if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, ".."))
 1922                 continue;
 1923 
 1924             memset(&contri, 0, sizeof(contri));
 1925             quota_dict_get_meta(entry->dict, contri_key, keylen, &contri);
 1926             if (quota_meta_is_null(&contri))
 1927                 continue;
 1928 
 1929             mq_add_meta(&contri_sum, &contri);
 1930         }
 1931 
 1932         gf_dirent_free(&entries);
 1933     }
 1934     /* Inculde for self */
 1935     contri_sum.dir_count++;
 1936 
 1937     ret = _mq_get_metadata(this, loc, NULL, &size, 0);
 1938     if (ret < 0)
 1939         goto out;
 1940 
 1941     mq_compute_delta(&delta, &contri_sum, &size);
 1942 
 1943     if (quota_meta_is_null(&delta))
 1944         goto out;
 1945 
 1946     gf_log(this->name, GF_LOG_INFO,
 1947            "calculated size = %" PRId64 ", original size = %" PRIu64
 1948            ", diff = %" PRIu64 ", path = %s ",
 1949            contri_sum.size, size.size, delta.size, loc->path);
 1950 
 1951     gf_log(this->name, GF_LOG_INFO,
 1952            "calculated f_count = %" PRId64 ", original f_count = %" PRIu64
 1953            ", diff = %" PRIu64 ", path = %s ",
 1954            contri_sum.file_count, size.file_count, delta.file_count, loc->path);
 1955 
 1956     gf_log(this->name, GF_LOG_INFO,
 1957            "calculated d_count = %" PRId64 ", original d_count = %" PRIu64
 1958            ", diff = %" PRIu64 ", path = %s ",
 1959            contri_sum.dir_count, size.dir_count, delta.dir_count, loc->path);
 1960 
 1961     ret = mq_update_size(this, loc, &delta);
 1962     if (ret < 0)
 1963         goto out;
 1964 
 1965     updated = _gf_true;
 1966 
 1967 out:
 1968     gf_dirent_free(&entries);
 1969 
 1970     if (fd)
 1971         fd_unref(fd);
 1972 
 1973     if (xdata)
 1974         dict_unref(xdata);
 1975 
 1976     if (ret < 0) {
 1977         /* On failure clear dirty status flag.
 1978          * In the next lookup inspect_directory_xattr
 1979          * can set the status flag and fix the
 1980          * dirty directory
 1981          */
 1982         if (ctx)
 1983             mq_set_ctx_dirty_status(ctx, _gf_false);
 1984     } else if (dirty) {
 1985         mq_mark_dirty(this, loc, 0);
 1986     }
 1987 
 1988     if (locked)
 1989         mq_lock(this, loc, F_UNLCK);
 1990 
 1991     if (updated)
 1992         mq_initiate_quota_blocking_txn(this, loc, NULL);
 1993 
 1994     return ret;
 1995 }
 1996 
 1997 int32_t
 1998 mq_update_dirty_inode_txn(xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx)
 1999 {
 2000     int32_t ret = -1;
 2001     gf_boolean_t status = _gf_true;
 2002 
 2003     GF_VALIDATE_OR_GOTO("marker", loc, out);
 2004     GF_VALIDATE_OR_GOTO("marker", loc->inode, out);
 2005 
 2006     mq_test_and_set_ctx_status(ctx, &ctx->dirty_status, &status);
 2007     if (status == _gf_true)
 2008         goto out;
 2009 
 2010     ret = mq_synctask(this, mq_update_dirty_inode_task, _gf_true, loc);
 2011 out:
 2012     if (ret < 0 && status == _gf_false)
 2013         mq_set_ctx_dirty_status(ctx, _gf_false);
 2014 
 2015     return ret;
 2016 }
 2017 
 2018 int32_t
 2019 mq_inspect_directory_xattr(xlator_t *this, quota_inode_ctx_t *ctx,
 2020                            inode_contribution_t *contribution, loc_t *loc,
 2021                            dict_t *dict)
 2022 {
 2023     int32_t ret = -1;
 2024     int8_t dirty = -1;
 2025     quota_meta_t size = {
 2026         0,
 2027     };
 2028     quota_meta_t contri = {
 2029         0,
 2030     };
 2031     quota_meta_t delta = {
 2032         0,
 2033     };
 2034     char contri_key[QUOTA_KEY_MAX] = {
 2035         0,
 2036     };
 2037     char size_key[QUOTA_KEY_MAX] = {
 2038         0,
 2039     };
 2040     int keylen = 0;
 2041     gf_boolean_t status = _gf_false;
 2042 
 2043     ret = dict_get_int8(dict, QUOTA_DIRTY_KEY, &dirty);
 2044     if (ret < 0) {
 2045         /* dirty is set only on the first file write operation
 2046          * so ignore this error
 2047          */
 2048         ret = 0;
 2049         dirty = 0;
 2050     }
 2051 
 2052     GET_SIZE_KEY(this, size_key, keylen);
 2053     if (keylen < 0) {
 2054         ret = -1;
 2055         goto out;
 2056     }
 2057     ret = _quota_dict_get_meta(this, dict, size_key, keylen, &size, IA_IFDIR,
 2058                                _gf_false);
 2059     if (ret < 0)
 2060         goto create_xattr;
 2061 
 2062     if (!contribution)
 2063         goto create_xattr;
 2064 
 2065     if (!loc_is_root(loc)) {
 2066         GET_CONTRI_KEY(this, contri_key, contribution->gfid, keylen);
 2067         if (keylen < 0) {
 2068             ret = -1;
 2069             goto out;
 2070         }
 2071         ret = _quota_dict_get_meta(this, dict, contri_key, keylen, &contri,
 2072                                    IA_IFDIR, _gf_false);
 2073         if (ret < 0)
 2074             goto create_xattr;
 2075 
 2076         LOCK(&contribution->lock);
 2077         {
 2078             contribution->contribution = contri.size;
 2079             contribution->file_count = contri.file_count;
 2080             contribution->dir_count = contri.dir_count;
 2081         }
 2082         UNLOCK(&contribution->lock);
 2083     }
 2084 
 2085     LOCK(&ctx->lock);
 2086     {
 2087         ctx->size = size.size;
 2088         ctx->file_count = size.file_count;
 2089         ctx->dir_count = size.dir_count;
 2090         ctx->dirty = dirty;
 2091     }
 2092     UNLOCK(&ctx->lock);
 2093 
 2094     ret = mq_get_ctx_updation_status(ctx, &status);
 2095     if (ret < 0 || status == _gf_true) {
 2096         /* If the update txn is in progress abort inspection */
 2097         ret = 0;
 2098         goto out;
 2099     }
 2100 
 2101     mq_compute_delta(&delta, &size, &contri);
 2102 
 2103     if (dirty) {
 2104         ret = mq_update_dirty_inode_txn(this, loc, ctx);
 2105         goto out;
 2106     }
 2107 
 2108     if (!loc_is_root(loc) && !quota_meta_is_null(&delta))
 2109         mq_initiate_quota_txn(this, loc, NULL);
 2110 
 2111     ret = 0;
 2112     goto out;
 2113 
 2114 create_xattr:
 2115     if (ret < 0)
 2116         ret = mq_create_xattrs_txn(this, loc, NULL);
 2117 
 2118 out:
 2119     return ret;
 2120 }
 2121 
 2122 int32_t
 2123 mq_inspect_file_xattr(xlator_t *this, quota_inode_ctx_t *ctx,
 2124                       inode_contribution_t *contribution, loc_t *loc,
 2125                       dict_t *dict, struct iatt *buf)
 2126 {
 2127     int32_t ret = -1;
 2128     quota_meta_t size = {
 2129         0,
 2130     };
 2131     quota_meta_t contri = {
 2132         0,
 2133     };
 2134     quota_meta_t delta = {
 2135         0,
 2136     };
 2137     char contri_key[QUOTA_KEY_MAX] = {
 2138         0,
 2139     };
 2140     int keylen = 0;
 2141     gf_boolean_t status = _gf_false;
 2142 
 2143     if (!buf || !contribution || !ctx)
 2144         goto out;
 2145 
 2146     LOCK(&ctx->lock);
 2147     {
 2148         ctx->size = 512 * buf->ia_blocks;
 2149         ctx->file_count = 1;
 2150         ctx->dir_count = 0;
 2151 
 2152         size.size = ctx->size;
 2153         size.file_count = ctx->file_count;
 2154         size.dir_count = ctx->dir_count;
 2155     }
 2156     UNLOCK(&ctx->lock);
 2157 
 2158     GET_CONTRI_KEY(this, contri_key, contribution->gfid, keylen);
 2159     if (keylen < 0) {
 2160         ret = -1;
 2161         goto out;
 2162     }
 2163 
 2164     ret = _quota_dict_get_meta(this, dict, contri_key, keylen, &contri,
 2165                                IA_IFREG, _gf_true);
 2166     if (ret < 0) {
 2167         ret = mq_create_xattrs_txn(this, loc, NULL);
 2168     } else {
 2169         LOCK(&contribution->lock);
 2170         {
 2171             contribution->contribution = contri.size;
 2172             contribution->file_count = contri.file_count;
 2173             contribution->dir_count = contri.dir_count;
 2174         }
 2175         UNLOCK(&contribution->lock);
 2176 
 2177         ret = mq_get_ctx_updation_status(ctx, &status);
 2178         if (ret < 0 || status == _gf_true) {
 2179             /* If the update txn is in progress abort inspection */
 2180             ret = 0;
 2181             goto out;
 2182         }
 2183 
 2184         mq_compute_delta(&delta, &size, &contri);
 2185         if (!quota_meta_is_null(&delta))
 2186             mq_initiate_quota_txn(this, loc, NULL);
 2187     }
 2188     /* TODO: revist this code when fixing hardlinks */
 2189 
 2190 out:
 2191     return ret;
 2192 }
 2193 
 2194 int32_t
 2195 mq_xattr_state(xlator_t *this, loc_t *origin_loc, dict_t *dict,
 2196                struct iatt *buf)
 2197 {
 2198     int32_t ret = -1;
 2199     quota_inode_ctx_t *ctx = NULL;
 2200     loc_t loc = {
 2201         0,
 2202     };
 2203     inode_contribution_t *contribution = NULL;
 2204 
 2205     ret = mq_prevalidate_txn(this, origin_loc, &loc, &ctx, buf);
 2206     if (ret < 0 || loc.parent == NULL)
 2207         goto out;
 2208 
 2209     if (!loc_is_root(&loc)) {
 2210         contribution = mq_add_new_contribution_node(this, ctx, &loc);
 2211         if (contribution == NULL) {
 2212             if (!gf_uuid_is_null(loc.inode->gfid))
 2213                 gf_log(this->name, GF_LOG_WARNING,
 2214                        "cannot add a new contribution node "
 2215                        "(%s)",
 2216                        uuid_utoa(loc.gfid));
 2217             ret = -1;
 2218             goto out;
 2219         }
 2220         if (buf->ia_type == IA_IFDIR)
 2221             mq_inspect_directory_xattr(this, ctx, contribution, &loc, dict);
 2222         else
 2223             mq_inspect_file_xattr(this, ctx, contribution, &loc, dict, buf);
 2224     } else {
 2225         mq_inspect_directory_xattr(this, ctx, 0, &loc, dict);
 2226     }
 2227 
 2228 out:
 2229     loc_wipe(&loc);
 2230 
 2231     if (contribution)
 2232         GF_REF_PUT(contribution);
 2233 
 2234     return ret;
 2235 }
 2236 
 2237 int32_t
 2238 mq_req_xattr(xlator_t *this, loc_t *loc, dict_t *dict, char *contri_key,
 2239              char *size_key)
 2240 {
 2241     int32_t ret = -1;
 2242     char key[QUOTA_KEY_MAX] = {
 2243         0,
 2244     };
 2245 
 2246     GF_VALIDATE_OR_GOTO("marker", this, out);
 2247     GF_VALIDATE_OR_GOTO("marker", loc, out);
 2248     GF_VALIDATE_OR_GOTO("marker", dict, out);
 2249 
 2250     if (!loc_is_root(loc)) {
 2251         ret = mq_dict_set_contribution(this, dict, loc, NULL, contri_key);
 2252         if (ret < 0)
 2253             goto out;
 2254     }
 2255 
 2256     GET_SIZE_KEY(this, key, ret);
 2257     if (ret < 0)
 2258         goto out;
 2259     if (size_key)
 2260         if (snprintf(size_key, QUOTA_KEY_MAX, "%s", key) >= QUOTA_KEY_MAX) {
 2261             ret = -1;
 2262             goto out;
 2263         }
 2264 
 2265     ret = dict_set_uint64(dict, key, 0);
 2266     if (ret < 0)
 2267         goto out;
 2268 
 2269     ret = dict_set_int8(dict, QUOTA_DIRTY_KEY, 0);
 2270 
 2271 out:
 2272     if (ret < 0)
 2273         gf_log_callingfn(this ? this->name : "Marker", GF_LOG_ERROR,
 2274                          "dict set failed");
 2275     return ret;
 2276 }
 2277 
 2278 int32_t
 2279 mq_forget(xlator_t *this, quota_inode_ctx_t *ctx)
 2280 {
 2281     inode_contribution_t *contri = NULL;
 2282     inode_contribution_t *next = NULL;
 2283 
 2284     GF_VALIDATE_OR_GOTO("marker", this, out);
 2285     GF_VALIDATE_OR_GOTO("marker", ctx, out);
 2286 
 2287     list_for_each_entry_safe(contri, next, &ctx->contribution_head, contri_list)
 2288     {
 2289         list_del_init(&contri->contri_list);
 2290         GF_REF_PUT(contri);
 2291     }
 2292 
 2293     LOCK_DESTROY(&ctx->lock);
 2294     GF_FREE(ctx);
 2295 out:
 2296     return 0;
 2297 }