"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/xlators/features/bit-rot/src/bitd/bit-rot-scrub.c" (16 Sep 2020, 56991 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "bit-rot-scrub.c" see the Fossies "Dox" file reference documentation.

    1 /*
    2    Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
    3    This file is part of GlusterFS.
    4 
    5    This file is licensed to you under your choice of the GNU Lesser
    6    General Public License, version 3 or any later version (LGPLv3 or
    7    later), or the GNU General Public License, version 2 (GPLv2), in all
    8    cases as published by the Free Software Foundation.
    9 */
   10 
   11 #include <math.h>
   12 #include <ctype.h>
   13 #include <sys/uio.h>
   14 
   15 #include <glusterfs/glusterfs.h>
   16 #include <glusterfs/logging.h>
   17 #include <glusterfs/common-utils.h>
   18 
   19 #include "bit-rot-scrub.h"
   20 #include <pthread.h>
   21 #include "bit-rot-bitd-messages.h"
   22 #include "bit-rot-scrub-status.h"
   23 #include <glusterfs/events.h>
   24 
/* One scrubber worker thread; the list member links it into the
 * scrubber's pool of threads (see struct br_scrubber users). */
struct br_scrubbers {
    pthread_t scrubthread; /* worker thread handle */

    struct list_head list; /* membership in the scrubber thread list */
};
   30 
/* A single directory entry queued for scrubbing, produced by
 * br_fsscanner_handle_entry() and consumed by the scrubber threads. */
struct br_fsscan_entry {
    void *data; /* the owning br_child_t (set from the ftw callback's data) */

    loc_t parent; /* copy of the parent's loc (loc_copy'd; wiped on free) */

    gf_dirent_t *entry; /* private copy of the directory entry (entry_copy) */

    struct br_scanfs *fsscan; /* backpointer to subvolume scanner */

    struct list_head list; /* membership in fsscan->queued / ->ready */
};
   42 
   43 /**
   44  * fetch signature extended attribute from an object's fd.
   45  * NOTE: On success @xattr is not unref'd as @sign points
   46  * to the dictionary value.
   47  */
   48 static int32_t
   49 bitd_fetch_signature(xlator_t *this, br_child_t *child, fd_t *fd,
   50                      dict_t **xattr, br_isignature_out_t **sign)
   51 {
   52     int32_t ret = -1;
   53 
   54     ret = syncop_fgetxattr(child->xl, fd, xattr, GLUSTERFS_GET_OBJECT_SIGNATURE,
   55                            NULL, NULL);
   56     if (ret < 0) {
   57         br_log_object(this, "fgetxattr", fd->inode->gfid, -ret);
   58         goto out;
   59     }
   60 
   61     ret = dict_get_ptr(*xattr, GLUSTERFS_GET_OBJECT_SIGNATURE, (void **)sign);
   62     if (ret) {
   63         gf_msg(this->name, GF_LOG_ERROR, 0, BRB_MSG_GET_SIGN_FAILED,
   64                "failed to extract signature info [GFID: %s]",
   65                uuid_utoa(fd->inode->gfid));
   66         goto unref_dict;
   67     }
   68 
   69     return 0;
   70 
   71 unref_dict:
   72     dict_unref(*xattr);
   73 out:
   74     return -1;
   75 }
   76 
   77 /**
   78  * POST COMPUTE CHECK
   79  *
   80  * Checks to be performed before verifying calculated signature
   81  * Object is skipped if:
   82  *  - has stale signature
   83  *  - mismatches versions caches in pre-compute check
   84  */
   85 
   86 int32_t
   87 bitd_scrub_post_compute_check(xlator_t *this, br_child_t *child, fd_t *fd,
   88                               unsigned long version,
   89                               br_isignature_out_t **signature,
   90                               br_scrub_stats_t *scrub_stat,
   91                               gf_boolean_t skip_stat)
   92 {
   93     int32_t ret = 0;
   94     size_t signlen = 0;
   95     dict_t *xattr = NULL;
   96     br_isignature_out_t *signptr = NULL;
   97 
   98     ret = bitd_fetch_signature(this, child, fd, &xattr, &signptr);
   99     if (ret < 0) {
  100         if (!skip_stat)
  101             br_inc_unsigned_file_count(scrub_stat);
  102         goto out;
  103     }
  104 
  105     /**
  106      * Either the object got dirtied during the time the signature was
  107      * calculated OR the version we saved during pre-compute check does
  108      * not match now, implying that the object got dirtied and signed in
  109      * between scrubs pre & post compute checks (checksum window).
  110      *
  111      * The log entry looks pretty ugly, but helps in debugging..
  112      */
  113     if (signptr->stale || (signptr->version != version)) {
  114         if (!skip_stat)
  115             br_inc_unsigned_file_count(scrub_stat);
  116         gf_msg_debug(this->name, 0,
  117                      "<STAGE: POST> Object [GFID: %s] "
  118                      "either has a stale signature OR underwent "
  119                      "signing during checksumming {Stale: %d | "
  120                      "Version: %lu,%lu}",
  121                      uuid_utoa(fd->inode->gfid), (signptr->stale) ? 1 : 0,
  122                      version, signptr->version);
  123         ret = -1;
  124         goto unref_dict;
  125     }
  126 
  127     signlen = signptr->signaturelen;
  128     *signature = GF_MALLOC(sizeof(br_isignature_out_t) + signlen,
  129                            gf_common_mt_char);
  130 
  131     (void)memcpy(*signature, signptr, sizeof(br_isignature_out_t) + signlen);
  132 
  133     (*signature)->signaturelen = signlen;
  134 
  135 unref_dict:
  136     dict_unref(xattr);
  137 out:
  138     return ret;
  139 }
  140 
  141 static int32_t
  142 bitd_signature_staleness(xlator_t *this, br_child_t *child, fd_t *fd,
  143                          int *stale, unsigned long *version,
  144                          br_scrub_stats_t *scrub_stat, gf_boolean_t skip_stat)
  145 {
  146     int32_t ret = -1;
  147     dict_t *xattr = NULL;
  148     br_isignature_out_t *signptr = NULL;
  149 
  150     ret = bitd_fetch_signature(this, child, fd, &xattr, &signptr);
  151     if (ret < 0) {
  152         if (!skip_stat)
  153             br_inc_unsigned_file_count(scrub_stat);
  154         goto out;
  155     }
  156 
  157     /**
  158      * save version for validation in post compute stage
  159      * c.f. bitd_scrub_post_compute_check()
  160      */
  161     *stale = signptr->stale ? 1 : 0;
  162     *version = signptr->version;
  163 
  164     dict_unref(xattr);
  165 
  166 out:
  167     return ret;
  168 }
  169 
  170 /**
  171  * PRE COMPUTE CHECK
  172  *
  173  * Checks to be performed before initiating object signature calculation.
  174  * An object is skipped if:
  175  *  - it's already marked corrupted
  176  *  - has stale signature
  177  */
  178 int32_t
  179 bitd_scrub_pre_compute_check(xlator_t *this, br_child_t *child, fd_t *fd,
  180                              unsigned long *version,
  181                              br_scrub_stats_t *scrub_stat,
  182                              gf_boolean_t skip_stat)
  183 {
  184     int stale = 0;
  185     int32_t ret = -1;
  186 
  187     if (bitd_is_bad_file(this, child, NULL, fd)) {
  188         gf_msg(this->name, GF_LOG_WARNING, 0, BRB_MSG_SKIP_OBJECT,
  189                "Object [GFID: %s] is marked corrupted, skipping..",
  190                uuid_utoa(fd->inode->gfid));
  191         goto out;
  192     }
  193 
  194     ret = bitd_signature_staleness(this, child, fd, &stale, version, scrub_stat,
  195                                    skip_stat);
  196     if (!ret && stale) {
  197         if (!skip_stat)
  198             br_inc_unsigned_file_count(scrub_stat);
  199         gf_msg_debug(this->name, 0,
  200                      "<STAGE: PRE> Object [GFID: %s] "
  201                      "has stale signature",
  202                      uuid_utoa(fd->inode->gfid));
  203         ret = -1;
  204     }
  205 
  206 out:
  207     return ret;
  208 }
  209 
/* static int */
/**
 * Compare the stored signature (@sign) against the checksum the scrubber
 * just computed (@md). On a match: debug-log and return 0. On a mismatch:
 * log/alert the corruption, emit an event, and mark the object bad by
 * setting BITROT_OBJECT_BAD_KEY via fsetxattr on @fd.
 *
 * Return values: 0 on signature match; on mismatch, the fsetxattr result
 * (0 if the bad-object marking succeeded, negative otherwise) or -1 on
 * dictionary setup failure.
 */
int
bitd_compare_ckum(xlator_t *this, br_isignature_out_t *sign, unsigned char *md,
                  inode_t *linked_inode, gf_dirent_t *entry, fd_t *fd,
                  br_child_t *child, loc_t *loc)
{
    int ret = -1;
    dict_t *xattr = NULL;

    GF_VALIDATE_OR_GOTO("bit-rot", this, out);
    GF_VALIDATE_OR_GOTO(this->name, sign, out);
    GF_VALIDATE_OR_GOTO(this->name, fd, out);
    GF_VALIDATE_OR_GOTO(this->name, child, out);
    GF_VALIDATE_OR_GOTO(this->name, linked_inode, out);
    GF_VALIDATE_OR_GOTO(this->name, md, out);
    GF_VALIDATE_OR_GOTO(this->name, entry, out);

    /* NOTE(review): the comparison is bounded by sign->signaturelen from
     * the stored xattr; assumes it never exceeds the digest buffer @md
     * (SHA256_DIGEST_LENGTH) — confirm against the signer. */
    if (strncmp(sign->signature, (char *)md, sign->signaturelen) == 0) {
        gf_msg_debug(this->name, 0,
                     "%s [GFID: %s | Brick: %s] "
                     "matches calculated checksum",
                     loc->path, uuid_utoa(linked_inode->gfid),
                     child->brick_path);
        return 0;
    }

    gf_msg(this->name, GF_LOG_DEBUG, 0, BRB_MSG_CHECKSUM_MISMATCH,
           "Object checksum mismatch: %s [GFID: %s | Brick: %s]", loc->path,
           uuid_utoa(linked_inode->gfid), child->brick_path);
    gf_msg(this->name, GF_LOG_ALERT, 0, BRB_MSG_CHECKSUM_MISMATCH,
           "CORRUPTION DETECTED: Object %s {Brick: %s | GFID: %s}", loc->path,
           child->brick_path, uuid_utoa(linked_inode->gfid));

    /* Perform bad-file marking */
    xattr = dict_new();
    if (!xattr) {
        ret = -1;
        goto out;
    }

    ret = dict_set_int32(xattr, BITROT_OBJECT_BAD_KEY, _gf_true);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRB_MSG_MARK_BAD_FILE,
               "Error setting bad-file marker for %s [GFID: %s | "
               "Brick: %s]",
               loc->path, uuid_utoa(linked_inode->gfid), child->brick_path);
        goto dictfree;
    }

    gf_msg(this->name, GF_LOG_ALERT, 0, BRB_MSG_MARK_CORRUPTED,
           "Marking"
           " %s [GFID: %s | Brick: %s] as corrupted..",
           loc->path, uuid_utoa(linked_inode->gfid), child->brick_path);
    gf_event(EVENT_BITROT_BAD_FILE, "gfid=%s;path=%s;brick=%s",
             uuid_utoa(linked_inode->gfid), loc->path, child->brick_path);
    ret = syncop_fsetxattr(child->xl, fd, xattr, 0, NULL, NULL);
    if (ret)
        gf_msg(this->name, GF_LOG_ERROR, 0, BRB_MSG_MARK_BAD_FILE,
               "Error marking object %s [GFID: %s] as corrupted", loc->path,
               uuid_utoa(linked_inode->gfid));

