"Fossies" - the Fresh Open Source Software Archive

Member "drbd-9.0.21-1/drbd/drbd_req.c" (11 Nov 2019, 77875 Bytes) of package /linux/misc/drbd-9.0.21-1.tar.gz, shown as a C source listing with prefixed line numbers.

    1 // SPDX-License-Identifier: GPL-2.0-or-later
    2 /*
    3    drbd_req.c
    4 
    5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
    6 
    7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
    8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
    9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   10 
   11 
   12  */
   13 
   14 #include <linux/module.h>
   15 
   16 #include <linux/slab.h>
   17 #include <linux/drbd.h>
   18 #include "drbd_int.h"
   19 #include "drbd_req.h"
   20 
   21 
   22 
   23 static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size);
   24 
   25 /* Update disk stats at start of I/O request */
   26 static void _drbd_start_io_acct(struct drbd_device *device, struct drbd_request *req)
   27 {
   28     generic_start_io_acct(device->rq_queue, bio_data_dir(req->master_bio), req->i.size >> 9,
   29                   &device->vdisk->part0);
   30 }
   31 
   32 /* Update disk stats when completing request upwards */
   33 static void _drbd_end_io_acct(struct drbd_device *device, struct drbd_request *req)
   34 {
   35     generic_end_io_acct(device->rq_queue, bio_data_dir(req->master_bio),
   36                 &device->vdisk->part0,
   37                 req->start_jif);
   38 }
   39 
   40 static struct drbd_request *drbd_req_new(struct drbd_device *device, struct bio *bio_src)
   41 {
   42     struct drbd_request *req;
   43 
   44     req = mempool_alloc(&drbd_request_mempool, GFP_NOIO);
   45     if (!req)
   46         return NULL;
   47 
   48     memset(req, 0, sizeof(*req));
   49 
   50     kref_get(&device->kref);
   51     kref_debug_get(&device->kref_debug, 6);
   52 
   53     req->device = device;
   54     req->master_bio = bio_src;
   55     req->epoch = 0;
   56 
   57     drbd_clear_interval(&req->i);
   58     req->i.sector = bio_src->bi_iter.bi_sector;
   59     req->i.size = bio_src->bi_iter.bi_size;
   60     req->i.local = true;
   61     req->i.waiting = false;
   62 
   63     INIT_LIST_HEAD(&req->tl_requests);
   64     INIT_LIST_HEAD(&req->req_pending_master_completion);
   65     INIT_LIST_HEAD(&req->req_pending_local);
   66 
   67     /* one reference to be put by __drbd_make_request */
   68     atomic_set(&req->completion_ref, 1);
   69     /* one kref as long as completion_ref > 0 */
   70     kref_init(&req->kref);
   71 
   72     req->local_rq_state = (bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0)
   73                   | (bio_op(bio_src) == REQ_OP_WRITE_SAME ? RQ_WSAME : 0)
   74                   | (bio_op(bio_src) == REQ_OP_WRITE_ZEROES ? RQ_ZEROES : 0)
   75                   | (bio_op(bio_src) == REQ_OP_DISCARD ? RQ_UNMAP : 0);
   76 
   77     return req;
   78 }
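      /*
       * Note on the two counters initialized above: completion_ref counts the
       * reasons the master bio cannot be completed yet (local submission still
       * pending, peer acks outstanding, completion suspended, ...), while the
       * kref counts the reasons the request object itself must stay allocated
       * (for example a still expected barrier ack).  A minimal sketch of the
       * "put" side, using helpers defined further down in this file, is to
       * drop a completion reference, complete the master bio when it reaches
       * zero, and then drop the object reference, which may free the request:
       *
       *     if (atomic_sub_and_test(put, &req->completion_ref))
       *         drbd_req_complete(req, m);
       *     kref_put(&req->kref, drbd_req_destroy);
       *
       * drbd_req_put_completion_ref() and mod_rq_state() implement this
       * pattern, plus the RQ_LOCAL_ABORTED and RQ_POSTPONED corner cases.
       */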
   79 
   80 static void req_destroy_no_send_peer_ack(struct kref *kref)
   81 {
   82     struct drbd_request *req = container_of(kref, struct drbd_request, kref);
   83     mempool_free(req, &drbd_request_mempool);
   84 }
   85 
   86 void drbd_queue_peer_ack(struct drbd_resource *resource, struct drbd_request *req)
   87 {
   88     struct drbd_connection *connection;
   89     bool queued = false;
   90 
   91     refcount_set(&req->kref.refcount, 1); /* was 0, instead of kref_get() */
   92     rcu_read_lock();
   93     for_each_connection_rcu(connection, resource) {
   94         unsigned int node_id = connection->peer_node_id;
   95         if (connection->agreed_pro_version < 110 ||
   96             connection->cstate[NOW] != C_CONNECTED ||
   97             !(req->net_rq_state[node_id] & RQ_NET_SENT))
   98             continue;
   99         kref_get(&req->kref);
  100         req->net_rq_state[node_id] |= RQ_PEER_ACK;
  101         if (!queued) {
  102             list_add_tail(&req->tl_requests, &resource->peer_ack_list);
  103             queued = true;
  104         }
  105         queue_work(connection->ack_sender, &connection->peer_ack_work);
  106     }
  107     rcu_read_unlock();
  108 
  109     kref_put(&req->kref, req_destroy_no_send_peer_ack);
  110 }
  111 
  112 static bool peer_ack_differs(struct drbd_request *req1, struct drbd_request *req2)
  113 {
  114     unsigned int max_node_id = req1->device->resource->max_node_id;
  115     unsigned int node_id;
  116 
  117     for (node_id = 0; node_id <= max_node_id; node_id++)
  118         if ((req1->net_rq_state[node_id] & RQ_NET_OK) !=
  119             (req2->net_rq_state[node_id] & RQ_NET_OK))
  120             return true;
  121     return false;
  122 }
  123 
  124 static bool peer_ack_window_full(struct drbd_request *req)
  125 {
  126     struct drbd_resource *resource = req->device->resource;
  127     u32 peer_ack_window = resource->res_opts.peer_ack_window;
  128     u64 last_dagtag = resource->last_peer_acked_dagtag + peer_ack_window;
  129 
  130     return dagtag_newer_eq(req->dagtag_sector, last_dagtag);
  131 }
  132 
  133 static void drbd_remove_request_interval(struct rb_root *root,
  134                      struct drbd_request *req)
  135 {
  136     struct drbd_device *device = req->device;
  137     struct drbd_interval *i = &req->i;
  138 
  139     drbd_remove_interval(root, i);
  140 
  141     /* Wake up any processes waiting for this request to complete.  */
  142     if (i->waiting)
  143         wake_up(&device->misc_wait);
  144 }
  145 
  146 /* must_hold resource->req_lock */
  147 void drbd_req_destroy(struct kref *kref)
  148 {
  149     struct drbd_request *req = container_of(kref, struct drbd_request, kref);
  150     struct drbd_request *destroy_next;
  151     struct drbd_device *device;
  152     struct drbd_peer_device *peer_device;
  153     unsigned int s;
  154     bool was_last_ref;
  155 
  156  tail_recursion:
  157     was_last_ref = false;
  158     device = req->device;
  159     s = req->local_rq_state;
  160     destroy_next = req->destroy_next;
  161 
  162 #ifdef CONFIG_DRBD_TIMING_STATS
  163     if (s & RQ_WRITE) {
  164         unsigned long flags;
  165 
  166         spin_lock_irqsave(&device->timing_lock, flags);
  167         device->reqs++;
  168         ktime_aggregate(device, req, in_actlog_kt);
  169         ktime_aggregate(device, req, pre_submit_kt);
  170         for_each_peer_device(peer_device, device) {
  171             int node_id = peer_device->node_id;
  172             unsigned ns = drbd_req_state_by_peer_device(req, peer_device);
  173             if (!(ns & RQ_NET_MASK))
  174                 continue;
  175             ktime_aggregate_pd(peer_device, node_id, req, pre_send_kt);
  176             ktime_aggregate_pd(peer_device, node_id, req, acked_kt);
  177             ktime_aggregate_pd(peer_device, node_id, req, net_done_kt);
  178         }
  179         spin_unlock_irqrestore(&device->timing_lock, flags);
  180     }
  181 #endif
  182 
  183     /* paranoia */
  184     for_each_peer_device(peer_device, device) {
  185         unsigned ns = drbd_req_state_by_peer_device(req, peer_device);
  186         if (!(ns & RQ_NET_MASK))
  187             continue;
  188         if (ns & RQ_NET_DONE)
  189             continue;
  190 
  191         drbd_err(device,
  192             "drbd_req_destroy: Logic BUG rq_state: (0:%x, %d:%x), completion_ref = %d\n",
  193             s, 1 + peer_device->node_id, ns, atomic_read(&req->completion_ref));
  194         return;
  195     }
  196 
  197     /* more paranoia */
  198     if ((req->master_bio && !(s & RQ_POSTPONED)) ||
  199         atomic_read(&req->completion_ref) || (s & RQ_LOCAL_PENDING)) {
  200         drbd_err(device, "drbd_req_destroy: Logic BUG rq_state: %x, completion_ref = %d\n",
  201                 s, atomic_read(&req->completion_ref));
  202         return;
  203     }
  204 
  205     list_del_init(&req->tl_requests);
  206 
  207     /* finally remove the request from the interval tree used for conflict
  208      * detection (writes), respectively block_id verification (reads). */
  209     if (!drbd_interval_empty(&req->i)) {
  210         struct rb_root *root;
  211 
  212         if (s & RQ_WRITE)
  213             root = &device->write_requests;
  214         else
  215             root = &device->read_requests;
  216         drbd_remove_request_interval(root, req);
  217     } else if (s & (RQ_NET_MASK & ~RQ_NET_DONE) && req->i.size != 0)
  218         drbd_err(device, "drbd_req_destroy: Logic BUG: interval empty, but: rq_state=0x%x, sect=%llu, size=%u\n",
  219             s, (unsigned long long)req->i.sector, req->i.size);
  220 
  221     if (s & RQ_WRITE) {
  222         /* There is a special case:
  223          * we may notice late that IO was suspended,
  224          * and postpone, or schedule for retry, a write,
  225          * before it even was submitted or sent.
  226          * In that case we do not want to touch the bitmap at all.
  227          */
  228         if ((s & (RQ_POSTPONED|RQ_LOCAL_MASK|RQ_NET_MASK)) != RQ_POSTPONED &&
  229             req->i.size && get_ldev_if_state(device, D_DETACHING)) {
  230             struct drbd_peer_md *peer_md = device->ldev->md.peers;
  231             unsigned long bits = -1, mask = -1;
  232             int node_id, max_node_id = device->resource->max_node_id;
  233 
  234             for (node_id = 0; node_id <= max_node_id; node_id++) {
  235                 unsigned int net_rq_state;
  236 
  237                 net_rq_state = req->net_rq_state[node_id];
  238                 if (net_rq_state & RQ_NET_OK) {
  239                     int bitmap_index = peer_md[node_id].bitmap_index;
  240 
  241                     if (bitmap_index == -1)
  242                         continue;
  243 
  244                     if (net_rq_state & RQ_NET_SIS)
  245                         clear_bit(bitmap_index, &bits);
  246                     else
  247                         clear_bit(bitmap_index, &mask);
  248                 }
  249             }
  250             drbd_set_sync(device, req->i.sector, req->i.size, bits, mask);
  251             put_ldev(device);
  252         }
  253 
  254         /* one might be tempted to move the drbd_al_complete_io
  255          * to the local io completion callback drbd_request_endio.
  256          * but, if this was a mirror write, we may only
  257          * drbd_al_complete_io after this is RQ_NET_DONE,
  258          * otherwise the extent could be dropped from the al
  259          * before it has actually been written on the peer.
  260          * if we crash before our peer knows about the request,
  261          * but after the extent has been dropped from the al,
  262          * we would forget to resync the corresponding extent.
  263          */
  264         if (s & RQ_IN_ACT_LOG) {
  265             if (get_ldev_if_state(device, D_DETACHING)) {
  266                 was_last_ref = drbd_al_complete_io(device, &req->i);
  267                 put_ldev(device);
  268             } else if (drbd_ratelimit()) {
  269                 drbd_warn(device, "Should have called drbd_al_complete_io(, %llu, %u), "
  270                       "but my Disk seems to have failed :(\n",
  271                       (unsigned long long) req->i.sector, req->i.size);
  272 
  273             }
  274         }
  275     }
  276 
  277     if (s & RQ_WRITE && req->i.size) {
  278         struct drbd_resource *resource = device->resource;
  279         struct drbd_request *peer_ack_req = resource->peer_ack_req;
  280 
  281         if (peer_ack_req) {
  282             if (peer_ack_differs(req, peer_ack_req) ||
  283                 (was_last_ref && atomic_read(&device->ap_actlog_cnt)) ||
  284                 peer_ack_window_full(req)) {
  285                 drbd_queue_peer_ack(resource, peer_ack_req);
  286                 peer_ack_req = NULL;
  287             } else
  288                 mempool_free(peer_ack_req, &drbd_request_mempool);
  289         }
  290         req->device = NULL;
  291         resource->peer_ack_req = req;
  292         mod_timer(&resource->peer_ack_timer,
  293               jiffies + resource->res_opts.peer_ack_delay * HZ / 1000);
  294 
  295         if (!peer_ack_req)
  296             resource->last_peer_acked_dagtag = req->dagtag_sector;
  297     } else
  298         mempool_free(req, &drbd_request_mempool);
  299 
  300     /* In both branches of the if above, the reference to device gets released */
  301     kref_debug_put(&device->kref_debug, 6);
  302     kref_put(&device->kref, drbd_destroy_device);
  303 
  304     /*
  305      * Do the equivalent of:
  306      *   kref_put(&req->kref, drbd_req_destroy)
  307      * without recursing into the destructor.
  308      */
  309     if (destroy_next) {
  310         req = destroy_next;
  311         if (refcount_dec_and_test(&req->kref.refcount))
  312             goto tail_recursion;
  313     }
  314 }
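      /*
       * The tail_recursion label above exists to avoid recursing through
       * kref_put(): destroying one request may drop the last reference on
       * req->destroy_next, and a long chain of such requests could otherwise
       * nest one destructor call per request.  The control flow is roughly
       * equivalent to this iterative sketch:
       *
       *     for (;;) {
       *         struct drbd_request *next = req->destroy_next;
       *         ... tear down req ...
       *         if (!next || !refcount_dec_and_test(&next->kref.refcount))
       *             break;
       *         req = next;
       *     }
       */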
  315 
  316 static void wake_all_senders(struct drbd_resource *resource) {
  317     struct drbd_connection *connection;
  318     /* We need to make sure any update is visible before we wake up the
  319      * threads that may check the values in their wait_event() condition.
  320      * Do we need smp_mb here? Or rather switch to atomic_t? */
  321     rcu_read_lock();
  322     for_each_connection_rcu(connection, resource)
  323         wake_up(&connection->sender_work.q_wait);
  324     rcu_read_unlock();
  325 }
  326 
  327 /* must hold resource->req_lock */
  328 bool start_new_tl_epoch(struct drbd_resource *resource)
  329 {
  330     /* no point closing an epoch if it is empty, anyway. */
  331     if (resource->current_tle_writes == 0)
  332         return false;
  333 
  334     resource->current_tle_writes = 0;
  335     atomic_inc(&resource->current_tle_nr);
  336     wake_all_senders(resource);
  337     return true;
  338 }
  339 
  340 void complete_master_bio(struct drbd_device *device,
  341         struct bio_and_error *m)
  342 {
  343     int rw = bio_data_dir(m->bio);
  344     m->bio->bi_status = errno_to_blk_status(m->error);
  345     bio_endio(m->bio);
  346     dec_ap_bio(device, rw);
  347 }
  348 
  349 
  350 /* Helper for __req_mod().
  351  * Set m->bio to the master bio, if it is fit to be completed,
  352  * or leave it alone (it is initialized to NULL in __req_mod),
  353  * if it has already been completed, or cannot be completed yet.
  354  * If m->bio is set, the error status to be returned is placed in m->error.
  355  */
  356 static
  357 void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
  358 {
  359     const unsigned s = req->local_rq_state;
  360     struct drbd_device *device = req->device;
  361     struct drbd_peer_device *peer_device;
  362     int error, ok = 0;
  363 
  364     /*
  365      * figure out whether to report success or failure.
  366      *
  367      * report success when at least one of the operations succeeded.
  368      * or, to put it the other way,
  369      * only report failure, when both operations failed.
  370      *
  371      * what to do about the failures is handled elsewhere.
  372      * what we need to do here is just: complete the master_bio.
  373      *
  374      * local completion error, if any, has been stored as ERR_PTR
  375      * in private_bio within drbd_request_endio.
  376      */
  377     if (s & RQ_LOCAL_OK)
  378         ++ok;
  379     error = PTR_ERR(req->private_bio);
  380 
  381     for_each_peer_device(peer_device, device) {
  382         unsigned ns = drbd_req_state_by_peer_device(req, peer_device);
  383         /* any "net OK" or "local OK" is good enough to complete this bio as OK */
  384         if (ns & RQ_NET_OK)
  385             ++ok;
  386         /* paranoia */
  387         /* we must not complete the master bio, while it is
  388          *  still being processed by _drbd_send_zc_bio (drbd_send_dblock),
  389          *  respectively still needed for the second drbd_csum_bio() there.
  390          *  not yet acknowledged by the peer
  391          *  not yet completed by the local io subsystem
  392          * these flags may get cleared in any order by
  393          *  the worker,
  394          *  the sender,
  395          *  the receiver,
  396          *  the bio_endio completion callbacks.
  397          */
  398         if (!(ns & RQ_NET_MASK))
  399             continue;
  400         if (!(ns & (RQ_NET_PENDING|RQ_NET_QUEUED)))
  401             continue;
  402 
  403         drbd_err(device,
  404             "drbd_req_complete: Logic BUG rq_state: (0:%x, %d:%x), completion_ref = %d\n",
  405              s, 1 + peer_device->node_id, ns, atomic_read(&req->completion_ref));
  406         return;
  407     }
  408 
  409     /* more paranoia */
  410     if (atomic_read(&req->completion_ref) ||
  411         ((s & RQ_LOCAL_PENDING) && !(s & RQ_LOCAL_ABORTED))) {
  412         drbd_err(device, "drbd_req_complete: Logic BUG rq_state: %x, completion_ref = %d\n",
  413                 s, atomic_read(&req->completion_ref));
  414         return;
  415     }
  416 
  417     if (!req->master_bio) {
  418         drbd_err(device, "drbd_req_complete: Logic BUG, master_bio == NULL!\n");
  419         return;
  420     }
  421 
  422     /* Before we can signal completion to the upper layers,
  423      * we may need to close the current transfer log epoch.
  424      * We are within the request lock, so we can simply compare
  425      * the request epoch number with the current transfer log
  426      * epoch number.  If they match, increase the current_tle_nr,
  427      * and reset the transfer log epoch write_cnt.
  428      */
  429     if (bio_data_dir(req->master_bio) == WRITE &&
  430         req->epoch == atomic_read(&device->resource->current_tle_nr))
  431         start_new_tl_epoch(device->resource);
  432 
  433     /* Update disk stats */
  434     _drbd_end_io_acct(device, req);
  435 
  436     /* If READ failed,
  437      * have it be pushed back to the retry work queue,
  438      * so it will re-enter __drbd_make_request(),
  439      * and be re-assigned to a suitable local or remote path,
  440      * or failed if we do not have access to good data anymore.
  441      *
  442      * Unless it was failed early by __drbd_make_request(),
  443      * because no path was available, in which case
  444      * it was not even added to the transfer_log.
  445      *
  446      * read-ahead may fail, and will not be retried.
  447      *
  448      * WRITE should have used all available paths already.
  449      */
  450     if (!ok &&
  451         bio_op(req->master_bio) == REQ_OP_READ &&
  452         !(req->master_bio->bi_opf & REQ_RAHEAD) &&
  453         !list_empty(&req->tl_requests))
  454         req->local_rq_state |= RQ_POSTPONED;
  455 
  456     if (!(req->local_rq_state & RQ_POSTPONED)) {
  457         struct drbd_resource *resource = device->resource;
  458         bool quorum =
  459             resource->res_opts.on_no_quorum == ONQ_IO_ERROR ?
  460             resource->cached_all_devices_have_quorum : true;
  461 
  462         m->error = ok && quorum ? 0 : (error ?: -EIO);
  463         m->bio = req->master_bio;
  464         req->master_bio = NULL;
  465         /* We leave it in the tree, to be able to verify later
  466          * write-acks in protocol != C during resync.
  467          * But we mark it as "complete", so it won't be counted as
  468          * conflict in a multi-primary setup. */
  469         req->i.completed = true;
  470     }
  471 
  472     if (req->i.waiting)
  473         wake_up(&device->misc_wait);
  474 
  475     /* Either we are about to complete to upper layers,
  476      * or we will restart this request.
  477      * In either case, the request object will be destroyed soon,
  478      * so better remove it from all lists. */
  479     list_del_init(&req->req_pending_master_completion);
  480 }
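      /*
       * Completion policy of drbd_req_complete(), summarized: "ok" is bumped
       * once for a successful local disk operation and once per peer that
       * reached RQ_NET_OK, so the master bio reports success as soon as any
       * replica has the data.  For example, a failed local write that was
       * acked by a protocol C peer still completes with status 0; only when
       * every path failed (ok == 0), or quorum was lost while on-no-quorum is
       * set to io-error (ONQ_IO_ERROR above), does the bio see an error.
       */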
  481 
  482 /* still holds resource->req_lock */
  483 static void drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
  484 {
  485     D_ASSERT(req->device, m || (req->local_rq_state & RQ_POSTPONED));
  486 
  487     if (!put)
  488         return;
  489 
  490     if (!atomic_sub_and_test(put, &req->completion_ref))
  491         return;
  492 
  493     drbd_req_complete(req, m);
  494 
  495     /* local completion may still come in later,
  496      * we need to keep the req object around. */
  497     if (req->local_rq_state & RQ_LOCAL_ABORTED)
  498         return;
  499 
  500     if (req->local_rq_state & RQ_POSTPONED) {
  501         /* don't destroy the req object just yet,
  502          * but queue it for retry */
  503         drbd_restart_request(req);
  504         return;
  505     }
  506 
  507     kref_put(&req->kref, drbd_req_destroy);
  508 }
  509 
  510 static void set_if_null_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
  511 {
  512     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
  513     if (!connection)
  514         return;
  515     if (connection->todo.req_next == NULL)
  516         connection->todo.req_next = req;
  517 }
  518 
  519 static void advance_conn_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
  520 {
  521     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
  522     if (!connection)
  523         return;
  524     if (connection->todo.req_next != req)
  525         return;
  526     list_for_each_entry_continue(req, &connection->resource->transfer_log, tl_requests) {
  527         const unsigned s = drbd_req_state_by_peer_device(req, peer_device);
  528         if (s & RQ_NET_QUEUED)
  529             break;
  530     }
  531     if (&req->tl_requests == &connection->resource->transfer_log)
  532         req = NULL;
  533     connection->todo.req_next = req;
  534 }
  535 
  536 static void set_if_null_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
  537 {
  538     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
  539     if (!connection)
  540         return;
  541     if (connection->req_ack_pending == NULL)
  542         connection->req_ack_pending = req;
  543 }
  544 
  545 static void advance_conn_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
  546 {
  547     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
  548     if (!connection)
  549         return;
  550     if (connection->req_ack_pending != req)
  551         return;
  552     list_for_each_entry_continue(req, &connection->resource->transfer_log, tl_requests) {
  553         const unsigned s = drbd_req_state_by_peer_device(req, peer_device);
  554         if ((s & RQ_NET_SENT) && (s & RQ_NET_PENDING))
  555             break;
  556     }
  557     if (&req->tl_requests == &connection->resource->transfer_log)
  558         req = NULL;
  559     connection->req_ack_pending = req;
  560 }
  561 
  562 static void set_if_null_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
  563 {
  564     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
  565     if (!connection)
  566         return;
  567     if (connection->req_not_net_done == NULL)
  568         connection->req_not_net_done = req;
  569 }
  570 
  571 static void advance_conn_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
  572 {
  573     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
  574     if (!connection)
  575         return;
  576     if (connection->req_not_net_done != req)
  577         return;
  578     list_for_each_entry_continue(req, &connection->resource->transfer_log, tl_requests) {
  579         const unsigned s = drbd_req_state_by_peer_device(req, peer_device);
  580         if ((s & RQ_NET_SENT) && !(s & RQ_NET_DONE))
  581             break;
  582     }
  583     if (&req->tl_requests == &connection->resource->transfer_log)
  584         req = NULL;
  585     connection->req_not_net_done = req;
  586 }
  587 
  588 /* for wsame, discard, and zero-out requests, the payload (amount of data we
  589  * need to send) is much smaller than the number of storage sectors affected */
  590 static unsigned int req_payload_sectors(struct drbd_request *req)
  591 {
  592     /* actually: physical_block_size,
  593      * but let's just hardcode 4k in sectors: */
  594     if (unlikely(req->local_rq_state & RQ_WSAME))
  595         return 8;
  596     /* really only a few bytes, but let's pretend one sector */
  597     if (unlikely(req->local_rq_state & (RQ_UNMAP|RQ_ZEROES)))
  598         return 1;
  599     /* others have all the data as payload on the wire */
  600     return req->i.size >> 9;
  601 }
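      /*
       * Worked example for the accounting above: a 1 MiB REQ_OP_DISCARD
       * touches 2048 sectors of storage but carries almost no data on the
       * wire, so it is counted as 1 payload sector towards ap_in_flight; a
       * WRITE_SAME request counts as 8 sectors (one 4 KiB block), and a plain
       * 1 MiB write counts as the full 2048.
       */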
  602 
  603 /* I'd like this to be the only place that manipulates
  604  * req->completion_ref and req->kref. */
  605 static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
  606         struct drbd_peer_device *peer_device,
  607         int clear, int set)
  608 {
  609     unsigned old_net = 0;
  610     unsigned old_local = req->local_rq_state;
  611     unsigned set_local = set & RQ_STATE_0_MASK;
  612     unsigned clear_local = clear & RQ_STATE_0_MASK;
  613     int c_put = 0;
  614     const int idx = peer_device ? peer_device->node_id : -1;
  615 
  616     set &= ~RQ_STATE_0_MASK;
  617     clear &= ~RQ_STATE_0_MASK;
  618 
  619     if (idx == -1) {
  620         /* do not try to manipulate net state bits
  621          * without an associated state slot! */
  622         BUG_ON(set);
  623         BUG_ON(clear);
  624     }
  625 
  626     if (drbd_suspended(req->device) && !((old_local | clear_local) & RQ_COMPLETION_SUSP))
  627         set_local |= RQ_COMPLETION_SUSP;
  628 
  629     /* apply */
  630 
  631     req->local_rq_state &= ~clear_local;
  632     req->local_rq_state |= set_local;
  633 
  634     if (idx != -1) {
  635         old_net = req->net_rq_state[idx];
  636         req->net_rq_state[idx] &= ~clear;
  637         req->net_rq_state[idx] |= set;
  638     }
  639 
  640 
  641     /* no change? */
  642     if (req->local_rq_state == old_local &&
  643         (idx == -1 || req->net_rq_state[idx] == old_net))
  644         return;
  645 
  646     /* intent: get references */
  647 
  648     kref_get(&req->kref);
  649 
  650     if (!(old_local & RQ_LOCAL_PENDING) && (set_local & RQ_LOCAL_PENDING))
  651         atomic_inc(&req->completion_ref);
  652 
  653     if (!(old_net & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) {
  654         inc_ap_pending(peer_device);
  655         atomic_inc(&req->completion_ref);
  656     }
  657 
  658     if (!(old_net & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) {
  659         atomic_inc(&req->completion_ref);
  660         set_if_null_req_next(peer_device, req);
  661     }
  662 
  663     if (!(old_net & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
  664         kref_get(&req->kref); /* wait for the DONE */
  665 
  666     if (!(old_net & RQ_NET_SENT) && (set & RQ_NET_SENT)) {
  667         /* potentially already completed in the ack_receiver thread */
  668         if (!(old_net & RQ_NET_DONE)) {
  669             atomic_add(req_payload_sectors(req), &peer_device->connection->ap_in_flight);
  670             set_if_null_req_not_net_done(peer_device, req);
  671         }
  672         if (req->net_rq_state[idx] & RQ_NET_PENDING)
  673             set_if_null_req_ack_pending(peer_device, req);
  674     }
  675 
  676     if (!(old_local & RQ_COMPLETION_SUSP) && (set_local & RQ_COMPLETION_SUSP))
  677         atomic_inc(&req->completion_ref);
  678 
  679     /* progress: put references */
  680 
  681     if ((old_local & RQ_COMPLETION_SUSP) && (clear_local & RQ_COMPLETION_SUSP))
  682         ++c_put;
  683 
  684     if (!(old_local & RQ_LOCAL_ABORTED) && (set_local & RQ_LOCAL_ABORTED)) {
  685         D_ASSERT(req->device, req->local_rq_state & RQ_LOCAL_PENDING);
  686         ++c_put;
  687     }
  688 
  689     if ((old_local & RQ_LOCAL_PENDING) && (clear_local & RQ_LOCAL_PENDING)) {
  690         if (req->local_rq_state & RQ_LOCAL_ABORTED)
  691             kref_put(&req->kref, drbd_req_destroy);
  692         else
  693             ++c_put;
  694         list_del_init(&req->req_pending_local);
  695     }
  696 
  697     if ((old_net & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
  698         dec_ap_pending(peer_device);
  699         ++c_put;
  700         ktime_get_accounting(req->acked_kt[peer_device->node_id]);
  701         advance_conn_req_ack_pending(peer_device, req);
  702     }
  703 
  704     if ((old_net & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) {
  705         ++c_put;
  706         advance_conn_req_next(peer_device, req);
  707     }
  708 
  709     if (!(old_net & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
  710         atomic_t *ap_in_flight = &peer_device->connection->ap_in_flight;
  711 
  712         if (old_net & RQ_NET_SENT)
  713             atomic_sub(req_payload_sectors(req), ap_in_flight);
  714         if (old_net & RQ_EXP_BARR_ACK)
  715             kref_put(&req->kref, drbd_req_destroy);
  716         ktime_get_accounting(req->net_done_kt[peer_device->node_id]);
  717 
  718         if (peer_device->repl_state[NOW] == L_AHEAD &&
  719             atomic_read(ap_in_flight) == 0) {
  720             struct drbd_peer_device *pd;
  721             int vnr;
  722             /* The first peer device to notice that it is time to
  723              * go Ahead -> SyncSource tries to trigger that
  724              * transition for *all* peer devices currently in
  725              * L_AHEAD for this connection. */
  726             idr_for_each_entry(&peer_device->connection->peer_devices, pd, vnr) {
  727                 if (pd->repl_state[NOW] != L_AHEAD)
  728                     continue;
  729                 if (test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &pd->flags))
  730                     continue; /* already done */
  731                 pd->start_resync_side = L_SYNC_SOURCE;
  732                 pd->start_resync_timer.expires = jiffies + HZ;
  733                 add_timer(&pd->start_resync_timer);
  734             }
  735         }
  736 
  737         /* in ahead/behind mode, or just in case,
  738          * before we finally destroy this request,
  739          * the caching pointers must not reference it anymore */
  740         advance_conn_req_next(peer_device, req);
  741         advance_conn_req_ack_pending(peer_device, req);
  742         advance_conn_req_not_net_done(peer_device, req);
  743     }
  744 
  745     /* potentially complete and destroy */
  746 
  747     /* If we made progress, retry conflicting peer requests, if any. */
  748     if (req->i.waiting)
  749         wake_up(&req->device->misc_wait);
  750 
  751     drbd_req_put_completion_ref(req, m, c_put);
  752     kref_put(&req->kref, drbd_req_destroy);
  753 }
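      /*
       * Reference discipline in mod_rq_state(), in short: every bit that adds
       * a new reason to wait takes a completion reference (or an extra kref
       * for RQ_EXP_BARR_ACK) in the "intent: get references" half, and the
       * matching clear drops it again in the "progress: put references" half.
       * The surrounding pair
       *
       *     kref_get(&req->kref);
       *     ...
       *     drbd_req_put_completion_ref(req, m, c_put);
       *     kref_put(&req->kref, drbd_req_destroy);
       *
       * keeps the request alive while the collected c_put is being applied.
       */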
  754 
  755 static void drbd_report_io_error(struct drbd_device *device, struct drbd_request *req)
  756 {
  757     char b[BDEVNAME_SIZE];
  758 
  759     if (!drbd_ratelimit())
  760         return;
  761 
  762     drbd_warn(device, "local %s IO error sector %llu+%u on %s\n",
  763           (req->local_rq_state & RQ_WRITE) ? "WRITE" : "READ",
  764           (unsigned long long)req->i.sector,
  765           req->i.size >> 9,
  766           bdevname(device->ldev->backing_bdev, b));
  767 }
  768 
  769 /* Helper for HANDED_OVER_TO_NETWORK.
  770  * Is this a protocol A write (neither WRITE_ACK nor RECEIVE_ACK expected)?
  771  * Is it also still "PENDING"?
  772  * --> If so, clear PENDING and set NET_OK below.
  773  * If it is a protocol A write, but not RQ_PENDING anymore, neg-ack was faster
  774  * (and we must not set RQ_NET_OK) */
  775 static inline bool is_pending_write_protocol_A(struct drbd_request *req, int idx)
  776 {
  777     return (req->local_rq_state & RQ_WRITE) == 0 ? 0 :
  778         (req->net_rq_state[idx] &
  779            (RQ_NET_PENDING|RQ_EXP_WRITE_ACK|RQ_EXP_RECEIVE_ACK))
  780         ==  RQ_NET_PENDING;
  781 }
  782 
  783 /* obviously this could be coded as many single functions
  784  * instead of one huge switch,
  785  * or by putting the code directly in the respective locations
  786  * (as it has been before).
  787  *
  788  * but having it this way
  789  *  enforces that it is all in this one place, where it is easier to audit,
  790  *  it makes it obvious that whatever "event" "happens" to a request should
  791  *  happen "atomically" within the req_lock,
  792  *  and it enforces that we have to think in a very structured manner
  793  *  about the "events" that may happen to a request during its life time ...
  794  *
  795  *
  796  * peer_device == NULL means local disk
  797  */
  798 void __req_mod(struct drbd_request *req, enum drbd_req_event what,
  799         struct drbd_peer_device *peer_device,
  800         struct bio_and_error *m)
  801 {
  802     struct drbd_device *device = req->device;
  803     struct net_conf *nc;
  804     int p;
  805     int idx;
  806 
  807     if (m)
  808         m->bio = NULL;
  809 
  810     idx = peer_device ? peer_device->node_id : -1;
  811 
  812     switch (what) {
  813     default:
  814         drbd_err(device, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
  815         break;
  816 
  817     /* does not happen...
  818      * initialization done in drbd_req_new
  819     case CREATED:
  820         break;
  821         */
  822 
  823     case TO_BE_SENT: /* via network */
  824         /* reached via __drbd_make_request
  825          * and from w_read_retry_remote */
  826         D_ASSERT(device, !(req->net_rq_state[idx] & RQ_NET_MASK));
  827         rcu_read_lock();
  828         nc = rcu_dereference(peer_device->connection->transport.net_conf);
  829         p = nc->wire_protocol;
  830         rcu_read_unlock();
  831         req->net_rq_state[idx] |=
  832             p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
  833             p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
  834         mod_rq_state(req, m, peer_device, 0, RQ_NET_PENDING);
  835         break;
  836 
  837     case TO_BE_SUBMITTED: /* locally */
  838         /* reached via __drbd_make_request */
  839         D_ASSERT(device, !(req->local_rq_state & RQ_LOCAL_MASK));
  840         mod_rq_state(req, m, peer_device, 0, RQ_LOCAL_PENDING);
  841         break;
  842 
  843     case COMPLETED_OK:
  844         if (req->local_rq_state & RQ_WRITE)
  845             device->writ_cnt += req->i.size >> 9;
  846         else
  847             device->read_cnt += req->i.size >> 9;
  848 
  849         mod_rq_state(req, m, peer_device, RQ_LOCAL_PENDING,
  850                 RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
  851         break;
  852 
  853     case ABORT_DISK_IO:
  854         mod_rq_state(req, m, peer_device, 0, RQ_LOCAL_ABORTED);
  855         break;
  856 
  857     case WRITE_COMPLETED_WITH_ERROR:
  858         drbd_report_io_error(device, req);
  859         __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
  860         mod_rq_state(req, m, peer_device, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
  861         break;
  862 
  863     case READ_COMPLETED_WITH_ERROR:
  864         drbd_set_all_out_of_sync(device, req->i.sector, req->i.size);
  865         drbd_report_io_error(device, req);
  866         __drbd_chk_io_error(device, DRBD_READ_ERROR);
  867         /* fall through. */
  868     case READ_AHEAD_COMPLETED_WITH_ERROR:
  869         /* it is legal to fail read-ahead, no __drbd_chk_io_error in that case. */
  870         mod_rq_state(req, m, peer_device, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
  871         break;
  872 
  873     case DISCARD_COMPLETED_NOTSUPP:
  874     case DISCARD_COMPLETED_WITH_ERROR:
  875         /* I'd rather not detach from local disk just because it
  876          * failed a REQ_OP_DISCARD. */
  877         mod_rq_state(req, m, peer_device, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
  878         break;
  879 
  880     case QUEUE_FOR_NET_READ:
  881         /* READ, and
  882          * no local disk,
  883          * or target area marked as invalid,
  884          * or just got an io-error. */
  885         /* from __drbd_make_request
  886          * or from bio_endio during read io-error recovery */
  887 
  888         /* So we can verify the handle in the answer packet.
  889          * Corresponding drbd_remove_request_interval is in
  890          * drbd_req_complete() */
  891         D_ASSERT(device, drbd_interval_empty(&req->i));
  892         drbd_insert_interval(&device->read_requests, &req->i);
  893 
  894         set_bit(UNPLUG_REMOTE, &device->flags);
  895 
  896         D_ASSERT(device, req->net_rq_state[idx] & RQ_NET_PENDING);
  897         D_ASSERT(device, (req->local_rq_state & RQ_LOCAL_MASK) == 0);
  898         mod_rq_state(req, m, peer_device, 0, RQ_NET_QUEUED);
  899         break;
  900 
  901     case QUEUE_FOR_NET_WRITE:
  902         /* assert something? */
  903         /* from __drbd_make_request only */
  904 
  905         /* NOTE
  906          * In case the req ended up on the transfer log before being
  907          * queued on the worker, it could lead to this request being
  908          * missed during cleanup after connection loss.
  909          * So we have to do both operations here,
  910          * within the same lock that protects the transfer log.
  911          *
  912          * _req_add_to_epoch(req); this has to be after the
  913          * _maybe_start_new_epoch(req); which happened in
  914          * __drbd_make_request, because we now may set the bit
  915          * again ourselves to close the current epoch.
  916          *
  917          * Add req to the (now) current epoch (barrier). */
  918 
  919         /* otherwise we may lose an unplug, which may cause some remote
  920          * io-scheduler timeout to expire, increasing maximum latency,
  921          * hurting performance. */
  922         set_bit(UNPLUG_REMOTE, &device->flags);
  923 
  924         /* queue work item to send data */
  925         D_ASSERT(device, req->net_rq_state[idx] & RQ_NET_PENDING);
  926         mod_rq_state(req, m, peer_device, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
  927 
  928         /* Close the epoch, in case it outgrew the limit.
  929          * Or if this is a "batch bio", and some of our peers is "old",
  930          * because a batch bio "storm" (like, large scale discarding
  931          * during mkfs time) would be likely to starve out the peers
  932          * activity log, if it is smaller than ours (or we don't have
  933          * any).  And a fix for the resulting potential distributed
  934          * deadlock was only implemented with P_CONFIRM_STABLE with
  935          * protocol version 114.
  936          */
  937         if (device->resource->cached_min_aggreed_protocol_version < 114 &&
  938             (req->local_rq_state & (RQ_UNMAP|RQ_WSAME|RQ_ZEROES)))
  939             p = 1;
  940         else {
  941             rcu_read_lock();
  942             nc = rcu_dereference(peer_device->connection->transport.net_conf);
  943             p = nc->max_epoch_size;
  944             rcu_read_unlock();
  945         }
  946         if (device->resource->current_tle_writes >= p)
  947             start_new_tl_epoch(device->resource);
  948         break;
  949 
  950     case QUEUE_FOR_SEND_OOS:
  951         mod_rq_state(req, m, peer_device, 0, RQ_NET_QUEUED);
  952         break;
  953 
  954     case READ_RETRY_REMOTE_CANCELED:
  955     case SEND_CANCELED:
  956     case SEND_FAILED:
  957         /* Just update flags so it is no longer marked as on the sender
  958          * queue; real cleanup will be done from
  959          * tl_walk(,CONNECTION_LOST_WHILE_PENDING). */
  960         mod_rq_state(req, m, peer_device, RQ_NET_QUEUED, 0);
  961         break;
  962 
  963     case HANDED_OVER_TO_NETWORK:
  964         /* assert something? */
  965         if (is_pending_write_protocol_A(req, idx))
  966             /* this is what is dangerous about protocol A:
  967              * pretend it was successfully written on the peer. */
  968             mod_rq_state(req, m, peer_device, RQ_NET_QUEUED|RQ_NET_PENDING,
  969                      RQ_NET_SENT|RQ_NET_OK);
  970         else
  971             mod_rq_state(req, m, peer_device, RQ_NET_QUEUED, RQ_NET_SENT);
  972         /* It is still not yet RQ_NET_DONE until the
  973          * corresponding epoch barrier got acked as well,
  974          * so we know what to dirty on connection loss. */
  975         break;
  976 
  977     case OOS_HANDED_TO_NETWORK:
  978         /* Was not set PENDING, no longer QUEUED, so is now DONE
  979          * as far as this connection is concerned. */
  980         mod_rq_state(req, m, peer_device, RQ_NET_QUEUED, RQ_NET_DONE);
  981         break;
  982 
  983     case CONNECTION_LOST_WHILE_PENDING:
  984         /* transfer log cleanup after connection loss */
  985         mod_rq_state(req, m, peer_device,
  986                 RQ_NET_OK|RQ_NET_PENDING|RQ_COMPLETION_SUSP,
  987                 RQ_NET_DONE);
  988         break;
  989 
  990     case DISCARD_WRITE:
  991         /* for discarded conflicting writes of multiple primaries,
  992          * there is no need to keep anything in the tl, potential
  993          * node crashes are covered by the activity log.
  994          *
  995          * If this request had been marked as RQ_POSTPONED before,
  996          * it will actually not be discarded, but "restarted",
  997          * resubmitted from the retry worker context. */
  998         D_ASSERT(device, req->net_rq_state[idx] & RQ_NET_PENDING);
  999         D_ASSERT(device, req->net_rq_state[idx] & RQ_EXP_WRITE_ACK);
 1000         mod_rq_state(req, m, peer_device, RQ_NET_PENDING, RQ_NET_DONE|RQ_NET_OK);
 1001         break;
 1002 
 1003     case WRITE_ACKED_BY_PEER_AND_SIS:
 1004         req->net_rq_state[idx] |= RQ_NET_SIS; /* fall through */
 1005     case WRITE_ACKED_BY_PEER:
 1006         /* Normal operation protocol C: successfully written on peer.
 1007          * During resync, even in protocol != C,
 1008          * we requested an explicit write ack anyways.
 1009          * Which means we cannot even assert anything here.
 1010          * Nothing more to do here.
 1011          * We want to keep the tl in place for all protocols, to cater
 1012          * for volatile write-back caches on lower level devices. */
 1013         goto ack_common;
 1014     case RECV_ACKED_BY_PEER:
 1015         D_ASSERT(device, req->net_rq_state[idx] & RQ_EXP_RECEIVE_ACK);
 1016         /* protocol B; pretends to be successfully written on peer.
 1017          * see also notes above in HANDED_OVER_TO_NETWORK about
 1018          * protocol != C */
 1019     ack_common:
 1020         mod_rq_state(req, m, peer_device, RQ_NET_PENDING, RQ_NET_OK);
 1021         break;
 1022 
 1023     case POSTPONE_WRITE:
 1024         D_ASSERT(device, req->net_rq_state[idx] & RQ_EXP_WRITE_ACK);
 1025         /* If this node has already detected the write conflict, the
 1026          * worker will be waiting on misc_wait.  Wake it up once this
 1027          * request has completed locally.
 1028          */
 1029         D_ASSERT(device, req->net_rq_state[idx] & RQ_NET_PENDING);
 1030         req->local_rq_state |= RQ_POSTPONED;
 1031         if (req->i.waiting)
 1032             wake_up(&req->device->misc_wait);
 1033         /* Do not clear RQ_NET_PENDING. This request will make further
 1034          * progress via restart_conflicting_writes() or
 1035          * fail_postponed_requests(). Hopefully. */
 1036         break;
 1037 
 1038     case NEG_ACKED:
 1039         mod_rq_state(req, m, peer_device, RQ_NET_OK|RQ_NET_PENDING, 0);
 1040         break;
 1041 
 1042     case COMPLETION_RESUMED:
 1043         mod_rq_state(req, m, peer_device, RQ_COMPLETION_SUSP, 0);
 1044         break;
 1045 
 1046     case FAIL_FROZEN_DISK_IO:
 1047         if (!(req->local_rq_state & RQ_LOCAL_COMPLETED))
 1048             break;
 1049         mod_rq_state(req, m, peer_device, RQ_COMPLETION_SUSP, 0);
 1050         break;
 1051 
 1052     case RESEND:
 1053         /* Simply complete (local only) READs. */
 1054         if (!(req->local_rq_state & RQ_WRITE) && !(req->net_rq_state[idx] & RQ_NET_MASK)) {
 1055             mod_rq_state(req, m, peer_device, RQ_COMPLETION_SUSP, 0);
 1056             break;
 1057         }
 1058 
 1059         /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
 1060            before the connection loss (B&C only); only P_BARRIER_ACK
 1061            (or the local completion?) was missing when we suspended.
 1062            Throwing them out of the TL here by pretending we got a BARRIER_ACK.
 1063            During connection handshake, we ensure that the peer was not rebooted.
 1064 
 1065            Resending is only allowed on synchronous connections,
 1066            where all requests not yet completed to upper layers would
 1067            be in the same "reorder-domain", there can not possibly be
 1068            any dependency between incomplete requests, and we are
 1069            allowed to complete this one "out-of-sequence".
 1070          */
 1071         if (!(req->net_rq_state[idx] & RQ_NET_OK)) {
 1072             mod_rq_state(req, m, peer_device, RQ_COMPLETION_SUSP,
 1073                     RQ_NET_QUEUED|RQ_NET_PENDING);
 1074             break;
 1075         }
 1076         /* else, fall through - to BARRIER_ACKED */
 1077     case BARRIER_ACKED:
 1078         /* barrier ack for READ requests does not make sense */
 1079         if (!(req->local_rq_state & RQ_WRITE))
 1080             break;
 1081 
 1082         if (req->net_rq_state[idx] & RQ_NET_PENDING) {
 1083             /* barrier came in before all requests were acked.
 1084              * this is bad, because if the connection is lost now,
 1085              * we won't be able to clean them up... */
 1086             drbd_err(device, "FIXME (BARRIER_ACKED but pending)\n");
 1087             mod_rq_state(req, m, peer_device, RQ_NET_PENDING, RQ_NET_OK);
 1088         }
 1089         /* Allowed to complete requests, even while suspended.
 1090          * As this is called for all requests within a matching epoch,
 1091          * we need to filter, and only set RQ_NET_DONE for those that
 1092          * have actually been on the wire. */
 1093         mod_rq_state(req, m, peer_device, RQ_COMPLETION_SUSP,
 1094                 (req->net_rq_state[idx] & RQ_NET_MASK) ? RQ_NET_DONE : 0);
 1095         break;
 1096 
 1097     case DATA_RECEIVED:
 1098         D_ASSERT(device, req->net_rq_state[idx] & RQ_NET_PENDING);
 1099         mod_rq_state(req, m, peer_device, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE);
 1100         break;
 1101 
 1102     case QUEUE_AS_DRBD_BARRIER:
 1103         start_new_tl_epoch(device->resource);
 1104         for_each_peer_device(peer_device, device)
 1105             mod_rq_state(req, m, peer_device, 0, RQ_NET_OK|RQ_NET_DONE);
 1106         break;
 1107     }
 1108 }
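      /*
       * For orientation, a typical protocol C write passes through this switch
       * roughly as: TO_BE_SENT and TO_BE_SUBMITTED (from __drbd_make_request),
       * QUEUE_FOR_NET_WRITE, HANDED_OVER_TO_NETWORK once the sender has pushed
       * the data out, COMPLETED_OK from the local completion path,
       * WRITE_ACKED_BY_PEER when the P_WRITE_ACK arrives, and finally
       * BARRIER_ACKED, which sets RQ_NET_DONE and allows the request to be
       * destroyed.  Remote reads take the shorter QUEUE_FOR_NET_READ plus
       * DATA_RECEIVED path instead.
       */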
 1109 
 1110 /* we may do a local read if:
 1111  * - we are consistent (of course),
 1112  * - or we are generally inconsistent,
 1113  *   BUT we are still/already IN SYNC with all peers for this area.
 1114  *   since size may be bigger than BM_BLOCK_SIZE,
 1115  *   we may need to check several bits.
 1116  */
 1117 static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size)
 1118 {
 1119     struct drbd_md *md = &device->ldev->md;
 1120     unsigned int node_id;
 1121     unsigned int n_checked = 0;
 1122 
 1123     unsigned long sbnr, ebnr;
 1124     sector_t esector, nr_sectors;
 1125 
 1126     if (device->disk_state[NOW] == D_UP_TO_DATE)
 1127         return true;
 1128     if (device->disk_state[NOW] != D_INCONSISTENT)
 1129         return false;
 1130     esector = sector + (size >> 9) - 1;
 1131     nr_sectors = drbd_get_capacity(device->this_bdev);
 1132     D_ASSERT(device, sector  < nr_sectors);
 1133     D_ASSERT(device, esector < nr_sectors);
 1134 
 1135     sbnr = BM_SECT_TO_BIT(sector);
 1136     ebnr = BM_SECT_TO_BIT(esector);
 1137 
 1138     for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
 1139         struct drbd_peer_md *peer_md = &md->peers[node_id];
 1140 
 1141         /* Skip bitmap indexes which are not assigned to a peer. */
 1142         if (!(peer_md->flags & MDF_HAVE_BITMAP))
 1143             continue;
 1144 
 1145         if (drbd_bm_count_bits(device, peer_md->bitmap_index, sbnr, ebnr))
 1146             return false;
 1147         ++n_checked;
 1148     }
 1149     if (n_checked == 0) {
 1150         if (drbd_ratelimit()) {
 1151             drbd_err(device, "No valid bitmap slots found to check!\n");
 1152         }
 1153         return false;
 1154     }
 1155     return true;
 1156 }
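      /*
       * Example, assuming the usual 4 KiB bitmap granularity (BM_BLOCK_SIZE):
       * a 16 KiB read at sector 0 maps to bitmap bits 0..3, which the loop
       * above checks in every peer slot that has a bitmap (MDF_HAVE_BITMAP).
       * A single set bit means the range is not known to be in sync with that
       * peer, so the read is not served from the local, inconsistent disk.
       */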
 1157 
 1158 /* TODO improve for more than one peer.
 1159  * also take into account the drbd protocol. */
 1160 static bool remote_due_to_read_balancing(struct drbd_device *device,
 1161         struct drbd_peer_device *peer_device, sector_t sector,
 1162         enum drbd_read_balancing rbm)
 1163 {
 1164     struct backing_dev_info *bdi;
 1165     int stripe_shift;
 1166 
 1167     switch (rbm) {
 1168     case RB_CONGESTED_REMOTE:
 1169         bdi = bdi_from_device(device);
 1170         return bdi_read_congested(bdi);
 1171     case RB_LEAST_PENDING:
 1172         return atomic_read(&device->local_cnt) >
 1173             atomic_read(&peer_device->ap_pending_cnt) + atomic_read(&peer_device->rs_pending_cnt);
 1174     case RB_32K_STRIPING:  /* stripe_shift = 15 */
 1175     case RB_64K_STRIPING:
 1176     case RB_128K_STRIPING:
 1177     case RB_256K_STRIPING:
 1178     case RB_512K_STRIPING:
 1179     case RB_1M_STRIPING:   /* stripe_shift = 20 */
 1180         stripe_shift = (rbm - RB_32K_STRIPING + 15);
 1181         return (sector >> (stripe_shift - 9)) & 1;
 1182     case RB_ROUND_ROBIN:
 1183         return test_and_change_bit(READ_BALANCE_RR, &device->flags);
 1184     case RB_PREFER_REMOTE:
 1185         return true;
 1186     case RB_PREFER_LOCAL:
 1187     default:
 1188         return false;
 1189     }
 1190 }
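      /*
       * Worked example for the striping modes above: RB_64K_STRIPING yields
       * stripe_shift = 16, so the test becomes (sector >> 7) & 1, i.e.
       * alternating 128-sector (64 KiB) stripes are read locally and remotely.
       * RB_32K_STRIPING and RB_1M_STRIPING bound the range with shifts of
       * 15 and 20, as noted in the case labels.
       */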
 1191 
 1192 /*
 1193  * complete_conflicting_writes  -  wait for any conflicting write requests
 1194  *
 1195  * The write_requests tree contains all active write requests which we
 1196  * currently know about.  Wait for any requests to complete which conflict with
 1197  * the new one.
 1198  *
 1199  * Only way out: remove the conflicting intervals from the tree.
 1200  */
 1201 static void complete_conflicting_writes(struct drbd_request *req)
 1202 {
 1203     DEFINE_WAIT(wait);
 1204     struct drbd_device *device = req->device;
 1205     struct drbd_interval *i;
 1206     sector_t sector = req->i.sector;
 1207     int size = req->i.size;
 1208 
 1209     for (;;) {
 1210         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
 1211             /* Ignore, if already completed to upper layers. */
 1212             if (i->completed)
 1213                 continue;
 1214             /* Handle the first found overlap.  After the schedule
 1215              * we have to restart the tree walk. */
 1216             break;
 1217         }
 1218         if (!i) /* if any */
 1219             break;
 1220 
 1221         /* Indicate to wake up device->misc_wait on progress.  */
 1222         prepare_to_wait(&device->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
 1223         i->waiting = true;
 1224         spin_unlock_irq(&device->resource->req_lock);
 1225         schedule();
 1226         spin_lock_irq(&device->resource->req_lock);
 1227     }
 1228     finish_wait(&device->misc_wait, &wait);
 1229 }
 1230 
 1231 /* called within req_lock and rcu_read_lock() */
 1232 static void __maybe_pull_ahead(struct drbd_device *device, struct drbd_connection *connection)
 1233 {
 1234     struct net_conf *nc;
 1235     bool congested = false;
 1236     enum drbd_on_congestion on_congestion;
 1237     u32 cong_fill = 0, cong_extents = 0;
 1238     struct drbd_peer_device *peer_device = conn_peer_device(connection, device->vnr);
 1239 
 1240     if (connection->agreed_pro_version < 96)
 1241         return;
 1242 
 1243     rcu_read_lock();
 1244     nc = rcu_dereference(connection->transport.net_conf);
 1245     if (nc) {
 1246         on_congestion = nc->on_congestion;
 1247         cong_fill = nc->cong_fill;
 1248         cong_extents = nc->cong_extents;
 1249     } else {
 1250         on_congestion = OC_BLOCK;
 1251     }
 1252     rcu_read_unlock();
 1253     if (on_congestion == OC_BLOCK)
 1254         return;
 1255 
 1256     if (on_congestion == OC_PULL_AHEAD && peer_device->repl_state[NOW] == L_AHEAD)
 1257         return; /* nothing to do ... */
 1258 
 1259     /* If I don't even have good local storage, we can not reasonably try
 1260      * to pull ahead of the peer. We also need the local reference to make
 1261      * sure device->act_log is there.
 1262      */
 1263     if (!get_ldev_if_state(device, D_UP_TO_DATE))
 1264         return;
 1265 
 1266     /* if another volume already found that we are congested, short circuit. */
 1267     congested = test_bit(CONN_CONGESTED, &connection->flags);
 1268 
 1269     if (!congested && cong_fill) {
 1270         int n = atomic_read(&connection->ap_in_flight) +
 1271             atomic_read(&connection->rs_in_flight);
 1272         if (n >= cong_fill) {
 1273             drbd_info(device, "Congestion-fill threshold reached (%d >= %d)\n", n, cong_fill);
 1274             congested = true;
 1275         }
 1276     }
 1277 
 1278     if (!congested && device->act_log->used >= cong_extents) {
 1279         drbd_info(device, "Congestion-extents threshold reached (%d >= %d)\n",
 1280             device->act_log->used, cong_extents);
 1281         congested = true;
 1282     }
 1283 
 1284     if (congested) {
 1285         struct drbd_resource *resource = device->resource;
 1286 
 1287         set_bit(CONN_CONGESTED, &connection->flags);
 1288 
 1289         /* start a new epoch for non-mirrored writes */
 1290         start_new_tl_epoch(resource);
 1291 
 1292         begin_state_change_locked(resource, CS_VERBOSE | CS_HARD);
 1293         if (on_congestion == OC_PULL_AHEAD)
 1294             __change_repl_state(peer_device, L_AHEAD);
 1295         else            /* on_congestion == OC_DISCONNECT */
 1296             __change_cstate(peer_device->connection, C_DISCONNECTING);
 1297         end_state_change_locked(resource);
 1298     }
 1299     put_ldev(device);
 1300 }
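      /*
       * The thresholds checked above come from the connection's net options;
       * a drbd.conf-style sketch (option names from the manual page, values
       * purely illustrative):
       *
       *     net {
       *         on-congestion       pull-ahead;
       *         congestion-fill     1G;
       *         congestion-extents  1000;
       *     }
       *
       * Once a threshold is exceeded, the peer device is switched to L_AHEAD
       * (or the connection is torn down for "disconnect"), and further writes
       * are only marked out-of-sync instead of being mirrored synchronously.
       */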
 1301 
 1302 /* called within req_lock */
 1303 static void maybe_pull_ahead(struct drbd_device *device)
 1304 {
 1305     struct drbd_connection *connection;
 1306 
 1307     for_each_connection(connection, device->resource)
 1308         if (connection->cstate[NOW] == C_CONNECTED)
 1309             __maybe_pull_ahead(device, connection);
 1310 }
 1311 
 1312 bool drbd_should_do_remote(struct drbd_peer_device *peer_device, enum which_state which)
 1313 {
 1314     enum drbd_disk_state peer_disk_state = peer_device->disk_state[which];
 1315     enum drbd_repl_state repl_state = peer_device->repl_state[which];
 1316 
 1317     return peer_disk_state == D_UP_TO_DATE ||
 1318         (peer_disk_state == D_INCONSISTENT &&
 1319          (repl_state == L_ESTABLISHED ||
 1320           (repl_state >= L_WF_BITMAP_T && repl_state < L_AHEAD)));
 1321     /* Before proto 96 that was >= CONNECTED instead of >= L_WF_BITMAP_T.
 1322        That is equivalent since before 96 IO was frozen in the L_WF_BITMAP*
 1323        states. */
 1324 }
 1325 
 1326 static bool drbd_should_send_out_of_sync(struct drbd_peer_device *peer_device)
 1327 {
 1328     return peer_device->repl_state[NOW] == L_AHEAD || peer_device->repl_state[NOW] == L_WF_BITMAP_S;
 1329     /* pdsk = D_INCONSISTENT as a consequence. Protocol 96 check not necessary
 1330        since we enter state L_AHEAD only if proto >= 96 */
 1331 }
 1332 
 1333 /* Prefer to read from protocol C peers, then B, and protocol A last */
 1334 static u64 calc_nodes_to_read_from(struct drbd_device *device)
 1335 {
 1336     struct drbd_peer_device *peer_device;
 1337     u64 candidates[DRBD_PROT_C] = {};
 1338     int wp;
 1339 
 1340     rcu_read_lock();
 1341     for_each_peer_device_rcu(peer_device, device) {
 1342         struct net_conf *nc;
 1343 
 1344         if (peer_device->disk_state[NOW] != D_UP_TO_DATE)
 1345             continue;
 1346         nc = rcu_dereference(peer_device->connection->transport.net_conf);
 1347         if (!nc || !nc->allow_remote_read)
 1348             continue;
 1349         wp = nc->wire_protocol;
 1350         candidates[wp - 1] |= NODE_MASK(peer_device->node_id);
 1351     }
 1352     rcu_read_unlock();
 1353 
 1354     for (wp = DRBD_PROT_C; wp >= DRBD_PROT_A; wp--) {
 1355         if (candidates[wp - 1])
 1356             return candidates[wp - 1];
 1357     }
 1358     return 0;
 1359 }
 1360 
 1361 /* If this returns NULL, and req->private_bio is still set,
 1362  * the request should be submitted locally.
 1363  *
 1364  * If it returns NULL, but req->private_bio is not set,
 1365  * we do not have access to good data :(
 1366  *
 1367  * Otherwise, this destroys req->private_bio, if any,
 1368  * and returns the peer device which should be asked for data.
 1369  */
 1370 static struct drbd_peer_device *find_peer_device_for_read(struct drbd_request *req)
 1371 {
 1372     struct drbd_peer_device *peer_device;
 1373     struct drbd_device *device = req->device;
 1374     enum drbd_read_balancing rbm = RB_PREFER_REMOTE;
 1375 
 1376     if (req->private_bio) {
 1377         if (!drbd_may_do_local_read(device,
 1378                     req->i.sector, req->i.size)) {
 1379             bio_put(req->private_bio);
 1380             req->private_bio = NULL;
 1381             put_ldev(device);
 1382         }
 1383     }
 1384 
 1385     if (device->disk_state[NOW] > D_DISKLESS) {
 1386         rcu_read_lock();
 1387         rbm = rcu_dereference(device->ldev->disk_conf)->read_balancing;
 1388         rcu_read_unlock();
 1389         if (rbm == RB_PREFER_LOCAL && req->private_bio) {
 1390             return NULL; /* submit locally */
 1391         }
 1392     }
 1393 
 1394     /* TODO: improve read balancing decisions, allow user to configure node weights */
 1395     while (true) {
 1396         if (!device->read_nodes)
 1397             device->read_nodes = calc_nodes_to_read_from(device);
 1398         if (device->read_nodes) {
 1399             int peer_node_id = __ffs64(device->read_nodes);
 1400             device->read_nodes &= ~NODE_MASK(peer_node_id);
 1401             peer_device = peer_device_by_node_id(device, peer_node_id);
 1402             if (!peer_device)
 1403                 continue;
 1404             if (peer_device->disk_state[NOW] != D_UP_TO_DATE)
 1405                 continue;
 1406             if (req->private_bio &&
 1407                 !remote_due_to_read_balancing(device, peer_device, req->i.sector, rbm))
 1408                 peer_device = NULL;
 1409         } else {
 1410             peer_device = NULL;
 1411         }
 1412         break;
 1413     }
 1414 
 1415     if (peer_device && req->private_bio) {
 1416         bio_put(req->private_bio);
 1417         req->private_bio = NULL;
 1418         put_ldev(device);
 1419     }
 1420     return peer_device;
 1421 }
 1422 
 1423 /* returns the number of connections expected to actually write this data,
 1424  * which does NOT include those that we are L_AHEAD for. */
 1425 static int drbd_process_write_request(struct drbd_request *req)
 1426 {
 1427     struct drbd_device *device = req->device;
 1428     struct drbd_peer_device *peer_device;
 1429     bool in_tree = false;
 1430     int remote, send_oos;
 1431     int count = 0;
 1432 
 1433     for_each_peer_device(peer_device, device) {
 1434         remote = drbd_should_do_remote(peer_device, NOW);
 1435         send_oos = drbd_should_send_out_of_sync(peer_device);
 1436 
 1437         if (!remote && !send_oos)
 1438             continue;
 1439 
 1440         D_ASSERT(device, !(remote && send_oos));
 1441 
 1442         if (remote) {
 1443             ++count;
 1444             _req_mod(req, TO_BE_SENT, peer_device);
 1445             if (!in_tree) {
 1446                 /* Corresponding drbd_remove_request_interval is in
 1447                  * drbd_req_complete() */
 1448                 drbd_insert_interval(&device->write_requests, &req->i);
 1449                 in_tree = true;
 1450             }
 1451             _req_mod(req, QUEUE_FOR_NET_WRITE, peer_device);
 1452         } else
 1453             _req_mod(req, QUEUE_FOR_SEND_OOS, peer_device);
 1454     }
 1455 
 1456     return count;
 1457 }
 1458 
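      /* Issue a discard or zero-out directly against the local backing device
       * and complete the private bio with the outcome, instead of passing the
       * bio down via generic_make_request(). */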
 1459 static void drbd_process_discard_or_zeroes_req(struct drbd_request *req, int flags)
 1460 {
 1461     int err = drbd_issue_discard_or_zero_out(req->device,
 1462                 req->i.sector, req->i.size >> 9, flags);
 1463     req->private_bio->bi_status = err ? BLK_STS_IOERR : BLK_STS_OK;
 1464     bio_endio(req->private_bio);
 1465 }
 1466 
 1467 static void
 1468 drbd_submit_req_private_bio(struct drbd_request *req)
 1469 {
 1470     struct drbd_device *device = req->device;
 1471     struct bio *bio = req->private_bio;
 1472     unsigned int type;
 1473 
 1474     if (bio_op(bio) != REQ_OP_READ)
 1475         type = DRBD_FAULT_DT_WR;
 1476     else if (bio->bi_opf & REQ_RAHEAD)
 1477         type = DRBD_FAULT_DT_RA;
 1478     else
 1479         type = DRBD_FAULT_DT_RD;
 1480 
 1481     bio_set_dev(bio, device->ldev->backing_bdev);
 1482 
 1483     /* State may have changed since we grabbed our reference on the
 1484      * device->ldev member. Double check, and short-circuit to endio.
 1485      * In case the last activity log transaction failed to get on
 1486      * stable storage, and this is a WRITE, we may not even submit
 1487      * this bio. */
 1488     if (get_ldev(device)) {
 1489         if (drbd_insert_fault(device, type)) {
 1490             bio->bi_status = BLK_STS_IOERR;
 1491             bio_endio(bio);
 1492         } else if (bio_op(bio) == REQ_OP_WRITE_ZEROES) {
 1493             drbd_process_discard_or_zeroes_req(req, EE_ZEROOUT |
 1494                 ((bio->bi_opf & REQ_NOUNMAP) ? 0 : EE_TRIM));
 1495         } else if (bio_op(bio) == REQ_OP_DISCARD) {
 1496             drbd_process_discard_or_zeroes_req(req, EE_TRIM);
 1497         } else {
 1498             generic_make_request(bio);
 1499         }
 1500         put_ldev(device);
 1501     } else {
 1502         bio->bi_status = BLK_STS_IOERR;
 1503         bio_endio(bio);
 1504     }
 1505 }
 1506 
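      /* Hand a write over to the submitter thread (do_submit()): account it in
       * ap_actlog_cnt, put it on the submit.writes and pending-master-completion
       * lists, and kick the worker. */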
 1507 static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req)
 1508 {
 1509     if (req->private_bio)
 1510         atomic_inc(&device->ap_actlog_cnt);
 1511     spin_lock_irq(&device->resource->req_lock);
 1512     list_add_tail(&req->tl_requests, &device->submit.writes);
 1513     list_add_tail(&req->req_pending_master_completion,
 1514             &device->pending_master_completion[1 /* WRITE */]);
 1515     spin_unlock_irq(&device->resource->req_lock);
 1516     queue_work(device->submit.wq, &device->submit.worker);
 1517     /* do_submit() may sleep internally on al_wait, too */
 1518     wake_up(&device->al_wait);
 1519 }
 1520 
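      /* Clone the master bio into req->private_bio for submission to the local
       * backing device; completion is routed back via drbd_request_endio(). */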
 1521 static void req_make_private_bio(struct drbd_request *req, struct bio *bio_src)
 1522 {
 1523     struct bio *bio;
 1524     bio = bio_clone_fast(bio_src, GFP_NOIO, &drbd_io_bio_set);
 1525 
 1526     req->private_bio = bio;
 1527 
 1528     bio->bi_private  = req;
 1529     bio->bi_end_io   = drbd_request_endio;
 1530     bio->bi_next     = NULL;
 1531 }
 1532 
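      /* Mark the request as covered by the activity log and drop it from the
       * wait-for-activity-log accounting. */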
 1533 static void drbd_req_in_actlog(struct drbd_request *req)
 1534 {
 1535     req->local_rq_state |= RQ_IN_ACT_LOG;
 1536     ktime_get_accounting(req->in_actlog_kt);
 1537     atomic_sub(interval_to_al_extents(&req->i), &req->device->wait_for_actlog_ecnt);
 1538 }
 1539 
 1540 /* returns the new drbd_request pointer, if the caller is expected to
 1541  * drbd_send_and_submit() it (to save latency), or NULL if we queued the
 1542  * request on the submitter thread.
 1543  * Returns ERR_PTR(-ENOMEM) if we cannot allocate a drbd_request.
 1544  */
 1545 #ifndef CONFIG_DRBD_TIMING_STATS
 1546 #define drbd_request_prepare(d,b,k,j) drbd_request_prepare(d,b,j)
 1547 #endif
 1548 static struct drbd_request *
 1549 drbd_request_prepare(struct drbd_device *device, struct bio *bio,
 1550         ktime_t start_kt,
 1551         unsigned long start_jif)
 1552 {
 1553     const int rw = bio_data_dir(bio);
 1554     struct drbd_request *req;
 1555 
 1556     /* allocate outside of all locks */
 1557     req = drbd_req_new(device, bio);
 1558     if (!req) {
 1559         dec_ap_bio(device, rw);
 1560         /* only pass the error to the upper layers.
 1561          * if user cannot handle io errors, that's not our business. */
 1562         drbd_err(device, "could not kmalloc() req\n");
 1563         bio->bi_status = BLK_STS_RESOURCE;
 1564         bio_endio(bio);
 1565         return ERR_PTR(-ENOMEM);
 1566     }
 1567     if (get_ldev(device))
 1568         req_make_private_bio(req, bio);
 1569 
 1570     req->start_jif = start_jif;
 1571     ktime_get_accounting_assign(req->start_kt, start_kt);
 1572 
 1573     /* Update disk stats */
 1574     _drbd_start_io_acct(device, req);
 1575 
 1576     if (rw != WRITE || req->i.size == 0)
 1577         return req;
 1578 
 1579     /* Let the activity log know we are about to use it...
 1580      * FIXME
 1581      * Needs to slow down to not congest on the activity log, in case we
 1582      * have multiple primaries and the peer sends huge scattered epochs.
 1583      * See also how peer_requests are handled
 1584      * in receive_Data() { ... prepare_activity_log(); ... }
 1585      */
 1586     if (req->private_bio)
 1587         atomic_add(interval_to_al_extents(&req->i), &device->wait_for_actlog_ecnt);
 1588 
 1589     /* always process discards from our submitter thread */
 1590     if ((bio_op(bio) == REQ_OP_WRITE_ZEROES) ||
 1591         (bio_op(bio) == REQ_OP_DISCARD))
 1592         goto queue_for_submitter_thread;
 1593 
 1594     if (req->private_bio && !test_bit(AL_SUSPENDED, &device->flags)) {
 1595         if (!drbd_al_begin_io_fastpath(device, &req->i))
 1596             goto queue_for_submitter_thread;
 1597         drbd_req_in_actlog(req);
 1598     }
 1599     return req;
 1600 
 1601  queue_for_submitter_thread:
 1602     ktime_aggregate_delta(device, req->start_kt, before_queue_kt);
 1603     drbd_queue_write(device, req);
 1604     return NULL;
 1605 }
 1606 
 1607 /* Require at least one path to current data.
 1608  * We don't want to allow writes on C_STANDALONE D_INCONSISTENT:
 1609  * we would not be able to read back what was written,
 1610  * we would not have bumped the data generation uuids,
 1611  * we would cause data divergence for all the wrong reasons.
 1612  *
 1613  * If we don't see at least one D_UP_TO_DATE, we will fail this request,
 1614  * which either returns EIO, or, if OND_SUSPEND_IO is set, suspends IO,
 1615  * and queues for retry later.
 1616  */
 1617 static bool may_do_writes(struct drbd_device *device)
 1618 {
 1619     struct drbd_peer_device *peer_device;
 1620 
 1621     if (device->disk_state[NOW] == D_UP_TO_DATE)
 1622         return true;
 1623 
 1624     for_each_peer_device(peer_device, device) {
 1625         if (peer_device->disk_state[NOW] == D_UP_TO_DATE)
 1626             return true;
 1627     }
 1628 
 1629     return false;
 1630 }
 1631 
 1632 struct drbd_plug_cb {
 1633     struct blk_plug_cb cb;
 1634     struct drbd_request *most_recent_req;
 1635     /* do we need more? */
 1636 };
 1637 
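      /* blk plug callback: on unplug, tag the most recent request so the sender
       * follows it up with P_UNPLUG_REMOTE, and queue a generic unplug. */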
 1638 static void drbd_unplug(struct blk_plug_cb *cb, bool from_schedule)
 1639 {
 1640     struct drbd_plug_cb *plug = container_of(cb, struct drbd_plug_cb, cb);
 1641     struct drbd_resource *resource = plug->cb.data;
 1642     struct drbd_request *req = plug->most_recent_req;
 1643 
 1644     kfree(cb);
 1645     if (!req)
 1646         return;
 1647 
 1648     spin_lock_irq(&resource->req_lock);
 1649     /* In case the sender did not process it yet, raise the flag to
 1650      * have it followed with P_UNPLUG_REMOTE just after. */
 1651     req->local_rq_state |= RQ_UNPLUG;
 1652     /* but also queue a generic unplug */
 1653     drbd_queue_unplug(req->device);
 1654     kref_put(&req->kref, drbd_req_destroy);
 1655     spin_unlock_irq(&resource->req_lock);
 1656 }
 1657 
 1658 static struct drbd_plug_cb* drbd_check_plugged(struct drbd_resource *resource)
 1659 {
 1660     /* A lot of text to say
 1661      * return (struct drbd_plug_cb*)blk_check_plugged(); */
 1662     struct drbd_plug_cb *plug;
 1663     struct blk_plug_cb *cb = blk_check_plugged(drbd_unplug, resource, sizeof(*plug));
 1664 
 1665     if (cb)
 1666         plug = container_of(cb, struct drbd_plug_cb, cb);
 1667     else
 1668         plug = NULL;
 1669     return plug;
 1670 }
 1671 
 1672 static void drbd_update_plug(struct drbd_plug_cb *plug, struct drbd_request *req)
 1673 {
 1674     struct drbd_request *tmp = plug->most_recent_req;
 1675     /* Will be sent to some peer.
 1676      * Remember to tag it with UNPLUG_REMOTE on unplug */
 1677     kref_get(&req->kref);
 1678     plug->most_recent_req = req;
 1679     if (tmp)
 1680         kref_put(&tmp->kref, drbd_req_destroy);
 1681 }
 1682 
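      /* Take a prepared request through the transfer log under req_lock: decide
       * whether it goes to peers, the local disk, or both, queue it for the
       * sender(s), and submit the private bio (if any) after dropping the lock. */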
 1683 static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req)
 1684 {
 1685     struct drbd_resource *resource = device->resource;
 1686     struct drbd_peer_device *peer_device = NULL; /* for read */
 1687     const int rw = bio_data_dir(req->master_bio);
 1688     struct bio_and_error m = { NULL, };
 1689     bool no_remote = false;
 1690     bool submit_private_bio = false;
 1691 
 1692     spin_lock_irq(&resource->req_lock);
 1693     if (rw == WRITE) {
 1694         /* This may temporarily give up the req_lock,
 1695          * but will re-acquire it before it returns here.
 1696          * Needs to be before the check on drbd_suspended() */
 1697         complete_conflicting_writes(req);
 1698         /* no more giving up req_lock from now on! */
 1699 
 1700         /* check for congestion, and potentially stop sending
 1701          * full data updates, but start sending "dirty bits" only. */
 1702         maybe_pull_ahead(device);
 1703     }
 1704 
 1705 
 1706     if (drbd_suspended(device)) {
 1707         /* push back and retry: */
 1708         req->local_rq_state |= RQ_POSTPONED;
 1709         if (req->private_bio) {
 1710             bio_put(req->private_bio);
 1711             req->private_bio = NULL;
 1712             put_ldev(device);
 1713         }
 1714         goto out;
 1715     }
 1716 
 1717     /* We fail a READ early if we cannot serve it.
 1718      * We must do this before req is registered on any lists.
 1719      * Otherwise, drbd_req_complete() would queue the failed READ for retry. */
 1720     if (rw != WRITE) {
 1721         peer_device = find_peer_device_for_read(req);
 1722         if (!peer_device && !req->private_bio)
 1723             goto nodata;
 1724     }
 1725 
 1726     /* which transfer log epoch does this belong to? */
 1727     req->epoch = atomic_read(&resource->current_tle_nr);
 1728 
 1729     if (rw == WRITE)
 1730         resource->dagtag_sector += req->i.size >> 9;
 1731     req->dagtag_sector = resource->dagtag_sector;
 1732     /* no point in adding empty flushes to the transfer log,
 1733      * they are mapped to drbd barriers already. */
 1734     if (likely(req->i.size != 0)) {
 1735         if (rw == WRITE) {
 1736             struct drbd_request *req2;
 1737 
 1738             resource->current_tle_writes++;
 1739             list_for_each_entry_reverse(req2, &resource->transfer_log, tl_requests) {
 1740                 if (req2->local_rq_state & RQ_WRITE) {
 1741                     /* Make the new write request depend on
 1742                      * the previous one. */
 1743                     BUG_ON(req2->destroy_next);
 1744                     req2->destroy_next = req;
 1745                     kref_get(&req->kref);
 1746                     break;
 1747                 }
 1748             }
 1749         }
 1750         list_add_tail(&req->tl_requests, &resource->transfer_log);
 1751     }
 1752 
 1753     if (rw == WRITE) {
 1754         if (req->private_bio && !may_do_writes(device)) {
 1755             bio_put(req->private_bio);
 1756             req->private_bio = NULL;
 1757             put_ldev(device);
 1758             goto nodata;
 1759         }
 1760         /* Need to replicate writes.  Unless it is an empty flush,
 1761          * which is better mapped to a DRBD P_BARRIER packet,
 1762          * also for drbd wire protocol compatibility reasons.
 1763          * If this was a flush, just start a new epoch.
 1764          * Unless the current epoch was empty anyways, or we are not currently
 1765          * replicating, in which case there is no point. */
 1766         if (unlikely(req->i.size == 0)) {
 1767             /* The only size==0 bios we expect are empty flushes. */
 1768             D_ASSERT(device, req->master_bio->bi_opf & REQ_PREFLUSH);
 1769             _req_mod(req, QUEUE_AS_DRBD_BARRIER, NULL);
 1770         } else if (!drbd_process_write_request(req))
 1771             no_remote = true;
 1772         wake_all_senders(resource);
 1773     } else {
 1774         if (peer_device) {
 1775             _req_mod(req, TO_BE_SENT, peer_device);
 1776             _req_mod(req, QUEUE_FOR_NET_READ, peer_device);
 1777             wake_up(&peer_device->connection->sender_work.q_wait);
 1778         } else
 1779             no_remote = true;
 1780     }
 1781 
 1782     if (no_remote == false) {
 1783         struct drbd_plug_cb *plug = drbd_check_plugged(resource);
 1784         if (plug)
 1785             drbd_update_plug(plug, req);
 1786     }
 1787 
 1788     /* If it took the fast path in drbd_request_prepare, add it here.
 1789      * The slow path has added it already. */
 1790     if (list_empty(&req->req_pending_master_completion))
 1791         list_add_tail(&req->req_pending_master_completion,
 1792             &device->pending_master_completion[rw == WRITE]);
 1793     if (req->private_bio) {
 1794         /* pre_submit_jif is used in request_timer_fn() */
 1795         req->pre_submit_jif = jiffies;
 1796         ktime_get_accounting(req->pre_submit_kt);
 1797         list_add_tail(&req->req_pending_local,
 1798             &device->pending_completion[rw == WRITE]);
 1799         _req_mod(req, TO_BE_SUBMITTED, NULL);
 1800         /* needs to be marked within the same spinlock
 1801          * but we need to give up the spinlock to submit */
 1802         submit_private_bio = true;
 1803     } else if (no_remote) {
 1804 nodata:
 1805         if (drbd_ratelimit())
 1806             drbd_err(req->device, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
 1807                     (unsigned long long)req->i.sector, req->i.size >> 9);
 1808         /* A write may have been queued for send_oos, however.
 1809          * So we can not simply free it, we must go through drbd_req_put_completion_ref() */
 1810     }
 1811 
 1812 out:
 1813     drbd_req_put_completion_ref(req, &m, 1);
 1814     spin_unlock_irq(&resource->req_lock);
 1815 
 1816     /* Even though above is a kref_put(), this is safe.
 1817      * As long as we still need to submit our private bio,
 1818      * we hold a completion ref, and the request cannot disappear.
 1819      * If however this request did not even have a private bio to submit
 1820      * (e.g. remote read), req may already be invalid now.
 1821      * That's why we cannot check on req->private_bio. */
 1822     if (submit_private_bio)
 1823         drbd_submit_req_private_bio(req);
 1824 
 1825     /* we need to plug ALWAYS since we possibly need to kick lo_dev.
 1826      * we plug after submit, so we won't miss an unplug event */
 1827     drbd_plug_device(bdev_get_queue(device->this_bdev));
 1828 
 1829     if (m.bio)
 1830         complete_master_bio(device, &m);
 1831 }
 1832 
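      /* Condition for the wait in inc_ap_bio(): defer while a new current UUID
       * still has to be written, and only account another application bio if
       * may_inc_ap_bio() holds and we stay below nr_requests. */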
 1833 static bool inc_ap_bio_cond(struct drbd_device *device, int rw)
 1834 {
 1835     bool rv = false;
 1836     unsigned int nr_requests;
 1837 
 1838     if (test_bit(NEW_CUR_UUID, &device->flags)) {
 1839         if (!test_and_set_bit(WRITING_NEW_CUR_UUID, &device->flags))
 1840             drbd_device_post_work(device, MAKE_NEW_CUR_UUID);
 1841 
 1842         return false;
 1843     }
 1844 
 1845     spin_lock_irq(&device->resource->req_lock);
 1846     nr_requests = device->resource->res_opts.nr_requests;
 1847     rv = may_inc_ap_bio(device) && atomic_read(&device->ap_bio_cnt[rw]) < nr_requests;
 1848     if (rv)
 1849         atomic_inc(&device->ap_bio_cnt[rw]);
 1850     spin_unlock_irq(&device->resource->req_lock);
 1851 
 1852     return rv;
 1853 }
 1854 
 1855 static void inc_ap_bio(struct drbd_device *device, int rw)
 1856 {
 1857     /* we wait here
 1858      *    as long as the device is suspended,
 1859      *    until the bitmap is no longer on the fly during connection handshake,
 1860      *    as long as we would exceed the max_buffer limit.
 1861      *
 1862      * to avoid races with the reconnect code,
 1863      * we need to atomic_inc within the spinlock. */
 1864 
 1865     wait_event(device->misc_wait, inc_ap_bio_cond(device, rw));
 1866 }
 1867 
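      /* Entry point used by drbd_make_request(): throttle via inc_ap_bio(),
       * prepare the request, and submit it directly unless it was queued for
       * the submitter thread or allocation failed. */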
 1868 void __drbd_make_request(struct drbd_device *device, struct bio *bio,
 1869         ktime_t start_kt,
 1870         unsigned long start_jif)
 1871 {
 1872     struct drbd_request *req;
 1873 
 1874     inc_ap_bio(device, bio_data_dir(bio));
 1875     req = drbd_request_prepare(device, bio, start_kt, start_jif);
 1876     if (IS_ERR_OR_NULL(req))
 1877         return;
 1878     drbd_send_and_submit(device, req);
 1879 }
 1880 
 1881 /* helpers for do_submit */
 1882 
 1883 struct incoming_pending_later {
 1884     /* from drbd_make_request() or receive_Data() */
 1885     struct list_head incoming;
 1886     /* for non-blocking fill-up # of updates in the transaction */
 1887     struct list_head more_incoming;
 1888     /* to be submitted after next AL-transaction commit */
 1889     struct list_head pending;
 1890     /* currently blocked e.g. by concurrent resync requests */
 1891     struct list_head later;
 1892     /* need cleanup */
 1893     struct list_head cleanup;
 1894 };
 1895 
 1896 struct waiting_for_act_log {
 1897     struct incoming_pending_later requests;
 1898     struct incoming_pending_later peer_requests;
 1899 };
 1900 
 1901 static void ipb_init(struct incoming_pending_later *ipb)
 1902 {
 1903     INIT_LIST_HEAD(&ipb->incoming);
 1904     INIT_LIST_HEAD(&ipb->more_incoming);
 1905     INIT_LIST_HEAD(&ipb->pending);
 1906     INIT_LIST_HEAD(&ipb->later);
 1907     INIT_LIST_HEAD(&ipb->cleanup);
 1908 }
 1909 
 1910 static void wfa_init(struct waiting_for_act_log *wfa)
 1911 {
 1912     ipb_init(&wfa->requests);
 1913     ipb_init(&wfa->peer_requests);
 1914 }
 1915 
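      /* The wfa helpers below apply the same list operation to both the request
       * and the peer_request variant of the named list. */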
 1916 #define wfa_lists_empty(_wfa, name) \
 1917     (list_empty(&(_wfa)->requests.name) && list_empty(&(_wfa)->peer_requests.name))
 1918 #define wfa_splice_init(_wfa, from, to) do { \
 1919     list_splice_init(&(_wfa)->requests.from, &(_wfa)->requests.to); \
 1920     list_splice_init(&(_wfa)->peer_requests.from, &(_wfa)->peer_requests.to); \
 1921     } while (0)
 1922 #define wfa_splice_tail_init(_wfa, from, to) do { \
 1923     list_splice_tail_init(&(_wfa)->requests.from, &(_wfa)->requests.to); \
 1924     list_splice_tail_init(&(_wfa)->peer_requests.from, &(_wfa)->peer_requests.to); \
 1925     } while (0)
 1926 
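      /* Submit a peer write that was waiting for the activity log: update the
       * accounting, then pass it to drbd_submit_peer_request(), cleaning up if
       * that fails. */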
 1927 static void __drbd_submit_peer_request(struct drbd_peer_request *peer_req)
 1928 {
 1929     struct drbd_peer_device *peer_device = peer_req->peer_device;
 1930     struct drbd_device *device = peer_device->device;
 1931     int err;
 1932 
 1933     peer_req->flags |= EE_IN_ACTLOG;
 1934     atomic_sub(interval_to_al_extents(&peer_req->i), &device->wait_for_actlog_ecnt);
 1935     atomic_dec(&device->wait_for_actlog);
 1936     list_del_init(&peer_req->wait_for_actlog);
 1937 
 1938     err = drbd_submit_peer_request(peer_req);
 1939 
 1940     if (err)
 1941         drbd_cleanup_after_failed_submit_peer_request(peer_req);
 1942 }
 1943 
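      /* Try the non-blocking activity log fast path for everything on the
       * incoming lists; whatever cannot be activated without a new AL
       * transaction stays on those lists for the slow path. */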
 1944 static void submit_fast_path(struct drbd_device *device, struct waiting_for_act_log *wfa)
 1945 {
 1946     struct blk_plug plug;
 1947     struct drbd_request *req, *tmp;
 1948     struct drbd_peer_request *pr, *pr_tmp;
 1949 
 1950     blk_start_plug(&plug);
 1951     list_for_each_entry_safe(pr, pr_tmp, &wfa->peer_requests.incoming, wait_for_actlog) {
 1952         if (!drbd_al_begin_io_fastpath(pr->peer_device->device, &pr->i))
 1953             continue;
 1954 
 1955         __drbd_submit_peer_request(pr);
 1956     }
 1957     list_for_each_entry_safe(req, tmp, &wfa->requests.incoming, tl_requests) {
 1958         const int rw = bio_data_dir(req->master_bio);
 1959 
 1960         if (rw == WRITE && req->private_bio && req->i.size
 1961         && !test_bit(AL_SUSPENDED, &device->flags)) {
 1962             if (!drbd_al_begin_io_fastpath(device, &req->i))
 1963                 continue;
 1964 
 1965             drbd_req_in_actlog(req);
 1966             atomic_dec(&device->ap_actlog_cnt);
 1967         }
 1968 
 1969         list_del_init(&req->tl_requests);
 1970         drbd_send_and_submit(device, req);
 1971     }
 1972     blk_finish_plug(&plug);
 1973 }
 1974 
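      /* Prefer entries from more_incoming (picked up while filling the current
       * transaction) over the regular incoming list. */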
 1975 static struct drbd_request *wfa_next_request(struct waiting_for_act_log *wfa)
 1976 {
 1977     struct list_head *lh = !list_empty(&wfa->requests.more_incoming) ?
 1978             &wfa->requests.more_incoming: &wfa->requests.incoming;
 1979     return list_first_entry_or_null(lh, struct drbd_request, tl_requests);
 1980 }
 1981 
 1982 static struct drbd_peer_request *wfa_next_peer_request(struct waiting_for_act_log *wfa)
 1983 {
 1984     struct list_head *lh = !list_empty(&wfa->peer_requests.more_incoming) ?
 1985             &wfa->peer_requests.more_incoming: &wfa->peer_requests.incoming;
 1986     return list_first_entry_or_null(lh, struct drbd_peer_request, wait_for_actlog);
 1987 }
 1988 
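      /* Without blocking on the activity log, sort the incoming lists into
       * "pending" (AL slot reserved, submit after the next commit), "later"
       * (currently blocked, e.g. by resync) and, for peer requests of no longer
       * connected peers, "cleanup".  Returns true if anything moved to pending
       * or cleanup. */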
 1989 static bool prepare_al_transaction_nonblock(struct drbd_device *device,
 1990                         struct waiting_for_act_log *wfa)
 1991 {
 1992     struct drbd_peer_request *peer_req;
 1993     struct drbd_request *req;
 1994     bool made_progress = false;
 1995     bool wake = false;
 1996     int err;
 1997 
 1998     spin_lock_irq(&device->al_lock);
 1999 
 2000     /* Don't even try, if someone has it locked right now. */
 2001     if (test_bit(__LC_LOCKED, &device->act_log->flags))
 2002         goto out;
 2003 
 2004     while ((peer_req = wfa_next_peer_request(wfa))) {
 2005         if (peer_req->peer_device->connection->cstate[NOW] < C_CONNECTED) {
 2006             list_move_tail(&peer_req->wait_for_actlog, &wfa->peer_requests.cleanup);
 2007             made_progress = true;
 2008             continue;
 2009         }
 2010         err = drbd_al_begin_io_nonblock(device, &peer_req->i);
 2011         if (err == -ENOBUFS)
 2012             break;
 2013         if (err == -EBUSY)
 2014             wake = true;
 2015         if (err)
 2016             list_move_tail(&peer_req->wait_for_actlog, &wfa->peer_requests.later);
 2017         else {
 2018             list_move_tail(&peer_req->wait_for_actlog, &wfa->peer_requests.pending);
 2019             made_progress = true;
 2020         }
 2021     }
 2022     while ((req = wfa_next_request(wfa))) {
 2023         ktime_aggregate_delta(device, req->start_kt, before_al_begin_io_kt);
 2024         err = drbd_al_begin_io_nonblock(device, &req->i);
 2025         if (err == -ENOBUFS)
 2026             break;
 2027         if (err == -EBUSY)
 2028             wake = true;
 2029         if (err)
 2030             list_move_tail(&req->tl_requests, &wfa->requests.later);
 2031         else {
 2032             list_move_tail(&req->tl_requests, &wfa->requests.pending);
 2033             made_progress = true;
 2034         }
 2035     }
 2036  out:
 2037     spin_unlock_irq(&device->al_lock);
 2038     if (wake)
 2039         wake_up(&device->al_wait);
 2040     return made_progress;
 2041 }
 2042 
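      /* Submit everything that made it onto the "pending" lists, after the
       * activity log transaction has been committed. */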
 2043 static void send_and_submit_pending(struct drbd_device *device, struct waiting_for_act_log *wfa)
 2044 {
 2045     struct blk_plug plug;
 2046     struct drbd_request *req, *tmp;
 2047     struct drbd_peer_request *pr, *pr_tmp;
 2048 
 2049     blk_start_plug(&plug);
 2050     list_for_each_entry_safe(pr, pr_tmp, &wfa->peer_requests.pending, wait_for_actlog) {
 2051         __drbd_submit_peer_request(pr);
 2052     }
 2053     list_for_each_entry_safe(req, tmp, &wfa->requests.pending, tl_requests) {
 2054         drbd_req_in_actlog(req);
 2055         atomic_dec(&device->ap_actlog_cnt);
 2056         list_del_init(&req->tl_requests);
 2057         drbd_send_and_submit(device, req);
 2058     }
 2059     blk_finish_plug(&plug);
 2060 }
 2061 
 2062 /* more: for non-blocking fill-up # of updates in the transaction */
 2063 static bool grab_new_incoming_requests(struct drbd_device *device, struct waiting_for_act_log *wfa, bool more)
 2064 {
 2065     /* grab new incoming requests */
 2066     struct list_head *reqs = more ? &wfa->requests.more_incoming : &wfa->requests.incoming;
 2067     struct list_head *peer_reqs = more ? &wfa->peer_requests.more_incoming : &wfa->peer_requests.incoming;
 2068     bool found_new = false;
 2069 
 2070     spin_lock_irq(&device->resource->req_lock);
 2071     found_new = !list_empty(&device->submit.writes);
 2072     list_splice_tail_init(&device->submit.writes, reqs);
 2073     found_new |= !list_empty(&device->submit.peer_writes);
 2074     list_splice_tail_init(&device->submit.peer_writes, peer_reqs);
 2075     spin_unlock_irq(&device->resource->req_lock);
 2076 
 2077     return found_new;
 2078 }
 2079 
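      /* Submitter work function: batch requests that need activity log updates
       * into as few AL transactions as possible, then send and submit them. */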
 2080 void do_submit(struct work_struct *ws)
 2081 {
 2082     struct drbd_device *device = container_of(ws, struct drbd_device, submit.worker);
 2083     struct waiting_for_act_log wfa;
 2084     bool made_progress;
 2085 
 2086     wfa_init(&wfa);
 2087 
 2088     grab_new_incoming_requests(device, &wfa, false);
 2089 
 2090     for (;;) {
 2091         DEFINE_WAIT(wait);
 2092 
 2093         /* move used-to-be-postponed back to front of incoming */
 2094         wfa_splice_init(&wfa, later, incoming);
 2095         submit_fast_path(device, &wfa);
 2096         if (wfa_lists_empty(&wfa, incoming))
 2097             break;
 2098 
 2099         for (;;) {
 2100             /*
 2101              * We put ourselves on device->al_wait, then check
 2102              * whether we actually need to sleep and wait for someone
 2103              * else to make progress.
 2104              *
 2105              * We need to sleep if we cannot activate enough
 2106              * activity log extents for even one single request.
 2107              * That would mean that all (peer-)requests in our incoming lists
 2108              * either target a "cold" activity log extent while all
 2109              * activity log extent slots have on-going
 2110              * in-flight IO (are "hot") and no idle or free slot
 2111              * is available, or the target regions are busy doing resync,
 2112              * which locks out application requests for that reason.
 2113              *
 2114              * prepare_to_wait() can internally cause a wake_up()
 2115              * as well, though, so this may appear to busy-loop
 2116              * a couple times, but should settle down quickly.
 2117              *
 2118              * When resync and/or application requests make
 2119              * sufficient progress, some refcount on some extent
 2120              * will eventually drop to zero, we will be woken up,
 2121              * and can try to move that now idle extent to "cold",
 2122              * and recycle its slot for one of the extents we'd
 2123              * like to become hot.
 2124              */
 2125             prepare_to_wait(&device->al_wait, &wait, TASK_UNINTERRUPTIBLE);
 2126 
 2127             wfa_splice_init(&wfa, later, incoming);
 2128             made_progress = prepare_al_transaction_nonblock(device, &wfa);
 2129             if (made_progress)
 2130                 break;
 2131 
 2132             schedule();
 2133 
 2134             /* If all currently "hot" activity log extents are kept busy by
 2135              * incoming requests, we still must not totally starve new
 2136              * requests to "cold" extents.
 2137              * Something left on &incoming means there had not been
 2138              * enough update slots available, and the activity log
 2139              * has been marked as "starving".
 2140              *
 2141              * Try again now, without looking for new requests,
 2142              * effectively blocking all new requests until we made
 2143              * at least _some_ progress with what we currently have.
 2144              */
 2145             if (!wfa_lists_empty(&wfa, incoming))
 2146                 continue;
 2147 
 2148             /* Nothing moved to pending, but nothing left
 2149              * on incoming: all moved to "later"!
 2150              * Grab new and iterate. */
 2151             grab_new_incoming_requests(device, &wfa, false);
 2152         }
 2153         finish_wait(&device->al_wait, &wait);
 2154 
 2155         /* If the transaction was full before all incoming requests
 2156          * had been processed, skip ahead to commit, and iterate
 2157          * without splicing in more incoming requests from upper layers.
 2158          *
 2159          * Else, if all incoming have been processed,
 2160          * they have become either "pending" (to be submitted after
 2161          * the next transaction commit) or "later" (blocked by resync).
 2162          *
 2163          * Maybe more was queued, while we prepared the transaction?
 2164          * Try to stuff those into this transaction as well.
 2165          * Be strictly non-blocking here,
 2166          * we already have something to commit.
 2167          *
 2168          * Commit as soon as we don't make any more progress.
 2169          */
 2170 
 2171         while (wfa_lists_empty(&wfa, incoming)) {
 2172             /* It is ok to look outside the lock,
 2173              * it's only an optimization anyway */
 2174             if (list_empty(&device->submit.writes) &&
 2175                 list_empty(&device->submit.peer_writes))
 2176                 break;
 2177 
 2178             if (!grab_new_incoming_requests(device, &wfa, true))
 2179                 break;
 2180 
 2181             made_progress = prepare_al_transaction_nonblock(device, &wfa);
 2182 
 2183             wfa_splice_tail_init(&wfa, more_incoming, incoming);
 2184             if (!made_progress)
 2185                 break;
 2186         }
 2187         if (!list_empty(&wfa.peer_requests.cleanup))
 2188             drbd_cleanup_peer_requests_wfa(device, &wfa.peer_requests.cleanup);
 2189 
 2190         drbd_al_begin_io_commit(device);
 2191 
 2192         send_and_submit_pending(device, &wfa);
 2193     }
 2194 }
 2195 
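      /* make_request function of the DRBD queue: split the bio if necessary,
       * fail it right away while in the cached-error state, and otherwise hand
       * it to __drbd_make_request() together with timestamps for accounting. */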
 2196 blk_qc_t drbd_make_request(struct request_queue *q, struct bio *bio)
 2197 {
 2198     struct drbd_device *device = (struct drbd_device *) q->queuedata;
 2199 #ifdef CONFIG_DRBD_TIMING_STATS
 2200     ktime_t start_kt;
 2201 #endif
 2202     unsigned long start_jif;
 2203 
 2204     blk_queue_split(q, &bio);
 2205 
 2206     if (device->cached_err_io) {
 2207         bio->bi_status = BLK_STS_IOERR;
 2208         bio_endio(bio);
 2209         return BLK_QC_T_NONE;
 2210     }
 2211 
 2212     ktime_get_accounting(start_kt);
 2213     start_jif = jiffies;
 2214 
 2215     __drbd_make_request(device, bio, start_kt, start_jif);
 2216 
 2217     return BLK_QC_T_NONE;
 2218 }
 2219 
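      /* Return whichever of t1 and t2 comes first, but never a time in the
       * past: values before "now" are clamped to "now". */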
 2220 static unsigned long time_min_in_future(unsigned long now,
 2221         unsigned long t1, unsigned long t2)
 2222 {
 2223     t1 = time_after(now, t1) ? now : t1;
 2224     t2 = time_after(now, t2) ? now : t2;
 2225     return time_after(t1, t2) ? t2 : t1;
 2226 }
 2227 
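      /* Decide whether the oldest request towards this peer has exceeded the
       * effective network timeout (ko-count * timeout), while avoiding false
       * positives right after a reconnect or while the barrier for its epoch
       * has not even been sent yet. */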
 2228 static bool net_timeout_reached(struct drbd_request *net_req,
 2229         struct drbd_connection *connection,
 2230         unsigned long now, unsigned long ent,
 2231         unsigned int ko_count, unsigned int timeout)
 2232 {
 2233     struct drbd_device *device = net_req->device;
 2234     struct drbd_peer_device *peer_device = conn_peer_device(connection, device->vnr);
 2235     int peer_node_id = peer_device->node_id;
 2236     unsigned long pre_send_jif = net_req->pre_send_jif[peer_node_id];
 2237 
 2238     if (!time_after(now, pre_send_jif + ent))
 2239         return false;
 2240 
 2241     if (time_in_range(now, connection->last_reconnect_jif, connection->last_reconnect_jif + ent))
 2242         return false;
 2243 
 2244     if (net_req->net_rq_state[peer_node_id] & RQ_NET_PENDING) {
 2245         drbd_warn(device, "Remote failed to finish a request within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
 2246             jiffies_to_msecs(now - pre_send_jif), ko_count, timeout);
 2247         return true;
 2248     }
 2249 
 2250     /* We received an ACK already (or are using protocol A),
 2251      * but are waiting for the epoch closing barrier ack.
 2252      * Check if we sent the barrier already.  We should not blame the peer
 2253      * for being unresponsive, if we did not even ask it yet. */
 2254     if (net_req->epoch == connection->send.current_epoch_nr) {
 2255         drbd_warn(device,
 2256             "We did not send a P_BARRIER for %ums > ko-count (%u) * timeout (%u * 0.1s); drbd kernel thread blocked?\n",
 2257             jiffies_to_msecs(now - pre_send_jif), ko_count, timeout);
 2258         return false;
 2259     }
 2260 
 2261     /* Worst case: we may have been blocked for whatever reason, then
 2262      * suddenly are able to send a lot of requests (and epoch separating
 2263      * barriers) in quick succession.
 2264      * The timestamp of the net_req may be much too old and not correspond
 2265      * to the sending time of the relevant unack'ed barrier packet, so it
 2266      * would trigger a spurious timeout.  The latest barrier packet may
 2267      * have too recent a timestamp to trigger the timeout, so we could
 2268      * potentially miss a timeout.  Right now we don't have a place to
 2269      * conveniently store these timestamps.
 2270      * But in this particular situation, the application requests are still
 2271      * completed to upper layers, DRBD should still "feel" responsive.
 2272      * No need yet to kill this connection, it may still recover.
 2273      * If not, eventually we will have queued enough into the network for
 2274      * us to block. From that point of view, the timestamp of the last sent
 2275      * barrier packet is relevant enough.
 2276      */
 2277     if (time_after(now, connection->send.last_sent_barrier_jif + ent)) {
 2278         drbd_warn(device, "Remote failed to answer a P_BARRIER (sent at %lu jif; now=%lu jif) within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
 2279             connection->send.last_sent_barrier_jif, now,
 2280             jiffies_to_msecs(now - connection->send.last_sent_barrier_jif), ko_count, timeout);
 2281         return true;
 2282     }
 2283     return false;
 2284 }
 2285 
 2286 /* A request is considered timed out, if
 2287  * - we have some effective timeout from the configuration,
 2288  *   with some state restrictions applied,
 2289  * - the oldest request is waiting for a response from the network
 2290  *   (or from the local disk, respectively),
 2291  * - the oldest request is in fact older than the effective timeout,
 2292  * - the connection was established (or the disk was attached, respectively)
 2293  *   for longer than the timeout already.
 2294  * Note that for 32bit jiffies and very stable connections/disks,
 2295  * we may have a wrap around, which is caught by
 2296  *   !time_in_range(now, last_..._jif, last_..._jif + timeout).
 2297  *
 2298  * Side effect: once per 32bit wrap-around interval, which means every
 2299  * ~198 days with 250 HZ, we have a window where the timeout would need
 2300  * to expire twice (worst case) to become effective. Good enough.
 2301  */
 2302 
 2303 void request_timer_fn(struct timer_list *t)
 2304 {
 2305     struct drbd_device *device = from_timer(device, t, request_timer);
 2306     struct drbd_connection *connection;
 2307     struct drbd_request *req_read, *req_write;
 2308     unsigned long oldest_submit_jif;
 2309     unsigned long dt = 0;
 2310     unsigned long et = 0;
 2311     unsigned long now = jiffies;
 2312     unsigned long next_trigger_time = now;
 2313     bool restart_timer = false;
 2314 
 2315     rcu_read_lock();
 2316     if (get_ldev(device)) { /* implicit state.disk >= D_INCONSISTENT */
 2317         dt = rcu_dereference(device->ldev->disk_conf)->disk_timeout * HZ / 10;
 2318         put_ldev(device);
 2319     }
 2320     rcu_read_unlock();
 2321 
 2322     spin_lock_irq(&device->resource->req_lock);
 2323     if (dt) {
 2324         unsigned long write_pre_submit_jif = now, read_pre_submit_jif = now;
 2325         req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local);
 2326         req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local);
 2327 
 2328         if (req_write)
 2329             write_pre_submit_jif = req_write->pre_submit_jif;
 2330         if (req_read)
 2331             read_pre_submit_jif = req_read->pre_submit_jif;
 2332         oldest_submit_jif =
 2333             (req_write && req_read)
 2334             ? ( time_before(write_pre_submit_jif, read_pre_submit_jif)
 2335               ? write_pre_submit_jif : read_pre_submit_jif )
 2336             : req_write ? write_pre_submit_jif
 2337             : req_read ? read_pre_submit_jif : now;
 2338 
 2339         if (device->disk_state[NOW] > D_FAILED) {
 2340             et = min_not_zero(et, dt);
 2341             next_trigger_time = time_min_in_future(now,
 2342                     next_trigger_time, oldest_submit_jif + dt);
 2343             restart_timer = true;
 2344         }
 2345 
 2346         if (time_after(now, oldest_submit_jif + dt) &&
 2347             !time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + dt)) {
 2348             drbd_warn(device, "Local backing device failed to meet the disk-timeout\n");
 2349             __drbd_chk_io_error(device, DRBD_FORCE_DETACH);
 2350         }
 2351     }
 2352     for_each_connection(connection, device->resource) {
 2353         struct net_conf *nc;
 2354         struct drbd_request *req;
 2355         unsigned long ent = 0;
 2356         unsigned long pre_send_jif = 0;
 2357         unsigned int ko_count = 0, timeout = 0;
 2358 
 2359         /* maybe the oldest request waiting for the peer is in fact still
 2360          * blocking in tcp sendmsg.  That's ok, though, that's handled via the
 2361          * socket send timeout, requesting a ping, and bumping ko-count in
 2362          * we_should_drop_the_connection().
 2363          */
 2364 
 2365         /* check the oldest request we successfully sent,
 2366          * but which is still waiting for an ACK. */
 2367         req = connection->req_ack_pending;
 2368 
 2369         /* if we don't have such a request (e.g. protocol A),
 2370          * check the oldest request which is still waiting on its epoch
 2371          * closing barrier ack. */
 2372         if (!req)
 2373             req = connection->req_not_net_done;
 2374 
 2375         /* evaluate the oldest peer request only in one timer! */
 2376         if (req && req->device != device)
 2377             req = NULL;
 2378         if (!req)
 2379             continue;
 2380 
 2381         rcu_read_lock();
 2382         nc = rcu_dereference(connection->transport.net_conf);
 2383         if (nc) {
 2384             /* effective timeout = ko_count * timeout */
 2385             if (connection->cstate[NOW] == C_CONNECTED) {
 2386                 ko_count = nc->ko_count;
 2387                 timeout = nc->timeout;
 2388             }
 2389         }
 2390         rcu_read_unlock();
 2391 
 2392         if (!timeout)
 2393             continue;
 2394 
 2395         pre_send_jif = req->pre_send_jif[connection->peer_node_id];
 2396 
 2397         ent = timeout * HZ/10 * ko_count;
 2398         et = min_not_zero(et, ent);
 2399         next_trigger_time = time_min_in_future(now,
 2400                 next_trigger_time, pre_send_jif + ent);
 2401         restart_timer = true;
 2402 
 2403         if (net_timeout_reached(req, connection, now, ent, ko_count, timeout)) {
 2404             begin_state_change_locked(device->resource, CS_VERBOSE | CS_HARD);
 2405             __change_cstate(connection, C_TIMEOUT);
 2406             end_state_change_locked(device->resource);
 2407         }
 2408     }
 2409     spin_unlock_irq(&device->resource->req_lock);
 2410 
 2411     if (restart_timer) {
 2412         next_trigger_time = time_min_in_future(now, next_trigger_time, now + et);
 2413         mod_timer(&device->request_timer, next_trigger_time);
 2414     }
 2415 }