"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/xlators/cluster/dht/src/dht-rebalance.c" (16 Sep 2020, 147873 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


    1 /*
    2   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
    3   This file is part of GlusterFS.
    4 
    5   This file is licensed to you under your choice of the GNU Lesser
    6   General Public License, version 3 or any later version (LGPLv3 or
    7   later), or the GNU General Public License, version 2 (GPLv2), in all
    8   cases as published by the Free Software Foundation.
    9 */
   10 
   11 #include "dht-common.h"
   12 #include <glusterfs/syscall.h>
   13 #include <fnmatch.h>
   14 #include <signal.h>
   15 #include <glusterfs/events.h>
   16 #include "glusterfs/compat-errno.h"  // for ENODATA on BSD
   17 
   18 #define GF_DISK_SECTOR_SIZE 512
   19 #define DHT_REBALANCE_PID 4242              /* Change it if required */
   20 #define DHT_REBALANCE_BLKSIZE (1024 * 1024) /* 1 MB */
   21 #define MAX_MIGRATE_QUEUE_COUNT 500
   22 #define MIN_MIGRATE_QUEUE_COUNT 200
   23 #define MAX_REBAL_TYPE_SIZE 16
   24 #define FILE_CNT_INTERVAL 600       /* 10 mins */
   25 #define ESTIMATE_START_INTERVAL 600 /* 10 mins */
   26 #define HARDLINK_MIG_INPROGRESS -2
   27 #define SKIP_MIGRATION_FD_POSITIVE -3
   28 #ifndef MAX
   29 #define MAX(a, b) (((a) > (b)) ? (a) : (b))
   30 #endif
   31 
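       /* Advance a crawl index and wrap it modulo the subvolume count, so the
        * directory crawl can cycle round-robin over the local subvolumes. */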
   32 #define GF_CRAWL_INDEX_MOVE(idx, sv_cnt)                                       \
   33     {                                                                          \
   34         idx++;                                                                 \
   35         idx %= sv_cnt;                                                         \
   36     }
   37 
   38 uint64_t g_totalfiles = 0;
   39 uint64_t g_totalsize = 0;
   40 
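       /* Tear down a dir_dfmeta: free each local subvolume's pending entry
        * queue, then the per-subvolume bookkeeping arrays and the structure
        * itself. */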
   41 void
   42 gf_defrag_free_dir_dfmeta(struct dir_dfmeta *meta, int local_subvols_cnt)
   43 {
   44     int i = 0;
   45 
   46     if (meta) {
   47         for (i = 0; i < local_subvols_cnt; i++) {
   48             gf_dirent_free(&meta->equeue[i]);
   49         }
   50 
   51         GF_FREE(meta->equeue);
   52         GF_FREE(meta->head);
   53         GF_FREE(meta->iterator);
   54         GF_FREE(meta->offset_var);
   55         GF_FREE(meta->fetch_entries);
   56         GF_FREE(meta);
   57     }
   58 }
   59 
   60 void
   61 gf_defrag_free_container(struct dht_container *container)
   62 {
   63     if (container) {
   64         gf_dirent_entry_free(container->df_entry);
   65 
   66         if (container->parent_loc) {
   67             loc_wipe(container->parent_loc);
   68         }
   69 
   70         GF_FREE(container->parent_loc);
   71 
   72         GF_FREE(container);
   73     }
   74 }
   75 
   76 void
   77 dht_set_global_defrag_error(gf_defrag_info_t *defrag, int ret)
   78 {
   79     LOCK(&defrag->lock);
   80     {
   81         defrag->global_error = ret;
   82     }
   83     UNLOCK(&defrag->lock);
   84     return;
   85 }
   86 
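       /* Raise a volume-level event when a rebalance completes, fails or is
        * stopped. The volume name is recovered by stripping the "-dht" suffix
        * from this xlator's name; if that fails, the raw xlator name is used. */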
   87 static int
   88 dht_send_rebalance_event(xlator_t *this, int cmd, gf_defrag_status_t status)
   89 {
   90     int ret = -1;
   91     char *volname = NULL;
   92     char *tmpstr = NULL;
   93     char *ptr = NULL;
   94     char *suffix = "-dht";
   95     int len = 0;
   96 
   97     eventtypes_t event = EVENT_LAST;
   98 
   99     switch (status) {
  100         case GF_DEFRAG_STATUS_COMPLETE:
  101             event = EVENT_VOLUME_REBALANCE_COMPLETE;
  102             break;
  103         case GF_DEFRAG_STATUS_FAILED:
  104             event = EVENT_VOLUME_REBALANCE_FAILED;
  105             break;
  106         case GF_DEFRAG_STATUS_STOPPED:
  107             event = EVENT_VOLUME_REBALANCE_STOP;
  108             break;
  109         default:
  110             break;
  111     }
  112 
  113     /* DHT volume */
  114     len = strlen(this->name) - strlen(suffix);
  115     tmpstr = gf_strdup(this->name);
  116     if (tmpstr) {
  117         ptr = tmpstr + len;
  118         if (!strcmp(ptr, suffix)) {
  119             tmpstr[len] = '\0';
  120             volname = tmpstr;
  121         }
  122     }
  123 
  124     if (!volname) {
  125         /* Better than nothing */
  126         volname = this->name;
  127     }
  128 
  129     if (event != EVENT_LAST) {
  130         gf_event(event, "volume=%s", volname);
  131     }
  132 
  133     GF_FREE(tmpstr);
  134     return ret;
  135 }
  136 
  137 static void
  138 dht_strip_out_acls(dict_t *dict)
  139 {
  140     if (dict) {
  141         dict_del(dict, "trusted.SGI_ACL_FILE");
  142         dict_del(dict, POSIX_ACL_ACCESS_XATTR);
  143     }
  144 }
  145 
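       /* Write the read vectors to the destination while skipping runs of
        * fully zero-filled 512-byte sectors, so holes in the source stay holes
        * on the destination instead of being materialised as zeroes. Returns
        * 'size' on success, -1 (with *fop_errno set) on failure. */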
  146 static int
  147 dht_write_with_holes(xlator_t *to, fd_t *fd, struct iovec *vec, int count,
  148                      int32_t size, off_t offset, struct iobref *iobref,
  149                      int *fop_errno)
  150 {
  151     int i = 0;
  152     int ret = -1;
  153     int start_idx = 0;
  154     int tmp_offset = 0;
  155     int write_needed = 0;
  156     int buf_len = 0;
  157     int size_pending = 0;
  158     char *buf = NULL;
  159 
  160     /* loop through each vector */
  161     for (i = 0; i < count; i++) {
  162         buf = vec[i].iov_base;
  163         buf_len = vec[i].iov_len;
  164 
  165         for (start_idx = 0; (start_idx + GF_DISK_SECTOR_SIZE) <= buf_len;
  166              start_idx += GF_DISK_SECTOR_SIZE) {
  167             if (mem_0filled(buf + start_idx, GF_DISK_SECTOR_SIZE) != 0) {
  168                 write_needed = 1;
  169                 continue;
  170             }
  171 
  172             if (write_needed) {
  173                 ret = syncop_write(
  174                     to, fd, (buf + tmp_offset), (start_idx - tmp_offset),
  175                     (offset + tmp_offset), iobref, 0, NULL, NULL);
  176                 /* 'path' will be logged in calling function */
  177                 if (ret < 0) {
  178                     gf_log(THIS->name, GF_LOG_WARNING, "failed to write (%s)",
  179                            strerror(-ret));
  180                     *fop_errno = -ret;
  181                     ret = -1;
  182                     goto out;
  183                 }
  184 
  185                 write_needed = 0;
  186             }
  187             tmp_offset = start_idx + GF_DISK_SECTOR_SIZE;
  188         }
  189 
  190         if ((start_idx < buf_len) || write_needed) {
   191             /* This means the last chunk is not yet written; write it */
  192             ret = syncop_write(to, fd, (buf + tmp_offset),
  193                                (buf_len - tmp_offset), (offset + tmp_offset),
  194                                iobref, 0, NULL, NULL);
  195             if (ret < 0) {
  196                 /* 'path' will be logged in calling function */
  197                 gf_log(THIS->name, GF_LOG_WARNING, "failed to write (%s)",
  198                        strerror(-ret));
  199                 *fop_errno = -ret;
  200                 ret = -1;
  201                 goto out;
  202             }
  203         }
  204 
  205         size_pending = (size - buf_len);
  206         if (!size_pending)
  207             break;
  208     }
  209 
  210     ret = size;
  211 out:
  212     return ret;
  213 }
  214 
  215 /*
  216    return values:
  217    -1 : failure
  218    -2 : success
  219 
  220 Hard link migration is carried out in three stages.
  221 
  222 (Say there are n hardlinks)
  223 Stage 1: Setting the new hashed subvol information on the 1st hardlink
  224          encountered (linkto setxattr)
  225 
  226 Stage 2: Creating hardlinks on new hashed subvol for the 2nd to (n-1)th
  227          hardlink
  228 
  229 Stage 3: Physical migration of the data file for nth hardlink
  230 
   231 Why "-2" is deemed success here rather than "0":
  232 
  233    dht_migrate_file expects return value "0" from _is_file_migratable if
  234 the file has to be migrated.
  235 
  236    _is_file_migratable returns zero only when it is called with the
  237 flag "GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS".
  238 
  239    gf_defrag_handle_hardlink calls dht_migrate_file for physical migration
  240 of the data file with the flag "GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS"
  241 
   242 Hence, if gf_defrag_handle_hardlink returned "0" for success, it would force
   243 "dht_migrate_file" to migrate each of the hardlinks, which is not intended.
  244 
   245 For each of the three stages mentioned above, "-2" is returned and is
   246 converted to "0" in dht_migrate_file.
  247 
  248 */
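       /* Illustration only (not part of the build): how a caller is expected
        * to treat the "-2 means success" convention described above, e.g.:
        *
        *     ret = gf_defrag_handle_hardlink(this, loc, &fop_errno);
        *     if (ret == -2)
        *         ret = 0;   // hardlink handled; nothing more to migrate here
        *     else if (ret == -1)
        *         handle_failure(fop_errno);   // hypothetical error path
        */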
  249 
  250 int32_t
  251 gf_defrag_handle_hardlink(xlator_t *this, loc_t *loc, int *fop_errno)
  252 {
  253     int32_t ret = -1;
  254     xlator_t *cached_subvol = NULL;
  255     xlator_t *hashed_subvol = NULL;
  256     xlator_t *linkto_subvol = NULL;
  257     data_t *data = NULL;
  258     struct iatt iatt = {
  259         0,
  260     };
  261     int32_t op_errno = 0;
  262     dht_conf_t *conf = NULL;
  263     gf_loglevel_t loglevel = 0;
  264     dict_t *link_xattr = NULL;
  265     dict_t *dict = NULL;
  266     dict_t *xattr_rsp = NULL;
  267     struct iatt stbuf = {
  268         0,
  269     };
  270 
  271     *fop_errno = EINVAL;
  272 
  273     GF_VALIDATE_OR_GOTO("defrag", loc, out);
  274     GF_VALIDATE_OR_GOTO("defrag", loc->name, out);
  275     GF_VALIDATE_OR_GOTO("defrag", this, out);
  276     GF_VALIDATE_OR_GOTO("defrag", this->private, out);
  277 
  278     conf = this->private;
  279 
  280     if (gf_uuid_is_null(loc->pargfid)) {
  281         gf_msg("", GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
  282                "Migrate file failed :"
  283                "loc->pargfid is NULL for %s",
  284                loc->path);
  285         *fop_errno = EINVAL;
  286         ret = -1;
  287         goto out;
  288     }
  289 
  290     if (gf_uuid_is_null(loc->gfid)) {
  291         gf_msg("", GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
  292                "Migrate file failed :"
  293                "loc->gfid is NULL for %s",
  294                loc->path);
  295         *fop_errno = EINVAL;
  296         ret = -1;
  297         goto out;
  298     }
  299 
  300     link_xattr = dict_new();
  301     if (!link_xattr) {
  302         ret = -1;
  303         *fop_errno = ENOMEM;
  304         goto out;
  305     }
  306 
  307     /*
   308       Parallel migration can cause the same hard link to be migrated multiple
   309       times, which can lead to data loss. Hence a fresh lookup is added here to
   310       decide whether migration is still required.
  311 
   312       To elaborate the scenario for, say, 10 hardlinks [link{1..10}]:
   313           Say the first hard link "link1" does the setxattr of the
   314       new hashed subvolume info on the cached file. As there are multiple
   315       threads working, all the links might already have been created on the
   316       new hashed subvol by the time we reach, say, link5. Now the
   317       number of links on the hashed subvol equals that on the cached one, so
   318       migration of the data file happens while processing link6.
  319 
  320              Cached                                 Hashed
  321       --------T link6                        rwxrwxrwx   link6
  322 
   323       After the state above, all the link files on the cached subvol are
   324       zero-byte linkto files. Hence, if we still did migration for the remaining
   325       files link{7..10}, we would end up migrating zero bytes of data, i.e.
   326       losing the data.
   327             Hence, a lookup makes sure whether we still need to migrate the
   328       file or not.
  329     */
  330 
  331     dict = dict_new();
  332     if (!dict) {
  333         ret = -1;
  334         *fop_errno = ENOMEM;
  335         gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_NO_MEMORY,
  336                "could not allocate memory for dict");
  337         goto out;
  338     }
  339 
  340     ret = dict_set_int32(dict, conf->link_xattr_name, 256);
  341     if (ret) {
  342         *fop_errno = ENOMEM;
  343         ret = -1;
  344         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
  345                "Migrate file failed:"
  346                "%s: failed to set 'linkto' key in dict",
  347                loc->path);
  348         goto out;
  349     }
  350 
  351     ret = syncop_lookup(this, loc, &stbuf, NULL, dict, &xattr_rsp);
  352     if (ret) {
  353         /*Ignore ENOENT and ESTALE as file might have been
  354           migrated already*/
  355         if (-ret == ENOENT || -ret == ESTALE) {
  356             ret = -2;
  357             goto out;
  358         }
  359         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
  360                "Migrate file failed:%s lookup failed with ret = %d", loc->path,
  361                ret);
  362         *fop_errno = -ret;
  363         ret = -1;
  364         goto out;
  365     }
  366 
  367     cached_subvol = dht_subvol_get_cached(this, loc->inode);
  368     if (!cached_subvol) {
  369         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
  370                "Migrate file failed :"
  371                "Failed to get cached subvol"
  372                " for %s on %s",
  373                loc->name, this->name);
  374         *fop_errno = EINVAL;
  375         ret = -1;
  376         goto out;
  377     }
  378 
  379     hashed_subvol = dht_subvol_get_hashed(this, loc);
  380     if (!hashed_subvol) {
  381         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
  382                "Migrate file failed :"
  383                "Failed to get hashed subvol"
  384                " for %s on %s",
  385                loc->name, this->name);
  386         *fop_errno = EINVAL;
  387         ret = -1;
  388         goto out;
  389     }
  390 
   391     /* Hardlink migration happens only with remove-brick, so this condition is
   392      * true only when the migration has already happened. If hardlinks are ever
   393      * migrated for the rebalance case as well, remove this check. Having the
   394      * check here avoids redundant calls below. */
  395     if (hashed_subvol == cached_subvol) {
  396         ret = -2;
  397         goto out;
  398     }
  399 
  400     gf_log(this->name, GF_LOG_INFO,
  401            "Attempting to migrate hardlink %s "
  402            "with gfid %s from %s -> %s",
  403            loc->name, uuid_utoa(loc->gfid), cached_subvol->name,
  404            hashed_subvol->name);
  405 
  406     data = dict_get(xattr_rsp, conf->link_xattr_name);
  407     /* set linkto on cached -> hashed if not present, else link it */
  408     if (!data) {
  409         ret = dict_set_str(link_xattr, conf->link_xattr_name,
  410                            hashed_subvol->name);
  411         if (ret) {
  412             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
  413                    "Migrate file failed :"
  414                    "Failed to set dictionary value:"
  415                    " key = %s for %s",
  416                    conf->link_xattr_name, loc->name);
  417             *fop_errno = ENOMEM;
  418             ret = -1;
  419             goto out;
  420         }
  421 
  422         ret = syncop_setxattr(cached_subvol, loc, link_xattr, 0, NULL, NULL);
  423         if (ret) {
  424             gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
  425                    "Migrate file failed :"
  426                    "Linkto setxattr failed %s -> %s",
  427                    cached_subvol->name, loc->name);
  428             *fop_errno = -ret;
  429             ret = -1;
  430             goto out;
  431         }
  432 
  433         gf_msg_debug(this->name, 0,
  434                      "hardlink target subvol created on %s "
  435                      ",cached %s, file %s",
  436                      hashed_subvol->name, cached_subvol->name, loc->path);
  437 
  438         ret = -2;
  439         goto out;
  440     } else {
  441         linkto_subvol = dht_linkfile_subvol(this, NULL, NULL, xattr_rsp);
  442         if (!linkto_subvol) {
  443             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_SUBVOL_ERROR,
  444                    "Failed to get "
  445                    "linkto subvol for %s",
  446                    loc->name);
  447         } else {
  448             hashed_subvol = linkto_subvol;
  449         }
  450 
  451         ret = syncop_link(hashed_subvol, loc, loc, &iatt, NULL, NULL);
  452         if (ret) {
  453             op_errno = -ret;
  454             ret = -1;
  455 
  456             loglevel = (op_errno == EEXIST) ? GF_LOG_DEBUG : GF_LOG_ERROR;
  457             gf_msg(this->name, loglevel, op_errno,
  458                    DHT_MSG_MIGRATE_HARDLINK_FILE_FAILED,
  459                    "link of %s -> %s"
  460                    " failed on  subvol %s",
  461                    loc->name, uuid_utoa(loc->gfid), hashed_subvol->name);
  462             if (op_errno != EEXIST) {
  463                 *fop_errno = op_errno;
  464                 goto out;
  465             }
  466         } else {
  467             gf_msg_debug(this->name, 0,
  468                          "syncop_link successful for"
  469                          " hardlink %s on subvol %s, cached %s",
  470                          loc->path, hashed_subvol->name, cached_subvol->name);
  471         }
  472     }
  473 
  474     ret = syncop_lookup(hashed_subvol, loc, &iatt, NULL, NULL, NULL);
  475     if (ret) {
  476         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
  477                "Migrate file failed :Failed lookup %s on %s ", loc->name,
  478                hashed_subvol->name);
  479 
  480         *fop_errno = -ret;
  481         ret = -1;
  482         goto out;
  483     }
  484 
   485     /* There is a race where, on the target subvol for the hardlink
   486      * (note: the hash subvol for the hardlink might differ from this), some
   487      * other (non-rebalance) client may have created a linkto file for that
   488      * hardlink as part of a lookup. So, say there are 10 hardlinks: by the
   489      * 5th hardlink itself the file might already have migrated. For the
   490      * (6..10)th hardlinks the cached and target would then be the same, as the
   491      * file has already migrated. Hence this check is needed. */
  492     if (cached_subvol == hashed_subvol) {
  493         gf_msg_debug(this->name, 0,
  494                      "source %s and destination %s "
  495                      "for hardlink %s are same",
  496                      cached_subvol->name, hashed_subvol->name, loc->path);
  497         ret = -2;
  498         goto out;
  499     }
  500 
  501     if (iatt.ia_nlink == stbuf.ia_nlink) {
  502         ret = dht_migrate_file(this, loc, cached_subvol, hashed_subvol,
  503                                GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS, fop_errno);
  504         if (ret) {
  505             goto out;
  506         }
  507     }
  508     ret = -2;
  509 out:
  510     if (link_xattr)
  511         dict_unref(link_xattr);
  512 
  513     if (xattr_rsp)
  514         dict_unref(xattr_rsp);
  515 
  516     if (dict)
  517         dict_unref(dict);
  518 
  519     return ret;
  520 }
  521 
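       /* Decide how a multiply linked file is handled: during the hardlink
        * data-migration pass (GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS) treat it as
        * migratable; on decommission (GF_DHT_MIGRATE_HARDLINK) hand it to
        * gf_defrag_handle_hardlink() under the link lock; otherwise skip it
        * (ret = 1, *fop_errno = ENOTSUP). */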
  522 static int
  523 __check_file_has_hardlink(xlator_t *this, loc_t *loc, struct iatt *stbuf,
  524                           dict_t *xattrs, int flags, gf_defrag_info_t *defrag,
  525                           dht_conf_t *conf, int *fop_errno)
  526 {
  527     int ret = 0;
  528 
  529     if (flags == GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS) {
  530         ret = 0;
  531         return ret;
  532     }
  533     if (stbuf->ia_nlink > 1) {
   534         /* support for decommission */
  535         if (flags == GF_DHT_MIGRATE_HARDLINK) {
  536             synclock_lock(&conf->link_lock);
  537             ret = gf_defrag_handle_hardlink(this, loc, fop_errno);
  538             synclock_unlock(&conf->link_lock);
  539             /*
  540             Returning zero will force the file to be remigrated.
   541             See gf_defrag_handle_hardlink for more information.
  542             */
  543             if (ret && ret != -2) {
  544                 gf_msg(this->name, GF_LOG_WARNING, 0,
  545                        DHT_MSG_MIGRATE_FILE_FAILED,
  546                        "Migrate file failed:"
  547                        "%s: failed to migrate file with link",
  548                        loc->path);
  549             }
  550         } else {
  551             gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED,
  552                    "Migration skipped for:"
  553                    "%s: file has hardlinks",
  554                    loc->path);
  555             *fop_errno = ENOTSUP;
  556             ret = 1;
  557         }
  558     }
  559 
  560     return ret;
  561 }
  562 
  563 /*
  564      return values
  565      0 : File will be migrated
  566     -2 : File will not be migrated
   567          (This is the return value from gf_defrag_handle_hardlink; see
   568          gf_defrag_handle_hardlink for a description of "returning -2".)
  569     -1 : failure
  570 */
  571 static int
  572 __is_file_migratable(xlator_t *this, loc_t *loc, struct iatt *stbuf,
  573                      dict_t *xattrs, int flags, gf_defrag_info_t *defrag,
  574                      dht_conf_t *conf, int *fop_errno)
  575 {
  576     int ret = -1;
  577     int lock_count = 0;
  578 
  579     if (IA_ISDIR(stbuf->ia_type)) {
  580         gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED,
  581                "Migrate file failed:"
  582                "%s: migrate-file called on directory",
  583                loc->path);
  584         *fop_errno = EISDIR;
  585         ret = -1;
  586         goto out;
  587     }
  588 
  589     if (!conf->lock_migration_enabled) {
  590         ret = dict_get_int32(xattrs, GLUSTERFS_POSIXLK_COUNT, &lock_count);
  591         if (ret) {
  592             gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED,
  593                    "Migrate file failed:"
  594                    "%s: Unable to get lock count for file",
  595                    loc->path);
  596             *fop_errno = EINVAL;
  597             ret = -1;
  598             goto out;
  599         }
  600 
  601         if (lock_count) {
  602             gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED,
  603                    "Migrate file failed: %s: File has locks."
  604                    " Skipping file migration",
  605                    loc->path);
  606             *fop_errno = ENOTSUP;
  607             ret = 1;
  608             goto out;
  609         }
  610     }
  611 
   612     /* Check if the file has hardlinks */
  613     ret = __check_file_has_hardlink(this, loc, stbuf, xattrs, flags, defrag,
  614                                     conf, fop_errno);
  615 out:
  616     return ret;
  617 }
  618 
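       /* Prepare the destination file: create it (or just open it if a file
        * with the same gfid already exists) carrying the source gfid and a
        * linkto xattr that points back at the source subvol, verify it
        * survived any racing unlink, copy uid/gid, and reserve space with
        * fallocate or ftruncate. On success *dst_fd holds an open, bound fd. */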
  619 static int
  620 __dht_rebalance_create_dst_file(xlator_t *this, xlator_t *to, xlator_t *from,
  621                                 loc_t *loc, struct iatt *stbuf, fd_t **dst_fd,
  622                                 int *fop_errno)
  623 {
  624     int ret = -1;
  625     int ret2 = -1;
  626     fd_t *fd = NULL;
  627     struct iatt new_stbuf = {
  628         0,
  629     };
  630     struct iatt check_stbuf = {
  631         0,
  632     };
  633     dht_conf_t *conf = NULL;
  634     dict_t *dict = NULL;
  635     dict_t *xdata = NULL;
  636 
  637     conf = this->private;
  638 
  639     dict = dict_new();
  640     if (!dict) {
  641         *fop_errno = ENOMEM;
  642         ret = -1;
  643         gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_NO_MEMORY,
   644                "dictionary allocation failed for "
   645                "path:%s",
  646                loc->path);
  647         goto out;
  648     }
  649     ret = dict_set_gfuuid(dict, "gfid-req", stbuf->ia_gfid, true);
  650     if (ret) {
  651         *fop_errno = ENOMEM;
  652         ret = -1;
  653         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED,
  654                "%s: failed to set dictionary value: key = gfid-req", loc->path);
  655         goto out;
  656     }
  657 
  658     ret = dict_set_str(dict, conf->link_xattr_name, from->name);
  659     if (ret) {
  660         *fop_errno = ENOMEM;
  661         ret = -1;
  662         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED,
  663                "%s: failed to set dictionary value: key = %s ", loc->path,
  664                conf->link_xattr_name);
  665         goto out;
  666     }
  667 
  668     fd = fd_create(loc->inode, DHT_REBALANCE_PID);
  669     if (!fd) {
  670         *fop_errno = ENOMEM;
  671         ret = -1;
  672         gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_MIGRATE_FILE_FAILED,
  673                "%s: fd create failed (destination)", loc->path);
  674         goto out;
  675     }
  676 
  677     if (!!dht_is_tier_xlator(this)) {
  678         xdata = dict_new();
  679         if (!xdata) {
  680             *fop_errno = ENOMEM;
  681             ret = -1;
  682             gf_msg(this->name, GF_LOG_ERROR, ENOMEM,
   683                    DHT_MSG_MIGRATE_FILE_FAILED, "%s: dict_new failed",
  684                    loc->path);
  685             goto out;
  686         }
  687 
  688         ret = dict_set_int32(xdata, GF_CLEAN_WRITE_PROTECTION, 1);
  689         if (ret) {
  690             *fop_errno = ENOMEM;
  691             ret = -1;
  692             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED,
  693                    "%s: failed to set dictionary value: key = %s ", loc->path,
  694                    GF_CLEAN_WRITE_PROTECTION);
  695             goto out;
  696         }
  697     }
  698 
  699     ret = syncop_lookup(to, loc, &new_stbuf, NULL, xdata, NULL);
  700     if (!ret) {
   701         /* File exists in the destination; check if the gfid matches */
  702         if (gf_uuid_compare(stbuf->ia_gfid, new_stbuf.ia_gfid) != 0) {
  703             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_GFID_MISMATCH,
  704                    "file %s exists in %s with different gfid", loc->path,
  705                    to->name);
  706             *fop_errno = EINVAL;
  707             ret = -1;
  708             goto out;
  709         }
  710     }
  711     if ((ret < 0) && (-ret != ENOENT)) {
  712         /* File exists in destination, but not accessible */
  713         gf_msg(THIS->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
  714                "%s: failed to lookup file", loc->path);
  715         *fop_errno = -ret;
  716         ret = -1;
  717         goto out;
  718     }
  719 
   720     /* Create the destination with LINKFILE mode and the linkto xattr;
   721        if the linkfile already exists, just open the file */
  722     if (!ret) {
  723         /*
  724          * File already present, just open the file.
  725          */
  726         ret = syncop_open(to, loc, O_RDWR, fd, NULL, NULL);
  727         if (ret < 0) {
  728             gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
  729                    "failed to open %s on %s", loc->path, to->name);
  730             *fop_errno = -ret;
  731             ret = -1;
  732             goto out;
  733         }
  734     } else {
  735         ret = syncop_create(to, loc, O_RDWR, DHT_LINKFILE_MODE, fd, &new_stbuf,
  736                             dict, NULL);
  737         if (ret < 0) {
  738             gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
  739                    "failed to create %s on %s", loc->path, to->name);
  740             *fop_errno = -ret;
  741             ret = -1;
  742             goto out;
  743         }
  744     }
  745 
  746     fd_bind(fd);
  747 
   748     /* Reason for doing a lookup again after the create:
   749      * During the create there is a time gap between opening the fd at the
   750      * server (posix layer) and binding it in the server (incrementing the fd
   751      * count). If, in that gap, another process sends an unlink treating this
   752      * as a linkto file (the inode's fd count is still 0), the file will be
   753      * unlinked at the backend. Since further operations are performed on the
   754      * fd, the migration would appear to complete but would end with no file
   755      * at the backend.
   756      */
  757 
  758     ret = syncop_lookup(to, loc, &check_stbuf, NULL, NULL, NULL);
  759     if (!ret) {
  760         if (gf_uuid_compare(stbuf->ia_gfid, check_stbuf.ia_gfid) != 0) {
  761             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_GFID_MISMATCH,
  762                    "file %s exists in %s with different gfid,"
  763                    "found in lookup after create",
  764                    loc->path, to->name);
  765             *fop_errno = EINVAL;
  766             ret = -1;
  767             goto out;
  768         }
  769     }
  770 
  771     if (-ret == ENOENT) {
  772         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
   773                "%s: file does not exist "
   774                "on %s",
  775                loc->path, to->name);
  776         *fop_errno = -ret;
  777         ret = -1;
  778         goto out;
  779     }
  780 
  781     ret = syncop_fsetattr(to, fd, stbuf, (GF_SET_ATTR_UID | GF_SET_ATTR_GID),
  782                           NULL, NULL, NULL, NULL);
  783     if (ret < 0) {
  784         *fop_errno = -ret;
  785         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
  786                "chown failed for %s on %s", loc->path, to->name);
  787     }
  788 
  789     /* No need to bother about 0 byte size files */
  790     if (stbuf->ia_size > 0) {
  791         if (conf->use_fallocate) {
  792             ret = syncop_fallocate(to, fd, 0, 0, stbuf->ia_size, NULL, NULL);
  793             if (ret < 0) {
  794                 if (ret == -EOPNOTSUPP || ret == -EINVAL || ret == -ENOSYS) {
  795                     conf->use_fallocate = _gf_false;
  796                 } else {
  797                     gf_msg(this->name, GF_LOG_ERROR, -ret,
  798                            DHT_MSG_MIGRATE_FILE_FAILED,
  799                            "fallocate failed for %s on %s", loc->path,
  800                            to->name);
  801 
  802                     *fop_errno = -ret;
  803 
  804                     /* fallocate does not release the space
  805                      * in some cases
  806                      */
  807                     ret2 = syncop_ftruncate(to, fd, 0, NULL, NULL, NULL, NULL);
  808                     if (ret2 < 0) {
  809                         gf_msg(this->name, GF_LOG_WARNING, -ret2,
  810                                DHT_MSG_MIGRATE_FILE_FAILED,
  811                                "ftruncate failed for "
  812                                "%s on %s",
  813                                loc->path, to->name);
  814                     }
  815                     goto out;
  816                 }
  817             }
  818         }
  819 
  820         if (!conf->use_fallocate) {
  821             ret = syncop_ftruncate(to, fd, stbuf->ia_size, NULL, NULL, NULL,
  822                                    NULL);
  823             if (ret < 0) {
  824                 *fop_errno = -ret;
  825                 gf_msg(this->name, GF_LOG_WARNING, -ret,
  826                        DHT_MSG_MIGRATE_FILE_FAILED,
  827                        "ftruncate failed for %s on %s", loc->path, to->name);
  828             }
  829         }
  830     }
  831 
  832     /* success */
  833     ret = 0;
  834 
  835     if (dst_fd)
  836         *dst_fd = fd;
  837 
  838 out:
  839     if (ret) {
  840         if (fd) {
  841             fd_unref(fd);
  842         }
  843     }
  844     if (dict)
  845         dict_unref(dict);
  846 
  847     if (xdata)
  848         dict_unref(xdata);
  849 
  850     return ret;
  851 }
  852 
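       /* Estimate, in 512-byte blocks, the free space left on source and
        * destination after moving this file. For a plain migrate-data run the
        * move is skipped (ret = 1, ENOSPC) if it would leave the destination
        * with a lower free-space percentage than the source; independently, if
        * the destination would cross min-free-disk, look for another subvolume
        * with enough space and inodes and report it via *new_subvol. */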
  853 static int
  854 __dht_check_free_space(xlator_t *this, xlator_t *to, xlator_t *from, loc_t *loc,
  855                        struct iatt *stbuf, int flag, dht_conf_t *conf,
  856                        gf_boolean_t *target_changed, xlator_t **new_subvol,
  857                        int *fop_errno)
  858 {
  859     struct statvfs src_statfs = {
  860         0,
  861     };
  862     struct statvfs dst_statfs = {
  863         0,
  864     };
  865     int ret = -1;
  866     dict_t *xdata = NULL;
  867     dht_layout_t *layout = NULL;
  868     uint64_t src_statfs_blocks = 1;
  869     uint64_t dst_statfs_blocks = 1;
  870     double dst_post_availspacepercent = 0;
  871     double src_post_availspacepercent = 0;
  872     uint64_t file_blocks = 0;
  873     uint64_t src_total_blocks = 0;
  874     uint64_t dst_total_blocks = 0;
  875 
  876     xdata = dict_new();
  877     if (!xdata) {
  878         *fop_errno = ENOMEM;
  879         ret = -1;
  880         gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_NO_MEMORY,
  881                "failed to allocate dictionary");
  882         goto out;
  883     }
  884 
  885     ret = dict_set_int8(xdata, GF_INTERNAL_IGNORE_DEEM_STATFS, 1);
  886     if (ret) {
  887         gf_log(this->name, GF_LOG_ERROR,
  888                "Failed to set " GF_INTERNAL_IGNORE_DEEM_STATFS " in dict");
  889         ret = -1;
  890         *fop_errno = ENOMEM;
  891         goto out;
  892     }
  893 
  894     ret = syncop_statfs(from, loc, &src_statfs, xdata, NULL);
  895     if (ret) {
  896         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
  897                "failed to get statfs of %s on %s", loc->path, from->name);
  898         *fop_errno = -ret;
  899         ret = -1;
  900         goto out;
  901     }
  902 
  903     ret = syncop_statfs(to, loc, &dst_statfs, xdata, NULL);
  904     if (ret) {
  905         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
  906                "failed to get statfs of %s on %s", loc->path, to->name);
  907         *fop_errno = -ret;
  908         ret = -1;
  909         goto out;
  910     }
  911 
  912     gf_msg_debug(this->name, 0,
  913                  "min_free_disk - %f , block available - %" PRId64
  914                  ", block size - %lu",
  915                  conf->min_free_disk, dst_statfs.f_bavail, dst_statfs.f_bsize);
  916 
  917     dst_statfs_blocks = dst_statfs.f_bavail *
  918                         (dst_statfs.f_frsize / GF_DISK_SECTOR_SIZE);
  919 
  920     src_statfs_blocks = src_statfs.f_bavail *
  921                         (src_statfs.f_frsize / GF_DISK_SECTOR_SIZE);
  922 
  923     dst_total_blocks = dst_statfs.f_blocks *
  924                        (dst_statfs.f_frsize / GF_DISK_SECTOR_SIZE);
  925 
  926     src_total_blocks = src_statfs.f_blocks *
  927                        (src_statfs.f_frsize / GF_DISK_SECTOR_SIZE);
  928 
   929     /* if the force option is given, do not check free space on the destination;
   930      * only check whether enough space is available for the file itself */
  931     if (flag != GF_DHT_MIGRATE_DATA)
  932         goto check_avail_space;
  933 
  934     /* Check:
   935        During a rebalance `migrate-data`, the destination subvol loses
   936        'blocks' of free space while, at the same time, the source
   937        subvol gains 'blocks' of free space. A valid check is
   938        necessary here to avoid an erroneous move to a destination where
   939        space is only scantily available.
   940        With heterogeneous brick support, an absolute space comparison could
   941        prevent any files from being migrated to newly added bricks if those
   942        bricks are smaller than the free space available on the existing bricks.
  943      */
  944     if (!conf->use_fallocate) {
  945         file_blocks = stbuf->ia_size + GF_DISK_SECTOR_SIZE - 1;
  946         file_blocks /= GF_DISK_SECTOR_SIZE;
  947 
  948         if (file_blocks >= dst_statfs_blocks) {
  949             dst_statfs_blocks = 0;
  950         } else {
  951             dst_statfs_blocks -= file_blocks;
  952         }
  953     }
  954 
  955     src_post_availspacepercent = ((src_statfs_blocks + file_blocks) * 100) /
  956                                  src_total_blocks;
  957 
  958     dst_post_availspacepercent = (dst_statfs_blocks * 100) / dst_total_blocks;
  959 
  960     if (dst_post_availspacepercent < src_post_availspacepercent) {
  961         gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED,
  962                "data movement of file "
  963                "{blocks:%" PRIu64
  964                " name:(%s)} would result in "
  965                "dst node (%s:%" PRIu64
  966                ") having lower disk "
  967                "space than the source node (%s:%" PRIu64
  968                ")"
  969                ".Skipping file.",
  970                stbuf->ia_blocks, loc->path, to->name, dst_statfs_blocks,
  971                from->name, src_statfs_blocks);
  972 
  973         /* this is not a 'failure', but we don't want to
  974            consider this as 'success' too :-/ */
  975         *fop_errno = ENOSPC;
  976         ret = 1;
  977         goto out;
  978     }
  979 
  980 check_avail_space:
  981     if (conf->disk_unit == 'p' && dst_statfs.f_blocks) {
  982         dst_post_availspacepercent = (dst_statfs_blocks * 100) /
  983                                      dst_total_blocks;
  984 
  985         gf_msg_debug(this->name, 0,
  986                      "file : %s, post_availspacepercent"
  987                      " : %lf f_bavail : %" PRIu64 " min-free-disk: %lf",
  988                      loc->path, dst_post_availspacepercent, dst_statfs.f_bavail,
  989                      conf->min_free_disk);
  990 
  991         if (dst_post_availspacepercent < conf->min_free_disk) {
  992             gf_msg(this->name, GF_LOG_WARNING, 0, 0,
  993                    "Write will cross min-free-disk for "
  994                    "file - %s on subvol - %s. Looking "
  995                    "for new subvol",
  996                    loc->path, to->name);
  997 
  998             goto find_new_subvol;
  999         } else {
 1000             ret = 0;
 1001             goto out;
 1002         }
 1003     }
 1004 
 1005     if (conf->disk_unit != 'p') {
 1006         if ((dst_statfs_blocks * GF_DISK_SECTOR_SIZE) < conf->min_free_disk) {
 1007             gf_msg_debug(this->name, 0,
 1008                          "file : %s,  destination frsize: %lu "
 1009                          "f_bavail : %" PRIu64 " min-free-disk: %lf",
 1010                          loc->path, dst_statfs.f_frsize, dst_statfs.f_bavail,
 1011                          conf->min_free_disk);
 1012 
 1013             gf_msg(this->name, GF_LOG_WARNING, 0, 0,
 1014                    "write will"
 1015                    " cross min-free-disk for file - %s on subvol -"
 1016                    " %s. looking for new subvol",
 1017                    loc->path, to->name);
 1018 
 1019             goto find_new_subvol;
 1020 
 1021         } else {
 1022             ret = 0;
 1023             goto out;
 1024         }
 1025     }
 1026 
 1027 find_new_subvol:
 1028     layout = dht_layout_get(this, loc->parent);
 1029     if (!layout) {
 1030         gf_log(this->name, GF_LOG_ERROR, "Layout is NULL");
 1031         *fop_errno = EINVAL;
 1032         ret = -1;
 1033         goto out;
 1034     }
 1035 
 1036     *new_subvol = dht_subvol_with_free_space_inodes(this, to, from, layout,
 1037                                                     stbuf->ia_size);
 1038     if ((!(*new_subvol)) || (*new_subvol == from)) {
 1039         gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_SUBVOL_INSUFF_SPACE,
 1040                "Could not find any subvol"
 1041                " with space accommodating the file - %s. Consider "
 1042                "adding bricks",
 1043                loc->path);
 1044 
 1045         *target_changed = _gf_false;
 1046         *fop_errno = ENOSPC;
 1047         ret = -1;
 1048     } else {
 1049         gf_msg(this->name, GF_LOG_INFO, 0, 0,
 1050                "new target found - %s"
 1051                " for file - %s",
 1052                (*new_subvol)->name, loc->path);
 1053         *target_changed = _gf_true;
 1054         ret = 0;
 1055     }
 1056 
 1057 out:
 1058     if (xdata)
 1059         dict_unref(xdata);
 1060     return ret;
 1061 }
 1062 
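       /* Copy ia_size bytes from src to dst in DHT_REBALANCE_BLKSIZE (1 MB)
        * chunks. Sparse regions are reproduced via dht_write_with_holes();
        * otherwise plain writes are used, tagged with GF_AVOID_OVERWRITE
        * (unless force-migration or tiering is in effect) so that a client
        * write to the same region since migration started fails the copy
        * rather than corrupting data. */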
 1063 static int
 1064 __dht_rebalance_migrate_data(xlator_t *this, gf_defrag_info_t *defrag,
 1065                              xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst,
 1066                              uint64_t ia_size, int hole_exists, int *fop_errno)
 1067 {
 1068     int ret = 0;
 1069     int count = 0;
 1070     off_t offset = 0;
 1071     struct iovec *vector = NULL;
 1072     struct iobref *iobref = NULL;
 1073     uint64_t total = 0;
 1074     size_t read_size = 0;
 1075     dict_t *xdata = NULL;
 1076     dht_conf_t *conf = NULL;
 1077 
 1078     conf = this->private;
 1079     /* if file size is '0', no need to enter this loop */
 1080     while (total < ia_size) {
 1081         read_size = (((ia_size - total) > DHT_REBALANCE_BLKSIZE)
 1082                          ? DHT_REBALANCE_BLKSIZE
 1083                          : (ia_size - total));
 1084 
 1085         ret = syncop_readv(from, src, read_size, offset, 0, &vector, &count,
 1086                            &iobref, NULL, NULL, NULL);
 1087         if (!ret || (ret < 0)) {
 1088             if (!ret) {
 1089                 /* File was probably truncated*/
 1090                 ret = -1;
 1091                 *fop_errno = ENOSPC;
 1092             } else {
 1093                 *fop_errno = -ret;
 1094             }
 1095             break;
 1096         }
 1097 
 1098         if (hole_exists) {
 1099             ret = dht_write_with_holes(to, dst, vector, count, ret, offset,
 1100                                        iobref, fop_errno);
 1101         } else {
 1102             if (!conf->force_migration && !dht_is_tier_xlator(this)) {
 1103                 if (!xdata) {
 1104                     xdata = dict_new();
 1105                     if (!xdata) {
 1106                         gf_msg("dht", GF_LOG_ERROR, 0,
 1107                                DHT_MSG_MIGRATE_FILE_FAILED,
 1108                                "insufficient memory");
 1109                         ret = -1;
 1110                         *fop_errno = ENOMEM;
 1111                         break;
 1112                     }
 1113 
 1114                     /* Fail this write and abort rebalance if we
 1115                      * detect a write from client since migration of
 1116                      * this file started. This is done to avoid
 1117                      * potential data corruption due to out of order
 1118                      * writes from rebalance and client to the same
 1119                      * region (as compared between src and dst
 1120                      * files). See
 1121                      * https://github.com/gluster/glusterfs/issues/308
 1122                      * for more details.
 1123                      */
 1124                     ret = dict_set_int32(xdata, GF_AVOID_OVERWRITE, 1);
 1125                     if (ret) {
 1126                         gf_msg("dht", GF_LOG_ERROR, 0, ENOMEM,
 1127                                "failed to set dict");
 1128                         ret = -1;
 1129                         *fop_errno = ENOMEM;
 1130                         break;
 1131                     }
 1132                 }
 1133             }
 1134             ret = syncop_writev(to, dst, vector, count, offset, iobref, 0, NULL,
 1135                                 NULL, xdata, NULL);
 1136             if (ret < 0) {
 1137                 *fop_errno = -ret;
 1138             }
 1139         }
 1140 
 1141         if (ret < 0) {
 1142             break;
 1143         }
 1144 
 1145         offset += ret;
 1146         total += ret;
 1147 
 1148         GF_FREE(vector);
 1149         if (iobref)
 1150             iobref_unref(iobref);
 1151         iobref = NULL;
 1152         vector = NULL;
 1153     }
 1154     if (iobref)
 1155         iobref_unref(iobref);
 1156     GF_FREE(vector);
 1157 
 1158     if (ret >= 0)
 1159         ret = 0;
 1160     else
 1161         ret = -1;
 1162 
 1163     if (xdata) {
 1164         dict_unref(xdata);
 1165     }
 1166 
 1167     return ret;
 1168 }
 1169 
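       /* Open the source file and mark it as under migration: set the linkto
        * xattr to the destination subvol and the sticky+sgid bits in its mode.
        * *clean_src is set once the markers are in place so the caller knows
        * they must be reverted if the migration fails. */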
 1170 static int
 1171 __dht_rebalance_open_src_file(xlator_t *this, xlator_t *from, xlator_t *to,
 1172                               loc_t *loc, struct iatt *stbuf, fd_t **src_fd,
 1173                               gf_boolean_t *clean_src, int *fop_errno)
 1174 {
 1175     int ret = 0;
 1176     fd_t *fd = NULL;
 1177     dict_t *dict = NULL;
 1178     struct iatt iatt = {
 1179         0,
 1180     };
 1181     dht_conf_t *conf = NULL;
 1182 
 1183     conf = this->private;
 1184 
 1185     *clean_src = _gf_false;
 1186 
 1187     fd = fd_create(loc->inode, DHT_REBALANCE_PID);
 1188     if (!fd) {
 1189         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 1190                "%s: fd create failed (source)", loc->path);
 1191         *fop_errno = ENOMEM;
 1192         ret = -1;
 1193         goto out;
 1194     }
 1195 
 1196     ret = syncop_open(from, loc, O_RDWR, fd, NULL, NULL);
 1197     if (ret < 0) {
 1198         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1199                "failed to open file %s on %s", loc->path, from->name);
 1200         *fop_errno = -ret;
 1201         ret = -1;
 1202         goto out;
 1203     }
 1204 
 1205     fd_bind(fd);
 1206 
 1207     if (src_fd)
 1208         *src_fd = fd;
 1209 
 1210     ret = -1;
 1211     dict = dict_new();
 1212     if (!dict) {
 1213         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 1214                "%s: Could not allocate memory for dict", loc->path);
 1215         *fop_errno = ENOMEM;
 1216         ret = -1;
 1217         goto out;
 1218     }
 1219 
 1220     ret = dict_set_str(dict, conf->link_xattr_name, to->name);
 1221     if (ret) {
 1222         gf_log(this->name, GF_LOG_ERROR,
 1223                "failed to set xattr in dict for %s (linkto:%s)", loc->path,
 1224                to->name);
 1225         *fop_errno = ENOMEM;
 1226         ret = -1;
 1227         goto out;
 1228     }
 1229 
 1230     /* Once the migration starts, the source should have 'linkto' key set
 1231        to show which is the target, so other clients can work around it */
 1232     ret = syncop_setxattr(from, loc, dict, 0, NULL, NULL);
 1233     if (ret) {
 1234         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1235                "failed to set xattr on %s in %s", loc->path, from->name);
 1236         *fop_errno = -ret;
 1237         ret = -1;
 1238         goto out;
 1239     }
 1240 
 1241     /* Reset source mode/xattr if migration fails*/
 1242     *clean_src = _gf_true;
 1243 
 1244     /* mode should be (+S+T) to indicate migration is in progress */
 1245     iatt.ia_prot = stbuf->ia_prot;
 1246     iatt.ia_type = stbuf->ia_type;
 1247     iatt.ia_prot.sticky = 1;
 1248     iatt.ia_prot.sgid = 1;
 1249 
 1250     ret = syncop_setattr(from, loc, &iatt, GF_SET_ATTR_MODE, NULL, NULL, NULL,
 1251                          NULL);
 1252     if (ret) {
 1253         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1254                "failed to set mode on %s in %s", loc->path, from->name);
 1255         *fop_errno = -ret;
 1256         ret = -1;
 1257         goto out;
 1258     }
 1259 
 1260     /* success */
 1261     ret = 0;
 1262 out:
 1263     if (dict)
 1264         dict_unref(dict);
 1265 
 1266     return ret;
 1267 }
 1268 
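       /* Migrate entries that carry no regular data: symlinks and special
        * files (devices, fifos, sockets). Any stale linkto file on the target
        * is removed, the entry is recreated there with the original gfid,
        * ownership/mode/mtime are copied over, and the source entry is
        * unlinked. */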
 1269 int
 1270 migrate_special_files(xlator_t *this, xlator_t *from, xlator_t *to, loc_t *loc,
 1271                       struct iatt *buf, int *fop_errno)
 1272 {
 1273     int ret = -1;
 1274     dict_t *rsp_dict = NULL;
 1275     dict_t *dict = NULL;
 1276     char *link = NULL;
 1277     struct iatt stbuf = {
 1278         0,
 1279     };
 1280     dht_conf_t *conf = this->private;
 1281 
 1282     dict = dict_new();
 1283     if (!dict) {
 1284         *fop_errno = ENOMEM;
 1285         ret = -1;
 1286         goto out;
 1287     }
 1288     ret = dict_set_int32(dict, conf->link_xattr_name, 256);
 1289     if (ret) {
 1290         *fop_errno = ENOMEM;
 1291         ret = -1;
 1292         gf_log(this->name, GF_LOG_ERROR,
 1293                "%s: failed to set 'linkto' key in dict", loc->path);
 1294         goto out;
 1295     }
 1296 
  1297     /* check in the destination whether the file is a link file */
 1298     ret = syncop_lookup(to, loc, &stbuf, NULL, dict, &rsp_dict);
 1299     if ((ret < 0) && (-ret != ENOENT)) {
 1300         gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1301                "%s: lookup failed", loc->path);
 1302         *fop_errno = -ret;
 1303         ret = -1;
 1304         goto out;
 1305     }
 1306 
  1307     /* we no longer require this key */
 1308     dict_del(dict, conf->link_xattr_name);
 1309 
  1310     /* the file exists on the target node; it is valid only if it is a
  1311        'linkfile', otherwise error out */
 1312     if (!ret) {
 1313         if (!check_is_linkfile(loc->inode, &stbuf, rsp_dict,
 1314                                conf->link_xattr_name)) {
 1315             gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 1316                    "%s: file exists in destination", loc->path);
 1317             *fop_errno = EINVAL;
 1318             ret = -1;
 1319             goto out;
 1320         }
 1321 
  1322         /* as the file is a linkfile, delete it */
 1323         ret = syncop_unlink(to, loc, NULL, NULL);
 1324         if (ret) {
 1325             gf_msg(this->name, GF_LOG_WARNING, -ret,
 1326                    DHT_MSG_MIGRATE_FILE_FAILED,
 1327                    "%s: failed to delete the linkfile", loc->path);
 1328             *fop_errno = -ret;
 1329             ret = -1;
 1330             goto out;
 1331         }
 1332     }
 1333 
 1334     /* Set the gfid of the source file in dict */
 1335     ret = dict_set_gfuuid(dict, "gfid-req", buf->ia_gfid, true);
 1336     if (ret) {
 1337         *fop_errno = ENOMEM;
 1338         ret = -1;
 1339         gf_log(this->name, GF_LOG_ERROR,
 1340                "%s: failed to set gfid in dict for create", loc->path);
 1341         goto out;
 1342     }
 1343 
 1344     /* Create the file in target */
 1345     if (IA_ISLNK(buf->ia_type)) {
 1346         /* Handle symlinks separately */
 1347         ret = syncop_readlink(from, loc, &link, buf->ia_size, NULL, NULL);
 1348         if (ret < 0) {
 1349             gf_msg(this->name, GF_LOG_WARNING, -ret,
 1350                    DHT_MSG_MIGRATE_FILE_FAILED,
 1351                    "%s: readlink on symlink failed", loc->path);
 1352             *fop_errno = -ret;
 1353             ret = -1;
 1354             goto out;
 1355         }
 1356 
 1357         ret = syncop_symlink(to, loc, link, 0, dict, NULL);
 1358         if (ret) {
 1359             gf_msg(this->name, GF_LOG_WARNING, -ret,
 1360                    DHT_MSG_MIGRATE_FILE_FAILED, "%s: creating symlink failed",
 1361                    loc->path);
 1362             *fop_errno = -ret;
 1363             ret = -1;
 1364             goto out;
 1365         }
 1366 
 1367         goto done;
 1368     }
 1369 
 1370     ret = syncop_mknod(to, loc, st_mode_from_ia(buf->ia_prot, buf->ia_type),
 1371                        makedev(ia_major(buf->ia_rdev), ia_minor(buf->ia_rdev)),
 1372                        0, dict, NULL);
 1373     if (ret) {
 1374         gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1375                "%s: mknod failed", loc->path);
 1376         *fop_errno = -ret;
 1377         ret = -1;
 1378         goto out;
 1379     }
 1380 
 1381 done:
 1382     ret = syncop_setattr(to, loc, buf,
 1383                          (GF_SET_ATTR_MTIME | GF_SET_ATTR_UID |
 1384                           GF_SET_ATTR_GID | GF_SET_ATTR_MODE),
 1385                          NULL, NULL, NULL, NULL);
 1386     if (ret) {
 1387         gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1388                "%s: failed to perform setattr on %s", loc->path, to->name);
 1389         *fop_errno = -ret;
 1390     }
 1391 
 1392     ret = syncop_unlink(from, loc, NULL, NULL);
 1393     if (ret) {
 1394         gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1395                "%s: unlink failed", loc->path);
 1396         *fop_errno = -ret;
 1397         ret = -1;
 1398     }
 1399 
 1400 out:
 1401     GF_FREE(link);
 1402     if (dict)
 1403         dict_unref(dict);
 1404 
 1405     if (rsp_dict)
 1406         dict_unref(rsp_dict);
 1407 
 1408     return ret;
 1409 }
 1410 
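       /* Roll back the "migration in progress" markers on the source: clear
        * the sticky/sgid bits that rebalance added (preserving any the file
        * originally had) and strip the linkto xattr. */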
 1411 static int
 1412 __dht_migration_cleanup_src_file(xlator_t *this, loc_t *loc, fd_t *fd,
 1413                                  xlator_t *from, ia_prot_t *src_ia_prot)
 1414 {
 1415     int ret = -1;
 1416     dht_conf_t *conf = NULL;
 1417     struct iatt new_stbuf = {
 1418         0,
 1419     };
 1420 
 1421     if (!this || !fd || !from || !src_ia_prot) {
 1422         goto out;
 1423     }
 1424 
 1425     conf = this->private;
 1426 
 1427     /*Revert source mode and xattr changes*/
 1428     ret = syncop_fstat(from, fd, &new_stbuf, NULL, NULL);
 1429     if (ret < 0) {
 1430         /* Failed to get the stat info */
 1431         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1432                "Migrate file cleanup failed: failed to fstat "
 1433                "file %s on %s ",
 1434                loc->path, from->name);
 1435         ret = -1;
 1436         goto out;
 1437     }
 1438 
  1439     /* Clear the sticky and sgid bits unless the source originally had them set */
 1440     if (!src_ia_prot->sticky)
 1441         new_stbuf.ia_prot.sticky = 0;
 1442 
 1443     if (!src_ia_prot->sgid)
 1444         new_stbuf.ia_prot.sgid = 0;
 1445 
 1446     ret = syncop_fsetattr(from, fd, &new_stbuf,
 1447                           (GF_SET_ATTR_GID | GF_SET_ATTR_MODE), NULL, NULL,
 1448                           NULL, NULL);
 1449 
 1450     if (ret) {
 1451         gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1452                "Migrate file cleanup failed:"
 1453                "%s: failed to perform fsetattr on %s ",
 1454                loc->path, from->name);
 1455         ret = -1;
 1456         goto out;
 1457     }
 1458 
 1459     ret = syncop_fremovexattr(from, fd, conf->link_xattr_name, 0, NULL);
 1460     if (ret) {
 1461         gf_log(this->name, GF_LOG_WARNING,
 1462                "%s: failed to remove linkto xattr on %s (%s)", loc->path,
 1463                from->name, strerror(-ret));
 1464         ret = -1;
 1465         goto out;
 1466     }
 1467 
 1468     ret = 0;
 1469 
 1470 out:
 1471     return ret;
 1472 }
 1473 
 1474 /*
 1475   return values:
 1476 
 1477    -1 : failure
 1478     0 : successfully migrated data
 1479     1 : not a failure, but we can't migrate data as of now
 1480 */
 1481 int
 1482 dht_migrate_file(xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
 1483                  int flag, int *fop_errno)
 1484 {
 1485     int ret = -1;
 1486     struct iatt new_stbuf = {
 1487         0,
 1488     };
 1489     struct iatt stbuf = {
 1490         0,
 1491     };
 1492     struct iatt empty_iatt = {
 1493         0,
 1494     };
 1495     ia_prot_t src_ia_prot = {
 1496         0,
 1497     };
 1498     fd_t *src_fd = NULL;
 1499     fd_t *dst_fd = NULL;
 1500     dict_t *dict = NULL;
 1501     dict_t *xattr = NULL;
 1502     dict_t *xattr_rsp = NULL;
 1503     int file_has_holes = 0;
 1504     dht_conf_t *conf = this->private;
 1505     int rcvd_enoent_from_src = 0;
 1506     struct gf_flock flock = {
 1507         0,
 1508     };
 1509     struct gf_flock plock = {
 1510         0,
 1511     };
 1512     loc_t tmp_loc = {
 1513         0,
 1514     };
 1515     loc_t parent_loc = {
 1516         0,
 1517     };
 1518     gf_boolean_t inodelk_locked = _gf_false;
 1519     gf_boolean_t entrylk_locked = _gf_false;
 1520     gf_boolean_t p_locked = _gf_false;
 1521     int lk_ret = -1;
 1522     gf_defrag_info_t *defrag = NULL;
 1523     gf_boolean_t clean_src = _gf_false;
 1524     gf_boolean_t clean_dst = _gf_false;
 1525     int log_level = GF_LOG_INFO;
 1526     gf_boolean_t delete_src_linkto = _gf_true;
 1527     lock_migration_info_t locklist;
 1528     dict_t *meta_dict = NULL;
 1529     gf_boolean_t meta_locked = _gf_false;
 1530     gf_boolean_t target_changed = _gf_false;
 1531     xlator_t *new_target = NULL;
 1532     xlator_t *old_target = NULL;
 1533     xlator_t *hashed_subvol = NULL;
 1534     fd_t *linkto_fd = NULL;
 1535     dict_t *xdata = NULL;
 1536 
 1537     if (from == to) {
 1538         gf_msg_debug(this->name, 0,
 1539                      "destination and source are same. file %s"
 1540                      " might have migrated already",
 1541                      loc->path);
 1542         ret = 0;
 1543         goto out;
 1544     }
 1545 
 1546     /* If defrag is NULL, it should be assumed that migration is triggered
 1547      * from client using the trusted.distribute.migrate-data virtual xattr
 1548      */
 1549     defrag = conf->defrag;
 1550 
 1551     /* Migration of files triggered from clients is restricted to
 1552      * non-tiered volumes for now */
 1553     if (!defrag && dht_is_tier_xlator(this)) {
 1554         ret = ENOTSUP;
 1555         goto out;
 1556     }
 1557 
 1558     if (defrag && defrag->tier_conf.is_tier)
 1559         log_level = GF_LOG_TRACE;
 1560 
 1561     gf_log(this->name, log_level, "%s: attempting to move from %s to %s",
 1562            loc->path, from->name, to->name);
 1563 
 1564     dict = dict_new();
 1565     if (!dict) {
 1566         ret = -1;
 1567         *fop_errno = ENOMEM;
 1568         gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_NO_MEMORY,
 1569                "Could not allocate memory for dict");
 1570         goto out;
 1571     }
 1572     ret = dict_set_int32(dict, conf->link_xattr_name, 256);
 1573     if (ret) {
 1574         *fop_errno = ENOMEM;
 1575         ret = -1;
 1576         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 1577                "Migrate file failed:"
 1578                "%s: failed to set 'linkto' key in dict",
 1579                loc->path);
 1580         goto out;
 1581     }
 1582 
 1583     /* Do not migrate file in case lock migration is not enabled on the
 1584      * volume*/
 1585     if (!conf->lock_migration_enabled) {
 1586         ret = dict_set_int32(dict, GLUSTERFS_POSIXLK_COUNT, sizeof(int32_t));
 1587         if (ret) {
 1588             *fop_errno = ENOMEM;
 1589             ret = -1;
 1590             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 1591                    "Migrate file failed: %s: failed to "
 1592                    "set " GLUSTERFS_POSIXLK_COUNT " key in dict",
 1593                    loc->path);
 1594             goto out;
 1595         }
 1596     } else {
 1597         gf_msg(this->name, GF_LOG_INFO, 0, 0,
 1598                "locks will be migrated"
 1599                " for file: %s",
 1600                loc->path);
 1601     }
 1602 
 1603     /* The file is locked to prevent a rename during a migration. Renames
 1604      * and migrations on the file at the same time can lead to data loss.
 1605      */
 1606 
 1607     ret = dht_build_parent_loc(this, &parent_loc, loc, fop_errno);
 1608     if (ret < 0) {
 1609         ret = -1;
 1610         gf_msg(this->name, GF_LOG_WARNING, *fop_errno,
 1611                DHT_MSG_MIGRATE_FILE_FAILED,
 1612                "%s: failed to build parent loc, which is needed to "
 1613                "acquire entrylk to synchronize with renames on this "
 1614                "path. Skipping migration",
 1615                loc->path);
 1616         goto out;
 1617     }
 1618 
 1619     hashed_subvol = dht_subvol_get_hashed(this, loc);
 1620     if (hashed_subvol == NULL) {
 1621         ret = -1;
 1622         gf_msg(this->name, GF_LOG_WARNING, EINVAL, DHT_MSG_MIGRATE_FILE_FAILED,
 1623                "%s: cannot find hashed subvol which is needed to "
 1624                "synchronize with renames on this path. "
 1625                "Skipping migration",
 1626                loc->path);
 1627         goto out;
 1628     }
 1629 
 1630     flock.l_type = F_WRLCK;
 1631 
 1632     tmp_loc.inode = inode_ref(loc->inode);
 1633     gf_uuid_copy(tmp_loc.gfid, loc->gfid);
 1634     tmp_loc.path = gf_strdup(loc->path);
 1635 
 1636     /* This inodelk happens with flock.owner being zero. To synchronize
 1637      * hardlink migration we need a different lkowner for each migration.
 1638      * Filed a bug to track the fix:
 1639      * https://bugzilla.redhat.com/show_bug.cgi?id=1468202. Currently the
 1640      * synclock takes care of synchronizing hardlink migration; once this
 1641      * bug is fixed we can avoid taking the synclock */
 1642     ret = syncop_inodelk(from, DHT_FILE_MIGRATE_DOMAIN, &tmp_loc, F_SETLKW,
 1643                          &flock, NULL, NULL);
 1644     if (ret < 0) {
 1645         *fop_errno = -ret;
 1646         ret = -1;
 1647         gf_msg(this->name, GF_LOG_WARNING, *fop_errno,
 1648                DHT_MSG_MIGRATE_FILE_FAILED,
 1649                "migrate file failed: "
 1650                "%s: failed to lock file on %s",
 1651                loc->path, from->name);
 1652         goto out;
 1653     }
 1654 
 1655     inodelk_locked = _gf_true;
 1656 
 1657     /* dht_rename has changed to use entrylk on hashed subvol for
 1658      * synchronization. So, rebalance too has to acquire an entrylk on
 1659      * hashed subvol.
 1660      */
 1661     ret = syncop_entrylk(hashed_subvol, DHT_ENTRY_SYNC_DOMAIN, &parent_loc,
 1662                          loc->name, ENTRYLK_LOCK, ENTRYLK_WRLCK, NULL, NULL);
 1663     if (ret < 0) {
 1664         *fop_errno = -ret;
 1665         ret = -1;
 1666         gf_msg(this->name, GF_LOG_WARNING, *fop_errno,
 1667                DHT_MSG_MIGRATE_FILE_FAILED,
 1668                "%s: failed to acquire entrylk on subvol %s", loc->path,
 1669                hashed_subvol->name);
 1670         goto out;
 1671     }
 1672 
 1673     entrylk_locked = _gf_true;
 1674 
 1675     /* Phase 1 - Data migration is in progress from now on */
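          /* Phase 1 covers everything up to the data copy: look up the
             source, create the destination and move the data across.
             Housekeeping (ownership, timestamps, ACLs, locks and the unlink
             of the source) is deferred to phase 2 below. */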
 1676     ret = syncop_lookup(from, loc, &stbuf, NULL, dict, &xattr_rsp);
 1677     if (ret) {
 1678         *fop_errno = -ret;
 1679         ret = -1;
 1680         gf_msg(this->name, GF_LOG_ERROR, *fop_errno,
 1681                DHT_MSG_MIGRATE_FILE_FAILED,
 1682                "Migrate file failed:"
 1683                "%s: lookup failed on %s",
 1684                loc->path, from->name);
 1685         goto out;
 1686     }
 1687 
 1688     /* preserve source mode, so set the same to the destination */
 1689     src_ia_prot = stbuf.ia_prot;
 1690 
 1691     /* Check if file can be migrated */
 1692     ret = __is_file_migratable(this, loc, &stbuf, xattr_rsp, flag, defrag, conf,
 1693                                fop_errno);
 1694     if (ret) {
 1695         if (ret == HARDLINK_MIG_INPROGRESS)
 1696             ret = 0;
 1697         goto out;
 1698     }
 1699 
 1700     /* Take care of the special files */
 1701     if (!IA_ISREG(stbuf.ia_type)) {
 1702         /* Special files */
 1703         ret = migrate_special_files(this, from, to, loc, &stbuf, fop_errno);
 1704         goto out;
 1705     }
 1706 
 1707     /* create the destination, with required modes/xattr */
 1708     ret = __dht_rebalance_create_dst_file(this, to, from, loc, &stbuf, &dst_fd,
 1709                                           fop_errno);
 1710     if (ret) {
 1711         gf_msg(this->name, GF_LOG_ERROR, 0, 0,
 1712                "Create dst failed"
 1713                " on - %s for file - %s",
 1714                to->name, loc->path);
 1715         goto out;
 1716     }
 1717 
 1718     clean_dst = _gf_true;
 1719 
 1720     ret = __dht_check_free_space(this, to, from, loc, &stbuf, flag, conf,
 1721                                  &target_changed, &new_target, fop_errno);
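          /* __dht_check_free_space() may pick an alternative target when 'to'
             is short on space and report it through new_target; in that case
             the partially created dst file on the original target is
             truncated and recreated on the new target below. */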
 1722     if (target_changed) {
 1723         /* Can't handle this for hardlinks; mark it as a failure */
 1724         if (flag == GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS || stbuf.ia_nlink > 1) {
 1725             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_SUBVOL_INSUFF_SPACE,
 1726                    "Exiting migration for"
 1727                    " file - %s. flag - %d, stbuf.ia_nlink - %d",
 1728                    loc->path, flag, stbuf.ia_nlink);
 1729             ret = -1;
 1730             goto out;
 1731         }
 1732 
 1733         ret = syncop_ftruncate(to, dst_fd, 0, NULL, NULL, NULL, NULL);
 1734         if (ret) {
 1735             gf_log(this->name, GF_LOG_WARNING,
 1736                    "%s: failed to perform truncate on %s (%s)", loc->path,
 1737                    to->name, strerror(-ret));
 1738         }
 1739 
 1740         syncop_close(dst_fd);
 1741         dst_fd = NULL;
 1742 
 1743         old_target = to;
 1744         to = new_target;
 1745 
 1746         clean_dst = _gf_false;
 1747 
 1748         /* If the file migration to this new target succeeds, update the
 1749          * linkto xattr on the old destination to point to the new
 1750          * destination. This must be done only after the migration,
 1751          * because on failure the linkto still needs to point to the
 1752          * source subvol */
 1753         ret = __dht_rebalance_create_dst_file(this, to, from, loc, &stbuf,
 1754                                               &dst_fd, fop_errno);
 1755         if (ret) {
 1756             gf_log(this->name, GF_LOG_ERROR,
 1757                    "Create dst failed"
 1758                    " on - %s for file - %s",
 1759                    to->name, loc->path);
 1760             goto out;
 1761         } else {
 1762             gf_msg(this->name, GF_LOG_INFO, 0, 0,
 1763                    "destination for file "
 1764                    "- %s is changed to - %s",
 1765                    loc->path, to->name);
 1766             clean_dst = _gf_true;
 1767         }
 1768     }
 1769 
 1770     if (ret) {
 1771         goto out;
 1772     }
 1773 
 1774     /* Open the source, and also update mode/xattr */
 1775     ret = __dht_rebalance_open_src_file(this, from, to, loc, &stbuf, &src_fd,
 1776                                         &clean_src, fop_errno);
 1777     if (ret) {
 1778         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 1779                "Migrate file failed: failed to open %s on %s", loc->path,
 1780                from->name);
 1781         goto out;
 1782     }
 1783 
 1784     /* TODO: move all xattr related operations to fd based operations */
 1785     ret = syncop_listxattr(from, loc, &xattr, NULL, NULL);
 1786     if (ret < 0) {
 1787         *fop_errno = -ret;
 1788         gf_msg(this->name, GF_LOG_WARNING, *fop_errno,
 1789                DHT_MSG_MIGRATE_FILE_FAILED,
 1790                "Migrate file failed:"
 1791                "%s: failed to get xattr from %s",
 1792                loc->path, from->name);
 1793         ret = -1;
 1794         goto out;
 1795     }
 1796 
 1797     /* Copying posix acls to the linkto file messes up the permissions*/
 1798     dht_strip_out_acls(xattr);
 1799 
 1800     /* Remove the linkto xattr as we don't want to overwrite the value
 1801      * set on the dst.
 1802      */
 1803     dict_del(xattr, conf->link_xattr_name);
 1804 
 1805     /* We need to error out if this fails as having the wrong shard xattrs
 1806      * set on the dst could cause data corruption
 1807      */
 1808     ret = syncop_fsetxattr(to, dst_fd, xattr, 0, NULL, NULL);
 1809     if (ret < 0) {
 1810         *fop_errno = -ret;
 1811         gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1812                "%s: failed to set xattr on %s", loc->path, to->name);
 1813         ret = -1;
 1814         goto out;
 1815     }
 1816 
 1817     if (xattr_rsp) {
 1818         /* we no longer require this key */
 1819         dict_del(dict, conf->link_xattr_name);
 1820         dict_unref(xattr_rsp);
 1821     }
 1822 
 1823     ret = syncop_fstat(from, src_fd, &stbuf, dict, &xattr_rsp);
 1824     if (ret) {
 1825         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1826                "Migrate file failed:failed to lookup %s on %s ", loc->path,
 1827                from->name);
 1828         *fop_errno = -ret;
 1829         ret = -1;
 1830         goto out;
 1831     }
 1832 
 1833     /* Check again if file has hardlink */
 1834     ret = __check_file_has_hardlink(this, loc, &stbuf, xattr_rsp, flag, defrag,
 1835                                     conf, fop_errno);
 1836     if (ret) {
 1837         if (ret == HARDLINK_MIG_INPROGRESS)
 1838             ret = 0;
 1839         goto out;
 1840     }
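          /* ia_blocks counts 512-byte sectors actually allocated, so a
             logical size larger than ia_blocks * GF_DISK_SECTOR_SIZE implies
             the file is sparse and the copy can try to skip the holes. */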
 1841     /* Try to preserve 'holes' while migrating data */
 1842     if (stbuf.ia_size > (stbuf.ia_blocks * GF_DISK_SECTOR_SIZE))
 1843         file_has_holes = 1;
 1844 
 1845     ret = __dht_rebalance_migrate_data(this, defrag, from, to, src_fd, dst_fd,
 1846                                        stbuf.ia_size, file_has_holes,
 1847                                        fop_errno);
 1848     if (ret) {
 1849         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 1850                "Migrate file failed: %s: failed to migrate data", loc->path);
 1851 
 1852         ret = -1;
 1853         goto out;
 1854     }
 1855 
 1856     /* TODO: Sync the locks */
 1857 
 1858     xdata = dict_new();
 1859     if (!xdata || dict_set_int8(xdata, "last-fsync", 1)) {
 1860         gf_log(this->name, GF_LOG_ERROR,
 1861                "%s: failed to set last-fsync flag on "
 1862                "%s (%s)",
 1863                loc->path, to->name, strerror(ENOMEM));
 1864     }
 1865 
 1866     ret = syncop_fsync(to, dst_fd, 0, NULL, NULL, xdata, NULL);
 1867     if (ret) {
 1868         gf_log(this->name, GF_LOG_WARNING, "%s: failed to fsync on %s (%s)",
 1869                loc->path, to->name, strerror(-ret));
 1870         *fop_errno = -ret;
 1871     }
 1872 
 1873     /* Phase 2 - Data-Migration Complete, Housekeeping updates pending */
 1874 
 1875     ret = syncop_fstat(from, src_fd, &new_stbuf, NULL, NULL);
 1876     if (ret < 0) {
 1877         /* Failed to get the stat info */
 1878         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1879                "Migrate file failed: failed to fstat file %s on %s ", loc->path,
 1880                from->name);
 1881         *fop_errno = -ret;
 1882         ret = -1;
 1883         goto out;
 1884     }
 1885 
 1886     /* Lock the entire source file to prevent clients from taking a
 1887        lock on it as dht_lk does not handle file migration.
 1888 
 1889        This still leaves a small window where conflicting locks can
 1890        be granted to different clients. If client1 requests a blocking
 1891        lock on the src file, it will be granted after the migrating
 1892        process releases its lock. If client2 requests a lock on the dst
 1893        data file, it will also be granted, but all FOPs will be redirected
 1894        to the dst data file.
 1895     */
 1896 
 1897     /* Take meta lock  */
 1898 
 1899     if (conf->lock_migration_enabled) {
 1900         meta_dict = dict_new();
 1901         if (!meta_dict) {
 1902             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 1903                    "dict_new failed");
 1904 
 1905             *fop_errno = ENOMEM;
 1906             ret = -1;
 1907             goto out;
 1908         }
 1909 
 1910         ret = dict_set_str(meta_dict, GLUSTERFS_INTERNAL_FOP_KEY, "yes");
 1911         if (ret) {
 1912             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED,
 1913                    "Failed to set dictionary value: key = %s,"
 1914                    " path = %s",
 1915                    GLUSTERFS_INTERNAL_FOP_KEY, loc->path);
 1916             *fop_errno = ENOMEM;
 1917             ret = -1;
 1918             goto out;
 1919         }
 1920 
 1921         ret = dict_set_int32(meta_dict, GF_META_LOCK_KEY, 1);
 1922         if (ret) {
 1923             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 1924                    "Trace dict_set failed");
 1925             *fop_errno = ENOMEM;
 1926             ret = -1;
 1927             goto out;
 1928         }
 1929 
 1930         ret = syncop_setxattr(from, loc, meta_dict, 0, NULL, NULL);
 1931         if (ret) {
 1932             gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1933                    "Trace syncop_setxattr metalock failed");
 1934 
 1935             *fop_errno = -ret;
 1936             ret = -1;
 1937             goto out;
 1938         } else {
 1939             meta_locked = _gf_true;
 1940         }
 1941     }
 1942 
 1943     if (!conf->lock_migration_enabled) {
 1944         plock.l_type = F_WRLCK;
 1945         plock.l_start = 0;
 1946         plock.l_len = 0;
 1947         plock.l_whence = SEEK_SET;
 1948 
 1949         ret = syncop_lk(from, src_fd, F_SETLK, &plock, NULL, NULL);
 1950         if (ret) {
 1951             gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 1952                    "Migrate file failed:"
 1953                    "%s: Failed to lock on %s",
 1954                    loc->path, from->name);
 1955             *fop_errno = -ret;
 1956             ret = -1;
 1957             goto out;
 1958         }
 1959 
 1960         p_locked = _gf_true;
 1961 
 1962     } else {
 1963         INIT_LIST_HEAD(&locklist.list);
 1964 
 1965         ret = syncop_getactivelk(from, loc, &locklist, NULL, NULL);
 1966         if (ret == 0) {
 1967             gf_log(this->name, GF_LOG_INFO, "No active locks on:%s", loc->path);
 1968 
 1969         } else if (ret > 0) {
 1970             ret = syncop_setactivelk(to, loc, &locklist, NULL, NULL);
 1971             if (ret) {
 1972                 gf_msg(this->name, GF_LOG_ERROR, -ret,
 1973                        DHT_MSG_LOCK_MIGRATION_FAILED, "write lock failed on:%s",
 1974                        loc->path);
 1975 
 1976                 *fop_errno = -ret;
 1977                 ret = -1;
 1978                 goto metaunlock;
 1979             }
 1980         } else {
 1981             gf_msg(this->name, GF_LOG_ERROR, -ret,
 1982                    DHT_MSG_LOCK_MIGRATION_FAILED,
 1983                    "getactivelk failed for file: %s", loc->path);
 1984             *fop_errno = -ret;
 1985         }
 1986     }
 1987 
 1988     /* The source carries both the sticky and sgid bits as rebalance
 1989        markers. Reset them to 0, unless the source had them set before the
 1990        rebalance modes were applied, and set the source permissions on the
           destination */
 1991     if (!src_ia_prot.sticky)
 1992         new_stbuf.ia_prot.sticky = 0;
 1993 
 1994     if (!src_ia_prot.sgid)
 1995         new_stbuf.ia_prot.sgid = 0;
 1996 
 1997     /* TODO: if the source actually had sticky bit, or sgid bit set,
 1998        we are not handling it */
 1999 
 2000     ret = syncop_fsetattr(
 2001         to, dst_fd, &new_stbuf,
 2002         (GF_SET_ATTR_UID | GF_SET_ATTR_GID | GF_SET_ATTR_MODE), NULL, NULL,
 2003         NULL, NULL);
 2004     if (ret) {
 2005         gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 2006                "Migrate file failed:"
 2007                "%s: failed to perform setattr on %s ",
 2008                loc->path, to->name);
 2009         *fop_errno = -ret;
 2010         ret = -1;
 2011         goto metaunlock;
 2012     }
 2013 
 2014     /* Because 'futimes' is not portable */
 2015     ret = syncop_setattr(to, loc, &new_stbuf,
 2016                          (GF_SET_ATTR_MTIME | GF_SET_ATTR_ATIME), NULL, NULL,
 2017                          NULL, NULL);
 2018     if (ret) {
 2019         gf_log(this->name, GF_LOG_WARNING,
 2020                "%s: failed to perform setattr on %s ", loc->path, to->name);
 2021         *fop_errno = -ret;
 2022     }
 2023 
 2024     if (target_changed) {
 2025         dict_del(dict, GLUSTERFS_POSIXLK_COUNT);
 2026         ret = dict_set_str(dict, conf->link_xattr_name, to->name);
 2027         if (ret) {
 2028             gf_log(this->name, GF_LOG_ERROR,
 2029                    "failed to set xattr in dict for %s (linkto:%s)", loc->path,
 2030                    to->name);
 2031             *fop_errno = ENOMEM;
 2032             ret = -1;
 2033             goto out;
 2034         }
 2035 
 2036         ret = syncop_setxattr(old_target, loc, dict, 0, NULL, NULL);
 2037         if (ret && -ret != ESTALE && -ret != ENOENT) {
 2038             gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 2039                    "failed to set xattr on %s in %s", loc->path,
 2040                    old_target->name);
 2041             *fop_errno = -ret;
 2042             ret = -1;
 2043             goto out;
 2044         } else if (-ret == ESTALE || -ret == ENOENT) {
 2045             /* An ESTALE (or ENOENT) failure indicates that the linkto
 2046              * file on the hashed subvol might have been deleted. In that
 2047              * case, create a linkto file with the new target as the
 2048              * linkto xattr value */
 2049             linkto_fd = fd_create(loc->inode, DHT_REBALANCE_PID);
 2050             if (!linkto_fd) {
 2051                 gf_msg(this->name, GF_LOG_ERROR, errno,
 2052                        DHT_MSG_MIGRATE_FILE_FAILED, "%s: fd create failed",
 2053                        loc->path);
 2054                 *fop_errno = ENOMEM;
 2055                 ret = -1;
 2056                 goto out;
 2057             }
 2058             ret = syncop_create(old_target, loc, O_RDWR, DHT_LINKFILE_MODE,
 2059                                 linkto_fd, NULL, dict, NULL);
 2060             if (ret != 0 && -ret != EEXIST && -ret != ESTALE) {
 2061                 *fop_errno = -ret;
 2062                 ret = -1;
 2063                 gf_msg(this->name, GF_LOG_ERROR, -ret,
 2064                        DHT_MSG_MIGRATE_FILE_FAILED,
 2065                        "failed to create linkto file on %s in %s", loc->path,
 2066                        old_target->name);
 2067                 goto out;
 2068             } else if (ret == 0) {
 2069                 ret = syncop_fsetattr(old_target, linkto_fd, &stbuf,
 2070                                       (GF_SET_ATTR_UID | GF_SET_ATTR_GID), NULL,
 2071                                       NULL, NULL, NULL);
 2072                 if (ret < 0) {
 2073                     *fop_errno = -ret;
 2074                     gf_msg(this->name, GF_LOG_ERROR, -ret,
 2075                            DHT_MSG_MIGRATE_FILE_FAILED,
 2076                            "chown failed for %s on %s", loc->path,
 2077                            old_target->name);
 2078                 }
 2079             }
 2080         }
 2081     }
 2082 
 2083     clean_dst = _gf_false;
 2084 
 2085     /* Posix ACLs are not set on DHT linkto files as part of the initial
 2086      * xattrs applied to the dst file, so they need to be set on the dst
 2087      * file after the linkto attrs are removed.
 2088      * TODO: Optimize this.
 2089      */
 2090     if (xattr) {
 2091         dict_unref(xattr);
 2092         xattr = NULL;
 2093     }
 2094 
 2095     /* Set only the Posix ACLs this time */
 2096     ret = syncop_getxattr(from, loc, &xattr, POSIX_ACL_ACCESS_XATTR, NULL,
 2097                           NULL);
 2098     if (ret < 0) {
 2099         if ((-ret != ENODATA) && (-ret != ENOATTR)) {
 2100             gf_msg(this->name, GF_LOG_WARNING, -ret,
 2101                    DHT_MSG_MIGRATE_FILE_FAILED,
 2102                    "Migrate file failed:"
 2103                    "%s: failed to get xattr from %s",
 2104                    loc->path, from->name);
 2105             *fop_errno = -ret;
 2106         }
 2107     } else {
 2108         ret = syncop_setxattr(to, loc, xattr, 0, NULL, NULL);
 2109         if (ret < 0) {
 2110             /* Potential problem here where Posix ACLs will
 2111              * not be set on the target file */
 2112 
 2113             gf_msg(this->name, GF_LOG_WARNING, -ret,
 2114                    DHT_MSG_MIGRATE_FILE_FAILED,
 2115                    "Migrate file failed:"
 2116                    "%s: failed to set xattr on %s",
 2117                    loc->path, to->name);
 2118             *fop_errno = -ret;
 2119         }
 2120     }
 2121 
 2122     /* The src file is being unlinked after this so we don't need
 2123        to clean it up */
 2124     clean_src = _gf_false;
 2125 
 2126     /* Turn the source into a linkto file before deleting it */
 2127     empty_iatt.ia_prot.sticky = 1;
 2128     ret = syncop_fsetattr(from, src_fd, &empty_iatt, GF_SET_ATTR_MODE, NULL,
 2129                           NULL, NULL, NULL);
 2130     if (ret) {
 2131         gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 2132                "Migrate file failed:"
 2133                "%s: failed to perform setattr on %s ",
 2134                loc->path, from->name);
 2135         *fop_errno = -ret;
 2136         ret = -1;
 2137         goto metaunlock;
 2138     }
 2139 
 2140     /* Free up the data blocks on the source node, as the whole
 2141         file is migrated */
 2142     ret = syncop_ftruncate(from, src_fd, 0, NULL, NULL, NULL, NULL);
 2143     if (ret) {
 2144         gf_log(this->name, GF_LOG_WARNING,
 2145                "%s: failed to perform truncate on %s (%s)", loc->path,
 2146                from->name, strerror(-ret));
 2147         *fop_errno = -ret;
 2148     }
 2149 
 2150     /* remove the 'linkto' xattr from the destination */
 2151     ret = syncop_fremovexattr(to, dst_fd, conf->link_xattr_name, 0, NULL);
 2152     if (ret) {
 2153         gf_log(this->name, GF_LOG_WARNING,
 2154                "%s: failed to perform removexattr on %s (%s)", loc->path,
 2155                to->name, strerror(-ret));
 2156         *fop_errno = -ret;
 2157     }
 2158 
 2159     /* Do a stat and check the gfid before unlink */
 2160 
 2161     /*
 2162      * The cached file changes its state from non-linkto to linkto after
 2163      * the data is migrated. If a lookup is performed from any other mount
 2164      * point, the converted linkto file may be treated as stale and
 2165      * unlinked. By then the file has already been migrated, so a further
 2166      * failure with ENOENT should not be treated as an error
 2167      */
 2168 
 2169     ret = syncop_stat(from, loc, &empty_iatt, NULL, NULL);
 2170     if (ret) {
 2171         gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 2172                "%s: failed to do a stat on %s", loc->path, from->name);
 2173 
 2174         if (-ret != ENOENT) {
 2175             *fop_errno = -ret;
 2176             ret = -1;
 2177             goto metaunlock;
 2178         }
 2179 
 2180         rcvd_enoent_from_src = 1;
 2181     }
 2182 
 2183     if ((gf_uuid_compare(empty_iatt.ia_gfid, loc->gfid) == 0) &&
 2184         (!rcvd_enoent_from_src) && delete_src_linkto) {
 2185         /* take out the source from namespace */
 2186         ret = syncop_unlink(from, loc, NULL, NULL);
 2187         if (ret) {
 2188             gf_msg(this->name, GF_LOG_WARNING, -ret,
 2189                    DHT_MSG_MIGRATE_FILE_FAILED,
 2190                    "%s: failed to perform unlink on %s", loc->path, from->name);
 2191             *fop_errno = -ret;
 2192             ret = -1;
 2193             goto metaunlock;
 2194         }
 2195     }
 2196 
 2197     ret = syncop_lookup(this, loc, NULL, NULL, NULL, NULL);
 2198     if (ret) {
 2199         gf_msg_debug(this->name, -ret,
 2200                      "%s: failed to lookup the file on subvolumes", loc->path);
 2201         *fop_errno = -ret;
 2202     }
 2203 
 2204     gf_msg(this->name, log_level, 0, DHT_MSG_MIGRATE_FILE_COMPLETE,
 2205            "completed migration of %s from subvolume %s to %s", loc->path,
 2206            from->name, to->name);
 2207 
 2208     ret = 0;
 2209 
 2210 metaunlock:
 2211 
 2212     if (conf->lock_migration_enabled && meta_locked) {
 2213         dict_del(meta_dict, GF_META_LOCK_KEY);
 2214 
 2215         ret = dict_set_int32(meta_dict, GF_META_UNLOCK_KEY, 1);
 2216         if (ret) {
 2217             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 2218                    "Trace dict_set failed");
 2219 
 2220             *fop_errno = ENOMEM;
 2221             ret = -1;
 2222             goto out;
 2223         }
 2224 
 2225         if (clean_dst == _gf_false)
 2226             ret = dict_set_int32(meta_dict, "status", 1);
 2227         else
 2228             ret = dict_set_int32(meta_dict, "status", 0);
 2229 
 2230         if (ret) {
 2231             gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 2232                    "Trace dict_set failed");
 2233 
 2234             *fop_errno = ENOMEM;
 2235             ret = -1;
 2236             goto out;
 2237         }
 2238 
 2239         ret = syncop_setxattr(from, loc, meta_dict, 0, NULL, NULL);
 2240         if (ret) {
 2241             gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 2242                    "Trace syncop_setxattr meta unlock failed");
 2243 
 2244             *fop_errno = -ret;
 2245             ret = -1;
 2246             goto out;
 2247         }
 2248     }
 2249 
 2250 out:
 2251     if (clean_src) {
 2252         /* Revert source mode and xattr changes*/
 2253         lk_ret = __dht_migration_cleanup_src_file(this, loc, src_fd, from,
 2254                                                   &src_ia_prot);
 2255         if (lk_ret) {
 2256             gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED,
 2257                    "%s: failed to cleanup source file on %s", loc->path,
 2258                    from->name);
 2259         }
 2260     }
 2261 
 2262     /* reset the destination back to 0 */
 2263     if (clean_dst) {
 2264         lk_ret = syncop_ftruncate(to, dst_fd, 0, NULL, NULL, NULL, NULL);
 2265         if (lk_ret) {
 2266             gf_msg(this->name, GF_LOG_ERROR, -lk_ret,
 2267                    DHT_MSG_MIGRATE_FILE_FAILED,
 2268                    "Migrate file failed: "
 2269                    "%s: failed to reset target size back to 0",
 2270                    loc->path);
 2271         }
 2272     }
 2273 
 2274     if (inodelk_locked) {
 2275         flock.l_type = F_UNLCK;
 2276 
 2277         lk_ret = syncop_inodelk(from, DHT_FILE_MIGRATE_DOMAIN, &tmp_loc,
 2278                                 F_SETLK, &flock, NULL, NULL);
 2279         if (lk_ret < 0) {
 2280             gf_msg(this->name, GF_LOG_WARNING, -lk_ret,
 2281                    DHT_MSG_MIGRATE_FILE_FAILED,
 2282                    "%s: failed to unlock file on %s", loc->path, from->name);
 2283         }
 2284     }
 2285 
 2286     if (entrylk_locked) {
 2287         lk_ret = syncop_entrylk(hashed_subvol, DHT_ENTRY_SYNC_DOMAIN,
 2288                                 &parent_loc, loc->name, ENTRYLK_UNLOCK,
 2289                                 ENTRYLK_UNLOCK, NULL, NULL);
 2290         if (lk_ret < 0) {
 2291             gf_msg(this->name, GF_LOG_WARNING, -lk_ret,
 2292                    DHT_MSG_MIGRATE_FILE_FAILED,
 2293                    "%s: failed to unlock entrylk on %s", loc->path,
 2294                    hashed_subvol->name);
 2295         }
 2296     }
 2297 
 2298     if (p_locked) {
 2299         plock.l_type = F_UNLCK;
 2300         lk_ret = syncop_lk(from, src_fd, F_SETLK, &plock, NULL, NULL);
 2301 
 2302         if (lk_ret < 0) {
 2303             gf_msg(this->name, GF_LOG_WARNING, -lk_ret,
 2304                    DHT_MSG_MIGRATE_FILE_FAILED,
 2305                    "%s: failed to unlock file on %s", loc->path, from->name);
 2306         }
 2307     }
 2308 
 2309     if (!dht_is_tier_xlator(this)) {
 2310         lk_ret = syncop_removexattr(to, loc, GF_PROTECT_FROM_EXTERNAL_WRITES,
 2311                                     NULL, NULL);
 2312         if (lk_ret && (lk_ret != -ENODATA) && (lk_ret != -ENOATTR)) {
 2313             gf_msg(this->name, GF_LOG_WARNING, -lk_ret, 0,
 2314                    "%s: removexattr failed key %s", loc->path,
 2315                    GF_PROTECT_FROM_EXTERNAL_WRITES);
 2316         }
 2317     }
 2318 
 2319     if (dict)
 2320         dict_unref(dict);
 2321 
 2322     if (xattr)
 2323         dict_unref(xattr);
 2324     if (xattr_rsp)
 2325         dict_unref(xattr_rsp);
 2326 
 2327     if (dst_fd)
 2328         syncop_close(dst_fd);
 2329 
 2330     if (src_fd)
 2331         syncop_close(src_fd);
 2332     if (linkto_fd)
 2333         syncop_close(linkto_fd);
 2334 
 2335     if (xdata)
 2336         dict_unref(xdata);
 2337 
 2338     loc_wipe(&tmp_loc);
 2339     loc_wipe(&parent_loc);
 2340 
 2341     return ret;
 2342 }
 2343 
 2344 static int
 2345 rebalance_task(void *data)
 2346 {
 2347     int ret = -1;
 2348     dht_local_t *local = NULL;
 2349     call_frame_t *frame = NULL;
 2350     int fop_errno = 0;
 2351 
 2352     frame = data;
 2353 
 2354     local = frame->local;
 2355 
 2356     /* This function is 'synchronous', hence if it returns,
 2357        we are done with the task */
 2358     ret = dht_migrate_file(THIS, &local->loc, local->rebalance.from_subvol,
 2359                            local->rebalance.target_node, local->flags,
 2360                            &fop_errno);
 2361 
 2362     return ret;
 2363 }
 2364 
 2365 static int
 2366 rebalance_task_completion(int op_ret, call_frame_t *sync_frame, void *data)
 2367 {
 2368     int32_t op_errno = EINVAL;
 2369 
 2370     if (op_ret == -1) {
 2371         /* The migration process failed, mostly due to a failed write.
 2372            As we can't preserve the exact errno, report that there was
 2373            no space to migrate the data
 2374         */
 2375         op_errno = ENOSPC;
 2376     } else if (op_ret == 1) {
 2377         /* migration didn't happen, but this is not a failure; let the
 2378            user know that they don't have permission to migrate the
 2379            file.
 2380         */
 2381         op_ret = -1;
 2382         op_errno = EPERM;
 2383     } else if (op_ret != 0) {
 2384         op_errno = -op_ret;
 2385         op_ret = -1;
 2386     }
 2387 
 2388     DHT_STACK_UNWIND(setxattr, sync_frame, op_ret, op_errno, NULL);
 2389     return 0;
 2390 }
 2391 
 2392 int
 2393 dht_start_rebalance_task(xlator_t *this, call_frame_t *frame)
 2394 {
 2395     int ret = -1;
 2396 
 2397     ret = synctask_new(this->ctx->env, rebalance_task,
 2398                        rebalance_task_completion, frame, frame);
 2399     return ret;
 2400 }
 2401 
 2402 int
 2403 gf_listener_stop(xlator_t *this)
 2404 {
 2405     glusterfs_ctx_t *ctx = NULL;
 2406     cmd_args_t *cmd_args = NULL;
 2407     int ret = 0;
 2408 
 2409     ctx = this->ctx;
 2410     GF_ASSERT(ctx);
 2411     cmd_args = &ctx->cmd_args;
 2412     if (cmd_args->sock_file) {
 2413         ret = sys_unlink(cmd_args->sock_file);
 2414         if (ret && (ENOENT == errno)) {
 2415             ret = 0;
 2416         }
 2417     }
 2418 
 2419     if (ret) {
 2420         gf_msg(this->name, GF_LOG_ERROR, errno, DHT_MSG_SOCKET_ERROR,
 2421                "Failed to unlink listener "
 2422                "socket %s",
 2423                cmd_args->sock_file);
 2424     }
 2425     return ret;
 2426 }
 2427 
 2428 void
 2429 dht_build_root_inode(xlator_t *this, inode_t **inode)
 2430 {
 2431     inode_table_t *itable = NULL;
 2432     static uuid_t root_gfid = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1};
 2433 
 2434     itable = inode_table_new(0, this);
 2435     if (!itable)
 2436         return;
 2437 
 2438     *inode = inode_find(itable, root_gfid);
 2439 }
 2440 
 2441 void
 2442 dht_build_root_loc(inode_t *inode, loc_t *loc)
 2443 {
 2444     loc->path = "/";
 2445     loc->inode = inode;
 2446     loc->inode->ia_type = IA_IFDIR;
 2447     memset(loc->gfid, 0, 16);
 2448     loc->gfid[15] = 1;
 2449 }
 2450 
 2451 /* return values: 1 -> error, but ignore and continue
 2452                   0 -> proceed
 2453                  -1 -> error, handle it */
 2454 int32_t
 2455 gf_defrag_handle_migrate_error(int32_t op_errno, gf_defrag_info_t *defrag)
 2456 {
 2457     int ret = 0;
 2458     /* if errno is not ENOTCONN, we can still continue
 2459        with rebalance process */
 2460     if (op_errno != ENOTCONN) {
 2461         ret = 1;
 2462         goto out;
 2463     }
 2464 
 2465     /* op_errno == ENOTCONN: most probably the mount point went missing
 2466        (usually because a brick went down). Report the rebalance as failed
 2467        so the user can restart it once everything is healthy again */
 2468     defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
 2469     ret = -1;
 2470     goto out;
 2473 
 2474 out:
 2475     return ret;
 2476 }
 2477 
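      /* Returns true when 'name' matches one of the configured rebalance
         patterns and the file size is at least that pattern's size
         threshold. */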
 2478 static gf_boolean_t
 2479 gf_defrag_pattern_match(gf_defrag_info_t *defrag, char *name, uint64_t size)
 2480 {
 2481     gf_defrag_pattern_list_t *trav = NULL;
 2482     gf_boolean_t match = _gf_false;
 2483     gf_boolean_t ret = _gf_false;
 2484 
 2485     GF_VALIDATE_OR_GOTO("dht", defrag, out);
 2486 
 2487     trav = defrag->defrag_pattern;
 2488     while (trav) {
 2489         if (!fnmatch(trav->path_pattern, name, FNM_NOESCAPE)) {
 2490             match = _gf_true;
 2491             break;
 2492         }
 2493         trav = trav->next;
 2494     }
 2495 
 2496     if ((match == _gf_true) && (size >= trav->size))
 2497         ret = _gf_true;
 2498 
 2499 out:
 2500     return ret;
 2501 }
 2502 
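      /* Returns 1 once readdirp has completed on all 'cnt' local subvols,
         0 otherwise. */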
 2503 int
 2504 dht_dfreaddirp_done(dht_dfoffset_ctx_t *offset_var, int cnt)
 2505 {
 2506     int i;
 2507     int result = 1;
 2508 
 2509     for (i = 0; i < cnt; i++) {
 2510         if (offset_var[i].readdir_done == 0) {
 2511             result = 0;
 2512             break;
 2513         }
 2514     }
 2515     return result;
 2516 }
 2517 
 2518 static int
 2519 gf_defrag_ctx_subvols_init(dht_dfoffset_ctx_t *offset_var, xlator_t *this)
 2520 {
 2521     int i;
 2522     dht_conf_t *conf = NULL;
 2523 
 2524     conf = this->private;
 2525 
 2526     if (!conf)
 2527         return -1;
 2528 
 2529     for (i = 0; i < conf->local_subvols_cnt; i++) {
 2530         offset_var[i].this = conf->local_subvols[i];
 2531         offset_var[i].offset = (off_t)0;
 2532         offset_var[i].readdir_done = 0;
 2533     }
 2534 
 2535     return 0;
 2536 }
 2537 
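      /* Return the index of the first brick in the subvol whose node uuid is
         known (non-null), or -1 if no such brick exists. */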
 2538 static int
 2539 dht_get_first_non_null_index(subvol_nodeuuids_info_t *entry)
 2540 {
 2541     int i = 0;
 2542     int index = 0;
 2543 
 2544     for (i = 0; i < entry->count; i++) {
 2545         if (!gf_uuid_is_null(entry->elements[i].uuid)) {
 2546             index = i;
 2547             goto out;
 2548         }
 2549     }
 2550 
 2551     if (i == entry->count) {
 2552         index = -1;
 2553     }
 2554 out:
 2555     return index;
 2556 }
 2557 
 2558 /* Return value
 2559  * 0 : this node does not migrate the file
 2560  * 1 : this node migrates the file
 2561  *
 2562  * Use the hash value of the gfid to determine which node will migrate files.
 2563  * Using the gfid instead of the name also ensures that the same node handles
 2564  * all hardlinks.
 2565  */
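      /* Illustrative example (the counts are assumptions, not taken from the
         code): with a 3-brick replica set, the canonical gfid string is
         hashed and entry index (hashval % 3) picks the migrator; since
         hardlinks share a gfid, they always resolve to the same node. */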
 2566 
 2567 int
 2568 gf_defrag_should_i_migrate(xlator_t *this, int local_subvol_index, uuid_t gfid)
 2569 {
 2570     int ret = 0;
 2571     int i = local_subvol_index;
 2572     char *str = NULL;
 2573     uint32_t hashval = 0;
 2574     int32_t index = 0;
 2575     dht_conf_t *conf = NULL;
 2576     char buf[UUID_CANONICAL_FORM_LEN + 1] = {
 2577         0,
 2578     };
 2579     subvol_nodeuuids_info_t *entry = NULL;
 2580 
 2581     conf = this->private;
 2582 
 2583     /* Pure distribute. A subvol in this case
 2584         will be handled by only one node */
 2585 
 2586     entry = &(conf->local_nodeuuids[i]);
 2587     if (entry->count == 1) {
 2588         return 1;
 2589     }
 2590 
 2591     str = uuid_utoa_r(gfid, buf);
 2592     ret = dht_hash_compute(this, 0, str, &hashval);
 2593     if (ret == 0) {
 2594         index = (hashval % entry->count);
 2595         if (entry->elements[index].info == REBAL_NODEUUID_MINE) {
 2596             /* Index matches this node's nodeuuid.*/
 2597             ret = 1;
 2598             goto out;
 2599         }
 2600 
 2601         /* Brick down - some other node has to migrate these files*/
 2602         if (gf_uuid_is_null(entry->elements[index].uuid)) {
 2603             /* Fall back to the first non-null index */
 2604             index = dht_get_first_non_null_index(entry);
 2605 
 2606             if (index == -1) {
 2607                 /* None of the bricks in the subvol are up.
 2608                  * CHILD_DOWN will kill the process soon */
 2609 
 2610                 return 0;
 2611             }
 2612 
 2613             if (entry->elements[index].info == REBAL_NODEUUID_MINE) {
 2614                 /* Index matches this node's nodeuuid.*/
 2615                 ret = 1;
 2616                 goto out;
 2617             }
 2618         }
 2619     }
 2620 out:
 2621     return ret;
 2622 }
 2623 
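      /* Worker body for a single queued entry: look the file up, decide
         whether this node should migrate it, and call dht_migrate_file().
         A non-zero return is treated as a critical error by gf_defrag_task()
         and aborts the rebalance; skips and per-file failures return 0. */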
 2624 int
 2625 gf_defrag_migrate_single_file(void *opaque)
 2626 {
 2627     xlator_t *this = NULL;
 2628     dht_conf_t *conf = NULL;
 2629     gf_defrag_info_t *defrag = NULL;
 2630     int ret = 0;
 2631     gf_dirent_t *entry = NULL;
 2632     struct timeval start = {
 2633         0,
 2634     };
 2635     loc_t entry_loc = {
 2636         0,
 2637     };
 2638     loc_t *loc = NULL;
 2639     struct iatt iatt = {
 2640         0,
 2641     };
 2642     dict_t *migrate_data = NULL;
 2643     struct timeval end = {
 2644         0,
 2645     };
 2646     double elapsed = {
 2647         0,
 2648     };
 2649     struct dht_container *rebal_entry = NULL;
 2650     inode_t *inode = NULL;
 2651     xlator_t *hashed_subvol = NULL;
 2652     xlator_t *cached_subvol = NULL;
 2653     call_frame_t *statfs_frame = NULL;
 2654     xlator_t *old_THIS = NULL;
 2655     data_t *tmp = NULL;
 2656     int fop_errno = 0;
 2657     gf_dht_migrate_data_type_t rebal_type = GF_DHT_MIGRATE_DATA;
 2658     char value[MAX_REBAL_TYPE_SIZE] = {
 2659         0,
 2660     };
 2661     struct iatt *iatt_ptr = NULL;
 2662     gf_boolean_t update_skippedcount = _gf_true;
 2663     int i = 0;
 2664 
 2665     rebal_entry = (struct dht_container *)opaque;
 2666     if (!rebal_entry) {
 2667         gf_log("DHT", GF_LOG_ERROR, "rebal_entry is NULL");
 2668         ret = -1;
 2669         goto out;
 2670     }
 2671 
 2672     this = rebal_entry->this;
 2673 
 2674     conf = this->private;
 2675 
 2676     defrag = conf->defrag;
 2677 
 2678     loc = rebal_entry->parent_loc;
 2679 
 2680     migrate_data = rebal_entry->migrate_data;
 2681 
 2682     entry = rebal_entry->df_entry;
 2683     iatt_ptr = &entry->d_stat;
 2684 
 2685     if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
 2686         ret = -1;
 2687         goto out;
 2688     }
 2689 
 2690     if (defrag->stats == _gf_true) {
 2691         gettimeofday(&start, NULL);
 2692     }
 2693 
 2694     if (defrag->defrag_pattern &&
 2695         (gf_defrag_pattern_match(defrag, entry->d_name,
 2696                                  entry->d_stat.ia_size) == _gf_false)) {
 2697         gf_log(this->name, GF_LOG_ERROR, "pattern_match failed");
 2698         goto out;
 2699     }
 2700 
 2701     memset(&entry_loc, 0, sizeof(entry_loc));
 2702 
 2703     ret = dht_build_child_loc(this, &entry_loc, loc, entry->d_name);
 2704     if (ret) {
 2705         LOCK(&defrag->lock);
 2706         {
 2707             defrag->total_failures += 1;
 2708         }
 2709         UNLOCK(&defrag->lock);
 2710 
 2711         ret = 0;
 2712 
 2713         gf_log(this->name, GF_LOG_ERROR, "Child loc build failed");
 2714 
 2715         goto out;
 2716     }
 2717 
 2718     gf_uuid_copy(entry_loc.gfid, entry->d_stat.ia_gfid);
 2719 
 2720     gf_uuid_copy(entry_loc.pargfid, loc->gfid);
 2721 
 2722     ret = syncop_lookup(this, &entry_loc, &iatt, NULL, NULL, NULL);
 2723     if (ret) {
 2724         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED,
 2725                "Migrate file failed: %s lookup failed", entry_loc.path);
 2726 
 2727         /* Increase failure count only for remove-brick op, so that
 2728          * user is warned to check the removed-brick for any files left
 2729          * unmigrated
 2730          */
 2731         if (conf->decommission_subvols_cnt) {
 2732             LOCK(&defrag->lock);
 2733             {
 2734                 defrag->total_failures += 1;
 2735             }
 2736             UNLOCK(&defrag->lock);
 2737         }
 2738 
 2739         ret = 0;
 2740         goto out;
 2741     }
 2742 
 2743     if (!gf_defrag_should_i_migrate(this, rebal_entry->local_subvol_index,
 2744                                     entry->d_stat.ia_gfid)) {
 2745         gf_msg_debug(this->name, 0, "Don't migrate %s ", entry_loc.path);
 2746         goto out;
 2747     }
 2748 
 2749     iatt_ptr = &iatt;
 2750 
 2751     hashed_subvol = dht_subvol_get_hashed(this, &entry_loc);
 2752     if (!hashed_subvol) {
 2753         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_HASHED_SUBVOL_GET_FAILED,
 2754                "Failed to get hashed subvol for %s", entry_loc.path);
 2755         ret = 0;
 2756         goto out;
 2757     }
 2758 
 2759     cached_subvol = dht_subvol_get_cached(this, entry_loc.inode);
 2760     if (!cached_subvol) {
 2761         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_CACHED_SUBVOL_GET_FAILED,
 2762                "Failed to get cached subvol for %s", entry_loc.path);
 2763 
 2764         ret = 0;
 2765         goto out;
 2766     }
 2767 
 2768     if (hashed_subvol == cached_subvol) {
 2769         ret = 0;
 2770         goto out;
 2771     }
 2772 
 2773     inode = inode_link(entry_loc.inode, entry_loc.parent, entry->d_name, &iatt);
 2774     inode_unref(entry_loc.inode);
 2775     /* use the inode returned by inode_link */
 2776     entry_loc.inode = inode;
 2777 
 2778     old_THIS = THIS;
 2779     THIS = this;
 2780     statfs_frame = create_frame(this, this->ctx->pool);
 2781     if (!statfs_frame) {
 2782         gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_NO_MEMORY,
 2783                "Insufficient memory. Frame creation failed");
 2784         ret = -1;
 2785         goto out;
 2786     }
 2787 
 2788     /* async statfs information for honoring min-free-disk */
 2789     dht_get_du_info(statfs_frame, this, loc);
 2790     THIS = old_THIS;
 2791 
 2792     tmp = dict_get(migrate_data, GF_XATTR_FILE_MIGRATE_KEY);
 2793     if (tmp) {
 2794         memcpy(value, tmp->data, tmp->len);
 2795         if (strcmp(value, "force") == 0)
 2796             rebal_type = GF_DHT_MIGRATE_DATA_EVEN_IF_LINK_EXISTS;
 2797 
 2798         if (conf->decommission_in_progress)
 2799             rebal_type = GF_DHT_MIGRATE_HARDLINK;
 2800     }
 2801 
 2802     ret = dht_migrate_file(this, &entry_loc, cached_subvol, hashed_subvol,
 2803                            rebal_type, &fop_errno);
 2804     if (ret == 1) {
 2805         if (fop_errno == ENOSPC) {
 2806             gf_msg_debug(this->name, 0,
 2807                          "migrate-data skipped for"
 2808                          " %s due to space constraints",
 2809                          entry_loc.path);
 2810 
 2811             /* For remove-brick case if the source is not one of the
 2812              * removed-brick, do not mark the error as failure */
 2813             if (conf->decommission_subvols_cnt) {
 2814                 for (i = 0; i < conf->subvolume_cnt; i++) {
 2815                     if (conf->decommissioned_bricks[i] == cached_subvol) {
 2816                         LOCK(&defrag->lock);
 2817                         {
 2818                             defrag->total_failures += 1;
 2819                             update_skippedcount = _gf_false;
 2820                         }
 2821                         UNLOCK(&defrag->lock);
 2822 
 2823                         break;
 2824                     }
 2825                 }
 2826             }
 2827 
 2828             if (update_skippedcount) {
 2829                 LOCK(&defrag->lock);
 2830                 {
 2831                     defrag->skipped += 1;
 2832                 }
 2833                 UNLOCK(&defrag->lock);
 2834 
 2835                 gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_MIGRATE_FILE_SKIPPED,
 2836                        "File migration skipped for %s.", entry_loc.path);
 2837             }
 2838 
 2839         } else if (fop_errno == ENOTSUP) {
 2840             gf_msg_debug(this->name, 0,
 2841                          "migrate-data skipped for"
 2842                          " hardlink %s ",
 2843                          entry_loc.path);
 2844             LOCK(&defrag->lock);
 2845             {
 2846                 defrag->skipped += 1;
 2847             }
 2848             UNLOCK(&defrag->lock);
 2849 
 2850             gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_MIGRATE_FILE_SKIPPED,
 2851                    "File migration skipped for %s.", entry_loc.path);
 2852         }
 2853 
 2854         ret = 0;
 2855         goto out;
 2856     } else if (ret < 0) {
 2857         if (fop_errno != EEXIST) {
 2858             gf_msg(this->name, GF_LOG_ERROR, fop_errno,
 2859                    DHT_MSG_MIGRATE_FILE_FAILED, "migrate-data failed for %s",
 2860                    entry_loc.path);
 2861 
 2862             LOCK(&defrag->lock);
 2863             {
 2864                 defrag->total_failures += 1;
 2865             }
 2866             UNLOCK(&defrag->lock);
 2867         }
 2868 
 2869         ret = gf_defrag_handle_migrate_error(fop_errno, defrag);
 2870 
 2871         if (!ret) {
 2872             gf_msg(this->name, GF_LOG_ERROR, fop_errno,
 2873                    DHT_MSG_MIGRATE_FILE_FAILED,
 2874                    "migrate-data on %s failed:", entry_loc.path);
 2875         } else if (ret == 1) {
 2876             ret = 0;
 2877         }
 2878 
 2879         goto out;
 2880     }
 2881 
 2882     LOCK(&defrag->lock);
 2883     {
 2884         defrag->total_files += 1;
 2885         defrag->total_data += iatt.ia_size;
 2886     }
 2887     UNLOCK(&defrag->lock);
 2888 
 2889     if (defrag->stats == _gf_true) {
 2890         gettimeofday(&end, NULL);
 2891         elapsed = (end.tv_sec - start.tv_sec) * 1e6 +
 2892                   (end.tv_usec - start.tv_usec);
 2893         gf_log(this->name, GF_LOG_INFO,
 2894                "Migration of "
 2895                "file:%s size:%" PRIu64
 2896                " bytes took %.2f "
 2897                "secs and ret: %d",
 2898                entry_loc.name, iatt.ia_size, elapsed / 1e6, ret);
 2899     }
 2900 
 2901 out:
 2902     if (statfs_frame) {
 2903         STACK_DESTROY(statfs_frame->root);
 2904     }
 2905 
 2906     if (iatt_ptr) {
 2907         LOCK(&defrag->lock);
 2908         {
 2909             defrag->size_processed += iatt_ptr->ia_size;
 2910         }
 2911         UNLOCK(&defrag->lock);
 2912     }
 2913     loc_wipe(&entry_loc);
 2914 
 2915     return ret;
 2916 }
 2917 
 2918 void *
 2919 gf_defrag_task(void *opaque)
 2920 {
 2921     struct list_head *q_head = NULL;
 2922     struct dht_container *iterator = NULL;
 2923     gf_defrag_info_t *defrag = NULL;
 2924     int ret = 0;
 2925     pid_t pid = GF_CLIENT_PID_DEFRAG;
 2926 
 2927     defrag = (gf_defrag_info_t *)opaque;
 2928     if (!defrag) {
 2929         gf_msg("dht", GF_LOG_ERROR, 0, 0, "defrag is NULL");
 2930         goto out;
 2931     }
 2932 
 2933     syncopctx_setfspid(&pid);
 2934 
 2935     q_head = &(defrag->queue[0].list);
 2936 
 2937     /* The following while loop dequeues one entry from defrag->queue
 2938        under lock. We update defrag->global_error only when there is an
 2939        error critical enough to stop the rebalance process; the stop is
 2940        signalled to the other migrator threads by setting
 2941        defrag->defrag_status to GF_DEFRAG_STATUS_FAILED.
 2942 
 2943        In defrag->queue, a low watermark (MIN_MIGRATE_QUEUE_COUNT) is
 2944        maintained so that the crawler does not starve the file migration
 2945        workers, and a high watermark (MAX_MIGRATE_QUEUE_COUNT) so that the
 2946        crawler does not run too far ahead in filling up the queue.
 2947      */
 2948 
 2949     while (_gf_true) {
 2950         if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
 2951             pthread_cond_broadcast(&defrag->rebalance_crawler_alarm);
 2952             pthread_cond_broadcast(&defrag->parallel_migration_cond);
 2953             goto out;
 2954         }
 2955 
 2956         pthread_mutex_lock(&defrag->dfq_mutex);
 2957         {
 2958             /*Throttle down:
 2959               If the reconfigured count is less than current thread
 2960               count, then the current thread will sleep */
 2961 
 2962             /*TODO: Need to refactor the following block to work
 2963              *under defrag->lock. For now access
 2964              * defrag->current_thread_count and rthcount under
 2965              * dfq_mutex lock */
 2966             while (!defrag->crawl_done && (defrag->recon_thread_count <
 2967                                            defrag->current_thread_count)) {
 2968                 defrag->current_thread_count--;
 2969                 gf_msg_debug("DHT", 0,
 2970                              "Thread sleeping. "
 2971                              "current thread count: %d",
 2972                              defrag->current_thread_count);
 2973 
 2974                 pthread_cond_wait(&defrag->df_wakeup_thread,
 2975                                   &defrag->dfq_mutex);
 2976 
 2977                 defrag->current_thread_count++;
 2978                 gf_msg_debug("DHT", 0,
 2979                              "Thread woke up. "
 2980                              "current thread count: %d",
 2981                              defrag->current_thread_count);
 2982             }
 2983 
 2984             if (defrag->q_entry_count) {
 2985                 iterator = list_entry(q_head->next, typeof(*iterator), list);
 2986 
 2987                 gf_msg_debug("DHT", 0,
 2988                              "picking entry "
 2989                              "%s",
 2990                              iterator->df_entry->d_name);
 2991 
 2992                 list_del_init(&(iterator->list));
 2993 
 2994                 defrag->q_entry_count--;
 2995 
 2996                 if ((defrag->q_entry_count < MIN_MIGRATE_QUEUE_COUNT) &&
 2997                     defrag->wakeup_crawler) {
 2998                     pthread_cond_broadcast(&defrag->rebalance_crawler_alarm);
 2999                 }
 3000                 pthread_mutex_unlock(&defrag->dfq_mutex);
 3001                 ret = gf_defrag_migrate_single_file((void *)iterator);
 3002 
 3003                 /* Critical errors: ENOTCONN and ENOSPC */
 3004                 if (ret) {
 3005                     dht_set_global_defrag_error(defrag, ret);
 3006 
 3007                     defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
 3008 
 3009                     pthread_cond_broadcast(&defrag->rebalance_crawler_alarm);
 3010 
 3011                     pthread_cond_broadcast(&defrag->parallel_migration_cond);
 3012 
 3013                     goto out;
 3014                 }
 3015 
 3016                 gf_defrag_free_container(iterator);
 3017 
 3018                 continue;
 3019             } else {
 3020                 /* defrag->crawl_done being set means the file system
 3021                  crawl has finished; an empty list when that flag is set
 3022                  indicates there are no more entries to be added to the
 3023                  queue and the rebalance is finished */
 3025 
 3026                 if (!defrag->crawl_done) {
 3027                     defrag->current_thread_count--;
 3028                     gf_msg_debug("DHT", 0,
 3029                                  "Thread "
 3030                                  "sleeping while waiting "
 3031                                  "for migration entries. "
 3032                                  "current thread count:%d",
 3033                                  defrag->current_thread_count);
 3034 
 3035                     pthread_cond_wait(&defrag->parallel_migration_cond,
 3036                                       &defrag->dfq_mutex);
 3037                 }
 3038 
 3039                 if (defrag->crawl_done && !defrag->q_entry_count) {
 3040                     defrag->current_thread_count++;
 3041                     gf_msg_debug("DHT", 0, "Exiting thread");
 3042 
 3043                     pthread_cond_broadcast(&defrag->parallel_migration_cond);
 3044                     goto unlock;
 3045                 } else {
 3046                     defrag->current_thread_count++;
 3047                     gf_msg_debug("DHT", 0,
 3048                                  "Thread woke up"
 3049                                  " as it found entries to migrate. "
 3050                                  "current thread count: %d",
 3051                                  defrag->current_thread_count);
 3052 
 3053                     pthread_mutex_unlock(&defrag->dfq_mutex);
 3054                     continue;
 3055                 }
 3056             }
 3057         }
 3058     unlock:
 3059         pthread_mutex_unlock(&defrag->dfq_mutex);
 3060         break;
 3061     }
 3062 out:
 3063     return NULL;
 3064 }
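
      /*
       * A minimal, standalone sketch of the sleep/wake throttling pattern used
       * by the worker loop above (illustrative only; the names below are
       * hypothetical and it assumes <pthread.h> is available through the
       * existing headers).
       */
      struct throttle_demo {
          pthread_mutex_t lock;
          pthread_cond_t wakeup;
          int current_threads; /* workers currently allowed to run */
          int wanted_threads;  /* reconfigured thread count */
          int done;            /* set when all work is finished */
      };

      /* Called by a worker before picking up new work. */
      static void
      throttle_demo_wait(struct throttle_demo *t)
      {
          pthread_mutex_lock(&t->lock);
          while (!t->done && (t->wanted_threads < t->current_threads)) {
              t->current_threads--; /* this worker goes to sleep */
              pthread_cond_wait(&t->wakeup, &t->lock);
              t->current_threads++; /* woken up, resume work */
          }
          pthread_mutex_unlock(&t->lock);
      }

      /* Called after raising the reconfigured count, to wake sleeping workers. */
      static void
      throttle_demo_raise(struct throttle_demo *t, int new_count)
      {
          pthread_mutex_lock(&t->lock);
          t->wanted_threads = new_count;
          pthread_cond_broadcast(&t->wakeup);
          pthread_mutex_unlock(&t->lock);
      }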
 3065 
 3066 static int gf_defrag_get_entry(xlator_t *this, int i,
 3067                                struct dht_container **container, loc_t *loc,
 3068                                dht_conf_t *conf, gf_defrag_info_t *defrag,
 3069                                fd_t *fd, dict_t *migrate_data,
 3070                                struct dir_dfmeta *dir_dfmeta, dict_t *xattr_req,
 3071                                int *should_commit_hash, int *perrno)
 3072 {
 3073     int ret = -1;
 3074     char is_linkfile = 0;
 3075     gf_dirent_t *df_entry = NULL;
 3076     struct dht_container *tmp_container = NULL;
 3077 
 3078     if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
 3079         ret = -1;
 3080         goto out;
 3081     }
 3082 
 3083     if (dir_dfmeta->offset_var[i].readdir_done == 1) {
 3084         ret = 0;
 3085         goto out;
 3086     }
 3087 
 3088     if (dir_dfmeta->fetch_entries[i] == 1) {
 3089         ret = syncop_readdirp(conf->local_subvols[i], fd, 131072,
 3090                               dir_dfmeta->offset_var[i].offset,
 3091                               &(dir_dfmeta->equeue[i]), xattr_req, NULL);
 3092         if (ret == 0) {
 3093             dir_dfmeta->offset_var[i].readdir_done = 1;
 3094             ret = 0;
 3095             goto out;
 3096         }
 3097 
 3098         if (ret < 0) {
 3099             gf_msg(this->name, GF_LOG_WARNING, -ret,
 3100                    DHT_MSG_MIGRATE_DATA_FAILED,
 3101                    "Readdirp failed. Aborting data migration for "
 3102                    "directory: %s",
 3103                    loc->path);
 3104             *perrno = -ret;
 3105             ret = -1;
 3106             goto out;
 3107         }
 3108 
 3109         if (list_empty(&(dir_dfmeta->equeue[i].list))) {
 3110             dir_dfmeta->offset_var[i].readdir_done = 1;
 3111             ret = 0;
 3112             goto out;
 3113         }
 3114 
 3115         dir_dfmeta->fetch_entries[i] = 0;
 3116     }
 3117 
 3118     while (1) {
 3119         if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
 3120             ret = -1;
 3121             goto out;
 3122         }
 3123 
 3124         df_entry = list_entry(dir_dfmeta->iterator[i]->next, typeof(*df_entry),
 3125                               list);
 3126 
 3127         if (&df_entry->list == dir_dfmeta->head[i]) {
 3128             gf_dirent_free(&(dir_dfmeta->equeue[i]));
 3129             INIT_LIST_HEAD(&(dir_dfmeta->equeue[i].list));
 3130             dir_dfmeta->fetch_entries[i] = 1;
 3131             dir_dfmeta->iterator[i] = dir_dfmeta->head[i];
 3132             ret = 0;
 3133             goto out;
 3134         }
 3135 
 3136         dir_dfmeta->iterator[i] = dir_dfmeta->iterator[i]->next;
 3137 
 3138         dir_dfmeta->offset_var[i].offset = df_entry->d_off;
 3139         if (!strcmp(df_entry->d_name, ".") || !strcmp(df_entry->d_name, ".."))
 3140             continue;
 3141 
 3142         if (IA_ISDIR(df_entry->d_stat.ia_type)) {
 3143             defrag->size_processed += df_entry->d_stat.ia_size;
 3144             continue;
 3145         }
 3146 
 3147         defrag->num_files_lookedup++;
 3148 
 3149         if (defrag->defrag_pattern &&
 3150             (gf_defrag_pattern_match(defrag, df_entry->d_name,
 3151                                      df_entry->d_stat.ia_size) == _gf_false)) {
 3152             defrag->size_processed += df_entry->d_stat.ia_size;
 3153             continue;
 3154         }
 3155 
 3156         is_linkfile = check_is_linkfile(NULL, &df_entry->d_stat, df_entry->dict,
 3157                                         conf->link_xattr_name);
 3158 
 3159         if (is_linkfile) {
 3160             /* No need to add linkto file to the queue for
 3161                migration. Only the actual data file needs to
 3162                be checked for migration criteria.
 3163             */
 3164 
 3165             gf_msg_debug(this->name, 0,
 3166                          "Skipping linkfile"
 3167                          " %s on subvol: %s",
 3168                          df_entry->d_name, conf->local_subvols[i]->name);
 3169             continue;
 3170         }
 3171 
 3172         /*Build Container Structure */
 3173 
 3174         tmp_container = GF_CALLOC(1, sizeof(struct dht_container),
 3175                                   gf_dht_mt_container_t);
 3176         if (!tmp_container) {
 3177             gf_log(this->name, GF_LOG_ERROR,
 3178                    "Failed to allocate "
 3179                    "memory for container");
 3180             ret = -1;
 3181             goto out;
 3182         }
 3183         tmp_container->df_entry = gf_dirent_for_name(df_entry->d_name);
 3184         if (!tmp_container->df_entry) {
 3185             gf_log(this->name, GF_LOG_ERROR,
 3186                    "Failed to allocate "
 3187                    "memory for df_entry");
 3188             ret = -1;
 3189             goto out;
 3190         }
 3191 
 3192         tmp_container->local_subvol_index = i;
 3193 
 3194         tmp_container->df_entry->d_stat = df_entry->d_stat;
 3195 
 3196         tmp_container->df_entry->d_ino = df_entry->d_ino;
 3197 
 3198         tmp_container->df_entry->d_type = df_entry->d_type;
 3199 
 3200         tmp_container->df_entry->d_len = df_entry->d_len;
 3201 
 3202         tmp_container->parent_loc = GF_CALLOC(1, sizeof(*loc), gf_dht_mt_loc_t);
 3203         if (!tmp_container->parent_loc) {
 3204             gf_log(this->name, GF_LOG_ERROR,
 3205                    "Failed to allocate "
 3206                    "memory for loc");
 3207             ret = -1;
 3208             goto out;
 3209         }
 3210 
 3211         ret = loc_copy(tmp_container->parent_loc, loc);
 3212         if (ret) {
 3213             gf_log(this->name, GF_LOG_ERROR, "loc_copy failed");
 3214             ret = -1;
 3215             goto out;
 3216         }
 3217 
 3218         tmp_container->migrate_data = migrate_data;
 3219 
 3220         tmp_container->this = this;
 3221 
 3222         if (df_entry->dict)
 3223             tmp_container->df_entry->dict = dict_ref(df_entry->dict);
 3224 
 3225         /*Build Container Structure >> END*/
 3226 
 3227         ret = 0;
 3228         goto out;
 3229     }
 3230 
 3231 out:
 3232     if (ret == 0) {
 3233         *container = tmp_container;
 3234     } else {
 3235         if (tmp_container) {
 3236             gf_defrag_free_container(tmp_container);
 3237         }
 3238     }
 3239 
 3240     return ret;
 3241 }
 3242 
 3243 int
 3244 gf_defrag_process_dir(xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
 3245                       dict_t *migrate_data, int *perrno)
 3246 {
 3247     int ret = -1;
 3248     fd_t *fd = NULL;
 3249     dht_conf_t *conf = NULL;
 3250     gf_dirent_t entries;
 3251     dict_t *xattr_req = NULL;
 3252     struct timeval dir_start = {
 3253         0,
 3254     };
 3255     struct timeval end = {
 3256         0,
 3257     };
 3258     double elapsed = {
 3259         0,
 3260     };
 3261     int local_subvols_cnt = 0;
 3262     int i = 0;
 3263     int j = 0;
 3264     struct dht_container *container = NULL;
 3265     int ldfq_count = 0;
 3266     int dfc_index = 0;
 3267     int throttle_up = 0;
 3268     struct dir_dfmeta *dir_dfmeta = NULL;
 3269     int should_commit_hash = 1;
 3270 
 3271     gf_log(this->name, GF_LOG_INFO, "migrate data called on %s", loc->path);
 3272     gettimeofday(&dir_start, NULL);
 3273 
 3274     conf = this->private;
 3275     local_subvols_cnt = conf->local_subvols_cnt;
 3276 
 3277     if (!local_subvols_cnt) {
 3278         ret = 0;
 3279         goto out;
 3280     }
 3281 
 3282     fd = fd_create(loc->inode, defrag->pid);
 3283     if (!fd) {
 3284         gf_log(this->name, GF_LOG_ERROR, "Failed to create fd");
 3285         ret = -1;
 3286         goto out;
 3287     }
 3288 
 3289     ret = syncop_opendir(this, loc, fd, NULL, NULL);
 3290     if (ret) {
 3291         gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_DATA_FAILED,
 3292                "Migrate data failed: Failed to open dir %s", loc->path);
 3293         *perrno = -ret;
 3294         ret = -1;
 3295         goto out;
 3296     }
 3297 
 3298     fd_bind(fd);
 3299     dir_dfmeta = GF_CALLOC(1, sizeof(*dir_dfmeta), gf_common_mt_pointer);
 3300     if (!dir_dfmeta) {
 3301         gf_log(this->name, GF_LOG_ERROR, "dir_dfmeta is NULL");
 3302         ret = -1;
 3303         goto out;
 3304     }
 3305 
 3306     dir_dfmeta->head = GF_CALLOC(local_subvols_cnt, sizeof(*(dir_dfmeta->head)),
 3307                                  gf_common_mt_pointer);
 3308     if (!dir_dfmeta->head) {
 3309         gf_log(this->name, GF_LOG_ERROR, "dir_dfmeta->head is NULL");
 3310         ret = -1;
 3311         goto out;
 3312     }
 3313 
 3314     dir_dfmeta->iterator = GF_CALLOC(local_subvols_cnt,
 3315                                      sizeof(*(dir_dfmeta->iterator)),
 3316                                      gf_common_mt_pointer);
 3317     if (!dir_dfmeta->iterator) {
 3318         gf_log(this->name, GF_LOG_ERROR, "dir_dfmeta->iterator is NULL");
 3319         ret = -1;
 3320         goto out;
 3321     }
 3322 
 3323     dir_dfmeta->equeue = GF_CALLOC(local_subvols_cnt, sizeof(entries),
 3324                                    gf_dht_mt_dirent_t);
 3325     if (!dir_dfmeta->equeue) {
 3326         gf_log(this->name, GF_LOG_ERROR, "dir_dfmeta->equeue is NULL");
 3327         ret = -1;
 3328         goto out;
 3329     }
 3330 
 3331     dir_dfmeta->offset_var = GF_CALLOC(
 3332         local_subvols_cnt, sizeof(dht_dfoffset_ctx_t), gf_dht_mt_octx_t);
 3333     if (!dir_dfmeta->offset_var) {
 3334         gf_log(this->name, GF_LOG_ERROR, "dir_dfmeta->offset_var is NULL");
 3335         ret = -1;
 3336         goto out;
 3337     }
 3338     ret = gf_defrag_ctx_subvols_init(dir_dfmeta->offset_var, this);
 3339     if (ret) {
 3340         gf_log(this->name, GF_LOG_ERROR,
 3341                "dht_dfoffset_ctx_t "
 3342                "initialization failed");
 3343         ret = -1;
 3344         goto out;
 3345     }
 3346 
 3347     dir_dfmeta->fetch_entries = GF_CALLOC(local_subvols_cnt, sizeof(int),
 3348                                           gf_common_mt_int);
 3349     if (!dir_dfmeta->fetch_entries) {
 3350         gf_log(this->name, GF_LOG_ERROR, "dir_dfmeta->fetch_entries is NULL");
 3351         ret = -1;
 3352         goto out;
 3353     }
 3354 
 3355     for (i = 0; i < local_subvols_cnt; i++) {
 3356         INIT_LIST_HEAD(&(dir_dfmeta->equeue[i].list));
 3357         dir_dfmeta->head[i] = &(dir_dfmeta->equeue[i].list);
 3358         dir_dfmeta->iterator[i] = dir_dfmeta->head[i];
 3359         dir_dfmeta->fetch_entries[i] = 1;
 3360     }
 3361 
 3362     xattr_req = dict_new();
 3363     if (!xattr_req) {
 3364         gf_log(this->name, GF_LOG_ERROR, "dict_new failed");
 3365         ret = -1;
 3366         goto out;
 3367     }
 3368 
 3369     ret = dict_set_uint32(xattr_req, conf->link_xattr_name, 256);
 3370     if (ret) {
 3371         gf_log(this->name, GF_LOG_ERROR,
 3372                "failed to set dict for "
 3373                "key: %s",
 3374                conf->link_xattr_name);
 3375         ret = -1;
 3376         goto out;
 3377     }
 3378 
 3379     /*
 3380      Job: Read entries from each local subvol and store them in the
 3381           equeue array of linked lists. Then pick one entry at a time
 3382           from the equeue array, round robin, and add it to the defrag queue.
 3383     */
 3384 
 3385     while (!dht_dfreaddirp_done(dir_dfmeta->offset_var, local_subvols_cnt)) {
 3386         pthread_mutex_lock(&defrag->dfq_mutex);
 3387         {
 3388             /* Throttle up: if the reconfigured count is higher than the
 3389               current thread count, wake up the sleeping threads.
 3390               TODO: refactor this. Instead of making threads
 3391               sleep and wake up, we should terminate and spawn
 3392               threads on demand. */
 3393 
 3394             if (defrag->recon_thread_count > defrag->current_thread_count) {
 3395                 throttle_up = (defrag->recon_thread_count -
 3396                                defrag->current_thread_count);
 3397                 for (j = 0; j < throttle_up; j++) {
 3398                     pthread_cond_signal(&defrag->df_wakeup_thread);
 3399                 }
 3400             }
 3401 
 3402             while (defrag->q_entry_count > MAX_MIGRATE_QUEUE_COUNT) {
 3403                 defrag->wakeup_crawler = 1;
 3404                 pthread_cond_wait(&defrag->rebalance_crawler_alarm,
 3405                                   &defrag->dfq_mutex);
 3406             }
 3407 
 3408             ldfq_count = defrag->q_entry_count;
 3409 
 3410             if (defrag->wakeup_crawler) {
 3411                 defrag->wakeup_crawler = 0;
 3412             }
 3413         }
 3414         pthread_mutex_unlock(&defrag->dfq_mutex);
 3415 
 3416         while (
 3417             ldfq_count <= MAX_MIGRATE_QUEUE_COUNT &&
 3418             !dht_dfreaddirp_done(dir_dfmeta->offset_var, local_subvols_cnt)) {
 3419             ret = gf_defrag_get_entry(this, dfc_index, &container, loc, conf,
 3420                                       defrag, fd, migrate_data, dir_dfmeta,
 3421                                       xattr_req, &should_commit_hash, perrno);
 3422 
 3423             if (defrag->defrag_status == GF_DEFRAG_STATUS_STOPPED) {
 3424                 goto out;
 3425             }
 3426 
 3427             if (ret) {
 3428                 gf_log(this->name, GF_LOG_WARNING,
 3429                        "gf_defrag_get_entry "
 3430                        "returned an error");
 3431 
 3432                 ret = -1;
 3433                 goto out;
 3434             }
 3435 
 3436             /* Check if we got an entry, else we need to move the
 3437                index to the next subvol */
 3438             if (!container) {
 3439                 GF_CRAWL_INDEX_MOVE(dfc_index, local_subvols_cnt);
 3440                 continue;
 3441             }
 3442 
 3443             /* Q this entry in the dfq */
 3444             pthread_mutex_lock(&defrag->dfq_mutex);
 3445             {
 3446                 list_add_tail(&container->list, &(defrag->queue[0].list));
 3447                 defrag->q_entry_count++;
 3448                 ldfq_count = defrag->q_entry_count;
 3449 
 3450                 gf_msg_debug(this->name, 0,
 3451                              "added "
 3452                              "file:%s parent:%s to the queue ",
 3453                              container->df_entry->d_name,
 3454                              container->parent_loc->path);
 3455 
 3456                 pthread_cond_signal(&defrag->parallel_migration_cond);
 3457             }
 3458             pthread_mutex_unlock(&defrag->dfq_mutex);
 3459 
 3460             GF_CRAWL_INDEX_MOVE(dfc_index, local_subvols_cnt);
 3461         }
 3462     }
 3463 
 3464     gettimeofday(&end, NULL);
 3465     elapsed = (end.tv_sec - dir_start.tv_sec) * 1e6 +
 3466               (end.tv_usec - dir_start.tv_usec);
 3467     gf_log(this->name, GF_LOG_INFO,
 3468            "Migration operation on dir %s took "
 3469            "%.2f secs",
 3470            loc->path, elapsed / 1e6);
 3471     ret = 0;
 3472 out:
 3473 
 3474     gf_defrag_free_dir_dfmeta(dir_dfmeta, local_subvols_cnt);
 3475 
 3476     if (xattr_req)
 3477         dict_unref(xattr_req);
 3478 
 3479     if (fd)
 3480         fd_unref(fd);
 3481 
 3482     if (ret == 0 && should_commit_hash == 0) {
 3483         ret = 2;
 3484     }
 3485 
 3486     /* It does not matter if it errored out - this number is
 3487      * used to calculate rebalance estimated time to complete.
 3488      * No locking required as dirs are processed by a single thread.
 3489      */
 3490     defrag->num_dirs_processed++;
 3491     return ret;
 3492 }
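
      /*
       * A minimal sketch of the bounded producer/consumer handshake used
       * above (illustrative only; the struct and helper names are
       * hypothetical): the crawler stops enqueueing once the queue crosses
       * the high watermark (MAX_MIGRATE_QUEUE_COUNT) and is woken again by
       * the consumers once the queue drains below the low watermark
       * (MIN_MIGRATE_QUEUE_COUNT).
       */
      struct bounded_queue_demo {
          pthread_mutex_t lock;
          pthread_cond_t low_watermark; /* signalled by the consumers */
          int entry_count;
          int producer_waiting;
      };

      /* Producer (crawler) side: block while the queue is over-full. */
      static void
      bounded_queue_demo_produce(struct bounded_queue_demo *q)
      {
          pthread_mutex_lock(&q->lock);
          while (q->entry_count > MAX_MIGRATE_QUEUE_COUNT) {
              q->producer_waiting = 1;
              pthread_cond_wait(&q->low_watermark, &q->lock);
          }
          q->producer_waiting = 0;
          q->entry_count++; /* the real code also links the entry into the list */
          pthread_mutex_unlock(&q->lock);
      }

      /* Consumer (migrator) side: wake the producer once the queue drains. */
      static void
      bounded_queue_demo_consume(struct bounded_queue_demo *q)
      {
          pthread_mutex_lock(&q->lock);
          if (q->entry_count > 0) {
              q->entry_count--;
              if (q->entry_count < MIN_MIGRATE_QUEUE_COUNT && q->producer_waiting)
                  pthread_cond_broadcast(&q->low_watermark);
          }
          pthread_mutex_unlock(&q->lock);
      }
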
 3493 int
 3494 gf_defrag_settle_hash(xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
 3495                       dict_t *fix_layout)
 3496 {
 3497     int ret;
 3498     dht_conf_t *conf = NULL;
 3499     /*
 3500      * Now we're ready to update the directory commit hash for the volume
 3501      * root, so that hash miscompares and broadcast lookups can stop.
 3502      * However, we want to skip that if fix-layout is all we did.  In
 3503      * that case, we want the miscompares etc. to continue until a real
 3504      * rebalance is complete.
 3505      */
 3506     if (defrag->cmd == GF_DEFRAG_CMD_START_LAYOUT_FIX ||
 3507         defrag->cmd == GF_DEFRAG_CMD_START_DETACH_TIER ||
 3508         defrag->cmd == GF_DEFRAG_CMD_DETACH_START) {
 3509         return 0;
 3510     }
 3511 
 3512     conf = this->private;
 3513     if (!conf) {
 3514         /*Uh oh
 3515          */
 3516         return -1;
 3517     }
 3518 
 3519     if (conf->local_subvols_cnt == 0 || !conf->lookup_optimize) {
 3520         /* Commit hash updates are only done on local subvolumes and
 3521          * only when lookup optimization is needed (for older client
 3522          * support)
 3523          */
 3524         return 0;
 3525     }
 3526 
 3527     ret = dict_set_uint32(fix_layout, "new-commit-hash",
 3528                           defrag->new_commit_hash);
 3529     if (ret) {
 3530         gf_log(this->name, GF_LOG_ERROR, "Failed to set new-commit-hash");
 3531         return -1;
 3532     }
 3533 
 3534     ret = syncop_setxattr(this, loc, fix_layout, 0, NULL, NULL);
 3535     if (ret) {
 3536         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_LAYOUT_FIX_FAILED,
 3537                "fix layout on %s failed", loc->path);
 3538 
 3539         if (-ret == ENOENT || -ret == ESTALE) {
 3540             /* Dir most likely is deleted */
 3541             return 0;
 3542         }
 3543 
 3544         return -1;
 3545     }
 3546 
 3547     /* TBD: find more efficient solution than adding/deleting every time */
 3548     dict_del(fix_layout, "new-commit-hash");
 3549 
 3550     return 0;
 3551 }
 3552 
 3553 /* Function for doing a named lookup on file inodes during an attach-tier
 3554  * operation, so that a hardlink lookup heal (i.e. a gfid to parent-gfid
 3555  * lookup heal) happens on pre-existing data. This is required so that the
 3556  * CTR database has hardlinks of all the existing files in the volume. The
 3557  * CTR xlator on the brick/server side does a db update/insert of the
 3558  * hardlink on a named lookup. Currently the named lookup is done
 3559  * synchronously with the fix-layout that is triggered by attach tier. This
 3560  * is not performant and adds more time to fix-layout. A more performant
 3561  * approach would be to record the hardlinks in a compressed datastore and
 3562  * do the named lookup asynchronously later, giving the CTR db eventual
 3563  * consistency. */
 3564 int
 3565 gf_fix_layout_tier_attach_lookup(xlator_t *this, loc_t *parent_loc,
 3566                                  gf_dirent_t *file_dentry)
 3567 {
 3568     int ret = -1;
 3569     dict_t *lookup_xdata = NULL;
 3570     dht_conf_t *conf = NULL;
 3571     loc_t file_loc = {
 3572         0,
 3573     };
 3574     struct iatt iatt = {
 3575         0,
 3576     };
 3577 
 3578     GF_VALIDATE_OR_GOTO("tier", this, out);
 3579 
 3580     GF_VALIDATE_OR_GOTO(this->name, parent_loc, out);
 3581 
 3582     GF_VALIDATE_OR_GOTO(this->name, file_dentry, out);
 3583 
 3584     GF_VALIDATE_OR_GOTO(this->name, this->private, out);
 3585 
 3586     if (!parent_loc->inode) {
 3587         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
 3588                "%s/%s parent is NULL", parent_loc->path, file_dentry->d_name);
 3589         goto out;
 3590     }
 3591 
 3592     conf = this->private;
 3593 
 3594     loc_wipe(&file_loc);
 3595 
 3596     if (gf_uuid_is_null(file_dentry->d_stat.ia_gfid)) {
 3597         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
 3598                "%s/%s gfid not present", parent_loc->path, file_dentry->d_name);
 3599         goto out;
 3600     }
 3601 
 3602     gf_uuid_copy(file_loc.gfid, file_dentry->d_stat.ia_gfid);
 3603 
 3604     if (gf_uuid_is_null(parent_loc->gfid)) {
 3605         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
 3606                "%s/%s"
 3607                " gfid not present",
 3608                parent_loc->path, file_dentry->d_name);
 3609         goto out;
 3610     }
 3611 
 3612     gf_uuid_copy(file_loc.pargfid, parent_loc->gfid);
 3613 
 3614     ret = dht_build_child_loc(this, &file_loc, parent_loc, file_dentry->d_name);
 3615     if (ret) {
 3616         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
 3617                "Child loc build failed");
 3618         ret = -1;
 3619         goto out;
 3620     }
 3621 
 3622     lookup_xdata = dict_new();
 3623     if (!lookup_xdata) {
 3624         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
 3625                "Failed creating lookup dict for %s", file_dentry->d_name);
 3626         goto out;
 3627     }
 3628 
 3629     ret = dict_set_int32(lookup_xdata, CTR_ATTACH_TIER_LOOKUP, 1);
 3630     if (ret) {
 3631         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
 3632                "Failed to set lookup flag");
 3633         goto out;
 3634     }
 3635 
 3636     gf_uuid_copy(file_loc.parent->gfid, parent_loc->gfid);
 3637 
 3638     /* Sending lookup to cold tier only */
 3639     ret = syncop_lookup(conf->subvolumes[0], &file_loc, &iatt, NULL,
 3640                         lookup_xdata, NULL);
 3641     if (ret) {
 3642         /* If the file does not exist on the cold tier then it must */
 3643         /* have been discovered on the hot tier. This is not an error. */
 3644         gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
 3645                "%s lookup to cold tier on attach heal failed", file_loc.path);
 3646         goto out;
 3647     }
 3648 
 3649     ret = 0;
 3650 
 3651 out:
 3652 
 3653     loc_wipe(&file_loc);
 3654 
 3655     if (lookup_xdata)
 3656         dict_unref(lookup_xdata);
 3657 
 3658     return ret;
 3659 }
 3660 
 3661 int
 3662 gf_defrag_fix_layout(xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
 3663                      dict_t *fix_layout, dict_t *migrate_data)
 3664 {
 3665     int ret = -1;
 3666     loc_t entry_loc = {
 3667         0,
 3668     };
 3669     fd_t *fd = NULL;
 3670     gf_dirent_t entries;
 3671     gf_dirent_t *tmp = NULL;
 3672     gf_dirent_t *entry = NULL;
 3673     gf_boolean_t free_entries = _gf_false;
 3674     off_t offset = 0;
 3675     struct iatt iatt = {
 3676         0,
 3677     };
 3678     inode_t *linked_inode = NULL, *inode = NULL;
 3679     dht_conf_t *conf = NULL;
 3680     int should_commit_hash = 1;
 3681     int perrno = 0;
 3682 
 3683     conf = this->private;
 3684     if (!conf) {
 3685         ret = -1;
 3686         goto out;
 3687     }
 3688 
 3689     ret = syncop_lookup(this, loc, &iatt, NULL, NULL, NULL);
 3690     if (ret) {
 3691         if (strcmp(loc->path, "/") == 0) {
 3692             gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_DIR_LOOKUP_FAILED,
 3693                    "lookup failed for:%s", loc->path);
 3694 
 3695             defrag->total_failures++;
 3696             ret = -1;
 3697             goto out;
 3698         }
 3699 
 3700         if (-ret == ENOENT || -ret == ESTALE) {
 3701             gf_msg(this->name, GF_LOG_INFO, -ret, DHT_MSG_DIR_LOOKUP_FAILED,
 3702                    "Dir:%s renamed or removed. Skipping", loc->path);
 3703             if (conf->decommission_subvols_cnt) {
 3704                 defrag->total_failures++;
 3705             }
 3706             ret = 0;
 3707             goto out;
 3708         } else {
 3709             gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_DIR_LOOKUP_FAILED,
 3710                    "lookup failed for:%s", loc->path);
 3711 
 3712             defrag->total_failures++;
 3713             goto out;
 3714         }
 3715     }
 3716 
 3717     fd = fd_create(loc->inode, defrag->pid);
 3718     if (!fd) {
 3719         gf_log(this->name, GF_LOG_ERROR, "Failed to create fd");
 3720         ret = -1;
 3721         goto out;
 3722     }
 3723 
 3724     ret = syncop_opendir(this, loc, fd, NULL, NULL);
 3725     if (ret) {
 3726         if (-ret == ENOENT || -ret == ESTALE) {
 3727             if (conf->decommission_subvols_cnt) {
 3728                 defrag->total_failures++;
 3729             }
 3730             ret = 0;
 3731             goto out;
 3732         }
 3733 
 3734         gf_log(this->name, GF_LOG_ERROR,
 3735                "Failed to open dir %s, "
 3736                "err:%d",
 3737                loc->path, -ret);
 3738 
 3739         ret = -1;
 3740         goto out;
 3741     }
 3742 
 3743     fd_bind(fd);
 3744     INIT_LIST_HEAD(&entries.list);
 3745 
 3746     while ((ret = syncop_readdirp(this, fd, 131072, offset, &entries, NULL,
 3747                                   NULL)) != 0) {
 3748         if (ret < 0) {
 3749             if (-ret == ENOENT || -ret == ESTALE) {
 3750                 if (conf->decommission_subvols_cnt) {
 3751                     defrag->total_failures++;
 3752                 }
 3753                 ret = 0;
 3754                 goto out;
 3755             }
 3756 
 3757             gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_READDIR_ERROR,
 3758                    "readdirp failed for "
 3759                    "path %s. Aborting fix-layout",
 3760                    loc->path);
 3761 
 3762             ret = -1;
 3763             goto out;
 3764         }
 3765 
 3766         if (list_empty(&entries.list))
 3767             break;
 3768 
 3769         free_entries = _gf_true;
 3770 
 3771         list_for_each_entry_safe(entry, tmp, &entries.list, list)
 3772         {
 3773             if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
 3774                 ret = 1;
 3775                 goto out;
 3776             }
 3777 
 3778             offset = entry->d_off;
 3779 
 3780             if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, ".."))
 3781                 continue;
 3782             if (!IA_ISDIR(entry->d_stat.ia_type)) {
 3783                 /* If this is a fix-layout during the attach-
 3784                  * tier operation, do lookups on files
 3785                  * on the cold subvolume so that a
 3786                  * CTR DB lookup heal is triggered on existing
 3787                  * data.
 3788                  */
 3789                 if (defrag->cmd == GF_DEFRAG_CMD_START_TIER) {
 3790                     gf_fix_layout_tier_attach_lookup(this, loc, entry);
 3791                 }
 3792 
 3793                 continue;
 3794             }
 3795             loc_wipe(&entry_loc);
 3796 
 3797             ret = dht_build_child_loc(this, &entry_loc, loc, entry->d_name);
 3798             if (ret) {
 3799                 gf_log(this->name, GF_LOG_ERROR,
 3800                        "Child loc"
 3801                        " build failed for entry: %s",
 3802                        entry->d_name);
 3803 
 3804                 if (conf->decommission_in_progress) {
 3805                     defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
 3806 
 3807                     goto out;
 3808                 } else {
 3809                     should_commit_hash = 0;
 3810 
 3811                     continue;
 3812                 }
 3813             }
 3814 
 3815             if (gf_uuid_is_null(entry->d_stat.ia_gfid)) {
 3816                 gf_log(this->name, GF_LOG_ERROR,
 3817                        "%s/%s"
 3818                        " gfid not present",
 3819                        loc->path, entry->d_name);
 3820                 continue;
 3821             }
 3822 
 3823             gf_uuid_copy(entry_loc.gfid, entry->d_stat.ia_gfid);
 3824 
 3825             /* In case the gfid stored in the inode by inode_link
 3826              * and the gfid obtained in the lookup differ,
 3827              * client3_3_lookup_cbk will return ESTALE and a proper
 3828              * error will be captured
 3829              */
 3830 
 3831             linked_inode = inode_link(entry_loc.inode, loc->inode,
 3832                                       entry->d_name, &entry->d_stat);
 3833 
 3834             inode = entry_loc.inode;
 3835             entry_loc.inode = linked_inode;
 3836             inode_unref(inode);
 3837 
 3838             if (gf_uuid_is_null(loc->gfid)) {
 3839                 gf_log(this->name, GF_LOG_ERROR,
 3840                        "%s/%s"
 3841                        " gfid not present",
 3842                        loc->path, entry->d_name);
 3843                 continue;
 3844             }
 3845 
 3846             gf_uuid_copy(entry_loc.pargfid, loc->gfid);
 3847 
 3848             ret = syncop_lookup(this, &entry_loc, &iatt, NULL, NULL, NULL);
 3849             if (ret) {
 3850                 if (-ret == ENOENT || -ret == ESTALE) {
 3851                     gf_msg(this->name, GF_LOG_INFO, -ret,
 3852                            DHT_MSG_DIR_LOOKUP_FAILED,
 3853                            "Dir:%s renamed or removed. "
 3854                            "Skipping",
 3855                            loc->path);
 3856                     ret = 0;
 3857                     if (conf->decommission_subvols_cnt) {
 3858                         defrag->total_failures++;
 3859                     }
 3860                     continue;
 3861                 } else {
 3862                     gf_msg(this->name, GF_LOG_ERROR, -ret,
 3863                            DHT_MSG_DIR_LOOKUP_FAILED, "lookup failed for:%s",
 3864                            entry_loc.path);
 3865 
 3866                     defrag->total_failures++;
 3867 
 3868                     if (conf->decommission_in_progress) {
 3869                         defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
 3870                         ret = -1;
 3871                         goto out;
 3872                     } else {
 3873                         should_commit_hash = 0;
 3874                         continue;
 3875                     }
 3876                 }
 3877             }
 3878 
 3879             /* A return value of 2 means that either process_dir or
 3880              * the lookup of a dir failed. Hence, don't commit the hash
 3881              * for the current directory. */
 3882 
 3883             ret = gf_defrag_fix_layout(this, defrag, &entry_loc, fix_layout,
 3884                                        migrate_data);
 3885 
 3886             if (defrag->defrag_status == GF_DEFRAG_STATUS_STOPPED) {
 3887                 goto out;
 3888             }
 3889 
 3890             if (ret && ret != 2) {
 3891                 gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LAYOUT_FIX_FAILED,
 3892                        "Fix layout failed for %s", entry_loc.path);
 3893 
 3894                 defrag->total_failures++;
 3895 
 3896                 if (conf->decommission_in_progress) {
 3897                     defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
 3898 
 3899                     goto out;
 3900                 } else {
 3901                     /* Let's not commit-hash if
 3902                      * gf_defrag_fix_layout failed*/
 3903                     continue;
 3904                 }
 3905             }
 3906         }
 3907 
 3908         gf_dirent_free(&entries);
 3909         free_entries = _gf_false;
 3910         INIT_LIST_HEAD(&entries.list);
 3911     }
 3912 
 3913     /* A directory layout is fixed only after its subdirs are healed to
 3914      * any newly added bricks. If the layout is fixed before subdirs are
 3915      * healed, the newly added brick will get a non-null layout.
 3916      * Any subdirs which hash to that layout will no longer show up
 3917      * in a directory listing until they are healed.
 3918      */
 3919 
 3920     ret = syncop_setxattr(this, loc, fix_layout, 0, NULL, NULL);
 3921     if (ret) {
 3922         if (-ret == ENOENT || -ret == ESTALE) {
 3923             gf_msg(this->name, GF_LOG_INFO, -ret, DHT_MSG_LAYOUT_FIX_FAILED,
 3924                    "Setxattr failed. Dir %s "
 3925                    "renamed or removed",
 3926                    loc->path);
 3927             if (conf->decommission_subvols_cnt) {
 3928                 defrag->total_failures++;
 3929             }
 3930             ret = 0;
 3931             goto out;
 3932         } else {
 3933             gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_LAYOUT_FIX_FAILED,
 3934                    "Setxattr failed for %s", loc->path);
 3935 
 3936             defrag->total_failures++;
 3937 
 3938             if (conf->decommission_in_progress) {
 3939                 defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
 3940                 ret = -1;
 3941                 goto out;
 3942             }
 3943         }
 3944     }
 3945 
 3946     if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) &&
 3947         (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX)) {
 3948         ret = gf_defrag_process_dir(this, defrag, loc, migrate_data, &perrno);
 3949 
 3950         if (ret && (ret != 2)) {
 3951             if (perrno == ENOENT || perrno == ESTALE) {
 3952                 ret = 0;
 3953                 goto out;
 3954             } else {
 3955                 defrag->total_failures++;
 3956 
 3957                 gf_msg(this->name, GF_LOG_ERROR, 0,
 3958                        DHT_MSG_DEFRAG_PROCESS_DIR_FAILED,
 3959                        "gf_defrag_process_dir failed for "
 3960                        "directory: %s",
 3961                        loc->path);
 3962 
 3963                 if (conf->decommission_in_progress) {
 3964                     goto out;
 3965                 }
 3966 
 3967                 should_commit_hash = 0;
 3968             }
 3969         } else if (ret == 2) {
 3970             should_commit_hash = 0;
 3971         }
 3972     }
 3973 
 3974     gf_msg_trace(this->name, 0, "fix layout called on %s", loc->path);
 3975 
 3976     if (should_commit_hash &&
 3977         gf_defrag_settle_hash(this, defrag, loc, fix_layout) != 0) {
 3978         defrag->total_failures++;
 3979 
 3980         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_SETTLE_HASH_FAILED,
 3981                "Settle hash failed for %s", loc->path);
 3982 
 3983         ret = -1;
 3984 
 3985         if (conf->decommission_in_progress) {
 3986             defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
 3987             goto out;
 3988         }
 3989     }
 3990 
 3991     ret = 0;
 3992 out:
 3993     if (free_entries)
 3994         gf_dirent_free(&entries);
 3995 
 3996     loc_wipe(&entry_loc);
 3997 
 3998     if (fd)
 3999         fd_unref(fd);
 4000 
 4001     if (ret == 0 && should_commit_hash == 0) {
 4002         ret = 2;
 4003     }
 4004 
 4005     return ret;
 4006 }
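
      /*
       * A minimal, self-contained sketch (hypothetical types, not GlusterFS
       * API) of the traversal order enforced by gf_defrag_fix_layout() above:
       * every subdirectory is healed first, and only then is the layout of
       * the current directory committed and its files migrated.
       */
      struct demo_dir {
          struct demo_dir **children; /* NULL-terminated array of subdirs */
          int layout_fixed;
          int files_migrated;
      };

      static int
      demo_fix_layout(struct demo_dir *dir)
      {
          int i = 0;

          /* Depth-first: heal all subdirectories before touching this layout. */
          for (i = 0; dir->children && dir->children[i]; i++) {
              if (demo_fix_layout(dir->children[i]) != 0)
                  return -1;
          }

          /* Only now is it safe to commit this directory's layout ... */
          dir->layout_fixed = 1;

          /* ... and to migrate the files it contains. */
          dir->files_migrated = 1;

          return 0;
      }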
 4007 
 4008 int
 4009 dht_init_local_subvols_and_nodeuuids(xlator_t *this, dht_conf_t *conf,
 4010                                      loc_t *loc)
 4011 {
 4012     dict_t *dict = NULL;
 4013     gf_defrag_info_t *defrag = NULL;
 4014     uuid_t *uuid_ptr = NULL;
 4015     int ret = -1;
 4016     int i = 0;
 4017     int j = 0;
 4018 
 4019     defrag = conf->defrag;
 4020 
 4021     if (defrag->cmd != GF_DEFRAG_CMD_START_TIER) {
 4022         /* Find local subvolumes */
 4023         ret = syncop_getxattr(this, loc, &dict, GF_REBAL_FIND_LOCAL_SUBVOL,
 4024                               NULL, NULL);
 4025         if (ret && (ret != -ENODATA)) {
 4026             gf_msg(this->name, GF_LOG_ERROR, -ret, 0,
 4027                    "local "
 4028                    "subvolume determination failed with error: %d",
 4029                    -ret);
 4030             ret = -1;
 4031             goto out;
 4032         }
 4033 
 4034         if (!ret)
 4035             goto out;
 4036     }
 4037 
 4038     ret = syncop_getxattr(this, loc, &dict, GF_REBAL_OLD_FIND_LOCAL_SUBVOL,
 4039                           NULL, NULL);
 4040     if (ret) {
 4041         gf_msg(this->name, GF_LOG_ERROR, -ret, 0,
 4042                "local "
 4043                "subvolume determination failed with error: %d",
 4044                -ret);
 4045         ret = -1;
 4046         goto out;
 4047     }
 4048     ret = 0;
 4049 
 4050 out:
 4051     if (ret) {
 4052         return ret;
 4053     }
 4054 
 4055     for (i = 0; i < conf->local_subvols_cnt; i++) {
 4056         gf_msg(this->name, GF_LOG_INFO, 0, 0,
 4057                "local subvol: "
 4058                "%s",
 4059                conf->local_subvols[i]->name);
 4060 
 4061         for (j = 0; j < conf->local_nodeuuids[i].count; j++) {
 4062             uuid_ptr = &(conf->local_nodeuuids[i].elements[j].uuid);
 4063             gf_msg(this->name, GF_LOG_INFO, 0, 0, "node uuid : %s",
 4064                    uuid_utoa(*uuid_ptr));
 4065         }
 4066     }
 4067 
 4068     return ret;
 4069 }
 4070 
 4071 /* Functions for the rebalance estimates feature */
 4072 
 4073 uint64_t
 4074 gf_defrag_subvol_file_size(xlator_t *this, loc_t *root_loc)
 4075 {
 4076     int ret = -1;
 4077     struct statvfs buf = {
 4078         0,
 4079     };
 4080 
 4081     ret = syncop_statfs(this, root_loc, &buf, NULL, NULL);
 4082     if (ret) {
 4083         /* Aargh! */
 4084         return 0;
 4085     }
 4086     return ((buf.f_blocks - buf.f_bfree) * buf.f_frsize);
 4087 }
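
      /*
       * Worked example of the statvfs arithmetic above (the numbers are made
       * up): with f_blocks = 1000000, f_bfree = 250000 and f_frsize = 4096,
       * the used space is (1000000 - 250000) * 4096 = 3072000000 bytes
       * (roughly 2.86 GiB). A minimal sketch of the same computation, with a
       * hypothetical helper name, assuming <sys/statvfs.h> is already pulled
       * in by the headers above:
       */
      static uint64_t
      demo_statvfs_used_bytes(const struct statvfs *buf)
      {
          /* fragment size times the number of fragments currently in use */
          return (uint64_t)(buf->f_blocks - buf->f_bfree) * buf->f_frsize;
      }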
 4088 
 4089 uint64_t
 4090 gf_defrag_total_file_size(xlator_t *this, loc_t *root_loc)
 4091 {
 4092     dht_conf_t *conf = NULL;
 4093     int i = 0;
 4094     uint64_t size_files = 0;
 4095     uint64_t total_size = 0;
 4096 
 4097     conf = this->private;
 4098     if (!conf) {
 4099         return 0;
 4100     }
 4101 
 4102     for (i = 0; i < conf->local_subvols_cnt; i++) {
 4103         size_files = gf_defrag_subvol_file_size(conf->local_subvols[i],
 4104                                                 root_loc);
 4105         total_size += size_files;
 4106         gf_msg(this->name, GF_LOG_INFO, 0, 0,
 4107                "local subvol: %s, "
 4108                "size = %" PRIu64,
 4109                conf->local_subvols[i]->name, size_files);
 4110     }
 4111 
 4112     gf_msg(this->name, GF_LOG_INFO, 0, 0, "Total size of files = %" PRIu64,
 4113            total_size);
 4114 
 4115     return total_size;
 4116 }
 4117 
 4118 static void *
 4119 dht_file_counter_thread(void *args)
 4120 {
 4121     gf_defrag_info_t *defrag = NULL;
 4122     loc_t root_loc = {
 4123         0,
 4124     };
 4125     struct timespec time_to_wait = {
 4126         0,
 4127     };
 4128     struct timeval now = {
 4129         0,
 4130     };
 4131     uint64_t tmp_size = 0;
 4132 
 4133     if (!args)
 4134         return NULL;
 4135 
 4136     defrag = (gf_defrag_info_t *)args;
 4137     dht_build_root_loc(defrag->root_inode, &root_loc);
 4138 
 4139     while (defrag->defrag_status == GF_DEFRAG_STATUS_STARTED) {
 4140         gettimeofday(&now, NULL);
 4141         time_to_wait.tv_sec = now.tv_sec + FILE_CNT_INTERVAL;
 4142         time_to_wait.tv_nsec = 0;
 4143 
 4144         pthread_mutex_lock(&defrag->fc_mutex);
 4145         pthread_cond_timedwait(&defrag->fc_wakeup_cond, &defrag->fc_mutex,
 4146                                &time_to_wait);
 4147 
 4148         pthread_mutex_unlock(&defrag->fc_mutex);
 4149 
 4150         if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED)
 4151             break;
 4152 
 4153         tmp_size = gf_defrag_total_file_size(defrag->this, &root_loc);
 4154 
 4155         gf_log("dht", GF_LOG_INFO, "tmp data size = %" PRIu64, tmp_size);
 4156 
 4157         if (!tmp_size) {
 4158             gf_msg("dht", GF_LOG_ERROR, 0, 0,
 4159                    "Failed to get "
 4160                    "the total data size. Unable to estimate "
 4161                    "time to complete rebalance.");
 4162         } else {
 4163             g_totalsize = tmp_size;
 4164             gf_msg_debug("dht", 0, "total data size = %" PRIu64, g_totalsize);
 4165         }
 4166     }
 4167 
 4168     return NULL;
 4169 }
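
      /*
       * A minimal sketch of the periodic-wakeup idiom used by the thread
       * above (the helper name is hypothetical): pthread_cond_timedwait()
       * takes an absolute deadline, so the current time plus the interval is
       * recomputed before every wait. A signal on the condition (e.g. at
       * shutdown) wakes the thread early; otherwise the wait ends with
       * ETIMEDOUT after interval_sec seconds.
       */
      static void
      demo_periodic_wait(pthread_mutex_t *mutex, pthread_cond_t *cond,
                         int interval_sec)
      {
          struct timeval now = {
              0,
          };
          struct timespec deadline = {
              0,
          };

          gettimeofday(&now, NULL);
          deadline.tv_sec = now.tv_sec + interval_sec;
          deadline.tv_nsec = now.tv_usec * 1000;

          pthread_mutex_lock(mutex);
          pthread_cond_timedwait(cond, mutex, &deadline);
          pthread_mutex_unlock(mutex);
      }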
 4170 
 4171 int
 4172 gf_defrag_estimates_cleanup(xlator_t *this, gf_defrag_info_t *defrag,
 4173                             pthread_t filecnt_thread)
 4174 {
 4175     int ret = -1;
 4176 
 4177     /* Wake up the filecounter thread.
 4178      * By now the defrag status will no longer be
 4179      * GF_DEFRAG_STATUS_STARTED so the thread will exit the loop.
 4180      */
 4181     pthread_mutex_lock(&defrag->fc_mutex);
 4182     {
 4183         pthread_cond_broadcast(&defrag->fc_wakeup_cond);
 4184     }
 4185     pthread_mutex_unlock(&defrag->fc_mutex);
 4186 
 4187     ret = pthread_join(filecnt_thread, NULL);
 4188     if (ret) {
 4189         gf_msg("dht", GF_LOG_ERROR, ret, 0,
 4190                "file_counter_thread: pthread_join failed.");
 4191         ret = -1;
 4192     }
 4193     return ret;
 4194 }
 4195 
 4196 int
 4197 gf_defrag_estimates_init(xlator_t *this, loc_t *loc, pthread_t *filecnt_thread)
 4198 {
 4199     int ret = -1;
 4200     dht_conf_t *conf = NULL;
 4201     gf_defrag_info_t *defrag = NULL;
 4202 
 4203     conf = this->private;
 4204     defrag = conf->defrag;
 4205 
 4206     g_totalsize = gf_defrag_total_file_size(this, loc);
 4207     if (!g_totalsize) {
 4208         gf_msg(this->name, GF_LOG_ERROR, 0, 0,
 4209                "Failed to get "
 4210                "the total data size. Unable to estimate "
 4211                "time to complete rebalance.");
 4212         goto out;
 4213     }
 4214 
 4215     ret = gf_thread_create(filecnt_thread, NULL, &dht_file_counter_thread,
 4216                            (void *)defrag, "dhtfcnt");
 4217 
 4218     if (ret) {
 4219         gf_msg(this->name, GF_LOG_ERROR, ret, 0,
 4220                "Failed to "
 4221                "create the file counter thread ");
 4222         ret = -1;
 4223         goto out;
 4224     }
 4225     ret = 0;
 4226 out:
 4227     return ret;
 4228 }
 4229 
 4230 /* Init and cleanup functions for parallel file migration*/
 4231 int
 4232 gf_defrag_parallel_migration_init(xlator_t *this, gf_defrag_info_t *defrag,
 4233                                   pthread_t **tid_array, int *thread_index)
 4234 {
 4235     int ret = -1;
 4236     int thread_spawn_count = 0;
 4237     int index = 0;
 4238     pthread_t *tid = NULL;
 4239 
 4240     if (!defrag)
 4241         goto out;
 4242 
 4243     /* Initialize global entry queue */
 4244     defrag->queue = GF_CALLOC(1, sizeof(struct dht_container),
 4245                               gf_dht_mt_container_t);
 4246 
 4247     if (!defrag->queue) {
 4248         gf_msg(this->name, GF_LOG_ERROR, ENOMEM, 0,
 4249                "Failed to initialise migration queue");
 4250         ret = -1;
 4251         goto out;
 4252     }
 4253 
 4254     INIT_LIST_HEAD(&(defrag->queue[0].list));
 4255 
 4256     thread_spawn_count = MAX(MAX_REBAL_THREADS, 4);
 4257 
 4258     gf_msg_debug(this->name, 0, "thread_spawn_count: %d", thread_spawn_count);
 4259 
 4260     tid = GF_CALLOC(thread_spawn_count, sizeof(pthread_t),
 4261                     gf_common_mt_pthread_t);
 4262     if (!tid) {
 4263         gf_msg(this->name, GF_LOG_ERROR, ENOMEM, 0,
 4264                "Failed to create migration threads");
 4265         ret = -1;
 4266         goto out;
 4267     }
 4268     defrag->current_thread_count = thread_spawn_count;
 4269 
 4270     /*Spawn Threads Here*/
 4271     while (index < thread_spawn_count) {
 4272         ret = gf_thread_create(&(tid[index]), NULL, &gf_defrag_task,
 4273                                (void *)defrag, "dhtmig%d", (index + 1) & 0x3ff);
 4274         if (ret != 0) {
 4275             gf_msg("DHT", GF_LOG_ERROR, ret, 0, "Thread[%d] creation failed. ",
 4276                    index);
 4277             ret = -1;
 4278             goto out;
 4279         } else {
 4280             gf_log("DHT", GF_LOG_INFO,
 4281                    "Thread[%d] "
 4282                    "creation successful",
 4283                    index);
 4284         }
 4285         index++;
 4286     }
 4287 
 4288     ret = 0;
 4289 out:
 4290     *thread_index = index;
 4291     *tid_array = tid;
 4292 
 4293     return ret;
 4294 }
 4295 
 4296 int
 4297 gf_defrag_parallel_migration_cleanup(gf_defrag_info_t *defrag,
 4298                                      pthread_t *tid_array, int thread_index)
 4299 {
 4300     int ret = -1;
 4301     int i = 0;
 4302 
 4303     if (!defrag)
 4304         goto out;
 4305 
 4306     /* Wake up all migration threads */
 4307     pthread_mutex_lock(&defrag->dfq_mutex);
 4308     {
 4309         defrag->crawl_done = 1;
 4310 
 4311         pthread_cond_broadcast(&defrag->parallel_migration_cond);
 4312         pthread_cond_broadcast(&defrag->df_wakeup_thread);
 4313     }
 4314     pthread_mutex_unlock(&defrag->dfq_mutex);
 4315 
 4316     /*Wait for all the threads to complete their task*/
 4317     for (i = 0; i < thread_index; i++) {
 4318         pthread_join(tid_array[i], NULL);
 4319     }
 4320 
 4321     GF_FREE(tid_array);
 4322 
 4323     /* Cleanup the migration queue */
 4324     if (defrag->queue) {
 4325         gf_dirent_free(defrag->queue[0].df_entry);
 4326         INIT_LIST_HEAD(&(defrag->queue[0].list));
 4327     }
 4328 
 4329     GF_FREE(defrag->queue);
 4330 
 4331     ret = 0;
 4332 out:
 4333     return ret;
 4334 }
 4335 
 4336 int
 4337 gf_defrag_start_crawl(void *data)
 4338 {
 4339     xlator_t *this = NULL;
 4340     dht_conf_t *conf = NULL;
 4341     gf_defrag_info_t *defrag = NULL;
 4342     dict_t *fix_layout = NULL;
 4343     dict_t *migrate_data = NULL;
 4344     dict_t *status = NULL;
 4345     glusterfs_ctx_t *ctx = NULL;
 4346     call_frame_t *statfs_frame = NULL;
 4347     xlator_t *old_THIS = NULL;
 4348     int ret = -1;
 4349     loc_t loc = {
 4350         0,
 4351     };
 4352     struct iatt iatt = {
 4353         0,
 4354     };
 4355     struct iatt parent = {
 4356         0,
 4357     };
 4358     int thread_index = 0;
 4359     pthread_t *tid = NULL;
 4360     pthread_t filecnt_thread;
 4361     gf_boolean_t fc_thread_started = _gf_false;
 4362 
 4363     this = data;
 4364     if (!this)
 4365         goto exit;
 4366 
 4367     ctx = this->ctx;
 4368     if (!ctx)
 4369         goto exit;
 4370 
 4371     conf = this->private;
 4372     if (!conf)
 4373         goto exit;
 4374 
 4375     defrag = conf->defrag;
 4376     if (!defrag)
 4377         goto exit;
 4378 
 4379     gettimeofday(&defrag->start_time, NULL);
 4380     dht_build_root_inode(this, &defrag->root_inode);
 4381     if (!defrag->root_inode)
 4382         goto out;
 4383 
 4384     dht_build_root_loc(defrag->root_inode, &loc);
 4385 
 4386     /* fix-layout on '/' first */
 4387 
 4388     ret = syncop_lookup(this, &loc, &iatt, &parent, NULL, NULL);
 4389 
 4390     if (ret) {
 4391         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_REBALANCE_START_FAILED,
 4392                "Failed to start rebalance: look up on / failed");
 4393         ret = -1;
 4394         goto out;
 4395     }
 4396 
 4397     old_THIS = THIS;
 4398     THIS = this;
 4399 
 4400     statfs_frame = create_frame(this, this->ctx->pool);
 4401     if (!statfs_frame) {
 4402         gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_NO_MEMORY,
 4403                "Insufficient memory. Frame creation failed");
 4404         ret = -1;
 4405         goto out;
 4406     }
 4407 
 4408     /* async statfs update for honoring min-free-disk */
 4409     dht_get_du_info(statfs_frame, this, &loc);
 4410     THIS = old_THIS;
 4411 
 4412     fix_layout = dict_new();
 4413     if (!fix_layout) {
 4414         ret = -1;
 4415         goto out;
 4416     }
 4417 
 4418     /*
 4419      * Unfortunately, we can't do special xattrs (like fix.layout) and
 4420      * real ones in the same call currently, and changing it seems
 4421      * riskier than just doing two calls.
 4422      */
 4423 
 4424     gf_log(this->name, GF_LOG_INFO, "%s using commit hash %u", __func__,
 4425            conf->vol_commit_hash);
 4426 
 4427     ret = dict_set_uint32(fix_layout, conf->commithash_xattr_name,
 4428                           conf->vol_commit_hash);
 4429     if (ret) {
 4430         gf_log(this->name, GF_LOG_ERROR, "Failed to set %s",
 4431                conf->commithash_xattr_name);
 4432         defrag->total_failures++;
 4433         ret = -1;
 4434         goto out;
 4435     }
 4436 
 4437     ret = syncop_setxattr(this, &loc, fix_layout, 0, NULL, NULL);
 4438     if (ret) {
 4439         gf_log(this->name, GF_LOG_ERROR,
 4440                "Failed to set commit hash on %s. "
 4441                "Rebalance cannot proceed.",
 4442                loc.path);
 4443         defrag->total_failures++;
 4444         ret = -1;
 4445         goto out;
 4446     }
 4447 
 4448     /* We now return to our regularly scheduled program. */
 4449 
 4450     ret = dict_set_str(fix_layout, GF_XATTR_FIX_LAYOUT_KEY, "yes");
 4451     if (ret) {
 4452         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_REBALANCE_START_FAILED,
 4453                "Failed to start rebalance: "
 4454                "Failed to set dictionary value: key = %s",
 4455                GF_XATTR_FIX_LAYOUT_KEY);
 4456         defrag->total_failures++;
 4457         ret = -1;
 4458         goto out;
 4459     }
 4460 
 4461     defrag->new_commit_hash = conf->vol_commit_hash;
 4462 
 4463     ret = syncop_setxattr(this, &loc, fix_layout, 0, NULL, NULL);
 4464     if (ret) {
 4465         gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_REBALANCE_FAILED,
 4466                "fix layout on %s failed", loc.path);
 4467         defrag->total_failures++;
 4468         ret = -1;
 4469         goto out;
 4470     }
 4471 
 4472     if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) {
 4473         /* We need to migrate files */
 4474 
 4475         migrate_data = dict_new();
 4476         if (!migrate_data) {
 4477             defrag->total_failures++;
 4478             ret = -1;
 4479             goto out;
 4480         }
 4481         ret = dict_set_str(
 4482             migrate_data, GF_XATTR_FILE_MIGRATE_KEY,
 4483             (defrag->cmd == GF_DEFRAG_CMD_START_FORCE) ? "force" : "non-force");
 4484         if (ret) {
 4485             defrag->total_failures++;
 4486             ret = -1;
 4487             goto out;
 4488         }
 4489 
 4490         ret = dht_init_local_subvols_and_nodeuuids(this, conf, &loc);
 4491         if (ret) {
 4492             ret = -1;
 4493             goto out;
 4494         }
 4495 
 4496         /* Initialise the structures required for parallel migration */
 4497         ret = gf_defrag_parallel_migration_init(this, defrag, &tid,
 4498                                                 &thread_index);
 4499         if (ret) {
 4500             gf_msg(this->name, GF_LOG_ERROR, 0, 0, "Aborting rebalance.");
 4501             goto out;
 4502         }
 4503 
 4504         ret = gf_defrag_estimates_init(this, &loc, &filecnt_thread);
 4505         if (ret) {
 4506             /* Not a fatal error. Allow the rebalance to proceed*/
 4507             ret = 0;
 4508         } else {
 4509             fc_thread_started = _gf_true;
 4510         }
 4511     }
 4512 
 4513     ret = gf_defrag_fix_layout(this, defrag, &loc, fix_layout, migrate_data);
 4514     if (ret && ret != 2) {
 4515         defrag->total_failures++;
 4516         ret = -1;
 4517         goto out;
 4518     }
 4519 
 4520     if (ret != 2 &&
 4521         gf_defrag_settle_hash(this, defrag, &loc, fix_layout) != 0) {
 4522         defrag->total_failures++;
 4523         ret = -1;
 4524         goto out;
 4525     }
 4526 
 4527     gf_log("DHT", GF_LOG_INFO, "crawling file-system completed");
 4528 out:
 4529 
 4530     /* Reaching here means that crawling the entire file system is done
 4531        or something failed. Set the defrag->crawl_done flag to tell
 4532        the migrator threads to drain defrag->queue and terminate. */
 4533 
 4534     if (ret) {
 4535         defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
 4536     }
 4537 
 4538     gf_defrag_parallel_migration_cleanup(defrag, tid, thread_index);
 4539 
 4540     if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) &&
 4541         (defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) {
 4542         defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE;
 4543     }
 4544 
 4545     if (fc_thread_started) {
 4546         gf_defrag_estimates_cleanup(this, defrag, filecnt_thread);
 4547     }
 4548 
 4549     dht_send_rebalance_event(this, defrag->cmd, defrag->defrag_status);
 4550 
 4551     status = dict_new();
 4552     LOCK(&defrag->lock);
 4553     {
 4554         gf_defrag_status_get(conf, status);
 4555         if (ctx && ctx->notify)
 4556             ctx->notify(GF_EN_DEFRAG_STATUS, status);
 4557         if (status)
 4558             dict_unref(status);
 4559         defrag->is_exiting = 1;
 4560     }
 4561     UNLOCK(&defrag->lock);
 4562 
 4563     GF_FREE(defrag);
 4564     conf->defrag = NULL;
 4565 
 4566     if (migrate_data)
 4567         dict_unref(migrate_data);
 4568 
 4569     if (statfs_frame) {
 4570         STACK_DESTROY(statfs_frame->root);
 4571     }
 4572 exit:
 4573     return ret;
 4574 }
 4575 
 4576 static int
 4577 gf_defrag_done(int ret, call_frame_t *sync_frame, void *data)
 4578 {
 4579     gf_listener_stop(sync_frame->this);
 4580 
 4581     STACK_DESTROY(sync_frame->root);
 4582     kill(getpid(), SIGTERM);
 4583     return 0;
 4584 }
 4585 
 4586 void *
 4587 gf_defrag_start(void *data)
 4588 {
 4589     int ret = -1;
 4590     call_frame_t *frame = NULL;
 4591     dht_conf_t *conf = NULL;
 4592     gf_defrag_info_t *defrag = NULL;
 4593     xlator_t *this = NULL;
 4594     xlator_t *old_THIS = NULL;
 4595 
 4596     this = data;
 4597     conf = this->private;
 4598     if (!conf)
 4599         goto out;
 4600 
 4601     defrag = conf->defrag;
 4602     if (!defrag)
 4603         goto out;
 4604 
 4605     frame = create_frame(this, this->ctx->pool);
 4606     if (!frame)
 4607         goto out;
 4608 
 4609     frame->root->pid = GF_CLIENT_PID_DEFRAG;
 4610 
 4611     defrag->pid = frame->root->pid;
 4612 
 4613     defrag->defrag_status = GF_DEFRAG_STATUS_STARTED;
 4614 
 4615     old_THIS = THIS;
 4616     THIS = this;
 4617     ret = synctask_new(this->ctx->env, gf_defrag_start_crawl, gf_defrag_done,
 4618                        frame, this);
 4619 
 4620     if (ret)
 4621         gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_REBALANCE_START_FAILED,
 4622                "Could not create task for rebalance");
 4623     THIS = old_THIS;
 4624 out:
 4625     return NULL;
 4626 }
 4627 
 4628 uint64_t
 4629 gf_defrag_get_estimates_based_on_size(dht_conf_t *conf)
 4630 {
 4631     gf_defrag_info_t *defrag = NULL;
 4632     double rate_processed = 0;
 4633     uint64_t total_processed = 0;
 4634     uint64_t tmp_count = 0;
 4635     uint64_t time_to_complete = 0;
 4636     struct timeval now = {
 4637         0,
 4638     };
 4639     double elapsed = 0;
 4640 
 4641     defrag = conf->defrag;
 4642 
 4643     if (!g_totalsize)
 4644         goto out;
 4645 
 4646     gettimeofday(&now, NULL);
 4647     elapsed = now.tv_sec - defrag->start_time.tv_sec;
 4648 
 4649     /* Don't calculate the estimates for the first 10 minutes.
 4650      * They are unlikely to be accurate, and estimates are not required
 4651      * if the process finishes in less than 10 mins.
 4652      */
 4653 
 4654     if (elapsed < ESTIMATE_START_INTERVAL) {
 4655         gf_msg(THIS->name, GF_LOG_INFO, 0, 0,
 4656                "Rebalance estimates will not be available for the "
 4657                "first %d seconds.",
 4658                ESTIMATE_START_INTERVAL);
 4659 
 4660         goto out;
 4661     }
 4662 
 4663     total_processed = defrag->size_processed;
 4664 
 4665     /* rate (bytes per second) at which data has been processed */
 4666     rate_processed = (total_processed) / elapsed;
 4667 
 4668     tmp_count = g_totalsize;
 4669 
 4670     if (rate_processed) {
 4671         time_to_complete = (tmp_count) / rate_processed;
 4672 
 4673     } else {
 4674         gf_msg(THIS->name, GF_LOG_ERROR, 0, 0,
 4675                "Unable to calculate estimated time for rebalance");
 4676     }
 4677 
 4678     gf_log(THIS->name, GF_LOG_INFO,
 4679            "TIME: (size) total_processed=%" PRIu64 " tmp_cnt = %" PRIu64
 4680            ","
 4681            "rate_processed=%f, elapsed = %f",
 4682            total_processed, tmp_count, rate_processed, elapsed);
 4683 
 4684 out:
 4685     return time_to_complete;
 4686 }
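
The estimate is plain rate arithmetic: bytes migrated so far divided by elapsed seconds gives a throughput, and the total data size (g_totalsize) divided by that throughput gives the projected total runtime; gf_defrag_status_get() then subtracts the elapsed time to report "time-left". A standalone worked example with made-up figures:

/* Worked example of the estimate formula above; all numbers are
 * hypothetical and the program is not part of dht-rebalance.c. */
#include <inttypes.h>
#include <stdio.h>

int
main(void)
{
    uint64_t total_size = 500ULL * 1024 * 1024 * 1024;     /* 500 GiB total */
    uint64_t size_processed = 50ULL * 1024 * 1024 * 1024;  /* 50 GiB done */
    double elapsed = 3600.0;                                /* 1 hour */

    double rate = size_processed / elapsed; /* ~14.2 MiB/s */
    uint64_t time_to_complete = 0;
    if (rate > 0)
        time_to_complete = (uint64_t)(total_size / rate); /* ~36000 s */

    /* As in gf_defrag_status_get(): time left = estimate - elapsed. */
    uint64_t time_left = 0;
    if (time_to_complete > (uint64_t)elapsed)
        time_left = time_to_complete - (uint64_t)elapsed; /* ~32400 s */

    printf("ETA %" PRIu64 " s, %" PRIu64 " s left\n", time_to_complete,
           time_left);
    return 0;
}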
 4687 
 4688 int
 4689 gf_defrag_status_get(dht_conf_t *conf, dict_t *dict)
 4690 {
 4691     int ret = 0;
 4692     uint64_t files = 0;
 4693     uint64_t size = 0;
 4694     uint64_t lookup = 0;
 4695     uint64_t failures = 0;
 4696     uint64_t skipped = 0;
 4697     uint64_t promoted = 0;
 4698     uint64_t demoted = 0;
 4699     char *status = "";
 4700     double elapsed = 0;
 4701     struct timeval end = {
 4702         0,
 4703     };
 4704     uint64_t time_to_complete = 0;
 4705     uint64_t time_left = 0;
 4706     gf_defrag_info_t *defrag = conf->defrag;
 4707 
 4708     if (!defrag)
 4709         goto out;
 4710 
 4711     ret = 0;
 4712     if (defrag->defrag_status == GF_DEFRAG_STATUS_NOT_STARTED)
 4713         goto out;
 4714 
 4715     files = defrag->total_files;
 4716     size = defrag->total_data;
 4717     lookup = defrag->num_files_lookedup;
 4718     failures = defrag->total_failures;
 4719     skipped = defrag->skipped;
 4720     promoted = defrag->total_files_promoted;
 4721     demoted = defrag->total_files_demoted;
 4722 
 4723     gettimeofday(&end, NULL);
 4724 
 4725     elapsed = end.tv_sec - defrag->start_time.tv_sec;
 4726 
 4727     /* The rebalance is still in progress */
 4728 
 4729     if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) &&
 4730         (defrag->defrag_status == GF_DEFRAG_STATUS_STARTED)) {
 4731         time_to_complete = gf_defrag_get_estimates_based_on_size(conf);
 4732 
 4733         if (time_to_complete && (time_to_complete > elapsed))
 4734             time_left = time_to_complete - elapsed;
 4735 
 4736         gf_log(THIS->name, GF_LOG_INFO,
 4737                "TIME: Estimated total time to complete (size)= %" PRIu64
 4738                " seconds, seconds left = %" PRIu64 "",
 4739                time_to_complete, time_left);
 4740     }
 4741 
 4742     if (!dict)
 4743         goto log;
 4744 
 4745     ret = dict_set_uint64(dict, "promoted", promoted);
 4746     if (ret)
 4747         gf_log(THIS->name, GF_LOG_WARNING, "failed to set promoted count");
 4748 
 4749     ret = dict_set_uint64(dict, "demoted", demoted);
 4750     if (ret)
 4751         gf_log(THIS->name, GF_LOG_WARNING, "failed to set demoted count");
 4752 
 4753     ret = dict_set_uint64(dict, "files", files);
 4754     if (ret)
 4755         gf_log(THIS->name, GF_LOG_WARNING, "failed to set file count");
 4756 
 4757     ret = dict_set_uint64(dict, "size", size);
 4758     if (ret)
 4759         gf_log(THIS->name, GF_LOG_WARNING, "failed to set size of xfer");
 4760 
 4761     ret = dict_set_uint64(dict, "lookups", lookup);
 4762     if (ret)
 4763         gf_log(THIS->name, GF_LOG_WARNING, "failed to set lookedup file count");
 4764 
 4765     ret = dict_set_int32(dict, "status", defrag->defrag_status);
 4766     if (ret)
 4767         gf_log(THIS->name, GF_LOG_WARNING, "failed to set status");
 4768 
 4769     ret = dict_set_double(dict, "run-time", elapsed);
 4770     if (ret)
 4771         gf_log(THIS->name, GF_LOG_WARNING, "failed to set run-time");
 4772 
 4773     ret = dict_set_uint64(dict, "failures", failures);
 4774     if (ret)
 4775         gf_log(THIS->name, GF_LOG_WARNING, "failed to set failure count");
 4776 
 4777     ret = dict_set_uint64(dict, "skipped", skipped);
 4778     if (ret)
 4779         gf_log(THIS->name, GF_LOG_WARNING, "failed to set skipped file count");
 4780 
 4781     ret = dict_set_uint64(dict, "time-left", time_left);
 4782     if (ret)
 4783         gf_log(THIS->name, GF_LOG_WARNING, "failed to set time-left");
 4784 
 4785 log:
 4786     switch (defrag->defrag_status) {
 4787         case GF_DEFRAG_STATUS_NOT_STARTED:
 4788             status = "not started";
 4789             break;
 4790         case GF_DEFRAG_STATUS_STARTED:
 4791             status = "in progress";
 4792             break;
 4793         case GF_DEFRAG_STATUS_STOPPED:
 4794             status = "stopped";
 4795             break;
 4796         case GF_DEFRAG_STATUS_COMPLETE:
 4797             status = "completed";
 4798             break;
 4799         case GF_DEFRAG_STATUS_FAILED:
 4800             status = "failed";
 4801             break;
 4802         default:
 4803             break;
 4804     }
 4805 
 4806     gf_msg(THIS->name, GF_LOG_INFO, 0, DHT_MSG_REBALANCE_STATUS,
 4807            "Rebalance is %s. Time taken is %.2f secs", status, elapsed);
 4808     gf_msg(THIS->name, GF_LOG_INFO, 0, DHT_MSG_REBALANCE_STATUS,
 4809            "Files migrated: %" PRIu64 ", size: %" PRIu64 ", lookups: %" PRIu64
 4810            ", failures: %" PRIu64
 4811            ", skipped: "
 4812            "%" PRIu64,
 4813            files, size, lookup, failures, skipped);
 4814 out:
 4815     return 0;
 4816 }
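
gf_defrag_status_get() publishes its counters under fixed dictionary keys ("promoted", "demoted", "files", "size", "lookups", "status", "run-time", "failures", "skipped", "time-left"). A consumer holding that dict_t could read them back with the matching typed getters; the sketch below assumes the usual libglusterfs dict accessors and the headers already included by this file, and print_rebalance_status itself is hypothetical.

/* Hypothetical consumer of the status dictionary filled in above. */
static void
print_rebalance_status(dict_t *status)
{
    uint64_t files = 0, size = 0, failures = 0, skipped = 0, time_left = 0;
    double run_time = 0;
    int32_t state = 0;

    /* If a key is missing, the getter returns an error and the local
     * default above is kept. */
    dict_get_uint64(status, "files", &files);
    dict_get_uint64(status, "size", &size);
    dict_get_uint64(status, "failures", &failures);
    dict_get_uint64(status, "skipped", &skipped);
    dict_get_uint64(status, "time-left", &time_left);
    dict_get_double(status, "run-time", &run_time);
    dict_get_int32(status, "status", &state);

    gf_log(THIS->name, GF_LOG_INFO,
           "migrated %" PRIu64 " files (%" PRIu64 " bytes), %" PRIu64
           " failures, %" PRIu64 " skipped, state %d, %.0f s elapsed, "
           "%" PRIu64 " s left",
           files, size, failures, skipped, state, run_time, time_left);
}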
 4817 
 4818 int
 4819 gf_defrag_stop(dht_conf_t *conf, gf_defrag_status_t status, dict_t *output)
 4820 {
 4821     /* TODO: set a 'stop_defrag' variable here; it should be checked
 4822        in the defrag loop */
 4823     int ret = -1;
 4824     gf_defrag_info_t *defrag = conf->defrag;
 4825 
 4826     GF_ASSERT(defrag);
 4827 
 4828     if (defrag->defrag_status == GF_DEFRAG_STATUS_NOT_STARTED) {
 4829         goto out;
 4830     }
 4831 
 4832     gf_msg("", GF_LOG_INFO, 0, DHT_MSG_REBALANCE_STOPPED,
 4833            "Received stop command on rebalance");
 4834     defrag->defrag_status = status;
 4835 
 4836     if (output)
 4837         gf_defrag_status_get(conf, output);
 4838     ret = 0;
 4839 out:
 4840     gf_msg_debug("", 0, "Returning %d", ret);
 4841     return ret;
 4842 }