"Fossies" - the Fresh Open Source Software Archive

Member "glusterfs-8.2/xlators/features/locks/src/entrylk.c" (16 Sep 2020, 33732 Bytes) of package /linux/misc/glusterfs-8.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "entrylk.c" see the Fossies "Dox" file reference documentation.

    1 /*
    2    Copyright (c) 2006-2012 Red Hat, Inc. <http://www.redhat.com>
    3    This file is part of GlusterFS.
    4 
    5    This file is licensed to you under your choice of the GNU Lesser
    6    General Public License, version 3 or any later version (LGPLv3 or
    7    later), or the GNU General Public License, version 2 (GPLv2), in all
    8    cases as published by the Free Software Foundation.
    9 */
   10 #include <glusterfs/glusterfs.h>
   11 #include <glusterfs/compat.h>
   12 #include <glusterfs/xlator.h>
   13 #include <glusterfs/logging.h>
   14 #include <glusterfs/common-utils.h>
   15 #include <glusterfs/list.h>
   16 #include <glusterfs/upcall-utils.h>
   17 
   18 #include "locks.h"
   19 #include "clear.h"
   20 #include "common.h"
   21 #include "pl-messages.h"
   22 
   23 void
   24 __pl_entrylk_unref(pl_entry_lock_t *lock)
   25 {
   26     lock->ref--;
   27     if (!lock->ref) {
   28         GF_FREE((char *)lock->basename);
   29         GF_FREE(lock->connection_id);
   30         GF_FREE(lock);
   31     }
   32 }
   33 
   34 static void
   35 __pl_entrylk_ref(pl_entry_lock_t *lock)
   36 {
   37     lock->ref++;
   38 }
   39 
   40 static pl_entry_lock_t *
   41 new_entrylk_lock(pl_inode_t *pinode, const char *basename, entrylk_type type,
   42                  const char *domain, call_frame_t *frame, char *conn_id,
   43                  int32_t *op_errno)
   44 {
   45     pl_entry_lock_t *newlock = NULL;
   46 
   47     if (!pl_is_lk_owner_valid(&frame->root->lk_owner, frame->root->client)) {
   48         *op_errno = EINVAL;
   49         goto out;
   50     }
   51 
   52     newlock = GF_CALLOC(1, sizeof(pl_entry_lock_t),
   53                         gf_locks_mt_pl_entry_lock_t);
   54     if (!newlock) {
   55         *op_errno = ENOMEM;
   56         goto out;
   57     }
   58 
   59     newlock->basename = basename ? gf_strdup(basename) : NULL;
   60     newlock->type = type;
   61     newlock->client = frame->root->client;
   62     newlock->client_pid = frame->root->pid;
   63     newlock->volume = domain;
   64     newlock->owner = frame->root->lk_owner;
   65     newlock->frame = frame;
   66     newlock->this = frame->this;
   67 
   68     if (conn_id) {
   69         newlock->connection_id = gf_strdup(conn_id);
   70     }
   71 
   72     INIT_LIST_HEAD(&newlock->domain_list);
   73     INIT_LIST_HEAD(&newlock->blocked_locks);
   74     INIT_LIST_HEAD(&newlock->client_list);
   75 
   76     __pl_entrylk_ref(newlock);
   77 out:
   78     return newlock;
   79 }
   80 
   81 /**
   82  * all_names - does a basename represent all names?
   83  * @basename: name to check
   84  */
   85 
   86 #define all_names(basename) ((basename == NULL) ? 1 : 0)
   87 
   88 /**
   89  * names_conflict - do two names conflict?
   90  * @n1: name
   91  * @n2: name
   92  */
   93 
   94 static int
   95 names_conflict(const char *n1, const char *n2)
   96 {
   97     return all_names(n1) || all_names(n2) || !strcmp(n1, n2);
   98 }
   99 
  100 static int
  101 __same_entrylk_owner(pl_entry_lock_t *l1, pl_entry_lock_t *l2)
  102 {
  103     return (is_same_lkowner(&l1->owner, &l2->owner) &&
  104             (l1->client == l2->client));
  105 }
  106 
  107 /* Just as in inodelk, allow conflicting name locks from same (lk_owner, conn)*/
  108 static int
  109 __conflicting_entrylks(pl_entry_lock_t *l1, pl_entry_lock_t *l2)
  110 {
  111     if (names_conflict(l1->basename, l2->basename) &&
  112         !__same_entrylk_owner(l1, l2))
  113         return 1;
  114 
  115     return 0;
  116 }
  117 
  118 /* See comments in inodelk.c for details */
  119 static inline gf_boolean_t
  120 __stale_entrylk(xlator_t *this, pl_entry_lock_t *candidate_lock,
  121                 pl_entry_lock_t *requested_lock, time_t *lock_age_sec)
  122 {
  123     posix_locks_private_t *priv = NULL;
  124     struct timeval curr;
  125 
  126     priv = this->private;
  127 
  128     /* Question: Should we just prune them all given the
  129      * chance?  Or just the locks we are attempting to acquire?
  130      */
  131     if (names_conflict(candidate_lock->basename, requested_lock->basename)) {
  132         gettimeofday(&curr, NULL);
  133         *lock_age_sec = curr.tv_sec - candidate_lock->granted_time.tv_sec;
  134         if (*lock_age_sec > priv->revocation_secs)
  135             return _gf_true;
  136     }
  137     return _gf_false;
  138 }
  139 
  140 /* See comments in inodelk.c for details */
  141 static gf_boolean_t
  142 __entrylk_prune_stale(xlator_t *this, pl_inode_t *pinode, pl_dom_list_t *dom,
  143                       pl_entry_lock_t *lock)
  144 {
  145     posix_locks_private_t *priv = NULL;
  146     pl_entry_lock_t *tmp = NULL;
  147     pl_entry_lock_t *lk = NULL;
  148     gf_boolean_t revoke_lock = _gf_false;
  149     int bcount = 0;
  150     int gcount = 0;
  151     int op_errno = 0;
  152     clrlk_args args;
  153     args.opts = NULL;
  154     time_t lk_age_sec = 0;
  155     uint32_t max_blocked = 0;
  156     char *reason_str = NULL;
  157 
  158     priv = this->private;
  159     args.type = CLRLK_ENTRY;
  160     if (priv->revocation_clear_all == _gf_true)
  161         args.kind = CLRLK_ALL;
  162     else
  163         args.kind = CLRLK_GRANTED;
  164 
  165     if (list_empty(&dom->entrylk_list))
  166         goto out;
  167 
  168     pthread_mutex_lock(&pinode->mutex);
  169     lock->pinode = pinode;
  170     list_for_each_entry_safe(lk, tmp, &dom->entrylk_list, domain_list)
  171     {
  172         if (__stale_entrylk(this, lk, lock, &lk_age_sec) == _gf_true) {
  173             revoke_lock = _gf_true;
  174             reason_str = "age";
  175             break;
  176         }
  177     }
  178     max_blocked = priv->revocation_max_blocked;
  179     if (max_blocked != 0 && revoke_lock == _gf_false) {
  180         list_for_each_entry_safe(lk, tmp, &dom->blocked_entrylks, blocked_locks)
  181         {
  182             max_blocked--;
  183             if (max_blocked == 0) {
  184                 revoke_lock = _gf_true;
  185                 reason_str = "max blocked";
  186                 break;
  187             }
  188         }
  189     }
  190     pthread_mutex_unlock(&pinode->mutex);
  191 
  192 out:
  193     if (revoke_lock == _gf_true) {
  194         clrlk_clear_entrylk(this, pinode, dom, &args, &bcount, &gcount,
  195                             &op_errno);
  196         gf_log(this->name, GF_LOG_WARNING,
  197                "Lock revocation [reason: %s; gfid: %s; domain: %s; "
  198                "age: %ld sec] - Entry lock revoked:  %d granted & %d "
  199                "blocked locks cleared",
  200                reason_str, uuid_utoa(pinode->gfid), dom->domain, lk_age_sec,
  201                gcount, bcount);
  202     }
  203 
  204     return revoke_lock;
  205 }
  206 
  207 void
  208 entrylk_contention_notify_check(xlator_t *this, pl_entry_lock_t *lock,
  209                                 struct timespec *now, struct list_head *contend)
  210 {
  211     posix_locks_private_t *priv;
  212     int64_t elapsed;
  213 
  214     priv = this->private;
  215 
  216     /* If this lock is in a list, it means that we are about to send a
  217      * notification for it, so no need to do anything else. */
  218     if (!list_empty(&lock->contend)) {
  219         return;
  220     }
  221 
  222     elapsed = now->tv_sec;
  223     elapsed -= lock->contention_time.tv_sec;
  224     if (now->tv_nsec < lock->contention_time.tv_nsec) {
  225         elapsed--;
  226     }
  227     if (elapsed < priv->notify_contention_delay) {
  228         return;
  229     }
  230 
  231     /* All contention notifications will be sent outside of the locked
  232      * region. This means that currently granted locks might have already
  233      * been unlocked by that time. To avoid the lock or the inode to be
  234      * destroyed before we process them, we take an additional reference
  235      * on both. */
  236     inode_ref(lock->pinode->inode);
  237     __pl_entrylk_ref(lock);
  238 
  239     lock->contention_time = *now;
  240 
  241     list_add_tail(&lock->contend, contend);
  242 }
  243 
  244 void
  245 entrylk_contention_notify(xlator_t *this, struct list_head *contend)
  246 {
  247     struct gf_upcall up;
  248     struct gf_upcall_entrylk_contention lc;
  249     pl_entry_lock_t *lock;
  250     pl_inode_t *pl_inode;
  251     client_t *client;
  252     gf_boolean_t notify;
  253 
  254     while (!list_empty(contend)) {
  255         lock = list_first_entry(contend, pl_entry_lock_t, contend);
  256 
  257         pl_inode = lock->pinode;
  258 
  259         pthread_mutex_lock(&pl_inode->mutex);
  260 
  261         /* If the lock has already been released, no notification is
  262          * sent. We clear the notification time in this case. */
  263         notify = !list_empty(&lock->domain_list);
  264         if (!notify) {
  265             lock->contention_time.tv_sec = 0;
  266             lock->contention_time.tv_nsec = 0;
  267         } else {
  268             lc.type = lock->type;
  269             lc.name = lock->basename;
  270             lc.pid = lock->client_pid;
  271             lc.domain = lock->volume;
  272             lc.xdata = NULL;
  273 
  274             gf_uuid_copy(up.gfid, lock->pinode->gfid);
  275             client = (client_t *)lock->client;
  276             if (client == NULL) {
  277                 /* A NULL client can be found if the entrylk
  278                  * was issued by a server side xlator. */
  279                 up.client_uid = NULL;
  280             } else {
  281                 up.client_uid = client->client_uid;
  282             }
  283         }
  284 
  285         pthread_mutex_unlock(&pl_inode->mutex);
  286 
  287         if (notify) {
  288             up.event_type = GF_UPCALL_ENTRYLK_CONTENTION;
  289             up.data = &lc;
  290 
  291             if (this->notify(this, GF_EVENT_UPCALL, &up) < 0) {
  292                 gf_msg_debug(this->name, 0,
  293                              "Entrylk contention notification "
  294                              "failed");
  295             } else {
  296                 gf_msg_debug(this->name, 0,
  297                              "Entrylk contention notification "
  298                              "sent");
  299             }
  300         }
  301 
  302         pthread_mutex_lock(&pl_inode->mutex);
  303 
  304         list_del_init(&lock->contend);
  305         __pl_entrylk_unref(lock);
  306 
  307         pthread_mutex_unlock(&pl_inode->mutex);
  308 
  309         inode_unref(pl_inode->inode);
  310     }
  311 }
  312 
  313 /**
  314  * entrylk_grantable - is this lock grantable?
  315  * @inode: inode in which to look
  316  * @basename: name we're trying to lock
  317  * @type: type of lock
  318  */
  319 static pl_entry_lock_t *
  320 __entrylk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_entry_lock_t *lock,
  321                     struct timespec *now, struct list_head *contend)
  322 {
  323     pl_entry_lock_t *tmp = NULL;
  324     pl_entry_lock_t *ret = NULL;
  325 
  326     list_for_each_entry(tmp, &dom->entrylk_list, domain_list)
  327     {
  328         if (__conflicting_entrylks(tmp, lock)) {
  329             if (ret == NULL) {
  330                 ret = tmp;
  331                 if (contend == NULL) {
  332                     break;
  333                 }
  334             }
  335             entrylk_contention_notify_check(this, tmp, now, contend);
  336         }
  337     }
  338 
  339     return ret;
  340 }
  341 
  342 static pl_entry_lock_t *
  343 __blocked_entrylk_conflict(pl_dom_list_t *dom, pl_entry_lock_t *lock)
  344 {
  345     pl_entry_lock_t *tmp = NULL;
  346 
  347     list_for_each_entry(tmp, &dom->blocked_entrylks, blocked_locks)
  348     {
  349         if (names_conflict(tmp->basename, lock->basename))
  350             return lock;
  351     }
  352 
  353     return NULL;
  354 }
  355 
  356 static int
  357 __owner_has_lock(pl_dom_list_t *dom, pl_entry_lock_t *newlock)
  358 {
  359     pl_entry_lock_t *lock = NULL;
  360 
  361     list_for_each_entry(lock, &dom->entrylk_list, domain_list)
  362     {
  363         if (__same_entrylk_owner(lock, newlock))
  364             return 1;
  365     }
  366 
  367     list_for_each_entry(lock, &dom->blocked_entrylks, blocked_locks)
  368     {
  369         if (__same_entrylk_owner(lock, newlock))
  370             return 1;
  371     }
  372 
  373     return 0;
  374 }
  375 
  376 static int
  377 names_equal(const char *n1, const char *n2)
  378 {
  379     return (n1 == NULL && n2 == NULL) || (n1 && n2 && !strcmp(n1, n2));
  380 }
  381 
  382 void
  383 pl_print_entrylk(char *str, int size, entrylk_cmd cmd, entrylk_type type,
  384                  const char *basename, const char *domain)
  385 {
  386     char *cmd_str = NULL;
  387     char *type_str = NULL;
  388 
  389     switch (cmd) {
  390         case ENTRYLK_LOCK:
  391             cmd_str = "LOCK";
  392             break;
  393 
  394         case ENTRYLK_LOCK_NB:
  395             cmd_str = "LOCK_NB";
  396             break;
  397 
  398         case ENTRYLK_UNLOCK:
  399             cmd_str = "UNLOCK";
  400             break;
  401 
  402         default:
  403             cmd_str = "UNKNOWN";
  404             break;
  405     }
  406 
  407     switch (type) {
  408         case ENTRYLK_RDLCK:
  409             type_str = "READ";
  410             break;
  411         case ENTRYLK_WRLCK:
  412             type_str = "WRITE";
  413             break;
  414         default:
  415             type_str = "UNKNOWN";
  416             break;
  417     }
  418 
  419     snprintf(str, size,
  420              "lock=ENTRYLK, cmd=%s, type=%s, basename=%s, domain: %s", cmd_str,
  421              type_str, basename, domain);
  422 }
  423 
  424 void
  425 entrylk_trace_in(xlator_t *this, call_frame_t *frame, const char *domain,
  426                  fd_t *fd, loc_t *loc, const char *basename, entrylk_cmd cmd,
  427                  entrylk_type type)
  428 {
  429     posix_locks_private_t *priv = NULL;
  430     char pl_locker[256];
  431     char pl_lockee[256];
  432     char pl_entrylk[256];
  433 
  434     priv = this->private;
  435 
  436     if (!priv->trace)
  437         return;
  438 
  439     pl_print_locker(pl_locker, 256, this, frame);
  440     pl_print_lockee(pl_lockee, 256, fd, loc);
  441     pl_print_entrylk(pl_entrylk, 256, cmd, type, basename, domain);
  442 
  443     gf_log(this->name, GF_LOG_INFO,
  444            "[REQUEST] Locker = {%s} Lockee = {%s} Lock = {%s}", pl_locker,
  445            pl_lockee, pl_entrylk);
  446 }
  447 
  448 void
  449 entrylk_trace_out(xlator_t *this, call_frame_t *frame, const char *domain,
  450                   fd_t *fd, loc_t *loc, const char *basename, entrylk_cmd cmd,
  451                   entrylk_type type, int op_ret, int op_errno)
  452 {
  453     posix_locks_private_t *priv = NULL;
  454     char pl_locker[256];
  455     char pl_lockee[256];
  456     char pl_entrylk[256];
  457     char verdict[32];
  458 
  459     priv = this->private;
  460 
  461     if (!priv->trace)
  462         return;
  463 
  464     pl_print_locker(pl_locker, 256, this, frame);
  465     pl_print_lockee(pl_lockee, 256, fd, loc);
  466     pl_print_entrylk(pl_entrylk, 256, cmd, type, basename, domain);
  467     pl_print_verdict(verdict, 32, op_ret, op_errno);
  468 
  469     gf_log(this->name, GF_LOG_INFO,
  470            "[%s] Locker = {%s} Lockee = {%s} Lock = {%s}", verdict, pl_locker,
  471            pl_lockee, pl_entrylk);
  472 }
  473 
  474 void
  475 entrylk_trace_block(xlator_t *this, call_frame_t *frame, const char *volume,
  476                     fd_t *fd, loc_t *loc, const char *basename, entrylk_cmd cmd,
  477                     entrylk_type type)
  478 
  479 {
  480     posix_locks_private_t *priv = NULL;
  481     char pl_locker[256];
  482     char pl_lockee[256];
  483     char pl_entrylk[256];
  484 
  485     priv = this->private;
  486 
  487     if (!priv->trace)
  488         return;
  489 
  490     pl_print_locker(pl_locker, 256, this, frame);
  491     pl_print_lockee(pl_lockee, 256, fd, loc);
  492     pl_print_entrylk(pl_entrylk, 256, cmd, type, basename, volume);
  493 
  494     gf_log(this->name, GF_LOG_INFO,
  495            "[BLOCKED] Locker = {%s} Lockee = {%s} Lock = {%s}", pl_locker,
  496            pl_lockee, pl_entrylk);
  497 }
  498 
  499 /**
  500  * __find_most_matching_lock - find the lock struct which most matches in order
  501  * of: lock on the exact basename || an all_names lock
  502  *
  503  *
  504  * @inode: inode in which to look
  505  * @basename: name to search for
  506  */
  507 
  508 static pl_entry_lock_t *
  509 __find_most_matching_lock(pl_dom_list_t *dom, const char *basename)
  510 {
  511     pl_entry_lock_t *lock;
  512     pl_entry_lock_t *all = NULL;
  513     pl_entry_lock_t *exact = NULL;
  514 
  515     if (list_empty(&dom->entrylk_list))
  516         return NULL;
  517 
  518     list_for_each_entry(lock, &dom->entrylk_list, domain_list)
  519     {
  520         if (all_names(lock->basename))
  521             all = lock;
  522         else if (names_equal(lock->basename, basename))
  523             exact = lock;
  524     }
  525 
  526     return (exact ? exact : all);
  527 }
  528 
  529 static pl_entry_lock_t *
  530 __find_matching_lock(pl_dom_list_t *dom, pl_entry_lock_t *lock)
  531 {
  532     pl_entry_lock_t *tmp = NULL;
  533 
  534     list_for_each_entry(tmp, &dom->entrylk_list, domain_list)
  535     {
  536         if (names_equal(lock->basename, tmp->basename) &&
  537             __same_entrylk_owner(lock, tmp) && (lock->type == tmp->type))
  538             return tmp;
  539     }
  540     return NULL;
  541 }
  542 
  543 static int
  544 __lock_blocked_add(xlator_t *this, pl_inode_t *pinode, pl_dom_list_t *dom,
  545                    pl_entry_lock_t *lock, int nonblock)
  546 {
  547     struct timeval now;
  548 
  549     if (nonblock)
  550         goto out;
  551 
  552     gettimeofday(&now, NULL);
  553 
  554     lock->blkd_time = now;
  555     list_add_tail(&lock->blocked_locks, &dom->blocked_entrylks);
  556 
  557     gf_msg_trace(this->name, 0, "Blocking lock: {pinode=%p, basename=%s}",
  558                  pinode, lock->basename);
  559 
  560     entrylk_trace_block(this, lock->frame, NULL, NULL, NULL, lock->basename,
  561                         ENTRYLK_LOCK, lock->type);
  562 out:
  563     return -EAGAIN;
  564 }
  565 
  566 /**
  567  * __lock_entrylk - lock a name in a directory
  568  * @inode: inode for the directory in which to lock
  569  * @basename: name of the entry to lock
  570  *            if null, lock the entire directory
  571  *
  572  * the entire directory being locked is represented as: a single
  573  * pl_entry_lock_t present in the entrylk_locks list with its
  574  * basename = NULL
  575  */
  576 
  577 int
  578 __lock_entrylk(xlator_t *this, pl_inode_t *pinode, pl_entry_lock_t *lock,
  579                int nonblock, pl_dom_list_t *dom, struct timespec *now,
  580                struct list_head *contend)
  581 {
  582     pl_entry_lock_t *conf = NULL;
  583     int ret = -EAGAIN;
  584 
  585     conf = __entrylk_grantable(this, dom, lock, now, contend);
  586     if (conf) {
  587         ret = __lock_blocked_add(this, pinode, dom, lock, nonblock);
  588         goto out;
  589     }
  590 
  591     /* To prevent blocked locks starvation, check if there are any blocked
  592      * locks thay may conflict with this lock. If there is then don't grant
  593      * the lock. BUT grant the lock if the owner already has lock to allow
  594      * nested locks.
  595      * Example: SHD from Machine1 takes (gfid, basename=257-length-name)
  596      * and is granted.
  597      * SHD from machine2 takes (gfid, basename=NULL) and is blocked.
  598      * When SHD from Machine1 takes (gfid, basename=NULL) it needs to be
  599      * granted, without which self-heal can't progress.
  600      * TODO: Find why 'owner_has_lock' is checked even for blocked locks.
  601      */
  602     if (__blocked_entrylk_conflict(dom, lock) &&
  603         !(__owner_has_lock(dom, lock))) {
  604         if (nonblock == 0) {
  605             gf_log(this->name, GF_LOG_DEBUG,
  606                    "Lock is grantable, but blocking to prevent "
  607                    "starvation");
  608         }
  609 
  610         ret = __lock_blocked_add(this, pinode, dom, lock, nonblock);
  611         goto out;
  612     }
  613 
  614     __pl_entrylk_ref(lock);
  615     gettimeofday(&lock->granted_time, NULL);
  616     list_add(&lock->domain_list, &dom->entrylk_list);
  617 
  618     ret = 0;
  619 out:
  620     return ret;
  621 }
  622 
  623 /**
  624  * __unlock_entrylk - unlock a name in a directory
  625  * @inode: inode for the directory to unlock in
  626  * @basename: name of the entry to unlock
  627  *            if null, unlock the entire directory
  628  */
  629 
  630 pl_entry_lock_t *
  631 __unlock_entrylk(pl_dom_list_t *dom, pl_entry_lock_t *lock)
  632 {
  633     pl_entry_lock_t *ret_lock = NULL;
  634 
  635     ret_lock = __find_matching_lock(dom, lock);
  636 
  637     if (ret_lock) {
  638         list_del_init(&ret_lock->domain_list);
  639     } else {
  640         gf_log("locks", GF_LOG_ERROR,
  641                "unlock on %s "
  642                "(type=ENTRYLK_WRLCK) attempted but no matching lock "
  643                "found",
  644                lock->basename);
  645     }
  646 
  647     return ret_lock;
  648 }
  649 
  650 int32_t
  651 check_entrylk_on_basename(xlator_t *this, inode_t *parent, char *basename)
  652 {
  653     int32_t entrylk = 0;
  654     pl_dom_list_t *dom = NULL;
  655     pl_entry_lock_t *conf = NULL;
  656 
  657     pl_inode_t *pinode = pl_inode_get(this, parent, NULL);
  658     if (!pinode)
  659         goto out;
  660     pthread_mutex_lock(&pinode->mutex);
  661     {
  662         list_for_each_entry(dom, &pinode->dom_list, inode_list)
  663         {
  664             conf = __find_most_matching_lock(dom, basename);
  665             if (conf && conf->basename) {
  666                 entrylk = 1;
  667                 break;
  668             }
  669         }
  670     }
  671     pthread_mutex_unlock(&pinode->mutex);
  672 
  673 out:
  674     return entrylk;
  675 }
  676 
  677 void
  678 __grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
  679                             pl_dom_list_t *dom, struct list_head *granted,
  680                             struct timespec *now, struct list_head *contend)
  681 {
  682     int bl_ret = 0;
  683     pl_entry_lock_t *bl = NULL;
  684     pl_entry_lock_t *tmp = NULL;
  685 
  686     struct list_head blocked_list;
  687 
  688     INIT_LIST_HEAD(&blocked_list);
  689     list_splice_init(&dom->blocked_entrylks, &blocked_list);
  690 
  691     list_for_each_entry_safe(bl, tmp, &blocked_list, blocked_locks)
  692     {
  693         list_del_init(&bl->blocked_locks);
  694 
  695         bl_ret = __lock_entrylk(bl->this, pl_inode, bl, 0, dom, now, contend);
  696 
  697         if (bl_ret == 0) {
  698             list_add_tail(&bl->blocked_locks, granted);
  699         }
  700     }
  701 }
  702 
  703 /* Grants locks if possible which are blocked on a lock */
  704 void
  705 grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
  706                           pl_dom_list_t *dom, struct timespec *now,
  707                           struct list_head *contend)
  708 {
  709     struct list_head granted_list;
  710     pl_entry_lock_t *tmp = NULL;
  711     pl_entry_lock_t *lock = NULL;
  712 
  713     INIT_LIST_HEAD(&granted_list);
  714 
  715     pthread_mutex_lock(&pl_inode->mutex);
  716     {
  717         __grant_blocked_entry_locks(this, pl_inode, dom, &granted_list, now,
  718                                     contend);
  719     }
  720     pthread_mutex_unlock(&pl_inode->mutex);
  721 
  722     list_for_each_entry_safe(lock, tmp, &granted_list, blocked_locks)
  723     {
  724         entrylk_trace_out(this, lock->frame, NULL, NULL, NULL, lock->basename,
  725                           ENTRYLK_LOCK, lock->type, 0, 0);
  726 
  727         STACK_UNWIND_STRICT(entrylk, lock->frame, 0, 0, NULL);
  728         lock->frame = NULL;
  729     }
  730 
  731     pthread_mutex_lock(&pl_inode->mutex);
  732     {
  733         list_for_each_entry_safe(lock, tmp, &granted_list, blocked_locks)
  734         {
  735             list_del_init(&lock->blocked_locks);
  736             __pl_entrylk_unref(lock);
  737         }
  738     }
  739     pthread_mutex_unlock(&pl_inode->mutex);
  740 }
  741 