dictfree:
    dict_unref(xattr);
out:
    return ret;
}
  276 
  277 /**
  278  * "The Scrubber"
  279  *
  280  * Perform signature validation for a given object with the assumption
  281  * that the signature is SHA256 (because signer as of now _always_
  282  * signs with SHA256).
  283  */
  284 int
  285 br_scrubber_scrub_begin(xlator_t *this, struct br_fsscan_entry *fsentry)
  286 {
  287     int32_t ret = -1;
  288     fd_t *fd = NULL;
  289     loc_t loc = {
  290         0,
  291     };
  292     struct iatt iatt = {
  293         0,
  294     };
  295     struct iatt parent_buf = {
  296         0,
  297     };
  298     pid_t pid = 0;
  299     br_child_t *child = NULL;
  300     unsigned char *md = NULL;
  301     inode_t *linked_inode = NULL;
  302     br_isignature_out_t *sign = NULL;
  303     unsigned long signedversion = 0;
  304     gf_dirent_t *entry = NULL;
  305     br_private_t *priv = NULL;
  306     loc_t *parent = NULL;
  307     gf_boolean_t skip_stat = _gf_false;
  308     uuid_t shard_root_gfid = {
  309         0,
  310     };
  311 
  312     GF_VALIDATE_OR_GOTO("bit-rot", fsentry, out);
  313 
  314     entry = fsentry->entry;
  315     parent = &fsentry->parent;
  316     child = fsentry->data;
  317 
  318     priv = this->private;
  319 
  320     GF_VALIDATE_OR_GOTO("bit-rot", entry, out);
  321     GF_VALIDATE_OR_GOTO("bit-rot", parent, out);
  322     GF_VALIDATE_OR_GOTO("bit-rot", child, out);
  323     GF_VALIDATE_OR_GOTO("bit-rot", priv, out);
  324 
  325     pid = GF_CLIENT_PID_SCRUB;
  326 
  327     ret = br_prepare_loc(this, child, parent, entry, &loc);
  328     if (!ret)
  329         goto out;
  330 
  331     syncopctx_setfspid(&pid);
  332 
  333     ret = syncop_lookup(child->xl, &loc, &iatt, &parent_buf, NULL, NULL);
  334     if (ret) {
  335         br_log_object_path(this, "lookup", loc.path, -ret);
  336         goto out;
  337     }
  338 
  339     linked_inode = inode_link(loc.inode, parent->inode, loc.name, &iatt);
  340     if (linked_inode)
  341         inode_lookup(linked_inode);
  342 
  343     gf_msg_debug(this->name, 0, "Scrubbing object %s [GFID: %s]", entry->d_name,
  344                  uuid_utoa(linked_inode->gfid));
  345 
  346     if (iatt.ia_type != IA_IFREG) {
  347         gf_msg_debug(this->name, 0, "%s is not a regular file", entry->d_name);
  348         ret = 0;
  349         goto unref_inode;
  350     }
  351 
  352     if (IS_DHT_LINKFILE_MODE((&iatt))) {
  353         gf_msg_debug(this->name, 0, "%s is a dht sticky bit file",
  354                      entry->d_name);
  355         ret = 0;
  356         goto unref_inode;
  357     }
  358 
  359     /* skip updating scrub statistics for shard entries */
  360     gf_uuid_parse(SHARD_ROOT_GFID, shard_root_gfid);
  361     if (gf_uuid_compare(loc.pargfid, shard_root_gfid) == 0)
  362         skip_stat = _gf_true;
  363 
  364     /**
  365      * open() an fd for subsequent operations
  366      */
  367     fd = fd_create(linked_inode, 0);
  368     if (!fd) {
  369         gf_msg(this->name, GF_LOG_ERROR, 0, BRB_MSG_FD_CREATE_FAILED,
  370                "failed to create fd for inode %s",
  371                uuid_utoa(linked_inode->gfid));
  372         goto unref_inode;
  373     }
  374 
  375     ret = syncop_open(child->xl, &loc, O_RDWR, fd, NULL, NULL);
  376     if (ret) {
  377         br_log_object(this, "open", linked_inode->gfid, -ret);
  378         ret = -1;
  379         goto unrefd;
  380     }
  381 
  382     fd_bind(fd);
  383 
  384     /**
  385      * perform pre compute checks before initiating checksum
  386      * computation
  387      *  - presence of bad object
  388      *  - signature staleness
  389      */
  390     ret = bitd_scrub_pre_compute_check(this, child, fd, &signedversion,
  391                                        &priv->scrub_stat, skip_stat);
  392     if (ret)
  393         goto unrefd; /* skip this object */
  394 
  395     /* if all's good, proceed to calculate the hash */
  396     md = GF_MALLOC(SHA256_DIGEST_LENGTH, gf_common_mt_char);
  397     if (!md)
  398         goto unrefd;
  399 
  400     ret = br_calculate_obj_checksum(md, child, fd, &iatt);
  401     if (ret) {
  402         gf_msg(this->name, GF_LOG_ERROR, 0, BRB_MSG_CALC_ERROR,
  403                "error calculating hash for object [GFID: %s]",
  404                uuid_utoa(fd->inode->gfid));
  405         ret = -1;
  406         goto free_md;
  407     }
  408 
  409     /**
  410      * perform post compute checks as an object's signature may have
  411      * become stale while scrubber calculated checksum.
  412      */
  413     ret = bitd_scrub_post_compute_check(this, child, fd, signedversion, &sign,
  414                                         &priv->scrub_stat, skip_stat);
  415     if (ret)
  416         goto free_md;
  417 
  418     ret = bitd_compare_ckum(this, sign, md, linked_inode, entry, fd, child,
  419                             &loc);
  420 
  421     if (!skip_stat)
  422         br_inc_scrubbed_file(&priv->scrub_stat);
  423 
  424     GF_FREE(sign); /* allocated on post-compute */
  425 
  426     /** fd_unref() takes care of closing fd.. like syncop_close() */
  427 
  428 free_md:
  429     GF_FREE(md);
  430 unrefd:
  431     fd_unref(fd);
  432 unref_inode:
  433     inode_unref(linked_inode);
  434 out:
  435     loc_wipe(&loc);
  436     return ret;
  437 }
  438 
  439 static void
  440 _br_lock_cleaner(void *arg)
  441 {
  442     pthread_mutex_t *mutex = arg;
  443 
  444     pthread_mutex_unlock(mutex);
  445 }
  446 
/**
 * Hand the batch of collected entries over to the scrubber threads and
 * block until all of them are processed: splice queued -> ready under
 * the scrubber mutex, broadcast to wake the scrubbers, then sleep on
 * the scanner's waitcond until the entry count drops to zero (signalled
 * from _br_fsscan_dec_entry_count()).
 *
 * pthread_cleanup_push/_pop pairs ensure the mutexes are released if
 * this thread is cancelled while holding them; the push/pop macros must
 * stay lexically paired in the same scope.
 */
