"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/xlators/features/index/src/index.c" (16 Sep 2020, 72532 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "index.c", see the Fossies "Dox" file reference documentation.

    1 /*
    2    Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
    3    This file is part of GlusterFS.
    4 
    5    This file is licensed to you under your choice of the GNU Lesser
    6    General Public License, version 3 or any later version (LGPLv3 or
    7    later), or the GNU General Public License, version 2 (GPLv2), in all
    8    cases as published by the Free Software Foundation.
    9 */
   10 #include "index.h"
   11 #include <glusterfs/options.h>
   12 #include "glusterfs3-xdr.h"
   13 #include <glusterfs/syscall.h>
   14 #include <glusterfs/syncop.h>
   15 #include <glusterfs/common-utils.h>
   16 #include "index-messages.h"
   17 #include <ftw.h>
   18 #include <libgen.h> /* for dirname() */
   19 #include <signal.h>
   20 
   21 #define XATTROP_SUBDIR "xattrop"
   22 #define DIRTY_SUBDIR "dirty"
   23 #define ENTRY_CHANGES_SUBDIR "entry-changes"
   24 
   25 struct index_syncop_args {
   26     inode_t *parent;
   27     gf_dirent_t *entries;
   28     char *path;
   29 };
   30 
   31 static char *index_vgfid_xattrs[XATTROP_TYPE_END] = {
   32     [XATTROP] = GF_XATTROP_INDEX_GFID,
   33     [DIRTY] = GF_XATTROP_DIRTY_GFID,
   34     [ENTRY_CHANGES] = GF_XATTROP_ENTRY_CHANGES_GFID};
   35 
   36 static char *index_subdirs[XATTROP_TYPE_END] = {
   37     [XATTROP] = XATTROP_SUBDIR,
   38     [DIRTY] = DIRTY_SUBDIR,
   39     [ENTRY_CHANGES] = ENTRY_CHANGES_SUBDIR};
   40 
   41 int
   42 index_get_type_from_vgfid(index_priv_t *priv, uuid_t vgfid)
   43 {
   44     int i = 0;
   45 
   46     for (i = 0; i < XATTROP_TYPE_END; i++) {
   47         if (gf_uuid_compare(priv->internal_vgfid[i], vgfid) == 0)
   48             return i;
   49     }
   50     return -1;
   51 }
   52 
   53 gf_boolean_t
   54 index_is_virtual_gfid(index_priv_t *priv, uuid_t vgfid)
   55 {
   56     if (index_get_type_from_vgfid(priv, vgfid) < 0)
   57         return _gf_false;
   58     return _gf_true;
   59 }
   60 
   61 static int
   62 __index_inode_ctx_get(inode_t *inode, xlator_t *this, index_inode_ctx_t **ctx)
   63 {
   64     int ret = 0;
   65     index_inode_ctx_t *ictx = NULL;
   66     uint64_t tmpctx = 0;
   67 
   68     ret = __inode_ctx_get(inode, this, &tmpctx);
   69     if (!ret) {
   70         ictx = (index_inode_ctx_t *)(long)tmpctx;
   71         goto out;
   72     }
   73     ictx = GF_CALLOC(1, sizeof(*ictx), gf_index_inode_ctx_t);
   74     if (!ictx) {
   75         ret = -1;
   76         goto out;
   77     }
   78 
   79     INIT_LIST_HEAD(&ictx->callstubs);
   80     ret = __inode_ctx_put(inode, this, (uint64_t)(uintptr_t)ictx);
   81     if (ret) {
   82         GF_FREE(ictx);
   83         ictx = NULL;
   84         goto out;
   85     }
   86 out:
   87     if (ictx)
   88         *ctx = ictx;
   89     return ret;
   90 }
   91 
   92 static int
   93 index_inode_ctx_get(inode_t *inode, xlator_t *this, index_inode_ctx_t **ctx)
   94 {
   95     int ret = 0;
   96 
   97     LOCK(&inode->lock);
   98     {
   99         ret = __index_inode_ctx_get(inode, this, ctx);
  100     }
  101     UNLOCK(&inode->lock);
  102 
  103     return ret;
  104 }
  105 
  106 static gf_boolean_t
  107 index_is_subdir_of_entry_changes(xlator_t *this, inode_t *inode)
  108 {
  109     index_inode_ctx_t *ctx = NULL;
  110     int ret = 0;
  111 
  112     if (!inode)
  113         return _gf_false;
  114 
  115     ret = index_inode_ctx_get(inode, this, &ctx);
  116     if ((ret == 0) && !gf_uuid_is_null(ctx->virtual_pargfid))
  117         return _gf_true;
  118     return _gf_false;
  119 }
  120 
  121 static int
  122 index_get_type_from_vgfid_xattr(const char *name)
  123 {
  124     int i = 0;
  125 
  126     for (i = 0; i < XATTROP_TYPE_END; i++) {
  127         if (strcmp(name, index_vgfid_xattrs[i]) == 0)
  128             return i;
  129     }
  130     return -1;
  131 }
  132 
  133 gf_boolean_t
  134 index_is_fop_on_internal_inode(xlator_t *this, inode_t *inode, uuid_t gfid)
  135 {
  136     index_priv_t *priv = this->private;
  137     uuid_t vgfid = {0};
  138 
  139     if (!inode)
  140         return _gf_false;
  141 
  142     if (gfid && !gf_uuid_is_null(gfid))
  143         gf_uuid_copy(vgfid, gfid);
  144     else
  145         gf_uuid_copy(vgfid, inode->gfid);
  146 
  147     if (index_is_virtual_gfid(priv, vgfid))
  148         return _gf_true;
  149     if (index_is_subdir_of_entry_changes(this, inode))
  150         return _gf_true;
  151     return _gf_false;
  152 }
  153 
  154 static gf_boolean_t
  155 index_is_vgfid_xattr(const char *name)
  156 {
  157     if (index_get_type_from_vgfid_xattr(name) < 0)
  158         return _gf_false;
  159     return _gf_true;
  160 }
  161 
  162 call_stub_t *
  163 __index_dequeue(struct list_head *callstubs)
  164 {
  165     call_stub_t *stub = NULL;
  166 
  167     if (!list_empty(callstubs)) {
  168         stub = list_entry(callstubs->next, call_stub_t, list);
  169         list_del_init(&stub->list);
  170     }
  171 
  172     return stub;
  173 }
  174 
  175 static void
  176 __index_enqueue(struct list_head *callstubs, call_stub_t *stub)
  177 {
  178     list_add_tail(&stub->list, callstubs);
  179 }
  180 
  181 static void
  182 worker_enqueue(xlator_t *this, call_stub_t *stub)
  183 {
  184     index_priv_t *priv = NULL;
  185 
  186     priv = this->private;
  187     pthread_mutex_lock(&priv->mutex);
  188     {
  189         __index_enqueue(&priv->callstubs, stub);
  190         GF_ATOMIC_INC(priv->stub_cnt);
  191         pthread_cond_signal(&priv->cond);
  192     }
  193     pthread_mutex_unlock(&priv->mutex);
  194 }
  195 
  196 void *
  197 index_worker(void *data)
  198 {
  199     index_priv_t *priv = NULL;
  200     xlator_t *this = NULL;
  201     call_stub_t *stub = NULL;
  202     gf_boolean_t bye = _gf_false;
  203 
  204     THIS = data;
  205     this = data;
  206     priv = this->private;
  207 
  208     for (;;) {
  209         pthread_mutex_lock(&priv->mutex);
  210         {
  211             while (list_empty(&priv->callstubs)) {
  212                 if (priv->down) {
  213                     bye = _gf_true; /*Avoid wait*/
  214                     break;
  215                 }
  216                 (void)pthread_cond_wait(&priv->cond, &priv->mutex);
  217                 if (priv->down) {
  218                     bye = _gf_true;
  219                     break;
  220                 }
  221             }
  222             if (!bye)
  223                 stub = __index_dequeue(&priv->callstubs);
  224             if (bye) {
  225                 priv->curr_count--;
  226                 if (priv->curr_count == 0)
  227                     pthread_cond_broadcast(&priv->cond);
  228             }
  229         }
  230         pthread_mutex_unlock(&priv->mutex);
  231 
  232         if (stub) { /* guard against spurious wakeups */
  233             call_resume(stub);
  234             GF_ATOMIC_DEC(priv->stub_cnt);
  235         }
  236         stub = NULL;
  237         if (bye)
  238             break;
  239     }
  240 
  241     return NULL;
  242 }
  243 
  244 static void
  245 make_index_dir_path(char *base, const char *subdir, char *index_dir, size_t len)
  246 {
  247     snprintf(index_dir, len, "%s/%s", base, subdir);
  248 }
  249 
  250 int
  251 index_dir_create(xlator_t *this, const char *subdir)
  252 {
  253     int ret = 0;
  254     struct stat st = {0};
  255     char fullpath[PATH_MAX] = {0};
  256     char path[PATH_MAX] = {0};
  257     char *dir = NULL;
  258     index_priv_t *priv = NULL;
  259     size_t len = 0;
  260     size_t pathlen = 0;
  261 
  262     priv = this->private;
  263     make_index_dir_path(priv->index_basepath, subdir, fullpath,
  264                         sizeof(fullpath));
  265     ret = sys_stat(fullpath, &st);
  266     if (!ret) {
  267         if (!S_ISDIR(st.st_mode))
  268             ret = -2;
  269         goto out;
  270     }
  271 
  272     pathlen = strlen(fullpath);
  273     if ((pathlen > 1) && fullpath[pathlen - 1] == '/')
  274         fullpath[pathlen - 1] = '\0';
  275     dir = strchr(fullpath, '/');
  276     while (dir) {
  277         dir = strchr(dir + 1, '/');
  278         if (dir)
  279             len = pathlen - strlen(dir);
  280         else
  281             len = pathlen;
  282         strncpy(path, fullpath, len);
  283         path[len] = '\0';
  284         ret = sys_mkdir(path, 0600);
  285         if (ret && (errno != EEXIST))
  286             goto out;
  287     }
  288     ret = 0;
  289 out:
  290     if (ret == -1) {
  291         gf_msg(this->name, GF_LOG_ERROR, errno,
  292                INDEX_MSG_INDEX_DIR_CREATE_FAILED,
  293                "%s/%s: Failed to "
  294                "create",
  295                priv->index_basepath, subdir);
  296     } else if (ret == -2) {
  297         gf_msg(this->name, GF_LOG_ERROR, ENOTDIR,
  298                INDEX_MSG_INDEX_DIR_CREATE_FAILED,
  299                "%s/%s: Failed to "
  300                "create, path exists, not a directory ",
  301                priv->index_basepath, subdir);
  302     }
  303     return ret;
  304 }
  305 
  306 void
  307 index_get_index(index_priv_t *priv, uuid_t index)
  308 {
  309     LOCK(&priv->lock);
  310     {
  311         gf_uuid_copy(index, priv->index);
  312     }
  313     UNLOCK(&priv->lock);
  314 }
  315 
  316 void
  317 index_generate_index(index_priv_t *priv, uuid_t index)
  318 {
  319     LOCK(&priv->lock);
  320     {
  321         // To prevent duplicate generates.
  322         // This method fails if number of contending threads is greater
  323         // than MAX_LINK count of the fs
  324         if (!gf_uuid_compare(priv->index, index))
  325             gf_uuid_generate(priv->index);
  326         gf_uuid_copy(index, priv->index);
  327     }
  328     UNLOCK(&priv->lock);
  329 }
  330 
  331 static void
  332 make_index_path(char *base, const char *subdir, uuid_t index, char *index_path,
  333                 size_t len)
  334 {
  335     make_index_dir_path(base, subdir, index_path, len);
  336     snprintf(index_path + strlen(index_path), len - strlen(index_path),
  337              "/%s-%s", subdir, uuid_utoa(index));
  338 }
  339 
  340 static void
  341 make_gfid_path(char *base, const char *subdir, uuid_t gfid, char *gfid_path,
  342                size_t len)
  343 {
  344     make_index_dir_path(base, subdir, gfid_path, len);
  345     snprintf(gfid_path + strlen(gfid_path), len - strlen(gfid_path), "/%s",
  346              uuid_utoa(gfid));
  347 }
  348 
  349 static void
  350 make_file_path(char *base, const char *subdir, const char *filename,
  351                char *file_path, size_t len)
  352 {
  353     make_index_dir_path(base, subdir, file_path, len);
  354     snprintf(file_path + strlen(file_path), len - strlen(file_path), "/%s",
  355              filename);
  356 }
  357 
  358 static int
  359 is_index_file_current(char *filename, uuid_t priv_index, char *subdir)
  360 {
  361     char current_index[GF_UUID_BUF_SIZE + 16] = {
  362         0,
  363     };
  364 
  365     snprintf(current_index, sizeof current_index, "%s-%s", subdir,
  366              uuid_utoa(priv_index));
  367     return (!strcmp(filename, current_index));
  368 }
  369 
  370 static void
  371 check_delete_stale_index_file(xlator_t *this, char *filename, char *subdir)
  372 {
  373     int ret = 0;
  374     struct stat st = {0};
  375     char filepath[PATH_MAX] = {0};
  376     index_priv_t *priv = NULL;
  377 
  378     priv = this->private;
  379 
  380     if (is_index_file_current(filename, priv->index, subdir))
  381         return;
  382 
  383     make_file_path(priv->index_basepath, subdir, filename, filepath,
  384                    sizeof(filepath));
  385     ret = sys_stat(filepath, &st);
  386     if (!ret && st.st_nlink == 1)
  387         sys_unlink(filepath);
  388 }
  389 
  390 static void
  391 index_set_link_count(index_priv_t *priv, int64_t count,
  392                      index_xattrop_type_t type)
  393 {
  394     switch (type) {
  395         case XATTROP:
  396             LOCK(&priv->lock);
  397             {
  398                 priv->pending_count = count;
  399             }
  400             UNLOCK(&priv->lock);
  401             break;
  402         default:
  403             break;
  404     }
  405 }
  406 
  407 static void
  408 index_get_link_count(index_priv_t *priv, int64_t *count,
  409                      index_xattrop_type_t type)
  410 {
  411     switch (type) {
  412         case XATTROP:
  413             LOCK(&priv->lock);
  414             {
  415                 *count = priv->pending_count;
  416             }
  417             UNLOCK(&priv->lock);
  418             break;
  419         default:
  420             break;
  421     }
  422 }
  423 
  424 static void
  425 index_dec_link_count(index_priv_t *priv, index_xattrop_type_t type)
  426 {
  427     switch (type) {
  428         case XATTROP:
  429             LOCK(&priv->lock);
  430             {
  431                 priv->pending_count--;
  432                 if (priv->pending_count == 0)
  433                     priv->pending_count--;
  434             }
  435             UNLOCK(&priv->lock);
  436             break;
  437         default:
  438             break;
  439     }
  440 }
  441 
  442 char *
  443 index_get_subdir_from_type(index_xattrop_type_t type)
  444 {
  445     if (type < XATTROP || type >= XATTROP_TYPE_END)
  446         return NULL;
  447     return index_subdirs[type];
  448 }
  449 
  450 char *
  451 index_get_subdir_from_vgfid(index_priv_t *priv, uuid_t vgfid)
  452 {
  453     return index_get_subdir_from_type(index_get_type_from_vgfid(priv, vgfid));
  454 }
  455 
  456 static int
  457 index_fill_readdir(fd_t *fd, index_fd_ctx_t *fctx, DIR *dir, off_t off,
  458                    size_t size, gf_dirent_t *entries)
  459 {
  460     off_t in_case = -1;
  461     off_t last_off = 0;
  462     size_t filled = 0;
  463     int count = 0;
  464     struct dirent *entry = NULL;
  465     struct dirent scratch[2] = {
  466         {
  467             0,
  468         },
  469     };
  470     int32_t this_size = -1;
  471     gf_dirent_t *this_entry = NULL;
  472     xlator_t *this = NULL;
  473 
  474     this = THIS;
  475     if (!off) {
  476         rewinddir(dir);
  477     } else {
  478         seekdir(dir, off);
  479 #ifndef GF_LINUX_HOST_OS
  480         if ((u_long)telldir(dir) != off && off != fctx->dir_eof) {
  481             gf_msg(THIS->name, GF_LOG_ERROR, EINVAL,
  482                    INDEX_MSG_INDEX_READDIR_FAILED,
  483                    "seekdir(0x%llx) failed on dir=%p: "
  484                    "Invalid argument (offset reused from "
  485                    "another DIR * structure?)",
  486                    off, dir);
  487             errno = EINVAL;
  488             count = -1;
  489             goto out;
  490         }
  491 #endif /* GF_LINUX_HOST_OS */
  492     }
  493 
  494     while (filled <= size) {
  495         in_case = (u_long)telldir(dir);
  496 
  497         if (in_case == -1) {
  498             gf_msg(THIS->name, GF_LOG_ERROR, errno,
  499                    INDEX_MSG_INDEX_READDIR_FAILED, "telldir failed on dir=%p",
  500                    dir);
  501             goto out;
  502         }
  503 
  504         errno = 0;
  505         entry = sys_readdir(dir, scratch);
  506         if (!entry || errno != 0) {
  507             if (errno == EBADF) {
  508                 gf_msg(THIS->name, GF_LOG_WARNING, errno,
  509                        INDEX_MSG_INDEX_READDIR_FAILED,
  510                        "readdir failed on dir=%p", dir);
  511                 goto out;
  512             }
  513             break;
  514         }
  515 
  516         if (!strncmp(entry->d_name, XATTROP_SUBDIR "-",
  517                      strlen(XATTROP_SUBDIR "-"))) {
  518             check_delete_stale_index_file(this, entry->d_name, XATTROP_SUBDIR);
  519             continue;
  520         } else if (!strncmp(entry->d_name, DIRTY_SUBDIR "-",
  521                             strlen(DIRTY_SUBDIR "-"))) {
  522             check_delete_stale_index_file(this, entry->d_name, DIRTY_SUBDIR);
  523             continue;
  524         }
  525 
  526         this_size = max(sizeof(gf_dirent_t), sizeof(gfs3_dirplist)) +
  527                     strlen(entry->d_name) + 1;
  528 
  529         if (this_size + filled > size) {
  530             seekdir(dir, in_case);
  531 #ifndef GF_LINUX_HOST_OS
  532             if ((u_long)telldir(dir) != in_case && in_case != fctx->dir_eof) {
  533                 gf_msg(THIS->name, GF_LOG_ERROR, EINVAL,
  534                        INDEX_MSG_INDEX_READDIR_FAILED,
  535                        "seekdir(0x%llx) failed on dir=%p: "
  536                        "Invalid argument (offset reused from "
  537                        "another DIR * structure?)",
  538                        in_case, dir);
  539                 errno = EINVAL;
  540                 count = -1;
  541                 goto out;
  542             }
  543 #endif /* GF_LINUX_HOST_OS */
  544             break;
  545         }
  546 
  547         this_entry = gf_dirent_for_name(entry->d_name);
  548 
  549         if (!this_entry) {
  550             gf_msg(THIS->name, GF_LOG_ERROR, errno,
  551                    INDEX_MSG_INDEX_READDIR_FAILED,
  552                    "could not create gf_dirent for entry %s", entry->d_name);
  553             goto out;
  554         }
  555         /*
  556          * we store the offset of next entry here, which is
  557          * probably not intended, but code using syncop_readdir()
  558          * (glfs-heal.c, afr-self-heald.c, pump.c) rely on it
  559          * for directory read resumption.
  560          */
  561         last_off = (u_long)telldir(dir);
  562         this_entry->d_off = last_off;
  563         this_entry->d_ino = entry->d_ino;
  564 
  565         list_add_tail(&this_entry->list, &entries->list);
  566 
  567         filled += this_size;
  568         count++;
  569     }
  570 
  571     errno = 0;
  572 
  573     if ((!sys_readdir(dir, scratch) && (errno == 0))) {
  574         /* Indicate EOF */
  575         errno = ENOENT;
  576         /* Remember EOF offset for later detection */
  577         fctx->dir_eof = last_off;
  578     }
  579 out:
  580     return count;
  581 }
  582 
  583 int
  584 index_link_to_base(xlator_t *this, char *fpath, const char *subdir)
  585 {
  586     int ret = 0;
  587     int fd = 0;
  588     int op_errno = 0;
  589     uuid_t index = {0};
  590     index_priv_t *priv = this->private;
  591     char base[PATH_MAX] = {0};
  592 
  593     index_get_index(priv, index);
  594     make_index_path(priv->index_basepath, subdir, index, base, sizeof(base));
  595 
  596     ret = sys_link(base, fpath);
  597     if (!ret || (errno == EEXIST)) {
  598         ret = 0;
  599         goto out;
  600     }
  601 
  602     op_errno = errno;
  603     if (op_errno == ENOENT) {
  604         ret = index_dir_create(this, subdir);
  605         if (ret) {
  606             op_errno = errno;
  607             goto out;
  608         }
  609     } else if (op_errno == EMLINK) {
  610         index_generate_index(priv, index);
  611         make_index_path(priv->index_basepath, subdir, index, base,
  612                         sizeof(base));
  613     } else {
  614         goto out;
  615     }
  616 
  617     op_errno = 0;
  618     fd = sys_creat(base, 0);
  619     if ((fd < 0) && (errno != EEXIST)) {
  620         op_errno = errno;
  621         gf_msg(this->name, GF_LOG_ERROR, op_errno, INDEX_MSG_INDEX_ADD_FAILED,
  622                "%s: Not able to "
  623                "create index",
  624                fpath);
  625         goto out;
  626     }
  627 
  628     if (fd >= 0)
  629         sys_close(fd);
  630 
  631     ret = sys_link(base, fpath);
  632     if (ret && (errno != EEXIST)) {
  633         op_errno = errno;
  634         gf_msg(this->name, GF_LOG_ERROR, errno, INDEX_MSG_INDEX_ADD_FAILED,
  635                "%s: Not able to "
  636                "add to index",
  637                fpath);
  638         goto out;
  639     }
  640 out:
  641     return -op_errno;
  642 }
  643 
  644 int
  645 index_add(xlator_t *this, uuid_t gfid, const char *subdir,
  646           index_xattrop_type_t type)
  647 {
  648     char gfid_path[PATH_MAX] = {0};
  649     int ret = -1;
  650     index_priv_t *priv = NULL;
  651     struct stat st = {0};
  652 
  653     priv = this->private;
  654 
  655     if (gf_uuid_is_null(gfid)) {
  656         GF_ASSERT(0);
  657         goto out;
  658     }
  659 
  660     make_gfid_path(priv->index_basepath, subdir, gfid, gfid_path,
  661                    sizeof(gfid_path));
  662 
  663     ret = sys_stat(gfid_path, &st);
  664     if (!ret)
  665         goto out;
  666     ret = index_link_to_base(this, gfid_path, subdir);
  667 out:
  668     return ret;
  669 }
  670 
  671 int
  672 index_del(xlator_t *this, uuid_t gfid, const char *subdir, int type)
  673 {
  674     int32_t op_errno __attribute__((unused)) = 0;
  675     index_priv_t *priv = NULL;
  676     int ret = 0;
  677     char gfid_path[PATH_MAX] = {0};
  678     char rename_dst[PATH_MAX] = {
  679         0,
  680     };
  681     uuid_t uuid;
  682 
  683     priv = this->private;
  684     GF_ASSERT_AND_GOTO_WITH_ERROR(this->name, !gf_uuid_is_null(gfid), out,
  685                                   op_errno, EINVAL);
  686     make_gfid_path(priv->index_basepath, subdir, gfid, gfid_path,
  687                    sizeof(gfid_path));
  688 
  689     if ((strcmp(subdir, ENTRY_CHANGES_SUBDIR)) == 0) {
  690         ret = sys_rmdir(gfid_path);
  691         /* rmdir above could fail with ENOTEMPTY if the indices under
  692          * it were created when granular-entry-heal was enabled, whereas
  693          * the actual heal that happened was non-granular (or full) in
  694          * nature, resulting in name indices getting left out. To
  695          * clean up this directory without it affecting the IO path perf,
  696          * the directory is renamed to a unique name under
  697          * indices/entry-changes. Self-heal will pick up this entry
  698          * during crawl and on lookup into the file system figure that
  699          * the index is stale and subsequently wipe it out using rmdir().
  700          */
  701         if ((ret) && (errno == ENOTEMPTY)) {
  702             gf_uuid_generate(uuid);
  703             make_gfid_path(priv->index_basepath, subdir, uuid, rename_dst,
  704                            sizeof(rename_dst));
  705             ret = sys_rename(gfid_path, rename_dst);
  706         }
  707     } else {
  708         ret = sys_unlink(gfid_path);
  709     }
  710 
  711     if (ret && (errno != ENOENT)) {
  712         gf_msg(this->name, GF_LOG_ERROR, errno, INDEX_MSG_INDEX_DEL_FAILED,
  713                "%s: failed to delete"
  714                " from index",
  715                gfid_path);
  716         ret = -errno;
  717         goto out;
  718     }
  719 
  720     index_dec_link_count(priv, type);
  721     ret = 0;
  722 out:
  723     return ret;
  724 }
  725 
  726 static gf_boolean_t
  727 _is_xattr_in_watchlist(dict_t *d, char *k, data_t *v, void *tmp)
  728 {
  729     if (!strncmp(k, tmp, strlen(k)))
  730         return _gf_true;
  731 
  732     return _gf_false;
  733 }
  734 
  735 static gf_boolean_t
  736 is_xattr_in_watchlist(dict_t *this, char *key, data_t *value, void *matchdata)
  737 {
  738     int ret = -1;
  739 
  740     // matchdata is a list of xattrs
  741     // key is strncmp'ed with each xattr in matchdata.
  742     // ret will be 0 if key pattern is not present in the matchdata
  743     // else ret will be count number of xattrs the key pattern-matches with.
  744     ret = dict_foreach_match(matchdata, _is_xattr_in_watchlist, key,
  745                              dict_null_foreach_fn, NULL);
  746 
  747     if (ret > 0)
  748         return _gf_true;
  749     return _gf_false;
  750 }
  751 
  752 static int
  753 index_find_xattr_type(dict_t *d, char *k, data_t *v)
  754 {
  755     int idx = -1;
  756     index_priv_t *priv = THIS->private;
  757 
  758     if (priv->dirty_watchlist &&
  759         is_xattr_in_watchlist(d, k, v, priv->dirty_watchlist))
  760         idx = DIRTY;
  761     else if (priv->pending_watchlist &&
  762              is_xattr_in_watchlist(d, k, v, priv->pending_watchlist))
  763         idx = XATTROP;
  764 
  765     return idx;
  766 }
  767 
  768 int
  769 index_fill_zero_array(dict_t *d, char *k, data_t *v, void *adata)
  770 {
  771     int idx = -1;
  772     int *zfilled = adata;
  773     // zfilled array contains `state` for all types xattrs.
  774     // state : whether the gfid file of this file exists in
  775     // corresponding xattr directory or not.
  776 
  777     idx = index_find_xattr_type(d, k, v);
  778     if (idx == -1)
  779         return 0;
  780     zfilled[idx] = 0;
  781     return 0;
  782 }
  783 
  784 static int
  785 _check_key_is_zero_filled(dict_t *d, char *k, data_t *v, void *tmp)
  786 {
  787     int *zfilled = tmp;
  788     int idx = -1;
  789 
  790     idx = index_find_xattr_type(d, k, v);
  791     if (idx == -1)
  792         return 0;
  793 
  794     /* Along with checking that the value of a key is zero filled
  795      * the key's corresponding index should be assigned
  796      * appropriate value.
  797      * zfilled[idx] will be 0(false) if value not zero.
  798      *              will be 1(true) if value is zero.
  799      */
  800     if (mem_0filled((const char *)v->data, v->len)) {
  801         zfilled[idx] = 0;
  802         return 0;
  803     }
  804 
  805     /* If zfilled[idx] was previously 0, it means at least
  806      * one xattr of its "kind" is non-zero. Keep its value
  807      * the same.
  808      */
  809     if (zfilled[idx])
  810         zfilled[idx] = 1;
  811     return 0;
  812 }
  813 
  814 int
  815 index_entry_create(xlator_t *this, inode_t *inode, char *filename)
  816 {
  817     int ret = -1;
  818     int op_errno = 0;
  819     char pgfid_path[PATH_MAX] = {0};
  820     char entry_path[PATH_MAX] = {0};
  821     index_priv_t *priv = NULL;
  822     index_inode_ctx_t *ctx = NULL;
  823     int32_t len = 0;
  824 
  825     priv = this->private;
  826 
  827     GF_ASSERT_AND_GOTO_WITH_ERROR(this->name, !gf_uuid_is_null(inode->gfid),
  828                                   out, op_errno, EINVAL);
  829     GF_ASSERT_AND_GOTO_WITH_ERROR(this->name, filename, out, op_errno, EINVAL);
  830 
  831     ret = index_inode_ctx_get(inode, this, &ctx);
  832     if (ret) {
  833         op_errno = EINVAL;
  834         gf_msg(this->name, GF_LOG_ERROR, op_errno,
  835                INDEX_MSG_INODE_CTX_GET_SET_FAILED,
  836                "Not able to get inode ctx for %s", uuid_utoa(inode->gfid));
  837         goto out;
  838     }
  839 
  840     make_gfid_path(priv->index_basepath, ENTRY_CHANGES_SUBDIR, inode->gfid,
  841                    pgfid_path, sizeof(pgfid_path));
  842 
  843     if (ctx->state[ENTRY_CHANGES] != IN) {
  844         ret = sys_mkdir(pgfid_path, 0600);
  845         if (ret != 0 && errno != EEXIST) {
  846             op_errno = errno;
  847             goto out;
  848         }
  849         ctx->state[ENTRY_CHANGES] = IN;
  850     }
  851 
  852     if (strchr(filename, '/')) {
  853         gf_msg(this->name, GF_LOG_ERROR, EINVAL, INDEX_MSG_INDEX_ADD_FAILED,
  854                "Got invalid entry (%s) for pargfid path (%s)", filename,
  855                pgfid_path);
  856         op_errno = EINVAL;
  857         goto out;
  858     }
  859 
  860     len = snprintf(entry_path, sizeof(entry_path), "%s/%s", pgfid_path,
  861                    filename);
  862     if ((len < 0) || (len >= sizeof(entry_path))) {
  863         op_errno = EINVAL;
  864         goto out;
  865     }
  866 
  867     op_errno = 0;
  868 
  869     ret = index_link_to_base(this, entry_path, ENTRY_CHANGES_SUBDIR);
  870 out:
  871     if (op_errno)
  872         ret = -op_errno;
  873     return ret;
  874 }
  875 
  876 int
  877 index_entry_delete(xlator_t *this, uuid_t pgfid, char *filename)
  878 {
  879     int ret = 0;
  880     int op_errno = 0;
  881     char pgfid_path[PATH_MAX] = {0};
  882     char entry_path[PATH_MAX] = {0};
  883     index_priv_t *priv = NULL;
  884     int32_t len = 0;
  885 
  886     priv = this->private;
  887 
  888     GF_ASSERT_AND_GOTO_WITH_ERROR(this->name, !gf_uuid_is_null(pgfid), out,
  889                                   op_errno, EINVAL);
  890     GF_ASSERT_AND_GOTO_WITH_ERROR(this->name, filename, out, op_errno, EINVAL);
  891 
  892     make_gfid_path(priv->index_basepath, ENTRY_CHANGES_SUBDIR, pgfid,
  893                    pgfid_path, sizeof(pgfid_path));
  894 
  895     if (strchr(filename, '/')) {
  896         gf_msg(this->name, GF_LOG_ERROR, EINVAL, INDEX_MSG_INDEX_DEL_FAILED,
  897                "Got invalid entry (%s) for pargfid path (%s)", filename,
  898                pgfid_path);
  899         op_errno = EINVAL;
  900         goto out;
  901     }
  902 
  903     len = snprintf(entry_path, sizeof(entry_path), "%s/%s", pgfid_path,
  904                    filename);
  905     if ((len < 0) || (len >= sizeof(entry_path))) {
  906         op_errno = EINVAL;
  907         goto out;
  908     }
  909 
  910     ret = sys_unlink(entry_path);
  911     if (ret && (errno != ENOENT)) {
  912         op_errno = errno;
  913         gf_msg(this->name, GF_LOG_ERROR, op_errno, INDEX_MSG_INDEX_DEL_FAILED,
  914                "%s: failed to delete from index/entry-changes", entry_path);
  915     }
  916 
  917 out:
  918     return -op_errno;
  919 }
  920 
  921 int
  922 index_entry_action(xlator_t *this, inode_t *inode, dict_t *xdata, char *key)
  923 {
  924     int ret = 0;
  925     char *filename = NULL;
  926 
  927     ret = dict_get_str(xdata, key, &filename);
  928     if (ret != 0) {
  929         ret = 0;
  930         goto out;
  931     }
  932 
  933     if (strcmp(key, GF_XATTROP_ENTRY_IN_KEY) == 0)
  934         ret = index_entry_create(this, inode, filename);
  935     else if (strcmp(key, GF_XATTROP_ENTRY_OUT_KEY) == 0)
  936         ret = index_entry_delete(this, inode->gfid, filename);
  937 
  938 out:
  939     return ret;
  940 }
  941 
  942 void
  943 _index_action(xlator_t *this, inode_t *inode, int *zfilled)
  944 {
  945     int ret = 0;
  946     int i = 0;
  947     index_inode_ctx_t *ctx = NULL;
  948     char *subdir = NULL;
  949 
  950     ret = index_inode_ctx_get(inode, this, &ctx);
  951     if (ret) {
  952         gf_msg(this->name, GF_LOG_ERROR, EINVAL,
  953                INDEX_MSG_INODE_CTX_GET_SET_FAILED,
  954                "Not able to get"
  955                " inode context for %s.",
  956                uuid_utoa(inode->gfid));
  957         goto out;
  958     }
  959 
  960     for (i = 0; i < XATTROP_TYPE_END; i++) {
  961         subdir = index_get_subdir_from_type(i);
  962         if (zfilled[i] == 1) {
  963             if (ctx->state[i] == NOTIN)
  964                 continue;
  965             ret = index_del(this, inode->gfid, subdir, i);
  966             if (!ret)
  967                 ctx->state[i] = NOTIN;
  968         } else if (zfilled[i] == 0) {
  969             if (ctx->state[i] == IN)
  970                 continue;
  971             ret = index_add(this, inode->gfid, subdir, i);
  972             if (!ret)
  973                 ctx->state[i] = IN;
  974         }
  975     }
  976 out:
  977     return;
  978 }
  979 
  980 static void
  981 index_init_state(xlator_t *this, inode_t *inode, index_inode_ctx_t *ctx,
  982                  char *subdir)
  983 {
  984     int ret = -1;
  985     char pgfid_path[PATH_MAX] = {0};
  986     struct stat st = {0};
  987     index_priv_t *priv = NULL;
  988 
  989     priv = this->private;
  990 
  991     make_gfid_path(priv->index_basepath, subdir, inode->gfid, pgfid_path,
  992                    sizeof(pgfid_path));
  993 
  994     ret = sys_stat(pgfid_path, &st);
  995     if (ret == 0)
  996         ctx->state[ENTRY_CHANGES] = IN;
  997     else if (ret != 0 && errno == ENOENT)
  998         ctx->state[ENTRY_CHANGES] = NOTIN;
  999 
 1000     return;
 1001 }
 1002 
 1003 void
 1004 xattrop_index_action(xlator_t *this, index_local_t *local, dict_t *xattr,
 1005                      dict_match_t match, void *match_data)
 1006 {
 1007     int ret = 0;
 1008     int zfilled[XATTROP_TYPE_END] = {
 1009         0,
 1010     };
 1011     int8_t value = 0;
 1012     char *subdir = NULL;
 1013     dict_t *req_xdata = NULL;
 1014     inode_t *inode = NULL;
 1015     index_inode_ctx_t *ctx = NULL;
 1016 
 1017     inode = local->inode;
 1018     req_xdata = local->xdata;
 1019 
 1020     memset(zfilled, -1, sizeof(zfilled));
 1021     ret = dict_foreach_match(xattr, match, match_data,
 1022                              _check_key_is_zero_filled, zfilled);
 1023     _index_action(this, inode, zfilled);
 1024 
 1025     if (req_xdata) {
 1026         ret = index_entry_action(this, inode, req_xdata,
 1027                                  GF_XATTROP_ENTRY_OUT_KEY);
 1028 
 1029         ret = dict_get_int8(req_xdata, GF_XATTROP_PURGE_INDEX, &value);
 1030         if ((ret) || (value == 0))
 1031             goto out;
 1032     }
 1033 
 1034     if (zfilled[XATTROP] != 1)
 1035         goto out;
 1036 
 1037     if (inode->ia_type != IA_IFDIR)
 1038         goto out;
 1039 
 1040     subdir = index_get_subdir_from_type(ENTRY_CHANGES);
 1041     ret = index_inode_ctx_get(inode, this, &ctx);
 1042     if (ctx->state[ENTRY_CHANGES] == UNKNOWN)
 1043         index_init_state(this, inode, ctx, subdir);
 1044     if (ctx->state[ENTRY_CHANGES] == IN) {
 1045         ret = index_del(this, inode->gfid, subdir, ENTRY_CHANGES);
 1046         ctx->state[ENTRY_CHANGES] = NOTIN;
 1047     }
 1048 
 1049 out:
 1050     return;
 1051 }
 1052 
 1053 static gf_boolean_t
 1054 index_xattrop_track(xlator_t *this, gf_xattrop_flags_t flags, dict_t *dict)
 1055 {
 1056     index_priv_t *priv = this->private;
 1057 
 1058     if (flags == GF_XATTROP_ADD_ARRAY)
 1059         return _gf_true;
 1060 
 1061     if (flags != GF_XATTROP_ADD_ARRAY64)
 1062         return _gf_false;
 1063 
 1064     if (!priv->pending_watchlist)
 1065         return _gf_false;
 1066 
 1067     if (dict_foreach_match(dict, is_xattr_in_watchlist, priv->pending_watchlist,
 1068                            dict_null_foreach_fn, NULL) > 0)
 1069         return _gf_true;
 1070 
 1071     return _gf_false;
 1072 }
 1073 
 1074 int
 1075 index_inode_path(xlator_t *this, inode_t *inode, char *dirpath, size_t len)
 1076 {
 1077     char *subdir = NULL;
 1078     int ret = 0;
 1079     index_priv_t *priv = NULL;
 1080     index_inode_ctx_t *ictx = NULL;
 1081 
 1082     priv = this->private;
 1083     if (!index_is_fop_on_internal_inode(this, inode, NULL)) {
 1084         ret = -EINVAL;
 1085         goto out;
 1086     }
 1087 
 1088     subdir = index_get_subdir_from_vgfid(priv, inode->gfid);
 1089     if (subdir) {
 1090         if (len <= strlen(priv->index_basepath) + 1 /*'/'*/ + strlen(subdir)) {
 1091             ret = -EINVAL;
 1092             goto out;
 1093         }
 1094         make_index_dir_path(priv->index_basepath, subdir, dirpath, len);
 1095     } else {
 1096         ret = index_inode_ctx_get(inode, this, &ictx);
 1097         if (ret)
 1098             goto out;
 1099         if (gf_uuid_is_null(ictx->virtual_pargfid)) {
 1100             ret = -EINVAL;
 1101             goto out;
 1102         }
 1103         make_index_dir_path(priv->index_basepath, ENTRY_CHANGES_SUBDIR, dirpath,
 1104                             len);
 1105         if (len <= strlen(dirpath) + 1 /*'/'*/ + SLEN(UUID0_STR)) {
 1106             ret = -EINVAL;
 1107             goto out;
 1108         }
 1109         strcat(dirpath, "/");
 1110         strcat(dirpath, uuid_utoa(ictx->virtual_pargfid));
 1111     }
 1112 out:
 1113     return ret;
 1114 }
 1115 
 1116 int
 1117 __index_fd_ctx_get(fd_t *fd, xlator_t *this, index_fd_ctx_t **ctx)
 1118 {
 1119     int ret = 0;
 1120     index_fd_ctx_t *fctx = NULL;
 1121     uint64_t tmpctx = 0;
 1122     char dirpath[PATH_MAX] = {0};
 1123 
 1124     ret = __fd_ctx_get(fd, this, &tmpctx);
 1125     if (!ret) {
 1126         fctx = (index_fd_ctx_t *)(long)tmpctx;
 1127         *ctx = fctx;
 1128         goto out;
 1129     }
 1130 
 1131     ret = index_inode_path(this, fd->inode, dirpath, sizeof(dirpath));
 1132     if (ret)
 1133         goto out;
 1134 
 1135     fctx = GF_CALLOC(1, sizeof(*fctx), gf_index_fd_ctx_t);
 1136     if (!fctx) {
 1137         ret = -ENOMEM;
 1138         goto out;
 1139     }
 1140 
 1141     fctx->dir = sys_opendir(dirpath);
 1142     if (!fctx->dir) {
 1143         ret = -errno;
 1144         GF_FREE(fctx);
 1145         fctx = NULL;
 1146         goto out;
 1147     }
 1148     fctx->dir_eof = -1;
 1149 
 1150     ret = __fd_ctx_set(fd, this, (uint64_t)(long)fctx);
 1151     if (ret) {
 1152         (void)sys_closedir(fctx->dir);
 1153         GF_FREE(fctx);
 1154         fctx = NULL;
 1155         ret = -EINVAL;
 1156         goto out;
 1157     }
 1158     *ctx = fctx;
 1159 out:
 1160     return ret;
 1161 }
 1162 
 1163 int
 1164 index_fd_ctx_get(fd_t *fd, xlator_t *this, index_fd_ctx_t **ctx)
 1165 {
 1166     int ret = 0;
 1167     LOCK(&fd->lock);
 1168     {
 1169         ret = __index_fd_ctx_get(fd, this, ctx);
 1170     }
 1171     UNLOCK(&fd->lock);
 1172     return ret;
 1173 }
 1174 
 1175 // new - Not NULL means start a fop
 1176 // new - NULL means done processing the fop
 1177 void
 1178 index_queue_process(xlator_t *this, inode_t *inode, call_stub_t *new)
 1179 {
 1180     call_stub_t *stub = NULL;
 1181     index_inode_ctx_t *ctx = NULL;
 1182     int ret = 0;
 1183     call_frame_t *frame = NULL;
 1184 
 1185     LOCK(&inode->lock);
 1186     {
 1187         ret = __index_inode_ctx_get(inode, this, &ctx);
 1188         if (ret)
 1189             goto unlock;
 1190 
 1191         if (new) {
 1192             __index_enqueue(&ctx->callstubs, new);
 1193             new = NULL;
 1194         } else {
 1195             ctx->processing = _gf_false;
 1196         }
 1197 
 1198         if (!ctx->processing) {
 1199             stub = __index_dequeue(&ctx->callstubs);
 1200             if (stub)
 1201                 ctx->processing = _gf_true;
 1202             else
 1203                 ctx->processing = _gf_false;
 1204         }
 1205     }
 1206 unlock:
 1207     UNLOCK(&inode->lock);
 1208 
 1209     if (ret && new) {
 1210         frame = new->frame;
 1211         if (new->fop == GF_FOP_XATTROP) {
 1212             INDEX_STACK_UNWIND(xattrop, frame, -1, ENOMEM, NULL, NULL);
 1213         } else if (new->fop == GF_FOP_FXATTROP) {
 1214             INDEX_STACK_UNWIND(fxattrop, frame, -1, ENOMEM, NULL, NULL);
 1215         }
 1216         call_stub_destroy(new);
 1217     } else if (stub) {
 1218         call_resume(stub);
 1219     }
 1220     return;
 1221 }
 1222 
 1223 static int
 1224 xattrop_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
 1225             int32_t op_errno, dict_t *xattr, dict_t *xdata, dict_match_t match,
 1226             dict_t *matchdata)
 1227 {
 1228     inode_t *inode = NULL;
 1229     index_local_t *local = NULL;
 1230 
 1231     local = frame->local;
 1232     inode = inode_ref(local->inode);
 1233 
 1234     if (op_ret < 0)
 1235         goto out;
 1236 
 1237     xattrop_index_action(this, local, xattr, match, matchdata);
 1238 out:
 1239     INDEX_STACK_UNWIND(xattrop, frame, op_ret, op_errno, xattr, xdata);
 1240     index_queue_process(this, inode, NULL);
 1241     inode_unref(inode);
 1242 
 1243     return 0;
 1244 }
 1245 
 1246 int32_t
 1247 index_xattrop_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
 1248                   int32_t op_ret, int32_t op_errno, dict_t *xattr,
 1249                   dict_t *xdata)
 1250 {
 1251     index_priv_t *priv = this->private;
 1252 
 1253     xattrop_cbk(frame, cookie, this, op_ret, op_errno, xattr, xdata,
 1254                 is_xattr_in_watchlist, priv->complete_watchlist);
 1255     return 0;
 1256 }
 1257 
 1258 int32_t
 1259 index_xattrop64_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
 1260                     int32_t op_ret, int32_t op_errno, dict_t *xattr,
 1261                     dict_t *xdata)
 1262 {
 1263     index_priv_t *priv = this->private;
 1264 
 1265     return xattrop_cbk(frame, cookie, this, op_ret, op_errno, xattr, xdata,
 1266                        is_xattr_in_watchlist, priv->pending_watchlist);
 1267 }
 1268 
 1269 void
 1270 index_xattrop_do(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,
 1271                  gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
 1272 {
 1273     int ret = -1;
 1274     int zfilled[XATTROP_TYPE_END] = {
 1275         0,
 1276     };
 1277     index_local_t *local = NULL;
 1278     fop_xattrop_cbk_t x_cbk = NULL;
 1279 
 1280     local = frame->local;
 1281 
 1282     if (optype == GF_XATTROP_ADD_ARRAY)
 1283         x_cbk = index_xattrop_cbk;
 1284     else
 1285         x_cbk = index_xattrop64_cbk;
 1286 
 1287     // In wind phase bring the gfid into index. This way if the brick crashes
 1288     // just after posix performs xattrop before _cbk reaches index xlator
 1289     // we will still have the gfid in index.
 1290     memset(zfilled, -1, sizeof(zfilled));
 1291 
 1292     /* Foreach xattr, set corresponding index of zfilled to 1
 1293      * zfilled[index] = 1 implies the xattr's value is zero filled
 1294      * and should be added in its corresponding subdir.
 1295      *
 1296      * zfilled should be set to 1 only for those index that
 1297      * exist in xattr variable. This is to distinguish
 1298      * between different types of volumes.
 1299      * For e.g., if the check is not made,
 1300      * zfilled[DIRTY] is set to 1 for EC volumes,
 1301      * index file will be tried to create in indices/dirty dir
 1302      * which doesn't exist for an EC volume.
 1303      */
 1304     ret = dict_foreach(xattr, index_fill_zero_array, zfilled);
 1305 
 1306     _index_action(this, local->inode, zfilled);
 1307     if (xdata)
 1308         ret = index_entry_action(this, local->inode, xdata,
 1309                                  GF_XATTROP_ENTRY_IN_KEY);
 1310     if (ret < 0) {
 1311         x_cbk(frame, NULL, this, -1, -ret, NULL, NULL);
 1312         return;
 1313     }
 1314 
 1315     if (loc)
 1316         STACK_WIND(frame, x_cbk, FIRST_CHILD(this),
 1317                    FIRST_CHILD(this)->fops->xattrop, loc, optype, xattr, xdata);
 1318     else
 1319         STACK_WIND(frame, x_cbk, FIRST_CHILD(this),
 1320                    FIRST_CHILD(this)->fops->fxattrop, fd, optype, xattr, xdata);
 1321 }
 1322 
 1323 int
 1324 index_xattrop_wrapper(call_frame_t *frame, xlator_t *this, loc_t *loc,
 1325                       gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
 1326 {
 1327     index_xattrop_do(frame, this, loc, NULL, optype, xattr, xdata);
 1328     return 0;
 1329 }
 1330 
 1331 int
 1332 index_fxattrop_wrapper(call_frame_t *frame, xlator_t *this, fd_t *fd,
 1333                        gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
 1334 {
 1335     index_xattrop_do(frame, this, NULL, fd, optype, xattr, xdata);
 1336     return 0;
 1337 }
 1338 
 1339 int32_t
 1340 index_xattrop(call_frame_t *frame, xlator_t *this, loc_t *loc,
 1341               gf_xattrop_flags_t flags, dict_t *dict, dict_t *xdata)
 1342 {
 1343     call_stub_t *stub = NULL;
 1344     index_local_t *local = NULL;
 1345 
 1346     if (!index_xattrop_track(this, flags, dict))
 1347         goto out;
 1348 
 1349     local = mem_get0(this->local_pool);
 1350     if (!local)
 1351         goto err;
 1352 
 1353     frame->local = local;
 1354     local->inode = inode_ref(loc->inode);
 1355     if (xdata)
 1356         local->xdata = dict_ref(xdata);
 1357     stub = fop_xattrop_stub(frame, index_xattrop_wrapper, loc, flags, dict,
 1358                             xdata);
 1359 
 1360 err:
 1361     if ((!local) || (!stub)) {
 1362         INDEX_STACK_UNWIND(xattrop, frame, -1, ENOMEM, NULL, NULL);
 1363         return 0;
 1364     }
 1365 
 1366     index_queue_process(this, loc->inode, stub);
 1367     return 0;
 1368 out:
 1369     STACK_WIND(frame, default_xattrop_cbk, FIRST_CHILD(this),
 1370                FIRST_CHILD(this)->fops->xattrop, loc, flags, dict, xdata);
 1371     return 0;
 1372 }
 1373 
 1374 int32_t
 1375 index_fxattrop(call_frame_t *frame, xlator_t *this, fd_t *fd,
 1376                gf_xattrop_flags_t flags, dict_t *dict, dict_t *xdata)
 1377 {
 1378     call_stub_t *stub = NULL;
 1379     index_local_t *local = NULL;
 1380 
 1381     if (!index_xattrop_track(this, flags, dict))
 1382         goto out;
 1383 
 1384     local = mem_get0(this->local_pool);
 1385     if (!local)
 1386         goto err;
 1387 
 1388     frame->local = local;
 1389     local->inode = inode_ref(fd->inode);
 1390     if (xdata)
 1391         local->xdata = dict_ref(xdata);
 1392     stub = fop_fxattrop_stub(frame, index_fxattrop_wrapper, fd, flags, dict,
 1393                              xdata);
 1394 
 1395 err:
 1396     if ((!local) || (!stub)) {
 1397         INDEX_STACK_UNWIND(fxattrop, frame, -1, ENOMEM, NULL, xdata);
 1398         return 0;
 1399     }
 1400 
 1401     index_queue_process(this, fd->inode, stub);
 1402     return 0;
 1403 out:
 1404     STACK_WIND(frame, default_fxattrop_cbk, FIRST_CHILD(this),
 1405                FIRST_CHILD(this)->fops->fxattrop, fd, flags, dict, xdata);
 1406     return 0;
 1407 }
 1408 
 1409 uint64_t
 1410 index_entry_count(xlator_t *this, char *subdir)
 1411 {
 1412     uint64_t count = 0;
 1413     index_priv_t *priv = NULL;
 1414     DIR *dirp = NULL;
 1415     struct dirent *entry = NULL;
 1416     struct dirent scratch[2] = {
 1417         {
 1418             0,
 1419         },
 1420     };
 1421     char index_dir[PATH_MAX] = {
 1422         0,
 1423     };
 1424 
 1425     priv = this->private;
 1426 
 1427     make_index_dir_path(priv->index_basepath, subdir, index_dir,
 1428                         sizeof(index_dir));
 1429 
 1430     dirp = sys_opendir(index_dir);
 1431     if (!dirp)
 1432         return 0;
 1433 
 1434     for (;;) {
 1435         errno = 0;
 1436         entry = sys_readdir(dirp, scratch);
 1437         if (!entry || errno != 0)
 1438             break;
 1439 
 1440         if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
 1441             continue;
 1442 
 1443         if (!strncmp(entry->d_name, subdir, strlen(subdir)))
 1444             continue;
 1445 
 1446         count++;
 1447     }
 1448 
 1449     (void)sys_closedir(dirp);
 1450 
 1451     return count;
 1452 }
 1453 
 1454 int32_t
 1455 index_getxattr_wrapper(call_frame_t *frame, xlator_t *this, loc_t *loc,
 1456                        const char *name, dict_t *xdata)
 1457 {
 1458     index_priv_t *priv = NULL;
 1459     dict_t *xattr = NULL;
 1460     int ret = 0;
 1461     int vgfid_type = 0;
 1462     uint64_t count = 0;
 1463 
 1464     priv = this->private;
 1465 
 1466     xattr = dict_new();
 1467     if (!xattr) {
 1468         ret = -ENOMEM;
 1469         goto done;
 1470     }
 1471 
 1472     vgfid_type = index_get_type_from_vgfid_xattr(name);
 1473     if (vgfid_type >= 0) {
 1474         ret = dict_set_static_bin(xattr, (char *)name,
 1475                                   priv->internal_vgfid[vgfid_type],
 1476                                   sizeof(priv->internal_vgfid[vgfid_type]));
 1477         if (ret) {
 1478             ret = -EINVAL;
 1479             gf_msg(this->name, GF_LOG_ERROR, -ret, INDEX_MSG_DICT_SET_FAILED,
 1480                    "xattrop index "
 1481                    "gfid set failed");
 1482             goto done;
 1483         }
 1484     }
 1485 
 1486     /* TODO: Need to check what kind of link-counts are needed for
 1487      * ENTRY-CHANGES before refactor of this block with array*/
 1488     if (strcmp(name, GF_XATTROP_INDEX_COUNT) == 0) {
 1489         count = index_entry_count(this, XATTROP_SUBDIR);
 1490 
 1491         ret = dict_set_uint64(xattr, (char *)name, count);
 1492         if (ret) {
 1493             ret = -EINVAL;
 1494             gf_msg(this->name, GF_LOG_ERROR, -ret, INDEX_MSG_DICT_SET_FAILED,
 1495                    "xattrop index "
 1496                    "count set failed");
 1497             goto done;
 1498         }
 1499     } else if (strcmp(name, GF_XATTROP_DIRTY_COUNT) == 0) {
 1500         count = index_entry_count(this, DIRTY_SUBDIR);
 1501 
 1502         ret = dict_set_uint64(xattr, (char *)name, count);
 1503         if (ret) {
 1504             ret = -EINVAL;
 1505             gf_msg(this->name, GF_LOG_ERROR, -ret, INDEX_MSG_DICT_SET_FAILED,
 1506                    "dirty index "
 1507                    "count set failed");
 1508             goto done;
 1509         }
 1510     }
 1511 done:
 1512     if (ret)
 1513         STACK_UNWIND_STRICT(getxattr, frame, -1, -ret, xattr, NULL);
 1514     else
 1515         STACK_UNWIND_STRICT(getxattr, frame, 0, 0, xattr, NULL);
 1516 
 1517     if (xattr)
 1518         dict_unref(xattr);
 1519 
 1520     return 0;
 1521 }
 1522 
 1523 static int
 1524 index_save_pargfid_for_entry_changes(xlator_t *this, loc_t *loc, char *path)
 1525 {
 1526     index_priv_t *priv = NULL;
 1527     index_inode_ctx_t *ctx = NULL;
 1528     int ret = 0;
 1529 
 1530     priv = this->private;
 1531     if (!loc)
 1532         return -1;
 1533     if (gf_uuid_compare(loc->pargfid, priv->internal_vgfid[ENTRY_CHANGES]))
 1534         return 0;
 1535 
 1536     ret = index_inode_ctx_get(loc->inode, this, &ctx);
 1537     if (ret) {
 1538         gf_msg(this->name, GF_LOG_ERROR, EINVAL,
 1539                INDEX_MSG_INODE_CTX_GET_SET_FAILED,
 1540                "Unable to get inode context for %s", path);
 1541         return -EINVAL;
 1542     }
 1543     ret = gf_uuid_parse(loc->name, ctx->virtual_pargfid);
 1544     if (ret) {
 1545         gf_msg(this->name, GF_LOG_ERROR, EINVAL,
 1546                INDEX_MSG_INODE_CTX_GET_SET_FAILED,
 1547                "Unable to store "
 1548                "virtual gfid in inode context for %s",
 1549                path);
 1550         return -EINVAL;
 1551     }
 1552     return 0;
 1553 }
 1554 
 1555 int32_t
 1556 index_lookup_wrapper(call_frame_t *frame, xlator_t *this, loc_t *loc,
 1557                      dict_t *xattr_req)
 1558 {
 1559     index_priv_t *priv = NULL;
 1560     struct stat lstatbuf = {0};
 1561     int ret = 0;
 1562     int32_t op_errno = EINVAL;
 1563     int32_t op_ret = -1;
 1564     uint64_t val = IA_INVAL;
 1565     char path[PATH_MAX] = {0};
 1566     struct iatt stbuf = {
 1567         0,
 1568     };
 1569     struct iatt postparent = {
 1570         0,
 1571     };
 1572     dict_t *xattr = NULL;
 1573     gf_boolean_t is_dir = _gf_false;
 1574     char *subdir = NULL;
 1575     loc_t iloc = {0};
 1576 
 1577     priv = this->private;
 1578     loc_copy(&iloc, loc);
 1579 
 1580     VALIDATE_OR_GOTO(loc, done);
 1581     if (index_is_fop_on_internal_inode(this, loc->parent, loc->pargfid)) {
 1582         subdir = index_get_subdir_from_vgfid(priv, loc->pargfid);
 1583         ret = index_inode_path(this, loc->parent, path, sizeof(path));
 1584         if (ret < 0) {
 1585             op_errno = -ret;
 1586             goto done;
 1587         }
 1588         ret = snprintf(path + strlen(path), PATH_MAX - strlen(path), "/%s",
 1589                        loc->name);
 1590 
 1591         if ((ret < 0) || (ret > (PATH_MAX - strlen(path)))) {
 1592             op_errno = EINVAL;
 1593             op_ret = -1;
 1594             goto done;
 1595         }
 1596 
 1597     } else if (index_is_virtual_gfid(priv, loc->gfid)) {
 1598         subdir = index_get_subdir_from_vgfid(priv, loc->gfid);
 1599         make_index_dir_path(priv->index_basepath, subdir, path, sizeof(path));
 1600         is_dir = _gf_true;
 1601 
 1602         if ((xattr_req) && (dict_get(xattr_req, GF_INDEX_IA_TYPE_GET_REQ))) {
 1603             if (0 == strcmp(subdir, index_get_subdir_from_type(ENTRY_CHANGES)))
 1604                 val = IA_IFDIR;
 1605             else
 1606                 val = IA_IFREG;
 1607         }
 1608     } else {
 1609         if (!inode_is_linked(loc->inode)) {
 1610             inode_unref(iloc.inode);
 1611             iloc.inode = inode_find(loc->inode->table, loc->gfid);
 1612         }
 1613         ret = index_inode_path(this, iloc.inode, path, sizeof(path));
 1614         if (ret < 0) {
 1615             op_errno = -ret;
 1616             goto done;
 1617         }
 1618     }
 1619     ret = sys_lstat(path, &lstatbuf);
 1620     if (ret) {
 1621         gf_msg_debug(this->name, errno, "Stat failed on %s dir ", path);
 1622         op_errno = errno;
 1623         goto done;
 1624     } else if (!S_ISDIR(lstatbuf.st_mode) && is_dir) {
 1625         op_errno = ENOTDIR;
 1626         gf_msg_debug(this->name, op_errno,
 1627                      "Stat failed on %s dir, "
 1628                      "not a directory",
 1629                      path);
 1630         goto done;
 1631     }
 1632     xattr = dict_new();
 1633     if (!xattr) {
 1634         op_errno = ENOMEM;
 1635         goto done;
 1636     }
 1637 
 1638     if (val != IA_INVAL) {
 1639         ret = dict_set_uint64(xattr, GF_INDEX_IA_TYPE_GET_RSP, val);
 1640         if (ret) {
 1641             op_ret = -1;
 1642             op_errno = -ret;
 1643             goto done;
 1644         }
 1645     }
 1646 
 1647     iatt_from_stat(&stbuf, &lstatbuf);
 1648     if (is_dir || inode_is_linked(iloc.inode))
 1649         loc_gfid(&iloc, stbuf.ia_gfid);
 1650     else
 1651         gf_uuid_generate(stbuf.ia_gfid);
 1652 
 1653     ret = index_save_pargfid_for_entry_changes(this, &iloc, path);
 1654     if (ret) {
 1655         op_ret = -1;
 1656         op_errno = -ret;
 1657         goto done;
 1658     }
 1659 
 1660     stbuf.ia_ino = -1;
 1661     op_ret = 0;
 1662 done:
 1663     STACK_UNWIND_STRICT(lookup, frame, op_ret, op_errno,
 1664                         loc ? loc->inode : NULL, &stbuf, xattr, &postparent);
 1665     if (xattr)
 1666         dict_unref(xattr);
 1667     loc_wipe(&iloc);
 1668     return 0;
 1669 }
 1670 
 1671 int
 1672 index_get_gfid_type(void *opaque)
 1673 {
 1674     gf_dirent_t *entry = NULL;
 1675     xlator_t *this = THIS;
 1676     struct index_syncop_args *args = opaque;
 1677     loc_t loc = {0};
 1678     struct iatt iatt = {0};
 1679     int ret = 0;
 1680 
 1681     list_for_each_entry(entry, &args->entries->list, list)
 1682     {
 1683         if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
 1684             continue;
 1685 
 1686         loc_wipe(&loc);
 1687 
 1688         entry->d_type = gf_d_type_from_ia_type(IA_INVAL);
 1689         entry->d_stat.ia_type = IA_INVAL;
 1690         if (gf_uuid_parse(entry->d_name, loc.gfid))
 1691             continue;
 1692 
 1693         loc.inode = inode_find(args->parent->table, loc.gfid);
 1694         if (loc.inode) {
 1695             entry->d_stat.ia_type = loc.inode->ia_type;
 1696             entry->d_type = gf_d_type_from_ia_type(loc.inode->ia_type);
 1697             continue;
 1698         }
 1699         loc.inode = inode_new(args->parent->table);
 1700         if (!loc.inode)
 1701             continue;
 1702         ret = syncop_lookup(FIRST_CHILD(this), &loc, &iatt, 0, 0, 0);
 1703         if (ret == 0) {
 1704             entry->d_type = gf_d_type_from_ia_type(iatt.ia_type);
 1705             entry->d_stat = iatt;
 1706         }
 1707     }
 1708     loc_wipe(&loc);
 1709 
 1710     return 0;
 1711 }
 1712 
 1713 int32_t
 1714 index_readdir_wrapper(call_frame_t *frame, xlator_t *this, fd_t *fd,
 1715                       size_t size, off_t off, dict_t *xdata)
 1716 {
 1717     index_fd_ctx_t *fctx = NULL;
 1718     index_priv_t *priv = NULL;
 1719     DIR *dir = NULL;
 1720     int ret = -1;
 1721     int32_t op_ret = -1;
 1722     int32_t op_errno = 0;
 1723     int count = 0;
 1724     gf_dirent_t entries;
 1725     struct index_syncop_args args = {0};
 1726 
 1727     priv = this->private;
 1728     INIT_LIST_HEAD(&entries.list);
 1729 
 1730     ret = index_fd_ctx_get(fd, this, &fctx);
 1731     if (ret < 0) {
 1732         op_errno = -ret;
 1733         gf_msg(this->name, GF_LOG_WARNING, op_errno, INDEX_MSG_FD_OP_FAILED,
 1734                "pfd is NULL, fd=%p", fd);
 1735         goto done;
 1736     }
 1737 
 1738     dir = fctx->dir;
 1739     if (!dir) {
 1740         op_errno = EINVAL;
 1741         gf_msg(this->name, GF_LOG_WARNING, op_errno,
 1742                INDEX_MSG_INDEX_READDIR_FAILED, "dir is NULL for fd=%p", fd);
 1743         goto done;
 1744     }
 1745 
 1746     count = index_fill_readdir(fd, fctx, dir, off, size, &entries);
 1747 
 1748     /* pick ENOENT to indicate EOF */
 1749     op_errno = errno;
 1750     op_ret = count;
 1751     if (index_is_virtual_gfid(priv, fd->inode->gfid) && xdata &&
 1752         dict_get(xdata, "get-gfid-type")) {
 1753         args.parent = fd->inode;
 1754         args.entries = &entries;
 1755         ret = synctask_new(this->ctx->env, index_get_gfid_type, NULL, NULL,
 1756                            &args);
 1757     }
 1758 done:
 1759     STACK_UNWIND_STRICT(readdir, frame, op_ret, op_errno, &entries, NULL);
 1760     gf_dirent_free(&entries);
 1761     return 0;
 1762 }
 1763 
 1764 int
 1765 deletion_handler(const char *fpath, const struct stat *sb, int typeflag,
 1766                  struct FTW *ftwbuf)
 1767 {
 1768     ia_type_t type = IA_INVAL;
 1769 
 1770     switch (sb->st_mode & S_IFMT) {
 1771         case S_IFREG:
 1772             sys_unlink(fpath);
 1773             break;
 1774 
 1775         case S_IFDIR:
 1776             sys_rmdir(fpath);
 1777             break;
 1778         default:
 1779             type = ia_type_from_st_mode(sb->st_mode);
 1780             gf_msg(THIS->name, GF_LOG_WARNING, EINVAL, INDEX_MSG_INVALID_ARGS,
 1781                    "%s neither a regular file nor a directory - type:%s", fpath,
 1782                    gf_inode_type_to_str(type));
 1783             break;
 1784     }
 1785     return 0;
 1786 }
 1787 
 1788 static int
 1789 index_wipe_index_subdir(void *opaque)
 1790 {
 1791     struct index_syncop_args *args = opaque;
 1792 
 1793     nftw(args->path, deletion_handler, 1, FTW_DEPTH | FTW_PHYS);
 1794     return 0;
 1795 }
 1796 
 1797 static void
 1798 index_get_parent_iatt(struct iatt *parent, char *path, loc_t *loc,
 1799                       int32_t *op_ret, int32_t *op_errno)
 1800 {
 1801     int ret = -1;
 1802     struct stat lstatbuf = {
 1803         0,
 1804     };
 1805 
 1806     ret = sys_lstat(path, &lstatbuf);
 1807     if (ret < 0) {
 1808         *op_ret = -1;
 1809         *op_errno = errno;
 1810         return;
 1811     }
 1812 
 1813     iatt_from_stat(parent, &lstatbuf);
 1814     gf_uuid_copy(parent->ia_gfid, loc->pargfid);
 1815     parent->ia_ino = -1;
 1816 
 1817     return;
 1818 }
 1819 
 1820 int
 1821 index_rmdir_wrapper(call_frame_t *frame, xlator_t *this, loc_t *loc, int flag,
 1822                     dict_t *xdata)
 1823 {
 1824     int ret = 0;
 1825     int32_t op_ret = 0;
 1826     int32_t op_errno = 0;
 1827     char *subdir = NULL;
 1828     char index_dir[PATH_MAX] = {0};
 1829     char index_subdir[PATH_MAX] = {0};
 1830     uuid_t gfid = {0};
 1831     struct iatt preparent = {0};
 1832     struct iatt postparent = {0};
 1833     index_priv_t *priv = NULL;
 1834     index_xattrop_type_t type = XATTROP_TYPE_UNSET;
 1835     struct index_syncop_args args = {
 1836         0,
 1837     };
 1838 
 1839     priv = this->private;
 1840 
 1841     type = index_get_type_from_vgfid(priv, loc->pargfid);
 1842     subdir = index_get_subdir_from_vgfid(priv, loc->pargfid);
 1843     make_index_dir_path(priv->index_basepath, subdir, index_dir,
 1844                         sizeof(index_dir));
 1845 
 1846     index_get_parent_iatt(&preparent, index_dir, loc, &op_ret, &op_errno);
 1847     if (op_ret < 0)
 1848         goto done;
 1849 
 1850     gf_uuid_parse(loc->name, gfid);
 1851     make_gfid_path(priv->index_basepath, subdir, gfid, index_subdir,
 1852                    sizeof(index_subdir));
 1853 
 1854     if (flag == 0) {
 1855         ret = index_del(this, gfid, subdir, type);
 1856         if (ret < 0) {
 1857             op_ret = -1;
 1858             op_errno = -ret;
 1859             goto done;
 1860         }
 1861     } else {
 1862         args.path = index_subdir;
 1863         ret = synctask_new(this->ctx->env, index_wipe_index_subdir, NULL, NULL,
 1864                            &args);
 1865     }
 1866 
 1867     index_get_parent_iatt(&postparent, index_dir, loc, &op_ret, &op_errno);
 1868     if (op_ret < 0)
 1869         goto done;
 1870 
 1871 done:
 1872     INDEX_STACK_UNWIND(rmdir, frame, op_ret, op_errno, &preparent, &postparent,
 1873                        xdata);
 1874     return 0;
 1875 }
 1876 
 1877 int
 1878 index_unlink_wrapper(call_frame_t *frame, xlator_t *this, loc_t *loc, int flag,
 1879                      dict_t *xdata)
 1880 {
 1881     index_priv_t *priv = NULL;
 1882     index_inode_ctx_t *ictx = NULL;
 1883     int32_t op_ret = 0;
 1884     int32_t op_errno = 0;
 1885     int ret = 0;
 1886     index_xattrop_type_t type = XATTROP_TYPE_UNSET;
 1887     struct iatt preparent = {0};
 1888     struct iatt postparent = {0};
 1889     char index_dir[PATH_MAX] = {0};
 1890     char filepath[PATH_MAX] = {0};
 1891     uuid_t gfid = {0};
 1892     char *subdir = NULL;
 1893 
 1894     priv = this->private;
 1895     type = index_get_type_from_vgfid(priv, loc->pargfid);
 1896     ret = index_inode_path(this, loc->parent, index_dir, sizeof(index_dir));
 1897     if (ret < 0) {
 1898         op_ret = -1;
 1899         op_errno = -ret;
 1900         goto done;
 1901     }
 1902 
 1903     index_get_parent_iatt(&preparent, index_dir, loc, &op_ret, &op_errno);
 1904     if (op_ret < 0)
 1905         goto done;
 1906 
 1907     if (type <= XATTROP_TYPE_UNSET) {
 1908         ret = index_inode_ctx_get(loc->parent, this, &ictx);
 1909         if ((ret == 0) && gf_uuid_is_null(ictx->virtual_pargfid)) {
 1910             ret = -EINVAL;
 1911         }
 1912         if (ret == 0) {
 1913             ret = index_entry_delete(this, ictx->virtual_pargfid,
 1914                                      (char *)loc->name);
 1915         }
 1916     } else if (type == ENTRY_CHANGES) {
 1917         make_file_path(priv->index_basepath, ENTRY_CHANGES_SUBDIR,
 1918                        (char *)loc->name, filepath, sizeof(filepath));
 1919         ret = sys_unlink(filepath);
 1920     } else {
 1921         subdir = index_get_subdir_from_type(type);
 1922         gf_uuid_parse(loc->name, gfid);
 1923         ret = index_del(this, gfid, subdir, type);
 1924     }
 1925     if (ret < 0) {
 1926         op_ret = -1;
 1927         op_errno = -ret;
 1928         goto done;
 1929     }
 1930 
 1931     index_get_parent_iatt(&postparent, index_dir, loc, &op_ret, &op_errno);
 1932     if (op_ret < 0)
 1933         goto done;
 1934 done:
 1935     INDEX_STACK_UNWIND(unlink, frame, op_ret, op_errno, &preparent, &postparent,
 1936                        xdata);
 1937     return 0;
 1938 }
 1939 
 1940 int32_t
 1941 index_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
 1942                const char *name, dict_t *xdata)
 1943 {
 1944     call_stub_t *stub = NULL;
 1945 
 1946     if (!name ||
 1947         (!index_is_vgfid_xattr(name) && strcmp(GF_XATTROP_INDEX_COUNT, name) &&
 1948          strcmp(GF_XATTROP_DIRTY_COUNT, name)))
 1949         goto out;
 1950 
 1951     stub = fop_getxattr_stub(frame, index_getxattr_wrapper, loc, name, xdata);
 1952     if (!stub) {
 1953         STACK_UNWIND_STRICT(getxattr, frame, -1, ENOMEM, NULL, NULL);
 1954         return 0;
 1955     }
 1956     worker_enqueue(this, stub);
 1957     return 0;
 1958 out:
 1959     STACK_WIND(frame, default_getxattr_cbk, FIRST_CHILD(this),
 1960                FIRST_CHILD(this)->fops->getxattr, loc, name, xdata);
 1961     return 0;
 1962 }
 1963 
 1964 int64_t
 1965 index_fetch_link_count(xlator_t *this, index_xattrop_type_t type)
 1966 {
 1967     index_priv_t *priv = this->private;
 1968     char *subdir = NULL;
 1969     struct stat lstatbuf = {
 1970         0,
 1971     };
 1972     int ret = -1;
 1973     int64_t count = -1;
 1974     DIR *dirp = NULL;
 1975     struct dirent *entry = NULL;
 1976     struct dirent scratch[2] = {
 1977         {
 1978             0,
 1979         },
 1980     };
 1981     char index_dir[PATH_MAX] = {
 1982         0,
 1983     };
 1984     char index_path[PATH_MAX] = {
 1985         0,
 1986     };
 1987 
 1988     subdir = index_get_subdir_from_type(type);
 1989     make_index_dir_path(priv->index_basepath, subdir, index_dir,
 1990                         sizeof(index_dir));
 1991 
 1992     dirp = sys_opendir(index_dir);
 1993     if (!dirp)
 1994         goto out;
 1995 
 1996     for (;;) {
 1997         errno = 0;
 1998         entry = sys_readdir(dirp, scratch);
 1999         if (!entry || errno != 0) {
 2000             if (count == -1)
 2001                 count = 0;
 2002             goto out;
 2003         }
 2004 
 2005         if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
 2006             continue;
 2007 
 2008         make_file_path(priv->index_basepath, subdir, entry->d_name, index_path,
 2009                        sizeof(index_path));
 2010 
 2011         ret = sys_lstat(index_path, &lstatbuf);
 2012         if (ret < 0) {
 2013             count = -2;
 2014             continue;
 2015         } else {
 2016             count = lstatbuf.st_nlink - 1;
 2017             if (count == 0)
 2018                 continue;
 2019             else
 2020                 break;
 2021         }
 2022     }
 2023 out:
 2024     if (dirp)
 2025         (void)sys_closedir(dirp);
 2026     return count;
 2027 }
 2028 
 2029 dict_t *
 2030 index_fill_link_count(xlator_t *this, dict_t *xdata)
 2031 {
 2032     int ret = -1;
 2033     index_priv_t *priv = NULL;
 2034     int64_t count = -1;
 2035 
 2036     priv = this->private;
 2037     xdata = (xdata) ? dict_ref(xdata) : dict_new();
 2038     if (!xdata)
 2039         goto out;
 2040 
 2041     index_get_link_count(priv, &count, XATTROP);
 2042     if (count < 0) {
 2043         count = index_fetch_link_count(this, XATTROP);
 2044         index_set_link_count(priv, count, XATTROP);
 2045     }
 2046 
 2047     if (count == 0) {
 2048         ret = dict_set_int8(xdata, "link-count", 0);
 2049         if (ret < 0)
 2050             gf_msg(this->name, GF_LOG_ERROR, EINVAL, INDEX_MSG_DICT_SET_FAILED,
 2051                    "Unable to set link-count");
 2052     } else {
 2053         ret = dict_set_int8(xdata, "link-count", 1);
 2054         if (ret < 0)
 2055             gf_msg(this->name, GF_LOG_ERROR, EINVAL, INDEX_MSG_DICT_SET_FAILED,
 2056                    "Unable to set link-count");
 2057     }
 2058 
 2059 out:
 2060     return xdata;
 2061 }
 2062 
 2063 int32_t
 2064 index_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
 2065                  int32_t op_ret, int32_t op_errno, inode_t *inode,
 2066                  struct iatt *buf, dict_t *xdata, struct iatt *postparent)
 2067 {
 2068     xdata = index_fill_link_count(this, xdata);
 2069     STACK_UNWIND_STRICT(lookup, frame, op_ret, op_errno, inode, buf, xdata,
 2070                         postparent);
 2071     if (xdata)
 2072         dict_unref(xdata);
 2073     return 0;
 2074 }
 2075 
 2076 int32_t
 2077 index_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xattr_req)
 2078 {
 2079     inode_t *inode = NULL;
 2080     call_stub_t *stub = NULL;
 2081     char *flag = NULL;
 2082     int ret = -1;
 2083 
 2084     if (!index_is_fop_on_internal_inode(this, loc->parent, loc->pargfid) &&
 2085         !index_is_fop_on_internal_inode(this, loc->inode, loc->gfid)) {
 2086         if (!inode_is_linked(loc->inode)) {
 2087             inode = inode_find(loc->inode->table, loc->gfid);
 2088             if (!index_is_fop_on_internal_inode(this, inode, loc->gfid)) {
 2089                 inode_unref(inode);
 2090                 goto normal;
 2091             }
 2092             inode_unref(inode);
 2093         } else {
 2094             goto normal;
 2095         }
 2096     }
 2097 
 2098     stub = fop_lookup_stub(frame, index_lookup_wrapper, loc, xattr_req);
 2099     if (!stub) {
 2100         STACK_UNWIND_STRICT(lookup, frame, -1, ENOMEM, loc->inode, NULL, NULL,
 2101                             NULL);
 2102         return 0;
 2103     }
 2104     worker_enqueue(this, stub);
 2105     return 0;
 2106 normal:
 2107     ret = dict_get_str(xattr_req, "link-count", &flag);
 2108     if ((ret == 0) && (strcmp(flag, GF_XATTROP_INDEX_COUNT) == 0)) {
 2109         STACK_WIND(frame, index_lookup_cbk, FIRST_CHILD(this),
 2110                    FIRST_CHILD(this)->fops->lookup, loc, xattr_req);
 2111     } else {
 2112         STACK_WIND(frame, default_lookup_cbk, FIRST_CHILD(this),
 2113                    FIRST_CHILD(this)->fops->lookup, loc, xattr_req);
 2114     }
 2115 
 2116     return 0;
 2117 }
 2118 
 2119 int32_t
 2120 index_fstat_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
 2121                 int32_t op_ret, int32_t op_errno, struct iatt *buf,
 2122                 dict_t *xdata)
 2123 {
 2124     xdata = index_fill_link_count(this, xdata);
 2125     STACK_UNWIND_STRICT(fstat, frame, op_ret, op_errno, buf, xdata);
 2126     if (xdata)
 2127         dict_unref(xdata);
 2128     return 0;
 2129 }
 2130 
 2131 int32_t
 2132 index_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
 2133 {
 2134     int ret = -1;
 2135     char *flag = NULL;
 2136 
 2137     ret = dict_get_str(xdata, "link-count", &flag);
 2138     if ((ret == 0) && (strcmp(flag, GF_XATTROP_INDEX_COUNT) == 0)) {
 2139         STACK_WIND(frame, index_fstat_cbk, FIRST_CHILD(this),
 2140                    FIRST_CHILD(this)->fops->fstat, fd, xdata);
 2141     } else {
 2142         STACK_WIND(frame, default_fstat_cbk, FIRST_CHILD(this),
 2143                    FIRST_CHILD(this)->fops->fstat, fd, xdata);
 2144     }
 2145 
 2146     return 0;
 2147 }
 2148 
 2149 int32_t
 2150 index_opendir(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,
 2151               dict_t *xdata)
 2152 {
 2153     if (!index_is_fop_on_internal_inode(this, fd->inode, NULL))
 2154         goto normal;
 2155 
 2156     frame->local = NULL;
 2157     STACK_UNWIND_STRICT(opendir, frame, 0, 0, fd, NULL);
 2158     return 0;
 2159 
 2160 normal:
 2161     STACK_WIND(frame, default_opendir_cbk, FIRST_CHILD(this),
 2162                FIRST_CHILD(this)->fops->opendir, loc, fd, xdata);
 2163     return 0;
 2164 }
 2165 
 2166 int32_t
 2167 index_readdir(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
 2168               off_t off, dict_t *xdata)
 2169 {
 2170     call_stub_t *stub = NULL;
 2171 
 2172     if (!index_is_fop_on_internal_inode(this, fd->inode, NULL))
 2173         goto out;
 2174 
 2175     stub = fop_readdir_stub(frame, index_readdir_wrapper, fd, size, off, xdata);
 2176     if (!stub) {
 2177         STACK_UNWIND_STRICT(readdir, frame, -1, ENOMEM, NULL, NULL);
 2178         return 0;
 2179     }
 2180     worker_enqueue(this, stub);
 2181     return 0;
 2182 out:
 2183     STACK_WIND(frame, default_readdir_cbk, FIRST_CHILD(this),
 2184                FIRST_CHILD(this)->fops->readdir, fd, size, off, xdata);
 2185     return 0;
 2186 }
 2187 
 2188 int
 2189 index_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
 2190              dict_t *xdata)
 2191 {
 2192     call_stub_t *stub = NULL;
 2193 
 2194     if (!index_is_fop_on_internal_inode(this, loc->parent, NULL))
 2195         goto out;
 2196 
 2197     stub = fop_unlink_stub(frame, index_unlink_wrapper, loc, xflag, xdata);
 2198     if (!stub) {
 2199         STACK_UNWIND_STRICT(unlink, frame, -1, ENOMEM, NULL, NULL, NULL);
 2200         return 0;
 2201     }
 2202     worker_enqueue(this, stub);
 2203     return 0;
 2204 out:
 2205     STACK_WIND(frame, default_unlink_cbk, FIRST_CHILD(this),
 2206                FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);
 2207     return 0;
 2208 }
 2209 
 2210 int
 2211 index_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
 2212             dict_t *xdata)
 2213 {
 2214     call_stub_t *stub = NULL;
 2215 
 2216     if (!index_is_fop_on_internal_inode(this, loc->parent, NULL))
 2217         goto out;
 2218 
 2219     stub = fop_rmdir_stub(frame, index_rmdir_wrapper, loc, flags, xdata);
 2220     if (!stub) {
 2221         STACK_UNWIND_STRICT(rmdir, frame, -1, ENOMEM, NULL, NULL, NULL);
 2222         return 0;
 2223     }
 2224     worker_enqueue(this, stub);
 2225     return 0;
 2226 out:
 2227     STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rmdir,
 2228                     loc, flags, xdata);
 2229     return 0;
 2230 }
 2231 
 2232 int
 2233 index_make_xattrop_watchlist(xlator_t *this, index_priv_t *priv,
 2234                              char *watchlist, index_xattrop_type_t type)
 2235 {
 2236     char *delim = NULL;
 2237     char *dup_watchlist = NULL;
 2238     char *key = NULL;
 2239     char *saveptr = NULL;
 2240     dict_t *xattrs = NULL;
 2241     data_t *dummy = NULL;
 2242     int ret = 0;
 2243 
 2244     if (!watchlist)
 2245         return 0;
 2246 
 2247     dup_watchlist = gf_strdup(watchlist);
 2248     if (!dup_watchlist)
 2249         return -1;
 2250 
 2251     xattrs = dict_new();
 2252     if (!xattrs) {
 2253         ret = -1;
 2254         goto out;
 2255     }
 2256 
 2257     dummy = int_to_data(1);
 2258     if (!dummy) {
 2259         ret = -1;
 2260         goto out;
 2261     }
 2262 
 2263     data_ref(dummy);
 2264 
 2265     delim = ",";
 2266     key = strtok_r(dup_watchlist, delim, &saveptr);
 2267     while (key) {
 2268         if (strlen(key) == 0) {
 2269             ret = -1;
 2270             goto out;
 2271         }
 2272 
 2273         ret = dict_set(xattrs, key, dummy);
 2274         if (ret)
 2275             goto out;
 2276 
 2277         key = strtok_r(NULL, delim, &saveptr);
 2278     }
 2279 
 2280     switch (type) {
 2281         case DIRTY:
 2282             priv->dirty_watchlist = dict_copy_with_ref(xattrs,
 2283                                                        priv->dirty_watchlist);
 2284             if (!priv->dirty_watchlist) {
 2285                 ret = -1;
 2286                 goto out;
 2287             }
 2288             break;
 2289         case XATTROP:
 2290             priv->pending_watchlist = dict_copy_with_ref(
 2291                 xattrs, priv->pending_watchlist);
 2292             if (!priv->pending_watchlist) {
 2293                 ret = -1;
 2294                 goto out;
 2295             }
 2296             break;
 2297         default:
 2298             break;
 2299     }
 2300 
 2301     ret = 0;
 2302 out:
 2303     if (xattrs)
 2304         dict_unref(xattrs);
 2305 
 2306     GF_FREE(dup_watchlist);
 2307 
 2308     if (dummy)
 2309         data_unref(dummy);
 2310 
 2311     return ret;
 2312 }
 2313 
 2314 int32_t
 2315 mem_acct_init(xlator_t *this)
 2316 {
 2317     int ret = -1;
 2318 
 2319     ret = xlator_mem_acct_init(this, gf_index_mt_end + 1);
 2320 
 2321     return ret;
 2322 }
 2323 
 2324 int
 2325 init(xlator_t *this)
 2326 {
 2327     int i = 0;
 2328     int ret = -1;
 2329     int64_t count = -1;
 2330     index_priv_t *priv = NULL;
 2331     pthread_attr_t w_attr;
 2332     gf_boolean_t mutex_inited = _gf_false;
 2333     gf_boolean_t cond_inited = _gf_false;
 2334     gf_boolean_t attr_inited = _gf_false;
 2335     char *watchlist = NULL;
 2336     char *dirtylist = NULL;
 2337     char *pendinglist = NULL;
 2338     char *index_base_parent = NULL;
 2339     char *tmp = NULL;
 2340 
 2341     if (!this->children || this->children->next) {
 2342         gf_msg(this->name, GF_LOG_ERROR, EINVAL, INDEX_MSG_INVALID_GRAPH,
 2343                "'index' not configured with exactly one child");
 2344         goto out;
 2345     }
 2346 
 2347     if (!this->parents) {
 2348         gf_msg(this->name, GF_LOG_WARNING, EINVAL, INDEX_MSG_INVALID_GRAPH,
 2349                "dangling volume. check volfile ");
 2350     }
 2351 
 2352     priv = GF_CALLOC(1, sizeof(*priv), gf_index_mt_priv_t);
 2353     if (!priv)
 2354         goto out;
 2355 
 2356     LOCK_INIT(&priv->lock);
 2357     if ((ret = pthread_cond_init(&priv->cond, NULL)) != 0) {
 2358         gf_msg(this->name, GF_LOG_ERROR, ret, INDEX_MSG_INVALID_ARGS,
 2359                "pthread_cond_init failed");
 2360         goto out;
 2361     }
 2362     cond_inited = _gf_true;
 2363 
 2364     if ((ret = pthread_mutex_init(&priv->mutex, NULL)) != 0) {
 2365         gf_msg(this->name, GF_LOG_ERROR, ret, INDEX_MSG_INVALID_ARGS,
 2366                "pthread_mutex_init failed");
 2367         goto out;
 2368     }
 2369     mutex_inited = _gf_true;
 2370 
 2371     if ((ret = pthread_attr_init(&w_attr)) != 0) {
 2372         gf_msg(this->name, GF_LOG_ERROR, ret, INDEX_MSG_INVALID_ARGS,
 2373                "pthread_attr_init failed");
 2374         goto out;
 2375     }
 2376     attr_inited = _gf_true;
 2377 
 2378     ret = pthread_attr_setstacksize(&w_attr, INDEX_THREAD_STACK_SIZE);
 2379     if (ret == EINVAL) {
 2380         gf_msg(this->name, GF_LOG_WARNING, ret, INDEX_MSG_INVALID_ARGS,
 2381                "Using default thread stack size");
 2382     }
 2383 
 2384     GF_OPTION_INIT("index-base", priv->index_basepath, path, out);
 2385     tmp = gf_strdup(priv->index_basepath);
 2386     index_base_parent = dirname(tmp);
 2387     if (gf_lstat_dir(index_base_parent, NULL) != 0) {
 2388         ret = -1;
 2389         gf_msg(this->name, GF_LOG_ERROR, errno,
 2390                INDEX_MSG_INDEX_DIR_CREATE_FAILED,
 2391                "Failed to find parent dir (%s) of index basepath %s.",
 2392                index_base_parent, priv->index_basepath);
 2393         goto out;
 2394     }
 2395 
 2396     GF_OPTION_INIT("xattrop64-watchlist", watchlist, str, out);
 2397     ret = index_make_xattrop_watchlist(this, priv, watchlist, XATTROP);
 2398     if (ret)
 2399         goto out;
 2400 
 2401     GF_OPTION_INIT("xattrop-dirty-watchlist", dirtylist, str, out);
 2402     ret = index_make_xattrop_watchlist(this, priv, dirtylist, DIRTY);
 2403     if (ret)
 2404         goto out;
 2405 
 2406     GF_OPTION_INIT("xattrop-pending-watchlist", pendinglist, str, out);
 2407     ret = index_make_xattrop_watchlist(this, priv, pendinglist, XATTROP);
 2408     if (ret)
 2409         goto out;
 2410 
 2411     if (priv->dirty_watchlist)
 2412         priv->complete_watchlist = dict_copy_with_ref(priv->dirty_watchlist,
 2413                                                       priv->complete_watchlist);
 2414     if (priv->pending_watchlist)
 2415         priv->complete_watchlist = dict_copy_with_ref(priv->pending_watchlist,
 2416                                                       priv->complete_watchlist);
 2417 
 2418     gf_uuid_generate(priv->index);
 2419     for (i = 0; i < XATTROP_TYPE_END; i++)
 2420         gf_uuid_generate(priv->internal_vgfid[i]);
 2421 
 2422     INIT_LIST_HEAD(&priv->callstubs);
 2423     GF_ATOMIC_INIT(priv->stub_cnt, 0);
 2424 
 2425     this->local_pool = mem_pool_new(index_local_t, 64);
 2426     if (!this->local_pool) {
 2427         ret = -1;
 2428         goto out;
 2429     }
 2430 
 2431     this->private = priv;
 2432 
 2433     ret = index_dir_create(this, XATTROP_SUBDIR);
 2434     if (ret < 0)
 2435         goto out;
 2436 
 2437     if (priv->dirty_watchlist) {
 2438         ret = index_dir_create(this, DIRTY_SUBDIR);
 2439         if (ret < 0)
 2440             goto out;
 2441     }
 2442 
 2443     ret = index_dir_create(this, ENTRY_CHANGES_SUBDIR);
 2444     if (ret < 0)
 2445         goto out;
 2446 
 2447     /*init indices files counts*/
 2448     count = index_fetch_link_count(this, XATTROP);
 2449     index_set_link_count(priv, count, XATTROP);
 2450     priv->down = _gf_false;
 2451 
 2452     priv->curr_count = 0;
 2453     ret = gf_thread_create(&priv->thread, &w_attr, index_worker, this,
 2454                            "idxwrker");
 2455     if (ret) {
 2456         gf_msg(this->name, GF_LOG_WARNING, ret,
 2457                INDEX_MSG_WORKER_THREAD_CREATE_FAILED,
 2458                "Failed to create worker thread, aborting");
 2459         goto out;
 2460     }
 2461     priv->curr_count++;
 2462     ret = 0;
 2463 out:
 2464     GF_FREE(tmp);
 2465 
 2466     if (ret) {
 2467         if (cond_inited)
 2468             pthread_cond_destroy(&priv->cond);
 2469         if (mutex_inited)
 2470             pthread_mutex_destroy(&priv->mutex);
 2471         if (priv && priv->dirty_watchlist)
 2472             dict_unref(priv->dirty_watchlist);
 2473         if (priv && priv->pending_watchlist)
 2474             dict_unref(priv->pending_watchlist);
 2475         if (priv && priv->complete_watchlist)
 2476             dict_unref(priv->complete_watchlist);
 2477         if (priv)
 2478             GF_FREE(priv);
 2479         this->private = NULL;
 2480         mem_pool_destroy(this->local_pool);
 2481         this->local_pool = NULL;
 2482     }
 2483 
 2484     if (attr_inited)
 2485         pthread_attr_destroy(&w_attr);
 2486     return ret;
 2487 }
 2488 
 2489 void
 2490 fini(xlator_t *this)
 2491 {
 2492     index_priv_t *priv = NULL;
 2493 
 2494     priv = this->private;
 2495     if (!priv)
 2496         goto out;
 2497 
 2498     priv->down = _gf_true;
 2499     pthread_cond_broadcast(&priv->cond);
 2500     if (priv->thread) {
 2501         gf_thread_cleanup_xint(priv->thread);
 2502         priv->thread = 0;
 2503     }
 2504     this->private = NULL;
 2505     LOCK_DESTROY(&priv->lock);
 2506     pthread_cond_destroy(&priv->cond);
 2507     pthread_mutex_destroy(&priv->mutex);
 2508     if (priv->dirty_watchlist)
 2509         dict_unref(priv->dirty_watchlist);
 2510     if (priv->pending_watchlist)
 2511         dict_unref(priv->pending_watchlist);
 2512     if (priv->complete_watchlist)
 2513         dict_unref(priv->complete_watchlist);
 2514     GF_FREE(priv);
 2515 
 2516     if (this->local_pool) {
 2517         mem_pool_destroy(this->local_pool);
 2518         this->local_pool = NULL;
 2519     }
 2520 out:
 2521     return;
 2522 }
 2523 
 2524 int
 2525 index_forget(xlator_t *this, inode_t *inode)
 2526 {
 2527     uint64_t tmp_cache = 0;
 2528     if (!inode_ctx_del(inode, this, &tmp_cache))
 2529         GF_FREE((index_inode_ctx_t *)(long)tmp_cache);
 2530 
 2531     return 0;
 2532 }
 2533 
 2534 int32_t
 2535 index_releasedir(xlator_t *this, fd_t *fd)
 2536 {
 2537     index_fd_ctx_t *fctx = NULL;
 2538     uint64_t ctx = 0;
 2539     int ret = 0;
 2540 
 2541     ret = fd_ctx_del(fd, this, &ctx);
 2542     if (ret < 0)
 2543         goto out;
 2544 
 2545     fctx = (index_fd_ctx_t *)(long)ctx;
 2546     if (fctx->dir) {
 2547         ret = sys_closedir(fctx->dir);
 2548         if (ret)
 2549             gf_msg(this->name, GF_LOG_ERROR, errno, INDEX_MSG_FD_OP_FAILED,
 2550                    "closedir error");
 2551     }
 2552 
 2553     GF_FREE(fctx);
 2554 out:
 2555     return 0;
 2556 }
 2557 
 2558 int32_t
 2559 index_release(xlator_t *this, fd_t *fd)
 2560 {
 2561     index_fd_ctx_t *fctx = NULL;
 2562     uint64_t ctx = 0;
 2563     int ret = 0;
 2564 
 2565     ret = fd_ctx_del(fd, this, &ctx);
 2566     if (ret < 0)
 2567         goto out;
 2568 
 2569     fctx = (index_fd_ctx_t *)(long)ctx;
 2570     GF_FREE(fctx);
 2571 out:
 2572     return 0;
 2573 }
 2574 
 2575 int
 2576 notify(xlator_t *this, int event, void *data, ...)
 2577 {
 2578     int ret = 0;
 2579     index_priv_t *priv = NULL;
 2580     uint64_t stub_cnt = 0;
 2581     xlator_t *victim = data;
 2582     struct timespec sleep_till = {
 2583         0,
 2584     };
 2585 
 2586     if (!this)
 2587         return 0;
 2588 
 2589     priv = this->private;
 2590     if (!priv)
 2591         return 0;
 2592 
 2593     if ((event == GF_EVENT_PARENT_DOWN) && victim->cleanup_starting) {
 2594         stub_cnt = GF_ATOMIC_GET(priv->stub_cnt);
 2595         clock_gettime(CLOCK_REALTIME, &sleep_till);
 2596         sleep_till.tv_sec += 1;
 2597 
 2598         /* Wait for draining stub from queue before notify PARENT_DOWN */
 2599         pthread_mutex_lock(&priv->mutex);
 2600         {
 2601             while (stub_cnt) {
 2602                 (void)pthread_cond_timedwait(&priv->cond, &priv->mutex,
 2603                                              &sleep_till);
 2604                 stub_cnt = GF_ATOMIC_GET(priv->stub_cnt);
 2605             }
 2606         }
 2607         pthread_mutex_unlock(&priv->mutex);
 2608         gf_log(this->name, GF_LOG_INFO,
 2609                "Notify GF_EVENT_PARENT_DOWN for brick %s", victim->name);
 2610     }
 2611 
 2612     if ((event == GF_EVENT_CHILD_DOWN) && victim->cleanup_starting) {
 2613         pthread_mutex_lock(&priv->mutex);
 2614         {
 2615             priv->down = _gf_true;
 2616             pthread_cond_broadcast(&priv->cond);
 2617             while (priv->curr_count)
 2618                 pthread_cond_wait(&priv->cond, &priv->mutex);
 2619         }
 2620         pthread_mutex_unlock(&priv->mutex);
 2621 
 2622         gf_log(this->name, GF_LOG_INFO,
 2623                "Notify GF_EVENT_CHILD_DOWN for brick %s", victim->name);
 2624     }
 2625 
 2626     ret = default_notify(this, event, data);
 2627     return ret;
 2628 }
 2629 
 2630 struct xlator_fops fops = {
 2631     .xattrop = index_xattrop,
 2632     .fxattrop = index_fxattrop,
 2633 
 2634     // interface functions follow
 2635     .getxattr = index_getxattr,
 2636     .lookup = index_lookup,
 2637     .opendir = index_opendir,
 2638     .readdir = index_readdir,
 2639     .unlink = index_unlink,
 2640     .rmdir = index_rmdir,
 2641     .fstat = index_fstat,
 2642 };
 2643 
