"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/xlators/features/locks/src/common.c" (16 Sep 2020, 41125 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "common.c", see the Fossies "Dox" file reference documentation.

    1 /*
    2    Copyright (c) 2006-2012, 2015-2016 Red Hat, Inc. <http://www.redhat.com>
    3    This file is part of GlusterFS.
    4 
    5    This file is licensed to you under your choice of the GNU Lesser
    6    General Public License, version 3 or any later version (LGPLv3 or
    7    later), or the GNU General Public License, version 2 (GPLv2), in all
    8    cases as published by the Free Software Foundation.
    9 */
   10 #include <unistd.h>
   11 #include <fcntl.h>
   12 #include <limits.h>
   13 #include <pthread.h>
   14 
   15 #include <glusterfs/glusterfs.h>
   16 #include <glusterfs/compat.h>
   17 #include <glusterfs/logging.h>
   18 #include <glusterfs/syncop.h>
   19 
   20 #include "locks.h"
   21 #include "common.h"
   22 
/* Forward declarations of file-local helpers that are referenced before
 * their definitions further down in this file. */
static int
__is_lock_grantable(pl_inode_t *pl_inode, posix_lock_t *lock);
static void
__insert_and_merge(pl_inode_t *pl_inode, posix_lock_t *lock);
static int
pl_send_prelock_unlock(xlator_t *this, pl_inode_t *pl_inode,
                       posix_lock_t *old_lock);
   30 
   31 static pl_dom_list_t *
   32 __allocate_domain(const char *volume)
   33 {
   34     pl_dom_list_t *dom = NULL;
   35 
   36     dom = GF_CALLOC(1, sizeof(*dom), gf_locks_mt_pl_dom_list_t);
   37     if (!dom)
   38         goto out;
   39 
   40     dom->domain = gf_strdup(volume);
   41     if (!dom->domain)
   42         goto out;
   43 
   44     gf_log("posix-locks", GF_LOG_TRACE, "New domain allocated: %s",
   45            dom->domain);
   46 
   47     INIT_LIST_HEAD(&dom->inode_list);
   48     INIT_LIST_HEAD(&dom->entrylk_list);
   49     INIT_LIST_HEAD(&dom->blocked_entrylks);
   50     INIT_LIST_HEAD(&dom->inodelk_list);
   51     INIT_LIST_HEAD(&dom->blocked_inodelks);
   52 
   53 out:
   54     if (dom && (NULL == dom->domain)) {
   55         GF_FREE(dom);
   56         dom = NULL;
   57     }
   58 
   59     return dom;
   60 }
   61 
   62 /* Returns domain for the lock. If domain is not present,
   63  * allocates a domain and returns it
   64  */
   65 pl_dom_list_t *
   66 get_domain(pl_inode_t *pl_inode, const char *volume)
   67 {
   68     pl_dom_list_t *dom = NULL;
   69 
   70     GF_VALIDATE_OR_GOTO("posix-locks", pl_inode, out);
   71     GF_VALIDATE_OR_GOTO("posix-locks", volume, out);
   72 
   73     pthread_mutex_lock(&pl_inode->mutex);
   74     {
   75         list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
   76         {
   77             if (strcmp(dom->domain, volume) == 0)
   78                 goto unlock;
   79         }
   80 
   81         dom = __allocate_domain(volume);
   82         if (dom)
   83             list_add(&dom->inode_list, &pl_inode->dom_list);
   84     }
   85 unlock:
   86     pthread_mutex_unlock(&pl_inode->mutex);
   87     if (dom) {
   88         gf_log("posix-locks", GF_LOG_TRACE, "Domain %s found", volume);
   89     } else {
   90         gf_log("posix-locks", GF_LOG_TRACE, "Domain %s not found", volume);
   91     }
   92 out:
   93     return dom;
   94 }
   95 
   96 unsigned long
   97 fd_to_fdnum(fd_t *fd)
   98 {
   99     return ((unsigned long)fd);
  100 }
  101 
  102 fd_t *
  103 fd_from_fdnum(posix_lock_t *lock)
  104 {
  105     return ((fd_t *)lock->fd_num);
  106 }
  107 
  108 int
  109 __pl_inode_is_empty(pl_inode_t *pl_inode)
  110 {
  111     return (list_empty(&pl_inode->ext_list));
  112 }
  113 
  114 void
  115 pl_print_locker(char *str, int size, xlator_t *this, call_frame_t *frame)
  116 {
  117     snprintf(str, size, "Pid=%llu, lk-owner=%s, Client=%p, Frame=%llu",
  118              (unsigned long long)frame->root->pid,
  119              lkowner_utoa(&frame->root->lk_owner), frame->root->client,
  120              (unsigned long long)frame->root->unique);
  121 }
  122 
  123 void
  124 pl_print_lockee(char *str, int size, fd_t *fd, loc_t *loc)
  125 {
  126     inode_t *inode = NULL;
  127     char *ipath = NULL;
  128     int ret = 0;
  129 
  130     if (fd)
  131         inode = fd->inode;
  132     if (loc)
  133         inode = loc->inode;
  134 
  135     if (!inode) {
  136         snprintf(str, size, "<nul>");
  137         return;
  138     }
  139 
  140     if (loc && loc->path) {
  141         ipath = gf_strdup(loc->path);
  142     } else {
  143         ret = inode_path(inode, NULL, &ipath);
  144         if (ret <= 0)
  145             ipath = NULL;
  146     }
  147 
  148     snprintf(str, size, "gfid=%s, fd=%p, path=%s", uuid_utoa(inode->gfid), fd,
  149              ipath ? ipath : "<nul>");
  150 
  151     GF_FREE(ipath);
  152 }
  153 
  154 void
  155 pl_print_lock(char *str, int size, int cmd, struct gf_flock *flock,
  156               gf_lkowner_t *owner)
  157 {
  158     char *cmd_str = NULL;
  159     char *type_str = NULL;
  160 
  161     switch (cmd) {
  162 #if F_GETLK != F_GETLK64
  163         case F_GETLK64:
  164 #endif
  165         case F_GETLK:
  166             cmd_str = "GETLK";
  167             break;
  168 
  169 #if F_SETLK != F_SETLK64
  170         case F_SETLK64:
  171 #endif
  172         case F_SETLK:
  173             cmd_str = "SETLK";
  174             break;
  175 
  176 #if F_SETLKW != F_SETLKW64
  177         case F_SETLKW64:
  178 #endif
  179         case F_SETLKW:
  180             cmd_str = "SETLKW";
  181             break;
  182 
  183         default:
  184             cmd_str = "UNKNOWN";
  185             break;
  186     }
  187 
  188     switch (flock->l_type) {
  189         case F_RDLCK:
  190             type_str = "READ";
  191             break;
  192         case F_WRLCK:
  193             type_str = "WRITE";
  194             break;
  195         case F_UNLCK:
  196             type_str = "UNLOCK";
  197             break;
  198         default:
  199             type_str = "UNKNOWN";
  200             break;
  201     }
  202 
  203     snprintf(str, size,
  204              "lock=FCNTL, cmd=%s, type=%s, "
  205              "start=%llu, len=%llu, pid=%llu, lk-owner=%s",
  206              cmd_str, type_str, (unsigned long long)flock->l_start,
  207              (unsigned long long)flock->l_len, (unsigned long long)flock->l_pid,
  208              lkowner_utoa(owner));
  209 }
  210 
  211 void
  212 pl_trace_in(xlator_t *this, call_frame_t *frame, fd_t *fd, loc_t *loc, int cmd,
  213             struct gf_flock *flock, const char *domain)
  214 {
  215     posix_locks_private_t *priv = this->private;
  216     char pl_locker[256];
  217     char pl_lockee[256];
  218     char pl_lock[256];
  219 
  220     if (!priv->trace)
  221         return;
  222 
  223     pl_print_locker(pl_locker, 256, this, frame);
  224     pl_print_lockee(pl_lockee, 256, fd, loc);
  225     if (domain)
  226         pl_print_inodelk(pl_lock, 256, cmd, flock, domain);
  227     else
  228         pl_print_lock(pl_lock, 256, cmd, flock, &frame->root->lk_owner);
  229 
  230     gf_log(this->name, GF_LOG_INFO,
  231            "[REQUEST] Locker = {%s} Lockee = {%s} Lock = {%s}", pl_locker,
  232            pl_lockee, pl_lock);
  233 }
  234 
  235 void
  236 pl_print_verdict(char *str, int size, int op_ret, int op_errno)
  237 {
  238     char *verdict = NULL;
  239 
  240     if (op_ret == 0) {
  241         verdict = "GRANTED";
  242     } else {
  243         switch (op_errno) {
  244             case EAGAIN:
  245                 verdict = "TRYAGAIN";
  246                 break;
  247             default:
  248                 verdict = strerror(op_errno);
  249         }
  250     }
  251 
  252     snprintf(str, size, "%s", verdict);
  253 }
  254 
  255 void
  256 pl_trace_out(xlator_t *this, call_frame_t *frame, fd_t *fd, loc_t *loc, int cmd,
  257              struct gf_flock *flock, int op_ret, int op_errno,
  258              const char *domain)
  259 
  260 {
  261     posix_locks_private_t *priv = NULL;
  262     char pl_locker[256];
  263     char pl_lockee[256];
  264     char pl_lock[256];
  265     char verdict[32];
  266 
  267     priv = this->private;
  268 
  269     if (!priv->trace)
  270         return;
  271 
  272     pl_print_locker(pl_locker, 256, this, frame);
  273     pl_print_lockee(pl_lockee, 256, fd, loc);
  274     if (domain)
  275         pl_print_inodelk(pl_lock, 256, cmd, flock, domain);
  276     else
  277         pl_print_lock(pl_lock, 256, cmd, flock, &frame->root->lk_owner);
  278 
  279     pl_print_verdict(verdict, 32, op_ret, op_errno);
  280 
  281     gf_log(this->name, GF_LOG_INFO,
  282            "[%s] Locker = {%s} Lockee = {%s} Lock = {%s}", verdict, pl_locker,
  283            pl_lockee, pl_lock);
  284 }
  285 
  286 void
  287 pl_trace_block(xlator_t *this, call_frame_t *frame, fd_t *fd, loc_t *loc,
  288                int cmd, struct gf_flock *flock, const char *domain)
  289 
  290 {
  291     posix_locks_private_t *priv = this->private;
  292     char pl_locker[256];
  293     char pl_lockee[256];
  294     char pl_lock[256];
  295 
  296     if (!priv->trace)
  297         return;
  298 
  299     pl_print_locker(pl_locker, 256, this, frame);
  300     pl_print_lockee(pl_lockee, 256, fd, loc);
  301     if (domain)
  302         pl_print_inodelk(pl_lock, 256, cmd, flock, domain);
  303     else
  304         pl_print_lock(pl_lock, 256, cmd, flock, &frame->root->lk_owner);
  305 
  306     gf_log(this->name, GF_LOG_INFO,
  307            "[BLOCKED] Locker = {%s} Lockee = {%s} Lock = {%s}", pl_locker,
  308            pl_lockee, pl_lock);
  309 }
  310 
  311 void
  312 pl_trace_flush(xlator_t *this, call_frame_t *frame, fd_t *fd)
  313 {
  314     posix_locks_private_t *priv = NULL;
  315     char pl_locker[256];
  316     char pl_lockee[256];
  317     pl_inode_t *pl_inode = NULL;
  318 
  319     priv = this->private;
  320 
  321     if (!priv->trace)
  322         return;
  323 
  324     pl_inode = pl_inode_get(this, fd->inode, NULL);
  325 
  326     if (pl_inode && __pl_inode_is_empty(pl_inode))
  327         return;
  328 
  329     pl_print_locker(pl_locker, 256, this, frame);
  330     pl_print_lockee(pl_lockee, 256, fd, NULL);
  331 
  332     gf_log(this->name, GF_LOG_INFO, "[FLUSH] Locker = {%s} Lockee = {%s}",
  333            pl_locker, pl_lockee);
  334 }
  335 
  336 void
  337 pl_trace_release(xlator_t *this, fd_t *fd)
  338 {
  339     posix_locks_private_t *priv = NULL;
  340     char pl_lockee[256];
  341 
  342     priv = this->private;
  343 
  344     if (!priv->trace)
  345         return;
  346 
  347     pl_print_lockee(pl_lockee, 256, fd, NULL);
  348 
  349     gf_log(this->name, GF_LOG_INFO, "[RELEASE] Lockee = {%s}", pl_lockee);
  350 }
  351 
  352 void
  353 pl_update_refkeeper(xlator_t *this, inode_t *inode)
  354 {
  355     pl_inode_t *pl_inode = NULL;
  356     int is_empty = 0;
  357     int need_unref = 0;
  358     int need_ref = 0;
  359 
  360     pl_inode = pl_inode_get(this, inode, NULL);
  361     if (!pl_inode)
  362         return;
  363 
  364     pthread_mutex_lock(&pl_inode->mutex);
  365     {
  366         is_empty = __pl_inode_is_empty(pl_inode);
  367 
  368         if (is_empty && pl_inode->refkeeper) {
  369             need_unref = 1;
  370             pl_inode->refkeeper = NULL;
  371         }
  372 
  373         if (!is_empty && !pl_inode->refkeeper) {
  374             need_ref = 1;
  375             pl_inode->refkeeper = inode;
  376         }
  377     }
  378     pthread_mutex_unlock(&pl_inode->mutex);
  379 
  380     if (need_unref)
  381         inode_unref(inode);
  382 
  383     if (need_ref)
  384         inode_ref(inode);
  385 }
  386 
  387 /* Get lock enforcement info from disk */
  388 int
  389 pl_fetch_mlock_info_from_disk(xlator_t *this, pl_inode_t *pl_inode,
  390                               pl_local_t *local)
  391 {
  392     dict_t *xdata_rsp = NULL;
  393     int ret = 0;
  394     int op_ret = 0;
  395 
  396     if (!local) {
  397         return -1;
  398     }
  399 
  400     if (local->fd) {
  401         op_ret = syncop_fgetxattr(this, local->fd, &xdata_rsp,
  402                                   GF_ENFORCE_MANDATORY_LOCK, NULL, NULL);
  403     } else {
  404         op_ret = syncop_getxattr(this, &local->loc[0], &xdata_rsp,
  405                                  GF_ENFORCE_MANDATORY_LOCK, NULL, NULL);
  406     }
  407 
  408     pthread_mutex_lock(&pl_inode->mutex);
  409     {
  410         if (op_ret >= 0) {
  411             pl_inode->mlock_enforced = _gf_true;
  412             pl_inode->check_mlock_info = _gf_false;
  413         } else {
  414             gf_msg(this->name, GF_LOG_WARNING, -op_ret, 0,
  415                    "getxattr failed with %d", op_ret);
  416             pl_inode->mlock_enforced = _gf_false;
  417 
  418             if (-op_ret == ENODATA) {
  419                 pl_inode->check_mlock_info = _gf_false;
  420             } else {
  421                 pl_inode->check_mlock_info = _gf_true;
  422             }
  423         }
  424     }
  425     pthread_mutex_unlock(&pl_inode->mutex);
  426 
  427     return ret;
  428 }
  429 
  430 pl_inode_t *
  431 pl_inode_get(xlator_t *this, inode_t *inode, pl_local_t *local)
  432 {
  433     uint64_t tmp_pl_inode = 0;
  434     pl_inode_t *pl_inode = NULL;
  435     int ret = 0;
  436 
  437     LOCK(&inode->lock);
  438     {
  439         ret = __inode_ctx_get(inode, this, &tmp_pl_inode);
  440         if (ret == 0) {
  441             pl_inode = (pl_inode_t *)(long)tmp_pl_inode;
  442             goto unlock;
  443         }
  444 
  445         pl_inode = GF_CALLOC(1, sizeof(*pl_inode), gf_locks_mt_pl_inode_t);
  446         if (!pl_inode) {
  447             goto unlock;
  448         }
  449 
  450         gf_log(this->name, GF_LOG_TRACE, "Allocating new pl inode");
  451 
  452         pthread_mutex_init(&pl_inode->mutex, NULL);
  453         pthread_cond_init(&pl_inode->check_fop_wind_count, 0);
  454 
  455         INIT_LIST_HEAD(&pl_inode->dom_list);
  456         INIT_LIST_HEAD(&pl_inode->ext_list);
  457         INIT_LIST_HEAD(&pl_inode->rw_list);
  458         INIT_LIST_HEAD(&pl_inode->reservelk_list);
  459         INIT_LIST_HEAD(&pl_inode->blocked_reservelks);
  460         INIT_LIST_HEAD(&pl_inode->blocked_calls);
  461         INIT_LIST_HEAD(&pl_inode->metalk_list);
  462         INIT_LIST_HEAD(&pl_inode->queued_locks);
  463         INIT_LIST_HEAD(&pl_inode->waiting);
  464         gf_uuid_copy(pl_inode->gfid, inode->gfid);
  465 
  466         pl_inode->check_mlock_info = _gf_true;
  467         pl_inode->mlock_enforced = _gf_false;
  468 
  469         /* -2 means never looked up. -1 means something went wrong and link
  470          * tracking is disabled. */
  471         pl_inode->links = -2;
  472 
  473         ret = __inode_ctx_put(inode, this, (uint64_t)(long)(pl_inode));
  474         if (ret) {
  475             pthread_mutex_destroy(&pl_inode->mutex);
  476             GF_FREE(pl_inode);
  477             pl_inode = NULL;
  478             goto unlock;
  479         }
  480     }
  481 unlock:
  482     UNLOCK(&inode->lock);
  483 
  484     if ((pl_inode != NULL) && pl_is_mandatory_locking_enabled(pl_inode) &&
  485         pl_inode->check_mlock_info && local) {
  486         /* Note: The lock enforcement information per file can be stored in the
  487            attribute flag of stat(x) in posix. With that there won't be a need
  488            for doing getxattr post a reboot
  489         */
  490         pl_fetch_mlock_info_from_disk(this, pl_inode, local);
  491     }
  492 
  493     return pl_inode;
  494 }
  495 
  496 /* Create a new posix_lock_t */
  497 posix_lock_t *
  498 new_posix_lock(struct gf_flock *flock, client_t *client, pid_t client_pid,
  499                gf_lkowner_t *owner, fd_t *fd, uint32_t lk_flags, int blocking,
  500                int32_t *op_errno)
  501 {
  502     posix_lock_t *lock = NULL;
  503 
  504     GF_VALIDATE_OR_GOTO("posix-locks", flock, out);
  505     GF_VALIDATE_OR_GOTO("posix-locks", client, out);
  506     GF_VALIDATE_OR_GOTO("posix-locks", fd, out);
  507 
  508     if (!pl_is_lk_owner_valid(owner, client)) {
  509         *op_errno = EINVAL;
  510         goto out;
  511     }
  512 
  513     lock = GF_CALLOC(1, sizeof(posix_lock_t), gf_locks_mt_posix_lock_t);
  514     if (!lock) {
  515         *op_errno = ENOMEM;
  516         goto out;
  517     }
  518 
  519     lock->fl_start = flock->l_start;
  520     lock->fl_type = flock->l_type;
  521 
  522     if (flock->l_len == 0)
  523         lock->fl_end = LLONG_MAX;
  524     else
  525         lock->fl_end = flock->l_start + flock->l_len - 1;
  526 
  527     lock->client = client;
  528 
  529     lock->client_uid = gf_strdup(client->client_uid);
  530     if (lock->client_uid == NULL) {
  531         GF_FREE(lock);
  532         lock = NULL;
  533         *op_errno = ENOMEM;
  534         goto out;
  535     }
  536 
  537     lock->fd_num = fd_to_fdnum(fd);
  538     lock->fd = fd;
  539     lock->client_pid = client_pid;
  540     lock->owner = *owner;
  541     lock->lk_flags = lk_flags;
  542 
  543     lock->blocking = blocking;
  544     memcpy(&lock->user_flock, flock, sizeof(lock->user_flock));
  545 
  546     INIT_LIST_HEAD(&lock->list);
  547 
  548 out:
  549     return lock;
  550 }
  551 
  552 /* Delete a lock from the inode's lock list */
  553 void
  554 __delete_lock(posix_lock_t *lock)
  555 {
  556     list_del_init(&lock->list);
  557 }
  558 
  559 /* Destroy a posix_lock */
  560 void
  561 __destroy_lock(posix_lock_t *lock)
  562 {
  563     GF_FREE(lock->client_uid);
  564     GF_FREE(lock);
  565 }
  566 
  567 static posix_lock_t *
  568 __copy_lock(posix_lock_t *src)
  569 {
  570     posix_lock_t *dst;
  571 
  572     dst = GF_MALLOC(sizeof(posix_lock_t), gf_locks_mt_posix_lock_t);
  573     if (dst != NULL) {
  574         memcpy(dst, src, sizeof(posix_lock_t));
  575         dst->client_uid = gf_strdup(src->client_uid);
  576         if (dst->client_uid == NULL) {
  577             GF_FREE(dst);
  578             dst = NULL;
  579         }
  580 
  581         if (dst != NULL)
  582             INIT_LIST_HEAD(&dst->list);
  583     }
  584 
  585     return dst;
  586 }
  587 
  588 /* Convert a posix_lock to a struct gf_flock */
  589 void
  590 posix_lock_to_flock(posix_lock_t *lock, struct gf_flock *flock)
  591 {
  592     flock->l_pid = lock->user_flock.l_pid;
  593     flock->l_type = lock->fl_type;
  594     flock->l_start = lock->fl_start;
  595     flock->l_owner = lock->owner;
  596 
  597     if (lock->fl_end == LLONG_MAX)
  598         flock->l_len = 0;
  599     else
  600         flock->l_len = lock->fl_end - lock->fl_start + 1;
  601 }
  602 
  603 /* Insert the lock into the inode's lock list */
  604 static void
  605 __insert_lock(pl_inode_t *pl_inode, posix_lock_t *lock)
  606 {
  607     if (lock->blocked)
  608         gettimeofday(&lock->blkd_time, NULL);
  609     else
  610         gettimeofday(&lock->granted_time, NULL);
  611 
  612     list_add_tail(&lock->list, &pl_inode->ext_list);
  613 
  614     return;
  615 }
  616 
  617 /* Return true if the locks overlap, false otherwise */
  618 int
  619 locks_overlap(posix_lock_t *l1, posix_lock_t *l2)
  620 {
  621     /*
  622        Note:
  623        FUSE always gives us absolute offsets, so no need to worry
  624        about SEEK_CUR or SEEK_END
  625     */
  626 
  627     return ((l1->fl_end >= l2->fl_start) && (l2->fl_end >= l1->fl_start));
  628 }
  629 
  630 /* Return true if the locks have the same owner */
  631 int
  632 same_owner(posix_lock_t *l1, posix_lock_t *l2)
  633 {
  634     return (is_same_lkowner(&l1->owner, &l2->owner) &&
  635             (l1->client == l2->client));
  636 }
  637 
  638 /* Delete all F_UNLCK locks */
  639 void
  640 __delete_unlck_locks(pl_inode_t *pl_inode)
  641 {
  642     posix_lock_t *l = NULL;
  643     posix_lock_t *tmp = NULL;
  644 
  645     list_for_each_entry_safe(l, tmp, &pl_inode->ext_list, list)
  646     {
  647         if (l->fl_type == F_UNLCK) {
  648             __delete_lock(l);
  649             __destroy_lock(l);
  650         }
  651     }
  652 }
  653 
  654 /* Add two locks */
  655 static posix_lock_t *
  656 add_locks(posix_lock_t *l1, posix_lock_t *l2, posix_lock_t *dst)
  657 {
  658     posix_lock_t *sum = NULL;
  659 
  660     sum = __copy_lock(dst);
  661     if (!sum)
  662         return NULL;
  663 
  664     sum->fl_start = min(l1->fl_start, l2->fl_start);
  665     sum->fl_end = max(l1->fl_end, l2->fl_end);
  666 
  667     posix_lock_to_flock(sum, &sum->user_flock);
  668 
  669     return sum;
  670 }
  671 
/* Result of subtract_locks(): up to three lock pieces. Unused slots are
 * NULL. */
struct _values {
    posix_lock_t *locks[3];
};
  676 
/*
 * Subtract two locks: split {big} around {small}.
 *
 * {small} must always be contained inside {big} (the cases handled below are
 * exactly: identical range, strictly inside, sharing the start, sharing the
 * end; anything else trips the assertion).
 *
 * The result holds newly allocated copies; neither input is modified:
 *   - identical range:  locks[0] = copy of big carrying small's type
 *   - strictly inside:  locks[0] = left remainder of big,
 *                       locks[1] = copy of small,
 *                       locks[2] = right remainder of big
 *   - shared start:     locks[0] = right remainder of big,
 *                       locks[1] = copy of small
 *   - shared end:       locks[0] = left remainder of big,
 *                       locks[1] = copy of small
 *
 * If any copy cannot be allocated (or the unexpected case is hit), every
 * piece allocated so far is destroyed and all three slots are returned NULL.
 */
static struct _values
subtract_locks(posix_lock_t *big, posix_lock_t *small)
{
    struct _values v = {.locks = {0, 0, 0}};

    if ((big->fl_start == small->fl_start) && (big->fl_end == small->fl_end)) {
        /* both edges coincide with big */
        v.locks[0] = __copy_lock(big);
        if (!v.locks[0]) {
            goto out;
        }

        v.locks[0]->fl_type = small->fl_type;
        v.locks[0]->user_flock.l_type = small->fl_type;
        goto done;
    }

    if ((small->fl_start > big->fl_start) && (small->fl_end < big->fl_end)) {
        /* both edges lie inside big */
        v.locks[0] = __copy_lock(big);
        v.locks[1] = __copy_lock(small);
        v.locks[2] = __copy_lock(big);
        if ((v.locks[0] == NULL) || (v.locks[1] == NULL) ||
            (v.locks[2] == NULL)) {
            goto out;
        }

        /* Trim the two copies of big to the parts left and right of small
         * and regenerate their user_flock from the new ranges. */
        v.locks[0]->fl_end = small->fl_start - 1;
        v.locks[2]->fl_start = small->fl_end + 1;
        posix_lock_to_flock(v.locks[0], &v.locks[0]->user_flock);
        posix_lock_to_flock(v.locks[2], &v.locks[2]->user_flock);
        goto done;
    }

    /* one edge coincides with big */
    if (small->fl_start == big->fl_start) {
        v.locks[0] = __copy_lock(big);
        v.locks[1] = __copy_lock(small);
        if ((v.locks[0] == NULL) || (v.locks[1] == NULL)) {
            goto out;
        }

        /* What remains of big starts right after small. */
        v.locks[0]->fl_start = small->fl_end + 1;
        posix_lock_to_flock(v.locks[0], &v.locks[0]->user_flock);
        goto done;
    }

    if (small->fl_end == big->fl_end) {
        v.locks[0] = __copy_lock(big);
        v.locks[1] = __copy_lock(small);
        if ((v.locks[0] == NULL) || (v.locks[1] == NULL)) {
            goto out;
        }

        /* What remains of big ends right before small. */
        v.locks[0]->fl_end = small->fl_start - 1;
        posix_lock_to_flock(v.locks[0], &v.locks[0]->user_flock);
        goto done;
    }

    GF_ASSERT(0);
    gf_log("posix-locks", GF_LOG_ERROR, "Unexpected case in subtract_locks");

out:
    /* Failure path: release whatever was allocated so the caller only ever
     * sees an all-NULL result. */
    if (v.locks[0]) {
        __destroy_lock(v.locks[0]);
        v.locks[0] = NULL;
    }
    if (v.locks[1]) {
        __destroy_lock(v.locks[1]);
        v.locks[1] = NULL;
    }
    if (v.locks[2]) {
        __destroy_lock(v.locks[2]);
        v.locks[2] = NULL;
    }

done:
    return v;
}
  757 
  758 static posix_lock_t *
  759 first_conflicting_overlap(pl_inode_t *pl_inode, posix_lock_t *lock)
  760 {
  761     posix_lock_t *l = NULL;
  762     posix_lock_t *conf = NULL;
  763 
  764     pthread_mutex_lock(&pl_inode->mutex);
  765     {
  766         list_for_each_entry(l, &pl_inode->ext_list, list)
  767         {
  768             if (l->blocked)
  769                 continue;
  770 
  771             if (locks_overlap(l, lock)) {
  772                 if (same_owner(l, lock))
  773                     continue;
  774 
  775                 if ((l->fl_type == F_WRLCK) || (lock->fl_type == F_WRLCK)) {
  776                     conf = l;
  777                     goto unlock;
  778                 }
  779             }
  780         }
  781     }
  782 unlock:
  783     pthread_mutex_unlock(&pl_inode->mutex);
  784 
  785     return conf;
  786 }
  787 
  788 /*
  789   Start searching from {begin}, and return the first lock that
  790   conflicts, NULL if no conflict
  791   If {begin} is NULL, then start from the beginning of the list
  792 */
  793 static posix_lock_t *
  794 first_overlap(pl_inode_t *pl_inode, posix_lock_t *lock)
  795 {
  796     posix_lock_t *l = NULL;
  797 
  798     list_for_each_entry(l, &pl_inode->ext_list, list)
  799     {
  800         if (l->blocked)
  801             continue;
  802 
  803         if (locks_overlap(l, lock))
  804             return l;
  805     }
  806 
  807     return NULL;
  808 }
  809 
  810 /* Return true if lock is grantable */
  811 static int
  812 __is_lock_grantable(pl_inode_t *pl_inode, posix_lock_t *lock)
  813 {
  814     posix_lock_t *l = NULL;
  815     int ret = 1;
  816 
  817     list_for_each_entry(l, &pl_inode->ext_list, list)
  818     {
  819         if (!l->blocked && locks_overlap(lock, l)) {
  820             if (((l->fl_type == F_WRLCK) || (lock->fl_type == F_WRLCK)) &&
  821                 (lock->fl_type != F_UNLCK) && !same_owner(l, lock)) {
  822                 ret = 0;
  823                 break;
  824             }
  825         }
  826     }
  827     return ret;
  828 }
  829 
/* Declared here, defined outside this part of the file. */
extern void
do_blocked_rw(pl_inode_t *);
  832 
/*
 * Insert {lock} into the inode's ext_list, merging it with or splitting it
 * against granted locks of the same owner that it overlaps.
 *
 * This function consumes {lock}: on every path it is either linked into the
 * list or destroyed, so the caller must not touch it afterwards.
 *
 * For each granted, overlapping lock {conf}:
 *   - same owner, same type and lk_flags: the two are replaced by one lock
 *     covering their union, which is inserted recursively;
 *   - same owner, different type or lk_flags: a lock spanning both ranges
 *     (with conf's attributes) is built, {lock} is subtracted from it, the
 *     resulting pieces are inserted recursively, and any F_UNLCK entries
 *     left on the list are removed;
 *   - different owner: an unlock request skips it; two read locks coexist,
 *     so {lock} is inserted immediately.
 * If the loop finishes without returning, {lock} is inserted unless it is an
 * unlock request, in which case it is simply destroyed.
 *
 * NOTE(review): add_locks() returns NULL when its allocation fails, and
 * {sum} is dereferenced below without a check in both same-owner branches.
 */
static void
__insert_and_merge(pl_inode_t *pl_inode, posix_lock_t *lock)
{
    posix_lock_t *conf = NULL;
    posix_lock_t *t = NULL;
    posix_lock_t *sum = NULL;
    int i = 0;
    struct _values v = {.locks = {0, 0, 0}};

    list_for_each_entry_safe(conf, t, &pl_inode->ext_list, list)
    {
        if (conf->blocked)
            continue;
        if (!locks_overlap(conf, lock))
            continue;

        if (same_owner(conf, lock)) {
            if (conf->fl_type == lock->fl_type &&
                conf->lk_flags == lock->lk_flags) {
                /* Same kind of lock: coalesce into one covering both. */
                sum = add_locks(lock, conf, lock);

                __delete_lock(conf);
                __destroy_lock(conf);

                __destroy_lock(lock);
                INIT_LIST_HEAD(&sum->list);
                posix_lock_to_flock(sum, &sum->user_flock);
                /* The merged lock may itself touch further locks. */
                __insert_and_merge(pl_inode, sum);

                return;
            } else {
                /* Different kind: carve the new lock's range out of the
                 * combined range and re-insert the pieces. */
                sum = add_locks(lock, conf, conf);

                v = subtract_locks(sum, lock);

                __delete_lock(conf);
                __destroy_lock(conf);

                __delete_lock(lock);
                __destroy_lock(lock);

                __destroy_lock(sum);

                for (i = 0; i < 3; i++) {
                    if (!v.locks[i])
                        continue;

                    __insert_and_merge(pl_inode, v.locks[i]);
                }

                /* Pieces of type F_UNLCK only served to punch holes. */
                __delete_unlck_locks(pl_inode);
                return;
            }
        }

        if (lock->fl_type == F_UNLCK) {
            continue;
        }

        if ((conf->fl_type == F_RDLCK) && (lock->fl_type == F_RDLCK)) {
            __insert_lock(pl_inode, lock);
            return;
        }
    }

    /* no conflicts, so just insert */
    if (lock->fl_type != F_UNLCK) {
        __insert_lock(pl_inode, lock);
    } else {
        __destroy_lock(lock);
    }
}
  905 
/*
 * Try to grant the blocked locks queued on {pl_inode}.
 *
 * Pass 1 moves every blocked lock that no granted lock overlaps (as judged
 * by first_overlap(), which ignores owner and type) onto a private list and
 * marks it unblocked.
 *
 * Pass 2 re-checks each candidate with __is_lock_grantable(), since locks
 * granted earlier in this pass may now conflict with it. A grantable lock is
 * merged into the inode's list; its frame is handed over to a freshly
 * allocated stub lock that also carries the flock to return, and the stub is
 * added to {granted} for the caller to unwind. A candidate that is not
 * grantable, or whose stub cannot be allocated, is marked blocked again and
 * put back on the inode's list.
 *
 * No locking is done here; grant_blocked_locks() in this file calls it with
 * pl_inode->mutex held.
 */
void
__grant_blocked_locks(xlator_t *this, pl_inode_t *pl_inode,
                      struct list_head *granted)
{
    struct list_head tmp_list;
    posix_lock_t *l = NULL;
    posix_lock_t *tmp = NULL;
    posix_lock_t *conf = NULL;

    INIT_LIST_HEAD(&tmp_list);

    list_for_each_entry_safe(l, tmp, &pl_inode->ext_list, list)
    {
        if (l->blocked) {
            conf = first_overlap(pl_inode, l);
            if (conf)
                continue;

            l->blocked = 0;
            list_move_tail(&l->list, &tmp_list);
        }
    }

    list_for_each_entry_safe(l, tmp, &tmp_list, list)
    {
        list_del_init(&l->list);

        if (__is_lock_grantable(pl_inode, l)) {
            conf = GF_CALLOC(1, sizeof(*conf), gf_locks_mt_posix_lock_t);

            if (!conf) {
                /* Cannot report the grant: leave the lock queued. */
                l->blocked = 1;
                __insert_lock(pl_inode, l);
                continue;
            }

            /* The stub, not the real lock, owns the frame from here on. */
            conf->frame = l->frame;
            l->frame = NULL;

            posix_lock_to_flock(l, &conf->user_flock);

            gf_log(this->name, GF_LOG_TRACE,
                   "%s (pid=%d) lk-owner:%s %" PRId64 " - %" PRId64
                   " => Granted",
                   l->fl_type == F_UNLCK ? "Unlock" : "Lock", l->client_pid,
                   lkowner_utoa(&l->owner), l->user_flock.l_start,
                   l->user_flock.l_len);

            /* {l} is consumed by this call and must not be used below. */
            __insert_and_merge(pl_inode, l);

            list_add(&conf->list, granted);
        } else {
            l->blocked = 1;
            __insert_lock(pl_inode, l);
        }
    }
}
  963 
  964 void
  965 grant_blocked_locks(xlator_t *this, pl_inode_t *pl_inode)
  966 {
  967     struct list_head granted_list;
  968     posix_lock_t *tmp = NULL;
  969     posix_lock_t *lock = NULL;
  970     pl_local_t *local = NULL;
  971     INIT_LIST_HEAD(&granted_list);
  972 
  973     pthread_mutex_lock(&pl_inode->mutex);
  974     {
  975         __grant_blocked_locks(this, pl_inode, &granted_list);
  976     }
  977     pthread_mutex_unlock(&pl_inode->mutex);
  978 
  979     list_for_each_entry_safe(lock, tmp, &granted_list, list)
  980     {
  981         list_del_init(&lock->list);
  982 
  983         pl_trace_out(this, lock->frame, NULL, NULL, F_SETLKW, &lock->user_flock,
  984                      0, 0, NULL);
  985         local = lock->frame->local;
  986         PL_STACK_UNWIND_AND_FREE(local, lk, lock->frame, 0, 0,
  987                                  &lock->user_flock, NULL);
  988         __destroy_lock(lock);
  989     }
  990 
  991     return;
  992 }
  993 
  994 static int
  995 pl_send_prelock_unlock(xlator_t *this, pl_inode_t *pl_inode,
  996                        posix_lock_t *old_lock)
  997 {
  998     struct gf_flock flock = {
  999         0,
 1000     };
 1001     posix_lock_t *unlock_lock = NULL;
 1002     int32_t op_errno = 0;
 1003 
 1004     struct list_head granted_list;
 1005     posix_lock_t *tmp = NULL;
 1006     posix_lock_t *lock = NULL;
 1007     pl_local_t *local = NULL;
 1008 
 1009     int ret = -1;
 1010 
 1011     INIT_LIST_HEAD(&granted_list);
 1012 
 1013     flock.l_type = F_UNLCK;
 1014     flock.l_whence = old_lock->user_flock.l_whence;
 1015     flock.l_start = old_lock->user_flock.l_start;
 1016     flock.l_len = old_lock->user_flock.l_len;
 1017     flock.l_pid = old_lock->user_flock.l_pid;
 1018 
 1019     unlock_lock = new_posix_lock(&flock, old_lock->client, old_lock->client_pid,
 1020                                  &old_lock->owner, old_lock->fd,
 1021                                  old_lock->lk_flags, 0, &op_errno);
 1022     GF_VALIDATE_OR_GOTO(this->name, unlock_lock, out);
 1023     ret = 0;
 1024 
 1025     __insert_and_merge(pl_inode, unlock_lock);
 1026 
 1027     __grant_blocked_locks(this, pl_inode, &granted_list);
 1028 
 1029     list_for_each_entry_safe(lock, tmp, &granted_list, list)
 1030     {
 1031         list_del_init(&lock->list);
 1032 
 1033         pl_trace_out(this, lock->frame, NULL, NULL, F_SETLKW, &lock->user_flock,
 1034                      0, 0, NULL);
 1035         local = lock->frame->local;
 1036         PL_STACK_UNWIND_AND_FREE(local, lk, lock->frame, 0, 0,
 1037                                  &lock->user_flock, NULL);
 1038         __destroy_lock(lock);
 1039     }
 1040 
 1041 out:
 1042     return ret;
 1043 }
 1044 
 1045 int
 1046 pl_setlk(xlator_t *this, pl_inode_t *pl_inode, posix_lock_t *lock,
 1047          int can_block)
 1048 {
 1049     int ret = 0;
 1050 
 1051     errno = 0;
 1052 
 1053     pthread_mutex_lock(&pl_inode->mutex);
 1054     {
 1055         /* Send unlock before the actual lock to
 1056            prevent lock upgrade / downgrade
 1057            problems only if:
 1058            - it is a blocking call
 1059            - it has other conflicting locks
 1060         */
 1061 
 1062         if (can_block && !(__is_lock_grantable(pl_inode, lock))) {
 1063             ret = pl_send_prelock_unlock(this, pl_inode, lock);
 1064             if (ret)
 1065                 gf_log(this->name, GF_LOG_DEBUG,
 1066                        "Could not send pre-lock "
 1067                        "unlock");
 1068         }
 1069 
 1070         if (__is_lock_grantable(pl_inode, lock)) {
 1071             if (pl_metalock_is_active(pl_inode)) {
 1072                 __pl_queue_lock(pl_inode, lock);
 1073                 pthread_mutex_unlock(&pl_inode->mutex);
 1074                 ret = -2;
 1075                 goto out;
 1076             }
 1077             gf_log(this->name, GF_LOG_TRACE,
 1078                    "%s (pid=%d) lk-owner:%s %" PRId64 " - %" PRId64 " => OK",
 1079                    lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
 1080                    lock->client_pid, lkowner_utoa(&lock->owner),
 1081                    lock->user_flock.l_start, lock->user_flock.l_len);
 1082             __insert_and_merge(pl_inode, lock);
 1083         } else if (can_block) {
 1084             if (pl_metalock_is_active(pl_inode)) {
 1085                 __pl_queue_lock(pl_inode, lock);
 1086                 pthread_mutex_unlock(&pl_inode->mutex);
 1087                 ret = -2;
 1088                 goto out;
 1089             }
 1090             gf_log(this->name, GF_LOG_TRACE,
 1091                    "%s (pid=%d) lk-owner:%s %" PRId64 " - %" PRId64
 1092                    " => Blocked",
 1093                    lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
 1094                    lock->client_pid, lkowner_utoa(&lock->owner),
 1095                    lock->user_flock.l_start, lock->user_flock.l_len);
 1096 
 1097             pl_trace_block(this, lock->frame, NULL, NULL, F_SETLKW,
 1098                            &lock->user_flock, NULL);
 1099 
 1100             lock->blocked = 1;
 1101             __insert_lock(pl_inode, lock);
 1102             ret = -1;
 1103         } else {
 1104             gf_log(this->name, GF_LOG_TRACE,
 1105                    "%s (pid=%d) lk-owner:%s %" PRId64 " - %" PRId64 " => NOK",
 1106                    lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
 1107                    lock->client_pid, lkowner_utoa(&lock->owner),
 1108                    lock->user_flock.l_start, lock->user_flock.l_len);
 1109             errno = EAGAIN;
 1110             ret = -1;
 1111         }
 1112     }
 1113     pthread_mutex_unlock(&pl_inode->mutex);
 1114 
 1115     grant_blocked_locks(this, pl_inode);
 1116 
 1117     do_blocked_rw(pl_inode);
 1118 
 1119 out:
 1120     return ret;
 1121 }
 1122 
 1123 posix_lock_t *
 1124 pl_getlk(pl_inode_t *pl_inode, posix_lock_t *lock)
 1125 {
 1126     posix_lock_t *conf = first_conflicting_overlap(pl_inode, lock);
 1127     if (conf == NULL) {
 1128         lock->fl_type = F_UNLCK;
 1129         return lock;
 1130     }
 1131 
 1132     return conf;
 1133 }
 1134 
 1135 gf_boolean_t
 1136 pl_does_monkey_want_stuck_lock()
 1137 {
 1138     long int monkey_unlock_rand = 0;
 1139     long int monkey_unlock_rand_rem = 0;
 1140 
 1141     /* coverity[DC.WEAK_CRYPTO] */
 1142     monkey_unlock_rand = random();
 1143     monkey_unlock_rand_rem = monkey_unlock_rand % 100;
 1144     if (monkey_unlock_rand_rem == 0)
 1145         return _gf_true;
 1146     return _gf_false;
 1147 }
 1148 
 1149 int
 1150 pl_lock_preempt(pl_inode_t *pl_inode, posix_lock_t *reqlock)
 1151 {
 1152     posix_lock_t *lock = NULL;
 1153     posix_lock_t *i = NULL;
 1154     pl_rw_req_t *rw = NULL;
 1155     pl_rw_req_t *itr = NULL;
 1156     struct list_head unwind_blist = {
 1157         0,
 1158     };
 1159     struct list_head unwind_rw_list = {
 1160         0,
 1161     };
 1162     int ret = 0;
 1163 
 1164     INIT_LIST_HEAD(&unwind_blist);
 1165     INIT_LIST_HEAD(&unwind_rw_list);
 1166 
 1167     pthread_mutex_lock(&pl_inode->mutex);
 1168     {
 1169         /*
 1170             - go through the lock list
 1171             - remove all locks from different owners
 1172             - same owner locks will be added or substracted based on
 1173               the new request
 1174             - add the new lock
 1175         */
 1176         list_for_each_entry_safe(lock, i, &pl_inode->ext_list, list)
 1177         {
 1178             if (lock->blocked) {
 1179                 list_del_init(&lock->list);
 1180                 list_add(&lock->list, &unwind_blist);
 1181                 continue;
 1182             }
 1183 
 1184             if (locks_overlap(lock, reqlock)) {
 1185                 if (same_owner(lock, reqlock))
 1186                     continue;
 1187 
 1188                 /* remove conflicting locks */
 1189                 list_del_init(&lock->list);
 1190                 __delete_lock(lock);
 1191                 __destroy_lock(lock);
 1192             }
 1193         }
 1194 
 1195         __insert_and_merge(pl_inode, reqlock);
 1196 
 1197         list_for_each_entry_safe(rw, itr, &pl_inode->rw_list, list)
 1198         {
 1199             list_del_init(&rw->list);
 1200             list_add(&rw->list, &unwind_rw_list);
 1201         }
 1202     }
 1203     pthread_mutex_unlock(&pl_inode->mutex);
 1204 
 1205     /* unwind blocked locks */
 1206     list_for_each_entry_safe(lock, i, &unwind_blist, list)
 1207     {
 1208         PL_STACK_UNWIND_AND_FREE(((pl_local_t *)lock->frame->local), lk,
 1209                                  lock->frame, -1, EBUSY, &lock->user_flock,
 1210                                  NULL);
 1211         __destroy_lock(lock);
 1212     }
 1213 
 1214     /* unwind blocked IOs */
 1215     list_for_each_entry_safe(rw, itr, &unwind_rw_list, list)
 1216     {
 1217         pl_clean_local(rw->stub->frame->local);
 1218         call_unwind_error(rw->stub, -1, EBUSY);
 1219     }
 1220 
 1221     return ret;
 1222 }
 1223 
 1224 /* Return true in case we need to ensure mandatory-locking
 1225  * semantics under different modes.
 1226  */
 1227 gf_boolean_t
 1228 pl_is_mandatory_locking_enabled(pl_inode_t *pl_inode)
 1229 {
 1230     posix_locks_private_t *priv = THIS->private;
 1231 
 1232     if (priv->mandatory_mode == MLK_FILE_BASED && pl_inode->mandatory)
 1233         return _gf_true;
 1234     else if (priv->mandatory_mode == MLK_FORCED ||
 1235              priv->mandatory_mode == MLK_OPTIMAL)
 1236         return _gf_true;
 1237 
 1238     return _gf_false;
 1239 }
 1240 
 1241 void
 1242 pl_clean_local(pl_local_t *local)
 1243 {
 1244     if (!local)
 1245         return;
 1246 
 1247     if (local->inodelk_dom_count_req)
 1248         data_unref(local->inodelk_dom_count_req);
 1249     loc_wipe(&local->loc[0]);
 1250     loc_wipe(&local->loc[1]);
 1251     if (local->fd)
 1252         fd_unref(local->fd);
 1253     if (local->inode)
 1254         inode_unref(local->inode);
 1255     mem_put(local);
 1256 }
 1257 
 1258 /*
 1259 TODO: detach local initialization from PL_LOCAL_GET_REQUESTS and add it here
 1260 */
 1261 int
 1262 pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd)
 1263 {
 1264     pl_local_t *local = NULL;
 1265 
 1266     if (!loc && !fd) {
 1267         return -1;
 1268     }
 1269 
 1270     if (!frame->local) {
 1271         local = mem_get0(this->local_pool);
 1272         if (!local) {
 1273             gf_msg(this->name, GF_LOG_ERROR, ENOMEM, 0,
 1274                    "mem allocation failed");
 1275             return -1;
 1276         }
 1277 
 1278         local->inode = (loc ? inode_ref(loc->inode) : inode_ref(fd->inode));
 1279 
 1280         frame->local = local;
 1281     }
 1282 
 1283     return 0;
 1284 }
 1285 
 1286 gf_boolean_t
 1287 pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client)
 1288 {
 1289     if (client && (client->opversion < GD_OP_VERSION_7_0)) {
 1290         return _gf_true;
 1291     }
 1292 
 1293     if (is_lk_owner_null(owner)) {
 1294         return _gf_false;
 1295     }
 1296     return _gf_true;
 1297 }
 1298 
 1299 static int32_t
 1300 pl_inode_from_loc(loc_t *loc, inode_t **pinode)
 1301 {
 1302     inode_t *inode = NULL;
 1303     int32_t error = 0;
 1304 
 1305     if (loc->inode != NULL) {
 1306         inode = inode_ref(loc->inode);
 1307         goto done;
 1308     }
 1309 
 1310     if (loc->parent == NULL) {
 1311         error = EINVAL;
 1312         goto done;
 1313     }
 1314 
 1315     if (!gf_uuid_is_null(loc->gfid)) {
 1316         inode = inode_find(loc->parent->table, loc->gfid);
 1317         if (inode != NULL) {
 1318             goto done;
 1319         }
 1320     }
 1321 
 1322     if (loc->name == NULL) {
 1323         error = EINVAL;
 1324         goto done;
 1325     }
 1326 
 1327     inode = inode_grep(loc->parent->table, loc->parent, loc->name);
 1328     if (inode == NULL) {
 1329         /* We haven't found any inode. This means that the file doesn't exist
 1330          * or that even if it exists, we don't have any knowledge about it, so
 1331          * we don't have locks on it either, which is fine for our purposes. */
 1332         goto done;
 1333     }
 1334 
 1335 done:
 1336     *pinode = inode;
 1337 
 1338     return error;
 1339 }
 1340 
 1341 static gf_boolean_t
 1342 pl_inode_has_owners(xlator_t *xl, client_t *client, pl_inode_t *pl_inode,
 1343                     struct timespec *now, struct list_head *contend)
 1344 {
 1345     pl_dom_list_t *dom;
 1346     pl_inode_lock_t *lock;
 1347     gf_boolean_t has_owners = _gf_false;
 1348 
 1349     list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
 1350     {
 1351         list_for_each_entry(lock, &dom->inodelk_list, list)
 1352         {
 1353             /* If the lock belongs to the same client, we assume it's related
 1354              * to the same operation, so we allow the removal to continue. */
 1355             if (lock->client == client) {
 1356                 continue;
 1357             }
 1358             /* If the lock belongs to an internal process, we don't block the
 1359              * removal. */
 1360             if (lock->client_pid < 0) {
 1361                 continue;
 1362             }
 1363             if (contend == NULL) {
 1364                 return _gf_true;
 1365             }
 1366             has_owners = _gf_true;
 1367             inodelk_contention_notify_check(xl, lock, now, contend);
 1368         }
 1369     }
 1370 
 1371     return has_owners;
 1372 }
 1373 
 1374 int32_t
 1375 pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
 1376                         pl_inode_t **ppl_inode, struct list_head *contend)
 1377 {
 1378     struct timespec now;
 1379     inode_t *inode;
 1380     pl_inode_t *pl_inode;
 1381     int32_t error;
 1382 
 1383     pl_inode = NULL;
 1384 
 1385     error = pl_inode_from_loc(loc, &inode);
 1386     if ((error != 0) || (inode == NULL)) {
 1387         goto done;
 1388     }
 1389 
 1390     pl_inode = pl_inode_get(xl, inode, NULL);
 1391     if (pl_inode == NULL) {
 1392         inode_unref(inode);
 1393         error = ENOMEM;
 1394         goto done;
 1395     }
 1396 
 1397     /* pl_inode_from_loc() already increments ref count for inode, so
 1398      * we only assign here our reference. */
 1399     pl_inode->inode = inode;
 1400 
 1401     timespec_now(&now);
 1402 
 1403     pthread_mutex_lock(&pl_inode->mutex);
 1404 
 1405     if (pl_inode->removed) {
 1406         error = ESTALE;
 1407         goto unlock;
 1408     }
 1409 
 1410     if (pl_inode_has_owners(xl, frame->root->client, pl_inode, &now, contend)) {
 1411         error = -1;
 1412         /* We skip the unlock here because the caller must create a stub when
 1413          * we return -1 and do a call to pl_inode_remove_complete(), which
 1414          * assumes the lock is still acquired and will release it once
 1415          * everything else is prepared. */
 1416         goto done;
 1417     }
 1418 
 1419     pl_inode->is_locked = _gf_true;
 1420     pl_inode->remove_running++;
 1421 
 1422 unlock:
 1423     pthread_mutex_unlock(&pl_inode->mutex);
 1424 
 1425 done:
 1426     *ppl_inode = pl_inode;
 1427 
 1428     return error;
 1429 }
 1430 
 1431 int32_t
 1432 pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
 1433                          struct list_head *contend)
 1434 {
 1435     pl_inode_lock_t *lock;
 1436     int32_t error = -1;
 1437 
 1438     if (stub != NULL) {
 1439         list_add_tail(&stub->list, &pl_inode->waiting);
 1440         pl_inode->is_locked = _gf_true;
 1441     } else {
 1442         error = ENOMEM;
 1443 
 1444         while (!list_empty(contend)) {
 1445             lock = list_first_entry(contend, pl_inode_lock_t, list);
 1446             list_del_init(&lock->list);
 1447             __pl_inodelk_unref(lock);
 1448         }
 1449     }
 1450 
 1451     pthread_mutex_unlock(&pl_inode->mutex);
 1452 
 1453     if (error < 0) {
 1454         inodelk_contention_notify(xl, contend);
 1455     }
 1456 
 1457     inode_unref(pl_inode->inode);
 1458 
 1459     return error;
 1460 }
 1461 
 1462 void
 1463 pl_inode_remove_wake(struct list_head *list)
 1464 {
 1465     call_stub_t *stub;
 1466 
 1467     while (!list_empty(list)) {
 1468         stub = list_first_entry(list, call_stub_t, list);
 1469         list_del_init(&stub->list);
 1470 
 1471         call_resume(stub);
 1472     }
 1473 }
 1474 
 1475 void
 1476 pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error)
 1477 {
 1478     struct list_head contend, granted;
 1479     struct timespec now;
 1480     pl_dom_list_t *dom;
 1481 
 1482     if (pl_inode == NULL) {
 1483         return;
 1484     }
 1485 
 1486     INIT_LIST_HEAD(&contend);
 1487     INIT_LIST_HEAD(&granted);
 1488     timespec_now(&now);
 1489 
 1490     pthread_mutex_lock(&pl_inode->mutex);
 1491 
 1492     if (error == 0) {
 1493         if (pl_inode->links >= 0) {
 1494             pl_inode->links--;
 1495         }
 1496         if (pl_inode->links == 0) {
 1497             pl_inode->removed = _gf_true;
 1498         }
 1499     }
 1500 
 1501     pl_inode->remove_running--;
 1502 
 1503     if ((pl_inode->remove_running == 0) && list_empty(&pl_inode->waiting)) {
 1504         pl_inode->is_locked = _gf_false;
 1505 
 1506         list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
 1507         {
 1508             __grant_blocked_inode_locks(xl, pl_inode, &granted, dom, &now,
 1509                                         &contend);
 1510         }
 1511     }
 1512 
 1513     pthread_mutex_unlock(&pl_inode->mutex);
 1514 
 1515     unwind_granted_inodes(xl, pl_inode, &granted);
 1516 
 1517     inodelk_contention_notify(xl, &contend);
 1518 
 1519     inode_unref(pl_inode->inode);
 1520 }
 1521 
 1522 void
 1523 pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
 1524                          struct list_head *list)
 1525 {
 1526     call_stub_t *stub, *tmp;
 1527 
 1528     if (!pl_inode->is_locked) {
 1529         return;
 1530     }
 1531 
 1532     list_for_each_entry_safe(stub, tmp, &pl_inode->waiting, list)
 1533     {
 1534         if (!pl_inode_has_owners(xl, stub->frame->root->client, pl_inode, NULL,
 1535                                  NULL)) {
 1536             list_move_tail(&stub->list, list);
 1537         }
 1538     }
 1539 }
 1540 
 1541 /* This function determines if an inodelk attempt can be done now or it needs
 1542  * to wait.
 1543  *
 1544  * Possible return values:
 1545  *   < 0: An error occurred. Currently only -ESTALE can be returned if the
 1546  *        inode has been deleted previously by unlink/rmdir/rename
 1547  *   = 0: The lock can be attempted.
 1548  *   > 0: The lock needs to wait because a conflicting remove operation is
 1549  *        ongoing.
 1550  */
 1551 int32_t
 1552 pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock)
 1553 {
 1554     pl_dom_list_t *dom;
 1555     pl_inode_lock_t *ilock;
 1556 
 1557     /* If the inode has been deleted, we won't allow any lock. */
 1558     if (pl_inode->removed) {
 1559         return -ESTALE;
 1560     }
 1561 
 1562     /* We only synchronize with locks made for regular operations coming from
 1563      * the user. Locks done for internal purposes are hard to control and could
 1564      * lead to long delays or deadlocks quite easily. */
 1565     if (lock->client_pid < 0) {
 1566         return 0;
 1567     }
 1568     if (!pl_inode->is_locked) {
 1569         return 0;
 1570     }
 1571     if (pl_inode->remove_running > 0) {
 1572         return 1;
 1573     }
 1574 
 1575     list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
 1576     {
 1577         list_for_each_entry(ilock, &dom->inodelk_list, list)
 1578         {
 1579             /* If a lock from the same client is already granted, we allow this
 1580              * one to continue. This is necessary to prevent deadlocks when
 1581              * multiple locks are taken for the same operation.
 1582              *
 1583              * On the other side it's unlikely that the same client sends
 1584              * completely unrelated locks for the same inode.
 1585              */
 1586             if (ilock->client == lock->client) {
 1587                 return 0;
 1588             }
 1589         }
 1590     }
 1591 
 1592     return 1;
 1593 }