static void
wait_for_scrubbing(xlator_t *this, struct br_scanfs *fsscan)
{
    br_private_t *priv = NULL;
    struct br_scrubber *fsscrub = NULL;

    priv = this->private;
    fsscrub = &priv->fsscrub;

    pthread_cleanup_push(_br_lock_cleaner, &fsscan->waitlock);
    pthread_mutex_lock(&fsscan->waitlock);
    {
        pthread_cleanup_push(_br_lock_cleaner, &fsscrub->mutex);
        pthread_mutex_lock(&fsscrub->mutex);
        {
            list_replace_init(&fsscan->queued, &fsscan->ready);

            /* wake up scrubbers */
            pthread_cond_broadcast(&fsscrub->cond);
        }
        pthread_mutex_unlock(&fsscrub->mutex);
        pthread_cleanup_pop(0);

        while (fsscan->entries != 0)
            pthread_cond_wait(&fsscan->waitcond, &fsscan->waitlock);
    }
    pthread_mutex_unlock(&fsscan->waitlock);
    pthread_cleanup_pop(0);
}
  476 
  477 static void
  478 _br_fsscan_inc_entry_count(struct br_scanfs *fsscan)
  479 {
  480     fsscan->entries++;
  481 }
  482 
  483 static void
  484 _br_fsscan_dec_entry_count(struct br_scanfs *fsscan)
  485 {
  486     if (--fsscan->entries == 0) {
  487         pthread_mutex_lock(&fsscan->waitlock);
  488         {
  489             pthread_cond_signal(&fsscan->waitcond);
  490         }
  491         pthread_mutex_unlock(&fsscan->waitlock);
  492     }
  493 }
  494 
  495 static void
  496 _br_fsscan_collect_entry(struct br_scanfs *fsscan,
  497                          struct br_fsscan_entry *fsentry)
  498 {
  499     list_add_tail(&fsentry->list, &fsscan->queued);
  500     _br_fsscan_inc_entry_count(fsscan);
  501 }
  502 
  503 #define NR_ENTRIES (1 << 7) /* ..bulk scrubbing */
  504 
  505 int
  506 br_fsscanner_handle_entry(xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
  507                           void *data)
  508 {
  509     int32_t ret = -1;
  510     int scrub = 0;
  511     br_child_t *child = NULL;
  512     xlator_t *this = NULL;
  513     struct br_scanfs *fsscan = NULL;
  514     struct br_fsscan_entry *fsentry = NULL;
  515 
  516     GF_VALIDATE_OR_GOTO("bit-rot", subvol, error_return);
  517     GF_VALIDATE_OR_GOTO("bit-rot", data, error_return);
  518 
  519     child = data;
  520     this = child->this;
  521     fsscan = &child->fsscan;
  522 
  523     _mask_cancellation();
  524 
  525     fsentry = GF_CALLOC(1, sizeof(*fsentry), gf_br_mt_br_fsscan_entry_t);
  526     if (!fsentry)
  527         goto error_return;
  528 
  529     {
  530         fsentry->data = data;
  531         fsentry->fsscan = &child->fsscan;
  532 
  533         /* copy parent loc */
  534         ret = loc_copy(&fsentry->parent, parent);
  535         if (ret)
  536             goto dealloc;
  537 
  538         /* copy child entry */
  539         fsentry->entry = entry_copy(entry);
  540         if (!fsentry->entry)
  541             goto locwipe;
  542 
  543         INIT_LIST_HEAD(&fsentry->list);
  544     }
  545 
  546     LOCK(&fsscan->entrylock);
  547     {
  548         _br_fsscan_collect_entry(fsscan, fsentry);
  549 
  550         /**
  551          * need not be a equality check as entries may be pushed
  552          * back onto the scanned queue when thread(s) are cleaned.
  553          */
  554         if (fsscan->entries >= NR_ENTRIES)
  555             scrub = 1;
  556     }
  557     UNLOCK(&fsscan->entrylock);
  558 
  559     _unmask_cancellation();
  560 
  561     if (scrub)
  562         wait_for_scrubbing(this, fsscan);
  563 
  564     return 0;
  565 
  566 locwipe:
  567     loc_wipe(&fsentry->parent);
  568 dealloc:
  569     GF_FREE(fsentry);
  570 error_return:
  571     return -1;
  572 }
  573 
  574 int32_t
  575 br_fsscan_deactivate(xlator_t *this)
  576 {
  577     int ret = 0;
  578     br_private_t *priv = NULL;
  579     br_scrub_state_t nstate = 0;
  580     struct br_monitor *scrub_monitor = NULL;
  581 
  582     priv = this->private;
  583     scrub_monitor = &priv->scrub_monitor;
  584 
  585     ret = gf_tw_del_timer(priv->timer_wheel, scrub_monitor->timer);
  586     if (ret == 0) {
  587         nstate = BR_SCRUB_STATE_STALLED;
  588         gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_INFO,
  589                "Volume is under active scrubbing. Pausing scrub..");
  590     } else {
  591         nstate = BR_SCRUB_STATE_PAUSED;
  592         gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_INFO,
  593                "Scrubber paused");
  594     }
  595 
  596     _br_monitor_set_scrub_state(scrub_monitor, nstate);
  597 
  598     return 0;
  599 }
  600 
  601 static void
  602 br_scrubber_log_time(xlator_t *this, const char *sfx)
  603 {
  604     char timestr[1024] = {
  605         0,
  606     };
  607     struct timeval tv = {
  608         0,
  609     };
  610     br_private_t *priv = NULL;
  611 
  612     priv = this->private;
  613 
  614     gettimeofday(&tv, NULL);
  615     gf_time_fmt(timestr, sizeof(timestr), tv.tv_sec, gf_timefmt_FT);
  616 
  617     if (strcasecmp(sfx, "started") == 0) {
  618         br_update_scrub_start_time(&priv->scrub_stat, &tv);
  619         gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_START,
  620                "Scrubbing %s at %s", sfx, timestr);
  621     } else {
  622         br_update_scrub_finish_time(&priv->scrub_stat, timestr, &tv);
  623         gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_FINISH,
  624                "Scrubbing %s at %s", sfx, timestr);
  625     }
  626 }
  627 
  628 static void
  629 br_fsscanner_log_time(xlator_t *this, br_child_t *child, const char *sfx)
  630 {
  631     char timestr[1024] = {
  632         0,
  633     };
  634     struct timeval tv = {
  635         0,
  636     };
  637 
  638     gettimeofday(&tv, NULL);
  639     gf_time_fmt(timestr, sizeof(timestr), tv.tv_sec, gf_timefmt_FT);
  640 
  641     if (strcasecmp(sfx, "started") == 0) {
  642         gf_msg_debug(this->name, 0, "Scrubbing \"%s\" %s at %s",
  643                      child->brick_path, sfx, timestr);
  644     } else {
  645         gf_msg_debug(this->name, 0, "Scrubbing \"%s\" %s at %s",
  646                      child->brick_path, sfx, timestr);
  647     }
  648 }
  649 
/* Record whether @child is being actively scrubbed. No locking here:
 * callers serialize via child->lock (see br_fsscanner_wait_until_kicked
 * and br_fsscanner_exit_control). */
void
br_child_set_scrub_state(br_child_t *child, gf_boolean_t state)
{
    child->active_scrubbing = state;
}
  655 
/**
 * Block the per-brick scanner thread until the monitor "kicks" it
 * (scrub_monitor->kick set and wakecond broadcast by
 * br_kickstart_scanner()). Once kicked, register this child as actively
 * scrubbing: bump active_child_count and flip the child's scrub flag
 * under child->lock, which synchronizes with disconnect events.
 *
 * Cleanup handlers release both locks if the thread is cancelled while
 * holding them; the push/pop macro pairs must stay lexically matched.
 */
static void
br_fsscanner_wait_until_kicked(xlator_t *this, br_child_t *child)
{
    br_private_t *priv = NULL;
    struct br_monitor *scrub_monitor = NULL;

    priv = this->private;
    scrub_monitor = &priv->scrub_monitor;

    pthread_cleanup_push(_br_lock_cleaner, &scrub_monitor->wakelock);
    pthread_mutex_lock(&scrub_monitor->wakelock);
    {
        while (!scrub_monitor->kick)
            pthread_cond_wait(&scrub_monitor->wakecond,
                              &scrub_monitor->wakelock);

        /* Child lock is to synchronize with disconnect events */
        pthread_cleanup_push(_br_lock_cleaner, &child->lock);
        pthread_mutex_lock(&child->lock);
        {
            scrub_monitor->active_child_count++;
            br_child_set_scrub_state(child, _gf_true);
        }
        pthread_mutex_unlock(&child->lock);
        pthread_cleanup_pop(0);
    }
    pthread_mutex_unlock(&scrub_monitor->wakelock);
    pthread_cleanup_pop(0);
}
  685 
  686 static void
  687 br_scrubber_entry_control(xlator_t *this)
  688 {
  689     br_private_t *priv = NULL;
  690     struct br_monitor *scrub_monitor = NULL;
  691 
  692     priv = this->private;
  693     scrub_monitor = &priv->scrub_monitor;
  694 
  695     LOCK(&scrub_monitor->lock);
  696     {
  697         /* Move the state to BR_SCRUB_STATE_ACTIVE */
  698         if (scrub_monitor->state == BR_SCRUB_STATE_PENDING)
  699             scrub_monitor->state = BR_SCRUB_STATE_ACTIVE;
  700         br_scrubber_log_time(this, "started");
  701         priv->scrub_stat.scrub_running = 1;
  702     }
  703     UNLOCK(&scrub_monitor->lock);
  704 }
  705 
/**
 * Mark the end of a scrub run: log the finish time and clear the
 * running flag. If the monitor is still ACTIVE, reschedule the next run
 * via br_fsscan_activate(); any other state means the volume will be
 * rescheduled by other means, so just log and return.
 *
 * NOTE: the else-branch releases scrub_monitor->lock itself before
 * returning — do not add code after the conditional that assumes the
 * lock is still held on that path.
 */
static void
br_scrubber_exit_control(xlator_t *this)
{
    br_private_t *priv = NULL;
    struct br_monitor *scrub_monitor = NULL;

    priv = this->private;
    scrub_monitor = &priv->scrub_monitor;

    LOCK(&scrub_monitor->lock);
    {
        br_scrubber_log_time(this, "finished");
        priv->scrub_stat.scrub_running = 0;

        if (scrub_monitor->state == BR_SCRUB_STATE_ACTIVE) {
            (void)br_fsscan_activate(this);
        } else {
            /* early unlock + return: see NOTE above */
            UNLOCK(&scrub_monitor->lock);
            gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_INFO,
                   "Volume waiting to get rescheduled..");
            return;
        }
    }
    UNLOCK(&scrub_monitor->lock);
}
  731 
/* Precursor to a per-brick scan: currently just logs the start time. */
static void
br_fsscanner_entry_control(xlator_t *this, br_child_t *child)
{
    br_fsscanner_log_time(this, child, "started");
}
  737 
/**
 * Per-brick scan epilogue, acting as a barrier across all child
 * scanner threads: each thread deregisters itself (decrement
 * active_child_count, clear the child's scrub flag under child->lock)
 * and then waits until every sibling is done. The LAST thread to
 * finish clears the kick flag, wakes the waiting siblings, and signals
 * the monitor thread via donelock/donecond.
 *
 * A disconnected brick is only warned about — the scan result may be
 * incomplete but the barrier protocol still runs.
 *
 * Cleanup handlers release the locks on thread cancellation; the
 * push/pop macro pairs must remain lexically matched.
 */
