"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/xlators/cluster/dht/src/switch.c" (16 Sep 2020, 26646 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "switch.c", see the Fossies "Dox" file reference documentation.

    1 /*
    2   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
    3   This file is part of GlusterFS.
    4 
    5   This file is licensed to you under your choice of the GNU Lesser
    6   General Public License, version 3 or any later version (LGPLv3 or
    7   later), or the GNU General Public License, version 2 (GPLv2), in all
    8   cases as published by the Free Software Foundation.
    9 */
   10 
   11 #include "dht-common.h"
   12 #include "dht-mem-types.h"
   13 
   14 #include <sys/time.h>
   15 #include <stdlib.h>
   16 #include <fnmatch.h>
   17 #include <string.h>
   18 
   19 extern struct volume_options dht_options[];
   20 
   21 struct switch_sched_array {
   22     xlator_t *xl;
   23     int32_t eligible;
   24     int32_t considered;
   25 };
   26 
   27 /* Select one of this struct based on the path's pattern match */
   28 struct switch_struct {
   29     struct switch_struct *next;
   30     struct switch_sched_array *array;
   31     int32_t node_index; /* Index of the node in
   32                            this pattern. */
   33     int32_t num_child;  /* Total num of child nodes
   34                            with this pattern. */
   35     char path_pattern[256];
   36 };
   37 
   38 /* TODO: all the 'TODO's in dht.c hold good here as well */
   39 /* This function should return a child node, as '*:subvolumes' is inserted */
   40 
   41 static int32_t
   42 gf_switch_valid_child(xlator_t *this, const char *child)
   43 {
   44     xlator_list_t *children = NULL;
   45     int32_t ret = 0;
   46 
   47     children = this->children;
   48     while (children) {
   49         if (!strcmp(child, children->xlator->name)) {
   50             ret = 1;
   51             break;
   52         }
   53         children = children->next;
   54     }
   55 
   56     return ret;
   57 }
   58 
   59 static xlator_t *
   60 get_switch_matching_subvol(const char *path, dht_conf_t *conf,
   61                            xlator_t *hashed_subvol)
   62 {
   63     struct switch_struct *cond = NULL;
   64     struct switch_struct *trav = NULL;
   65     char *pathname = NULL;
   66     int idx = 0;
   67     xlator_t *subvol = NULL;
   68 
   69     cond = conf->private;
   70     subvol = hashed_subvol;
   71     if (!cond)
   72         goto out;
   73 
   74     pathname = gf_strdup(path);
   75     if (!pathname)
   76         goto out;
   77 
   78     trav = cond;
   79     while (trav) {
   80         if (fnmatch(trav->path_pattern, pathname, FNM_NOESCAPE) == 0) {
   81             for (idx = 0; idx < trav->num_child; idx++) {
   82                 if (trav->array[idx].xl == hashed_subvol)
   83                     goto out;
   84             }
   85             idx = trav->node_index++;
   86             trav->node_index %= trav->num_child;
   87             subvol = trav->array[idx].xl;
   88             goto out;
   89         }
   90         trav = trav->next;
   91     }
   92 out:
   93     GF_FREE(pathname);
   94 
   95     return subvol;
   96 }
   97 
   98 int
   99 switch_local_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
  100                         int op_ret, int op_errno, inode_t *inode,
  101                         struct iatt *stbuf, dict_t *xattr,
  102                         struct iatt *postparent)
  103 {
  104     xlator_t *subvol = NULL;
  105     char is_linkfile = 0;
  106     char is_dir = 0;
  107     dht_conf_t *conf = NULL;
  108     dht_local_t *local = NULL;
  109     loc_t *loc = NULL;
  110     int i = 0;
  111     xlator_t *prev = NULL;
  112     int call_cnt = 0;
  113     int ret = 0;
  114 
  115     conf = this->private;
  116 
  117     prev = cookie;
  118     local = frame->local;
  119     loc = &local->loc;
  120 
  121     if (ENTRY_MISSING(op_ret, op_errno)) {
  122         if (conf->search_unhashed) {
  123             local->op_errno = ENOENT;
  124             dht_lookup_everywhere(frame, this, loc);
  125             return 0;
  126         }
  127     }
  128 
  129     if (op_ret == -1)
  130         goto out;
  131 
  132     is_linkfile = check_is_linkfile(inode, stbuf, xattr, conf->link_xattr_name);
  133     is_dir = check_is_dir(inode, stbuf, xattr);
  134 
  135     if (!is_dir && !is_linkfile) {
  136         /* non-directory and not a linkfile */
  137 
  138         ret = dht_layout_preset(this, prev, inode);
  139         if (ret < 0) {
  140             gf_msg_debug(this->name, 0,
  141                          "could not set pre-set layout "
  142                          "for subvol %s",
  143                          prev->name);
  144             op_ret = -1;
  145             op_errno = EINVAL;
  146             goto err;
  147         }
  148 
  149         goto out;
  150     }
  151 
  152     if (is_dir) {
  153         call_cnt = conf->subvolume_cnt;
  154         local->call_cnt = call_cnt;
  155 
  156         local->inode = inode_ref(inode);
  157         local->xattr = dict_ref(xattr);
  158 
  159         local->op_ret = 0;
  160         local->op_errno = 0;
  161 
  162         local->layout = dht_layout_new(this, conf->subvolume_cnt);
  163         if (!local->layout) {
  164             op_ret = -1;
  165             op_errno = ENOMEM;
  166             gf_msg_debug(this->name, 0, "memory allocation failed :(");
  167             goto err;
  168         }
  169 
  170         for (i = 0; i < call_cnt; i++) {
  171             STACK_WIND_COOKIE(frame, dht_lookup_dir_cbk, conf->subvolumes[i],
  172                               conf->subvolumes[i],
  173                               conf->subvolumes[i]->fops->lookup, &local->loc,
  174                               local->xattr_req);
  175         }
  176     }
  177 
  178     if (is_linkfile) {
  179         subvol = dht_linkfile_subvol(this, inode, stbuf, xattr);
  180 
  181         if (!subvol) {
  182             gf_msg_debug(this->name, 0,
  183                          "linkfile has no link subvolume.path=%s", loc->path);
  184             dht_lookup_everywhere(frame, this, loc);
  185             return 0;
  186         }
  187 
  188         STACK_WIND_COOKIE(frame, dht_lookup_linkfile_cbk, subvol, subvol,
  189                           subvol->fops->lookup, &local->loc, local->xattr_req);
  190     }
  191 
  192     return 0;
  193 
  194 out:
  195     if (!local->hashed_subvol) {
  196         gf_msg_debug(this->name, 0, "no subvolume in layout for path=%s",
  197                      local->loc.path);
  198         local->op_errno = ENOENT;
  199         dht_lookup_everywhere(frame, this, loc);
  200         return 0;
  201     }
  202 
  203     STACK_WIND_COOKIE(frame, dht_lookup_cbk, local->hashed_subvol,
  204                       local->hashed_subvol, local->hashed_subvol->fops->lookup,
  205                       &local->loc, local->xattr_req);
  206 
  207     return 0;
  208 
  209 err:
  210     DHT_STACK_UNWIND(lookup, frame, op_ret, op_errno, inode, stbuf, xattr,
  211                      NULL);
  212     return 0;
  213 }
  214 
  215 int
  216 switch_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc,
  217               dict_t *xattr_req)
  218 {
  219     xlator_t *hashed_subvol = NULL;
  220     xlator_t *cached_subvol = NULL;
  221     xlator_t *subvol = NULL;
  222     dht_local_t *local = NULL;
  223     dht_conf_t *conf = NULL;
  224     int ret = -1;
  225     int op_errno = -1;
  226     dht_layout_t *layout = NULL;
  227     int i = 0;
  228     int call_cnt = 0;
  229 
  230     VALIDATE_OR_GOTO(frame, err);
  231     VALIDATE_OR_GOTO(this, err);
  232     VALIDATE_OR_GOTO(loc, err);
  233     VALIDATE_OR_GOTO(loc->inode, err);
  234     VALIDATE_OR_GOTO(loc->path, err);
  235 
  236     conf = this->private;
  237 
  238     local = dht_local_init(frame, loc, NULL, GF_FOP_LOOKUP);
  239     if (!local) {
  240         op_errno = ENOMEM;
  241         goto err;
  242     }
  243 
  244     if (xattr_req) {
  245         local->xattr_req = dict_ref(xattr_req);
  246     } else {
  247         local->xattr_req = dict_new();
  248     }
  249 
  250     hashed_subvol = dht_subvol_get_hashed(this, &local->loc);
  251     cached_subvol = local->cached_subvol;
  252 
  253     local->hashed_subvol = hashed_subvol;
  254 
  255     if (is_revalidate(loc)) {
  256         layout = local->layout;
  257         if (!layout) {
  258             gf_msg_debug(this->name, 0,
  259                          "revalidate lookup without cache. path=%s", loc->path);
  260             op_errno = EINVAL;
  261             goto err;
  262         }
  263 
  264         if (layout->gen && (layout->gen < conf->gen)) {
  265             gf_msg_debug(this->name, 0, "incomplete layout failure for path=%s",
  266                          loc->path);
  267             dht_layout_unref(this, local->layout);
  268             goto do_fresh_lookup;
  269         }
  270 
  271         local->inode = inode_ref(loc->inode);
  272 
  273         local->call_cnt = layout->cnt;
  274         call_cnt = local->call_cnt;
  275 
  276         /* NOTE: we don't require 'trusted.glusterfs.dht.linkto'
  277          * attribute, revalidates directly go to the cached-subvolume.
  278          */
  279         ret = dict_set_uint32(local->xattr_req, conf->xattr_name, 4 * 4);
  280         if (ret < 0)
  281             gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_DICT_SET_FAILED,
  282                    "failed to set dict value for %s", conf->xattr_name);
  283 
  284         for (i = 0; i < layout->cnt; i++) {
  285             subvol = layout->list[i].xlator;
  286 
  287             STACK_WIND_COOKIE(frame, dht_revalidate_cbk, subvol, subvol,
  288                               subvol->fops->lookup, loc, local->xattr_req);
  289 
  290             if (!--call_cnt)
  291                 break;
  292         }
  293     } else {
  294     do_fresh_lookup:
  295         ret = dict_set_uint32(local->xattr_req, conf->xattr_name, 4 * 4);
  296         if (ret < 0)
  297             gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_DICT_SET_FAILED,
  298                    "failed to set dict value for %s", conf->xattr_name);
  299 
  300         ret = dict_set_uint32(local->xattr_req, conf->link_xattr_name, 256);
  301         if (ret < 0)
  302             gf_msg(this->name, GF_LOG_WARNING, EINVAL, DHT_MSG_DICT_SET_FAILED,
  303                    "failed to set dict value for %s", conf->link_xattr_name);
  304 
  305         if (!hashed_subvol) {
  306             gf_msg_debug(this->name, 0,
  307                          "no subvolume in layout for path=%s, "
  308                          "checking on all the subvols to see if "
  309                          "it is a directory",
  310                          loc->path);
  311             call_cnt = conf->subvolume_cnt;
  312             local->call_cnt = call_cnt;
  313 
  314             local->layout = dht_layout_new(this, conf->subvolume_cnt);
  315             if (!local->layout) {
  316                 op_errno = ENOMEM;
  317                 goto err;
  318             }
  319 
  320             for (i = 0; i < call_cnt; i++) {
  321                 STACK_WIND_COOKIE(frame, dht_lookup_dir_cbk,
  322                                   conf->subvolumes[i], conf->subvolumes[i],
  323                                   conf->subvolumes[i]->fops->lookup,
  324                                   &local->loc, local->xattr_req);
  325             }
  326             return 0;
  327         }
  328 
  329         /*  */
  330         cached_subvol = get_switch_matching_subvol(loc->path, conf,
  331                                                    hashed_subvol);
  332         if (cached_subvol == hashed_subvol) {
  333             STACK_WIND_COOKIE(frame, dht_lookup_cbk, hashed_subvol,
  334                               hashed_subvol, hashed_subvol->fops->lookup, loc,
  335                               local->xattr_req);
  336         } else {
  337             STACK_WIND_COOKIE(frame, switch_local_lookup_cbk, cached_subvol,
  338                               cached_subvol, cached_subvol->fops->lookup, loc,
  339                               local->xattr_req);
  340         }
  341     }
  342 
  343     return 0;
  344 
  345 err:
  346     op_errno = (op_errno == -1) ? errno : op_errno;
  347     DHT_STACK_UNWIND(lookup, frame, -1, op_errno, NULL, NULL, NULL, NULL);
  348     return 0;
  349 }
  350 
  351 int
  352 switch_create_linkfile_create_cbk(call_frame_t *frame, void *cookie,
  353                                   xlator_t *this, int op_ret, int op_errno,
  354                                   inode_t *inode, struct iatt *stbuf,
  355                                   struct iatt *preparent,
  356                                   struct iatt *postparent, dict_t *xdata)
  357 {
  358     dht_local_t *local = NULL;
  359 
  360     local = frame->local;
  361 
  362     if (op_ret == -1)
  363         goto err;
  364 
  365     STACK_WIND_COOKIE(frame, dht_create_cbk, local->cached_subvol,
  366                       local->cached_subvol, local->cached_subvol->fops->create,
  367                       &local->loc, local->flags, local->mode, local->umask,
  368                       local->fd, local->params);
  369 
  370     return 0;
  371 
  372 err:
  373     DHT_STACK_UNWIND(create, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL,
  374                      NULL);
  375     return 0;
  376 }
  377 
  378 int
  379 switch_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
  380               mode_t mode, mode_t umask, fd_t *fd, dict_t *params)
  381 {
  382     dht_local_t *local = NULL;
  383     dht_conf_t *conf = NULL;
  384     xlator_t *subvol = NULL;
  385     xlator_t *avail_subvol = NULL;
  386     int op_errno = -1;
  387 
  388     VALIDATE_OR_GOTO(frame, err);
  389     VALIDATE_OR_GOTO(this, err);
  390     VALIDATE_OR_GOTO(loc, err);
  391 
  392     conf = this->private;
  393 
  394     dht_get_du_info(frame, this, loc);
  395 
  396     local = dht_local_init(frame, loc, fd, GF_FOP_CREATE);
  397     if (!local) {
  398         op_errno = ENOMEM;
  399         goto err;
  400     }
  401 
  402     subvol = dht_subvol_get_hashed(this, loc);
  403     if (!subvol) {
  404         gf_msg_debug(this->name, 0, "no subvolume in layout for path=%s",
  405                      loc->path);
  406         op_errno = ENOENT;
  407         goto err;
  408     }
  409 
  410     avail_subvol = get_switch_matching_subvol(loc->path, conf, subvol);
  411     if (dht_is_subvol_filled(this, avail_subvol)) {
  412         avail_subvol = dht_free_disk_available_subvol(this, avail_subvol,
  413                                                       local);
  414     }
  415 
  416     if (subvol != avail_subvol) {
  417         /* create a link file instead of actual file */
  418         local->mode = mode;
  419         local->flags = flags;
  420         local->umask = umask;
  421         local->cached_subvol = avail_subvol;
  422         dht_linkfile_create(frame, switch_create_linkfile_create_cbk, this,
  423                             avail_subvol, subvol, loc);
  424         return 0;
  425     }
  426 
  427     gf_msg_trace(this->name, 0, "creating %s on %s", loc->path, subvol->name);
  428 
  429     STACK_WIND_COOKIE(frame, dht_create_cbk, subvol, subvol,
  430                       subvol->fops->create, loc, flags, mode, umask, fd,
  431                       params);
  432 
  433     return 0;
  434 
  435 err:
  436     op_errno = (op_errno == -1) ? errno : op_errno;
  437     DHT_STACK_UNWIND(create, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL,
  438                      NULL);
  439 
  440     return 0;
  441 }
  442 
  443 int
  444 switch_mknod_linkfile_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
  445                           int op_ret, int op_errno, inode_t *inode,
  446                           struct iatt *stbuf, struct iatt *preparent,
  447                           struct iatt *postparent, dict_t *xdata)
  448 {
  449     dht_local_t *local = NULL;
  450 
  451     local = frame->local;
  452     if (!local || !local->cached_subvol) {
  453         op_errno = EINVAL;
  454         op_ret = -1;
  455         goto err;
  456     }
  457 
  458     if (op_ret >= 0) {
  459         STACK_WIND_COOKIE(
  460             frame, dht_newfile_cbk, (void *)local->cached_subvol,
  461             local->cached_subvol, local->cached_subvol->fops->mknod,
  462             &local->loc, local->mode, local->rdev, local->umask, local->params);
  463 
  464         return 0;
  465     }
  466 err:
  467     DHT_STACK_UNWIND(link, frame, op_ret, op_errno, inode, stbuf, preparent,
  468                      postparent, xdata);
  469     return 0;
  470 }
  471 
  472 int
  473 switch_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
  474              dev_t rdev, mode_t umask, dict_t *params)
  475 {
  476     dht_local_t *local = NULL;
  477     dht_conf_t *conf = NULL;
  478     xlator_t *subvol = NULL;
  479     xlator_t *avail_subvol = NULL;
  480     int op_errno = -1;
  481 
  482     VALIDATE_OR_GOTO(frame, err);
  483     VALIDATE_OR_GOTO(this, err);
  484     VALIDATE_OR_GOTO(loc, err);
  485 
  486     conf = this->private;
  487 
  488     dht_get_du_info(frame, this, loc);
  489 
  490     local = dht_local_init(frame, loc, NULL, GF_FOP_MKNOD);
  491     if (!local) {
  492         op_errno = ENOMEM;
  493         goto err;
  494     }
  495 
  496     subvol = dht_subvol_get_hashed(this, loc);
  497     if (!subvol) {
  498         gf_msg_debug(this->name, 0, "no subvolume in layout for path=%s",
  499                      loc->path);
  500         op_errno = ENOENT;
  501         goto err;
  502     }
  503 
  504     /* Consider the disksize in consideration */
  505     avail_subvol = get_switch_matching_subvol(loc->path, conf, subvol);
  506     if (dht_is_subvol_filled(this, avail_subvol)) {
  507         avail_subvol = dht_free_disk_available_subvol(this, avail_subvol,
  508                                                       local);
  509     }
  510 
  511     if (avail_subvol != subvol) {
  512         /* Create linkfile first */
  513 
  514         local->params = dict_ref(params);
  515         local->mode = mode;
  516         local->umask = umask;
  517         local->rdev = rdev;
  518         local->cached_subvol = avail_subvol;
  519 
  520         dht_linkfile_create(frame, switch_mknod_linkfile_cbk, this,
  521                             avail_subvol, subvol, loc);
  522         return 0;
  523     }
  524 
  525     gf_msg_trace(this->name, 0, "creating %s on %s", loc->path, subvol->name);
  526 
  527     STACK_WIND_COOKIE(frame, dht_newfile_cbk, (void *)subvol, subvol,
  528                       subvol->fops->mknod, loc, mode, rdev, umask, params);
  529 
  530     return 0;
  531 
  532 err:
  533     op_errno = (op_errno == -1) ? errno : op_errno;
  534     DHT_STACK_UNWIND(mknod, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL);
  535 
  536     return 0;
  537 }
  538 
  539 void
  540 switch_fini(xlator_t *this)
  541 {
  542     dht_conf_t *conf = NULL;
  543     struct switch_struct *trav = NULL;
  544     struct switch_struct *prev = NULL;
  545 
  546     conf = this->private;
  547 
  548     if (conf) {
  549         trav = (struct switch_struct *)conf->private;
  550         conf->private = NULL;
  551         while (trav) {
  552             GF_FREE(trav->array);
  553             prev = trav;
  554             trav = trav->next;
  555             GF_FREE(prev);
  556         }
  557     }
  558 
  559     dht_fini(this);
  560 }
  561 
  562 int
  563 set_switch_pattern(xlator_t *this, dht_conf_t *conf, const char *pattern_str)
  564 {
  565     int flag = 0;
  566     int idx = 0;
  567     int index = 0;
  568     int child_count = 0;
  569     char *tmp = NULL;
  570     char *tmp1 = NULL;
  571     char *child = NULL;
  572     char *tmp_str = NULL;
  573     char *tmp_str1 = NULL;
  574     char *dup_str = NULL;
  575     char *dup_childs = NULL;
  576     char *switch_str = NULL;
  577     char *pattern = NULL;
  578     char *childs = NULL;
  579     char *option_string = NULL;
  580     size_t pattern_length;
  581     struct switch_struct *switch_buf = NULL;
  582     struct switch_struct *switch_opt = NULL;
  583     struct switch_struct *trav = NULL;
  584     struct switch_sched_array *switch_buf_array = NULL;
  585     xlator_list_t *trav_xl = NULL;
  586 
  587     trav_xl = this->children;
  588     while (trav_xl) {
  589         index++;
  590         trav_xl = trav_xl->next;
  591     }
  592     child_count = index;
  593     switch_buf_array = GF_CALLOC((index + 1), sizeof(struct switch_sched_array),
  594                                  gf_switch_mt_switch_sched_array);
  595     if (!switch_buf_array)
  596         goto err;
  597 
  598     trav_xl = this->children;
  599     index = 0;
  600 
  601     while (trav_xl) {
  602         switch_buf_array[index].xl = trav_xl->xlator;
  603         switch_buf_array[index].eligible = 1;
  604         trav_xl = trav_xl->next;
  605         index++;
  606     }
  607 
  608     /*  *jpg:child1,child2;*mpg:child3;*:child4,child5,child6 */
  609 
  610     /* Get the pattern for considering switch case.
  611        "option block-size *avi:10MB" etc */
  612     option_string = gf_strdup(pattern_str);
  613     if (option_string == NULL) {
  614         goto err;
  615     }
  616     switch_str = strtok_r(option_string, ";", &tmp_str);
  617     while (switch_str) {
  618         dup_str = gf_strdup(switch_str);
  619         if (dup_str == NULL) {
  620             goto err;
  621         }
  622         switch_opt = GF_CALLOC(1, sizeof(struct switch_struct),
  623                                gf_switch_mt_switch_struct);
  624         if (!switch_opt) {
  625             GF_FREE(dup_str);
  626             goto err;
  627         }
  628 
  629         pattern = strtok_r(dup_str, ":", &tmp_str1);
  630         childs = strtok_r(NULL, ":", &tmp_str1);
  631         if (strncmp(pattern, "*", 2) == 0) {
  632             gf_msg("switch", GF_LOG_INFO, 0, DHT_MSG_SWITCH_PATTERN_INFO,
  633                    "'*' pattern will be taken by default "
  634                    "for all the unconfigured child nodes,"
  635                    " hence neglecting current option");
  636             switch_str = strtok_r(NULL, ";", &tmp_str);
  637             GF_FREE(switch_opt);
  638             switch_opt = NULL;
  639             GF_FREE(dup_str);
  640             continue;
  641         }
  642         GF_FREE(dup_str);
  643 
  644         pattern_length = strlen(pattern);
  645         if (pattern_length >= (sizeof(switch_opt->path_pattern))) {
  646             gf_msg(this->name, GF_LOG_ERROR, 0,
  647                    DHT_MSG_SET_SWITCH_PATTERN_ERROR, "Pattern (%s) too long",
  648                    pattern);
  649             goto err;
  650         }
  651         memcpy(switch_opt->path_pattern, pattern, pattern_length);
  652         switch_opt->path_pattern[pattern_length] = '\0';
  653 
  654         if (childs) {
  655             dup_childs = gf_strdup(childs);
  656             if (dup_childs == NULL) {
  657                 goto err;
  658             }
  659             child = strtok_r(dup_childs, ",", &tmp);
  660             while (child) {
  661                 if (gf_switch_valid_child(this, child)) {
  662                     idx++;
  663                     child = strtok_r(NULL, ",", &tmp);
  664                 } else {
  665                     gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_SUBVOL_ERROR,
  666                            "%s is not a subvolume of %s. "
  667                            "pattern can only be scheduled "
  668                            "only to a subvolume of %s",
  669                            child, this->name, this->name);
  670                     GF_FREE(dup_childs);
  671                     goto err;
  672                 }
  673             }
  674             GF_FREE(dup_childs);
  675             child = strtok_r(childs, ",", &tmp1);
  676             switch_opt->num_child = idx;
  677             switch_opt->array = GF_CALLOC(
  678                 1, (idx * sizeof(struct switch_sched_array)),
  679                 gf_switch_mt_switch_sched_array);
  680             if (!switch_opt->array)
  681                 goto err;
  682             idx = 0;
  683             while (child) {
  684                 for (index = 0; index < child_count; index++) {
  685                     if (strcmp(switch_buf_array[index].xl->name, child) == 0) {
  686                         gf_msg_debug("switch", 0,
  687                                      "'%s' pattern will be "
  688                                      "scheduled to \"%s\"",
  689                                      switch_opt->path_pattern, child);
  690                         /*
  691                           if (switch_buf_array[index-1].considered) {
  692                           gf_msg_debug ("switch", 0,
  693                           "ambiguity found, exiting");
  694                           return -1;
  695                           }
  696                         */
  697                         switch_opt->array[idx].xl = switch_buf_array[index].xl;
  698                         switch_buf_array[index].considered = 1;
  699                         idx++;
  700                         break;
  701                     }
  702                 }
  703                 child = strtok_r(NULL, ",", &tmp1);
  704             }
  705         } else {
  706             /* error */
  707             gf_msg("switch", GF_LOG_ERROR, 0, DHT_MSG_SET_SWITCH_PATTERN_ERROR,
  708                    "Check \"scheduler.switch.case\" "
  709                    "option in unify volume. Exiting");
  710             goto err;
  711         }
  712 
  713         /* Link it to the main structure */
  714         if (switch_buf) {
  715             /* there are already few entries */
  716             trav = switch_buf;
  717             while (trav->next)
  718                 trav = trav->next;
  719             trav->next = switch_opt;
  720         } else {
  721             /* First entry */
  722             switch_buf = switch_opt;
  723         }
  724         switch_opt = NULL;
  725         switch_str = strtok_r(NULL, ";", &tmp_str);
  726     }
  727 
  728     /* Now, all the pattern based considerations done, so for all the
  729      * remaining pattern, '*' to all the remaining child nodes
  730      */
  731     {
  732         for (index = 0; index < child_count; index++) {
  733             /* check for considered flag */
  734             if (switch_buf_array[index].considered)
  735                 continue;
  736             flag++;
  737         }
  738         if (!flag) {
  739             gf_msg("switch", GF_LOG_ERROR, 0, DHT_MSG_SET_SWITCH_PATTERN_ERROR,
  740                    "No nodes left for pattern '*'. Exiting");
  741             goto err;
  742         }
  743         switch_opt = GF_CALLOC(1, sizeof(struct switch_struct),
  744                                gf_switch_mt_switch_struct);
  745         if (!switch_opt)
  746             goto err;
  747 
  748         /* Add the '*' pattern to the array */
  749         memcpy(switch_opt->path_pattern, "*", 2);
  750         switch_opt->num_child = flag;
  751         switch_opt->array = GF_CALLOC(1,
  752                                       flag * sizeof(struct switch_sched_array),
  753                                       gf_switch_mt_switch_sched_array);
  754         if (!switch_opt->array)
  755             goto err;
  756         flag = 0;
  757         for (index = 0; index < child_count; index++) {
  758             /* check for considered flag */
  759             if (switch_buf_array[index].considered)
  760                 continue;
  761             gf_msg_debug("switch", 0,
  762                          "'%s'"
  763                          " pattern will be scheduled to \"%s\"",
  764                          switch_opt->path_pattern,
  765                          switch_buf_array[index].xl->name);
  766 
  767             switch_opt->array[flag].xl = switch_buf_array[index].xl;
  768             switch_buf_array[index].considered = 1;
  769             flag++;
  770         }
  771         if (switch_buf) {
  772             /* there are already few entries */
  773             trav = switch_buf;
  774             while (trav->next)
  775                 trav = trav->next;
  776             trav->next = switch_opt;
  777         } else {
  778             /* First entry */
  779             switch_buf = switch_opt;
  780         }
  781         switch_opt = NULL;
  782     }
  783     /* */
  784     conf->private = switch_buf;
  785 
  786     GF_FREE(option_string);
  787     return 0;
  788 err:
  789     GF_FREE(switch_buf_array);
  790     GF_FREE(switch_opt);
  791     GF_FREE(option_string);
  792 
  793     if (switch_buf) {
  794         trav = switch_buf;
  795         while (trav) {
  796             GF_FREE(trav->array);
  797             switch_opt = trav;
  798             trav = trav->next;
  799             GF_FREE(switch_opt);
  800         }
  801     }
  802     return -1;
  803 }
  804 
  805 int32_t
  806 switch_init(xlator_t *this)
  807 {
  808     dht_conf_t *conf = NULL;
  809     data_t *data = NULL;
  810     int ret = -1;
  811 
  812     ret = dht_init(this);
  813     if (ret) {
  814         return ret;
  815     }
  816     conf = this->private;
  817 
  818     data = dict_get(this->options, "pattern.switch.case");
  819     if (data) {
  820         /* TODO: */
  821         ret = set_switch_pattern(this, conf, data->data);
  822         if (ret) {
  823             goto err;
  824         }
  825     }
  826 
  827     this->private = conf;
  828     return 0;
  829 
  830 err:
  831     dht_fini(this);
  832     return -1;
  833 }
  834 
  835 struct xlator_fops fops = {
  836     .lookup = switch_lookup,
  837     .create = switch_create,
  838     .mknod = switch_mknod,
  839 
  840     .stat = dht_stat,
  841     .fstat = dht_fstat,
  842     .truncate = dht_truncate,
  843     .ftruncate = dht_ftruncate,
  844     .access = dht_access,
  845     .readlink = dht_readlink,
  846     .setxattr = dht_setxattr,
  847     .getxattr = dht_getxattr,
  848     .removexattr = dht_removexattr,
  849     .open = dht_open,
  850     .readv = dht_readv,
  851     .writev = dht_writev,
  852     .flush = dht_flush,
  853     .fsync = dht_fsync,
  854     .statfs = dht_statfs,
  855     .lk = dht_lk,
  856     .opendir = dht_opendir,
  857     .readdir = dht_readdir,
  858     .readdirp = dht_readdirp,
  859     .fsyncdir = dht_fsyncdir,
  860     .symlink = dht_symlink,
  861     .unlink = dht_unlink,
  862     .link = dht_link,
  863     .mkdir = dht_mkdir,
  864     .rmdir = dht_rmdir,
  865     .rename = dht_rename,
  866     .inodelk = dht_inodelk,
  867     .finodelk = dht_finodelk,
  868     .entrylk = dht_entrylk,
  869     .fentrylk = dht_fentrylk,
  870     .xattrop = dht_xattrop,
  871     .fxattrop = dht_fxattrop,
  872     .setattr = dht_setattr,
  873 };
  874 
  875 struct xlator_cbks cbks = {.forget = dht_forget};
  876 extern int32_t
  877 mem_acct_init(xlator_t *this);
  878 
  879 xlator_api_t xlator_api = {
  880     .init = switch_init,
  881     .fini = switch_fini,
  882     .notify = dht_notify,
  883     .reconfigure = dht_reconfigure,
  884     .mem_acct_init = mem_acct_init,
  885     .op_version = {1}, /* Present from the initial version */
  886     .fops = &fops,
  887     .cbks = &cbks,
  888     .options = dht_options,
  889     .identifier = "switch",
  890     .category = GF_TECH_PREVIEW,
  891 };