/* Common entrylk code called by pl_entrylk and pl_fentrylk */
/*
 * Resolves the pl_inode and lock domain, builds a pl_entry_lock_t and
 * executes @cmd (ENTRYLK_LOCK, ENTRYLK_LOCK_NB or ENTRYLK_UNLOCK),
 * unwinding @frame unless the request blocks. Also drives stale-lock
 * revocation and contention upcall notification when configured.
 * Always returns 0; the operation's result travels via STACK_UNWIND.
 */
int
pl_common_entrylk(call_frame_t *frame, xlator_t *this, const char *volume,
                  inode_t *inode, const char *basename, entrylk_cmd cmd,
                  entrylk_type type, loc_t *loc, fd_t *fd, dict_t *xdata)

{
    int32_t op_ret = -1;
    int32_t op_errno = 0;
    int ret = -1;
    char unwind = 1; /* cleared only when a blocking lock is parked */
    GF_UNUSED int dict_ret = -1;
    pl_inode_t *pinode = NULL;
    pl_entry_lock_t *reqlock = NULL;
    pl_entry_lock_t *unlocked = NULL;
    pl_dom_list_t *dom = NULL;
    char *conn_id = NULL;
    pl_ctx_t *ctx = NULL;
    int nonblock = 0;
    gf_boolean_t need_inode_unref = _gf_false;
    posix_locks_private_t *priv = NULL;
    struct list_head *pcontend = NULL;
    struct list_head contend;
    struct timespec now = {};

    priv = this->private;

    /* Contention notifications are collected here and dispatched at the
     * very end, outside any mutex. */
    if (priv->notify_contention) {
        pcontend = &contend;
        INIT_LIST_HEAD(pcontend);
        timespec_now(&now);
    }

    if (xdata)
        dict_ret = dict_get_str(xdata, "connection-id", &conn_id);

    pinode = pl_inode_get(this, inode, NULL);
    if (!pinode) {
        op_errno = ENOMEM;
        goto out;
    }

    if (frame->root->client) {
        ctx = pl_ctx_get(frame->root->client, this);
        if (!ctx) {
            op_errno = ENOMEM;
            gf_log(this->name, GF_LOG_INFO, "pl_ctx_get() failed");
            goto unwind;
        }
    }

    dom = get_domain(pinode, volume);
    if (!dom) {
        op_errno = ENOMEM;
        goto out;
    }

    entrylk_trace_in(this, frame, volume, fd, loc, basename, cmd, type);

    reqlock = new_entrylk_lock(pinode, basename, type, dom->domain, frame,
                               conn_id, &op_errno);
    if (!reqlock) {
        op_ret = -1;
        goto unwind;
    }

    /* Ideally, AFTER a successful lock (both blocking and non-blocking) or
     * an unsuccessful blocking lock operation, the inode needs to be ref'd.
     *
     * But doing so might give room to a race where the lock-requesting
     * client could send a DISCONNECT just before this thread refs the inode
     * after the locking is done, and the epoll thread could unref the inode
     * in cleanup which means the inode's refcount would come down to 0, and
     * the call to pl_forget() at this point destroys @pinode. Now when
     * the io-thread executing this function tries to access pinode,
     * it could crash on account of illegal memory access.
     *
     * To get around this problem, the inode is ref'd once even before
     * adding the lock into client_list as a precautionary measure.
     * This way even if there are DISCONNECTs, there will always be 1 extra
     * ref on the inode, so @pinode is still alive until after the
     * current stack unwinds.
     */
    pinode->inode = inode_ref(inode);
    if (priv->revocation_secs != 0) {
        if (cmd != ENTRYLK_UNLOCK) {
            /* Lock path: possibly revoke stale/over-contended locks. */
            __entrylk_prune_stale(this, pinode, dom, reqlock);
        } else if (priv->monkey_unlocking == _gf_true) {
            /* Fault-injection: randomly pretend the unlock succeeded
             * without actually releasing anything ("stuck lock"). */
            if (pl_does_monkey_want_stuck_lock()) {
                gf_log(this->name, GF_LOG_WARNING,
                       "MONKEY LOCKING (forcing stuck lock)!");
                op_ret = 0;
                need_inode_unref = _gf_true;
                pthread_mutex_lock(&pinode->mutex);
                {
                    __pl_entrylk_unref(reqlock);
                }
                pthread_mutex_unlock(&pinode->mutex);
                goto out;
            }
        }
    }

    switch (cmd) {
        case ENTRYLK_LOCK_NB:
            nonblock = 1;
            /* fall through */
        case ENTRYLK_LOCK:
            /* ctx->lock is taken before pinode->mutex on every path —
             * consistent ordering avoids deadlock. */
            if (ctx)
                pthread_mutex_lock(&ctx->lock);
            pthread_mutex_lock(&pinode->mutex);
            {
                reqlock->pinode = pinode;

                ret = __lock_entrylk(this, pinode, reqlock, nonblock, dom, &now,
                                     pcontend);
                if (ret == 0) {
                    /* Granted: the frame will be unwound below, so the
                     * lock must no longer point at it. */
                    reqlock->frame = NULL;
                    op_ret = 0;
                } else {
                    op_errno = -ret;
                }

                /* Track the lock per-client for DISCONNECT cleanup,
                 * except for a failed non-blocking attempt. */
                if (ctx && (!ret || !nonblock))
                    list_add(&reqlock->client_list, &ctx->entrylk_lockers);

                if (ret == -EAGAIN && !nonblock) {
                    /* blocked */
                    unwind = 0;
                } else {
                    __pl_entrylk_unref(reqlock);
                }

                /* For all but the case where a non-blocking lock
                 * attempt fails, the extra ref taken before the switch
                 * block must be negated.
                 */
                if ((ret == -EAGAIN) && (nonblock))
                    need_inode_unref = _gf_true;
            }
            pthread_mutex_unlock(&pinode->mutex);
            if (ctx)
                pthread_mutex_unlock(&ctx->lock);
            break;

        case ENTRYLK_UNLOCK:
            if (ctx)
                pthread_mutex_lock(&ctx->lock);
            pthread_mutex_lock(&pinode->mutex);
            {
                /* Irrespective of whether unlock succeeds or not,
                 * the extra inode ref that was done before the switch
                 * block must be negated. Towards this,
                 * @need_inode_unref flag is set unconditionally here.
                 */
                need_inode_unref = _gf_true;
                unlocked = __unlock_entrylk(dom, reqlock);
                if (unlocked) {
                    list_del_init(&unlocked->client_list);
                    __pl_entrylk_unref(unlocked);
                    op_ret = 0;
                } else {
                    op_errno = EINVAL;
                }
                /* reqlock was only a description of what to unlock. */
                __pl_entrylk_unref(reqlock);
            }
            pthread_mutex_unlock(&pinode->mutex);
            if (ctx)
                pthread_mutex_unlock(&ctx->lock);

            /* The released name may unblock queued waiters. */
            grant_blocked_entry_locks(this, pinode, dom, &now, pcontend);

            break;

        default:
            need_inode_unref = _gf_true;
            gf_log(this->name, GF_LOG_ERROR,
                   "Unexpected case in entrylk (cmd=%d). Please file"
                   "a bug report at http://bugs.gluster.com",
                   cmd);
            goto out;
    }
    /* The following (extra) unref corresponds to the ref that
     * was done at the time the lock was granted.
     */
    if ((cmd == ENTRYLK_UNLOCK) && (op_ret == 0))
        inode_unref(pinode->inode);

out:

    if (need_inode_unref)
        inode_unref(pinode->inode);

    /* Note: the "unwind:" label sits INSIDE this if-block on purpose —
     * jumping to it unwinds the frame while skipping the trace log. */
    if (unwind) {
        entrylk_trace_out(this, frame, volume, fd, loc, basename, cmd, type,
                          op_ret, op_errno);
    unwind:
        STACK_UNWIND_STRICT(entrylk, frame, op_ret, op_errno, NULL);
    }

    /* Dispatch any queued contention upcalls outside all mutexes. */
    if (pcontend != NULL) {
        entrylk_contention_notify(this, pcontend);
    }

    return 0;
}
  948 
  949 /**
  950  * pl_entrylk:
  951  *
  952  * Locking on names (directory entries)
  953  */
  954 
  955 int
  956 pl_entrylk(call_frame_t *frame, xlator_t *this, const char *volume, loc_t *loc,
  957            const char *basename, entrylk_cmd cmd, entrylk_type type,
  958            dict_t *xdata)
  959 {
  960     pl_common_entrylk(frame, this, volume, loc->inode, basename, cmd, type, loc,
  961                       NULL, xdata);
  962 
  963     return 0;
  964 }
  965 
  966 /**
  967  * pl_fentrylk:
  968  *
  969  * Locking on names (directory entries)
  970  */
  971 
  972 int
  973 pl_fentrylk(call_frame_t *frame, xlator_t *this, const char *volume, fd_t *fd,
  974             const char *basename, entrylk_cmd cmd, entrylk_type type,
  975             dict_t *xdata)
  976 {
  977     pl_common_entrylk(frame, this, volume, fd->inode, basename, cmd, type, NULL,
  978                       fd, xdata);
  979 
  980     return 0;
  981 }
  982 
  983 static void
  984 pl_entrylk_log_cleanup(pl_entry_lock_t *lock)
  985 {
  986     pl_inode_t *pinode = NULL;
  987 
  988     pinode = lock->pinode;
  989 
  990     gf_log(THIS->name, GF_LOG_WARNING,
  991            "releasing lock on %s held by "
  992            "{client=%p, pid=%" PRId64 " lk-owner=%s}",
  993            uuid_utoa(pinode->gfid), lock->client, (uint64_t)lock->client_pid,
  994            lkowner_utoa(&lock->owner));
  995 }
  996 