static void
br_fsscanner_exit_control(xlator_t *this, br_child_t *child)
{
    br_private_t *priv = NULL;
    struct br_monitor *scrub_monitor = NULL;

    priv = this->private;
    scrub_monitor = &priv->scrub_monitor;

    if (!_br_is_child_connected(child)) {
        gf_msg(this->name, GF_LOG_WARNING, 0, BRB_MSG_SCRUB_INFO,
               "Brick [%s] disconnected while scrubbing. Scrubbing "
               "might be incomplete",
               child->brick_path);
    }

    br_fsscanner_log_time(this, child, "finished");

    pthread_cleanup_push(_br_lock_cleaner, &scrub_monitor->wakelock);
    pthread_mutex_lock(&scrub_monitor->wakelock);
    {
        scrub_monitor->active_child_count--;
        pthread_cleanup_push(_br_lock_cleaner, &child->lock);
        pthread_mutex_lock(&child->lock);
        {
            br_child_set_scrub_state(child, _gf_false);
        }
        pthread_mutex_unlock(&child->lock);
        pthread_cleanup_pop(0);

        if (scrub_monitor->active_child_count == 0) {
            /* The last child has finished scrubbing.
             * Set the kick to false and  wake up other
             * children who are waiting for the last
             * child to complete scrubbing.
             */
            scrub_monitor->kick = _gf_false;
            pthread_cond_broadcast(&scrub_monitor->wakecond);

            /* Signal monitor thread waiting for the all
             * the children to finish scrubbing.
             */
            pthread_cleanup_push(_br_lock_cleaner, &scrub_monitor->donelock);
            pthread_mutex_lock(&scrub_monitor->donelock);
            {
                scrub_monitor->done = _gf_true;
                pthread_cond_signal(&scrub_monitor->donecond);
            }
            pthread_mutex_unlock(&scrub_monitor->donelock);
            pthread_cleanup_pop(0);
        } else {
            while (scrub_monitor->active_child_count)
                pthread_cond_wait(&scrub_monitor->wakecond,
                                  &scrub_monitor->wakelock);
        }
    }
    pthread_mutex_unlock(&scrub_monitor->wakelock);
    pthread_cleanup_pop(0);
}
  797 
  798 void *
  799 br_fsscanner(void *arg)
  800 {
  801     loc_t loc = {
  802         0,
  803     };
  804     br_child_t *child = NULL;
  805     xlator_t *this = NULL;
  806     struct br_scanfs *fsscan = NULL;
  807 
  808     child = arg;
  809     this = child->this;
  810     fsscan = &child->fsscan;
  811 
  812     THIS = this;
  813     loc.inode = child->table->root;
  814 
  815     while (1) {
  816         br_fsscanner_wait_until_kicked(this, child);
  817         {
  818             /* precursor for scrub */
  819             br_fsscanner_entry_control(this, child);
  820 
  821             /* scrub */
  822             (void)syncop_ftw(child->xl, &loc, GF_CLIENT_PID_SCRUB, child,
  823                              br_fsscanner_handle_entry);
  824             if (!list_empty(&fsscan->queued))
  825                 wait_for_scrubbing(this, fsscan);
  826 
  827             /* scrub exit criteria */
  828             br_fsscanner_exit_control(this, child);
  829         }
  830     }
  831 
  832     return NULL;
  833 }
  834 
  835 /**
  836  * Keep this routine extremely simple and do not ever try to acquire
  837  * child->lock here: it may lead to deadlock. Scrubber state is
  838  * modified in br_fsscanner(). An intermediate state change to pause
  839  * changes the scrub state to the _correct_ state by identifying a
  840  * non-pending timer.
  841  */
  842 void
  843 br_kickstart_scanner(struct gf_tw_timer_list *timer, void *data,
  844                      unsigned long calltime)
  845 {
  846     xlator_t *this = NULL;
  847     struct br_monitor *scrub_monitor = data;
  848     br_private_t *priv = NULL;
  849 
  850     THIS = this = scrub_monitor->this;
  851     priv = this->private;
  852 
  853     /* Reset scrub statistics */
  854     priv->scrub_stat.scrubbed_files = 0;
  855     priv->scrub_stat.unsigned_files = 0;
  856 
  857     /* Moves state from PENDING to ACTIVE */
  858     (void)br_scrubber_entry_control(this);
  859 
  860     /* kickstart scanning.. */
  861     pthread_mutex_lock(&scrub_monitor->wakelock);
  862     {
  863         scrub_monitor->kick = _gf_true;
  864         GF_ASSERT(scrub_monitor->active_child_count == 0);
  865         pthread_cond_broadcast(&scrub_monitor->wakecond);
  866     }
  867     pthread_mutex_unlock(&scrub_monitor->wakelock);
  868 
  869     return;
  870 }
  871 
/* Map a base frequency interval (seconds) to the actual schedule delta.
 * Currently an identity mapping; kept as the single tuning point for
 * br_fsscan_calculate_timeout(). */
static uint32_t
br_fsscan_calculate_delta(uint32_t times)
{
    return times;
}
  877 
  878 #define BR_SCRUB_ONDEMAND (1)
  879 #define BR_SCRUB_MINUTE (60)
  880 #define BR_SCRUB_HOURLY (60 * 60)
  881 #define BR_SCRUB_DAILY (1 * 24 * 60 * 60)
  882 #define BR_SCRUB_WEEKLY (7 * 24 * 60 * 60)
  883 #define BR_SCRUB_BIWEEKLY (14 * 24 * 60 * 60)
  884 #define BR_SCRUB_MONTHLY (30 * 24 * 60 * 60)
  885 
  886 static unsigned int
  887 br_fsscan_calculate_timeout(scrub_freq_t freq)
  888 {
  889     uint32_t timo = 0;
  890 
  891     switch (freq) {
  892         case BR_FSSCRUB_FREQ_MINUTE:
  893             timo = br_fsscan_calculate_delta(BR_SCRUB_MINUTE);
  894             break;
  895         case BR_FSSCRUB_FREQ_HOURLY:
  896             timo = br_fsscan_calculate_delta(BR_SCRUB_HOURLY);
  897             break;
  898         case BR_FSSCRUB_FREQ_DAILY:
  899             timo = br_fsscan_calculate_delta(BR_SCRUB_DAILY);
  900             break;
  901         case BR_FSSCRUB_FREQ_WEEKLY:
  902             timo = br_fsscan_calculate_delta(BR_SCRUB_WEEKLY);
  903             break;
  904         case BR_FSSCRUB_FREQ_BIWEEKLY:
  905             timo = br_fsscan_calculate_delta(BR_SCRUB_BIWEEKLY);
  906             break;
  907         case BR_FSSCRUB_FREQ_MONTHLY:
  908             timo = br_fsscan_calculate_delta(BR_SCRUB_MONTHLY);
  909             break;
  910         default:
  911             timo = 0;
  912     }
  913 
  914     return timo;
  915 }
  916 
/**
 * Arm the one-shot timer-wheel timer that will fire
 * br_kickstart_scanner() after the configured scrub-frequency interval,
 * and move the monitor into PENDING state.
 *
 * Returns 0 on success; -1 on a zero timeout (configuration bug) or
 * timer allocation failure.
 */
int32_t
br_fsscan_schedule(xlator_t *this)
{
    uint32_t timo = 0;
    br_private_t *priv = NULL;
    struct timeval tv = {
        0,
    };
    char timestr[1024] = {
        0,
    };
    struct br_scrubber *fsscrub = NULL;
    struct gf_tw_timer_list *timer = NULL;
    struct br_monitor *scrub_monitor = NULL;

    priv = this->private;
    fsscrub = &priv->fsscrub;
    scrub_monitor = &priv->scrub_monitor;

    /* remember when scheduling began, for the "runs at" log below */
    (void)gettimeofday(&tv, NULL);
    scrub_monitor->boot = tv.tv_sec;

    timo = br_fsscan_calculate_timeout(fsscrub->frequency);
    if (timo == 0) {
        gf_msg(this->name, GF_LOG_ERROR, 0, BRB_MSG_ZERO_TIMEOUT_BUG,
               "BUG: Zero schedule timeout");
        goto error_return;
    }

    scrub_monitor->timer = GF_CALLOC(1, sizeof(*scrub_monitor->timer),
                                     gf_br_stub_mt_br_scanner_freq_t);
    if (!scrub_monitor->timer)
        goto error_return;

    timer = scrub_monitor->timer;
    INIT_LIST_HEAD(&timer->entry);

    timer->data = scrub_monitor;
    timer->expires = timo;
    timer->function = br_kickstart_scanner;

    gf_tw_add_timer(priv->timer_wheel, timer);
    _br_monitor_set_scrub_state(scrub_monitor, BR_SCRUB_STATE_PENDING);

    gf_time_fmt(timestr, sizeof(timestr), (scrub_monitor->boot + timo),
                gf_timefmt_FT);
    gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_INFO,
           "Scrubbing is "
           "scheduled to run at %s",
           timestr);

    return 0;

error_return:
    return -1;
}
  973 
  974 int32_t
  975 br_fsscan_activate(xlator_t *this)
  976 {
  977     uint32_t timo = 0;
  978     char timestr[1024] = {
  979         0,
  980     };
  981     struct timeval now = {
  982         0,
  983     };
  984     br_private_t *priv = NULL;
  985     struct br_scrubber *fsscrub = NULL;
  986     struct br_monitor *scrub_monitor = NULL;
  987 
  988     priv = this->private;
  989     fsscrub = &priv->fsscrub;
  990     scrub_monitor = &priv->scrub_monitor;
  991 
  992     (void)gettimeofday(&now, NULL);
  993     timo = br_fsscan_calculate_timeout(fsscrub->frequency);
  994     if (timo == 0) {
  995         gf_msg(this->name, GF_LOG_ERROR, 0, BRB_MSG_ZERO_TIMEOUT_BUG,
  996                "BUG: Zero schedule timeout");
  997         return -1;
  998     }
  999 
 1000     pthread_mutex_lock(&scrub_monitor->donelock);
 1001     {
 1002         scrub_monitor->done = _gf_false;
 1003     }
 1004     pthread_mutex_unlock(&scrub_monitor->donelock);
 1005 
 1006     gf_time_fmt(timestr, sizeof(timestr), (now.tv_sec + timo), gf_timefmt_FT);
 1007     (void)gf_tw_mod_timer(priv->timer_wheel, scrub_monitor->timer, timo);
 1008 
 1009     _br_monitor_set_scrub_state(scrub_monitor, BR_SCRUB_STATE_PENDING);
 1010     gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_INFO,
 1011            "Scrubbing is "
 1012            "rescheduled to run at %s",
 1013            timestr);
 1014 
 1015     return 0;
 1016 }
 1017 
 1018 int32_t
 1019 br_fsscan_reschedule(xlator_t *this)
 1020 {
 1021     int32_t ret = 0;
 1022     uint32_t timo = 0;
 1023     char timestr[1024] = {
 1024         0,
 1025     };
 1026     struct timeval now = {
 1027         0,
 1028     };
 1029     br_private_t *priv = NULL;
 1030     struct br_scrubber *fsscrub = NULL;
 1031     struct br_monitor *scrub_monitor = NULL;
 1032 
 1033     priv = this->private;
 1034     fsscrub = &priv->fsscrub;
 1035     scrub_monitor = &priv->scrub_monitor;
 1036 
 1037     if (!fsscrub->frequency_reconf)
 1038         return 0;
 1039 
 1040     (void)gettimeofday(&now, NULL);
 1041     timo = br_fsscan_calculate_timeout(fsscrub->frequency);
 1042     if (timo == 0) {
 1043         gf_msg(this->name, GF_LOG_ERROR, 0, BRB_MSG_ZERO_TIMEOUT_BUG,
 1044                "BUG: Zero schedule timeout");
 1045         return -1;
 1046     }
 1047 
 1048     gf_time_fmt(timestr, sizeof(timestr), (now.tv_sec + timo), gf_timefmt_FT);
 1049 
 1050     pthread_mutex_lock(&scrub_monitor->donelock);
 1051     {
 1052         scrub_monitor->done = _gf_false;
 1053     }
 1054     pthread_mutex_unlock(&scrub_monitor->donelock);
 1055 
 1056     ret = gf_tw_mod_timer_pending(priv->timer_wheel, scrub_monitor->timer,
 1057                                   timo);
 1058     if (ret == 0)
 1059         gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_INFO,
 1060                "Scrubber is currently running and would be "
 1061                "rescheduled after completion");
 1062     else {
 1063         _br_monitor_set_scrub_state(scrub_monitor, BR_SCRUB_STATE_PENDING);
 1064         gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_INFO,
 1065                "Scrubbing rescheduled to run at %s", timestr);
 1066     }
 1067 
 1068     return 0;
 1069 }
 1070 
 1071 int32_t
 1072 br_fsscan_ondemand(xlator_t *this)
 1073 {
 1074     int32_t ret = 0;
 1075     uint32_t timo = 0;
 1076     char timestr[1024] = {
 1077         0,
 1078     };
 1079     struct timeval now = {
 1080         0,
 1081     };
 1082     br_private_t *priv = NULL;
 1083     struct br_monitor *scrub_monitor = NULL;
 1084 
 1085     priv = this->private;
 1086     scrub_monitor = &priv->scrub_monitor;
 1087 
 1088     (void)gettimeofday(&now, NULL);
 1089 
 1090     timo = BR_SCRUB_ONDEMAND;
 1091 
 1092     gf_time_fmt(timestr, sizeof(timestr), (now.tv_sec + timo), gf_timefmt_FT);
 1093 
 1094     pthread_mutex_lock(&scrub_monitor->donelock);
 1095     {
 1096         scrub_monitor->done = _gf_false;
 1097     }
 1098     pthread_mutex_unlock(&scrub_monitor->donelock);
 1099 
 1100     ret = gf_tw_mod_timer_pending(priv->timer_wheel, scrub_monitor->timer,
 1101                                   timo);
 1102     if (ret == 0)
 1103         gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_INFO,
 1104                "Scrubber is currently running and would be "
 1105                "rescheduled after completion");
 1106     else {
 1107         _br_monitor_set_scrub_state(scrub_monitor, BR_SCRUB_STATE_PENDING);
 1108         gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_INFO,
 1109                "Ondemand Scrubbing scheduled to run at %s", timestr);
 1110     }
 1111 
 1112     return 0;
 1113 }
 1114 
