"Fossies" - the Fresh Open Source Software Archive

Member "drbd-9.0.21-1/drbd/drbd_receiver.c" (12 Nov 2019, 287373 Bytes) of package /linux/misc/drbd-9.0.21-1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "drbd_receiver.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.20-1_vs_9.0.21-1.

    1 // SPDX-License-Identifier: GPL-2.0-or-later
    2 /*
    3    drbd_receiver.c
    4 
    5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
    6 
    7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
    8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
    9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   10 
   11  */
   12 
   13 
   14 #include <linux/module.h>
   15 
   16 #include <linux/uaccess.h>
   17 #include <net/sock.h>
   18 
   19 #include <linux/drbd.h>
   20 #include <linux/fs.h>
   21 #include <linux/file.h>
   22 #include <linux/in.h>
   23 #include <linux/mm.h>
   24 #include <linux/memcontrol.h>
   25 #include <linux/mm_inline.h>
   26 #include <linux/slab.h>
   27 #include <linux/pkt_sched.h>
   28 #include <uapi/linux/sched/types.h>
   29 #define __KERNEL_SYSCALLS__
   30 #include <linux/unistd.h>
   31 #include <linux/vmalloc.h>
   32 #include <linux/random.h>
   33 #include <net/ipv6.h>
   34 #include "drbd_int.h"
   35 #include "drbd_protocol.h"
   36 #include "drbd_req.h"
   37 #include "drbd_vli.h"
   38 #include <linux/scatterlist.h>
   39 
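       /* Protocol feature flags this node offers to the peer during the
        * feature handshake (see drbd_do_features()). */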
   40 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
   41 
   42 struct flush_work {
   43     struct drbd_work w;
   44     struct drbd_epoch *epoch;
   45 };
   46 
   47 enum epoch_event {
   48     EV_PUT,
   49     EV_GOT_BARRIER_NR,
   50     EV_BARRIER_DONE,
   51     EV_BECAME_LAST,
   52     EV_CLEANUP = 32, /* used as flag */
   53 };
   54 
   55 enum finish_epoch {
   56     FE_STILL_LIVE,
   57     FE_DESTROYED,
   58     FE_RECYCLED,
   59 };
   60 
   61 enum resync_reason {
   62     AFTER_UNSTABLE,
   63     DISKLESS_PRIMARY,
   64 };
   65 
   66 enum sync_strategy {
   67     UNDETERMINED,
   68     NO_SYNC,
   69     SYNC_SOURCE_IF_BOTH_FAILED,
   70     SYNC_SOURCE_USE_BITMAP,
   71     SYNC_SOURCE_SET_BITMAP,
   72     SYNC_SOURCE_COPY_BITMAP,
   73     SYNC_TARGET_IF_BOTH_FAILED,
   74     SYNC_TARGET_USE_BITMAP,
   75     SYNC_TARGET_SET_BITMAP,
   76     SYNC_TARGET_CLEAR_BITMAP,
   77     SPLIT_BRAIN_AUTO_RECOVER,
   78     SPLIT_BRAIN_DISCONNECT,
   79     UNRELATED_DATA,
   80     RETRY_CONNECT,
   81     REQUIRES_PROTO_91,
   82     REQUIRES_PROTO_96,
   83 };
   84 
   85 struct sync_descriptor {
   86     char * const name;
   87     int required_protocol;
   88     bool is_split_brain;
   89     bool is_sync_source;
   90     bool is_sync_target;
   91     int resync_peer_preference;
   92     enum sync_strategy full_sync_equivalent;
   93 };
   94 
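       /* Static description of each sync_strategy above: human readable name,
        * minimum required protocol version, and how the strategy behaves
        * (sync source/target, split brain, peer preference). */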
   95 static const struct sync_descriptor sync_descriptors[] = {
   96     [UNDETERMINED] = {
   97         .name = "?",
   98     },
   99     [NO_SYNC] = {
  100         .name = "no-sync",
  101     },
  102     [SYNC_SOURCE_IF_BOTH_FAILED] = {
  103         .name = "source-if-both-failed",
  104         .is_sync_source = true,
  105     },
  106     [SYNC_SOURCE_USE_BITMAP] = {
  107         .name = "source-use-bitmap",
  108         .is_sync_source = true,
  109         .full_sync_equivalent = SYNC_TARGET_USE_BITMAP,
  110     },
  111     [SYNC_SOURCE_SET_BITMAP] = {
  112         .name = "source-set-bitmap",
  113         .is_sync_source = true,
  114     },
  115     [SYNC_SOURCE_COPY_BITMAP] = {
  116         .name = "source-copy-other-bitmap",
  117         .is_sync_source = true,
  118     },
  119     [SYNC_TARGET_IF_BOTH_FAILED] = {
  120         .name = "target-if-both-failed",
  121         .is_sync_target = true,
  122         .resync_peer_preference = 4,
  123     },
  124     [SYNC_TARGET_USE_BITMAP] = {
  125         .name = "target-use-bitmap",
  126         .is_sync_target = true,
  127         .full_sync_equivalent = SYNC_SOURCE_USE_BITMAP,
  128         .resync_peer_preference = 3,
  129     },
  130     [SYNC_TARGET_SET_BITMAP] = {
  131         .name = "target-set-bitmap",
  132         .is_sync_target = true,
  133         .resync_peer_preference = 2,
  134     },
  135     [SYNC_TARGET_CLEAR_BITMAP] = {
  136         .name = "target-clear-bitmap",
  137         .is_sync_target = true,
  138         .resync_peer_preference = 1,
  139     },
  140     [SPLIT_BRAIN_AUTO_RECOVER] = {
  141         .name = "split-brain-auto-recover",
  142         .is_split_brain = true,
  143     },
  144     [SPLIT_BRAIN_DISCONNECT] = {
  145         .name = "split-brain-disconnect",
  146         .is_split_brain = true,
  147     },
  148     [UNRELATED_DATA] = {
  149         .name = "unrelated-data",
  150     },
  151     [RETRY_CONNECT] = {
  152         .name = "retry-connect",
  153     },
  154     [REQUIRES_PROTO_91] = {
  155         .name = "requires-proto-91",
  156         .required_protocol = 91,
  157     },
  158     [REQUIRES_PROTO_96] = {
  159         .name = "requires-proto-96",
  160         .required_protocol = 96,
  161     },
  162 };
  163 
  164 int drbd_do_features(struct drbd_connection *connection);
  165 int drbd_do_auth(struct drbd_connection *connection);
  166 void conn_disconnect(struct drbd_connection *connection);
  167 
  168 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
  169 static int e_end_block(struct drbd_work *, int);
  170 static void cleanup_unacked_peer_requests(struct drbd_connection *connection);
  171 static void cleanup_peer_ack_list(struct drbd_connection *connection);
  172 static u64 node_ids_to_bitmap(struct drbd_device *device, u64 node_ids);
  173 static int process_twopc(struct drbd_connection *, struct twopc_reply *, struct packet_info *, unsigned long);
  174 static void drbd_resync(struct drbd_peer_device *, enum resync_reason) __must_hold(local);
  175 static void drbd_unplug_all_devices(struct drbd_connection *connection);
  176 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
  177 static void check_resync_source(struct drbd_device *device, u64 weak_nodes);
  178 static void destroy_peer_ack_req(struct kref *kref);
  179 
  180 static const struct sync_descriptor strategy_descriptor(enum sync_strategy strategy)
  181 {
   182     if (strategy < 0 || strategy >= sizeof(sync_descriptors) / sizeof(struct sync_descriptor))
  183         BUG();
  184     return sync_descriptors[strategy];
  185 }
  186 
  187 static bool is_strategy_determined(enum sync_strategy strategy)
  188 {
  189     return strategy == NO_SYNC ||
  190             strategy_descriptor(strategy).is_sync_source ||
  191             strategy_descriptor(strategy).is_sync_target;
  192 }
  193 
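       /* Return the epoch preceding @epoch on the connection's epoch list, or
        * NULL if it has no predecessor (it is alone on the list, or its
        * predecessor is the connection's current epoch). */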
  194 static struct drbd_epoch *previous_epoch(struct drbd_connection *connection, struct drbd_epoch *epoch)
  195 {
  196     struct drbd_epoch *prev;
  197     spin_lock(&connection->epoch_lock);
  198     prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
  199     if (prev == epoch || prev == connection->current_epoch)
  200         prev = NULL;
  201     spin_unlock(&connection->epoch_lock);
  202     return prev;
  203 }
  204 
  205 /*
   206  * some helper functions to deal with singly linked page lists,
  207  * page->private being our "next" pointer.
  208  */
  209 
  210 /* If at least n pages are linked at head, get n pages off.
  211  * Otherwise, don't modify head, and return NULL.
  212  * Locking is the responsibility of the caller.
  213  */
  214 static struct page *page_chain_del(struct page **head, int n)
  215 {
  216     struct page *page;
  217     struct page *tmp;
  218 
  219     BUG_ON(!n);
  220     BUG_ON(!head);
  221 
  222     page = *head;
  223 
  224     if (!page)
  225         return NULL;
  226 
  227     while (page) {
  228         tmp = page_chain_next(page);
  229         set_page_chain_offset(page, 0);
  230         set_page_chain_size(page, 0);
  231         if (--n == 0)
  232             break; /* found sufficient pages */
  233         if (tmp == NULL)
  234             /* insufficient pages, don't use any of them. */
  235             return NULL;
  236         page = tmp;
  237     }
  238 
  239     /* add end of list marker for the returned list */
  240     set_page_chain_next(page, NULL);
  241     /* actual return value, and adjustment of head */
  242     page = *head;
  243     *head = tmp;
  244     return page;
  245 }
  246 
  247 /* may be used outside of locks to find the tail of a (usually short)
  248  * "private" page chain, before adding it back to a global chain head
  249  * with page_chain_add() under a spinlock. */
  250 static struct page *page_chain_tail(struct page *page, int *len)
  251 {
  252     struct page *tmp;
  253     int i = 1;
  254     while ((tmp = page_chain_next(page)))
  255         ++i, page = tmp;
  256     if (len)
  257         *len = i;
  258     return page;
  259 }
  260 
  261 static int page_chain_free(struct page *page)
  262 {
  263     struct page *tmp;
  264     int i = 0;
  265     page_chain_for_each_safe(page, tmp) {
  266         set_page_chain_next_offset_size(page, NULL, 0, 0);
  267         put_page(page);
  268         ++i;
  269     }
  270     return i;
  271 }
  272 
  273 static void page_chain_add(struct page **head,
  274         struct page *chain_first, struct page *chain_last)
  275 {
  276 #if 1
  277     struct page *tmp;
  278     tmp = page_chain_tail(chain_first, NULL);
  279     BUG_ON(tmp != chain_last);
  280 #endif
  281 
  282     /* add chain to head */
  283     set_page_chain_next(chain_last, *head);
  284     *head = chain_first;
  285 }
  286 
  287 static struct page *__drbd_alloc_pages(unsigned int number, gfp_t gfp_mask)
  288 {
  289     struct page *page = NULL;
  290     struct page *tmp = NULL;
  291     unsigned int i = 0;
  292 
  293     /* Yes, testing drbd_pp_vacant outside the lock is racy.
  294      * So what. It saves a spin_lock. */
  295     if (drbd_pp_vacant >= number) {
  296         spin_lock(&drbd_pp_lock);
  297         page = page_chain_del(&drbd_pp_pool, number);
  298         if (page)
  299             drbd_pp_vacant -= number;
  300         spin_unlock(&drbd_pp_lock);
  301         if (page)
  302             return page;
  303     }
  304 
  305     for (i = 0; i < number; i++) {
  306         tmp = alloc_page(gfp_mask);
  307         if (!tmp)
  308             break;
  309         set_page_chain_next_offset_size(tmp, page, 0, 0);
  310         page = tmp;
  311     }
  312 
  313     if (i == number)
  314         return page;
  315 
  316     /* Not enough pages immediately available this time.
  317      * No need to jump around here, drbd_alloc_pages will retry this
  318      * function "soon". */
  319     if (page) {
  320         tmp = page_chain_tail(page, NULL);
  321         spin_lock(&drbd_pp_lock);
  322         page_chain_add(&drbd_pp_pool, page, tmp);
  323         drbd_pp_vacant += i;
  324         spin_unlock(&drbd_pp_lock);
  325     }
  326     return NULL;
  327 }
  328 
  329 static void rs_sectors_came_in(struct drbd_peer_device *peer_device, int size)
  330 {
  331     int rs_sect_in = atomic_add_return(size >> 9, &peer_device->rs_sect_in);
  332 
  333     /* In case resync runs faster than anticipated, run the resync_work early */
  334     if (rs_sect_in >= peer_device->rs_in_flight)
  335         drbd_queue_work_if_unqueued(
  336             &peer_device->connection->sender_work,
  337             &peer_device->resync_work);
  338 }
  339 
  340 static void reclaim_finished_net_peer_reqs(struct drbd_connection *connection,
  341                        struct list_head *to_be_freed)
  342 {
  343     struct drbd_peer_request *peer_req, *tmp;
  344 
  345     /* The EEs are always appended to the end of the list. Since
  346        they are sent in order over the wire, they have to finish
   347        in order. As soon as we see the first unfinished one, we can
   348        stop examining the list... */
  349 
  350     list_for_each_entry_safe(peer_req, tmp, &connection->net_ee, w.list) {
  351         if (drbd_peer_req_has_active_page(peer_req))
  352             break;
  353         list_move(&peer_req->w.list, to_be_freed);
  354     }
  355 }
  356 
  357 static void drbd_reclaim_net_peer_reqs(struct drbd_connection *connection)
  358 {
  359     LIST_HEAD(reclaimed);
  360     struct drbd_peer_request *peer_req, *t;
  361     struct drbd_resource *resource = connection->resource;
  362 
  363     spin_lock_irq(&resource->req_lock);
  364     reclaim_finished_net_peer_reqs(connection, &reclaimed);
  365     spin_unlock_irq(&resource->req_lock);
  366 
  367     list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
  368         drbd_free_net_peer_req(peer_req);
  369 }
  370 
  371 /**
  372  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
   373  * @transport: DRBD transport.
  374  * @number: number of pages requested
  375  * @gfp_mask:   how to allocate and whether to loop until we succeed
  376  *
   377  * Tries to allocate @number pages, first from our own page pool, then from
  378  * the kernel.
  379  * Possibly retry until DRBD frees sufficient pages somewhere else.
  380  *
  381  * If this allocation would exceed the max_buffers setting, we throttle
  382  * allocation (schedule_timeout) to give the system some room to breathe.
  383  *
   384  * We do not use max-buffers as a hard limit, because it could lead to
  385  * congestion and further to a distributed deadlock during online-verify or
  386  * (checksum based) resync, if the max-buffers, socket buffer sizes and
   387  * resync-rate settings are misconfigured.
  388  *
  389  * Returns a page chain linked via (struct drbd_page_chain*)&page->lru.
  390  */
  391 struct page *drbd_alloc_pages(struct drbd_transport *transport, unsigned int number,
  392                   gfp_t gfp_mask)
  393 {
  394     struct drbd_connection *connection =
  395         container_of(transport, struct drbd_connection, transport);
  396     struct page *page = NULL;
  397     DEFINE_WAIT(wait);
  398     unsigned int mxb;
  399 
  400     rcu_read_lock();
  401     mxb = rcu_dereference(transport->net_conf)->max_buffers;
  402     rcu_read_unlock();
  403 
  404     if (atomic_read(&connection->pp_in_use) < mxb)
  405         page = __drbd_alloc_pages(number, gfp_mask & ~__GFP_RECLAIM);
  406 
  407     /* Try to keep the fast path fast, but occasionally we need
  408      * to reclaim the pages we lent to the network stack. */
  409     if (page && atomic_read(&connection->pp_in_use_by_net) > 512)
  410         drbd_reclaim_net_peer_reqs(connection);
  411 
  412     while (page == NULL) {
  413         prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
  414 
  415         drbd_reclaim_net_peer_reqs(connection);
  416 
  417         if (atomic_read(&connection->pp_in_use) < mxb) {
  418             page = __drbd_alloc_pages(number, gfp_mask);
  419             if (page)
  420                 break;
  421         }
  422 
  423         if (!(gfp_mask & __GFP_RECLAIM))
  424             break;
  425 
  426         if (signal_pending(current)) {
  427             drbd_warn(connection, "drbd_alloc_pages interrupted!\n");
  428             break;
  429         }
  430 
  431         if (schedule_timeout(HZ/10) == 0)
  432             mxb = UINT_MAX;
  433     }
  434     finish_wait(&drbd_pp_wait, &wait);
  435 
  436     if (page)
  437         atomic_add(number, &connection->pp_in_use);
  438     return page;
  439 }
  440 
  441 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
   442  * It is also used from inside another spin_lock_irq(&resource->req_lock) section.
  443  * Either links the page chain back to the global pool,
  444  * or returns all pages to the system. */
  445 void drbd_free_pages(struct drbd_transport *transport, struct page *page, int is_net)
  446 {
  447     struct drbd_connection *connection =
  448         container_of(transport, struct drbd_connection, transport);
  449     atomic_t *a = is_net ? &connection->pp_in_use_by_net : &connection->pp_in_use;
  450     int i;
  451 
  452     if (page == NULL)
  453         return;
  454 
  455     if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
  456         i = page_chain_free(page);
  457     else {
  458         struct page *tmp;
  459         tmp = page_chain_tail(page, &i);
  460         spin_lock(&drbd_pp_lock);
  461         page_chain_add(&drbd_pp_pool, page, tmp);
  462         drbd_pp_vacant += i;
  463         spin_unlock(&drbd_pp_lock);
  464     }
  465     i = atomic_sub_return(i, a);
  466     if (i < 0)
  467         drbd_warn(connection, "ASSERTION FAILED: %s: %d < 0\n",
  468             is_net ? "pp_in_use_by_net" : "pp_in_use", i);
  469     wake_up(&drbd_pp_wait);
  470 }
  471 
  472 /*
  473 You need to hold the req_lock:
  474  _drbd_wait_ee_list_empty()
  475 
  476 You must not have the req_lock:
  477  drbd_free_peer_req()
  478  drbd_alloc_peer_req()
  479  drbd_free_peer_reqs()
  480  drbd_ee_fix_bhs()
  481  drbd_finish_peer_reqs()
  482  drbd_clear_done_ee()
  483  drbd_wait_ee_list_empty()
  484 */
  485 
  486 /* normal: payload_size == request size (bi_size)
  487  * w_same: payload_size == logical_block_size
  488  * trim: payload_size == 0 */
  489 struct drbd_peer_request *
  490 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, gfp_t gfp_mask) __must_hold(local)
  491 {
  492     struct drbd_device *device = peer_device->device;
  493     struct drbd_peer_request *peer_req;
  494 
  495     if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
  496         return NULL;
  497 
  498     peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
  499     if (!peer_req) {
  500         if (!(gfp_mask & __GFP_NOWARN))
  501             drbd_err(device, "%s: allocation failed\n", __func__);
  502         return NULL;
  503     }
  504 
  505     memset(peer_req, 0, sizeof(*peer_req));
  506     INIT_LIST_HEAD(&peer_req->w.list);
  507     drbd_clear_interval(&peer_req->i);
  508     INIT_LIST_HEAD(&peer_req->recv_order);
  509     INIT_LIST_HEAD(&peer_req->wait_for_actlog);
  510     peer_req->submit_jif = jiffies;
  511     peer_req->peer_device = peer_device;
  512 
  513     return peer_req;
  514 }
  515 
  516 void __drbd_free_peer_req(struct drbd_peer_request *peer_req, int is_net)
  517 {
  518     struct drbd_peer_device *peer_device = peer_req->peer_device;
  519 
  520     might_sleep();
  521     if (peer_req->flags & EE_HAS_DIGEST)
  522         kfree(peer_req->digest);
  523     D_ASSERT(peer_device, atomic_read(&peer_req->pending_bios) == 0);
  524     D_ASSERT(peer_device, drbd_interval_empty(&peer_req->i));
  525     drbd_free_page_chain(&peer_device->connection->transport, &peer_req->page_chain, is_net);
  526     mempool_free(peer_req, &drbd_ee_mempool);
  527 }
  528 
  529 int drbd_free_peer_reqs(struct drbd_resource *resource, struct list_head *list, bool is_net_ee)
  530 {
  531     LIST_HEAD(work_list);
  532     struct drbd_peer_request *peer_req, *t;
  533     int count = 0;
  534 
  535     spin_lock_irq(&resource->req_lock);
  536     list_splice_init(list, &work_list);
  537     spin_unlock_irq(&resource->req_lock);
  538 
  539     list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
  540         __drbd_free_peer_req(peer_req, is_net_ee);
  541         count++;
  542     }
  543     return count;
  544 }
  545 
  546 /*
  547  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
  548  */
  549 static int drbd_finish_peer_reqs(struct drbd_connection *connection)
  550 {
  551     LIST_HEAD(work_list);
  552     LIST_HEAD(reclaimed);
  553     struct drbd_peer_request *peer_req, *t;
  554     int err = 0;
  555     int n = 0;
  556 
  557     spin_lock_irq(&connection->resource->req_lock);
  558     reclaim_finished_net_peer_reqs(connection, &reclaimed);
  559     list_splice_init(&connection->done_ee, &work_list);
  560     spin_unlock_irq(&connection->resource->req_lock);
  561 
  562     list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
  563         drbd_free_net_peer_req(peer_req);
  564 
  565     /* possible callbacks here:
  566      * e_end_block, and e_end_resync_block, e_send_discard_write.
  567      * all ignore the last argument.
  568      */
  569     list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
  570         int err2;
  571 
  572         ++n;
  573         /* list_del not necessary, next/prev members not touched */
  574         err2 = peer_req->w.cb(&peer_req->w, !!err);
  575         if (!err)
  576             err = err2;
  577         if (!list_empty(&peer_req->recv_order)) {
  578             drbd_free_page_chain(&connection->transport, &peer_req->page_chain, 0);
  579         } else
  580             drbd_free_peer_req(peer_req);
  581     }
  582     if (atomic_sub_and_test(n, &connection->done_ee_cnt))
  583         wake_up(&connection->ee_wait);
  584 
  585     return err;
  586 }
  587 
  588 static int drbd_recv(struct drbd_connection *connection, void **buf, size_t size, int flags)
  589 {
  590     struct drbd_transport_ops *tr_ops = connection->transport.ops;
  591     int rv;
  592 
  593     rv = tr_ops->recv(&connection->transport, DATA_STREAM, buf, size, flags);
  594 
  595     if (rv < 0) {
  596         if (rv == -ECONNRESET)
  597             drbd_info(connection, "sock was reset by peer\n");
  598         else if (rv != -ERESTARTSYS)
  599             drbd_info(connection, "sock_recvmsg returned %d\n", rv);
  600     } else if (rv == 0) {
  601         if (test_bit(DISCONNECT_EXPECTED, &connection->flags)) {
  602             long t;
  603             rcu_read_lock();
  604             t = rcu_dereference(connection->transport.net_conf)->ping_timeo * HZ/10;
  605             rcu_read_unlock();
  606 
  607             t = wait_event_timeout(connection->resource->state_wait,
  608                            connection->cstate[NOW] < C_CONNECTED, t);
  609 
  610             if (t)
  611                 goto out;
  612         }
  613         drbd_info(connection, "sock was shut down by peer\n");
  614     }
  615 
  616     if (rv != size)
  617         change_cstate(connection, C_BROKEN_PIPE, CS_HARD);
  618 
  619 out:
  620     return rv;
  621 }
  622 
  623 static int drbd_recv_into(struct drbd_connection *connection, void *buf, size_t size)
  624 {
  625     int err;
  626 
  627     err = drbd_recv(connection, &buf, size, CALLER_BUFFER);
  628 
  629     if (err != size) {
  630         if (err >= 0)
  631             err = -EIO;
  632     } else
  633         err = 0;
  634     return err;
  635 }
  636 
  637 static int drbd_recv_all(struct drbd_connection *connection, void **buf, size_t size)
  638 {
  639     int err;
  640 
  641     err = drbd_recv(connection, buf, size, 0);
  642 
  643     if (err != size) {
  644         if (err >= 0)
  645             err = -EIO;
  646     } else
  647         err = 0;
  648     return err;
  649 }
  650 
  651 static int drbd_recv_all_warn(struct drbd_connection *connection, void **buf, size_t size)
  652 {
  653     int err;
  654 
  655     err = drbd_recv_all(connection, buf, size);
  656     if (err && !signal_pending(current))
  657         drbd_warn(connection, "short read (expected size %d)\n", (int)size);
  658     return err;
  659 }
  660 
   661 /* Called when a connection is established, or when a new minor is created
   662    within an existing connection */
  663 int drbd_connected(struct drbd_peer_device *peer_device)
  664 {
  665     struct drbd_device *device = peer_device->device;
  666     u64 weak_nodes = 0;
  667     int err;
  668 
  669     atomic_set(&peer_device->packet_seq, 0);
  670     peer_device->peer_seq = 0;
  671 
  672     if (device->resource->role[NOW] == R_PRIMARY)
  673         weak_nodes = drbd_weak_nodes_device(device);
  674 
  675     err = drbd_send_sync_param(peer_device);
  676     if (!err)
  677         err = drbd_send_sizes(peer_device, 0, 0);
  678     if (!err)
  679         err = drbd_send_uuids(peer_device, 0, weak_nodes);
  680     if (!err) {
  681         set_bit(INITIAL_STATE_SENT, &peer_device->flags);
  682         err = drbd_send_current_state(peer_device);
  683     }
  684 
  685     clear_bit(USE_DEGR_WFC_T, &peer_device->flags);
  686     clear_bit(RESIZE_PENDING, &peer_device->flags);
  687     mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
  688     return err;
  689 }
  690 
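       /* Connect timer expired: we are in timer context here, so only queue
        * connect_timer_work for the sender thread; connect_work() does the
        * actual state handling. */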
  691 void connect_timer_fn(struct timer_list *t)
  692 {
  693     struct drbd_connection *connection = from_timer(connection, t, connect_timer);
  694     struct drbd_resource *resource = connection->resource;
  695     unsigned long irq_flags;
  696 
  697     spin_lock_irqsave(&resource->req_lock, irq_flags);
  698     drbd_queue_work(&connection->sender_work, &connection->connect_timer_work);
  699     spin_unlock_irqrestore(&resource->req_lock, irq_flags);
  700 }
  701 
  702 static void conn_connect2(struct drbd_connection *connection)
  703 {
  704     struct drbd_peer_device *peer_device;
  705     int vnr;
  706 
  707     atomic_set(&connection->ap_in_flight, 0);
  708     atomic_set(&connection->rs_in_flight, 0);
  709 
  710     rcu_read_lock();
  711     idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
  712         struct drbd_device *device = peer_device->device;
  713         kref_get(&device->kref);
  714         /* connection cannot go away: caller holds a reference. */
  715         rcu_read_unlock();
  716         drbd_connected(peer_device);
  717         rcu_read_lock();
  718         kref_put(&device->kref, drbd_destroy_device);
  719     }
  720     rcu_read_unlock();
  721 }
  722 
  723 static int connect_work(struct drbd_work *work, int cancel)
  724 {
  725     struct drbd_connection *connection =
  726         container_of(work, struct drbd_connection, connect_timer_work);
  727     struct drbd_resource *resource = connection->resource;
  728     enum drbd_state_rv rv;
  729     long t = resource->res_opts.auto_promote_timeout * HZ / 10;
  730 
  731     if (connection->cstate[NOW] != C_CONNECTING)
  732         goto out_put;
  733 
  734     do {
  735         rv = change_cstate(connection, C_CONNECTED, CS_SERIALIZE | CS_VERBOSE | CS_DONT_RETRY);
  736         if (rv != SS_PRIMARY_READER)
  737             break;
  738 
   739         /* We have a connection established and the peer is primary. On my side
   740            there is a read-only opener, probably udev or some other scanner run
   741            after device creation. This short-lived read-only open currently
   742            prevents us from continuing. Better to retry after the read-only opener goes away. */
  743 
  744         t = wait_event_interruptible_timeout(resource->state_wait,
  745                              !drbd_open_ro_count(resource),
  746                              t);
  747     } while (t > 0);
  748 
  749     if (rv >= SS_SUCCESS) {
  750         conn_connect2(connection);
  751     } else if (rv == SS_TIMEOUT || rv == SS_CONCURRENT_ST_CHG) {
  752         if (connection->cstate[NOW] != C_CONNECTING)
  753             goto out_put;
  754         connection->connect_timer.expires = jiffies + HZ/20;
  755         add_timer(&connection->connect_timer);
  756         return 0; /* Return early. Keep the reference on the connection! */
  757     } else {
  758         drbd_info(connection, "Failure to connect; retrying\n");
  759         change_cstate(connection, C_NETWORK_FAILURE, CS_HARD);
  760     }
  761 
  762  out_put:
  763     kref_debug_put(&connection->kref_debug, 11);
  764     kref_put(&connection->kref, drbd_destroy_connection);
  765     return 0;
  766 }
  767 
  768 /*
  769  * Returns true if we have a valid connection.
  770  */
  771 static bool conn_connect(struct drbd_connection *connection)
  772 {
  773     struct drbd_transport *transport = &connection->transport;
  774     struct drbd_resource *resource = connection->resource;
  775     int ping_timeo, ping_int, h, err, vnr, timeout;
  776     struct drbd_peer_device *peer_device;
  777     struct net_conf *nc;
  778     bool discard_my_data;
  779     bool have_mutex;
  780     bool no_addr = false;
  781 
  782 start:
  783     have_mutex = false;
  784     clear_bit(DISCONNECT_EXPECTED, &connection->flags);
  785     if (change_cstate(connection, C_CONNECTING, CS_VERBOSE) < SS_SUCCESS) {
  786         /* We do not have a network config. */
  787         return false;
  788     }
  789 
  790     /* Assume that the peer only understands our minimum supported
  791      * protocol version; until we know better. */
  792     connection->agreed_pro_version = drbd_protocol_version_min;
  793 
  794     err = transport->ops->connect(transport);
  795     if (err == -EAGAIN) {
  796         if (connection->cstate[NOW] == C_DISCONNECTING)
  797             return false;
  798         goto retry;
  799     } else if (err == -EADDRNOTAVAIL) {
  800         struct net_conf *nc;
  801         int connect_int;
  802         long t;
  803 
  804         rcu_read_lock();
  805         nc = rcu_dereference(transport->net_conf);
  806         connect_int = nc ? nc->connect_int : 10;
  807         rcu_read_unlock();
  808 
  809         if (!no_addr) {
  810             drbd_warn(connection,
  811                   "Configured local address not found, retrying every %d sec, "
  812                   "err=%d\n", connect_int, err);
  813             no_addr = true;
  814         }
  815 
  816         t = schedule_timeout_interruptible(connect_int * HZ);
  817         if (t || connection->cstate[NOW] == C_DISCONNECTING)
  818             return false;
  819         goto start;
  820     } else if (err < 0) {
  821         drbd_warn(connection, "Failed to initiate connection, err=%d\n", err);
  822         goto abort;
  823     }
  824 
  825     connection->last_received = jiffies;
  826 
  827     rcu_read_lock();
  828     nc = rcu_dereference(connection->transport.net_conf);
  829     ping_timeo = nc->ping_timeo;
  830     ping_int = nc->ping_int;
  831     rcu_read_unlock();
  832 
  833     /* Make sure we are "uncorked", otherwise we risk timeouts,
  834      * in case this is a reconnect and we had been corked before. */
  835     drbd_uncork(connection, CONTROL_STREAM);
  836     drbd_uncork(connection, DATA_STREAM);
  837 
  838     /* Make sure the handshake happens without interference from other threads,
  839      * or the challenge response authentication could be garbled. */
  840     mutex_lock(&connection->mutex[DATA_STREAM]);
  841     have_mutex = true;
  842     transport->ops->set_rcvtimeo(transport, DATA_STREAM, ping_timeo * 4 * HZ/10);
  843     transport->ops->set_rcvtimeo(transport, CONTROL_STREAM, ping_int * HZ);
  844 
  845     h = drbd_do_features(connection);
  846     if (h < 0)
  847         goto abort;
  848     if (h == 0)
  849         goto retry;
  850 
  851     if (connection->cram_hmac_tfm) {
  852         switch (drbd_do_auth(connection)) {
  853         case -1:
  854             drbd_err(connection, "Authentication of peer failed\n");
  855             goto abort;
  856         case 0:
  857             drbd_err(connection, "Authentication of peer failed, trying again.\n");
  858             goto retry;
  859         }
  860     }
  861 
  862     transport->ops->set_rcvtimeo(transport, DATA_STREAM, MAX_SCHEDULE_TIMEOUT);
  863 
  864     discard_my_data = test_bit(CONN_DISCARD_MY_DATA, &connection->flags);
  865 
  866     if (__drbd_send_protocol(connection, P_PROTOCOL) == -EOPNOTSUPP)
  867         goto abort;
  868 
  869     rcu_read_lock();
  870     idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
  871         clear_bit(INITIAL_STATE_SENT, &peer_device->flags);
  872         clear_bit(INITIAL_STATE_RECEIVED, &peer_device->flags);
  873     }
  874     idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
  875         if (discard_my_data)
  876             set_bit(DISCARD_MY_DATA, &peer_device->flags);
  877         else
  878             clear_bit(DISCARD_MY_DATA, &peer_device->flags);
  879     }
  880     rcu_read_unlock();
  881     mutex_unlock(&connection->mutex[DATA_STREAM]);
  882     have_mutex = false;
  883 
  884     drbd_thread_start(&connection->ack_receiver);
  885     connection->ack_sender =
  886         alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
  887     if (!connection->ack_sender) {
  888         drbd_err(connection, "Failed to create workqueue ack_sender\n");
  889         schedule_timeout_uninterruptible(HZ);
  890         goto retry;
  891     }
  892 
  893     if (connection->agreed_pro_version >= 110) {
  894         if (resource->res_opts.node_id < connection->peer_node_id) {
  895             kref_get(&connection->kref);
  896             kref_debug_get(&connection->kref_debug, 11);
  897             connection->connect_timer_work.cb = connect_work;
  898             timeout = twopc_retry_timeout(resource, 0);
  899             drbd_debug(connection, "Waiting for %ums to avoid transaction "
  900                    "conflicts\n", jiffies_to_msecs(timeout));
  901             connection->connect_timer.expires = jiffies + timeout;
  902             add_timer(&connection->connect_timer);
  903         }
  904     } else {
  905         enum drbd_state_rv rv;
  906         rv = change_cstate(connection, C_CONNECTED,
  907                    CS_VERBOSE | CS_WAIT_COMPLETE | CS_SERIALIZE | CS_LOCAL_ONLY);
  908         if (rv < SS_SUCCESS || connection->cstate[NOW] != C_CONNECTED)
  909             goto retry;
  910         conn_connect2(connection);
  911     }
  912     return true;
  913 
  914 retry:
  915     if (have_mutex)
  916         mutex_unlock(&connection->mutex[DATA_STREAM]);
  917     conn_disconnect(connection);
  918     schedule_timeout_interruptible(HZ);
  919     goto start;
  920 
  921 abort:
  922     if (have_mutex)
  923         mutex_unlock(&connection->mutex[DATA_STREAM]);
  924     change_cstate(connection, C_DISCONNECTING, CS_HARD);
  925     return false;
  926 }
  927 
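       /* Decode a received header into @pi. The header format depends on the
        * agreed protocol version: p_header100 (magic DRBD_MAGIC_100, carries a
        * volume number), p_header95 (DRBD_MAGIC_BIG, 32-bit length) or
        * p_header80 (DRBD_MAGIC, 16-bit length). */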
  928 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
  929 {
  930     unsigned int header_size = drbd_header_size(connection);
  931 
  932     if (header_size == sizeof(struct p_header100) &&
  933         *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
  934         struct p_header100 *h = header;
  935         u16 vnr = be16_to_cpu(h->volume);
  936 
  937         if (h->pad != 0) {
  938             drbd_err(connection, "Header padding is not zero\n");
  939             return -EINVAL;
  940         }
  941         pi->vnr = vnr == ((u16) 0xFFFF) ? -1 : vnr;
  942 
  943         pi->cmd = be16_to_cpu(h->command);
  944         pi->size = be32_to_cpu(h->length);
  945     } else if (header_size == sizeof(struct p_header95) &&
  946            *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
  947         struct p_header95 *h = header;
  948         pi->cmd = be16_to_cpu(h->command);
  949         pi->size = be32_to_cpu(h->length);
  950         pi->vnr = 0;
  951     } else if (header_size == sizeof(struct p_header80) &&
  952            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
  953         struct p_header80 *h = header;
  954         pi->cmd = be16_to_cpu(h->command);
  955         pi->size = be16_to_cpu(h->length);
  956         pi->vnr = 0;
  957     } else {
  958         drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
  959              be32_to_cpu(*(__be32 *)header),
  960              connection->agreed_pro_version);
  961         return -EINVAL;
  962     }
  963     pi->data = header + header_size;
  964     return 0;
  965 }
  966 
  967 static void drbd_unplug_all_devices(struct drbd_connection *connection)
  968 {
  969     if (current->plug == &connection->receiver_plug) {
  970         blk_finish_plug(&connection->receiver_plug);
  971         blk_start_plug(&connection->receiver_plug);
  972     } /* else: maybe just schedule() ?? */
  973 }
  974 
  975 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
  976 {
  977     void *buffer;
  978     int err;
  979 
  980     err = drbd_recv_all_warn(connection, &buffer, drbd_header_size(connection));
  981     if (err)
  982         return err;
  983 
  984     err = decode_header(connection, buffer, pi);
  985     connection->last_received = jiffies;
  986 
  987     return err;
  988 }
  989 
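       /* Like drbd_recv_header(), but start with a non-blocking receive. If no
        * data is buffered yet, send a QUICKACK hint and unplug all devices
        * before falling back to a blocking receive. */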
  990 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
  991 {
  992     struct drbd_transport_ops *tr_ops = connection->transport.ops;
  993     unsigned int size = drbd_header_size(connection);
  994     void *buffer;
  995     int err;
  996 
  997     err = tr_ops->recv(&connection->transport, DATA_STREAM, &buffer,
  998                size, MSG_NOSIGNAL | MSG_DONTWAIT);
  999     if (err != size) {
 1000         int rflags = 0;
 1001 
 1002         /* If we have nothing in the receive buffer now, to reduce
 1003          * application latency, try to drain the backend queues as
 1004          * quickly as possible, and let remote TCP know what we have
 1005          * received so far. */
 1006         if (err == -EAGAIN) {
 1007             tr_ops->hint(&connection->transport, DATA_STREAM, QUICKACK);
 1008             drbd_unplug_all_devices(connection);
 1009         } else if (err > 0) {
 1010             size -= err;
 1011             rflags |= GROW_BUFFER;
 1012         }
 1013 
 1014         err = drbd_recv(connection, &buffer, size, rflags);
 1015         if (err != size) {
 1016             if (err >= 0)
 1017                 err = -EIO;
 1018         } else
 1019             err = 0;
 1020 
 1021         if (err)
 1022             return err;
 1023     }
 1024 
 1025     err = decode_header(connection, buffer, pi);
 1026     connection->last_received = jiffies;
 1027 
 1028     return err;
 1029 }
 1030 
 1031 /* This is blkdev_issue_flush, but asynchronous.
 1032  * We want to submit to all component volumes in parallel,
 1033  * then wait for all completions.
 1034  */
 1035 struct issue_flush_context {
 1036     atomic_t pending;
 1037     int error;
 1038     struct completion done;
 1039 };
 1040 struct one_flush_context {
 1041     struct drbd_device *device;
 1042     struct issue_flush_context *ctx;
 1043 };
 1044 
 1045 static void one_flush_endio(struct bio *bio)
 1046 {
 1047     struct one_flush_context *octx = bio->bi_private;
 1048     struct drbd_device *device = octx->device;
 1049     struct issue_flush_context *ctx = octx->ctx;
 1050 
 1051     blk_status_t status = bio->bi_status;
 1052 
 1053     if (status) {
 1054         ctx->error = blk_status_to_errno(status);
 1055         drbd_info(device, "local disk FLUSH FAILED with status %d\n", status);
 1056     }
 1057     kfree(octx);
 1058     bio_put(bio);
 1059 
 1060     clear_bit(FLUSH_PENDING, &device->flags);
 1061     put_ldev(device);
 1062     kref_debug_put(&device->kref_debug, 7);
 1063     kref_put(&device->kref, drbd_destroy_device);
 1064 
 1065     if (atomic_dec_and_test(&ctx->pending))
 1066         complete(&ctx->done);
 1067 }
 1068 
 1069 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
 1070 {
 1071     struct bio *bio = bio_alloc(GFP_NOIO, 0);
 1072     struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
 1073     if (!bio || !octx) {
 1074         drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
 1075         /* FIXME: what else can I do now?  disconnecting or detaching
 1076          * really does not help to improve the state of the world, either.
 1077          */
 1078         kfree(octx);
 1079         if (bio)
 1080             bio_put(bio);
 1081 
 1082         ctx->error = -ENOMEM;
 1083         put_ldev(device);
 1084         kref_debug_put(&device->kref_debug, 7);
 1085         kref_put(&device->kref, drbd_destroy_device);
 1086         return;
 1087     }
 1088 
 1089     octx->device = device;
 1090     octx->ctx = ctx;
 1091     bio_set_dev(bio, device->ldev->backing_bdev);
 1092     bio->bi_private = octx;
 1093     bio->bi_end_io = one_flush_endio;
 1094 
 1095     device->flush_jif = jiffies;
 1096     set_bit(FLUSH_PENDING, &device->flags);
 1097     atomic_inc(&ctx->pending);
 1098     bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
 1099     submit_bio(bio);
 1100 }
 1101 
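       /* If the current write ordering policy requires it, issue a flush to all
        * local backing devices in parallel and wait for completion; on any flush
        * error, fall back to WO_DRAIN_IO. Then possibly finish @epoch. */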
 1102 static enum finish_epoch drbd_flush_after_epoch(struct drbd_connection *connection, struct drbd_epoch *epoch)
 1103 {
 1104     struct drbd_resource *resource = connection->resource;
 1105 
 1106     if (resource->write_ordering >= WO_BDEV_FLUSH) {
 1107         struct drbd_device *device;
 1108         struct issue_flush_context ctx;
 1109         int vnr;
 1110 
 1111         atomic_set(&ctx.pending, 1);
 1112         ctx.error = 0;
 1113         init_completion(&ctx.done);
 1114 
 1115         rcu_read_lock();
 1116         idr_for_each_entry(&resource->devices, device, vnr) {
 1117             if (!get_ldev(device))
 1118                 continue;
 1119             kref_get(&device->kref);
 1120             kref_debug_get(&device->kref_debug, 7);
 1121             rcu_read_unlock();
 1122 
 1123             submit_one_flush(device, &ctx);
 1124 
 1125             rcu_read_lock();
 1126         }
 1127         rcu_read_unlock();
 1128 
 1129         /* Do we want to add a timeout,
 1130          * if disk-timeout is set? */
 1131         if (!atomic_dec_and_test(&ctx.pending))
 1132             wait_for_completion(&ctx.done);
 1133 
 1134         if (ctx.error) {
 1135             /* would rather check on EOPNOTSUPP, but that is not reliable.
 1136              * don't try again for ANY return value != 0
 1137              * if (rv == -EOPNOTSUPP) */
 1138             /* Any error is already reported by bio_endio callback. */
 1139             drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
 1140         }
 1141     }
 1142 
 1143     /* If called before sending P_CONFIRM_STABLE, we don't have the epoch
  1144      * (and must not finish it yet, anyway) */
 1145     if (epoch == NULL)
 1146         return FE_STILL_LIVE;
 1147     return drbd_may_finish_epoch(connection, epoch, EV_BARRIER_DONE);
 1148 }
 1149 
 1150 static int w_flush(struct drbd_work *w, int cancel)
 1151 {
 1152     struct flush_work *fw = container_of(w, struct flush_work, w);
 1153     struct drbd_epoch *epoch = fw->epoch;
 1154     struct drbd_connection *connection = epoch->connection;
 1155 
 1156     kfree(fw);
 1157 
 1158     if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
 1159         drbd_flush_after_epoch(connection, epoch);
 1160 
 1161     drbd_may_finish_epoch(connection, epoch, EV_PUT |
 1162                   (connection->cstate[NOW] < C_CONNECTED ? EV_CLEANUP : 0));
 1163 
 1164     return 0;
 1165 }
 1166 
 1167 static void drbd_send_b_ack(struct drbd_connection *connection, u32 barrier_nr, u32 set_size)
 1168 {
 1169     struct p_barrier_ack *p;
 1170 
 1171     if (connection->cstate[NOW] < C_CONNECTED)
 1172         return;
 1173 
 1174     p = conn_prepare_command(connection, sizeof(*p), CONTROL_STREAM);
 1175     if (!p)
 1176         return;
 1177     p->barrier = barrier_nr;
 1178     p->set_size = cpu_to_be32(set_size);
 1179     send_command(connection, -1, P_BARRIER_ACK, CONTROL_STREAM);
 1180 }
 1181 
 1182 static void drbd_send_confirm_stable(struct drbd_peer_request *peer_req)
 1183 {
 1184     struct drbd_connection *connection = peer_req->peer_device->connection;
 1185     struct drbd_resource *resource = connection->resource;
 1186     struct drbd_epoch *epoch = peer_req->epoch;
 1187     struct drbd_peer_request *oldest, *youngest;
 1188     struct p_confirm_stable *p;
 1189     int count;
 1190 
 1191     if (connection->cstate[NOW] < C_CONNECTED)
 1192         return;
 1193 
 1194     /* peer_req is not on stable storage yet, but the only one in this epoch.
 1195      * Nothing to confirm, just wait for the normal barrier_ack and peer_ack
 1196      * to do their work. */
 1197     oldest = epoch->oldest_unconfirmed_peer_req;
 1198     if (oldest == peer_req)
 1199         return;
 1200 
 1201     p = conn_prepare_command(connection, sizeof(*p), CONTROL_STREAM);
 1202     if (!p)
 1203         return;
 1204 
  1205     /* receive_Data() does a list_add_tail() for every request, which
 1206      * means the oldest is .next, the currently blocked one that triggered
 1207      * this code path is .prev, and the youngest that now should be on
 1208      * stable storage is .prev->prev */
 1209     spin_lock_irq(&resource->req_lock);
 1210     youngest = list_entry(peer_req->recv_order.prev, struct drbd_peer_request, recv_order);
 1211     spin_unlock_irq(&resource->req_lock);
 1212 
 1213     count = atomic_read(&epoch->epoch_size) - atomic_read(&epoch->confirmed) - 1;
 1214     atomic_add(count, &epoch->confirmed);
 1215     epoch->oldest_unconfirmed_peer_req = peer_req;
 1216 
 1217     D_ASSERT(connection, oldest->epoch == youngest->epoch);
 1218     D_ASSERT(connection, count > 0);
 1219 
 1220     p->oldest_block_id = oldest->block_id;
 1221     p->youngest_block_id = youngest->block_id;
 1222     p->set_size = cpu_to_be32(count);
 1223     p->pad = 0;
 1224 
 1225     send_command(connection, -1, P_CONFIRM_STABLE, CONTROL_STREAM);
 1226 }
 1227 
 1228 /**
  1229  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, and possibly finishes it.
 1230  * @connection: DRBD connection.
 1231  * @epoch:  Epoch object.
 1232  * @ev:     Epoch event.
 1233  */
 1234 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
 1235                            struct drbd_epoch *epoch,
 1236                            enum epoch_event ev)
 1237 {
 1238     int finish, epoch_size;
 1239     struct drbd_epoch *next_epoch;
 1240     int schedule_flush = 0;
 1241     enum finish_epoch rv = FE_STILL_LIVE;
 1242     struct drbd_resource *resource = connection->resource;
 1243 
 1244     spin_lock(&connection->epoch_lock);
 1245     do {
 1246         next_epoch = NULL;
 1247         finish = 0;
 1248 
 1249         epoch_size = atomic_read(&epoch->epoch_size);
 1250 
 1251         switch (ev & ~EV_CLEANUP) {
 1252         case EV_PUT:
 1253             atomic_dec(&epoch->active);
 1254             break;
 1255         case EV_GOT_BARRIER_NR:
 1256             set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
 1257 
 1258             /* Special case: If we just switched from WO_BIO_BARRIER to
 1259                WO_BDEV_FLUSH we should not finish the current epoch */
 1260             if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
 1261                 resource->write_ordering != WO_BIO_BARRIER &&
 1262                 epoch == connection->current_epoch)
 1263                 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
 1264             break;
 1265         case EV_BARRIER_DONE:
 1266             set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
 1267             break;
 1268         case EV_BECAME_LAST:
  1269             /* nothing to do */
 1270             break;
 1271         }
 1272 
 1273         if (epoch_size != 0 &&
 1274             atomic_read(&epoch->active) == 0 &&
 1275             (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP) &&
 1276             epoch->list.prev == &connection->current_epoch->list &&
 1277             !test_bit(DE_IS_FINISHING, &epoch->flags)) {
 1278             /* Nearly all conditions are met to finish that epoch... */
 1279             if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
 1280                 resource->write_ordering == WO_NONE ||
 1281                 (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
 1282                 ev & EV_CLEANUP) {
 1283                 finish = 1;
 1284                 set_bit(DE_IS_FINISHING, &epoch->flags);
 1285             } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
 1286                  resource->write_ordering == WO_BIO_BARRIER) {
 1287                 atomic_inc(&epoch->active);
 1288                 schedule_flush = 1;
 1289             }
 1290         }
 1291         if (finish) {
 1292             if (!(ev & EV_CLEANUP)) {
 1293                 /* adjust for nr requests already confirmed via P_CONFIRM_STABLE, if any. */
 1294                 epoch_size -= atomic_read(&epoch->confirmed);
 1295                 spin_unlock(&connection->epoch_lock);
 1296                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
 1297                 spin_lock(&connection->epoch_lock);
 1298             }
 1299 
 1300             if (connection->current_epoch != epoch) {
 1301                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
 1302                 list_del(&epoch->list);
 1303                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
 1304                 connection->epochs--;
 1305                 kfree(epoch);
 1306 
 1307                 if (rv == FE_STILL_LIVE)
 1308                     rv = FE_DESTROYED;
 1309             } else {
 1310                 epoch->oldest_unconfirmed_peer_req = NULL;
 1311                 epoch->flags = 0;
 1312                 atomic_set(&epoch->epoch_size, 0);
 1313                 atomic_set(&epoch->confirmed, 0);
  1314                 /* atomic_set(&epoch->active, 0); is already zero */
 1315                 if (rv == FE_STILL_LIVE)
 1316                     rv = FE_RECYCLED;
 1317             }
 1318         }
 1319 
 1320         if (!next_epoch)
 1321             break;
 1322 
 1323         epoch = next_epoch;
 1324     } while (1);
 1325 
 1326     spin_unlock(&connection->epoch_lock);
 1327 
 1328     if (schedule_flush) {
 1329         struct flush_work *fw;
 1330         fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
 1331         if (fw) {
 1332             fw->w.cb = w_flush;
 1333             fw->epoch = epoch;
 1334             drbd_queue_work(&resource->work, &fw->w);
 1335         } else {
 1336             drbd_warn(resource, "Could not kmalloc a flush_work obj\n");
 1337             set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
  1338             /* This is not unbounded recursion; it goes only one level deep */
 1339             drbd_may_finish_epoch(connection, epoch, EV_BARRIER_DONE);
 1340             drbd_may_finish_epoch(connection, epoch, EV_PUT);
 1341         }
 1342     }
 1343 
 1344     return rv;
 1345 }
 1346 
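       /* Clamp the requested write ordering to what this backing device's
        * disk_conf allows (disk_barrier / disk_flushes / disk_drain). The caller
        * must hold rcu_read_lock() for the disk_conf dereference. */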
 1347 static enum write_ordering_e
 1348 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
 1349 {
 1350     struct disk_conf *dc;
 1351 
 1352     dc = rcu_dereference(bdev->disk_conf);
 1353 
 1354     if (wo == WO_BIO_BARRIER && !dc->disk_barrier)
 1355         wo = WO_BDEV_FLUSH;
 1356     if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
 1357         wo = WO_DRAIN_IO;
 1358     if (wo == WO_DRAIN_IO && !dc->disk_drain)
 1359         wo = WO_NONE;
 1360 
 1361     return wo;
 1362 }
 1363 
 1364 /**
  1365  * drbd_bump_write_ordering() - Fall back to another write ordering method
 1366  * @resource:   DRBD resource.
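        * @bdev:       Backing device to consider in addition to the attached ones; may be NULL.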
 1367  * @wo:     Write ordering method to try.
 1368  */
 1369 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
 1370                   enum write_ordering_e wo) __must_hold(local)
 1371 {
 1372     struct drbd_device *device;
 1373     enum write_ordering_e pwo;
 1374     int vnr, i = 0;
 1375     static char *write_ordering_str[] = {
 1376         [WO_NONE] = "none",
 1377         [WO_DRAIN_IO] = "drain",
 1378         [WO_BDEV_FLUSH] = "flush",
 1379         [WO_BIO_BARRIER] = "barrier",
 1380     };
 1381 
 1382     pwo = resource->write_ordering;
 1383     if (wo != WO_BIO_BARRIER)
 1384         wo = min(pwo, wo);
 1385     rcu_read_lock();
 1386     idr_for_each_entry(&resource->devices, device, vnr) {
 1387         if (i++ == 1 && wo == WO_BIO_BARRIER)
 1388             wo = WO_BDEV_FLUSH; /* WO = barrier does not handle multiple volumes */
 1389 
 1390         if (get_ldev(device)) {
 1391             wo = max_allowed_wo(device->ldev, wo);
 1392             if (device->ldev == bdev)
 1393                 bdev = NULL;
 1394             put_ldev(device);
 1395         }
 1396     }
 1397 
 1398     if (bdev)
 1399         wo = max_allowed_wo(bdev, wo);
 1400 
 1401     rcu_read_unlock();
 1402 
 1403     resource->write_ordering = wo;
 1404     if (pwo != resource->write_ordering || wo == WO_BIO_BARRIER)
 1405         drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
 1406 }
 1407 
 1408 /*
 1409  * We *may* ignore the discard-zeroes-data setting, if so configured.
 1410  *
 1411  * Assumption is that this "discard_zeroes_data=0" is only because the backend
 1412  * may ignore partial unaligned discards.
 1413  *
 1414  * LVM/DM thin as of at least
 1415  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
 1416  *   Library version: 1.02.93-RHEL7 (2015-01-28)
 1417  *   Driver version:  4.29.0
 1418  * still behaves this way.
 1419  *
 1420  * For unaligned (wrt. alignment and granularity) or too small discards,
 1421  * we zero-out the initial (and/or) trailing unaligned partial chunks,
 1422  * but discard all the aligned full chunks.
 1423  *
 1424  * At least for LVM/DM thin, with skip_block_zeroing=false,
 1425  * the result is effectively "discard_zeroes_data=1".
 1426  */
 1427 /* flags: EE_TRIM|EE_ZEROOUT */
 1428 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
 1429 {
 1430     struct block_device *bdev = device->ldev->backing_bdev;
 1431     struct request_queue *q = bdev_get_queue(bdev);
 1432     sector_t tmp, nr;
 1433     unsigned int max_discard_sectors, granularity;
 1434     int alignment;
 1435     int err = 0;
 1436 
 1437     if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
 1438         goto zero_out;
 1439 
 1440     /* Zero-sector (unknown) and one-sector granularities are the same.  */
 1441     granularity = max(q->limits.discard_granularity >> 9, 1U);
 1442     alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
 1443 
 1444     max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
 1445     max_discard_sectors -= max_discard_sectors % granularity;
 1446     if (unlikely(!max_discard_sectors))
 1447         goto zero_out;
 1448 
 1449     if (nr_sectors < granularity)
 1450         goto zero_out;
 1451 
 1452     tmp = start;
 1453     if (sector_div(tmp, granularity) != alignment) {
 1454         if (nr_sectors < 2*granularity)
 1455             goto zero_out;
 1456         /* start + gran - (start + gran - align) % gran */
 1457         tmp = start + granularity - alignment;
 1458         tmp = start + granularity - sector_div(tmp, granularity);
 1459 
 1460         nr = tmp - start;
 1461         /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
 1462          * layers are below us, some may have smaller granularity */
 1463         err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
 1464         nr_sectors -= nr;
 1465         start = tmp;
 1466     }
 1467     while (nr_sectors >= max_discard_sectors) {
 1468         err |= blkdev_issue_discard(bdev, start, max_discard_sectors, GFP_NOIO, 0);
 1469         nr_sectors -= max_discard_sectors;
 1470         start += max_discard_sectors;
 1471     }
 1472     if (nr_sectors) {
 1473         /* max_discard_sectors is unsigned int (and a multiple of
 1474          * granularity, we made sure of that above already);
 1475          * nr is < max_discard_sectors;
 1476          * I don't need sector_div here, even though nr is sector_t */
 1477         nr = nr_sectors;
 1478         nr -= (unsigned int)nr % granularity;
 1479         if (nr) {
 1480             err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
 1481             nr_sectors -= nr;
 1482             start += nr;
 1483         }
 1484     }
 1485  zero_out:
 1486     if (nr_sectors) {
 1487         err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
 1488                 (flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
 1489     }
 1490     return err != 0;
 1491 }
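
/* Worked example for the zero-out/discard split above (a sketch with
 * assumed values, not tied to any particular backend):
 *
 *   granularity = 2048 sectors (1 MiB), alignment = 0,
 *   start = 1000, nr_sectors = 10000
 *
 *   - the head is unaligned; the next aligned boundary is sector 2048,
 *     so sectors 1000..2047 (1048 sectors) are zeroed out,
 *   - sectors 2048..10239 (8192 sectors, four full 1 MiB chunks) are
 *     discarded,
 *   - the trailing 760 sectors (10240..10999) do not fill a chunk and
 *     are zeroed out via the zero_out path.
 *
 *   1048 + 8192 + 760 == 10000, so the whole range is covered. */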
 1492 
 1493 static bool can_do_reliable_discards(struct drbd_device *device)
 1494 {
 1495     struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
 1496     struct disk_conf *dc;
 1497     bool can_do;
 1498 
 1499     if (!blk_queue_discard(q))
 1500         return false;
 1501 
 1502     if (queue_discard_zeroes_data(q))
 1503         return true;
 1504 
 1505     rcu_read_lock();
 1506     dc = rcu_dereference(device->ldev->disk_conf);
 1507     can_do = dc->discard_zeroes_if_aligned;
 1508     rcu_read_unlock();
 1509     return can_do;
 1510 }
 1511 
 1512 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
 1513 {
 1514     /* If the backend cannot discard, or does not guarantee
 1515      * read-back zeroes in discarded ranges, we fall back to
 1516      * zero-out, unless the configuration specifically requested
 1517      * otherwise. */
 1518     if (!can_do_reliable_discards(device))
 1519         peer_req->flags |= EE_ZEROOUT;
 1520 
 1521     if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
 1522         peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
 1523         peer_req->flags |= EE_WAS_ERROR;
 1524     drbd_endio_write_sec_final(peer_req);
 1525 }
 1526 
 1527 static void drbd_issue_peer_wsame(struct drbd_device *device,
 1528                   struct drbd_peer_request *peer_req)
 1529 {
 1530     struct block_device *bdev = device->ldev->backing_bdev;
 1531     sector_t s = peer_req->i.sector;
 1532     sector_t nr = peer_req->i.size >> 9;
 1533     if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->page_chain.head))
 1534         peer_req->flags |= EE_WAS_ERROR;
 1535     drbd_endio_write_sec_final(peer_req);
 1536 }
 1537 
 1538 static bool conn_wait_ee_cond(struct drbd_connection *connection, struct list_head *head)
 1539 {
 1540     struct drbd_resource *resource = connection->resource;
 1541     bool done;
 1542 
 1543     spin_lock_irq(&resource->req_lock);
 1544     done = list_empty(head);
 1545     spin_unlock_irq(&resource->req_lock);
 1546 
 1547     if (!done)
 1548         drbd_unplug_all_devices(connection);
 1549 
 1550     return done;
 1551 }
 1552 
 1553 static void conn_wait_ee_empty(struct drbd_connection *connection, struct list_head *head)
 1554 {
 1555     wait_event(connection->ee_wait, conn_wait_ee_cond(connection, head));
 1556 }
 1557 
 1558 static int peer_request_fault_type(struct drbd_peer_request *peer_req)
 1559 {
 1560     if (peer_req_op(peer_req) == REQ_OP_READ) {
 1561         return peer_req->flags & EE_APPLICATION ?
 1562             DRBD_FAULT_DT_RD : DRBD_FAULT_RS_RD;
 1563     } else {
 1564         return peer_req->flags & EE_APPLICATION ?
 1565             DRBD_FAULT_DT_WR : DRBD_FAULT_RS_WR;
 1566     }
 1567 }
 1568 
 1569 /**
 1570  * drbd_submit_peer_request() - submit the bio(s) for a peer request
 1571  * @peer_req:   peer request
 1572  *
 1573  * May spread the pages to multiple bios,
 1574  * depending on bio_add_page restrictions.
 1575  *
 1576  * Returns 0 if all bios have been submitted,
 1577  * -ENOMEM if we could not allocate enough bios,
 1578  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 1579  *  single page to an empty bio (which should never happen and likely indicates
 1580  *  that the lower level IO stack is in some way broken). This has been observed
 1581  *  on certain Xen deployments.
 1582  *
 1583  *  When this function returns 0, it "consumes" an ldev reference; the
 1584  *  reference is released when the request completes.
 1585  */
 1586 /* TODO allocate from our own bio_set. */
 1587 int drbd_submit_peer_request(struct drbd_peer_request *peer_req)
 1588 {
 1589     struct drbd_device *device = peer_req->peer_device->device;
 1590     struct bio *bios = NULL;
 1591     struct bio *bio;
 1592     struct page *page = peer_req->page_chain.head;
 1593     sector_t sector = peer_req->i.sector;
 1594     unsigned data_size = peer_req->i.size;
 1595     unsigned n_bios = 0;
 1596     unsigned nr_pages = peer_req->page_chain.nr_pages;
 1597     int err = -ENOMEM;
 1598 
 1599     if (peer_req->flags & EE_SET_OUT_OF_SYNC)
 1600         drbd_set_out_of_sync(peer_req->peer_device,
 1601                 peer_req->i.sector, peer_req->i.size);
 1602 
 1603     /* TRIM/DISCARD: for now, always use the helper function
 1604      * blkdev_issue_zeroout(..., discard=true).
 1605      * It's synchronous, but it does the right thing wrt. bio splitting.
 1606      * Correctness first, performance later.  Next step is to code an
 1607      * asynchronous variant of the same.
 1608      */
 1609     if (peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) {
 1610         peer_req->submit_jif = jiffies;
 1611         peer_req->flags |= EE_SUBMITTED;
 1612 
 1613         if (peer_req->flags & (EE_TRIM|EE_ZEROOUT))
 1614             drbd_issue_peer_discard_or_zero_out(device, peer_req);
 1615         else /* EE_WRITE_SAME */
 1616             drbd_issue_peer_wsame(device, peer_req);
 1617         return 0;
 1618     }
 1619 
 1620     /* In most cases, we will only need one bio.  But in case the lower
 1621      * level restrictions happen to be different at this offset on this
 1622      * side than those of the sending peer, we may need to submit the
 1623      * request in more than one bio.
 1624      *
 1625      * Plain bio_alloc is good enough here, this is no DRBD internally
 1626      * generated bio, but a bio allocated on behalf of the peer.
 1627      */
 1628 next_bio:
 1629     /* REQ_OP_WRITE_SAME, _DISCARD, _WRITE_ZEROES handled above.
 1630      * REQ_OP_FLUSH (empty flush) not expected,
 1631      * should have been mapped to a "drbd protocol barrier".
 1632      * REQ_OP_SECURE_ERASE: I don't see how we could ever support that.
 1633      */
 1634     if (!(peer_req_op(peer_req) == REQ_OP_WRITE ||
 1635                 peer_req_op(peer_req) == REQ_OP_READ)) {
 1636         drbd_err(device, "Invalid bio op received: 0x%x\n", peer_req->opf);
 1637         err = -EINVAL;
 1638         goto fail;
 1639     }
 1640 
 1641     bio = bio_alloc(GFP_NOIO, nr_pages);
 1642     if (!bio) {
 1643         drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
 1644         goto fail;
 1645     }
 1646     /* > peer_req->i.sector, unless this is the first bio */
 1647     bio->bi_iter.bi_sector = sector;
 1648     bio_set_dev(bio, device->ldev->backing_bdev);
 1649     /* we special case some flags in the multi-bio case, see below
 1650      * (REQ_PREFLUSH, or BIO_RW_BARRIER in older kernels) */
 1651     bio->bi_opf = peer_req->opf;
 1652     bio->bi_private = peer_req;
 1653     bio->bi_end_io = drbd_peer_request_endio;
 1654 
 1655     bio->bi_next = bios;
 1656     bios = bio;
 1657     ++n_bios;
 1658 
 1659     page_chain_for_each(page) {
 1660         unsigned off, len;
 1661         int res;
 1662 
 1663         if (peer_req_op(peer_req) == REQ_OP_READ) {
 1664             set_page_chain_offset(page, 0);
 1665             set_page_chain_size(page, min_t(unsigned, data_size, PAGE_SIZE));
 1666         }
 1667         off = page_chain_offset(page);
 1668         len = page_chain_size(page);
 1669 
 1670         if (off > PAGE_SIZE || len > PAGE_SIZE - off || len > data_size || len == 0) {
 1671             drbd_err(device, "invalid page chain: offset %u size %u remaining data_size %u\n",
 1672                     off, len, data_size);
 1673             err = -EINVAL;
 1674             goto fail;
 1675         }
 1676 
 1677         res = bio_add_page(bio, page, len, off);
 1678         if (res <= 0) {
 1679             /* A single page must always be possible!
 1680              * But in case it fails anyways,
 1681              * we deal with it, and complain (below). */
 1682             if (bio->bi_vcnt == 0) {
 1683                 drbd_err(device,
 1684                     "bio_add_page(%p, %p, %u, %u): %d (bi_vcnt %u bi_max_vecs %u bi_sector %llu, bi_flags 0x%lx)\n",
 1685                     bio, page, len, off, res, bio->bi_vcnt, bio->bi_max_vecs, (uint64_t)bio->bi_iter.bi_sector,
 1686                      (unsigned long)bio->bi_flags);
 1687                 err = -ENOSPC;
 1688                 goto fail;
 1689             }
 1690             goto next_bio;
 1691         }
 1692         data_size -= len;
 1693         sector += len >> 9;
 1694         --nr_pages;
 1695     }
 1696     D_ASSERT(device, data_size == 0);
 1697     D_ASSERT(device, page == NULL);
 1698 
 1699     atomic_set(&peer_req->pending_bios, n_bios);
 1700     /* for debugfs: update timestamp, mark as submitted */
 1701     peer_req->submit_jif = jiffies;
 1702     peer_req->flags |= EE_SUBMITTED;
 1703     do {
 1704         bio = bios;
 1705         bios = bios->bi_next;
 1706         bio->bi_next = NULL;
 1707 
 1708         drbd_generic_make_request(device, peer_request_fault_type(peer_req), bio);
 1709 
 1710         /* strip off REQ_PREFLUSH,
 1711          * unless it is the first or last bio */
 1712         if (bios && bios->bi_next)
 1713             bios->bi_opf &= ~REQ_PREFLUSH;
 1714     } while (bios);
 1715     return 0;
 1716 
 1717 fail:
 1718     while (bios) {
 1719         bio = bios;
 1720         bios = bios->bi_next;
 1721         bio_put(bio);
 1722     }
 1723     return err;
 1724 }
 1725 
 1726 static void drbd_remove_peer_req_interval(struct drbd_device *device,
 1727                       struct drbd_peer_request *peer_req)
 1728 {
 1729     struct drbd_interval *i = &peer_req->i;
 1730 
 1731     drbd_remove_interval(&device->write_requests, i);
 1732     drbd_clear_interval(i);
 1733     peer_req->flags &= ~EE_IN_INTERVAL_TREE;
 1734 
 1735     /* Wake up any processes waiting for this peer request to complete.  */
 1736     if (i->waiting)
 1737         wake_up(&device->misc_wait);
 1738 }
 1739 
 1740 /**
 1741  * w_e_reissue() - Worker callback; resubmit a bio
 1742  * @w:      work object, embedded in a struct drbd_peer_request;
 1743  *          the corresponding DRBD device is derived from it.
 1744  * @cancel: The connection will be closed anyways (unused in this callback)
 1745  */
 1746 int w_e_reissue(struct drbd_work *w, int cancel) __releases(local)
 1747 {
 1748     struct drbd_peer_request *peer_req =
 1749         container_of(w, struct drbd_peer_request, w);
 1750     struct drbd_peer_device *peer_device = peer_req->peer_device;
 1751     struct drbd_device *device = peer_device->device;
 1752     int err;
 1753     /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place
 1754        (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous epoch),
 1755        so that we can finish that epoch in drbd_may_finish_epoch().
 1756        That is necessary if we already have a long chain of epochs before
 1757        we realize that BARRIER is actually not supported. */
 1758 
 1759     /* As long as the -ENOTSUPP on the barrier is reported immediately,
 1760        that will never trigger.  If it is reported late, we will just
 1761        print that warning and continue correctly for all future requests
 1762        with WO_BDEV_FLUSH. */
 1763     if (previous_epoch(peer_device->connection, peer_req->epoch))
 1764         drbd_warn(device, "Write ordering was not enforced (one time event)\n");
 1765 
 1766     /* we still have a local reference,
 1767      * get_ldev was done in receive_Data. */
 1768 
 1769     peer_req->w.cb = e_end_block;
 1770     err = drbd_submit_peer_request(peer_req);
 1771     switch (err) {
 1772     case -ENOMEM:
 1773         peer_req->w.cb = w_e_reissue;
 1774         drbd_queue_work(&peer_device->connection->sender_work,
 1775                 &peer_req->w);
 1776         /* retry later */
 1777         /* Fall through */
 1778     case 0:
 1779         /* keep worker happy and connection up */
 1780         return 0;
 1781 
 1782     case -ENOSPC:
 1783         /* no other error expected, but anyways: */
 1784     default:
 1785         /* forget the object,
 1786          * and cause a "Network failure" */
 1787         spin_lock_irq(&device->resource->req_lock);
 1788         list_del(&peer_req->w.list);
 1789         drbd_remove_peer_req_interval(device, peer_req);
 1790         spin_unlock_irq(&device->resource->req_lock);
 1791         drbd_al_complete_io(device, &peer_req->i);
 1792         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
 1793         drbd_free_peer_req(peer_req);
 1794         drbd_err(device, "submit failed, triggering re-connect\n");
 1795         return err;
 1796     }
 1797 }
 1798 
 1799 static void conn_wait_done_ee_empty_or_disconnect(struct drbd_connection *connection)
 1800 {
 1801     wait_event(connection->ee_wait,
 1802         atomic_read(&connection->done_ee_cnt) == 0
 1803         || connection->cstate[NOW] < C_CONNECTED);
 1804 }
 1805 
 1806 static void conn_wait_active_ee_empty_or_disconnect(struct drbd_connection *connection)
 1807 {
 1808     if (atomic_read(&connection->active_ee_cnt) == 0)
 1809         return;
 1810 
 1811     drbd_unplug_all_devices(connection);
 1812 
 1813     wait_event(connection->ee_wait,
 1814         atomic_read(&connection->active_ee_cnt) == 0
 1815         || connection->cstate[NOW] < C_CONNECTED);
 1816 }
 1817 
 1818 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
 1819 {
 1820     struct drbd_transport_ops *tr_ops = connection->transport.ops;
 1821     int rv, issue_flush;
 1822     struct p_barrier *p = pi->data;
 1823     struct drbd_epoch *epoch;
 1824 
 1825     tr_ops->hint(&connection->transport, DATA_STREAM, QUICKACK);
 1826     drbd_unplug_all_devices(connection);
 1827 
 1828     /* FIXME these are unacked on connection,
 1829      * not a specific (peer)device.
 1830      */
 1831     connection->current_epoch->barrier_nr = p->barrier;
 1832     connection->current_epoch->connection = connection;
 1833     rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
 1834 
 1835     /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
 1836      * the activity log, which means it would not be resynced in case the
 1837      * R_PRIMARY crashes now.
 1838      * Therefore we must send the barrier_ack after the barrier request was
 1839      * completed. */
 1840     switch (connection->resource->write_ordering) {
 1841     case WO_BIO_BARRIER:
 1842     case WO_NONE:
 1843         if (rv == FE_RECYCLED)
 1844             return 0;
 1845         break;
 1846 
 1847     case WO_BDEV_FLUSH:
 1848     case WO_DRAIN_IO:
 1849         if (rv == FE_STILL_LIVE) {
 1850             set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &connection->current_epoch->flags);
 1851             conn_wait_active_ee_empty_or_disconnect(connection);
 1852             rv = drbd_flush_after_epoch(connection, connection->current_epoch);
 1853         }
 1854         if (rv == FE_RECYCLED)
 1855             return 0;
 1856 
 1857         /* The ack_sender will send all the ACKs and barrier ACKs out, since
 1858            all EEs have moved from the active_ee to the done_ee.  We need to
 1859            provide a new epoch object for the EEs that come in soon. */
 1860         break;
 1861     }
 1862 
 1863     /* receiver context, in the writeout path of the other node.
 1864      * avoid potential distributed deadlock */
 1865     epoch = kzalloc(sizeof(struct drbd_epoch), GFP_NOIO);
 1866     if (!epoch) {
 1867         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
 1868         issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &connection->current_epoch->flags);
 1869         conn_wait_active_ee_empty_or_disconnect(connection);
 1870         if (issue_flush) {
 1871             rv = drbd_flush_after_epoch(connection, connection->current_epoch);
 1872             if (rv == FE_RECYCLED)
 1873                 return 0;
 1874         }
 1875 
 1876         conn_wait_done_ee_empty_or_disconnect(connection);
 1877 
 1878         return 0;
 1879     }
 1880 
 1881     spin_lock(&connection->epoch_lock);
 1882     if (atomic_read(&connection->current_epoch->epoch_size)) {
 1883         list_add(&epoch->list, &connection->current_epoch->list);
 1884         connection->current_epoch = epoch;
 1885         connection->epochs++;
 1886     } else {
 1887         /* The current_epoch got recycled while we allocated this one... */
 1888         kfree(epoch);
 1889     }
 1890     spin_unlock(&connection->epoch_lock);
 1891 
 1892     return 0;
 1893 }
 1894 
 1895 /* pi->data points into some recv buffer, which may be
 1896  * re-used/recycled/overwritten by the next receive operation.
 1897  * (read_in_block via recv_resync_read) */
 1898 static void p_req_detail_from_pi(struct drbd_connection *connection,
 1899         struct drbd_peer_request_details *d, struct packet_info *pi)
 1900 {
 1901     struct p_trim *p = pi->data;
 1902     bool is_trim_or_wsame = pi->cmd == P_TRIM || pi->cmd == P_WSAME || pi->cmd == P_ZEROES;
 1903     unsigned int digest_size =
 1904         pi->cmd != P_TRIM && connection->peer_integrity_tfm ?
 1905         crypto_shash_digestsize(connection->peer_integrity_tfm) : 0;
 1906 
 1907     d->sector = be64_to_cpu(p->p_data.sector);
 1908     d->block_id = p->p_data.block_id;
 1909     d->peer_seq = be64_to_cpu(p->p_data.seq_num);
 1910     d->dp_flags = be32_to_cpu(p->p_data.dp_flags);
 1911     d->length = pi->size;
 1912     d->bi_size = is_trim_or_wsame ? be32_to_cpu(p->size) : pi->size - digest_size;
 1913     d->digest_size = digest_size;
 1914 }
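
/* Resulting fields, illustrated with assumed sizes:
 *
 *   P_DATA, 4 KiB payload, 20-byte peer integrity digest:
 *     d->length = pi->size = 4116, d->digest_size = 20,
 *     d->bi_size = 4116 - 20 = 4096  (affected size == payload size)
 *
 *   P_TRIM covering 1 MiB (no payload follows, digest forced to 0):
 *     d->length = pi->size = 0, d->digest_size = 0,
 *     d->bi_size = be32_to_cpu(p->size) = 1048576 */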
 1915 
 1916 /* used from receive_RSDataReply (recv_resync_read)
 1917  * and from receive_Data.
 1918  * data_size: actual payload ("data in")
 1919  *  for normal writes, that is bi_size.
 1920  *  for discards, that is zero.
 1921  *  for write same, it is logical_block_size.
 1922  * Both trim and write same carry the bi_size ("data len to be affected")
 1923  * as an extra argument in the packet header.
 1924  */
 1925 static struct drbd_peer_request *
 1926 read_in_block(struct drbd_peer_device *peer_device, struct drbd_peer_request_details *d) __must_hold(local)
 1927 {
 1928     struct drbd_device *device = peer_device->device;
 1929     const uint64_t capacity = drbd_get_capacity(device->this_bdev);
 1930     struct drbd_peer_request *peer_req;
 1931     int err;
 1932     void *dig_in = peer_device->connection->int_dig_in;
 1933     void *dig_vv = peer_device->connection->int_dig_vv;
 1934     struct drbd_transport *transport = &peer_device->connection->transport;
 1935     struct drbd_transport_ops *tr_ops = transport->ops;
 1936 
 1937     if (d->digest_size) {
 1938         err = drbd_recv_into(peer_device->connection, dig_in, d->digest_size);
 1939         if (err)
 1940             return NULL;
 1941     }
 1942 
 1943     if (!expect(peer_device, IS_ALIGNED(d->bi_size, 512)))
 1944         return NULL;
 1945     if (d->dp_flags & (DP_WSAME|DP_DISCARD|DP_ZEROES)) {
 1946         if (!expect(peer_device, d->bi_size <= (DRBD_MAX_BBIO_SECTORS << 9)))
 1947             return NULL;
 1948     } else if (!expect(peer_device, d->bi_size <= DRBD_MAX_BIO_SIZE))
 1949         return NULL;
 1950 
 1951     /* even though we trust our peer,
 1952      * we sometimes have to double check. */
 1953     if (d->sector + (d->bi_size>>9) > capacity) {
 1954         drbd_err(device, "request from peer beyond end of local disk: "
 1955             "capacity: %llus < sector: %llus + size: %u\n",
 1956             capacity, d->sector, d->bi_size);
 1957         return NULL;
 1958     }
 1959 
 1960     peer_req = drbd_alloc_peer_req(peer_device, GFP_TRY);
 1961     if (!peer_req)
 1962         return NULL;
 1963     peer_req->i.size = d->bi_size; /* storage size */
 1964     peer_req->i.sector = d->sector;
 1965     peer_req->block_id = d->block_id;
 1966 
 1967     peer_req->flags |= EE_WRITE;
 1968     if (d->length == 0)
 1969         return peer_req;
 1970 
 1971     err = tr_ops->recv_pages(transport, &peer_req->page_chain, d->length - d->digest_size);
 1972     if (err)
 1973         goto fail;
 1974 
 1975     if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
 1976         struct page *page;
 1977         unsigned long *data;
 1978         drbd_err(device, "Fault injection: Corrupting data on receive, sector %llu\n",
 1979                 d->sector);
 1980         page = peer_req->page_chain.head;
 1981         data = kmap(page) + page_chain_offset(page);
 1982         data[0] = ~data[0];
 1983         kunmap(page);
 1984     }
 1985 
 1986     if (d->digest_size) {
 1987         drbd_csum_pages(peer_device->connection->peer_integrity_tfm, peer_req->page_chain.head, dig_vv);
 1988         if (memcmp(dig_in, dig_vv, d->digest_size)) {
 1989             drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
 1990                 d->sector, d->bi_size);
 1991             goto fail;
 1992         }
 1993     }
 1994     peer_device->recv_cnt += d->bi_size >> 9;
 1995     return peer_req;
 1996 
 1997 fail:
 1998     drbd_free_peer_req(peer_req);
 1999     return NULL;
 2000 }
 2001 
 2002 static int ignore_remaining_packet(struct drbd_connection *connection, int size)
 2003 {
 2004     void *data_to_ignore;
 2005 
 2006     while (size) {
 2007         int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
 2008         int rv = drbd_recv(connection, &data_to_ignore, s, 0);
 2009         if (rv < 0)
 2010             return rv;
 2011 
 2012         size -= rv;
 2013     }
 2014 
 2015     return 0;
 2016 }
 2017 
 2018 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
 2019                sector_t sector, int data_size)
 2020 {
 2021     struct bio_vec bvec;
 2022     struct bvec_iter iter;
 2023     struct bio *bio;
 2024     int digest_size, err, expect;
 2025     void *dig_in = peer_device->connection->int_dig_in;
 2026     void *dig_vv = peer_device->connection->int_dig_vv;
 2027 
 2028     digest_size = 0;
 2029     if (peer_device->connection->peer_integrity_tfm) {
 2030         digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
 2031         err = drbd_recv_into(peer_device->connection, dig_in, digest_size);
 2032         if (err)
 2033             return err;
 2034         data_size -= digest_size;
 2035     }
 2036 
 2037     /* optimistically update recv_cnt.  if receiving fails below,
 2038      * we disconnect anyways, and counters will be reset. */
 2039     peer_device->recv_cnt += data_size >> 9;
 2040 
 2041     bio = req->master_bio;
 2042     D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
 2043 
 2044     bio_for_each_segment(bvec, bio, iter) {
 2045         void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
 2046         expect = min_t(int, data_size, bvec.bv_len);
 2047         err = drbd_recv_into(peer_device->connection, mapped, expect);
 2048         kunmap(bvec.bv_page);
 2049         if (err)
 2050             return err;
 2051         data_size -= expect;
 2052     }
 2053 
 2054     if (digest_size) {
 2055         drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
 2056         if (memcmp(dig_in, dig_vv, digest_size)) {
 2057             drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
 2058             return -EINVAL;
 2059         }
 2060     }
 2061 
 2062     D_ASSERT(peer_device->device, data_size == 0);
 2063     return 0;
 2064 }
 2065 
 2066 /*
 2067  * e_end_resync_block() is called in ack_sender context via
 2068  * drbd_finish_peer_reqs().
 2069  */
 2070 static int e_end_resync_block(struct drbd_work *w, int unused)
 2071 {
 2072     struct drbd_peer_request *peer_req =
 2073         container_of(w, struct drbd_peer_request, w);
 2074     struct drbd_peer_device *peer_device = peer_req->peer_device;
 2075     struct drbd_device *device = peer_device->device;
 2076     sector_t sector = peer_req->i.sector;
 2077     int err;
 2078 
 2079     D_ASSERT(device, drbd_interval_empty(&peer_req->i));
 2080 
 2081     if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
 2082         drbd_set_in_sync(peer_device, sector, peer_req->i.size);
 2083         err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
 2084     } else {
 2085         /* Record failure to sync */
 2086         drbd_rs_failed_io(peer_device, sector, peer_req->i.size);
 2087 
 2088         err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
 2089     }
 2090     dec_unacked(peer_device);
 2091 
 2092     return err;
 2093 }
 2094 
 2095 static int recv_resync_read(struct drbd_peer_device *peer_device,
 2096                 struct drbd_peer_request_details *d) __releases(local)
 2097 {
 2098     struct drbd_device *device = peer_device->device;
 2099     struct drbd_peer_request *peer_req;
 2100     int err;
 2101     u64 im;
 2102 
 2103     peer_req = read_in_block(peer_device, d);
 2104     if (!peer_req)
 2105         return -EIO;
 2106 
 2107     if (test_bit(UNSTABLE_RESYNC, &peer_device->flags))
 2108         clear_bit(STABLE_RESYNC, &device->flags);
 2109 
 2110     dec_rs_pending(peer_device);
 2111 
 2112     inc_unacked(peer_device);
 2113     /* corresponding dec_unacked() in e_end_resync_block()
 2114      * respective _drbd_clear_done_ee */
 2115 
 2116     peer_req->w.cb = e_end_resync_block;
 2117     peer_req->opf = REQ_OP_WRITE;
 2118     peer_req->submit_jif = jiffies;
 2119 
 2120     spin_lock_irq(&device->resource->req_lock);
 2121     list_add_tail(&peer_req->w.list, &peer_device->connection->sync_ee);
 2122     spin_unlock_irq(&device->resource->req_lock);
 2123 
 2124     atomic_add(d->bi_size >> 9, &device->rs_sect_ev);
 2125 
 2126     /* Setting all peers out of sync here.  The sync source peer will be
 2127        set in sync when the write completes.  Other peers will be set in
 2128        sync by the sync source with a P_PEERS_IN_SYNC packet soon. */
 2129     drbd_set_all_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
 2130 
 2131     err = drbd_submit_peer_request(peer_req);
 2132     if (err)
 2133         goto out;
 2134 
 2135     for_each_peer_device_ref(peer_device, im, device) {
 2136         enum drbd_repl_state repl_state = peer_device->repl_state[NOW];
 2137         if (repl_state == L_SYNC_SOURCE || repl_state == L_PAUSED_SYNC_S)
 2138             drbd_send_out_of_sync(peer_device, &peer_req->i);
 2139     }
 2140     return 0;
 2141 out:
 2142     /* don't care for the reason here */
 2143     drbd_err(device, "submit failed, triggering re-connect\n");
 2144     spin_lock_irq(&device->resource->req_lock);
 2145     list_del(&peer_req->w.list);
 2146     spin_unlock_irq(&device->resource->req_lock);
 2147 
 2148     drbd_free_peer_req(peer_req);
 2149     return err;
 2150 }
 2151 
 2152 static struct drbd_request *
 2153 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
 2154          sector_t sector, bool missing_ok, const char *func)
 2155 {
 2156     struct drbd_request *req;
 2157 
 2158     /* Request object according to our peer */
 2159     req = (struct drbd_request *)(unsigned long)id;
 2160     if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
 2161         return req;
 2162     if (!missing_ok) {
 2163         drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
 2164             (unsigned long)id, (unsigned long long)sector);
 2165     }
 2166     return NULL;
 2167 }
 2168 
 2169 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
 2170 {
 2171     struct drbd_peer_device *peer_device;
 2172     struct drbd_device *device;
 2173     struct drbd_request *req;
 2174     sector_t sector;
 2175     int err;
 2176     struct p_data *p = pi->data;
 2177 
 2178     peer_device = conn_peer_device(connection, pi->vnr);
 2179     if (!peer_device)
 2180         return -EIO;
 2181     device = peer_device->device;
 2182 
 2183     sector = be64_to_cpu(p->sector);
 2184 
 2185     spin_lock_irq(&device->resource->req_lock);
 2186     req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
 2187     spin_unlock_irq(&device->resource->req_lock);
 2188     if (unlikely(!req))
 2189         return -EIO;
 2190 
 2191     err = recv_dless_read(peer_device, req, sector, pi->size);
 2192     if (!err)
 2193         req_mod(req, DATA_RECEIVED, peer_device);
 2194     /* else: nothing. handled from drbd_disconnect...
 2195      * I don't think we may complete this just yet
 2196      * in case we are "on-disconnect: freeze" */
 2197 
 2198     return err;
 2199 }
 2200 
 2201 /**
 2202  * _drbd_send_ack() - Sends an ack packet
 2203  * @peer_device: DRBD peer device.
 2204  * @cmd:    Packet command code.
 2205  * @sector: sector, needs to be in big endian byte order
 2206  * @blksize:    size in bytes, needs to be in big endian byte order
 2207  * @block_id:   Id, big endian byte order
 2208  */
 2209 static int _drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
 2210               u64 sector, u32 blksize, u64 block_id)
 2211 {
 2212     struct p_block_ack *p;
 2213 
 2214     if (peer_device->repl_state[NOW] < L_ESTABLISHED)
 2215         return -EIO;
 2216 
 2217     p = drbd_prepare_command(peer_device, sizeof(*p), CONTROL_STREAM);
 2218     if (!p)
 2219         return -EIO;
 2220     p->sector = sector;
 2221     p->block_id = block_id;
 2222     p->blksize = blksize;
 2223     p->seq_num = cpu_to_be32(atomic_inc_return(&peer_device->packet_seq));
 2224     return drbd_send_command(peer_device, cmd, CONTROL_STREAM);
 2225 }
 2226 
 2227 static int drbd_send_ack_dp(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
 2228           struct drbd_peer_request_details *d)
 2229 {
 2230     return _drbd_send_ack(peer_device, cmd,
 2231                   cpu_to_be64(d->sector),
 2232                   cpu_to_be32(d->bi_size),
 2233                   d->block_id);
 2234 }
 2235 
 2236 static void drbd_send_ack_rp(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
 2237               struct p_block_req *rp)
 2238 {
 2239     _drbd_send_ack(peer_device, cmd, rp->sector, rp->blksize, rp->block_id);
 2240 }
 2241 
 2242 /**
 2243  * drbd_send_ack() - Sends an ack packet
 2244  * @peer_device: DRBD peer device
 2245  * @cmd:    packet command code
 2246  * @peer_req:   peer request
 2247  */
 2248 int drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
 2249           struct drbd_peer_request *peer_req)
 2250 {
 2251     return _drbd_send_ack(peer_device, cmd,
 2252                   cpu_to_be64(peer_req->i.sector),
 2253                   cpu_to_be32(peer_req->i.size),
 2254                   peer_req->block_id);
 2255 }
 2256 
 2257 /* This function misuses the block_id field to signal whether the blocks
 2258  * are in sync or not. */
 2259 int drbd_send_ack_ex(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
 2260              sector_t sector, int blksize, u64 block_id)
 2261 {
 2262     return _drbd_send_ack(peer_device, cmd,
 2263                   cpu_to_be64(sector),
 2264                   cpu_to_be32(blksize),
 2265                   cpu_to_be64(block_id));
 2266 }
 2267 
 2268 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
 2269 {
 2270     struct drbd_peer_request_details d;
 2271     struct drbd_peer_device *peer_device;
 2272     struct drbd_device *device;
 2273     int err;
 2274 
 2275     p_req_detail_from_pi(connection, &d, pi);
 2276     pi->data = NULL;
 2277 
 2278     peer_device = conn_peer_device(connection, pi->vnr);
 2279     if (!peer_device)
 2280         return -EIO;
 2281     device = peer_device->device;
 2282 
 2283     D_ASSERT(device, d.block_id == ID_SYNCER);
 2284 
 2285     if (get_ldev(device)) {
 2286         err = recv_resync_read(peer_device, &d);
 2287         if (err)
 2288             put_ldev(device);
 2289     } else {
 2290         if (drbd_ratelimit())
 2291             drbd_err(device, "Can not write resync data to local disk.\n");
 2292 
 2293         err = ignore_remaining_packet(connection, pi->size);
 2294 
 2295         drbd_send_ack_dp(peer_device, P_NEG_ACK, &d);
 2296     }
 2297 
 2298     rs_sectors_came_in(peer_device, d.bi_size);
 2299 
 2300     return err;
 2301 }
 2302 
 2303 static void restart_conflicting_writes(struct drbd_peer_request *peer_req)
 2304 {
 2305     struct drbd_interval *i;
 2306     struct drbd_request *req;
 2307     struct drbd_device *device = peer_req->peer_device->device;
 2308     const sector_t sector = peer_req->i.sector;
 2309     const unsigned int size = peer_req->i.size;
 2310 
 2311     drbd_for_each_overlap(i, &device->write_requests, sector, size) {
 2312         if (!i->local)
 2313             continue;
 2314         req = container_of(i, struct drbd_request, i);
 2315         if ((req->local_rq_state & RQ_LOCAL_PENDING) ||
 2316            !(req->local_rq_state & RQ_POSTPONED))
 2317             continue;
 2318         /* as it is RQ_POSTPONED, this will cause it to
 2319          * be queued on the retry workqueue. */
 2320         __req_mod(req, DISCARD_WRITE, peer_req->peer_device, NULL);
 2321     }
 2322 }
 2323 
 2324 /*
 2325  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
 2326  */
 2327 static int e_end_block(struct drbd_work *w, int cancel)
 2328 {
 2329     struct drbd_peer_request *peer_req =
 2330         container_of(w, struct drbd_peer_request, w);
 2331     struct drbd_peer_device *peer_device = peer_req->peer_device;
 2332     struct drbd_device *device = peer_device->device;
 2333     sector_t sector = peer_req->i.sector;
 2334     struct drbd_epoch *epoch;
 2335     int err = 0, pcmd;
 2336 
 2337     if (peer_req->flags & EE_IS_BARRIER) {
 2338         epoch = previous_epoch(peer_device->connection, peer_req->epoch);
 2339         if (epoch)
 2340             drbd_may_finish_epoch(peer_device->connection, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
 2341     }
 2342 
 2343     if (peer_req->flags & EE_SEND_WRITE_ACK) {
 2344         if (unlikely(peer_req->flags & EE_WAS_ERROR)) {
 2345             pcmd = P_NEG_ACK;
 2346             /* we expect it to be marked out of sync anyways...
 2347              * maybe assert this?  */
 2348         } else if (peer_device->repl_state[NOW] >= L_SYNC_SOURCE &&
 2349                peer_device->repl_state[NOW] <= L_PAUSED_SYNC_T &&
 2350                peer_req->flags & EE_MAY_SET_IN_SYNC) {
 2351             pcmd = P_RS_WRITE_ACK;
 2352             drbd_set_in_sync(peer_device, sector, peer_req->i.size);
 2353         } else
 2354             pcmd = P_WRITE_ACK;
 2355         err = drbd_send_ack(peer_device, pcmd, peer_req);
 2356         dec_unacked(peer_device);
 2357     }
 2358 
 2359     /* we delete from the conflict detection hash _after_ we sent out the
 2360      * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
 2361     if (peer_req->flags & EE_IN_INTERVAL_TREE) {
 2362         spin_lock_irq(&device->resource->req_lock);
 2363         D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
 2364         drbd_remove_peer_req_interval(device, peer_req);
 2365         if (peer_req->flags & EE_RESTART_REQUESTS)
 2366             restart_conflicting_writes(peer_req);
 2367         spin_unlock_irq(&device->resource->req_lock);
 2368     } else
 2369         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
 2370 
 2371     drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
 2372 
 2373     return err;
 2374 }
 2375 
 2376 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
 2377 {
 2378     struct drbd_peer_request *peer_req =
 2379         container_of(w, struct drbd_peer_request, w);
 2380     struct drbd_peer_device *peer_device = peer_req->peer_device;
 2381     int err;
 2382 
 2383     err = drbd_send_ack(peer_device, ack, peer_req);
 2384     dec_unacked(peer_device);
 2385 
 2386     return err;
 2387 }
 2388 
 2389 static int e_send_discard_write(struct drbd_work *w, int unused)
 2390 {
 2391     return e_send_ack(w, P_SUPERSEDED);
 2392 }
 2393 
 2394 static int e_send_retry_write(struct drbd_work *w, int unused)
 2395 {
 2396 
 2397     struct drbd_peer_request *peer_request =
 2398         container_of(w, struct drbd_peer_request, w);
 2399     struct drbd_connection *connection = peer_request->peer_device->connection;
 2400 
 2401     return e_send_ack(w, connection->agreed_pro_version >= 100 ?
 2402                  P_RETRY_WRITE : P_SUPERSEDED);
 2403 }
 2404 
 2405 static bool seq_greater(u32 a, u32 b)
 2406 {
 2407     /*
 2408      * We assume 32-bit wrap-around here.
 2409      * For 24-bit wrap-around, we would have to shift:
 2410      *  a <<= 8; b <<= 8;
 2411      */
 2412     return (s32)a - (s32)b > 0;
 2413 }
 2414 
 2415 static u32 seq_max(u32 a, u32 b)
 2416 {
 2417     return seq_greater(a, b) ? a : b;
 2418 }
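
/* Wrap-around illustration (assumed example values):
 *
 *   seq_greater(5, 0xfffffffe):  (s32)(5 - 0xfffffffe) ==  7 > 0  -> true
 *   seq_greater(0xfffffffe, 5):  (s32)(0xfffffffe - 5) == -7      -> false
 *
 * so seq_max(5, 0xfffffffe) == 5: a sequence number just past the 32-bit
 * wrap is still considered newer than one just before it. */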
 2419 
 2420 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
 2421 {
 2422     unsigned int newest_peer_seq;
 2423 
 2424     if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->transport.flags)) {
 2425         spin_lock(&peer_device->peer_seq_lock);
 2426         newest_peer_seq = seq_max(peer_device->peer_seq, peer_seq);
 2427         peer_device->peer_seq = newest_peer_seq;
 2428         spin_unlock(&peer_device->peer_seq_lock);
 2429         /* wake up only if we actually changed peer_device->peer_seq */
 2430         if (peer_seq == newest_peer_seq)
 2431             wake_up(&peer_device->device->seq_wait);
 2432     }
 2433 }
 2434 
 2435 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
 2436 {
 2437     return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
 2438 }
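
/* Illustration (assumed sizes): with 4 KiB requests, i.e. 8 sectors each,
 * overlaps(0, 4096, 4, 4096) is true (sector ranges 0..7 and 4..11
 * intersect), while overlaps(0, 4096, 8, 4096) is false (0..7 and 8..15
 * are merely adjacent). */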
 2439 
 2440 /* maybe change sync_ee into interval trees as well? */
 2441 static bool overlapping_resync_write(struct drbd_connection *connection, struct drbd_peer_request *peer_req)
 2442 {
 2443     struct drbd_peer_request *rs_req;
 2444     bool rv = false;
 2445 
 2446     /* Now only called in the fallback compatibility path, when the peer is
 2447      * DRBD version 8, which also means it is the only peer.
 2448      * If we wanted to use this in a scenario where we could potentially
 2449      * have in-flight resync writes from multiple peers, we'd need to
 2450      * iterate over all connections.
 2451      * Fortunately we don't have to, because we have now mutually excluded
 2452      * resync and application activity on a particular region using
 2453      * device->act_log and peer_device->resync_lru.
 2454      */
 2455     spin_lock_irq(&connection->resource->req_lock);
 2456     list_for_each_entry(rs_req, &connection->sync_ee, w.list) {
 2457         if (rs_req->peer_device != peer_req->peer_device)
 2458             continue;
 2459         if (overlaps(peer_req->i.sector, peer_req->i.size,
 2460                  rs_req->i.sector, rs_req->i.size)) {
 2461             rv = true;
 2462             break;
 2463         }
 2464     }
 2465     spin_unlock_irq(&connection->resource->req_lock);
 2466 
 2467     return rv;
 2468 }
 2469 
 2470 /* Called from receive_Data.
 2471  * Synchronize packets on sock with packets on msock.
 2472  *
 2473  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 2474  * packet traveling on msock, they are still processed in the order they have
 2475  * been sent.
 2476  *
 2477  * Note: we don't care for Ack packets overtaking P_DATA packets.
 2478  *
 2479  * In case peer_seq is larger than peer_device->peer_seq, there are
 2480  * outstanding packets on the msock.  We wait for them to arrive.
 2481  * In case we are the logically next packet, we update peer_device->peer_seq
 2482  * ourselves.  Correctly handles 32-bit wrap around.
 2483  *
 2484  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 2485  * about 1<<21 sectors per second.  So "worst" case, we have 1<<3 == 8 seconds
 2486  * for the 24-bit wrap (historical atomic_t guarantee on some archs), and we have
 2487  * 1<<11 == 2048 seconds aka ages for the 32-bit wrap around...
 2488  *
 2489  * returns 0 if we may process the packet,
 2490  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
 2491 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
 2492 {
 2493     struct drbd_connection *connection = peer_device->connection;
 2494     DEFINE_WAIT(wait);
 2495     long timeout;
 2496     int ret = 0, tp;
 2497 
 2498     if (!test_bit(RESOLVE_CONFLICTS, &connection->transport.flags))
 2499         return 0;
 2500 
 2501     spin_lock(&peer_device->peer_seq_lock);
 2502     for (;;) {
 2503         if (!seq_greater(peer_seq - 1, peer_device->peer_seq)) {
 2504             peer_device->peer_seq = seq_max(peer_device->peer_seq, peer_seq);
 2505             break;
 2506         }
 2507 
 2508         if (signal_pending(current)) {
 2509             ret = -ERESTARTSYS;
 2510             break;
 2511         }
 2512 
 2513         rcu_read_lock();
 2514         tp = rcu_dereference(connection->transport.net_conf)->two_primaries;
 2515         rcu_read_unlock();
 2516 
 2517         if (!tp)
 2518             break;
 2519 
 2520         /* Only need to wait if two_primaries is enabled */
 2521         prepare_to_wait(&peer_device->device->seq_wait, &wait, TASK_INTERRUPTIBLE);
 2522         spin_unlock(&peer_device->peer_seq_lock);
 2523         rcu_read_lock();
 2524         timeout = rcu_dereference(connection->transport.net_conf)->ping_timeo*HZ/10;
 2525         rcu_read_unlock();
 2526         timeout = schedule_timeout(timeout);
 2527         spin_lock(&peer_device->peer_seq_lock);
 2528         if (!timeout) {
 2529             ret = -ETIMEDOUT;
 2530             drbd_err(peer_device, "Timed out waiting for missing ack packets; disconnecting\n");
 2531             break;
 2532         }
 2533     }
 2534     spin_unlock(&peer_device->peer_seq_lock);
 2535     finish_wait(&peer_device->device->seq_wait, &wait);
 2536     return ret;
 2537 }
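
/* Illustration of the waiting condition above (assumed values, with
 * two_primaries enabled): if peer_device->peer_seq == 7 and a P_DATA with
 * peer_seq == 9 arrives, seq_greater(9 - 1, 7) is true, i.e. packet 8 is
 * still in flight on the msock, so we sleep on seq_wait.  Once
 * update_peer_seq() has bumped peer_device->peer_seq to 8,
 * seq_greater(8, 8) is false; we set peer_device->peer_seq to
 * seq_max(8, 9) == 9 and return 0. */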
 2538 
 2539 static unsigned long wire_flags_to_bio_op(u32 dpf)
 2540 {
 2541     if (dpf & DP_ZEROES)
 2542         return REQ_OP_WRITE_ZEROES;
 2543     if (dpf & DP_DISCARD)
 2544         return REQ_OP_DISCARD;
 2545     if (dpf & DP_WSAME)
 2546         return REQ_OP_WRITE_SAME;
 2547     else
 2548         return REQ_OP_WRITE;
 2549 }
 2550 
 2551 /* see also bio_flags_to_wire() */
 2552 static unsigned long wire_flags_to_bio(struct drbd_connection *connection, u32 dpf)
 2553 {
 2554     unsigned long opf = wire_flags_to_bio_op(dpf) |
 2555         (dpf & DP_RW_SYNC ? REQ_SYNC : 0);
 2556 
 2557     /* we used to communicate one bit only in older DRBD */
 2558     if (connection->agreed_pro_version >= 95)
 2559         opf |= (dpf & DP_FUA ? REQ_FUA : 0) |
 2560                 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
 2561 
 2562     return opf;
 2563 }
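
/* Example decoding (assumed flag combination): dpf = DP_DISCARD |
 * DP_RW_SYNC | DP_FUA maps to REQ_OP_DISCARD | REQ_SYNC | REQ_FUA on a
 * connection with agreed_pro_version >= 95; with an older peer the
 * DP_FUA/DP_FLUSH bits are ignored and only REQ_OP_DISCARD | REQ_SYNC
 * remains. */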
 2564 
 2565 static void fail_postponed_requests(struct drbd_peer_request *peer_req)
 2566 {
 2567     struct drbd_device *device = peer_req->peer_device->device;
 2568     struct drbd_interval *i;
 2569     const sector_t sector = peer_req->i.sector;
 2570     const unsigned int size = peer_req->i.size;
 2571 
 2572     repeat:
 2573     drbd_for_each_overlap(i, &device->write_requests, sector, size) {
 2574         struct drbd_request *req;
 2575         struct bio_and_error m;
 2576 
 2577         if (!i->local)
 2578             continue;
 2579         req = container_of(i, struct drbd_request, i);
 2580         if (!(req->local_rq_state & RQ_POSTPONED))
 2581             continue;
 2582         req->local_rq_state &= ~RQ_POSTPONED;
 2583         __req_mod(req, NEG_ACKED, peer_req->peer_device, &m);
 2584         spin_unlock_irq(&device->resource->req_lock);
 2585         if (m.bio)
 2586             complete_master_bio(device, &m);
 2587         spin_lock_irq(&device->resource->req_lock);
 2588         goto repeat;
 2589     }
 2590 }
 2591 
 2592 static int handle_write_conflicts(struct drbd_peer_request *peer_req)
 2593 {
 2594     struct drbd_peer_device *peer_device = peer_req->peer_device;
 2595     struct drbd_device *device = peer_device->device;
 2596     struct drbd_connection *connection = peer_device->connection;
 2597     bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->transport.flags);
 2598     sector_t sector = peer_req->i.sector;
 2599     const unsigned int size = peer_req->i.size;
 2600     struct drbd_interval *i;
 2601     bool equal;
 2602     int err;
 2603 
 2604     /*
 2605      * Inserting the peer request into the write_requests tree will prevent
 2606      * new conflicting local requests from being added.
 2607      */
 2608     drbd_insert_interval(&device->write_requests, &peer_req->i);
 2609     peer_req->flags |= EE_IN_INTERVAL_TREE;
 2610 
 2611     repeat:
 2612     drbd_for_each_overlap(i, &device->write_requests, sector, size) {
 2613         if (i == &peer_req->i)
 2614             continue;
 2615         if (i->completed)
 2616             continue;
 2617 
 2618         if (!i->local) {
 2619             /*
 2620              * Our peer has sent a conflicting remote request; this
 2621              * should not happen in a two-node setup.  Wait for the
 2622              * earlier peer request to complete.
 2623              */
 2624             err = drbd_wait_misc(device, peer_device, i);
 2625             if (err)
 2626                 goto out;
 2627             goto repeat;
 2628         }
 2629 
 2630         equal = i->sector == sector && i->size == size;
 2631         if (resolve_conflicts) {
 2632             /*
 2633              * If the peer request is fully contained within the
 2634              * overlapping request, it can be discarded; otherwise,
 2635              * it will be retried once all overlapping requests
 2636              * have completed.
 2637              */
 2638             bool discard = i->sector <= sector && i->sector +
 2639                        (i->size >> 9) >= sector + (size >> 9);
 2640 
 2641             if (!equal)
 2642                 drbd_alert(device, "Concurrent writes detected: "
 2643                            "local=%llus +%u, remote=%llus +%u, "
 2644                            "assuming %s came first\n",
 2645                       (unsigned long long)i->sector, i->size,
 2646                       (unsigned long long)sector, size,
 2647                       discard ? "local" : "remote");
 2648 
 2649             peer_req->w.cb = discard ? e_send_discard_write :
 2650                            e_send_retry_write;
 2651             atomic_inc(&connection->done_ee_cnt);
 2652             list_add_tail(&peer_req->w.list, &connection->done_ee);
 2653             queue_work(connection->ack_sender, &connection->send_acks_work);
 2654 
 2655             err = -ENOENT;
 2656             goto out;
 2657         } else {
 2658             struct drbd_request *req =
 2659                 container_of(i, struct drbd_request, i);
 2660 
 2661             if (!equal)
 2662                 drbd_alert(device, "Concurrent writes detected: "
 2663                            "local=%llus +%u, remote=%llus +%u\n",
 2664                       (unsigned long long)i->sector, i->size,
 2665                       (unsigned long long)sector, size);
 2666 
 2667             if (req->local_rq_state & RQ_LOCAL_PENDING ||
 2668                 !(req->local_rq_state & RQ_POSTPONED)) {
 2669                 /*
 2670                  * Wait for the node with the discard flag to
 2671                  * decide if this request will be discarded or
 2672                  * retried.  Requests that are discarded will
 2673                  * disappear from the write_requests tree.
 2674                  *
 2675                  * In addition, wait for the conflicting
 2676                  * request to finish locally before submitting
 2677                  * the conflicting peer request.
 2678                  */
 2679                 err = drbd_wait_misc(device, NULL, &req->i);
 2680                 if (err) {
 2681                     begin_state_change_locked(connection->resource, CS_HARD);
 2682                     __change_cstate(connection, C_TIMEOUT);
 2683                     end_state_change_locked(connection->resource);
 2684                     fail_postponed_requests(peer_req);
 2685                     goto out;
 2686                 }
 2687                 goto repeat;
 2688             }
 2689             /*
 2690              * Remember to restart the conflicting requests after
 2691              * the new peer request has completed.
 2692              */
 2693             peer_req->flags |= EE_RESTART_REQUESTS;
 2694         }
 2695     }
 2696     err = 0;
 2697 
 2698     out:
 2699     if (err)
 2700         drbd_remove_peer_req_interval(device, peer_req);
 2701     return err;
 2702 }
 2703 
 2704 static void drbd_queue_peer_request(struct drbd_device *device, struct drbd_peer_request *peer_req)
 2705 {
 2706     atomic_inc(&device->wait_for_actlog);
 2707     spin_lock_irq(&device->resource->req_lock);
 2708     list_add_tail(&peer_req->wait_for_actlog, &device->submit.peer_writes);
 2709     spin_unlock_irq(&device->resource->req_lock);
 2710     queue_work(device->submit.wq, &device->submit.worker);
 2711     /* do_submit() may sleep internally on al_wait, too */
 2712     wake_up(&device->al_wait);
 2713 }
 2714 
 2715 /* FIXME
 2716  * TODO grab the device->al_lock *once*, and check:
 2717  *     if possible, non-blocking get the reference(s),
 2718  *     if transaction is required, queue them up,
 2719  *        AND account for the queued up worst-case slot consumption
 2720  *     if available slots, corrected by other accounting, suggest
 2721  *        that we might block on this now or later,
 2722  *        *FIRST* drain, then flush, then send P_CONFIRM_STABLE,
 2723  *        then wait for available slots to be sufficient.
 2724  */
 2725 static enum { DRBD_PAL_QUEUE, DRBD_PAL_DISCONNECTED, DRBD_PAL_SUBMIT }
 2726 prepare_activity_log(struct drbd_peer_request *peer_req)
 2727 {
 2728     struct drbd_peer_device *peer_device = peer_req->peer_device;
 2729     struct drbd_connection *connection = peer_device->connection;
 2730     struct drbd_device *device = peer_device->device;
 2731 
 2732     struct lru_cache *al;
 2733     int nr_al_extents = interval_to_al_extents(&peer_req->i);
 2734     int nr, used, ecnt;
 2735     int ret = DRBD_PAL_SUBMIT;
 2736 
 2737     /* In protocol < 110 (which is compat mode 8.4 <-> 9.0),
 2738      * we must not block in the activity log here; that would
 2739      * deadlock during an ongoing resync with the drbd_rs_begin_io
 2740      * we did when receiving the resync request.
 2741      *
 2742      * We still need to update the activity log if ours is the
 2743      * only remaining disk, in which case there cannot be a resync
 2744      * and the deadlock paths cannot be taken.
 2745      */
 2746     if (connection->agreed_pro_version < 110 &&
 2747         peer_device->disk_state[NOW] >= D_INCONSISTENT)
 2748         return DRBD_PAL_SUBMIT;
 2749 
 2750     /* Let the activity log know we are about to use it.
 2751      * See also drbd_request_prepare() for the "request" entry point. */
 2752     ecnt = atomic_add_return(nr_al_extents, &device->wait_for_actlog_ecnt);
 2753 
 2754     spin_lock_irq(&device->al_lock);
 2755     al = device->act_log;
 2756     nr = al->nr_elements;
 2757     used = al->used;
 2758     spin_unlock_irq(&device->al_lock);
 2759 
 2760     /* note: due to the slight delay between being accounted in "used" after
 2761      * being committed to the activity log with drbd_al_begin_io_commit(),
 2762      * and being subtracted from "wait_for_actlog_ecnt" in __drbd_submit_peer_request(),
 2763      * this can err, but only on the conservative side (overestimating ecnt). */
 2764     if (ecnt > nr - used) {
 2765         conn_wait_active_ee_empty_or_disconnect(connection);
 2766         drbd_flush_after_epoch(connection, NULL);
 2767         conn_wait_done_ee_empty_or_disconnect(connection);
 2768 
 2769         /* would this peer even understand me? */
 2770         if (connection->agreed_pro_version >= 114)
 2771             drbd_send_confirm_stable(peer_req);
 2772 
 2773         if  (drbd_al_begin_io_for_peer(peer_device, &peer_req->i))
 2774             ret = DRBD_PAL_DISCONNECTED;
 2775     } else if (nr_al_extents != 1 || !drbd_al_begin_io_fastpath(device, &peer_req->i)) {
 2776         ret = DRBD_PAL_QUEUE;
 2777     }
 2778     if (ret == DRBD_PAL_SUBMIT)
 2779         peer_req->flags |= EE_IN_ACTLOG;
 2780     if (ret != DRBD_PAL_QUEUE)
 2781         atomic_sub(nr_al_extents, &device->wait_for_actlog_ecnt);
 2782 
 2783     return ret;
 2784 }
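
/* Decision summary, with assumed numbers: given an activity log of
 * nr = 257 slots of which used = 250, and ecnt = 12 extents already
 * committed or queued, 12 > 257 - 250 forces the drain/flush path and a
 * blocking drbd_al_begin_io_for_peer() (DRBD_PAL_DISCONNECTED only if that
 * fails because the connection went down).  Otherwise a single-extent
 * request that wins the fastpath is submitted directly (DRBD_PAL_SUBMIT);
 * everything else is queued to the submitter (DRBD_PAL_QUEUE). */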
 2785 
 2786 /* mirrored write */
 2787 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
 2788 {
 2789     struct drbd_peer_device *peer_device;
 2790     struct drbd_device *device;
 2791     struct net_conf *nc;
 2792     struct drbd_peer_request *peer_req;
 2793     struct drbd_peer_request_details d;
 2794     int err, tp;
 2795 
 2796     peer_device = conn_peer_device(connection, pi->vnr);
 2797     if (!peer_device)
 2798         return -EIO;
 2799     device = peer_device->device;
 2800 
 2801     if (pi->cmd == P_TRIM)
 2802         D_ASSERT(peer_device, pi->size == 0);
 2803 
 2804     p_req_detail_from_pi(connection, &d, pi);
 2805     pi->data = NULL;
 2806 
 2807     if (!get_ldev(device)) {
 2808         int err2;
 2809 
 2810         err = wait_for_and_update_peer_seq(peer_device, d.peer_seq);
 2811         drbd_send_ack_dp(peer_device, P_NEG_ACK, &d);
 2812         atomic_inc(&connection->current_epoch->epoch_size);
 2813         err2 = ignore_remaining_packet(connection, pi->size);
 2814         if (!err)
 2815             err = err2;
 2816         return err;
 2817     }
 2818 
 2819     /*
 2820      * Corresponding put_ldev done either below (on various errors), or in
 2821      * drbd_peer_request_endio, if we successfully submit the data at the
 2822      * end of this function.
 2823      */
 2824 
 2825     peer_req = read_in_block(peer_device, &d);
 2826     if (!peer_req) {
 2827         put_ldev(device);
 2828         return -EIO;
 2829     }
 2830     if (pi->cmd == P_TRIM)
 2831         peer_req->flags |= EE_TRIM;
 2832     else if (pi->cmd == P_ZEROES)
 2833         peer_req->flags |= EE_ZEROOUT;
 2834     else if (pi->cmd == P_WSAME)
 2835         peer_req->flags |= EE_WRITE_SAME;
 2836 
 2837     peer_req->dagtag_sector = connection->last_dagtag_sector + (peer_req->i.size >> 9);
 2838     connection->last_dagtag_sector = peer_req->dagtag_sector;
 2839 
 2840     peer_req->w.cb = e_end_block;
 2841     peer_req->submit_jif = jiffies;
 2842     peer_req->flags |= EE_APPLICATION;
 2843 
 2844     peer_req->opf = wire_flags_to_bio(connection, d.dp_flags);
 2845     if (pi->cmd == P_TRIM) {
 2846         D_ASSERT(peer_device, peer_req->i.size > 0);
 2847         D_ASSERT(peer_device, d.dp_flags & DP_DISCARD);
 2848         D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_DISCARD);
 2849         D_ASSERT(peer_device, peer_req->page_chain.head == NULL);
 2850         D_ASSERT(peer_device, peer_req->page_chain.nr_pages == 0);
 2851         /* need to play it safe: an older DRBD sender
 2852          * may mean zero-out while sending P_TRIM. */
 2853         if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
 2854             peer_req->flags |= EE_ZEROOUT;
 2855     } else if (pi->cmd == P_ZEROES) {
 2856         D_ASSERT(peer_device, peer_req->i.size > 0);
 2857         D_ASSERT(peer_device, d.dp_flags & DP_ZEROES);
 2858         D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_WRITE_ZEROES);
 2859         D_ASSERT(peer_device, peer_req->page_chain.head == NULL);
 2860         D_ASSERT(peer_device, peer_req->page_chain.nr_pages == 0);
 2861         /* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
 2862         if (d.dp_flags & DP_DISCARD)
 2863             peer_req->flags |= EE_TRIM;
 2864     } else if (pi->cmd == P_WSAME) {
 2865         D_ASSERT(peer_device, peer_req->i.size > 0);
 2866         D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_WRITE_SAME);
 2867         D_ASSERT(peer_device, peer_req->page_chain.head != NULL);
 2868     } else if (peer_req->page_chain.head == NULL) {
 2869         /* Actually, this must not happen anymore:
 2870          * "empty" flushes are mapped to P_BARRIER
 2871          * and should never end up here.
 2872          * Compat with old DRBD? */
 2873         D_ASSERT(device, peer_req->i.size == 0);
 2874         D_ASSERT(device, d.dp_flags & DP_FLUSH);
 2875     } else {
 2876         D_ASSERT(peer_device, peer_req->i.size > 0);
 2877         D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_WRITE);
 2878     }
 2879 
 2880     if (d.dp_flags & DP_MAY_SET_IN_SYNC)
 2881         peer_req->flags |= EE_MAY_SET_IN_SYNC;
 2882 
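          /* Account this request in the currently open epoch.  Under
           * WO_BIO_BARRIER write ordering, the first request of a new epoch
           * may additionally be turned into a flush/FUA barrier request below. */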
 2883     spin_lock(&connection->epoch_lock);
 2884     peer_req->epoch = connection->current_epoch;
 2885     atomic_inc(&peer_req->epoch->epoch_size);
 2886     atomic_inc(&peer_req->epoch->active);
 2887     if (peer_req->epoch->oldest_unconfirmed_peer_req == NULL)
 2888         peer_req->epoch->oldest_unconfirmed_peer_req = peer_req;
 2889 
 2890     if (connection->resource->write_ordering == WO_BIO_BARRIER &&
 2891         atomic_read(&peer_req->epoch->epoch_size) == 1) {
 2892         struct drbd_epoch *epoch;
 2893         /* Issue a barrier if we start a new epoch, and the previous epoch
 2894            was not an epoch containing a single request which was already
 2895            a Barrier. */
 2896         epoch = list_entry(peer_req->epoch->list.prev, struct drbd_epoch, list);
 2897         if (epoch == peer_req->epoch) {
 2898             set_bit(DE_CONTAINS_A_BARRIER, &peer_req->epoch->flags);
 2899             peer_req->opf |= REQ_PREFLUSH | REQ_FUA;
 2900             peer_req->flags |= EE_IS_BARRIER;
 2901         } else {
 2902             if (atomic_read(&epoch->epoch_size) > 1 ||
 2903                 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
 2904                 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
 2905                 set_bit(DE_CONTAINS_A_BARRIER, &peer_req->epoch->flags);
 2906                 peer_req->opf |= REQ_PREFLUSH | REQ_FUA;
 2907                 peer_req->flags |= EE_IS_BARRIER;
 2908             }
 2909         }
 2910     }
 2911     spin_unlock(&connection->epoch_lock);
 2912 
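          /* For peers older than protocol version 100, derive the ack policy
           * from the locally configured wire protocol: protocol C wants a
           * write ack, protocol B a receive ack. */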
 2913     rcu_read_lock();
 2914     nc = rcu_dereference(connection->transport.net_conf);
 2915     tp = nc->two_primaries;
 2916     if (connection->agreed_pro_version < 100) {
 2917         switch (nc->wire_protocol) {
 2918         case DRBD_PROT_C:
 2919             d.dp_flags |= DP_SEND_WRITE_ACK;
 2920             break;
 2921         case DRBD_PROT_B:
 2922             d.dp_flags |= DP_SEND_RECEIVE_ACK;
 2923             break;
 2924         }
 2925     }
 2926     rcu_read_unlock();
 2927 
 2928     if (d.dp_flags & DP_SEND_WRITE_ACK) {
 2929         peer_req->flags |= EE_SEND_WRITE_ACK;
 2930         inc_unacked(peer_device);
 2931         /* corresponding dec_unacked() in e_end_block()
 2932          * respective _drbd_clear_done_ee */
 2933     }
 2934 
 2935     if (d.dp_flags & DP_SEND_RECEIVE_ACK) {
 2936         /* I really don't like it that the receiver thread
 2937          * sends on the msock, but anyway */
 2938         drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
 2939     }
 2940 
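          /* With two primaries, writes from both sides may conflict; detect
           * and resolve such conflicts under the request lock before queuing. */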
 2941     if (tp) {
 2942         /* two primaries implies protocol C */
 2943         D_ASSERT(device, d.dp_flags & DP_SEND_WRITE_ACK);
 2944         err = wait_for_and_update_peer_seq(peer_device, d.peer_seq);
 2945         if (err)
 2946             goto out_interrupted;
 2947         spin_lock_irq(&device->resource->req_lock);
 2948         err = handle_write_conflicts(peer_req);
 2949         if (err) {
 2950             spin_unlock_irq(&device->resource->req_lock);
 2951             if (err == -ENOENT) {
 2952                 put_ldev(device);
 2953                 return 0;
 2954             }
 2955             goto out_interrupted;
 2956         }
 2957     } else {
 2958         update_peer_seq(peer_device, d.peer_seq);
 2959         spin_lock_irq(&device->resource->req_lock);
 2960     }
 2961     /* Added to list here already, so debugfs can find it.
 2962      * NOTE: active_ee_cnt is only increased *after* we checked we won't
 2963      * need to wait for current activity to drain in prepare_activity_log()
 2964      */
 2965     list_add_tail(&peer_req->w.list, &connection->active_ee);
 2966     if (connection->agreed_pro_version >= 110)
 2967         list_add_tail(&peer_req->recv_order, &connection->peer_requests);
 2968     spin_unlock_irq(&device->resource->req_lock);
 2969 
 2970     if (connection->agreed_pro_version < 110) {
 2971         /* If the peer is DRBD 8, a sync target may need to drain
 2972          * (overlapping) in-flight resync requests first.
 2973          * With DRBD 9, the mutually exclusive references in the resync lru
 2974          * and the activity log take care of that already. */
 2975         if (peer_device->repl_state[NOW] == L_SYNC_TARGET)
 2976             wait_event(connection->ee_wait, !overlapping_resync_write(connection, peer_req));
 2977     }
 2978 
 2979     err = prepare_activity_log(peer_req);
 2980     if (err == DRBD_PAL_DISCONNECTED)
 2981         goto disconnect_during_al_begin_io;
 2982 
 2983     /* Note: this now may or may not be "hot" in the activity log.
 2984      * Still, it is the best time to record that we need to set the
 2985      * out-of-sync bit; if we delay that until drbd_submit_peer_request(),
 2986      * we may introduce a race with some re-attach on the peer.
 2987      * The alternative would be to guarantee that we drain all in-flight
 2988      * IO whenever we receive a state change, which we are not sure we want.
 2989      * Use the EE_SET_OUT_OF_SYNC flag, to be acted on just before
 2990      * the actual submit, when we can be sure it is "hot".
 2991      */
 2992     if (peer_device->disk_state[NOW] < D_INCONSISTENT) {
 2993         peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
 2994         peer_req->flags |= EE_SET_OUT_OF_SYNC;
 2995     }
 2996 
 2997     atomic_inc(&connection->active_ee_cnt);
 2998 
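          /* DRBD_PAL_QUEUE: the required activity log extents are not hot yet;
           * hand the request to the submitter work queue, which submits it
           * once the AL references have been obtained. */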
 2999     if (err == DRBD_PAL_QUEUE) {
 3000         drbd_queue_peer_request(device, peer_req);
 3001         return 0;
 3002     }
 3003 
 3004     err = drbd_submit_peer_request(peer_req);
 3005     if (!err)
 3006         return 0;
 3007 
 3008     /* don't care for the reason here */
 3009     drbd_err(peer_device, "submit failed, triggering re-connect\n");
 3010     drbd_al_complete_io(device, &peer_req->i);
 3011 
 3012 disconnect_during_al_begin_io:
 3013     spin_lock_irq(&device->resource->req_lock);
 3014     list_del(&peer_req->w.list);
 3015     list_del_init(&peer_req->recv_order);
 3016     drbd_remove_peer_req_interval(device, peer_req);
 3017     spin_unlock_irq(&device->resource->req_lock);
 3018 
 3019 out_interrupted:
 3020     drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
 3021     put_ldev(device);
 3022     drbd_free_peer_req(peer_req);
 3023     return err;
 3024 }
 3025 
 3026 /*
 3027  * To be called when __drbd_submit_peer_request() fails from submitter
 3028  * workqueue context.  Mimic what happens in the receive_Data() error path,
 3029  * when the submit happens directly in the receiver context.
 3030  */
 3031 void drbd_cleanup_after_failed_submit_peer_request(struct drbd_peer_request *peer_req)
 3032 {
 3033     struct drbd_peer_device *peer_device = peer_req->peer_device;
 3034     struct drbd_device *device = peer_device->device;
 3035     struct drbd_connection *connection = peer_device->connection;
 3036 
 3037     if (drbd_ratelimit())
 3038         drbd_err(peer_device, "submit failed, triggering re-connect\n");
 3039 
 3040     drbd_al_complete_io(device, &peer_req->i);
 3041 
 3042     spin_lock_irq(&device->resource->req_lock);
 3043     list_del(&peer_req->w.list);
 3044     list_del_init(&peer_req->recv_order);
 3045     drbd_remove_peer_req_interval(device, peer_req);
 3046     spin_unlock_irq(&device->resource->req_lock);
 3047 
 3048     drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
 3049     put_ldev(device);
 3050     drbd_free_peer_req(peer_req);
 3051     change_cstate(connection, C_PROTOCOL_ERROR, CS_HARD);
 3052 }
 3053 
 3054 /* Possibly "cancel" and forget about all peer_requests that had still been
 3055  * waiting for the activity log (wfa) when the connection to their peer failed,
 3056  * and pretend we never received them.
 3057  */
 3058 void drbd_cleanup_peer_requests_wfa(struct drbd_device *device, struct list_head *cleanup)
 3059 {
 3060     struct drbd_connection *connection;
 3061     struct drbd_peer_request *peer_req, *pr_tmp;
 3062 
 3063     spin_lock_irq(&device->resource->req_lock);
 3064     list_for_each_entry(peer_req, cleanup, wait_for_actlog) {
 3065         list_del(&peer_req->w.list); /* should be on the "->active_ee" list */
 3066         atomic_dec(&peer_req->peer_device->connection->active_ee_cnt);
 3067         list_del_init(&peer_req->recv_order);
 3068         drbd_remove_peer_req_interval(device, peer_req);
 3069     }
 3070     spin_unlock_irq(&device->resource->req_lock);
 3071 
 3072     list_for_each_entry_safe(peer_req, pr_tmp, cleanup, wait_for_actlog) {
 3073         atomic_sub(interval_to_al_extents(&peer_req->i), &device->wait_for_actlog_ecnt);
 3074         atomic_dec(&device->wait_for_actlog);
 3075         dec_unacked(peer_req->peer_device);
 3076         list_del_init(&peer_req->wait_for_actlog);
 3077         drbd_may_finish_epoch(peer_req->peer_device->connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
 3078         drbd_free_peer_req(peer_req);
 3079         put_ldev(device);
 3080     }
 3081     /* We changed (likely: cleared out) active_ee for "at least one" connection.
 3082      * We should wake potential waiters, just in case. */
 3083     for_each_connection(connection, device->resource)
 3084         wake_up(&connection->ee_wait);
 3085 }
 3086 
 3087 /* We may throttle resync, if the lower device seems to be busy,
 3088  * and current sync rate is above c_min_rate.
 3089  *
 3090  * To decide whether or not the lower device is busy, we use a scheme similar
 3091  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
 3092  * amount of activity (more than 64 sectors) that we cannot account for with
 3093  * our own resync activity, it obviously is "busy".
 3094  *
 3095  * The current sync rate used here is based only on the most recent two step
 3096  * marks, to have a short-time average so we can react faster.
 3097  */
 3098 bool drbd_rs_should_slow_down(struct drbd_peer_device *peer_device, sector_t sector,
 3099                   bool throttle_if_app_is_waiting)
 3100 {
 3101     bool throttle = drbd_rs_c_min_rate_throttle(peer_device);
 3102 
 3103     if (!throttle || throttle_if_app_is_waiting)
 3104         return throttle;
 3105 
 3106     return !drbd_sector_has_priority(peer_device, sector);
 3107 }
 3108 
 3109 bool drbd_rs_c_min_rate_throttle(struct drbd_peer_device *peer_device)
 3110 {
 3111     struct drbd_device *device = peer_device->device;
 3112     unsigned long db, dt, dbdt;
 3113     unsigned int c_min_rate;
 3114     int curr_events;
 3115 
 3116     rcu_read_lock();
 3117     c_min_rate = rcu_dereference(peer_device->conf)->c_min_rate;
 3118     rcu_read_unlock();
 3119 
 3120     /* feature disabled? */
 3121     if (c_min_rate == 0)
 3122         return false;
 3123 
 3124     curr_events = drbd_backing_bdev_events(device)
 3125             - atomic_read(&device->rs_sect_ev);
 3126 
 3127     if (atomic_read(&device->ap_actlog_cnt) || curr_events - peer_device->rs_last_events > 64) {
 3128         unsigned long rs_left;
 3129         int i;
 3130 
 3131         peer_device->rs_last_events = curr_events;
 3132 
 3133         /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
 3134          * approx. */
 3135         i = (peer_device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
 3136 
 3137         if (peer_device->repl_state[NOW] == L_VERIFY_S || peer_device->repl_state[NOW] == L_VERIFY_T)
 3138             rs_left = peer_device->ov_left;
 3139         else
 3140             rs_left = drbd_bm_total_weight(peer_device) - peer_device->rs_failed;
 3141 
 3142         dt = ((long)jiffies - (long)peer_device->rs_mark_time[i]) / HZ;
 3143         if (!dt)
 3144             dt++;
 3145         db = peer_device->rs_mark_left[i] - rs_left;
 3146         dbdt = Bit2KB(db/dt);
 3147 
 3148         if (dbdt > c_min_rate)
 3149             return true;
 3150     }
 3151     return false;
 3152 }
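
      /*
       * Illustrative worked example for the rate check above (numbers invented;
       * assumes the usual 4 KiB bitmap granularity, i.e. Bit2KB(x) == 4 * x,
       * and c_min_rate configured in KiB/s):
       *
       *   rs_mark_left[i] = 120000 bits, rs_left = 100000 bits  =>  db = 20000
       *   rs_mark_time[i] lies 6 seconds in the past            =>  dt = 6
       *   dbdt = Bit2KB(20000 / 6) = Bit2KB(3333) = 13332 KiB/s
       *
       * With c_min_rate = 250 KiB/s, dbdt > c_min_rate, so the function returns
       * true and the resync request gets throttled, provided the "is the backing
       * device busy" precondition (ap_actlog_cnt != 0, or more than 64
       * unaccounted sectors since the last check) was met at all.
       */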
 3153 
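      /* Record a block skipped during online verify.  Contiguous skips are
       * coalesced into ov_last_skipped_start/ov_last_skipped_size; when a
       * non-contiguous skip begins, the previously accumulated range is
       * reported via ov_skipped_print() first.  Progress is updated either way. */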
 3154 static void verify_skipped_block(struct drbd_peer_device *peer_device,
 3155         const sector_t sector, const unsigned int size)
 3156 {
 3157     ++peer_device->ov_skipped;
 3158     if (peer_device->ov_last_skipped_start + peer_device->ov_last_skipped_size == sector) {
 3159         peer_device->ov_last_skipped_size += size>>9;
 3160     } else {
 3161         ov_skipped_print(peer_device);
 3162         peer_device->ov_last_skipped_start = sector;
 3163         peer_device->ov_last_skipped_size = size>>9;
 3164     }
 3165     verify_progress(peer_device, sector, size);
 3166 }
 3167 
 3168 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
 3169 {
 3170     struct drbd_peer_device *peer_device;
 3171     struct drbd_device *device;
 3172     sector_t sector;
 3173     sector_t capacity;
 3174     struct drbd_peer_request *peer_req;
 3175     struct digest_info *di = NULL;
 3176     int size, verb;
 3177     struct p_block_req *p = pi->data;
 3178     enum drbd_disk_state min_d_state;
 3179     int err;
 3180 
 3181     peer_device = conn_peer_device(connection, pi->vnr);
 3182     if (!peer_device)
 3183         return -EIO;
 3184     device = peer_device->device;
 3185     capacity = drbd_get_capacity(device->this_bdev);
 3186 
 3187     sector = be64_to_cpu(p->sector);
 3188     size   = be32_to_cpu(p->blksize);
 3189 
 3190     if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
 3191         drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
 3192                 (unsigned long long)sector, size);
 3193         return -EINVAL;
 3194     }
 3195     if (sector + (size>>9) > capacity) {
 3196         drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
 3197                 (unsigned long long)sector, size);
 3198         return -EINVAL;
 3199     }
 3200 
 3201     min_d_state = pi->cmd == P_DATA_REQUEST ? D_UP_TO_DATE : D_OUTDATED;
 3202     if (!get_ldev_if_state(device, min_d_state)) {
 3203         verb = 1;
 3204         switch (pi->cmd) {
 3205         case P_DATA_REQUEST:
 3206             drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
 3207             break;
 3208         case P_OV_REQUEST:
 3209             verify_skipped_block(peer_device, sector, size);
 3210         /* Fall through */
 3211         case P_RS_THIN_REQ:
 3212         case P_RS_DATA_REQUEST:
 3213         case P_CSUM_RS_REQUEST:
 3214             drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
 3215             break;
 3216         case P_OV_REPLY:
 3217             verb = 0;
 3218             dec_rs_pending(peer_device);
 3219             drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
 3220             break;
 3221         default:
 3222             BUG();
 3223         }
 3224         if (verb && drbd_ratelimit())
 3225             drbd_err(device, "Can not satisfy peer's read request, "
 3226                 "no local data.\n");
 3227 
 3228         /* drain the payload, if any */
 3229         return ignore_remaining_packet(connection, pi->size);
 3230     }
 3231 
 3232     peer_req = drbd_alloc_peer_req(peer_device, GFP_TRY);
 3233     err = -ENOMEM;
 3234     if (!peer_req)
 3235         goto fail;
 3236     if (size) {
 3237         drbd_alloc_page_chain(&peer_device->connection->transport,
 3238             &peer_req->page_chain, DIV_ROUND_UP(size, PAGE_SIZE), GFP_TRY);
 3239         if (!peer_req->page_chain.head)
 3240             goto fail2;
 3241     }
 3242     peer_req->i.size = size;
 3243     peer_req->i.sector = sector;
 3244     peer_req->block_id = p->block_id;
 3245     peer_req->opf = REQ_OP_READ;
 3246     /* no longer valid, about to call drbd_recv again for the digest... */
 3247     p = pi->data = NULL;
 3248 
 3249 
 3250     if (peer_device->repl_state[NOW] == L_AHEAD) {
 3251         if (pi->cmd == P_DATA_REQUEST) {
 3252             /* P_DATA_REQUEST originates from a Primary,
 3253              * so if I am "Ahead", the Primary would be "Behind":
 3254              * Can not happen. */
 3255             if (drbd_ratelimit())
 3256                 drbd_err(peer_device, "received P_DATA_REQUEST while L_AHEAD\n");
 3257             err = -EINVAL;
 3258             goto fail2;
 3259         }
 3260         if (connection->agreed_pro_version >= 115) {
 3261             switch (pi->cmd) {
 3262             /* case P_DATA_REQUEST: see above, not based on protocol version */
 3263             case P_OV_REQUEST:
 3264                 verify_skipped_block(peer_device, sector, size);
 3265                 /* fall through */
 3266             case P_RS_DATA_REQUEST:
 3267             case P_RS_THIN_REQ:
 3268             case P_CSUM_RS_REQUEST:
 3269                 err = drbd_send_ack(peer_device, P_RS_CANCEL_AHEAD, peer_req);
 3270                 goto fail2;
 3271             case P_OV_REPLY:
 3272                 /* FIXME how can we cancel these?
 3273                  * just ignore L_AHEAD for now */
 3274                 break;
 3275             default:
 3276                 BUG();
 3277             }
 3278         }
 3279     }
 3280 
 3281     switch (pi->cmd) {
 3282     case P_DATA_REQUEST:
 3283         peer_req->w.cb = w_e_end_data_req;
 3284         /* application IO, don't drbd_rs_begin_io */
 3285         peer_req->flags |= EE_APPLICATION;
 3286         goto submit;
 3287 
 3288     case P_RS_THIN_REQ:
 3289         /* If at some point in the future we have a smart way to
 3290            find out if this data block is completely deallocated,
 3291            then we would do something smarter here than reading
 3292            the block... */
 3293         peer_req->flags |= EE_RS_THIN_REQ;
 3294     /* Fall through */
 3295     case P_RS_DATA_REQUEST:
 3296         peer_req->w.cb = w_e_end_rsdata_req;
 3297         break;
 3298 
 3299     case P_OV_REPLY:
 3300     case P_CSUM_RS_REQUEST:
 3301         di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
 3302         err = -ENOMEM;
 3303         if (!di)
 3304             goto fail2;
 3305 
 3306         di->digest_size = pi->size;
 3307         di->digest = (((char *)di)+sizeof(struct digest_info));
 3308 
 3309         peer_req->digest = di;
 3310         peer_req->flags |= EE_HAS_DIGEST;
 3311 
 3312         err = drbd_recv_into(connection, di->digest, pi->size);
 3313         if (err)
 3314             goto fail2;
 3315 
 3316         if (pi->cmd == P_CSUM_RS_REQUEST) {
 3317             D_ASSERT(device, connection->agreed_pro_version >= 89);
 3318             peer_req->w.cb = w_e_end_csum_rs_req;
 3319             /* remember to report stats in drbd_resync_finished */
 3320             peer_device->use_csums = true;
 3321         } else if (pi->cmd == P_OV_REPLY) {
 3322             /* track progress, we may need to throttle */
 3323             rs_sectors_came_in(peer_device, size);
 3324             peer_req->w.cb = w_e_end_ov_reply;
 3325             dec_rs_pending(peer_device);
 3326             /* drbd_rs_begin_io done when we sent this request,
 3327              * but accounting still needs to be done. */
 3328             goto submit_for_resync;
 3329         }
 3330         break;
 3331 
 3332     case P_OV_REQUEST:
 3333         peer_device->ov_position = sector;
 3334         if (peer_device->ov_start_sector == ~(sector_t)0 &&
 3335             connection->agreed_pro_version >= 90) {
 3336             unsigned long now = jiffies;
 3337             int i;
 3338             peer_device->ov_start_sector = sector;
 3339             peer_device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
 3340             peer_device->ov_skipped = 0;
 3341             peer_device->rs_total = peer_device->ov_left;
 3342             peer_device->rs_last_writeout = now;
 3343             for (i = 0; i < DRBD_SYNC_MARKS; i++) {
 3344                 peer_device->rs_mark_left[i] = peer_device->ov_left;
 3345                 peer_device->rs_mark_time[i] = now;
 3346             }
 3347             drbd_info(device, "Online Verify start sector: %llu\n",
 3348                     (unsigned long long)sector);
 3349         }
 3350         peer_req->w.cb = w_e_end_ov_req;
 3351         break;
 3352 
 3353     default:
 3354         BUG();
 3355     }
 3356 
 3357     /* Throttle, drbd_rs_begin_io and submit should become asynchronous
 3358      * wrt the receiver, but it is not as straightforward as it may seem.
 3359      * Various places in the resync start and stop logic assume resync
 3360      * requests are processed in order; requeuing this on the worker thread
 3361      * introduces a bunch of new code for synchronization between threads.
 3362      *
 3363      * Unlimited throttling before drbd_rs_begin_io may stall the resync
 3364      * "forever", while throttling after drbd_rs_begin_io will lock that
 3365      * extent against application writes for the same time.  For now, just
 3366      * throttle here, where the rest of the code expects the receiver to
 3367      * sleep for a while anyway.
 3368      */
 3369 
 3370     /* Throttle before drbd_rs_begin_io, as that locks out application IO;
 3371      * this defers syncer requests for some time, before letting at least
 3372      * one request through.  The resync controller on the receiving side
 3373      * will adapt to the incoming rate accordingly.
 3374      *
 3375      * We cannot throttle here if remote is Primary/SyncTarget:
 3376      * we would also throttle its application reads.
 3377      * In that case, throttling is done on the SyncTarget only.
 3378      */
 3379 
 3380     /* Even though this may be a resync request, we do add to "read_ee";
 3381      * "sync_ee" is only used for resync WRITEs.
 3382      * Add to list early, so debugfs can find this request
 3383      * even if we have to sleep below. */
 3384     spin_lock_irq(&device->resource->req_lock);
 3385     list_add_tail(&peer_req->w.list, &connection->read_ee);
 3386     spin_unlock_irq(&device->resource->req_lock);
 3387 
 3388     update_receiver_timing_details(connection, drbd_rs_should_slow_down);
 3389     if (connection->peer_role[NOW] != R_PRIMARY &&
 3390         drbd_rs_should_slow_down(peer_device, sector, false))
 3391         schedule_timeout_uninterruptible(HZ/10);
 3392 
 3393     if (connection->agreed_pro_version >= 110) {
 3394         /* In DRBD 9 we may not sleep here, in order to avoid deadlocks.
 3395            Instruct the SyncSource to retry. */
 3396         err = drbd_try_rs_begin_io(peer_device, sector, false);
 3397         if (err) {
 3398             if (pi->cmd == P_OV_REQUEST)
 3399                 verify_skipped_block(peer_device, sector, size);
 3400             err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
 3401             /* If err is set, we will drop the connection... */
 3402             goto fail3;
 3403         }
 3404     } else {
 3405         update_receiver_timing_details(connection, drbd_rs_begin_io);
 3406         if (drbd_rs_begin_io(peer_device, sector)) {
 3407             err = -EIO;
 3408             goto fail3;
 3409         }
 3410     }
 3411 
 3412 submit_for_resync:
 3413     atomic_add(size >> 9, &device->rs_sect_ev);
 3414 
 3415 submit:
 3416     update_receiver_timing_details(connection, drbd_submit_peer_request);
 3417     inc_unacked(peer_device);
 3418     if (drbd_submit_peer_request(peer_req) == 0)
 3419         return 0;
 3420 
 3421     /* don't care for the reason here */
 3422     drbd_err(device, "submit failed, triggering re-connect\n");
 3423     err = -EIO;
 3424 
 3425 fail3:
 3426     spin_lock_irq(&device->resource->req_lock);
 3427     list_del(&peer_req->w.list);
 3428     spin_unlock_irq(&device->resource->req_lock);
 3429     /* no drbd_rs_complete_io(); we are dropping the connection anyway */
 3430 fail2:
 3431     drbd_free_peer_req(peer_req);
 3432 fail:
 3433     put_ldev(device);
 3434     return err;
 3435 }
 3436 
 3437 /**
 3438  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
 3439  */
 3440 static enum sync_strategy drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
 3441 {
 3442     const int node_id = peer_device->device->resource->res_opts.node_id;
 3443     int self, peer;
 3444     enum sync_strategy rv = SPLIT_BRAIN_DISCONNECT;
 3445     unsigned long ch_self, ch_peer;
 3446     enum drbd_after_sb_p after_sb_0p;
 3447 
 3448     self = drbd_bitmap_uuid(peer_device) & UUID_PRIMARY;
 3449     peer = peer_device->bitmap_uuids[node_id] & UUID_PRIMARY;
 3450 
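          /* ch_peer/ch_self: how many blocks each side marked out-of-sync while
           * the nodes were apart; used by the discard-zero/least-changes
           * policies below. */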
 3451     ch_peer = peer_device->dirty_bits;
 3452     ch_self = peer_device->comm_bm_set;
 3453 
 3454     rcu_read_lock();
 3455     after_sb_0p = rcu_dereference(peer_device->connection->transport.net_conf)->after_sb_0p;
 3456     rcu_read_unlock();
 3457     switch (after_sb_0p) {
 3458     case ASB_CONSENSUS:
 3459     case ASB_DISCARD_SECONDARY:
 3460     case ASB_CALL_HELPER:
 3461     case ASB_VIOLENTLY:
 3462     case ASB_RETRY_CONNECT:
 3463         drbd_err(peer_device, "Configuration error.\n");
 3464         break;
 3465     case ASB_DISCONNECT:
 3466         break;
 3467     case ASB_DISCARD_YOUNGER_PRI:
 3468         if (self == 0 && peer == 1) {
 3469             rv = SYNC_TARGET_USE_BITMAP;
 3470             break;
 3471         }
 3472         if (self == 1 && peer == 0) {
 3473             rv = SYNC_SOURCE_USE_BITMAP;
 3474             break;
 3475         }
 3476         /* Else fall through - to one of the other strategies... */
 3477     case ASB_DISCARD_OLDER_PRI:
 3478         if (self == 0 && peer == 1) {
 3479             rv = SYNC_SOURCE_USE_BITMAP;
 3480             break;
 3481         }
 3482         if (self == 1 && peer == 0) {
 3483             rv = SYNC_TARGET_USE_BITMAP;
 3484             break;
 3485         }
 3486         /* Else fall through - to one of the other strategies... */
 3487         drbd_warn(peer_device, "Discard younger/older primary did not find a decision\n"
 3488               "Using discard-least-changes instead\n");
 3489     /* Fall through */
 3490     case ASB_DISCARD_ZERO_CHG:
 3491         if (ch_peer == 0 && ch_self == 0) {
 3492             rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->transport.flags)
 3493                 ? SYNC_TARGET_USE_BITMAP : SYNC_SOURCE_USE_BITMAP;
 3494             break;
 3495         } else {
 3496             if (ch_peer == 0) { rv = SYNC_SOURCE_USE_BITMAP; break; }
 3497             if (ch_self == 0) { rv = SYNC_TARGET_USE_BITMAP; break; }
 3498         }
 3499         if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
 3500             break;
 3501         /* else, fall through */
 3502     case ASB_DISCARD_LEAST_CHG:
 3503         if  (ch_self < ch_peer)
 3504             rv = SYNC_TARGET_USE_BITMAP;
 3505         else if (ch_self > ch_peer)
 3506             rv = SYNC_SOURCE_USE_BITMAP;
 3507         else /* ( ch_self == ch_peer ) */
 3508              /* Well, then use something else. */
 3509             rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->transport.flags)
 3510                 ? SYNC_TARGET_USE_BITMAP : SYNC_SOURCE_USE_BITMAP;
 3511         break;
 3512     case ASB_DISCARD_LOCAL:
 3513         rv = SYNC_TARGET_USE_BITMAP;
 3514         break;
 3515     case ASB_DISCARD_REMOTE:
 3516         rv = SYNC_SOURCE_USE_BITMAP;
 3517     }
 3518 
 3519     return rv;
 3520 }
 3521 
 3522 /**
 3523  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
 3524  */
 3525 static enum sync_strategy drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
 3526 {
 3527     struct drbd_device *device = peer_device->device;
 3528     struct drbd_connection *connection = peer_device->connection;
 3529     struct drbd_resource *resource = device->resource;
 3530     enum sync_strategy strategy, rv = SPLIT_BRAIN_DISCONNECT;
 3531     enum drbd_after_sb_p after_sb_1p;
 3532 
 3533     rcu_read_lock();
 3534     after_sb_1p = rcu_dereference(connection->transport.net_conf)->after_sb_1p;
 3535     rcu_read_unlock();
 3536     switch (after_sb_1p) {
 3537     case ASB_DISCARD_YOUNGER_PRI:
 3538     case ASB_DISCARD_OLDER_PRI:
 3539     case ASB_DISCARD_LEAST_CHG:
 3540     case ASB_DISCARD_LOCAL:
 3541     case ASB_DISCARD_REMOTE:
 3542     case ASB_DISCARD_ZERO_CHG:
 3543     case ASB_RETRY_CONNECT:
 3544         drbd_err(device, "Configuration error.\n");
 3545         break;
 3546     case ASB_DISCONNECT:
 3547         break;
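          /* Consensus: only follow the 0-primaries decision when it matches the
           * current roles (a Secondary may become sync target, a Primary sync
           * source); otherwise keep the default and disconnect. */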
 3548     case ASB_CONSENSUS:
 3549         strategy = drbd_asb_recover_0p(peer_device);
 3550         if (strategy == SYNC_TARGET_USE_BITMAP && resource->role[NOW] == R_SECONDARY)
 3551             rv = strategy;
 3552         if (strategy == SYNC_SOURCE_USE_BITMAP && resource->role[NOW] == R_PRIMARY)
 3553             rv = strategy;
 3554         break;
 3555     case ASB_VIOLENTLY:
 3556         rv = drbd_asb_recover_0p(peer_device);
 3557         break;
 3558     case ASB_DISCARD_SECONDARY:
 3559         return resource->role[NOW] == R_PRIMARY ? SYNC_SOURCE_USE_BITMAP : SYNC_TARGET_USE_BITMAP;
 3560     case ASB_CALL_HELPER:
 3561         strategy = drbd_asb_recover_0p(peer_device);
 3562         if (strategy == SYNC_TARGET_USE_BITMAP && resource->role[NOW] == R_PRIMARY) {
 3563             enum drbd_state_rv rv2;
 3564 
 3565              /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
 3566               * and we might be here in L_OFF, which is transient.
 3567               * We do not need to wait for the after-state-change work either. */
 3568             rv2 = change_role(resource, R_SECONDARY, CS_VERBOSE, false, NULL);
 3569             if (rv2 != SS_SUCCESS) {
 3570                 drbd_maybe_khelper(device, connection, "pri-lost-after-sb");
 3571             } else {
 3572                 drbd_warn(device, "Successfully gave up primary role.\n");
 3573                 rv = strategy;
 3574             }
 3575         } else
 3576             rv = strategy;
 3577     }
 3578 
 3579     return rv;
 3580 }
 3581 
 3582 /**
 3583  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
 3584  */
 3585 static enum sync_strategy drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
 3586 {
 3587     struct drbd_device *device = peer_device->device;
 3588     struct drbd_connection *connection = peer_device->connection;
 3589     enum sync_strategy strategy, rv = SPLIT_BRAIN_DISCONNECT;
 3590     enum drbd_after_sb_p after_sb_2p;
 3591 
 3592     rcu_read_lock();
 3593     after_sb_2p = rcu_dereference(connection->transport.net_conf)->after_sb_2p;
 3594     rcu_read_unlock();
 3595     switch (after_sb_2p) {
 3596     case ASB_DISCARD_YOUNGER_PRI:
 3597     case ASB_DISCARD_OLDER_PRI:
 3598     case ASB_DISCARD_LEAST_CHG:
 3599     case ASB_DISCARD_LOCAL:
 3600     case ASB_DISCARD_REMOTE:
 3601     case ASB_CONSENSUS:
 3602     case ASB_DISCARD_SECONDARY:
 3603     case ASB_DISCARD_ZERO_CHG:
 3604     case ASB_RETRY_CONNECT:
 3605         drbd_err(device, "Configuration error.\n");
 3606         break;
 3607     case ASB_VIOLENTLY:
 3608         rv = drbd_asb_recover_0p(peer_device);
 3609         break;
 3610     case ASB_DISCONNECT:
 3611         break;
 3612     case ASB_CALL_HELPER:
 3613         strategy = drbd_asb_recover_0p(peer_device);
 3614         if (strategy == SYNC_TARGET_USE_BITMAP) {
 3615             enum drbd_state_rv rv2;
 3616 
 3617              /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
 3618               * and we might be here in L_OFF, which is transient.
 3619               * We do not need to wait for the after-state-change work either. */
 3620             rv2 = change_role(device->resource, R_SECONDARY, CS_VERBOSE, false, NULL);
 3621             if (rv2 != SS_SUCCESS) {
 3622                 drbd_maybe_khelper(device, connection, "pri-lost-after-sb");
 3623             } else {
 3624                 drbd_warn(device, "Successfully gave up primary role.\n");
 3625                 rv = strategy;
 3626             }
 3627         } else
 3628             rv = strategy;
 3629     }
 3630 
 3631     return rv;
 3632 }
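
      /*
       * For orientation: the three recovery functions above implement the
       * automatic split-brain recovery policies selected with the
       * "after-sb-0pri", "after-sb-1pri" and "after-sb-2pri" net options.
       * A configuration using them might look like this (illustrative values
       * only):
       *
       *   net {
       *       after-sb-0pri discard-zero-changes;
       *       after-sb-1pri discard-secondary;
       *       after-sb-2pri disconnect;
       *   }
       */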
 3633 
 3634 static void drbd_uuid_dump_self(struct drbd_peer_device *peer_device, u64 bits, u64 flags)
 3635 {
 3636     struct drbd_device *device = peer_device->device;
 3637 
 3638     drbd_info(peer_device, "self %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
 3639           (unsigned long long)drbd_current_uuid(peer_device->device),
 3640           (unsigned long long)drbd_bitmap_uuid(peer_device),
 3641           (unsigned long long)drbd_history_uuid(device, 0),
 3642           (unsigned long long)drbd_history_uuid(device, 1),
 3643           (unsigned long long)bits,
 3644           (unsigned long long)flags);
 3645 }
 3646 
 3647 
 3648 static void drbd_uuid_dump_peer(struct drbd_peer_device *peer_device, u64 bits, u64 flags)
 3649 {
 3650     const int node_id = peer_device->device->resource->res_opts.node_id;
 3651 
 3652     drbd_info(peer_device, "peer %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
 3653          (unsigned long long)peer_device->current_uuid,
 3654          (unsigned long long)peer_device->bitmap_uuids[node_id],
 3655          (unsigned long long)peer_device->history_uuids[0],
 3656          (unsigned long long)peer_device->history_uuids[1],
 3657          (unsigned long long)bits,
 3658          (unsigned long long)flags);
 3659 }
 3660 
 3661 static enum sync_strategy uuid_fixup_resync_end(struct drbd_peer_device *peer_device, int *rule_nr) __must_hold(local)
 3662 {
 3663     struct drbd_device *device = peer_device->device;
 3664     const int node_id = device->resource->res_opts.node_id;
 3665 
 3666     if (peer_device->bitmap_uuids[node_id] == (u64)0 && drbd_bitmap_uuid(peer_device) != (u64)0) {
 3667 
 3668         if (peer_device->connection->agreed_pro_version < 91)
 3669             return REQUIRES_PROTO_91;
 3670 
 3671         if ((drbd_bitmap_uuid(peer_device) & ~UUID_PRIMARY) ==
 3672             (peer_device->history_uuids[0] & ~UUID_PRIMARY) &&
 3673             (drbd_history_uuid(device, 0) & ~UUID_PRIMARY) ==
 3674             (peer_device->history_uuids[0] & ~UUID_PRIMARY)) {
 3675             struct drbd_peer_md *peer_md = &device->ldev->md.peers[peer_device->node_id];
 3676 
 3677             drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
 3678             _drbd_uuid_push_history(device, peer_md->bitmap_uuid);
 3679             peer_md->bitmap_uuid = 0;
 3680 
 3681             drbd_uuid_dump_self(peer_device,
 3682                         device->disk_state[NOW] >= D_NEGOTIATING ? drbd_bm_total_weight(peer_device) : 0, 0);
 3683             *rule_nr = 34;
 3684         } else {
 3685             drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
 3686             *rule_nr = 36;
 3687         }
 3688 
 3689         return SYNC_SOURCE_USE_BITMAP;
 3690     }
 3691 
 3692     if (drbd_bitmap_uuid(peer_device) == (u64)0 && peer_device->bitmap_uuids[node_id] != (u64)0) {
 3693 
 3694         if (peer_device->connection->agreed_pro_version < 91)
 3695             return REQUIRES_PROTO_91;
 3696 
 3697         if ((drbd_history_uuid(device, 0) & ~UUID_PRIMARY) ==
 3698             (peer_device->bitmap_uuids[node_id] & ~UUID_PRIMARY) &&
 3699             (drbd_history_uuid(device, 1) & ~UUID_PRIMARY) ==
 3700             (peer_device->history_uuids[0] & ~UUID_PRIMARY)) {
 3701             int i;
 3702 
 3703             drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
 3704 
 3705             for (i = ARRAY_SIZE(peer_device->history_uuids) - 1; i > 0; i--)
 3706                 peer_device->history_uuids[i] = peer_device->history_uuids[i - 1];
 3707             peer_device->history_uuids[i] = peer_device->bitmap_uuids[node_id];
 3708             peer_device->bitmap_uuids[node_id] = 0;
 3709 
 3710             drbd_uuid_dump_peer(peer_device, peer_device->dirty_bits, peer_device->uuid_flags);
 3711             *rule_nr = 35;
 3712         } else {
 3713             drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
 3714             *rule_nr = 37;
 3715         }
 3716 
 3717         return SYNC_TARGET_USE_BITMAP;
 3718     }
 3719 
 3720     return UNDETERMINED;
 3721 }
 3722 
 3723 static enum sync_strategy uuid_fixup_resync_start1(struct drbd_peer_device *peer_device, int *rule_nr) __must_hold(local)
 3724 {
 3725     struct drbd_device *device = peer_device->device;
 3726     const int node_id = peer_device->device->resource->res_opts.node_id;
 3727     u64 self, peer;
 3728 
 3729     self = drbd_current_uuid(device) & ~UUID_PRIMARY;
 3730     peer = peer_device->history_uuids[0] & ~UUID_PRIMARY;
 3731 
 3732     if (self == peer) {
 3733         if (peer_device->connection->agreed_pro_version < 96 ?
 3734             (drbd_history_uuid(device, 0) & ~UUID_PRIMARY) ==
 3735             (peer_device->history_uuids[1] & ~UUID_PRIMARY) :
 3736             peer + UUID_NEW_BM_OFFSET == (peer_device->bitmap_uuids[node_id] & ~UUID_PRIMARY)) {
 3737             int i;
 3738 
 3739             /* The last P_SYNC_UUID did not get through. Undo the modifications
 3740                of the peer's UUIDs made by the last start of resync as sync source. */
 3741             *rule_nr = 51;
 3742 
 3743             if (peer_device->connection->agreed_pro_version < 91)
 3744                 return REQUIRES_PROTO_91;
 3745 
 3746             peer_device->bitmap_uuids[node_id] = peer_device->history_uuids[0];
 3747             for (i = 0; i < ARRAY_SIZE(peer_device->history_uuids) - 1; i++)
 3748                 peer_device->history_uuids[i] = peer_device->history_uuids[i + 1];
 3749             peer_device->history_uuids[i] = 0;
 3750 
 3751             drbd_info(device, "Lost last syncUUID packet, corrected:\n");
 3752             drbd_uuid_dump_peer(peer_device, peer_device->dirty_bits, peer_device->uuid_flags);
 3753 
 3754             return SYNC_TARGET_USE_BITMAP;
 3755         }
 3756     }
 3757 
 3758     return UNDETERMINED;
 3759 }
 3760 
 3761 static enum sync_strategy uuid_fixup_resync_start2(struct drbd_peer_device *peer_device, int *rule_nr) __must_hold(local)
 3762 {
 3763     struct drbd_device *device = peer_device->device;
 3764     u64 self, peer;
 3765 
 3766     self = drbd_history_uuid(device, 0) & ~UUID_PRIMARY;
 3767     peer = peer_device->current_uuid & ~UUID_PRIMARY;
 3768 
 3769     if (self == peer) {
 3770         if (peer_device->connection->agreed_pro_version < 96 ?
 3771             (drbd_history_uuid(device, 1) & ~UUID_PRIMARY) ==
 3772             (peer_device->history_uuids[0] & ~UUID_PRIMARY) :
 3773             self + UUID_NEW_BM_OFFSET == (drbd_bitmap_uuid(peer_device) & ~UUID_PRIMARY)) {
 3774             u64 bitmap_uuid;
 3775 
 3776             /* The last P_SYNC_UUID did not get through. Undo the modifications
 3777                of our UUIDs made by the last start of resync as sync source. */
 3778             *rule_nr = 71;
 3779 
 3780             if (peer_device->connection->agreed_pro_version < 91)
 3781                 return REQUIRES_PROTO_91;
 3782 
 3783             bitmap_uuid = _drbd_uuid_pull_history(peer_device);
 3784             __drbd_uuid_set_bitmap(peer_device, bitmap_uuid);
 3785 
 3786             drbd_info(device, "Last syncUUID did not get through, corrected:\n");
 3787             drbd_uuid_dump_self(peer_device,
 3788                         device->disk_state[NOW] >= D_NEGOTIATING ? drbd_bm_total_weight(peer_device) : 0, 0);
 3789 
 3790             return SYNC_SOURCE_USE_BITMAP;
 3791         }
 3792     }
 3793 
 3794     return UNDETERMINED;
 3795 }
 3796 
 3797 static enum sync_strategy drbd_uuid_compare(struct drbd_peer_device *peer_device,
 3798                  int *rule_nr, int *peer_node_id) __must_hold(local)
 3799 {
 3800     struct drbd_connection *connection = peer_device->connection;
 3801     struct drbd_device *device = peer_device->device;
 3802     const int node_id = device->resource->res_opts.node_id;
 3803     u64 self, peer;
 3804     u64 local_uuid_flags;
 3805     int i, j;
 3806     bool initial_handshake;
 3807     bool uuid_matches_initial;
 3808 
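          /* Rule numbers (logged as "uuid_compare()=... by rule N"): rule 9
           * catches a local UUID change during the handshake; rules 10-40
           * handle equal or just-created current UUIDs; rules 50-60 mean the
           * peer's UUIDs say we should become sync target; rules 70-80 that we
           * should become sync source; rules 90-100 detect split brain, and
           * anything beyond that is unrelated data. */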
 3809     self = drbd_current_uuid(device) & ~UUID_PRIMARY;
 3810     peer = peer_device->current_uuid & ~UUID_PRIMARY;
 3811     local_uuid_flags = drbd_collect_local_uuid_flags(peer_device, NULL);
 3812 
 3813     initial_handshake =
 3814         test_bit(INITIAL_STATE_SENT, &peer_device->flags) &&
 3815         !test_bit(INITIAL_STATE_RECEIVED, &peer_device->flags);
 3816     uuid_matches_initial =
 3817         self == (peer_device->comm_current_uuid & ~UUID_PRIMARY) &&
 3818         local_uuid_flags == peer_device->comm_uuid_flags;
 3819     if (initial_handshake && !uuid_matches_initial) {
 3820         *rule_nr = 9;
 3821         drbd_warn(peer_device, "My current UUID/flags changed during "
 3822               "handshake. Retry connecting.\n");
 3823         return RETRY_CONNECT;
 3824     }
 3825 
 3826     /* Before DRBD 8.0.2 (from 2007), the uuid on sync targets was set to
 3827      * zero during resyncs for no good reason. */
 3828     if (self == 0)
 3829         self = UUID_JUST_CREATED;
 3830     if (peer == 0)
 3831         peer = UUID_JUST_CREATED;
 3832 
 3833     *rule_nr = 10;
 3834     if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
 3835         return NO_SYNC;
 3836 
 3837     *rule_nr = 20;
 3838     if (self == UUID_JUST_CREATED)
 3839         return SYNC_TARGET_SET_BITMAP;
 3840 
 3841     *rule_nr = 30;
 3842     if (peer == UUID_JUST_CREATED)
 3843         return SYNC_SOURCE_SET_BITMAP;
 3844 
 3845     if (self == peer) {
 3846         if (connection->agreed_pro_version < 110) {
 3847             enum sync_strategy rv = uuid_fixup_resync_end(peer_device, rule_nr);
 3848             if (rv != UNDETERMINED)
 3849                 return rv;
 3850         }
 3851 
 3852         *rule_nr = 39;
 3853         if (peer_device->uuid_flags & UUID_FLAG_PRIMARY_LOST_QUORUM &&
 3854             !test_bit(PRIMARY_LOST_QUORUM, &device->flags))
 3855             return SYNC_TARGET_IF_BOTH_FAILED;
 3856 
 3857         if (!(peer_device->uuid_flags & UUID_FLAG_PRIMARY_LOST_QUORUM) &&
 3858             test_bit(PRIMARY_LOST_QUORUM, &device->flags))
 3859             return SYNC_SOURCE_IF_BOTH_FAILED;
 3860 
 3861         if (peer_device->uuid_flags & UUID_FLAG_PRIMARY_LOST_QUORUM &&
 3862             test_bit(PRIMARY_LOST_QUORUM, &device->flags))
 3863             return test_bit(RESOLVE_CONFLICTS, &connection->transport.flags) ?
 3864                 SYNC_SOURCE_IF_BOTH_FAILED :
 3865                 SYNC_TARGET_IF_BOTH_FAILED;
 3866 
 3867         *rule_nr = 38;
 3868         /* This is a safety net for the following two clauses */
 3869         if (peer_device->uuid_flags & UUID_FLAG_RECONNECT &&
 3870             local_uuid_flags & UUID_FLAG_RECONNECT)
 3871             return NO_SYNC;
 3872 
 3873         /* Peer crashed as primary, I survived, resync from me */
 3874         if (peer_device->uuid_flags & UUID_FLAG_CRASHED_PRIMARY &&
 3875             local_uuid_flags & UUID_FLAG_RECONNECT)
 3876             return SYNC_SOURCE_IF_BOTH_FAILED;
 3877 
 3878         /* I am a crashed primary, peer survived, resync to me */
 3879         if (local_uuid_flags & UUID_FLAG_CRASHED_PRIMARY &&
 3880             peer_device->uuid_flags & UUID_FLAG_RECONNECT)
 3881             return SYNC_TARGET_IF_BOTH_FAILED;
 3882 
 3883         /* One of us had a connection to the other node before,
 3884            i.e. this is not a common power failure. */
 3885         if (peer_device->uuid_flags & UUID_FLAG_RECONNECT ||
 3886             local_uuid_flags & UUID_FLAG_RECONNECT)
 3887             return NO_SYNC;
 3888 
 3889         /* Common power [off|failure]? */
 3890         *rule_nr = 40;
 3891         if (local_uuid_flags & UUID_FLAG_CRASHED_PRIMARY) {
 3892             if ((peer_device->uuid_flags & UUID_FLAG_CRASHED_PRIMARY) &&
 3893                 test_bit(RESOLVE_CONFLICTS, &connection->transport.flags))
 3894                 return SYNC_TARGET_IF_BOTH_FAILED;
 3895             return SYNC_SOURCE_IF_BOTH_FAILED;
 3896         } else if (peer_device->uuid_flags & UUID_FLAG_CRASHED_PRIMARY)
 3897                 return SYNC_TARGET_IF_BOTH_FAILED;
 3898         else
 3899             return NO_SYNC;
 3900     }
 3901 
 3902     *rule_nr = 50;
 3903     peer = peer_device->bitmap_uuids[node_id] & ~UUID_PRIMARY;
 3904     if (self == peer)
 3905         return SYNC_TARGET_USE_BITMAP;
 3906 
 3907     *rule_nr = 52;
 3908     for (i = 0; i < DRBD_PEERS_MAX; i++) {
 3909         peer = peer_device->bitmap_uuids[i] & ~UUID_PRIMARY;
 3910         if (self == peer) {
 3911             *peer_node_id = i;
 3912             return SYNC_TARGET_CLEAR_BITMAP;
 3913         }
 3914     }
 3915 
 3916     if (connection->agreed_pro_version < 110) {
 3917         enum sync_strategy rv = uuid_fixup_resync_start1(peer_device, rule_nr);
 3918         if (rv != UNDETERMINED)
 3919             return rv;
 3920     }
 3921 
 3922     *rule_nr = 60;
 3923     self = drbd_current_uuid(device) & ~UUID_PRIMARY;
 3924     for (i = 0; i < ARRAY_SIZE(peer_device->history_uuids); i++) {
 3925         peer = peer_device->history_uuids[i] & ~UUID_PRIMARY;
 3926         if (self == peer)
 3927             return SYNC_TARGET_SET_BITMAP;
 3928     }
 3929 
 3930     *rule_nr = 70;
 3931     self = drbd_bitmap_uuid(peer_device) & ~UUID_PRIMARY;
 3932     peer = peer_device->current_uuid & ~UUID_PRIMARY;
 3933     if (self == peer)
 3934         return SYNC_SOURCE_USE_BITMAP;
 3935 
 3936     *rule_nr = 72;
 3937     for (i = 0; i < DRBD_NODE_ID_MAX; i++) {
 3938         if (i == peer_device->node_id)
 3939             continue;
 3940         if (i == device->ldev->md.node_id)
 3941             continue;
 3942         if (connection->agreed_pro_version < 116 &&
 3943             device->ldev->md.peers[i].bitmap_index == -1)
 3944             continue;
 3945         self = device->ldev->md.peers[i].bitmap_uuid & ~UUID_PRIMARY;
 3946         if (self == peer) {
 3947             *peer_node_id = i;
 3948             return SYNC_SOURCE_COPY_BITMAP;
 3949         }
 3950     }
 3951 
 3952     if (connection->agreed_pro_version < 110) {
 3953         enum sync_strategy rv = uuid_fixup_resync_start2(peer_device, rule_nr);
 3954         if (rv != UNDETERMINED)
 3955             return rv;
 3956     }
 3957 
 3958     *rule_nr = 80;
 3959     peer = peer_device->current_uuid & ~UUID_PRIMARY;
 3960     for (i = 0; i < HISTORY_UUIDS; i++) {
 3961         self = drbd_history_uuid(device, i) & ~UUID_PRIMARY;
 3962         if (self == peer)
 3963             return SYNC_SOURCE_SET_BITMAP;
 3964     }
 3965 
 3966     *rule_nr = 90;
 3967     self = drbd_bitmap_uuid(peer_device) & ~UUID_PRIMARY;
 3968     peer = peer_device->bitmap_uuids[node_id] & ~UUID_PRIMARY;
 3969     if (self == peer && self != ((u64)0))
 3970         return SPLIT_BRAIN_AUTO_RECOVER;
 3971 
 3972     *rule_nr = 100;
 3973     for (i = 0; i < HISTORY_UUIDS; i++) {
 3974         self = drbd_history_uuid(device, i) & ~UUID_PRIMARY;
 3975         for (j = 0; j < ARRAY_SIZE(peer_device->history_uuids); j++) {
 3976             peer = peer_device->history_uuids[j] & ~UUID_PRIMARY;
 3977             if (self == peer)
 3978                 return SPLIT_BRAIN_DISCONNECT;
 3979         }
 3980     }
 3981 
 3982     return UNRELATED_DATA;
 3983 }
 3984 
 3985 static void log_handshake(struct drbd_peer_device *peer_device)
 3986 {
 3987     u64 uuid_flags = drbd_collect_local_uuid_flags(peer_device, NULL);
 3988 
 3989     drbd_info(peer_device, "drbd_sync_handshake:\n");
 3990     drbd_uuid_dump_self(peer_device, peer_device->comm_bm_set, uuid_flags);
 3991     drbd_uuid_dump_peer(peer_device, peer_device->dirty_bits, peer_device->uuid_flags);
 3992 }
 3993 
 3994 static enum sync_strategy drbd_handshake(struct drbd_peer_device *peer_device,
 3995               int *rule_nr,
 3996               int *peer_node_id,
 3997               bool always_verbose) __must_hold(local)
 3998 {
 3999     struct drbd_device *device = peer_device->device;
 4000     enum sync_strategy strategy;
 4001 
 4002     spin_lock_irq(&device->ldev->md.uuid_lock);
 4003     if (always_verbose)
 4004         log_handshake(peer_device);
 4005 
 4006     strategy = drbd_uuid_compare(peer_device, rule_nr, peer_node_id);
 4007     if (strategy != NO_SYNC && !always_verbose)
 4008         log_handshake(peer_device);
 4009     spin_unlock_irq(&device->ldev->md.uuid_lock);
 4010 
 4011     if (strategy != NO_SYNC || always_verbose)
 4012         drbd_info(peer_device, "uuid_compare()=%s by rule %d\n", strategy_descriptor(strategy).name, *rule_nr);
 4013 
 4014     return strategy;
 4015 }
 4016 
 4017 static bool is_resync_running(struct drbd_device *device)
 4018 {
 4019     struct drbd_peer_device *peer_device;
 4020     bool rv = false;
 4021 
 4022     rcu_read_lock();
 4023     for_each_peer_device_rcu(peer_device, device) {
 4024         enum drbd_repl_state repl_state = peer_device->repl_state[NOW];
 4025         if (repl_state == L_SYNC_TARGET || repl_state == L_PAUSED_SYNC_T) {
 4026             rv = true;
 4027             break;
 4028         }
 4029     }
 4030     rcu_read_unlock();
 4031 
 4032     return rv;
 4033 }
 4034 
 4035 static int bitmap_mod_after_handshake(struct drbd_peer_device *peer_device, enum sync_strategy strategy, int peer_node_id)
 4036 {
 4037     struct drbd_device *device = peer_device->device;
 4038 
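          /* Adjust the local bitmap according to the handshake result: copy the
           * bitmap slot of another peer or the day-0 bitmap (SYNC_SOURCE_COPY_BITMAP,
           * falling back to setting all bits), clear our slot because the resync
           * source provides the bitmap (SYNC_TARGET_CLEAR_BITMAP), or set all
           * bits for a full sync (SYNC_SOURCE_SET_BITMAP / SYNC_TARGET_SET_BITMAP). */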
 4039     if (strategy == SYNC_SOURCE_COPY_BITMAP) {
 4040         int from = device->ldev->md.peers[peer_node_id].bitmap_index;
 4041 
 4042         if (from == -1)
 4043             from = drbd_unallocated_index(device->ldev, device->bitmap->bm_max_peers);
 4044 
 4045         if (peer_device->bitmap_index == -1)
 4046             return 0;
 4047 
 4048         if (from == -1)
 4049             drbd_info(peer_device,
 4050                   "Setting all bitmap bits, day0 bm not available node_id=%d\n",
 4051                   peer_node_id);
 4052         else
 4053             drbd_info(peer_device,
 4054                   "Copying bitmap of peer node_id=%d (bitmap_index=%d)\n",
 4055                   peer_node_id, from);
 4056 
 4057         drbd_suspend_io(device, WRITE_ONLY);
 4058         drbd_bm_slot_lock(peer_device, "copy_slot/set_many sync_handshake", BM_LOCK_BULK);
 4059         if (from == -1)
 4060             drbd_bm_set_many_bits(peer_device, 0, -1UL);
 4061         else
 4062             drbd_bm_copy_slot(device, from, peer_device->bitmap_index);
 4063         drbd_bm_write(device, NULL);
 4064         drbd_bm_slot_unlock(peer_device);
 4065         drbd_resume_io(device);
 4066     } else if (strategy == SYNC_TARGET_CLEAR_BITMAP) {
 4067         drbd_info(peer_device, "Resync source provides bitmap (node_id=%d)\n", peer_node_id);
 4068         drbd_suspend_io(device, WRITE_ONLY);
 4069         drbd_bm_slot_lock(peer_device, "bm_clear_many_bits sync_handshake", BM_LOCK_BULK);
 4070         drbd_bm_clear_many_bits(peer_device, 0, -1UL);
 4071         drbd_bm_write(device, NULL);
 4072         drbd_bm_slot_unlock(peer_device);
 4073         drbd_resume_io(device);
 4074     } else if (strategy == SYNC_SOURCE_SET_BITMAP || strategy == SYNC_TARGET_SET_BITMAP) {
 4075         if (strategy == SYNC_TARGET_SET_BITMAP &&
 4076             drbd_current_uuid(device) == UUID_JUST_CREATED &&
 4077             is_resync_running(device))
 4078             return 0;
 4079 
 4080         drbd_info(peer_device,
 4081               "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
 4082         if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
 4083                     BM_LOCK_CLEAR | BM_LOCK_BULK, peer_device))
 4084             return -1;
 4085     }
 4086     return 0;
 4087 }
 4088 
 4089 static enum drbd_repl_state goodness_to_repl_state(struct drbd_peer_device *peer_device,
 4090                            enum drbd_role peer_role,
 4091                            enum sync_strategy strategy)
 4092 {
 4093     struct drbd_device *device = peer_device->device;
 4094     enum drbd_role role = peer_device->device->resource->role[NOW];
 4095     enum drbd_repl_state rv;
 4096 
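          /* Map the agreed sync strategy to a replication state: a sync source
           * goes to L_WF_BITMAP_S, a sync target to L_WF_BITMAP_T, everything
           * else stays L_ESTABLISHED (clearing a left-over bitmap UUID and
           * bitmap content if the current UUIDs already match). */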
 4097     if (strategy == SYNC_SOURCE_IF_BOTH_FAILED || strategy == SYNC_TARGET_IF_BOTH_FAILED) {
 4098         if (role == R_PRIMARY || peer_role == R_PRIMARY) {
 4099             /* We have at least one primary, follow that with the resync decision */
 4100             rv = peer_role == R_SECONDARY ? L_WF_BITMAP_S :
 4101                 role == R_SECONDARY ? L_WF_BITMAP_T :
 4102                 L_ESTABLISHED;
 4103             return rv;
 4104         }
 4105         /* No current primary. Handle it as a common power failure and consider
 4106            the roles at crash time. */
 4107     }
 4108 
 4109     if (strategy_descriptor(strategy).is_sync_source) {
 4110         rv = L_WF_BITMAP_S;
 4111     } else if (strategy_descriptor(strategy).is_sync_target) {
 4112         rv = L_WF_BITMAP_T;
 4113     } else {
 4114         u64 peer_current_uuid = peer_device->current_uuid & ~UUID_PRIMARY;
 4115         u64 my_current_uuid = drbd_current_uuid(device) & ~UUID_PRIMARY;
 4116 
 4117         rv = L_ESTABLISHED;
 4118         if (peer_current_uuid == my_current_uuid) {
 4119             if (drbd_bitmap_uuid(peer_device)) {
 4120                 drbd_info(peer_device, "clearing bitmap UUID and bitmap content (%lu bits)\n",
 4121                       drbd_bm_total_weight(peer_device));
 4122                 drbd_uuid_set_bitmap(peer_device, 0);
 4123 
 4124             } else if (drbd_bm_total_weight(peer_device)) {
 4125                 drbd_info(peer_device, "bitmap content (%lu bits)\n",
 4126                       drbd_bm_total_weight(peer_device));
 4127             }
 4128             drbd_bm_clear_many_bits(peer_device, 0, -1UL);
 4129         }
 4130     }
 4131 
 4132     return rv;
 4133 }
 4134 
 4135 static void disk_states_to_goodness(struct drbd_device *device,
 4136                     enum drbd_disk_state peer_disk_state,
 4137                     enum sync_strategy *strategy, int rule_nr)
 4138 {
 4139     enum drbd_disk_state disk_state = device->disk_state[NOW];
 4140     bool p = false;
 4141 
 4142     if (*strategy != NO_SYNC && rule_nr != 40)
 4143         return;
 4144 
 4145     /* rule_nr 40 means that the current UUIDs are equal. The decision
 4146        was found by looking at the crashed_primary bits.
 4147        The current disk states might give a better basis for decision-making! */
 4148 
 4149     if (disk_state == D_NEGOTIATING)
 4150         disk_state = disk_state_from_md(device);
 4151 
 4152     if ((disk_state == D_INCONSISTENT && peer_disk_state > D_INCONSISTENT) ||
 4153         (peer_disk_state == D_INCONSISTENT && disk_state > D_INCONSISTENT)) {
 4154         *strategy = disk_state > D_INCONSISTENT ? SYNC_SOURCE_USE_BITMAP : SYNC_TARGET_USE_BITMAP;
 4155         p = true;
 4156     }
 4157 
 4158     if (p)
 4159         drbd_info(device, "Becoming sync %s due to disk states.\n",
 4160               strategy_descriptor(*strategy).is_sync_source ? "source" : "target");
 4161 }
 4162 
 4163 static enum drbd_repl_state drbd_attach_handshake(struct drbd_peer_device *peer_device,
 4164                           enum drbd_disk_state peer_disk_state) __must_hold(local)
 4165 {
 4166     enum sync_strategy strategy;
 4167     int rule_nr, peer_node_id;
 4168 
 4169     strategy = drbd_handshake(peer_device, &rule_nr, &peer_node_id, true);
 4170 
 4171     if (!is_strategy_determined(strategy))
 4172         return -1;
 4173 
 4174     bitmap_mod_after_handshake(peer_device, strategy, peer_node_id);
 4175     disk_states_to_goodness(peer_device->device, peer_disk_state, &strategy, rule_nr);
 4176 
 4177     return goodness_to_repl_state(peer_device, peer_device->connection->peer_role[NOW], strategy);
 4178 }
 4179 
 4180 /* drbd_sync_handshake() returns the new replication state on success; on
 4181  * failure it returns -1 (retry the connect) or -2 (drop the connection).
 4182  */
 4183 static enum drbd_repl_state drbd_sync_handshake(struct drbd_peer_device *peer_device,
 4184                         union drbd_state peer_state) __must_hold(local)
 4185 {
 4186     struct drbd_device *device = peer_device->device;
 4187     struct drbd_connection *connection = peer_device->connection;
 4188     enum drbd_disk_state disk_state;
 4189     struct net_conf *nc;
 4190     enum sync_strategy strategy;
 4191     int rule_nr, rr_conflict, always_asbp, peer_node_id = 0, r;
 4192     enum drbd_role peer_role = peer_state.role;
 4193     enum drbd_disk_state peer_disk_state = peer_state.disk;
 4194     int required_protocol;
 4195 
 4196     strategy = drbd_handshake(peer_device, &rule_nr, &peer_node_id, true);
 4197 
 4198     disk_state = device->disk_state[NOW];
 4199     if (disk_state == D_NEGOTIATING)
 4200         disk_state = disk_state_from_md(device);
 4201 
 4202     if (strategy == RETRY_CONNECT)
 4203         return -1; /* retry connect */
 4204 
 4205     if (strategy == UNRELATED_DATA) {
 4206         drbd_alert(device, "Unrelated data, aborting!\n");
 4207         return -2;
 4208     }
 4209     required_protocol = strategy_descriptor(strategy).required_protocol;
 4210     if (required_protocol) {
 4211         drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", required_protocol);
 4212         return -2;
 4213     }
 4214 
 4215     disk_states_to_goodness(device, peer_disk_state, &strategy, rule_nr);
 4216 
 4217     if (strategy == SPLIT_BRAIN_AUTO_RECOVER && (!drbd_device_stable(device, NULL) || !(peer_device->uuid_flags & UUID_FLAG_STABLE))) {
 4218         drbd_warn(device, "Ignore Split-Brain, for now, at least one side unstable\n");
 4219         strategy = NO_SYNC;
 4220     }
 4221 
 4222     if (strategy_descriptor(strategy).is_split_brain)
 4223         drbd_maybe_khelper(device, connection, "initial-split-brain");
 4224 
 4225     rcu_read_lock();
 4226     nc = rcu_dereference(connection->transport.net_conf);
 4227     always_asbp = nc->always_asbp;
 4228     rr_conflict = nc->rr_conflict;
 4229     rcu_read_unlock();
 4230 
 4231     if (strategy == SPLIT_BRAIN_AUTO_RECOVER || (strategy == SPLIT_BRAIN_DISCONNECT && always_asbp)) {
 4232         int pcount = (device->resource->role[NOW] == R_PRIMARY)
 4233                + (peer_role == R_PRIMARY);
 4234         int forced = (strategy == SPLIT_BRAIN_DISCONNECT);
 4235 
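        /* With quorum enabled and protocol 113 or newer: if exactly one side
         * kept quorum, that side becomes the sync source and the other the
         * sync target; only otherwise do the after-sb-*pri policies below
         * get to decide. */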
 4236         if (device->resource->res_opts.quorum != QOU_OFF &&
 4237             connection->agreed_pro_version >= 113) {
 4238             if (device->have_quorum[NOW] && !peer_state.quorum)
 4239                 strategy = SYNC_SOURCE_USE_BITMAP;
 4240             else if (!device->have_quorum[NOW] && peer_state.quorum)
 4241                 strategy = SYNC_TARGET_USE_BITMAP;
 4242         }
 4243         if (strategy_descriptor(strategy).is_split_brain) {
 4244             switch (pcount) {
 4245             case 0:
 4246                 strategy = drbd_asb_recover_0p(peer_device);
 4247                 break;
 4248             case 1:
 4249                 strategy = drbd_asb_recover_1p(peer_device);
 4250                 break;
 4251             case 2:
 4252                 strategy = drbd_asb_recover_2p(peer_device);
 4253                 break;
 4254             }
 4255         }
 4256         if (!strategy_descriptor(strategy).is_split_brain) {
 4257             drbd_warn(device, "Split-Brain detected, %d primaries, "
 4258                  "automatically solved. Sync from %s node\n",
 4259                  pcount, strategy_descriptor(strategy).is_sync_target ? "peer" : "this");
 4260             if (forced) {
 4261                 if (!strategy_descriptor(strategy).full_sync_equivalent) {
 4262                     drbd_alert(device, "Want full sync but cannot decide direction, dropping connection!\n");
 4263                     return -2;
 4264                 }
 4265                 drbd_warn(device, "Doing a full sync, since"
 4266                      " UUIDs were ambiguous.\n");
 4267                 strategy = strategy_descriptor(strategy).full_sync_equivalent;
 4268             }
 4269         }
 4270     }
 4271 
 4272     if (strategy == SPLIT_BRAIN_DISCONNECT) {
 4273         if (test_bit(DISCARD_MY_DATA, &peer_device->flags) &&
 4274             !(peer_device->uuid_flags & UUID_FLAG_DISCARD_MY_DATA))
 4275             strategy = SYNC_TARGET_USE_BITMAP;
 4276         if (!test_bit(DISCARD_MY_DATA, &peer_device->flags) &&
 4277             (peer_device->uuid_flags & UUID_FLAG_DISCARD_MY_DATA))
 4278             strategy = SYNC_SOURCE_USE_BITMAP;
 4279 
 4280         if (!strategy_descriptor(strategy).is_split_brain)
 4281             drbd_warn(device, "Split-Brain detected, manually solved. "
 4282                  "Sync from %s node\n",
 4283                  strategy_descriptor(strategy).is_sync_target ? "peer" : "this");
 4284     }
 4285 
 4286     if (strategy_descriptor(strategy).is_split_brain) {
 4287         drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
 4288         drbd_maybe_khelper(device, connection, "split-brain");
 4289         return -2;
 4290     }
 4291 
 4292     if (!is_strategy_determined(strategy)) {
 4293         drbd_alert(device, "Failed to fully determine sync strategy, dropping connection!\n");
 4294         return -2;
 4295     }
 4296 
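    /* We were chosen as sync target, but we are primary with usable data:
     * let the rr-conflict policy decide whether to call the "pri-lost"
     * helper, retry or drop the connection, or accept the resync anyway. */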
 4297     if (strategy_descriptor(strategy).is_sync_target &&
 4298         strategy != SYNC_TARGET_IF_BOTH_FAILED &&
 4299         device->resource->role[NOW] == R_PRIMARY && device->disk_state[NOW] >= D_CONSISTENT) {
 4300         switch (rr_conflict) {
 4301         case ASB_CALL_HELPER:
 4302             drbd_maybe_khelper(device, connection, "pri-lost");
 4303             /* fall through */
 4304         case ASB_DISCONNECT:
 4305         case ASB_RETRY_CONNECT:
 4306             drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
 4307             return rr_conflict == ASB_RETRY_CONNECT ? -1 : -2;
 4308         case ASB_VIOLENTLY:
 4309             drbd_warn(device, "Becoming SyncTarget, violating the stable-data "
 4310                  "assumption\n");
 4311         }
 4312     }
 4313 
 4314     if (test_bit(CONN_DRY_RUN, &connection->flags)) {
 4315         if (strategy == NO_SYNC)
 4316             drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
 4317         else
 4318             drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
 4319                  drbd_repl_str(strategy_descriptor(strategy).is_sync_target ? L_SYNC_TARGET : L_SYNC_SOURCE),
 4320                  strategy_descriptor(strategy).name);
 4321         return -2;
 4322     }
 4323 
 4324     r = bitmap_mod_after_handshake(peer_device, strategy, peer_node_id);
 4325     if (r)
 4326         return r;
 4327 
 4328     return goodness_to_repl_state(peer_device, peer_role, strategy);
 4329 }
 4330 
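/*
 * The peer sends its after-split-brain policies from its own point of view;
 * mirror the "discard" direction so they can be compared with our local
 * settings.  For example, a peer configured with ASB_DISCARD_REMOTE
 * ("discard the other node's data") corresponds to ASB_DISCARD_LOCAL here,
 * and vice versa; every other policy simply has to be equal on both sides.
 */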
 4331 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
 4332 {
 4333     /* the pairing ASB_DISCARD_REMOTE <-> ASB_DISCARD_LOCAL is valid */
 4334     if (peer == ASB_DISCARD_REMOTE)
 4335         return ASB_DISCARD_LOCAL;
 4336 
 4337     /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
 4338     if (peer == ASB_DISCARD_LOCAL)
 4339         return ASB_DISCARD_REMOTE;
 4340 
 4341     /* everything else is valid if they are equal on both sides. */
 4342     return peer;
 4343 }
 4344 
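/*
 * Handle a protocol packet: unless this is a P_PROTOCOL_UPDATE, verify that
 * the peer's wire protocol, after-sb policies, two-primaries and
 * discard-my-data settings and data-integrity-alg match our net_conf; then
 * adopt the received settings into a new net_conf and install the peer's
 * integrity algorithm.  Any mismatch or allocation failure drops the
 * connection.
 */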
 4345 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
 4346 {
 4347     struct p_protocol *p = pi->data;
 4348     enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
 4349     int p_proto, p_discard_my_data, p_two_primaries, cf;
 4350     struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
 4351     char integrity_alg[SHARED_SECRET_MAX] = "";
 4352     struct crypto_shash *peer_integrity_tfm = NULL;
 4353     void *int_dig_in = NULL, *int_dig_vv = NULL;
 4354 
 4355     p_proto     = be32_to_cpu(p->protocol);
 4356     p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
 4357     p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
 4358     p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
 4359     p_two_primaries = be32_to_cpu(p->two_primaries);
 4360     cf      = be32_to_cpu(p->conn_flags);
 4361     p_discard_my_data = cf & CF_DISCARD_MY_DATA;
 4362 
 4363     if (connection->agreed_pro_version >= 87) {
 4364         int err;
 4365 
 4366         if (pi->size > sizeof(integrity_alg))
 4367             return -EIO;
 4368         err = drbd_recv_into(connection, integrity_alg, pi->size);
 4369         if (err)
 4370             return err;
 4371         integrity_alg[SHARED_SECRET_MAX - 1] = 0;
 4372     }
 4373 
 4374     if (pi->cmd != P_PROTOCOL_UPDATE) {
 4375         if (cf & CF_DRY_RUN)
 4376             set_bit(CONN_DRY_RUN, &connection->flags);
 4377 
 4378         rcu_read_lock();
 4379         nc = rcu_dereference(connection->transport.net_conf);
 4380 
 4381         if (p_proto != nc->wire_protocol) {
 4382             drbd_err(connection, "incompatible %s settings\n", "protocol");
 4383             goto disconnect_rcu_unlock;
 4384         }
 4385 
 4386         if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
 4387             drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
 4388             goto disconnect_rcu_unlock;
 4389         }
 4390 
 4391         if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
 4392             drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
 4393             goto disconnect_rcu_unlock;
 4394         }
 4395 
 4396         if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
 4397             drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
 4398             goto disconnect_rcu_unlock;
 4399         }
 4400 
 4401         if (p_discard_my_data && test_bit(CONN_DISCARD_MY_DATA, &connection->flags)) {
 4402             drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
 4403             goto disconnect_rcu_unlock;
 4404         }
 4405 
 4406         if (p_two_primaries != nc->two_primaries) {
 4407             drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
 4408             goto disconnect_rcu_unlock;
 4409         }
 4410 
 4411         if (strcmp(integrity_alg, nc->integrity_alg)) {
 4412             drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
 4413             goto disconnect_rcu_unlock;
 4414         }
 4415 
 4416         rcu_read_unlock();
 4417     }
 4418 
 4419     if (integrity_alg[0]) {
 4420         int hash_size;
 4421 
 4422         /*
 4423          * We can only change the peer data integrity algorithm
 4424          * here.  Changing our own data integrity algorithm
 4425          * requires that we send a P_PROTOCOL_UPDATE packet at
 4426          * the same time; otherwise, the peer has no way to
 4427          * tell between which packets the algorithm should
 4428          * change.
 4429          */
 4430 
 4431         peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
 4432         if (IS_ERR(peer_integrity_tfm)) {
 4433             peer_integrity_tfm = NULL;
 4434             drbd_err(connection, "peer data-integrity-alg %s not supported\n",
 4435                  integrity_alg);
 4436             goto disconnect;
 4437         }
 4438 
 4439         hash_size = crypto_shash_digestsize(peer_integrity_tfm);
 4440         int_dig_in = kmalloc(hash_size, GFP_KERNEL);
 4441         int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
 4442         if (!(int_dig_in && int_dig_vv)) {
 4443             drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
 4444             goto disconnect;
 4445         }
 4446     }
 4447 
 4448     new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
 4449     if (!new_net_conf) {
 4450         drbd_err(connection, "Allocation of new net_conf failed\n");
 4451         goto disconnect;
 4452     }
 4453 
 4454     if (mutex_lock_interruptible(&connection->resource->conf_update)) {
 4455         drbd_err(connection, "Interrupted while waiting for conf_update\n");
 4456         goto disconnect;
 4457     }
 4458 
 4459     mutex_lock(&connection->mutex[DATA_STREAM]);
 4460     old_net_conf = connection->transport.net_conf;
 4461     *new_net_conf = *old_net_conf;
 4462 
 4463     new_net_conf->wire_protocol = p_proto;
 4464     new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
 4465     new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
 4466     new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
 4467     new_net_conf->two_primaries = p_two_primaries;
 4468 
 4469     rcu_assign_pointer(connection->transport.net_conf, new_net_conf);
 4470     mutex_unlock(&connection->mutex[DATA_STREAM]);
 4471     mutex_unlock(&connection->resource->conf_update);
 4472 
 4473     crypto_free_shash(connection->peer_integrity_tfm);
 4474     kfree(connection->int_dig_in);
 4475     kfree(connection->int_dig_vv);
 4476     connection->peer_integrity_tfm = peer_integrity_tfm;
 4477     connection->int_dig_in = int_dig_in;
 4478     connection->int_dig_vv = int_dig_vv;
 4479 
 4480     if (strcmp(old_net_conf->integrity_alg, integrity_alg))
 4481         drbd_info(connection, "peer data-integrity-alg: %s\n",
 4482               integrity_alg[0] ? integrity_alg : "(none)");
 4483 
 4484     synchronize_rcu();
 4485     kfree(old_net_conf);
 4486     return 0;
 4487 
 4488 disconnect_rcu_unlock:
 4489     rcu_read_unlock();
 4490 disconnect:
 4491     crypto_free_shash(peer_integrity_tfm);
 4492     kfree(int_dig_in);
 4493     kfree(int_dig_vv);
 4494     change_cstate(connection, C_DISCONNECTING, CS_HARD);
 4495     return -EIO;
 4496 }
 4497 
 4498 /* helper function
 4499  * input: alg name, feature name
 4500  * return: NULL (alg name was "")
 4501  *         ERR_PTR(error) if something goes wrong
 4502  *         or the crypto hash ptr, if it worked out ok. */
 4503 static struct crypto_shash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
 4504         const char *alg, const char *name)
 4505 {
 4506     struct crypto_shash *tfm;
 4507 
 4508     if (!alg[0])
 4509         return NULL;
 4510 
 4511     tfm = crypto_alloc_shash(alg, 0, 0);
 4512     if (IS_ERR(tfm)) {
 4513         drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
 4514             alg, name, PTR_ERR(tfm));
 4515         return tfm;
 4516     }
 4517     return tfm;
 4518 }
 4519 
 4520 /*
 4521  * config_unknown_volume  -  device configuration command for unknown volume
 4522  *
 4523  * When a device is added to an existing connection, the node on which the
 4524  * device is added first will send configuration commands to its peer but the
 4525  * peer will not know about the device yet.  It will warn and ignore these
 4526  * commands.  Once the device is added on the second node, the second node will
 4527  * send the same device configuration commands, but in the other direction.
 4528  *
 4529  * (We can also end up here if drbd is misconfigured.)
 4530  */
 4531 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
 4532 {
 4533     drbd_warn(connection, "%s packet received for volume %d, which is not configured locally\n",
 4534           drbd_packet_name(pi->cmd), pi->vnr);
 4535     return ignore_remaining_packet(connection, pi->size);
 4536 }
 4537 
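/*
 * Handle a SyncParam packet: pick up the peer's resync rate and, depending
 * on the protocol version, the verify-alg / csums-alg names and the dynamic
 * resync controller settings, allocating new peer_device_conf, net_conf and
 * fifo plan objects as needed and publishing them via RCU.
 */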
 4538 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
 4539 {
 4540     struct drbd_peer_device *peer_device;
 4541     struct drbd_device *device;
 4542     struct p_rs_param_95 *p;
 4543     unsigned int header_size, data_size, exp_max_sz;
 4544     struct crypto_shash *verify_tfm = NULL;
 4545     struct crypto_shash *csums_tfm = NULL;
 4546     struct net_conf *old_net_conf, *new_net_conf = NULL;
 4547     struct peer_device_conf *old_peer_device_conf = NULL, *new_peer_device_conf = NULL;
 4548     const int apv = connection->agreed_pro_version;
 4549     struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
 4550     struct drbd_resource *resource = connection->resource;
 4551     int fifo_size = 0;
 4552     int err;
 4553 
 4554     peer_device = conn_peer_device(connection, pi->vnr);
 4555     if (!peer_device)
 4556         return config_unknown_volume(connection, pi);
 4557     device = peer_device->device;
 4558 
 4559     exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
 4560             : apv == 88 ? sizeof(struct p_rs_param)
 4561                     + SHARED_SECRET_MAX
 4562             : apv <= 94 ? sizeof(struct p_rs_param_89)
 4563             : /* apv >= 95 */ sizeof(struct p_rs_param_95);
 4564 
 4565     if (pi->size > exp_max_sz) {
 4566         drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
 4567             pi->size, exp_max_sz);
 4568         return -EIO;
 4569     }
 4570 
 4571     if (apv <= 88) {
 4572         header_size = sizeof(struct p_rs_param);
 4573         data_size = pi->size - header_size;
 4574     } else if (apv <= 94) {
 4575         header_size = sizeof(struct p_rs_param_89);
 4576         data_size = pi->size - header_size;
 4577         D_ASSERT(device, data_size == 0);
 4578     } else {
 4579         header_size = sizeof(struct p_rs_param_95);
 4580         data_size = pi->size - header_size;
 4581         D_ASSERT(device, data_size == 0);
 4582     }
 4583 
 4584     err = drbd_recv_all(connection, (void **)&p, header_size + data_size);
 4585     if (err)
 4586         return err;
 4587 
 4588     err = mutex_lock_interruptible(&resource->conf_update);
 4589     if (err) {
 4590         drbd_err(connection, "Interrupted while waiting for conf_update\n");
 4591         return err;
 4592     }
 4593     old_net_conf = connection->transport.net_conf;
 4594     if (get_ldev(device)) {
 4595         new_peer_device_conf = kzalloc(sizeof(struct peer_device_conf), GFP_KERNEL);
 4596         if (!new_peer_device_conf) {
 4597             put_ldev(device);
 4598             mutex_unlock(&resource->conf_update);
 4599             drbd_err(device, "Allocation of new peer_device_conf failed\n");
 4600             return -ENOMEM;
 4601         }
 4602         /* With a non-zero new_peer_device_conf, we will call put_ldev() below.  */
 4603 
 4604         old_peer_device_conf = peer_device->conf;
 4605         *new_peer_device_conf = *old_peer_device_conf;
 4606 
 4607         new_peer_device_conf->resync_rate = be32_to_cpu(p->resync_rate);
 4608     }
 4609 
 4610     if (apv >= 88) {
 4611         if (apv == 88) {
 4612             if (data_size > SHARED_SECRET_MAX || data_size == 0) {
 4613                 drbd_err(device, "verify-alg too long, "
 4614                      "peer wants %u, accepting only %u bytes\n",
 4615                      data_size, SHARED_SECRET_MAX);
 4616                 goto reconnect;
 4617             }
 4618             p->verify_alg[data_size] = 0;
 4619 
 4620         } else /* apv >= 89 */ {
 4621             /* we still expect NUL terminated strings */
 4622             /* but just in case someone tries to be evil */
 4623             D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
 4624             D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
 4625             p->verify_alg[SHARED_SECRET_MAX-1] = 0;
 4626             p->csums_alg[SHARED_SECRET_MAX-1] = 0;
 4627         }
 4628 
 4629         if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
 4630             if (peer_device->repl_state[NOW] == L_OFF) {
 4631                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
 4632                     old_net_conf->verify_alg, p->verify_alg);
 4633                 goto disconnect;
 4634             }
 4635             verify_tfm = drbd_crypto_alloc_digest_safe(device,
 4636                     p->verify_alg, "verify-alg");
 4637             if (IS_ERR(verify_tfm)) {
 4638                 verify_tfm = NULL;
 4639                 goto disconnect;
 4640             }
 4641         }
 4642 
 4643         if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
 4644             if (peer_device->repl_state[NOW] == L_OFF) {
 4645                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
 4646                     old_net_conf->csums_alg, p->csums_alg);
 4647                 goto disconnect;
 4648             }
 4649             csums_tfm = drbd_crypto_alloc_digest_safe(device,
 4650                     p->csums_alg, "csums-alg");
 4651             if (IS_ERR(csums_tfm)) {
 4652                 csums_tfm = NULL;
 4653                 goto disconnect;
 4654             }
 4655         }
 4656 
 4657         if (apv > 94 && new_peer_device_conf) {
 4658             new_peer_device_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
 4659             new_peer_device_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
 4660             new_peer_device_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
 4661             new_peer_device_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
 4662 
 4663             fifo_size = (new_peer_device_conf->c_plan_ahead * 10 * RS_MAKE_REQS_INTV) / HZ;
 4664             old_plan = rcu_dereference_protected(peer_device->rs_plan_s,
 4665                 lockdep_is_held(&resource->conf_update));
 4666             if (!old_plan || fifo_size != old_plan->size) {
 4667                 new_plan = fifo_alloc(fifo_size);
 4668                 if (!new_plan) {
 4669                     drbd_err(device, "kmalloc of fifo_buffer failed\n");
 4670                     goto disconnect;
 4671                 }
 4672             }
 4673         }
 4674 
 4675         if (verify_tfm || csums_tfm) {
 4676             new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
 4677             if (!new_net_conf) {
 4678                 drbd_err(device, "Allocation of new net_conf failed\n");
 4679                 goto disconnect;
 4680             }
 4681 
 4682             *new_net_conf = *old_net_conf;
 4683 
 4684             if (verify_tfm) {
 4685                 strcpy(new_net_conf->verify_alg, p->verify_alg);
 4686                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
 4687                 crypto_free_shash(connection->verify_tfm);
 4688                 connection->verify_tfm = verify_tfm;
 4689                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
 4690             }
 4691             if (csums_tfm) {
 4692                 strcpy(new_net_conf->csums_alg, p->csums_alg);
 4693                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
 4694                 crypto_free_shash(connection->csums_tfm);
 4695                 connection->csums_tfm = csums_tfm;
 4696                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
 4697             }
 4698             rcu_assign_pointer(connection->transport.net_conf, new_net_conf);
 4699         }
 4700     }
 4701 
 4702     if (new_peer_device_conf) {
 4703         rcu_assign_pointer(peer_device->conf, new_peer_device_conf);
 4704         put_ldev(device);
 4705     }
 4706 
 4707     if (new_plan)
 4708         rcu_assign_pointer(peer_device->rs_plan_s, new_plan);
 4709 
 4710     mutex_unlock(&resource->conf_update);
 4711     synchronize_rcu();
 4712     if (new_net_conf)
 4713         kfree(old_net_conf);
 4714     kfree(old_peer_device_conf);
 4715     if (new_plan)
 4716         kfree(old_plan);
 4717 
 4718     return 0;
 4719 
 4720 reconnect:
 4721     if (new_peer_device_conf) {
 4722         put_ldev(device);
 4723         kfree(new_peer_device_conf);
 4724     }
 4725     mutex_unlock(&resource->conf_update);
 4726     return -EIO;
 4727 
 4728 disconnect:
 4729     kfree(new_plan);
 4730     if (new_peer_device_conf) {
 4731         put_ldev(device);
 4732         kfree(new_peer_device_conf);
 4733     }
 4734     mutex_unlock(&resource->conf_update);
 4735     /* csums_tfm may have been allocated already,
 4736      * but not yet installed into the connection */
 4737     crypto_free_shash(csums_tfm);
 4738     /* same for verify_tfm */
 4739     crypto_free_shash(verify_tfm);
 4740     change_cstate(connection, C_DISCONNECTING, CS_HARD);
 4741     return -EIO;
 4742 }
 4743 
 4744 static void drbd_setup_order_type(struct drbd_device *device, int peer)
 4745 {
 4746     /* sorry, we currently have no working implementation
 4747      * of distributed TCQ */
 4748 }
 4749 
 4750 /* warn if the arguments differ by more than 12.5% */
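/* e.g. a = 1000 and b = 1200 sectors: d = 200 > (1000 >> 3) = 125, so warn */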
 4751 static void warn_if_differ_considerably(struct drbd_peer_device *peer_device,
 4752     const char *s, sector_t a, sector_t b)
 4753 {
 4754     sector_t d;
 4755     if (a == 0 || b == 0)
 4756         return;
 4757     d = (a > b) ? (a - b) : (b - a);
 4758     if (d > (a>>3) || d > (b>>3))
 4759         drbd_warn(peer_device, "Considerable difference in %s: %llus vs. %llus\n", s,
 4760              (unsigned long long)a, (unsigned long long)b);
 4761 }
 4762 
 4763 /* Maximum bio size that a protocol version supports. */
 4764 static unsigned int conn_max_bio_size(struct drbd_connection *connection)
 4765 {
 4766     if (connection->agreed_pro_version >= 100)
 4767         return DRBD_MAX_BIO_SIZE;
 4768     else if (connection->agreed_pro_version >= 95)
 4769         return DRBD_MAX_BIO_SIZE_P95;
 4770     else
 4771         return DRBD_MAX_SIZE_H80_PACKET;
 4772 }
 4773 
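/*
 * Return the peer device whose node id is closest to ours in the given
 * direction (next lower / next higher) and whose disk is at least
 * D_INCONSISTENT; NULL if there is none or we have no local disk.
 */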
 4774 static struct drbd_peer_device *get_neighbor_device(struct drbd_device *device,
 4775         enum drbd_neighbor neighbor)
 4776 {
 4777     s32 self_id, peer_id, pivot;
 4778     struct drbd_peer_device *peer_device, *peer_device_ret = NULL;
 4779 
 4780     if (!get_ldev(device))
 4781         return NULL;
 4782     self_id = device->ldev->md.node_id;
 4783     put_ldev(device);
 4784 
 4785     pivot = neighbor == NEXT_LOWER ? 0 : neighbor == NEXT_HIGHER ? S32_MAX : -1;
 4786     if (pivot == -1)
 4787         return NULL;
 4788 
 4789     rcu_read_lock();
 4790     for_each_peer_device_rcu(peer_device, device) {
 4791         bool found_new = false;
 4792         peer_id = peer_device->node_id;
 4793 
 4794         if (neighbor == NEXT_LOWER && peer_id < self_id && peer_id >= pivot)
 4795             found_new = true;
 4796         else if (neighbor == NEXT_HIGHER && peer_id > self_id && peer_id <= pivot)
 4797             found_new = true;
 4798 
 4799         if (found_new && peer_device->disk_state[NOW] >= D_INCONSISTENT) {
 4800             pivot = peer_id;
 4801             peer_device_ret = peer_device;
 4802         }
 4803     }
 4804     rcu_read_unlock();
 4805 
 4806     return peer_device_ret;
 4807 }
 4808 
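/*
 * After a size change, decide whether to resync with this neighbor: a
 * pending resize, or growth while established, triggers
 * resync_after_online_grow() once both disks are at least D_INCONSISTENT
 * (unless suppressed with --assume-clean); otherwise it is deferred via the
 * RESYNC_AFTER_NEG flag.
 */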
 4809 static void maybe_trigger_resync(struct drbd_device *device, struct drbd_peer_device *peer_device, bool grew, bool skip)
 4810 {
 4811     if (!peer_device)
 4812         return;
 4813     if (peer_device->repl_state[NOW] <= L_OFF)
 4814         return;
 4815     if (test_and_clear_bit(RESIZE_PENDING, &peer_device->flags) ||
 4816         (grew && peer_device->repl_state[NOW] == L_ESTABLISHED)) {
 4817         if (peer_device->disk_state[NOW] >= D_INCONSISTENT &&
 4818             device->disk_state[NOW] >= D_INCONSISTENT) {
 4819             if (skip)
 4820                 drbd_info(peer_device, "Resync of new storage suppressed with --assume-clean\n");
 4821             else
 4822                 resync_after_online_grow(peer_device);
 4823         } else
 4824             set_bit(RESYNC_AFTER_NEG, &peer_device->flags);
 4825     }
 4826 }
 4827 
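/*
 * Handle a sizes packet: record the peer's backing device size, requested
 * user size and current size, sanity-check them against our own limits,
 * possibly adopt a new device size or refuse and disconnect, adjust queue
 * parameters, announce relevant size changes to all connected peers, and
 * maybe trigger a resync towards a neighbor if we grew.
 */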
 4828 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
 4829 {
 4830     struct drbd_peer_device *peer_device, *peer_device_it = NULL;
 4831     struct drbd_device *device;
 4832     struct p_sizes *p = pi->data;
 4833     struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
 4834     uint64_t p_size, p_usize, p_csize;
 4835     uint64_t my_usize, my_max_size, cur_size;
 4836     enum determine_dev_size dd = DS_UNCHANGED;
 4837     bool should_send_sizes = false;
 4838     enum dds_flags ddsf;
 4839     unsigned int protocol_max_bio_size;
 4840     bool have_ldev = false;
 4841     bool have_mutex = false;
 4842     bool is_handshake;
 4843     int err;
 4844     u64 im;
 4845 
 4846     peer_device = conn_peer_device(connection, pi->vnr);
 4847     if (!peer_device)
 4848         return config_unknown_volume(connection, pi);
 4849     device = peer_device->device;
 4850 
 4851     err = mutex_lock_interruptible(&connection->resource->conf_update);
 4852     if (err) {
 4853         drbd_err(connection, "Interrupted while waiting for conf_update\n");
 4854         goto out;
 4855     }
 4856     have_mutex = true;
 4857 
 4858     /* just store the peer's disk size for now.
 4859      * we still need to figure out whether we accept that. */
 4860     p_size = be64_to_cpu(p->d_size);
 4861     p_usize = be64_to_cpu(p->u_size);
 4862     p_csize = be64_to_cpu(p->c_size);
 4863 
 4864     peer_device->d_size = p_size;
 4865     peer_device->u_size = p_usize;
 4866     peer_device->c_size = p_csize;
 4867 
 4868     /* Ignore "current" size for calculating "max" size. */
 4869     /* If it used to have a disk, but now is detached, don't revert back to zero. */
 4870     if (p_size)
 4871         peer_device->max_size = p_size;
 4872 
 4873     cur_size = drbd_get_capacity(device->this_bdev);
 4874     dynamic_drbd_dbg(device, "current_size: %llu\n", (unsigned long long)cur_size);
 4875     dynamic_drbd_dbg(peer_device, "c_size: %llu u_size: %llu d_size: %llu max_size: %llu\n",
 4876             (unsigned long long)p_csize,
 4877             (unsigned long long)p_usize,
 4878             (unsigned long long)p_size,
 4879             (unsigned long long)peer_device->max_size);
 4880 
 4881     if ((p_size && p_csize > p_size) || (p_usize && p_csize > p_usize)) {
 4882         drbd_warn(peer_device, "Peer sent bogus sizes, disconnecting\n");
 4883         goto disconnect;
 4884     }
 4885 
 4886     /* The protocol version limits how big requests can be.  In addition,
 4887      * peers before protocol version 94 cannot split large requests into
 4888      * multiple bios; their reported max_bio_size is a hard limit.
 4889      */
 4890     protocol_max_bio_size = conn_max_bio_size(connection);
 4891     peer_device->max_bio_size = min(be32_to_cpu(p->max_bio_size), protocol_max_bio_size);
 4892     ddsf = be16_to_cpu(p->dds_flags);
 4893 
 4894     is_handshake = (peer_device->repl_state[NOW] == L_OFF);
 4895     /* Maybe the peer knows something about peers I cannot currently see. */
 4896     ddsf |= DDSF_IGNORE_PEER_CONSTRAINTS;
 4897 
 4898     if (get_ldev(device)) {
 4899         sector_t new_size;
 4900 
 4901         have_ldev = true;
 4902 
 4903         rcu_read_lock();
 4904         my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
 4905         rcu_read_unlock();
 4906 
 4907         my_max_size = drbd_get_max_capacity(device, device->ldev, false);
 4908         dynamic_drbd_dbg(peer_device, "la_size: %llu my_usize: %llu my_max_size: %llu\n",
 4909             (unsigned long long)device->ldev->md.effective_size,
 4910             (unsigned long long)my_usize,
 4911             (unsigned long long)my_max_size);
 4912 
 4913         if (peer_device->disk_state[NOW] > D_DISKLESS)
 4914             warn_if_differ_considerably(peer_device, "lower level device sizes",
 4915                    p_size, my_max_size);
 4916         warn_if_differ_considerably(peer_device, "user requested size",
 4917                         p_usize, my_usize);
 4918 
 4919         if (is_handshake)
 4920             p_usize = min_not_zero(my_usize, p_usize);
 4921 
 4922         if (p_usize == 0) {
 4923             /* Peer may reset usize to zero only if it has a backend;
 4924              * a diskless node has no disk config
 4925              * and always sends zero anyway. */
 4926             if (p_size == 0)
 4927                 p_usize = my_usize;
 4928         }
 4929 
 4930         new_size = drbd_new_dev_size(device, p_csize, p_usize, ddsf);
 4931 
 4932         /* Never shrink a device with usable data during connect,
 4933          * or "attach" on the peer.
 4934          * But allow online shrinking if we are connected. */
 4935         if (new_size < cur_size &&
 4936             device->disk_state[NOW] >= D_OUTDATED &&
 4937             (peer_device->repl_state[NOW] < L_ESTABLISHED || peer_device->disk_state[NOW] == D_DISKLESS)) {
 4938             drbd_err(peer_device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
 4939                     (unsigned long long)new_size, (unsigned long long)cur_size);
 4940             goto disconnect;
 4941         }
 4942 
 4943         /* Disconnect, if we cannot grow to the peer's current size */
 4944         if (my_max_size < p_csize && !is_handshake) {
 4945             drbd_err(peer_device, "Peer's size larger than my maximum capacity (%llu < %llu sectors)\n",
 4946                     (unsigned long long)my_max_size, (unsigned long long)p_csize);
 4947             goto disconnect;
 4948         }
 4949 
 4950         if (my_usize != p_usize) {
 4951             struct disk_conf *old_disk_conf, *new_disk_conf;
 4952 
 4953             new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
 4954             if (!new_disk_conf) {
 4955                 drbd_err(device, "Allocation of new disk_conf failed\n");
 4956                 err = -ENOMEM;
 4957                 goto out;
 4958             }
 4959 
 4960             old_disk_conf = device->ldev->disk_conf;
 4961             *new_disk_conf = *old_disk_conf;
 4962             new_disk_conf->disk_size = p_usize;
 4963 
 4964             rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
 4965             synchronize_rcu();
 4966             kfree(old_disk_conf);
 4967 
 4968             drbd_info(peer_device, "Peer sets u_size to %llu sectors (old: %llu)\n",
 4969                  (unsigned long long)p_usize, (unsigned long long)my_usize);
 4970             /* Do not set should_send_sizes here. That might cause packet storms */
 4971         }
 4972     }
 4973 
 4974     /* Keep the call to drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
 4975        In case we cleared the QUEUE_FLAG_DISCARD from our queue in
 4976        drbd_reconsider_queue_parameters(), we can be sure that after
 4977        drbd_determine_dev_size() no REQ_OP_DISCARDs are in the queue. */
 4978     if (have_ldev) {
 4979         enum dds_flags local_ddsf = ddsf;
 4980         drbd_reconsider_queue_parameters(device, device->ldev, o);
 4981 
 4982         /* To support thinly provisioned nodes (partial resync) joining later,
 4983            clear all bitmap slots, including the unused ones. */
 4984         if (device->ldev->md.effective_size == 0)
 4985             local_ddsf |= DDSF_NO_RESYNC;
 4986 
 4987         dd = drbd_determine_dev_size(device, p_csize, local_ddsf, NULL);
 4988 
 4989         if (dd == DS_GREW || dd == DS_SHRUNK)
 4990             should_send_sizes = true;
 4991 
 4992         if (dd == DS_ERROR) {
 4993             err = -EIO;
 4994             goto out;
 4995         }
 4996         drbd_md_sync_if_dirty(device);
 4997     } else {
 4998         uint64_t new_size = 0;
 4999 
 5000         drbd_reconsider_queue_parameters(device, NULL, o);
 5001         /* In case I am diskless, need to accept the peer's *current* size.
 5002          *
 5003          * At this point, the peer knows more about my disk, or at
 5004          * least about what we last agreed upon, than myself.
 5005          * So if his c_size is less than his d_size, the most likely
 5006          * reason is that *my* d_size was smaller last time we checked,
 5007          * or some other peer does not (yet) have enough room.
 5008          *
 5009          * Unless of course he does not have a disk himself.
 5010          * In which case we ignore this completely.
 5011          */
 5012         new_size = p_csize;
 5013         new_size = min_not_zero(new_size, p_usize);
 5014         new_size = min_not_zero(new_size, p_size);
 5015 
 5016         if (new_size == 0) {
 5017             /* Ignore, the peer knows nothing (it reported no size at all). */
 5018         } else if (new_size == cur_size) {
 5019             /* nothing to do */
 5020         } else if (cur_size != 0 && p_size == 0) {
 5021             dynamic_drbd_dbg(peer_device,
 5022                     "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
 5023                     (unsigned long long)new_size, (unsigned long long)cur_size);
 5024         } else if (new_size < cur_size && device->resource->role[NOW] == R_PRIMARY) {
 5025             drbd_err(peer_device,
 5026                 "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
 5027                 (unsigned long long)new_size, (unsigned long long)cur_size);
 5028             goto disconnect;
 5029 
 5030 /* FIXME for each peer device: can I currently see any peer with attached disk
 5031  * with a current size smaller than what that guy advertises? Then I better not
 5032  * believe him.
 5033  */
 5034         } else {
 5035             /* I believe the peer, if
 5036              *  - I don't have a current size myself
 5037              *  - we agree on the size anyways
 5038              *  - I do have a current size, am Secondary,
 5039              *    and he has the only disk
 5040              *  - I do have a current size, am Primary,
 5041              *    and he has the only disk,
 5042              *    which is larger than my current size
 5043              */
 5044             should_send_sizes = true;
 5045             drbd_set_my_capacity(device, new_size);
 5046         }
 5047     }
 5048 
 5049     if (device->device_conf.max_bio_size > protocol_max_bio_size ||
 5050         (connection->agreed_pro_version < 94 &&
 5051          device->device_conf.max_bio_size > peer_device->max_bio_size)) {
 5052         drbd_err(device, "Peer cannot deal with requests bigger than %u. "
 5053              "Please reduce max_bio_size in the configuration.\n",
 5054              peer_device->max_bio_size);
 5055         goto disconnect;
 5056     }
 5057 
 5058     if (have_ldev) {
 5059         if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
 5060             device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
 5061             should_send_sizes = true;
 5062         }
 5063 
 5064         drbd_setup_order_type(device, be16_to_cpu(p->queue_order_type));
 5065     }
 5066 
 5067     cur_size = drbd_get_capacity(device->this_bdev);
 5068 
 5069     for_each_peer_device_ref(peer_device_it, im, device) {
 5070         struct drbd_connection *con_it = peer_device_it->connection;
 5071 
 5072         /* drop cached max_size, if we already grew beyond it */
 5073         if (peer_device_it->max_size < cur_size)
 5074             peer_device_it->max_size = 0;
 5075 
 5076         if (con_it->cstate[NOW] < C_CONNECTED)
 5077             continue;
 5078 
 5079         /* Send size updates only if something relevant has changed.
 5080          * TODO: only tell the sender thread to do so,
 5081          * or we may end up in a distributed deadlock on congestion. */
 5082 
 5083         if (should_send_sizes)
 5084             drbd_send_sizes(peer_device_it, p_usize, ddsf);
 5085     }
 5086 
 5087     maybe_trigger_resync(device, get_neighbor_device(device, NEXT_HIGHER),
 5088                     dd == DS_GREW, ddsf & DDSF_NO_RESYNC);
 5089     maybe_trigger_resync(device, get_neighbor_device(device, NEXT_LOWER),
 5090                     dd == DS_GREW, ddsf & DDSF_NO_RESYNC);
 5091     err = 0;
 5092 
 5093 out:
 5094     if (have_ldev)
 5095         put_ldev(device);
 5096     if (have_mutex)
 5097         mutex_unlock(&connection->resource->conf_update);
 5098     return err;
 5099 
 5100 disconnect:
 5101     /* don't let a rejected peer confuse future handshakes with different peers. */
 5102     peer_device->max_size = 0;
 5103     change_cstate(connection, C_DISCONNECTING, CS_HARD);
 5104     err = -EIO;
 5105     goto out;
 5106 }
 5107 
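/*
 * Called when the handshake would report an auto-recoverable split brain
 * after an unstable period: derive the direction from the disk states
 * instead: sync target if the peer's disk is D_UP_TO_DATE, sync source if
 * only ours is, otherwise keep it as an auto-recoverable split brain.
 */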
 5108 static enum sync_strategy resolve_splitbrain_from_disk_states(struct drbd_peer_device *peer_device)
 5109 {
 5110     struct drbd_device *device = peer_device->device;
 5111     enum drbd_disk_state peer_disk_state = peer_device->disk_state[NOW];
 5112     enum drbd_disk_state disk_state = device->disk_state[NOW];
 5113 
 5114     return  disk_state <= D_UP_TO_DATE && peer_disk_state == D_UP_TO_DATE ? SYNC_TARGET_USE_BITMAP :
 5115         disk_state == D_UP_TO_DATE && peer_disk_state <= D_UP_TO_DATE ? SYNC_SOURCE_USE_BITMAP :
 5116         SPLIT_BRAIN_AUTO_RECOVER;
 5117 }
 5118 
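/*
 * Re-run the UUID handshake outside the initial connect (after becoming
 * stable again, or because the primary is diskless) and, if the outcome
 * calls for it, switch to the matching bitmap-exchange state; if a resync
 * is still running, remember to start another one once it has finished.
 */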
 5119 static void drbd_resync(struct drbd_peer_device *peer_device,
 5120             enum resync_reason reason) __must_hold(local)
 5121 {
 5122     enum drbd_role peer_role = peer_device->connection->peer_role[NOW];
 5123     enum drbd_repl_state new_repl_state;
 5124     enum drbd_disk_state peer_disk_state;
 5125     enum sync_strategy strategy;
 5126     int rule_nr, peer_node_id;
 5127     enum drbd_state_rv rv;
 5128 
 5129     strategy = drbd_handshake(peer_device, &rule_nr, &peer_node_id, reason == DISKLESS_PRIMARY);
 5130     if (strategy == SPLIT_BRAIN_AUTO_RECOVER && reason == AFTER_UNSTABLE)
 5131         strategy = resolve_splitbrain_from_disk_states(peer_device);
 5132 
 5133     if (!is_strategy_determined(strategy)) {
 5134         drbd_info(peer_device, "Unexpected result of handshake() %s!\n", strategy_descriptor(strategy).name);
 5135         return;
 5136     }
 5137 
 5138     new_repl_state = goodness_to_repl_state(peer_device, peer_role, strategy);
 5139     if (new_repl_state != L_ESTABLISHED) {
 5140         bitmap_mod_after_handshake(peer_device, strategy, peer_node_id);
 5141         drbd_info(peer_device, "Becoming %s %s\n", drbd_repl_str(new_repl_state),
 5142               reason == AFTER_UNSTABLE ? "after unstable" : "because primary is diskless");
 5143     }
 5144 
 5145     peer_disk_state = peer_device->disk_state[NOW];
 5146     if (new_repl_state == L_ESTABLISHED && peer_disk_state >= D_CONSISTENT &&
 5147         peer_device->device->disk_state[NOW] == D_OUTDATED) {
 5148         /* No resync with up-to-date peer -> I should be consistent or up-to-date as well.
 5149            Note: Former unstable (but up-to-date) nodes become consistent for a short
 5150            time after losing their primary peer. Therefore consider consistent here
 5151            as well. */
 5152         drbd_info(peer_device, "Upgrading local disk to %s after unstable/weak (and no resync).\n",
 5153               drbd_disk_str(peer_disk_state));
 5154         change_disk_state(peer_device->device, peer_disk_state, CS_VERBOSE, NULL);
 5155         return;
 5156     }
 5157 
 5158     rv = change_repl_state(peer_device, new_repl_state, CS_VERBOSE);
 5159     if ((rv == SS_NOTHING_TO_DO || rv == SS_RESYNC_RUNNING) &&
 5160         (new_repl_state == L_WF_BITMAP_S || new_repl_state == L_WF_BITMAP_T)) {
 5161         /* Those events might happen very quickly. In case we are still processing
 5162            the previous resync we need to re-enter that state. Schedule sending of
 5163            the bitmap here explicitly */
 5164         peer_device->resync_again++;
 5165         drbd_info(peer_device, "...postponing this until the current resync has finished\n");
 5166     }
 5167 }
 5168 
 5169 static void update_bitmap_slot_of_peer(struct drbd_peer_device *peer_device, int node_id, u64 bitmap_uuid)
 5170 {
 5171     if (peer_device->bitmap_uuids[node_id] && bitmap_uuid == 0) {
 5172         /* If we learn from a neighbor that it no longer has a bitmap
 5173            against a third node, we need to deduce from that knowledge
 5174            that in the other direction the bitmap was cleared as well.
 5175          */
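        /* Example: peer B tells us it cleared its bitmap against node C;
         * then the slot of our peer device for C that points back at B
         * can be zeroed as well. */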
 5176         struct drbd_peer_device *peer_device2;
 5177 
 5178         rcu_read_lock();
 5179         peer_device2 = peer_device_by_node_id(peer_device->device, node_id);
 5180         if (peer_device2) {
 5181             int node_id2 = peer_device->connection->peer_node_id;
 5182             peer_device2->bitmap_uuids[node_id2] = 0;
 5183         }
 5184         rcu_read_unlock();
 5185     }
 5186     peer_device->bitmap_uuids[node_id] = bitmap_uuid;
 5187 }
 5188 
 5189 static int __receive_uuids(