/* Release all entrylks from this client.
 *
 * Walks @ctx->entrylk_lockers (every entrylk this client holds or is
 * waiting on), detaches each lock under its inode's mutex, and sorts it
 * into one of two local lists:
 *   - 'released': locks that were on a domain (granted) list — after the
 *     walk, blocked locks from other clients are re-evaluated and the
 *     lock is unreffed;
 *   - 'unwind':   locks that were only on the blocked list — their frames
 *     are unwound with EAGAIN before they too are moved to 'released'.
 * Optionally collects contention-notification events when the
 * 'notify_contention' option is enabled.
 *
 * Returns 0 unconditionally. */
int
pl_entrylk_client_cleanup(xlator_t *this, pl_ctx_t *ctx)
{
    posix_locks_private_t *priv;
    pl_entry_lock_t *tmp = NULL;
    pl_entry_lock_t *l = NULL;
    pl_dom_list_t *dom = NULL;
    pl_inode_t *pinode = NULL;
    struct list_head *pcontend = NULL;
    struct list_head released;
    struct list_head unwind;
    struct list_head contend;
    struct timespec now = {};

    INIT_LIST_HEAD(&released);
    INIT_LIST_HEAD(&unwind);

    priv = this->private;
    if (priv->notify_contention) {
        /* Contention notifications requested: collect them in 'contend'
         * and flush them once all mutexes are dropped. */
        pcontend = &contend;
        INIT_LIST_HEAD(pcontend);
        timespec_now(&now);
    }

    pthread_mutex_lock(&ctx->lock);
    {
        list_for_each_entry_safe(l, tmp, &ctx->entrylk_lockers, client_list)
        {
            pl_entrylk_log_cleanup(l);

            pinode = l->pinode;

            /* ctx->lock is held across this whole walk; pinode->mutex is
             * taken nested per lock. */
            pthread_mutex_lock(&pinode->mutex);
            {
                /* If the entrylk object is part of granted list but not
                 * blocked list, then perform the following actions:
                 * i.   delete the object from granted list;
                 * ii.  grant other locks (from other clients) that may
                 *      have been blocked on this entrylk; and
                 * iii. unref the object.
                 *
                 * If the entrylk object (L1) is part of both granted
                 * and blocked lists, then this means that a parallel
                 * unlock on another entrylk (L2 say) may have 'granted'
                 * L1 and added it to 'granted' list in
                 * __grant_blocked_entry_locks() (although using the
                 * 'blocked_locks' member). In that case, the cleanup
                 * codepath must try and grant other overlapping
                 * blocked entrylks from other clients, now that L1 is
                 * out of their way and then unref L1 in the end, and
                 * leave it to the other thread (the one executing
                 * unlock codepath) to unwind L1's frame, delete it from
                 * blocked_locks list, and perform the last unref on L1.
                 *
                 * If the entrylk object (L1) is part of blocked list
                 * only, the cleanup code path must:
                 * i.   delete it from the blocked_locks list inside
                 *      this critical section,
                 * ii.  unwind its frame with EAGAIN,
                 * iii. try and grant blocked entry locks from other
                 *      clients that were otherwise grantable, but were
                 *      blocked to avoid leaving L1 to starve forever.
                 * iv.  unref the object.
                 */
                list_del_init(&l->client_list);

                if (!list_empty(&l->domain_list)) {
                    /* On a domain (granted) list: detach and queue for
                     * post-walk grant/unref processing. client_list is
                     * free now, so reuse it as the queue linkage. */
                    list_del_init(&l->domain_list);
                    list_add_tail(&l->client_list, &released);
                } else {
                    /* Blocked only: detach and queue for an EAGAIN
                     * unwind outside the mutexes. */
                    list_del_init(&l->blocked_locks);
                    list_add_tail(&l->client_list, &unwind);
                }
            }
            pthread_mutex_unlock(&pinode->mutex);
        }
    }
    pthread_mutex_unlock(&ctx->lock);

    /* Unwind blocked-only locks with EAGAIN (no locks held here, so it is
     * safe to call back up the stack), then let them join 'released'. */
    if (!list_empty(&unwind)) {
        list_for_each_entry_safe(l, tmp, &unwind, client_list)
        {
            list_del_init(&l->client_list);

            if (l->frame)
                STACK_UNWIND_STRICT(entrylk, l->frame, -1, EAGAIN, NULL);
            list_add_tail(&l->client_list, &released);
        }
    }

    /* For every detached lock: give waiting locks from other clients a
     * chance to be granted, drop our ref, and release the inode ref that
     * was held on behalf of the lock. */
    if (!list_empty(&released)) {
        list_for_each_entry_safe(l, tmp, &released, client_list)
        {
            list_del_init(&l->client_list);

            pinode = l->pinode;

            dom = get_domain(pinode, l->volume);

            grant_blocked_entry_locks(this, pinode, dom, &now, pcontend);

            pthread_mutex_lock(&pinode->mutex);
            {
                __pl_entrylk_unref(l);
            }
            pthread_mutex_unlock(&pinode->mutex);

            inode_unref(pinode->inode);
        }
    }

    if (pcontend != NULL) {
        entrylk_contention_notify(this, pcontend);
    }

    return 0;
}
 1115 
 1116 int32_t
 1117 __get_entrylk_count(xlator_t *this, pl_inode_t *pl_inode)
 1118 {
 1119     int32_t count = 0;
 1120     pl_entry_lock_t *lock = NULL;
 1121     pl_dom_list_t *dom = NULL;
 1122 
 1123     list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
 1124     {
 1125         list_for_each_entry(lock, &dom->entrylk_list, domain_list) { count++; }
 1126 
 1127         list_for_each_entry(lock, &dom->blocked_entrylks, blocked_locks)
 1128         {
 1129             count++;
 1130         }
 1131     }
 1132 
 1133     return count;
 1134 }
 1135 
 1136 int32_t
 1137 get_entrylk_count(xlator_t *this, inode_t *inode)
 1138 {
 1139     pl_inode_t *pl_inode = NULL;
 1140     uint64_t tmp_pl_inode = 0;
 1141     int ret = 0;
 1142     int32_t count = 0;
 1143 
 1144     ret = inode_ctx_get(inode, this, &tmp_pl_inode);
 1145     if (ret != 0) {
 1146         goto out;
 1147     }
 1148 
 1149     pl_inode = (pl_inode_t *)(long)tmp_pl_inode;
 1150 
 1151     pthread_mutex_lock(&pl_inode->mutex);
 1152     {
 1153         count = __get_entrylk_count(this, pl_inode);
 1154     }
 1155     pthread_mutex_unlock(&pl_inode->mutex);
 1156 
 1157 out:
 1158     return count;
 1159 }