/* exponents fed to pow(M_E, x) when sizing the scrubber thread pool
 * (see br_scrubber_calc_scale below) */
#define BR_SCRUB_THREAD_SCALE_LAZY 0
#define BR_SCRUB_THREAD_SCALE_NORMAL 0.4
#define BR_SCRUB_THREAD_SCALE_AGGRESSIVE 1.0

/* fallback definition of Euler's number when math.h does not supply it */
#ifndef M_E
#define M_E 2.718
#endif
 1122 
 1123 /**
 1124  * This is just a simple exponential scale to a fixed value selected
 1125  * per throttle config. We probably need to be more smart and select
 1126  * the scale based on the number of processor cores too.
 1127  */
 1128 static unsigned int
 1129 br_scrubber_calc_scale(xlator_t *this, br_private_t *priv,
 1130                        scrub_throttle_t throttle)
 1131 {
 1132     unsigned int scale = 0;
 1133 
 1134     switch (throttle) {
 1135         case BR_SCRUB_THROTTLE_VOID:
 1136         case BR_SCRUB_THROTTLE_STALLED:
 1137             scale = 0;
 1138             break;
 1139         case BR_SCRUB_THROTTLE_LAZY:
 1140             scale = priv->child_count * pow(M_E, BR_SCRUB_THREAD_SCALE_LAZY);
 1141             break;
 1142         case BR_SCRUB_THROTTLE_NORMAL:
 1143             scale = priv->child_count * pow(M_E, BR_SCRUB_THREAD_SCALE_NORMAL);
 1144             break;
 1145         case BR_SCRUB_THROTTLE_AGGRESSIVE:
 1146             scale = priv->child_count *
 1147                     pow(M_E, BR_SCRUB_THREAD_SCALE_AGGRESSIVE);
 1148             break;
 1149         default:
 1150             gf_msg(this->name, GF_LOG_ERROR, 0, BRB_MSG_UNKNOWN_THROTTLE,
 1151                    "Unknown throttle %d", throttle);
 1152     }
 1153 
 1154     return scale;
 1155 }
 1156 
 1157 static br_child_t *
 1158 _br_scrubber_get_next_child(struct br_scrubber *fsscrub)
 1159 {
 1160     br_child_t *child = NULL;
 1161 
 1162     child = list_first_entry(&fsscrub->scrublist, br_child_t, list);
 1163     list_rotate_left(&fsscrub->scrublist);
 1164 
 1165     return child;
 1166 }
 1167 
 1168 static void
 1169 _br_scrubber_get_entry(br_child_t *child, struct br_fsscan_entry **fsentry)
 1170 {
 1171     struct br_scanfs *fsscan = &child->fsscan;
 1172 
 1173     if (list_empty(&fsscan->ready))
 1174         return;
 1175     *fsentry = list_first_entry(&fsscan->ready, struct br_fsscan_entry, list);
 1176     list_del_init(&(*fsentry)->list);
 1177 }
 1178 
/**
 * Block until some child has an entry ready for scrubbing and hand it
 * back via @fsentry. Called with fsscrub->mutex held (see
 * br_scrubber_pick_entry); the condition waits below rely on that.
 */
static void
_br_scrubber_find_scrubbable_entry(struct br_scrubber *fsscrub,
                                   struct br_fsscan_entry **fsentry)
{
    br_child_t *child = NULL;
    br_child_t *firstchild = NULL;

    while (1) {
        /* no subvolumes registered for scrubbing yet: wait */
        while (list_empty(&fsscrub->scrublist))
            pthread_cond_wait(&fsscrub->cond, &fsscrub->mutex);

        /* one full round-robin pass over the children: the list is
         * rotated on every fetch, so the pass terminates when the first
         * child fetched comes around again (firstchild starts as NULL,
         * making the first iteration's condition trivially true) */
        firstchild = NULL;
        for (child = _br_scrubber_get_next_child(fsscrub); child != firstchild;
             child = _br_scrubber_get_next_child(fsscrub)) {
            if (!firstchild)
                firstchild = child;

            _br_scrubber_get_entry(child, fsentry);
            if (*fsentry)
                break;
        }

        if (*fsentry)
            break;

        /* nothing to work on.. wait till available */
        pthread_cond_wait(&fsscrub->cond, &fsscrub->mutex);
    }
}
 1208 
/**
 * Fetch one scrubbable entry for this scrubber thread, blocking until
 * one is available. The cleanup handler is pushed before taking the
 * mutex so the lock is not left held if the thread is cancelled while
 * blocked in the condition waits (NOTE(review): assumes
 * _br_lock_cleaner, defined earlier in this file, unlocks the mutex).
 */
static void
br_scrubber_pick_entry(struct br_scrubber *fsscrub,
                       struct br_fsscan_entry **fsentry)
{
    pthread_cleanup_push(_br_lock_cleaner, &fsscrub->mutex);

    pthread_mutex_lock(&fsscrub->mutex);
    {
        *fsentry = NULL;
        _br_scrubber_find_scrubbable_entry(fsscrub, fsentry);
    }
    pthread_mutex_unlock(&fsscrub->mutex);

    pthread_cleanup_pop(0); /* 0: do not run the handler on normal exit */
}
 1224 
/* Book-keeping handed to the pthread cleanup handler while a single
 * object is being scrubbed (see br_scrubber_scrub_entry). */
struct br_scrub_entry {
    gf_boolean_t scrubbed;           /* set once br_scrubber_scrub_begin() returned */
    struct br_fsscan_entry *fsentry; /* the object under scrub */
};
 1229 
 1230 /**
 1231  * We need to be a bit careful here. These thread(s) are prone to cancellations
 * when threads are scaled down (depending on the throttling value configured)
 1233  * and pausing scrub. A thread can get cancelled while it's waiting for entries
 1234  * in the ->pending queue or when an object is undergoing scrubbing.
 1235  */
static void
br_scrubber_entry_handle(void *arg)
{
    struct br_scanfs *fsscan = NULL;
    struct br_scrub_entry *sentry = NULL;
    struct br_fsscan_entry *fsentry = NULL;

    sentry = arg;

    fsentry = sentry->fsentry;
    fsscan = fsentry->fsscan;

    LOCK(&fsscan->entrylock);
    {
        if (sentry->scrubbed) {
            /* scrub ran to completion: account for it and release the
             * entry for good */
            _br_fsscan_dec_entry_count(fsscan);

            /* cleanup ->entry */
            fsentry->data = NULL;
            fsentry->fsscan = NULL;
            loc_wipe(&fsentry->parent);
            gf_dirent_entry_free(fsentry->entry);

            GF_FREE(sentry->fsentry);
        } else {
            /* thread was cancelled before the scrub finished:
             * (re)queue the entry again for scrub */
            _br_fsscan_collect_entry(fsscan, sentry->fsentry);
        }
    }
    UNLOCK(&fsscan->entrylock);
}
 1267 
/* Scrub one object, guaranteeing via the pthread cleanup handler that
 * the entry is either freed (scrub completed) or requeued (thread
 * cancelled mid-scrub) -- see br_scrubber_entry_handle above. */