/*
 * State-dump operations table. It has no initializer, so every member is
 * zero; xlator_api still points at it.
 */
struct xlator_dumpops dumpops;
 2645 
 2646 struct xlator_cbks cbks = {.forget = index_forget,
 2647                            .release = index_release,
 2648                            .releasedir = index_releasedir};
 2649 
 2650 struct volume_options options[] = {
 2651     {.key = {"index-base"},
 2652      .type = GF_OPTION_TYPE_PATH,
 2653      .description = "path where the index files need to be stored",
 2654      .default_value = "{{ brick.path }}/.glusterfs/indices"},
 2655     {.key = {"xattrop64-watchlist"},
 2656      .type = GF_OPTION_TYPE_STR,
 2657      .description = "Comma separated list of xattrs that are watched",
 2658      .default_value = "trusted.ec.dirty"},
 2659     {.key = {"xattrop-dirty-watchlist"},
 2660      .type = GF_OPTION_TYPE_STR,
 2661      .description = "Comma separated list of xattrs that are watched",
 2662      .default_value = "trusted.afr.dirty"},
 2663     {.key = {"xattrop-pending-watchlist"},
 2664      .type = GF_OPTION_TYPE_STR,
 2665      .description = "Comma separated list of xattrs that are watched",
 2666      .default_value = "trusted.afr.{{ volume.name }}"},
 2667     {.key = {NULL}},
 2668 };
 2669 
 2670 xlator_api_t xlator_api = {
 2671     .init = init,
 2672     .fini = fini,
 2673     .notify = notify,
 2674     .mem_acct_init = mem_acct_init,
 2675     .op_version = {1}, /* Present from the initial version */
 2676     .dumpops = &dumpops,
 2677     .fops = &fops,
 2678     .cbks = &cbks,
 2679     .options = options,
 2680     .identifier = "index",
 2681     .category = GF_MAINTAINED,
 2682 };