static void
br_scrubber_scrub_entry(xlator_t *this, struct br_fsscan_entry *fsentry)
{
    struct br_scrub_entry sentry = {
        0,
    };

    sentry.scrubbed = 0;
    sentry.fsentry = fsentry;

    /* handler pushed before the scrub: a cancellation inside
     * br_scrubber_scrub_begin() sees scrubbed == 0 and requeues */
    pthread_cleanup_push(br_scrubber_entry_handle, &sentry);
    {
        (void)br_scrubber_scrub_begin(this, fsentry);
        sentry.scrubbed = 1;
    }
    /* 1: execute the handler on the normal path too */
    pthread_cleanup_pop(1);
}
 1285 
 1286 void *
 1287 br_scrubber_proc(void *arg)
 1288 {
 1289     xlator_t *this = NULL;
 1290     struct br_scrubber *fsscrub = NULL;
 1291     struct br_fsscan_entry *fsentry = NULL;
 1292 
 1293     fsscrub = arg;
 1294     THIS = this = fsscrub->this;
 1295 
 1296     while (1) {
 1297         br_scrubber_pick_entry(fsscrub, &fsentry);
 1298         br_scrubber_scrub_entry(this, fsentry);
 1299         sleep(1);
 1300     }
 1301 
 1302     return NULL;
 1303 }
 1304 
 1305 static int32_t
 1306 br_scrubber_scale_up(xlator_t *this, struct br_scrubber *fsscrub,
 1307                      unsigned int v1, unsigned int v2)
 1308 {
 1309     int i = 0;
 1310     int32_t ret = -1;
 1311     int diff = 0;
 1312     struct br_scrubbers *scrub = NULL;
 1313 
 1314     diff = (int)(v2 - v1);
 1315 
 1316     gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCALING_UP_SCRUBBER,
 1317            "Scaling up scrubbers [%d => %d]", v1, v2);
 1318 
 1319     for (i = 0; i < diff; i++) {
 1320         scrub = GF_CALLOC(diff, sizeof(*scrub), gf_br_mt_br_scrubber_t);
 1321         if (!scrub)
 1322             break;
 1323 
 1324         INIT_LIST_HEAD(&scrub->list);
 1325         ret = gf_thread_create(&scrub->scrubthread, NULL, br_scrubber_proc,
 1326                                fsscrub, "brsproc");
 1327         if (ret)
 1328             break;
 1329 
 1330         fsscrub->nr_scrubbers++;
 1331         list_add_tail(&scrub->list, &fsscrub->scrubbers);
 1332     }
 1333 
 1334     if ((i != diff) && !scrub)
 1335         goto error_return;
 1336 
 1337     if (i != diff) /* degraded scaling.. */
 1338         gf_msg(this->name, GF_LOG_WARNING, 0, BRB_MSG_SCALE_UP_FAILED,
 1339                "Could not fully scale up to %d scrubber(s). Spawned "
 1340                "%d/%d [total scrubber(s): %d]",
 1341                v2, i, diff, (v1 + i));
 1342 
 1343     return 0;
 1344 
 1345 error_return:
 1346     return -1;
 1347 }
 1348 
 1349 static int32_t
 1350 br_scrubber_scale_down(xlator_t *this, struct br_scrubber *fsscrub,
 1351                        unsigned int v1, unsigned int v2)
 1352 {
 1353     int i = 0;
 1354     int diff = 0;
 1355     int32_t ret = -1;
 1356     struct br_scrubbers *scrub = NULL;
 1357 
 1358     diff = (int)(v1 - v2);
 1359 
 1360     gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCALE_DOWN_SCRUBBER,
 1361            "Scaling down scrubbers [%d => %d]", v1, v2);
 1362 
 1363     for (i = 0; i < diff; i++) {
 1364         scrub = list_first_entry(&fsscrub->scrubbers, struct br_scrubbers,
 1365                                  list);
 1366 
 1367         list_del_init(&scrub->list);
 1368         ret = gf_thread_cleanup_xint(scrub->scrubthread);
 1369         if (ret)
 1370             break;
 1371         GF_FREE(scrub);
 1372 
 1373         fsscrub->nr_scrubbers--;
 1374     }
 1375 
 1376     if (ret) {
 1377         gf_msg(this->name, GF_LOG_WARNING, 0, BRB_MSG_SCALE_DOWN_FAILED,
 1378                "Could not fully scale down "
 1379                "to %d scrubber(s). Terminated %d/%d [total "
 1380                "scrubber(s): %d]",
 1381                v1, i, diff, (v2 - i));
 1382         ret = 0;
 1383     }
 1384 
 1385     return ret;
 1386 }
 1387 
 1388 static int32_t
 1389 br_scrubber_configure(xlator_t *this, br_private_t *priv,
 1390                       struct br_scrubber *fsscrub, scrub_throttle_t nthrottle)
 1391 {
 1392     int32_t ret = 0;
 1393     unsigned int v1 = 0;
 1394     unsigned int v2 = 0;
 1395 
 1396     v1 = fsscrub->nr_scrubbers;
 1397     v2 = br_scrubber_calc_scale(this, priv, nthrottle);
 1398 
 1399     if (v1 == v2)
 1400         return 0;
 1401 
 1402     if (v1 > v2)
 1403         ret = br_scrubber_scale_down(this, fsscrub, v1, v2);
 1404     else
 1405         ret = br_scrubber_scale_up(this, fsscrub, v1, v2);
 1406 
 1407     return ret;
 1408 }
 1409 
/**
 * Fetch the string value of option @opt: from @options when handed a
 * reconfigure dict, otherwise from the xlator's init-time defaults.
 * NOTE: the GF_OPTION_* macros jump to the supplied label on failure.
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t
br_scrubber_fetch_option(xlator_t *this, char *opt, dict_t *options,
                         char **value)
{
    if (options)
        GF_OPTION_RECONF(opt, *value, options, str, error_return);
    else
        GF_OPTION_INIT(opt, *value, str, error_return);

    return 0;

error_return:
    return -1;
}
 1424 
/* internal "throttle" override: sentinel string substituted by the
 * option handlers below when scrubbing is paused */
#define BR_SCRUB_STALLED "STALLED"
 1427 
/* TODO: token bucket spec */
 1429 static int32_t
 1430 br_scrubber_handle_throttle(xlator_t *this, br_private_t *priv, dict_t *options,
 1431                             gf_boolean_t scrubstall)
 1432 {
 1433     int32_t ret = 0;
 1434     char *tmp = NULL;
 1435     struct br_scrubber *fsscrub = NULL;
 1436     scrub_throttle_t nthrottle = BR_SCRUB_THROTTLE_VOID;
 1437 
 1438     fsscrub = &priv->fsscrub;
 1439     fsscrub->throttle_reconf = _gf_false;
 1440 
 1441     ret = br_scrubber_fetch_option(this, "scrub-throttle", options, &tmp);
 1442     if (ret)
 1443         goto error_return;
 1444 
 1445     if (scrubstall)
 1446         tmp = BR_SCRUB_STALLED;
 1447 
 1448     if (strcasecmp(tmp, "lazy") == 0)
 1449         nthrottle = BR_SCRUB_THROTTLE_LAZY;
 1450     else if (strcasecmp(tmp, "normal") == 0)
 1451         nthrottle = BR_SCRUB_THROTTLE_NORMAL;
 1452     else if (strcasecmp(tmp, "aggressive") == 0)
 1453         nthrottle = BR_SCRUB_THROTTLE_AGGRESSIVE;
 1454     else if (strcasecmp(tmp, BR_SCRUB_STALLED) == 0)
 1455         nthrottle = BR_SCRUB_THROTTLE_STALLED;
 1456     else
 1457         goto error_return;
 1458 
 1459     /* on failure old throttling value is preserved */
 1460     ret = br_scrubber_configure(this, priv, fsscrub, nthrottle);
 1461     if (ret)
 1462         goto error_return;
 1463 
 1464     if (fsscrub->throttle != nthrottle)
 1465         fsscrub->throttle_reconf = _gf_true;
 1466 
 1467     fsscrub->throttle = nthrottle;
 1468     return 0;
 1469 
 1470 error_return:
 1471     return -1;
 1472 }
 1473 
 1474 static int32_t
 1475 br_scrubber_handle_stall(xlator_t *this, br_private_t *priv, dict_t *options,
 1476                          gf_boolean_t *scrubstall)
 1477 {
 1478     int32_t ret = 0;
 1479     char *tmp = NULL;
 1480 
 1481     ret = br_scrubber_fetch_option(this, "scrub-state", options, &tmp);
 1482     if (ret)
 1483         goto error_return;
 1484 
 1485     if (strcasecmp(tmp, "pause") == 0) /* anything else is active */
 1486         *scrubstall = _gf_true;
 1487 
 1488     return 0;
 1489 
 1490 error_return:
 1491     return -1;
 1492 }
 1493 
 1494 static int32_t
 1495 br_scrubber_handle_freq(xlator_t *this, br_private_t *priv, dict_t *options,
 1496                         gf_boolean_t scrubstall)
 1497 {
 1498     int32_t ret = -1;
 1499     char *tmp = NULL;
 1500     scrub_freq_t frequency = BR_FSSCRUB_FREQ_HOURLY;
 1501     struct br_scrubber *fsscrub = NULL;
 1502 
 1503     fsscrub = &priv->fsscrub;
 1504     fsscrub->frequency_reconf = _gf_true;
 1505 
 1506     ret = br_scrubber_fetch_option(this, "scrub-freq", options, &tmp);
 1507     if (ret)
 1508         goto error_return;
 1509 
 1510     if (scrubstall)
 1511         tmp = BR_SCRUB_STALLED;
 1512 
 1513     if (strcasecmp(tmp, "hourly") == 0) {
 1514         frequency = BR_FSSCRUB_FREQ_HOURLY;
 1515     } else if (strcasecmp(tmp, "daily") == 0) {
 1516         frequency = BR_FSSCRUB_FREQ_DAILY;
 1517     } else if (strcasecmp(tmp, "weekly") == 0) {
 1518         frequency = BR_FSSCRUB_FREQ_WEEKLY;
 1519     } else if (strcasecmp(tmp, "biweekly") == 0) {
 1520         frequency = BR_FSSCRUB_FREQ_BIWEEKLY;
 1521     } else if (strcasecmp(tmp, "monthly") == 0) {
 1522         frequency = BR_FSSCRUB_FREQ_MONTHLY;
 1523     } else if (strcasecmp(tmp, "minute") == 0) {
 1524         frequency = BR_FSSCRUB_FREQ_MINUTE;
 1525     } else if (strcasecmp(tmp, BR_SCRUB_STALLED) == 0) {
 1526         frequency = BR_FSSCRUB_FREQ_STALLED;
 1527     } else
 1528         goto error_return;
 1529 
 1530     if (fsscrub->frequency == frequency)
 1531         fsscrub->frequency_reconf = _gf_false;
 1532     else
 1533         fsscrub->frequency = frequency;
 1534 
 1535     return 0;
 1536 
 1537 error_return:
 1538     return -1;
 1539 }
 1540 
/**
 * Log the effective scrub tunables after a (re)configuration. Nothing
 * is logged while paused, when nothing changed, or when the throttle
 * is still unset (VOID).
 */
static void
br_scrubber_log_option(xlator_t *this, br_private_t *priv,
                       gf_boolean_t scrubstall)
{
    struct br_scrubber *fsscrub = &priv->fsscrub;
    char *scrub_throttle_str[] = {
        [BR_SCRUB_THROTTLE_LAZY] = "lazy",
        [BR_SCRUB_THROTTLE_NORMAL] = "normal",
        [BR_SCRUB_THROTTLE_AGGRESSIVE] = "aggressive",
        [BR_SCRUB_THROTTLE_STALLED] = "stalled",
    };

    /* NOTE(review): there is no entry for BR_FSSCRUB_FREQ_STALLED --
     * indexing with a stalled frequency would read past the initialized
     * entries. This appears reachable only if ->frequency can be
     * STALLED while scrubstall is false; verify that invariant. */
    char *scrub_freq_str[] = {
        [0] = "",
        [BR_FSSCRUB_FREQ_HOURLY] = "hourly",
        [BR_FSSCRUB_FREQ_DAILY] = "daily",
        [BR_FSSCRUB_FREQ_WEEKLY] = "weekly",
        [BR_FSSCRUB_FREQ_BIWEEKLY] = "biweekly",
        [BR_FSSCRUB_FREQ_MONTHLY] = "monthly (30 days)",
        [BR_FSSCRUB_FREQ_MINUTE] = "every minute",
    };

    if (scrubstall)
        return; /* logged as pause */

    if (fsscrub->frequency_reconf || fsscrub->throttle_reconf) {
        if (fsscrub->throttle == BR_SCRUB_THROTTLE_VOID)
            return;
        gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_TUNABLE,
               "SCRUB TUNABLES:: [Frequency: %s, Throttle: %s]",
               scrub_freq_str[fsscrub->frequency],
               scrub_throttle_str[fsscrub->throttle]);
    }
}
 1575 
 1576 int32_t
 1577 br_scrubber_handle_options(xlator_t *this, br_private_t *priv, dict_t *options)
 1578 {
 1579     int32_t ret = 0;
 1580     gf_boolean_t scrubstall = _gf_false; /* not as dangerous as it sounds */
 1581 
 1582     ret = br_scrubber_handle_stall(this, priv, options, &scrubstall);
 1583     if (ret)
 1584         goto error_return;
 1585 
 1586     ret = br_scrubber_handle_throttle(this, priv, options, scrubstall);
 1587     if (ret)
 1588         goto error_return;
 1589 
 1590     ret = br_scrubber_handle_freq(this, priv, options, scrubstall);
 1591     if (ret)
 1592         goto error_return;
 1593 
 1594     br_scrubber_log_option(this, priv, scrubstall);
 1595 
 1596     return 0;
 1597 
 1598 error_return:
 1599     return -1;
 1600 }
 1601 
 1602 inode_t *
 1603 br_lookup_bad_obj_dir(xlator_t *this, br_child_t *child, uuid_t gfid)
 1604 {
 1605     struct iatt statbuf = {
 1606         0,
 1607     };
 1608     inode_table_t *table = NULL;
 1609     int32_t ret = -1;
 1610     loc_t loc = {
 1611         0,
 1612     };
 1613     inode_t *linked_inode = NULL;
 1614     int32_t op_errno = 0;
 1615 
 1616     GF_VALIDATE_OR_GOTO("bit-rot-scrubber", this, out);
 1617     GF_VALIDATE_OR_GOTO(this->name, this->private, out);
 1618     GF_VALIDATE_OR_GOTO(this->name, child, out);
 1619 
 1620     table = child->table;
 1621 
 1622     loc.inode = inode_new(table);
 1623     if (!loc.inode) {
 1624         gf_msg(this->name, GF_LOG_ERROR, ENOMEM, BRB_MSG_NO_MEMORY,
 1625                "failed to allocate a new inode for"
 1626                "bad object directory");
 1627         goto out;
 1628     }
 1629 
 1630     gf_uuid_copy(loc.gfid, gfid);
 1631 
 1632     ret = syncop_lookup(child->xl, &loc, &statbuf, NULL, NULL, NULL);
 1633     if (ret < 0) {
 1634         op_errno = -ret;
 1635         gf_msg(this->name, GF_LOG_ERROR, 0, BRB_MSG_LOOKUP_FAILED,
 1636                "failed to lookup the bad "
 1637                "objects directory (gfid: %s (%s))",
 1638                uuid_utoa(gfid), strerror(op_errno));
 1639         goto out;
 1640     }
 1641 
 1642     linked_inode = inode_link(loc.inode, NULL, NULL, &statbuf);
 1643     if (linked_inode)
 1644         inode_lookup(linked_inode);
 1645 
 1646 out:
 1647     loc_wipe(&loc);
 1648     return linked_inode;
 1649 }
 1650 
 1651 int32_t
 1652 br_read_bad_object_dir(xlator_t *this, br_child_t *child, fd_t *fd,
 1653                        dict_t *dict)
 1654 {
 1655     gf_dirent_t entries;
 1656     gf_dirent_t *entry = NULL;
 1657     int32_t ret = -1;
 1658     off_t offset = 0;
 1659     int32_t count = 0;
 1660     char key[32] = {
 1661         0,
 1662     };
 1663     dict_t *out_dict = NULL;
 1664 
 1665     INIT_LIST_HEAD(&entries.list);
 1666 
 1667     while ((ret = syncop_readdir(child->xl, fd, 131072, offset, &entries, NULL,
 1668                                  &out_dict))) {
 1669         if (ret < 0)
 1670             goto out;
 1671 
 1672         list_for_each_entry(entry, &entries.list, list)
 1673         {
 1674             offset = entry->d_off;
 1675 
 1676             snprintf(key, sizeof(key), "quarantine-%d", count);
 1677 
 1678             /*
 1679              * ignore the dict_set errors for now. The intention is
 1680              * to get as many bad objects as possible instead of
 1681              * erroring out at the first failure.
 1682              */
 1683             ret = dict_set_dynstr_with_alloc(dict, key, entry->d_name);
 1684             if (!ret)
 1685                 count++;
 1686 
 1687             if (out_dict) {
 1688                 dict_copy(out_dict, dict);
 1689                 dict_unref(out_dict);
 1690                 out_dict = NULL;
 1691             }
 1692         }
 1693 
 1694         gf_dirent_free(&entries);
 1695     }
 1696 
 1697     ret = count;
 1698     ret = dict_set_int32_sizen(dict, "count", count);
 1699 
 1700 out:
 1701     return ret;
 1702 }
 1703 
 1704 int32_t
 1705 br_get_bad_objects_from_child(xlator_t *this, dict_t *dict, br_child_t *child)
 1706 {
 1707     inode_t *inode = NULL;
 1708     inode_table_t *table = NULL;
 1709     fd_t *fd = NULL;
 1710     int32_t ret = -1;
 1711     loc_t loc = {
 1712         0,
 1713     };
 1714     int32_t op_errno = 0;
 1715 
 1716     GF_VALIDATE_OR_GOTO("bit-rot-scrubber", this, out);
 1717     GF_VALIDATE_OR_GOTO(this->name, this->private, out);
 1718     GF_VALIDATE_OR_GOTO(this->name, child, out);
 1719     GF_VALIDATE_OR_GOTO(this->name, dict, out);
 1720 
 1721     table = child->table;
 1722 
 1723     inode = inode_find(table, BR_BAD_OBJ_CONTAINER);
 1724     if (!inode) {
 1725         inode = br_lookup_bad_obj_dir(this, child, BR_BAD_OBJ_CONTAINER);
 1726         if (!inode)
 1727             goto out;
 1728     }
 1729 
 1730     fd = fd_create(inode, 0);
 1731     if (!fd) {
 1732         gf_msg(this->name, GF_LOG_ERROR, ENOMEM, BRB_MSG_FD_CREATE_FAILED,
 1733                "fd creation for the bad "
 1734                "objects directory failed (gfid: %s)",
 1735                uuid_utoa(BR_BAD_OBJ_CONTAINER));
 1736         goto out;
 1737     }
 1738 
 1739     loc.inode = inode;
 1740     gf_uuid_copy(loc.gfid, inode->gfid);
 1741 
 1742     ret = syncop_opendir(child->xl, &loc, fd, NULL, NULL);
 1743     if (ret < 0) {
 1744         op_errno = -ret;
 1745         fd_unref(fd);
 1746         fd = NULL;
 1747         gf_msg(this->name, GF_LOG_ERROR, op_errno, BRB_MSG_FD_CREATE_FAILED,
 1748                "failed to open the bad "
 1749                "objects directory %s",
 1750                uuid_utoa(BR_BAD_OBJ_CONTAINER));
 1751         goto out;
 1752     }
 1753 
 1754     fd_bind(fd);
 1755 
 1756     ret = br_read_bad_object_dir(this, child, fd, dict);
 1757     if (ret < 0) {
 1758         gf_msg(this->name, GF_LOG_ERROR, 0, BRB_MSG_BAD_OBJ_READDIR_FAIL,
 1759                "readdir of the bad "
 1760                "objects directory (%s) failed ",
 1761                uuid_utoa(BR_BAD_OBJ_CONTAINER));
 1762         goto out;
 1763     }
 1764 
 1765     ret = 0;
 1766 
 1767 out:
 1768     loc_wipe(&loc);
 1769     if (fd)
 1770         fd_unref(fd);
 1771     return ret;
 1772 }
 1773 
 1774 int32_t
 1775 br_collect_bad_objects_of_child(xlator_t *this, br_child_t *child, dict_t *dict,
 1776                                 dict_t *child_dict, int32_t total_count)
 1777 {
 1778     int32_t ret = -1;
 1779     int32_t count = 0;
 1780     char key[32] = {
 1781         0,
 1782     };
 1783     char main_key[32] = {
 1784         0,
 1785     };
 1786     int32_t j = 0;
 1787     int32_t tmp_count = 0;
 1788     char *entry = NULL;
 1789     char tmp[PATH_MAX] = {
 1790         0,
 1791     };
 1792     char *path = NULL;
 1793     int32_t len = 0;
 1794 
 1795     ret = dict_get_int32_sizen(child_dict, "count", &count);
 1796     if (ret)
 1797         goto out;
 1798 
 1799     tmp_count = total_count;
 1800 
 1801     for (j = 0; j < count; j++) {
 1802         len = snprintf(key, PATH_MAX, "quarantine-%d", j);
 1803         ret = dict_get_strn(child_dict, key, len, &entry);
 1804         if (ret)
 1805             continue;
 1806 
 1807         ret = dict_get_str(child_dict, entry, &path);
 1808         len = snprintf(tmp, PATH_MAX, "%s ==> BRICK: %s\n path: %s", entry,
 1809                        child->brick_path, path);
 1810         if ((len < 0) || (len >= PATH_MAX)) {
 1811             continue;
 1812         }
 1813         snprintf(main_key, PATH_MAX, "quarantine-%d", tmp_count);
 1814 
 1815         ret = dict_set_dynstr_with_alloc(dict, main_key, tmp);
 1816         if (!ret)
 1817             tmp_count++;
 1818         path = NULL;
 1819     }
 1820 
 1821     ret = tmp_count;
 1822 
 1823 out:
 1824     return ret;
 1825 }
 1826 
 1827 int32_t
 1828 br_collect_bad_objects_from_children(xlator_t *this, dict_t *dict)
 1829 {
 1830     int32_t ret = -1;
 1831     dict_t *child_dict = NULL;
 1832     int32_t i = 0;
 1833     int32_t total_count = 0;
 1834     br_child_t *child = NULL;
 1835     br_private_t *priv = NULL;
 1836     dict_t *tmp_dict = NULL;
 1837 
 1838     priv = this->private;
 1839     tmp_dict = dict;
 1840 
 1841     for (i = 0; i < priv->child_count; i++) {
 1842         child = &priv->children[i];
 1843         GF_ASSERT(child);
 1844         if (!_br_is_child_connected(child))
 1845             continue;
 1846 
 1847         child_dict = dict_new();
 1848         if (!child_dict) {
 1849             gf_msg(this->name, GF_LOG_ERROR, ENOMEM, BRB_MSG_NO_MEMORY,
 1850                    "failed to allocate dict");
 1851             continue;
 1852         }
 1853         ret = br_get_bad_objects_from_child(this, child_dict, child);
 1854         /*
 1855          * Continue asking the remaining children for the list of
 1856          * bad objects even though getting the list from one of them
 1857          * fails.
 1858          */
 1859         if (ret) {
 1860             dict_unref(child_dict);
 1861             continue;
 1862         }
 1863 
 1864         ret = br_collect_bad_objects_of_child(this, child, tmp_dict, child_dict,
 1865                                               total_count);
 1866         if (ret < 0) {
 1867             dict_unref(child_dict);
 1868             continue;
 1869         }
 1870 
 1871         total_count = ret;
 1872         dict_unref(child_dict);
 1873         child_dict = NULL;
 1874     }
 1875 
 1876     ret = dict_set_int32(tmp_dict, "total-count", total_count);
 1877 
 1878     return ret;
 1879 }
 1880 
 1881 int32_t
 1882 br_get_bad_objects_list(xlator_t *this, dict_t **dict)
 1883 {
 1884     int32_t ret = -1;
 1885     dict_t *tmp_dict = NULL;
 1886 
 1887     GF_VALIDATE_OR_GOTO("bir-rot-scrubber", this, out);
 1888     GF_VALIDATE_OR_GOTO(this->name, dict, out);
 1889 
 1890     tmp_dict = *dict;
 1891     if (!tmp_dict) {
 1892         tmp_dict = dict_new();
 1893         if (!tmp_dict) {
 1894             gf_msg(this->name, GF_LOG_ERROR, ENOMEM, BRB_MSG_NO_MEMORY,
 1895                    "failed to allocate dict");
 1896             goto out;
 1897         }
 1898         *dict = tmp_dict;
 1899     }
 1900 
 1901     ret = br_collect_bad_objects_from_children(this, tmp_dict);
 1902 
 1903 out:
 1904     return ret;
 1905 }
 1906 
 1907 static int
 1908 wait_for_scrub_to_finish(xlator_t *this)
 1909 {
 1910     int ret = -1;
 1911     br_private_t *priv = NULL;
 1912     struct br_monitor *scrub_monitor = NULL;
 1913 
 1914     priv = this->private;
 1915     scrub_monitor = &priv->scrub_monitor;
 1916 
 1917     GF_VALIDATE_OR_GOTO("bit-rot", scrub_monitor, out);
 1918     GF_VALIDATE_OR_GOTO("bit-rot", this, out);
 1919 
 1920     gf_msg(this->name, GF_LOG_INFO, 0, BRB_MSG_SCRUB_INFO,
 1921            "Waiting for all children to start and finish scrub");
 1922 
 1923     pthread_mutex_lock(&scrub_monitor->donelock);
 1924     {
 1925         while (!scrub_monitor->done)
 1926             pthread_cond_wait(&scrub_monitor->donecond,
 1927                               &scrub_monitor->donelock);
 1928     }
 1929     pthread_mutex_unlock(&scrub_monitor->donelock);
 1930     ret = 0;
 1931 out:
 1932     return ret;
 1933 }
 1934 
 1935 /**
 1936  * This function is executed in a separate thread. This is scrubber monitor
 1937  * thread that takes care of state machine.
 1938  */
 1939 void *
 1940 br_monitor_thread(void *arg)
 1941 {
 1942     int32_t ret = 0;
 1943     xlator_t *this = NULL;
 1944     br_private_t *priv = NULL;
 1945     struct br_monitor *scrub_monitor = NULL;
 1946 
 1947     this = arg;
 1948     priv = this->private;
 1949 
 1950     /*
 1951      * Since, this is the topmost xlator, THIS has to be set by bit-rot
 1952      * xlator itself (STACK_WIND won't help in this case). Also it has
 1953      * to be done for each thread that gets spawned. Otherwise, a new
 1954      * thread will get global_xlator's pointer when it does "THIS".
 1955      */
 1956     THIS = this;
 1957 
 1958     scrub_monitor = &priv->scrub_monitor;
 1959 
 1960     pthread_mutex_lock(&scrub_monitor->mutex);
 1961     {
 1962         while (!scrub_monitor->inited)
 1963             pthread_cond_wait(&scrub_monitor->cond, &scrub_monitor->mutex);
 1964     }
 1965     pthread_mutex_unlock(&scrub_monitor->mutex);
 1966 
 1967     /* this needs to be serialized with reconfigure() */
 1968     pthread_mutex_lock(&priv->lock);
 1969     {
 1970         ret = br_scrub_state_machine(this, _gf_false);
 1971     }
 1972     pthread_mutex_unlock(&priv->lock);
 1973     if (ret) {
 1974         gf_msg(this->name, GF_LOG_ERROR, -ret, BRB_MSG_SSM_FAILED,
 1975                "Scrub state machine failed");
 1976         goto out;
 1977     }
 1978 
 1979     while (1) {
 1980         /* Wait for all children to finish scrubbing */
 1981         ret = wait_for_scrub_to_finish(this);
 1982         if (ret) {
 1983             gf_msg(this->name, GF_LOG_ERROR, -ret, BRB_MSG_SCRUB_WAIT_FAILED,
 1984                    "Scrub wait failed");
 1985             goto out;
 1986         }
 1987 
 1988         /* scrub exit criteria: Move the state to PENDING */
 1989         br_scrubber_exit_control(this);
 1990     }
 1991 
 1992 out:
 1993     return NULL;
 1994 }
 1995 
 1996 static void
 1997 br_set_scrub_state(struct br_monitor *scrub_monitor, br_scrub_state_t state)
 1998 {
 1999     LOCK(&scrub_monitor->lock);
 2000     {
 2001         _br_monitor_set_scrub_state(scrub_monitor, state);
 2002     }
 2003     UNLOCK(&scrub_monitor->lock);
 2004 }
 2005 
/**
 * Initialize the scrubber monitor and spawn its thread.
 *
 * Sets up the monitor's spinlock, the three mutex/cond pairs used for the
 * init handshake (mutex/cond), scrubber wakeup (wakelock/wakecond) and
 * scrub-completion signalling (donelock/donecond), moves the state to
 * INACTIVE, and finally starts br_monitor_thread.
 *
 * @this: the bit-rot scrubber xlator
 * @priv: its private context, owner of the embedded br_monitor
 * Returns 0 on success; on thread-spawn failure destroys everything it
 * initialized and returns -1.
 */
int32_t
br_scrubber_monitor_init(xlator_t *this, br_private_t *priv)
{
    struct br_monitor *scrub_monitor = NULL;
    int ret = 0;

    scrub_monitor = &priv->scrub_monitor;

    LOCK_INIT(&scrub_monitor->lock);
    scrub_monitor->this = this;

    /* init handshake: br_monitor_thread blocks on cond until inited */
    scrub_monitor->inited = _gf_false;
    pthread_mutex_init(&scrub_monitor->mutex, NULL);
    pthread_cond_init(&scrub_monitor->cond, NULL);

    /* wakeup signalling for kicking scrubbers */
    scrub_monitor->kick = _gf_false;
    scrub_monitor->active_child_count = 0;
    pthread_mutex_init(&scrub_monitor->wakelock, NULL);
    pthread_cond_init(&scrub_monitor->wakecond, NULL);

    /* completion signalling consumed by wait_for_scrub_to_finish() */
    scrub_monitor->done = _gf_false;
    pthread_mutex_init(&scrub_monitor->donelock, NULL);
    pthread_cond_init(&scrub_monitor->donecond, NULL);

    /* Set the state to INACTIVE */
    br_set_scrub_state(&priv->scrub_monitor, BR_SCRUB_STATE_INACTIVE);

    /* Start the monitor thread */
    ret = gf_thread_create(&scrub_monitor->thread, NULL, br_monitor_thread,
                           this, "brmon");
    if (ret != 0) {
        /* -ret: assumes gf_thread_create returns a negative errno,
         * negated to the positive value gf_msg expects — TODO confirm */
        gf_msg(this->name, GF_LOG_ERROR, -ret, BRB_MSG_SPAWN_FAILED,
               "monitor thread creation failed");
        ret = -1;
        goto err;
    }

    return 0;
err:
    /* Unwind all synchronization primitives initialized above. */
    pthread_mutex_destroy(&scrub_monitor->mutex);
    pthread_cond_destroy(&scrub_monitor->cond);

    pthread_mutex_destroy(&scrub_monitor->wakelock);
    pthread_cond_destroy(&scrub_monitor->wakecond);

    pthread_mutex_destroy(&scrub_monitor->donelock);
    pthread_cond_destroy(&scrub_monitor->donecond);

    LOCK_DESTROY(&scrub_monitor->lock);

    return ret;
}
 2058 
 2059 int32_t
 2060 br_scrubber_init(xlator_t *this, br_private_t *priv)
 2061 {
 2062     struct br_scrubber *fsscrub = NULL;
 2063     int ret = 0;
 2064 
 2065     priv->tbf = tbf_init(NULL, 0);
 2066     if (!priv->tbf)
 2067         return -1;
 2068 
 2069     ret = br_scrubber_monitor_init(this, priv);
 2070     if (ret)
 2071         return -1;
 2072 
 2073     fsscrub = &priv->fsscrub;
 2074 
 2075     fsscrub->this = this;
 2076     fsscrub->throttle = BR_SCRUB_THROTTLE_VOID;
 2077 
 2078     pthread_mutex_init(&fsscrub->mutex, NULL);
 2079     pthread_cond_init(&fsscrub->cond, NULL);
 2080 
 2081     fsscrub->nr_scrubbers = 0;
 2082     INIT_LIST_HEAD(&fsscrub->scrubbers);
 2083     INIT_LIST_HEAD(&fsscrub->scrublist);
 2084 
 2085     return 0;
 2086 }