"Fossies" - the Fresh Open Source Software Archive

Member "redis-6.0.8/src/cluster.c" (10 Sep 2020, 235596 Bytes) of package /linux/misc/redis-6.0.8.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "cluster.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 6.0.7_vs_6.0.8.

/* Redis Cluster implementation.
 *
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"
#include "cluster.h"
#include "endianconv.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <math.h>

/* A global reference to myself is handy to make code more clear.
 * Myself always points to server.cluster->myself, that is, the clusterNode
 * that represents this node. */
clusterNode *myself = NULL;

clusterNode *createClusterNode(char *nodename, int flags);
int clusterAddNode(clusterNode *node);
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterReadHandler(connection *conn);
void clusterSendPing(clusterLink *link, int type);
void clusterSendFail(char *nodename);
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter);
clusterNode *clusterLookupNode(const char *name);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetMaster(clusterNode *n);
void clusterHandleSlaveFailover(void);
void clusterHandleSlaveMigration(int max_slaves);
int bitmapTestBit(unsigned char *bitmap, int pos);
void clusterDoBeforeSleep(int flags);
void clusterSendUpdate(clusterLink *link, clusterNode *node);
void resetManualFailover(void);
void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);

/* -----------------------------------------------------------------------------
 * Initialization
 * -------------------------------------------------------------------------- */

/* Load the cluster config from 'filename'.
 *
 * If the file does not exist or is zero-length (this may happen because
 * when we lock the nodes.conf file, we create a zero-length one for the
 * sake of locking if it does not already exist), C_ERR is returned.
 * If the configuration was loaded from the file, C_OK is returned. */
int clusterLoadConfig(char *filename) {
    FILE *fp = fopen(filename,"r");
    struct stat sb;
    char *line;
    int maxline, j;

    if (fp == NULL) {
        if (errno == ENOENT) {
            return C_ERR;
        } else {
            serverLog(LL_WARNING,
                "Loading the cluster node config from %s: %s",
                filename, strerror(errno));
            exit(1);
        }
    }

    /* Check if the file is zero-length: if so return C_ERR to signal
     * we have to write the config. */
    if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        fclose(fp);
        return C_ERR;
    }

    /* Parse the file. Note that single lines of the cluster config file can
     * be really long as they include all the hash slots of the node.
     * This means in the worst possible case, half of the Redis slots will be
     * present in a single line, possibly in importing or migrating state,
     * together with the node ID of the sender/receiver.
     *
     * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
    maxline = 1024+CLUSTER_SLOTS*128;
    line = zmalloc(maxline);
    while(fgets(line,maxline,fp) != NULL) {
        int argc;
        sds *argv;
        clusterNode *n, *master;
        char *p, *s;

        /* Skip blank lines, they can be created either by users manually
         * editing nodes.conf or by the config writing process if stopped
         * before the truncate() call. */
        if (line[0] == '\n' || line[0] == '\0') continue;

        /* Split the line into arguments for processing. */
        argv = sdssplitargs(line,&argc);
        if (argv == NULL) goto fmterr;

        /* Handle the special "vars" line. Don't assume it is the last
         * line even if it actually is when generated by Redis. */
        if (strcasecmp(argv[0],"vars") == 0) {
            if (!(argc % 2)) goto fmterr;
            for (j = 1; j < argc; j += 2) {
                if (strcasecmp(argv[j],"currentEpoch") == 0) {
                    server.cluster->currentEpoch =
                            strtoull(argv[j+1],NULL,10);
                } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
                    server.cluster->lastVoteEpoch =
                            strtoull(argv[j+1],NULL,10);
                } else {
                    serverLog(LL_WARNING,
                        "Skipping unknown cluster config variable '%s'",
                        argv[j]);
                }
            }
            sdsfreesplitres(argv,argc);
            continue;
        }

        /* Regular config lines have at least eight fields */
        if (argc < 8) {
            sdsfreesplitres(argv,argc);
            goto fmterr;
        }

        /* Create this node if it does not exist */
        n = clusterLookupNode(argv[0]);
        if (!n) {
            n = createClusterNode(argv[0],0);
            clusterAddNode(n);
        }
        /* Address and port */
        if ((p = strrchr(argv[1],':')) == NULL) {
            sdsfreesplitres(argv,argc);
            goto fmterr;
        }
        *p = '\0';
        memcpy(n->ip,argv[1],strlen(argv[1])+1);
        char *port = p+1;
        char *busp = strchr(port,'@');
        if (busp) {
            *busp = '\0';
            busp++;
        }
        n->port = atoi(port);
        /* In older versions of nodes.conf the "@busport" part is missing.
         * In this case we set it to the default offset of 10000 from the
         * base port. */
        n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;

        /* Parse flags */
        p = s = argv[2];
        while(p) {
            p = strchr(s,',');
            if (p) *p = '\0';
            if (!strcasecmp(s,"myself")) {
                serverAssert(server.cluster->myself == NULL);
                myself = server.cluster->myself = n;
                n->flags |= CLUSTER_NODE_MYSELF;
            } else if (!strcasecmp(s,"master")) {
                n->flags |= CLUSTER_NODE_MASTER;
            } else if (!strcasecmp(s,"slave")) {
                n->flags |= CLUSTER_NODE_SLAVE;
            } else if (!strcasecmp(s,"fail?")) {
                n->flags |= CLUSTER_NODE_PFAIL;
            } else if (!strcasecmp(s,"fail")) {
                n->flags |= CLUSTER_NODE_FAIL;
                n->fail_time = mstime();
            } else if (!strcasecmp(s,"handshake")) {
                n->flags |= CLUSTER_NODE_HANDSHAKE;
            } else if (!strcasecmp(s,"noaddr")) {
                n->flags |= CLUSTER_NODE_NOADDR;
            } else if (!strcasecmp(s,"nofailover")) {
                n->flags |= CLUSTER_NODE_NOFAILOVER;
            } else if (!strcasecmp(s,"noflags")) {
                /* nothing to do */
            } else {
                serverPanic("Unknown flag in redis cluster config file");
            }
            if (p) s = p+1;
        }

        /* Get master if any. Set the master and populate master's
         * slave list. */
        if (argv[3][0] != '-') {
            master = clusterLookupNode(argv[3]);
            if (!master) {
                master = createClusterNode(argv[3],0);
                clusterAddNode(master);
            }
            n->slaveof = master;
            clusterNodeAddSlave(master,n);
        }

        /* Set ping sent / pong received timestamps */
        if (atoi(argv[4])) n->ping_sent = mstime();
        if (atoi(argv[5])) n->pong_received = mstime();

        /* Set configEpoch for this node. */
        n->configEpoch = strtoull(argv[6],NULL,10);

        /* Populate hash slots served by this instance. */
        for (j = 8; j < argc; j++) {
            int start, stop;

            if (argv[j][0] == '[') {
                /* Here we handle migrating / importing slots */
                int slot;
                char direction;
                clusterNode *cn;

                p = strchr(argv[j],'-');
                serverAssert(p != NULL);
                *p = '\0';
                direction = p[1]; /* Either '>' or '<' */
                slot = atoi(argv[j]+1);
                if (slot < 0 || slot >= CLUSTER_SLOTS) {
                    sdsfreesplitres(argv,argc);
                    goto fmterr;
                }
                p += 3;
                cn = clusterLookupNode(p);
                if (!cn) {
                    cn = createClusterNode(p,0);
                    clusterAddNode(cn);
                }
                if (direction == '>') {
                    server.cluster->migrating_slots_to[slot] = cn;
                } else {
                    server.cluster->importing_slots_from[slot] = cn;
                }
                continue;
            } else if ((p = strchr(argv[j],'-')) != NULL) {
                *p = '\0';
                start = atoi(argv[j]);
                stop = atoi(p+1);
            } else {
                start = stop = atoi(argv[j]);
            }
            if (start < 0 || start >= CLUSTER_SLOTS ||
                stop < 0 || stop >= CLUSTER_SLOTS)
            {
                sdsfreesplitres(argv,argc);
                goto fmterr;
            }
            while(start <= stop) clusterAddSlot(n, start++);
        }

        sdsfreesplitres(argv,argc);
    }
    /* Config sanity check */
    if (server.cluster->myself == NULL) goto fmterr;

    zfree(line);
    fclose(fp);

    serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);

    /* Something that should never happen: currentEpoch smaller than
     * the max epoch found in the nodes configuration. However we handle this
     * as some form of protection against manual editing of critical files. */
    if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
        server.cluster->currentEpoch = clusterGetMaxEpoch();
    }
    return C_OK;

fmterr:
    serverLog(LL_WARNING,
        "Unrecoverable error: corrupted cluster config file.");
    zfree(line);
    if (fp) fclose(fp);
    exit(1);
}
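
/* For reference, an illustrative nodes.conf fragment of the shape parsed
 * above (IDs shortened and values made up by us; a real file carries full
 * 40-char hex IDs):
 *
 *   cafe... 127.0.0.1:30001@40001 myself,master - 0 1426238316232 1 connected 0-5460 [5461->-beef...]
 *   beef... 127.0.0.1:30002@40002 slave cafe... 0 1426238317239 4 connected
 *   vars currentEpoch 6 lastVoteEpoch 4
 *
 * argv[0..7] map to: node ID, address (ip:port@cport), flags, master ID or
 * "-", ping-sent, pong-received, configEpoch and link state (the last one
 * is skipped by the slot loop above, which starts at j=8); the remaining
 * arguments are single slots, start-stop slot ranges, or [slot->-id] /
 * [slot-<-id] entries for slots in migrating / importing state. */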

/* Cluster node configuration is exactly the same as CLUSTER NODES output.
 *
 * This function writes the node config and returns 0. On error, -1
 * is returned.
 *
 * Note: we need to write the file in an atomic way from the point of view
 * of the POSIX filesystem semantics, so that if the server is stopped
 * or crashes during the write, we'll end with either the old file or the
 * new one. Since we have the full payload to write available we can use
 * a single write to write the whole file. If the pre-existing file was
 * bigger we pad our payload with newlines that are anyway ignored and truncate
 * the file afterward. */
int clusterSaveConfig(int do_fsync) {
    sds ci;
    size_t content_size;
    struct stat sb;
    int fd;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;

    /* Get the nodes description and concatenate our "vars" directive to
     * save currentEpoch and lastVoteEpoch. */
    ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
    ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
        (unsigned long long) server.cluster->currentEpoch,
        (unsigned long long) server.cluster->lastVoteEpoch);
    content_size = sdslen(ci);

    if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
        == -1) goto err;

    /* Pad the new payload if the existing file length is greater. */
    if (fstat(fd,&sb) != -1) {
        if (sb.st_size > (off_t)content_size) {
            ci = sdsgrowzero(ci,sb.st_size);
            memset(ci+content_size,'\n',sb.st_size-content_size);
        }
    }
    if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
    if (do_fsync) {
        server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
        fsync(fd);
    }

    /* Truncate the file if needed to remove the final \n padding that
     * is just garbage. */
    if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
        /* ftruncate() failing is not a critical error. */
    }
    close(fd);
    sdsfree(ci);
    return 0;

err:
    if (fd != -1) close(fd);
    sdsfree(ci);
    return -1;
}
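
/* Worked example of the scheme above (illustrative sizes): if the old file
 * is 2000 bytes and the new payload is 1500, we write the 1500 payload bytes
 * plus 500 '\n' bytes in a single write(2), then ftruncate() down to 1500.
 * A crash between the write and the truncate leaves trailing blank lines,
 * which is harmless because clusterLoadConfig() skips them. */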

void clusterSaveConfigOrDie(int do_fsync) {
    if (clusterSaveConfig(do_fsync) == -1) {
        serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
        exit(1);
    }
}

/* Lock the cluster config using flock(), and leak the file descriptor used
 * to acquire the lock so that the file will be locked forever.
 *
 * This works because we always update nodes.conf with a new version
 * in-place, reopening the file, and writing to it in place (later adjusting
 * the length with ftruncate()).
 *
 * On success C_OK is returned, otherwise an error is logged and
 * the function returns C_ERR to signal a lock was not acquired. */
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris,
 * and a fcntl-based solution won't help, as we constantly re-open that file,
 * which will release _all_ locks anyway.
 */
#if !defined(__sun)
    /* To lock it, we need to open the file in a way that it is created if
     * it does not exist, otherwise there is a race condition with other
     * processes. */
    int fd = open(filename,O_WRONLY|O_CREAT,0644);
    if (fd == -1) {
        serverLog(LL_WARNING,
            "Can't open %s in order to acquire a lock: %s",
            filename, strerror(errno));
        return C_ERR;
    }

    if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
        if (errno == EWOULDBLOCK) {
            serverLog(LL_WARNING,
                 "Sorry, the cluster configuration file %s is already used "
                 "by a different Redis Cluster node. Please make sure that "
                 "different nodes use different cluster configuration "
                 "files.", filename);
        } else {
            serverLog(LL_WARNING,
                "Impossible to lock %s: %s", filename, strerror(errno));
        }
        close(fd);
        return C_ERR;
    }
    /* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
     * lock to the file as long as the process exists.
     *
     * After a fork, the child process inherits the fd opened by the parent
     * process. We need to save the fd in `cluster_config_file_lock_fd` so
     * that redisFork() can close it in the child process. If it is not
     * closed, then when the main process is killed with -9 while a child
     * process (e.g. redis-aof-rewrite) is still alive, the lock fd is still
     * held by the child, and a restarted main process will fail to acquire
     * the lock, i.e. fail to start. */
    server.cluster_config_file_lock_fd = fd;
#endif /* __sun */

    return C_OK;
}
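
/* A minimal standalone sketch (our illustration, not Redis code) of why the
 * comment above rules out an fcntl()-based lock: POSIX record locks are
 * released as soon as the process closes *any* descriptor referring to the
 * locked file, and clusterSaveConfig() re-opens nodes.conf on every save.
 * Only standard POSIX calls are assumed. */
#if 0 /* illustration only, not compiled */
int fcntl_lock_is_fragile(const char *path) {
    int lockfd = open(path, O_WRONLY|O_CREAT, 0644);
    struct flock fl = { .l_type = F_WRLCK, .l_whence = SEEK_SET };
    if (lockfd == -1 || fcntl(lockfd, F_SETLK, &fl) == -1) return -1;

    int fd = open(path, O_WRONLY);  /* e.g. a later config save... */
    close(fd);                      /* ...and this close() silently drops the
                                     * record lock acquired via lockfd. */
    return 0;
}
#endif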

/* Some flags (currently just the NOFAILOVER flag) may need to be updated
 * in the "myself" node based on the current configuration of the node,
 * that may change at runtime via CONFIG SET. This function changes the
 * set of flags in myself->flags accordingly. */
void clusterUpdateMyselfFlags(void) {
    int oldflags = myself->flags;
    int nofailover = server.cluster_slave_no_failover ?
                     CLUSTER_NODE_NOFAILOVER : 0;
    myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
    myself->flags |= nofailover;
    if (myself->flags != oldflags) {
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE);
    }
}

void clusterInit(void) {
    int saveconf = 0;

    server.cluster = zmalloc(sizeof(clusterState));
    server.cluster->myself = NULL;
    server.cluster->currentEpoch = 0;
    server.cluster->state = CLUSTER_FAIL;
    server.cluster->size = 1;
    server.cluster->todo_before_sleep = 0;
    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL);
    server.cluster->failover_auth_time = 0;
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_rank = 0;
    server.cluster->failover_auth_epoch = 0;
    server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
    server.cluster->lastVoteEpoch = 0;
    for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
        server.cluster->stats_bus_messages_sent[i] = 0;
        server.cluster->stats_bus_messages_received[i] = 0;
    }
    server.cluster->stats_pfail_nodes = 0;
    memset(server.cluster->slots,0, sizeof(server.cluster->slots));
    clusterCloseAllSlots();

    /* Lock the cluster config file to make sure every node uses
     * its own nodes.conf. */
    server.cluster_config_file_lock_fd = -1;
    if (clusterLockConfig(server.cluster_configfile) == C_ERR)
        exit(1);

    /* Load or create a new nodes configuration. */
    if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
        /* No configuration found. We will just use the random name provided
         * by the createClusterNode() function. */
        myself = server.cluster->myself =
            createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
        serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }
    if (saveconf) clusterSaveConfigOrDie(1);

    /* We need a listening TCP port for our cluster messaging needs. */
    server.cfd_count = 0;

    /* Port sanity check II
     * The other handshake port check is triggered too late to stop
     * us from trying to use a too-high cluster port number. */
    int port = server.tls_cluster ? server.tls_port : server.port;
    if (port > (65535-CLUSTER_PORT_INCR)) {
        serverLog(LL_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be "
                   "lower than 55535.");
        exit(1);
    }
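    /* For example, with the default port 6379 the cluster bus listens on
     * 16379 (CLUSTER_PORT_INCR is 10000), hence the 55535 ceiling above. */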
    if (listenToPort(port+CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == C_ERR)
    {
        exit(1);
    } else {
        int j;

        for (j = 0; j < server.cfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler, NULL) == AE_ERR)
                    serverPanic("Unrecoverable error creating Redis Cluster "
                                "file event.");
        }
    }

    /* The slots -> keys map is a radix tree. Initialize it here. */
    server.cluster->slots_to_keys = raxNew();
    memset(server.cluster->slots_keys_count,0,
           sizeof(server.cluster->slots_keys_count));

    /* Set myself->port / cport to my listening ports, we'll just need to
     * discover the IP address via MEET messages. */
    myself->port = port;
    myself->cport = port+CLUSTER_PORT_INCR;
    if (server.cluster_announce_port)
        myself->port = server.cluster_announce_port;
    if (server.cluster_announce_bus_port)
        myself->cport = server.cluster_announce_bus_port;

    server.cluster->mf_end = 0;
    resetManualFailover();
    clusterUpdateMyselfFlags();
}

/* Reset a node performing a soft or hard reset:
 *
 * 1) All other nodes are forgotten.
 * 2) All the assigned / open slots are released.
 * 3) If the node is a slave, it turns into a master.
 * 4) Only for hard reset: a new Node ID is generated.
 * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
 * 6) The new configuration is saved and the cluster state updated.
 * 7) If the node was a slave, the whole data set is flushed away. */
void clusterReset(int hard) {
    dictIterator *di;
    dictEntry *de;
    int j;

    /* Turn into master. */
    if (nodeIsSlave(myself)) {
        clusterSetNodeAsMaster(myself);
        replicationUnsetMaster();
        emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
    }

    /* Close slots, reset manual failover state. */
    clusterCloseAllSlots();
    resetManualFailover();

    /* Unassign all the slots. */
    for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);

    /* Forget all the nodes, but myself. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == myself) continue;
        clusterDelNode(node);
    }
    dictReleaseIterator(di);

    /* Hard reset only: set epochs to 0, change node ID. */
    if (hard) {
        sds oldname;

        server.cluster->currentEpoch = 0;
        server.cluster->lastVoteEpoch = 0;
        myself->configEpoch = 0;
        serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");

        /* To change the Node ID we need to remove the old name from the
         * nodes table, change the ID, and re-add back with new name. */
        oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
        dictDelete(server.cluster->nodes,oldname);
        sdsfree(oldname);
        getRandomHexChars(myself->name, CLUSTER_NAMELEN);
        clusterAddNode(myself);
        serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
    }

    /* Make sure to persist the new config and update the state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE|
                         CLUSTER_TODO_FSYNC_CONFIG);
}

/* -----------------------------------------------------------------------------
 * CLUSTER communication link
 * -------------------------------------------------------------------------- */

clusterLink *createClusterLink(clusterNode *node) {
    clusterLink *link = zmalloc(sizeof(*link));
    link->ctime = mstime();
    link->sndbuf = sdsempty();
    link->rcvbuf = sdsempty();
    link->node = node;
    link->conn = NULL;
    return link;
}

/* Free a cluster link, but do not free the associated node, of course.
 * This function just makes sure that the original node associated
 * with this link has its 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
    if (link->conn) {
        connClose(link->conn);
        link->conn = NULL;
    }
    sdsfree(link->sndbuf);
    sdsfree(link->rcvbuf);
    if (link->node)
        link->node->link = NULL;
    zfree(link);
}

static void clusterConnAcceptHandler(connection *conn) {
    clusterLink *link;

    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_VERBOSE,
                "Error accepting cluster node connection: %s", connGetLastError(conn));
        connClose(conn);
        return;
    }

    /* Create a link object we use to handle the connection.
     * It gets passed to the readable handler when data is available.
     * Initially the link->node pointer is set to NULL as we don't know
     * which node it is yet; the right node is referenced once we know the
     * node's identity. */
    link = createClusterLink(NULL);
    link->conn = conn;
    connSetPrivateData(conn, link);

    /* Register read handler */
    connSetReadHandler(conn, clusterReadHandler);
}

#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }

        connection *conn = server.tls_cluster ?
            connCreateAcceptedTLS(cfd, TLS_CLIENT_AUTH_YES) : connCreateAcceptedSocket(cfd);

        /* Make sure connection is not in an error state */
        if (connGetState(conn) != CONN_STATE_ACCEPTING) {
            serverLog(LL_VERBOSE,
                "Error creating an accepting connection for cluster node: %s",
                    connGetLastError(conn));
            connClose(conn);
            return;
        }
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);

        /* Use non-blocking I/O for cluster messages. */
        serverLog(LL_VERBOSE,"Accepting cluster node connection from %s:%d", cip, cport);

        /* Accept the connection now.  connAccept() may call our handler directly
         * or schedule it for later depending on connection implementation.
         */
        if (connAccept(conn, clusterConnAcceptHandler) == C_ERR) {
            if (connGetState(conn) == CONN_STATE_ERROR)
                serverLog(LL_VERBOSE,
                        "Error accepting cluster node connection: %s",
                        connGetLastError(conn));
            connClose(conn);
            return;
        }
    }
}

/* Return the approximate number of sockets we are using for the
 * cluster bus connections. */
unsigned long getClusterConnectionsCount(void) {
    /* We decrement the number of nodes by one, since there is the
     * "myself" node too in the list. Each node uses two file descriptors,
     * one incoming and one outgoing, thus the multiplication by 2. */
    return server.cluster_enabled ?
           ((dictSize(server.cluster->nodes)-1)*2) : 0;
}
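
/* E.g. in a 10-node cluster, each node uses (10-1)*2 = 18 sockets for the
 * cluster bus. */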

/* -----------------------------------------------------------------------------
 * Key space handling
 * -------------------------------------------------------------------------- */

/* We have 16384 hash slots. The hash slot of a given key is obtained
 * as the least significant 14 bits of the crc16 of the key.
 *
 * However if the key contains the {...} pattern, only the part between
 * { and } is hashed. This may be useful in the future to force certain
 * keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}
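
/* Usage sketch (our illustration): hash tags let clients force related keys
 * into the same slot, keeping multi-key operations cluster-safe. Only the
 * "user1000" substring is hashed for both keys below. */
#if 0 /* illustration only, not compiled */
void hash_tag_example(void) {
    unsigned int s1 = keyHashSlot("{user1000}.following", 20);
    unsigned int s2 = keyHashSlot("{user1000}.followers", 20);
    serverAssert(s1 == s2); /* both hash only "user1000" */
}
#endif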

/* -----------------------------------------------------------------------------
 * CLUSTER node API
 * -------------------------------------------------------------------------- */

/* Create a new cluster node, with the specified flags.
 * If "nodename" is NULL this is considered a first handshake and a random
 * node name is assigned to this node (it will be fixed later when we
 * receive the first pong).
 *
 * The node is created and returned to the user, but it is not automatically
 * added to the nodes hash table. */
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));

    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        getRandomHexChars(node->name, CLUSTER_NAMELEN);
    node->ctime = mstime();
    node->configEpoch = 0;
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots));
    node->numslots = 0;
    node->numslaves = 0;
    node->slaves = NULL;
    node->slaveof = NULL;
    node->ping_sent = node->pong_received = 0;
    node->data_received = 0;
    node->fail_time = 0;
    node->link = NULL;
    memset(node->ip,0,sizeof(node->ip));
    node->port = 0;
    node->cport = 0;
    node->fail_reports = listCreate();
    node->voted_time = 0;
    node->orphaned_time = 0;
    node->repl_offset_time = 0;
    node->repl_offset = 0;
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}

/* This function is called every time we get a failure report from a node.
 * The side effect is to populate the fail_reports list (or to update
 * the timestamp of an existing report).
 *
 * 'failing' is the node that is in failure state according to the
 * 'sender' node.
 *
 * The function returns 0 if it just updates a timestamp of an existing
 * failure report from the same sender. 1 is returned if a new failure
 * report is created. */
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *l = failing->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* If a failure report from the same sender already exists, just update
     * the timestamp. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) {
            fr->time = mstime();
            return 0;
        }
    }

    /* Otherwise create a new report. */
    fr = zmalloc(sizeof(*fr));
    fr->node = sender;
    fr->time = mstime();
    listAddNodeTail(l,fr);
    return 1;
}

/* Remove failure reports that are too old, where "too old" means reasonably
 * older than the global node timeout. Note that, in any case, for a node to
 * be flagged as FAIL we need to have a local PFAIL state that is at least
 * older than the global node timeout, so we don't just trust the number
 * of failure reports from other nodes. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
    mstime_t maxtime = server.cluster_node_timeout *
                     CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();

    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (now - fr->time > maxtime) listDelNode(l,ln);
    }
}
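
/* Illustrative numbers: with the default cluster-node-timeout of 15000 ms
 * and a validity multiplier of 2 (the CLUSTER_FAIL_REPORT_VALIDITY_MULT
 * value we assume from cluster.h), reports older than 30 seconds are
 * dropped here. */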

/* Remove the failure report for 'node' if it was previously considered
 * failing by 'sender'. This function is called when a node informs us via
 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
 *
 * Note that this function is called relatively often as it gets called even
 * when there are no nodes failing, and is O(N); however, when the cluster is
 * fine the failure reports list is empty so the function runs in constant
 * time.
 *
 * The function returns 1 if the failure report was found and removed.
 * Otherwise 0 is returned. */
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;

    /* Search for a failure report from this sender. */
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) break;
    }
    if (!ln) return 0; /* No failure report from this sender. */

    /* Remove the failure report. */
    listDelNode(l,ln);
    clusterNodeCleanupFailureReports(node);
    return 1;
}

/* Return the number of external nodes that believe 'node' is failing,
 * not counting this node itself, which may also have a PFAIL or FAIL
 * state for this node. */
int clusterNodeFailureReportsCount(clusterNode *node) {
    clusterNodeCleanupFailureReports(node);
    return listLength(node->fail_reports);
}

int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int j;

    for (j = 0; j < master->numslaves; j++) {
        if (master->slaves[j] == slave) {
            if ((j+1) < master->numslaves) {
                int remaining_slaves = (master->numslaves - j) - 1;
                memmove(master->slaves+j,master->slaves+(j+1),
                        (sizeof(*master->slaves) * remaining_slaves));
            }
            master->numslaves--;
            if (master->numslaves == 0)
                master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
            return C_OK;
        }
    }
    return C_ERR;
}

int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int j;

    /* If it's already a slave, don't add it again. */
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] == slave) return C_ERR;
    master->slaves = zrealloc(master->slaves,
        sizeof(clusterNode*)*(master->numslaves+1));
    master->slaves[master->numslaves] = slave;
    master->numslaves++;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}

int clusterCountNonFailingSlaves(clusterNode *n) {
    int j, okslaves = 0;

    for (j = 0; j < n->numslaves; j++)
        if (!nodeFailed(n->slaves[j])) okslaves++;
    return okslaves;
}

/* Low level cleanup of the node structure. Only called by clusterDelNode(). */
void freeClusterNode(clusterNode *n) {
    sds nodename;
    int j;

    /* If the node has associated slaves, we have to set
     * all the slaves->slaveof fields to NULL (unknown). */
    for (j = 0; j < n->numslaves; j++)
        n->slaves[j]->slaveof = NULL;

    /* Remove this node from the list of slaves of its master. */
    if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);

    /* Unlink from the set of nodes. */
    nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
    serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
    sdsfree(nodename);

    /* Release link and associated data structures. */
    if (n->link) freeClusterLink(n->link);
    listRelease(n->fail_reports);
    zfree(n->slaves);
    zfree(n);
}

/* Add a node to the nodes hash table */
int clusterAddNode(clusterNode *node) {
    int retval;

    retval = dictAdd(server.cluster->nodes,
            sdsnewlen(node->name,CLUSTER_NAMELEN), node);
    return (retval == DICT_OK) ? C_OK : C_ERR;
}

/* Remove a node from the cluster. The function performs the high level
 * cleanup, calling freeClusterNode() for the low level cleanup.
 * Here we do the following:
 *
 * 1) Mark all the slots handled by it as unassigned.
 * 2) Remove all the failure reports sent by this node and referenced by
 *    other nodes.
 * 3) Free the node with freeClusterNode() that will in turn remove it
 *    from the hash table and from the list of slaves of its master, if
 *    it is a slave node.
 */
void clusterDelNode(clusterNode *delnode) {
    int j;
    dictIterator *di;
    dictEntry *de;

    /* 1) Mark slots as unassigned. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (server.cluster->importing_slots_from[j] == delnode)
            server.cluster->importing_slots_from[j] = NULL;
        if (server.cluster->migrating_slots_to[j] == delnode)
            server.cluster->migrating_slots_to[j] = NULL;
        if (server.cluster->slots[j] == delnode)
            clusterDelSlot(j);
    }

    /* 2) Remove failure reports. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (node == delnode) continue;
        clusterNodeDelFailureReport(node,delnode);
    }
    dictReleaseIterator(di);

    /* 3) Free the node, unlinking it from the cluster. */
    freeClusterNode(delnode);
}

/* Node lookup by name */
clusterNode *clusterLookupNode(const char *name) {
    sds s = sdsnewlen(name, CLUSTER_NAMELEN);
    dictEntry *de;

    de = dictFind(server.cluster->nodes,s);
    sdsfree(s);
    if (de == NULL) return NULL;
    return dictGetVal(de);
}

/* This is only used after the handshake. When we connect a given IP/PORT
 * as a result of CLUSTER MEET we don't have the node name yet, so we
 * pick a random one, and will fix it when we receive the PONG reply using
 * this function. */
void clusterRenameNode(clusterNode *node, char *newname) {
    int retval;
    sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

    serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
        node->name, newname);
    retval = dictDelete(server.cluster->nodes, s);
    sdsfree(s);
    serverAssert(retval == DICT_OK);
    memcpy(node->name, newname, CLUSTER_NAMELEN);
    clusterAddNode(node);
}

/* -----------------------------------------------------------------------------
 * CLUSTER config epoch handling
 * -------------------------------------------------------------------------- */

/* Return the greatest configEpoch found in the cluster, or the current
 * epoch if greater than any node configEpoch. */
uint64_t clusterGetMaxEpoch(void) {
    uint64_t max = 0;
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (node->configEpoch > max) max = node->configEpoch;
    }
    dictReleaseIterator(di);
    if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
    return max;
}

/* If this node's epoch is zero, or is not already the greatest across the
 * cluster (from the POV of the local configuration), this function will:
 *
 * 1) Generate a new config epoch, incrementing the current epoch.
 * 2) Assign the new epoch to this node, WITHOUT any consensus.
 * 3) Persist the configuration on disk before sending packets with the
 *    new configuration.
 *
 * If the new config epoch is generated and assigned, C_OK is returned,
 * otherwise C_ERR is returned (since the node already has the greatest
 * configuration epoch around) and no operation is performed.
 *
 * Important note: this function violates the principle that config epochs
 * should be generated with consensus and should be unique across the cluster.
 * However Redis Cluster uses these auto-generated new config epochs in two
 * cases:
 *
 * 1) When slots are closed after importing. Otherwise resharding would be
 *    too expensive.
 * 2) When CLUSTER FAILOVER is called with options that force a slave to
 *    failover its master even if there is no master majority able to
 *    create a new configuration epoch.
 *
 * Redis Cluster will not explode using this function, even in the case of
 * a collision between this node and another node, generating the same
 * configuration epoch unilaterally, because the config epoch conflict
 * resolution algorithm will eventually move colliding nodes to different
 * config epochs. However using this function may violate the "last failover
 * wins" rule, so it should only be used with care. */
int clusterBumpConfigEpochWithoutConsensus(void) {
    uint64_t maxEpoch = clusterGetMaxEpoch();

    if (myself->configEpoch == 0 ||
        myself->configEpoch != maxEpoch)
    {
        server.cluster->currentEpoch++;
        myself->configEpoch = server.cluster->currentEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);
        serverLog(LL_WARNING,
            "New configEpoch set to %llu",
            (unsigned long long) myself->configEpoch);
        return C_OK;
    } else {
        return C_ERR;
    }
}
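
/* Example scenario (illustrative numbers): a node that just finished
 * importing a slot calls this function while maxEpoch and currentEpoch are
 * 41 but myself->configEpoch is 17. currentEpoch becomes 42, this node
 * claims configEpoch 42, and the new config is fsynced to disk before any
 * packet advertising the new slot ownership is sent. */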

/* This function is called when this node is a master, and we receive from
 * another master a configuration epoch that is equal to our configuration
 * epoch.
 *
 * BACKGROUND
 *
 * It is not possible that different slaves get the same config
 * epoch during a failover election, because the slaves need to get voted
 * by a majority. However when we perform a manual resharding of the cluster
 * the node will assign a configuration epoch to itself without asking
 * for agreement. Usually resharding happens when the cluster is working well
 * and is supervised by the sysadmin, however it is possible for a failover
 * to happen exactly while the node we are resharding a slot to assigns itself
 * a new configuration epoch, but before it is able to propagate it.
 *
 * So technically it is possible in this condition that two nodes end up with
 * the same configuration epoch.
 *
 * Another possibility is that there are bugs in the implementation causing
 * this to happen.
 *
 * Moreover when a new cluster is created, all the nodes start with the same
 * configEpoch. This collision resolution code allows nodes to automatically
 * end up with a different configEpoch at startup.
 *
 * In all the cases, we want a mechanism that resolves this issue automatically
 * as a safeguard. The same configuration epoch for masters serving different
 * sets of slots is not harmful, but it is if the nodes end up serving the same
 * slots for some reason (manual errors or software bugs) without a proper
 * failover procedure.
 *
 * In general we want a system that eventually always ends with different
 * masters having different configuration epochs whatever happened, since
 * nothing is worse than a split-brain condition in a distributed system.
 *
 * BEHAVIOR
 *
 * When this function gets called, what happens is that if this node
 * has the lexicographically smaller Node ID compared to the other node
 * with the conflicting epoch (the 'sender' node), it will assign itself
 * the greatest configuration epoch currently detected among nodes plus 1.
 *
 * This means that even if there are multiple nodes colliding, the node
 * with the greatest Node ID never moves forward, so eventually all the nodes
 * end with a different configuration epoch.
 */
void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
    /* Don't act if the colliding node has a smaller Node ID. */
    if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
    /* Get the next ID available at the best of this node's knowledge. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    serverLog(LL_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}
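
/* Worked example: masters A ("aaa...") and B ("bbb...") both claim
 * configEpoch 5. When A processes a packet from B, B's name is
 * lexicographically greater, so A bumps currentEpoch and takes configEpoch
 * 6. When B processes a packet from A, the memcmp() check above makes it
 * return early. The collision resolves with A at 6 and B still at 5. */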

/* -----------------------------------------------------------------------------
 * CLUSTER nodes blacklist
 *
 * The nodes blacklist is just a way to ensure that a given node with a given
 * Node ID is not re-added before some time has elapsed (this time is
 * specified in seconds in CLUSTER_BLACKLIST_TTL).
 *
 * This is useful when we want to remove a node from the cluster completely:
 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
 * that even if we receive gossip messages from other nodes that still remember
 * the node we want to remove, we don't re-add it for some time.
 *
 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute: this means
 * that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
 * in the cluster without dealing with the problem of other nodes re-adding
 * the node to nodes we already sent the FORGET command to.
 *
 * The data structure used is a hash table with an sds string representing
 * the node ID as key, and the time when it is ok to re-add the node as
 * value.
 * -------------------------------------------------------------------------- */

#define CLUSTER_BLACKLIST_TTL 60      /* 1 minute. */

/* Before the addNode() or Exists() operations we always remove expired
 * entries from the black list. This is an O(N) operation but it is not a
 * problem since add / exists operations are called very infrequently and
 * the hash table is supposed to contain very few elements at most.
 * However, without the cleanup, during long uptimes and with some automated
 * node add/removal procedures, entries could accumulate. */
void clusterBlacklistCleanup(void) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes_black_list);
    while((de = dictNext(di)) != NULL) {
        int64_t expire = dictGetUnsignedIntegerVal(de);

        if (expire < server.unixtime)
            dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
    }
    dictReleaseIterator(di);
}

/* Cleanup the blacklist and add a new node ID to the black list. */
void clusterBlacklistAddNode(clusterNode *node) {
    dictEntry *de;
    sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);

    clusterBlacklistCleanup();
    if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
        /* If the key was added, duplicate the sds string representation of
         * the key for the next lookup. We'll free it at the end. */
        id = sdsdup(id);
    }
    de = dictFind(server.cluster->nodes_black_list,id);
    dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
    sdsfree(id);
}

/* Return non-zero if the specified node ID exists in the blacklist.
 * You don't need to pass an sds string here, any pointer to 40 bytes
 * will work. */
int clusterBlacklistExists(char *nodeid) {
    sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
    int retval;

    clusterBlacklistCleanup();
    retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
    sdsfree(id);
    return retval;
}
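
/* Usage sketch (our illustration of the intended flow, e.g. CLUSTER FORGET):
 * blacklist the ID before dropping the node, then consult the blacklist when
 * gossip mentions an unknown node ID, so it is not re-added for
 * CLUSTER_BLACKLIST_TTL seconds. */
#if 0 /* illustration only, not compiled */
void forget_node_example(clusterNode *n, char *gossiped_id) {
    clusterBlacklistAddNode(n);   /* remember the ID for 60 seconds */
    clusterDelNode(n);            /* drop it from the nodes table */
    /* ...later, while processing gossip about an unknown ID: */
    if (clusterBlacklistExists(gossiped_id)) return; /* don't re-add it */
}
#endif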
 1239 
 1240 /* -----------------------------------------------------------------------------
 1241  * CLUSTER messages exchange - PING/PONG and gossip
 1242  * -------------------------------------------------------------------------- */
 1243 
 1244 /* This function checks if a given node should be marked as FAIL.
 1245  * It happens if the following conditions are met:
 1246  *
 1247  * 1) We received enough failure reports from other master nodes via gossip.
 1248  *    Enough means that the majority of the masters signaled the node is
 1249  *    down recently.
 1250  * 2) We believe this node is in PFAIL state.
 1251  *
 1252  * If a failure is detected we also inform the whole cluster about this
 1253  * event trying to force every other node to set the FAIL flag for the node.
 1254  *
 1255  * Note that the form of agreement used here is weak, as we collect the majority
 1256  * of masters state during some time, and even if we force agreement by
 1257  * propagating the FAIL message, because of partitions we may not reach every
 1258  * node. However:
 1259  *
 1260  * 1) Either we reach the majority and eventually the FAIL state will propagate
 1261  *    to all the cluster.
 1262  * 2) Or there is no majority so no slave promotion will be authorized and the
 1263  *    FAIL flag will be cleared after some time.
 1264  */
 1265 void markNodeAsFailingIfNeeded(clusterNode *node) {
 1266     int failures;
 1267     int needed_quorum = (server.cluster->size / 2) + 1;
 1268 
 1269     if (!nodeTimedOut(node)) return; /* We can reach it. */
 1270     if (nodeFailed(node)) return; /* Already FAILing. */
 1271 
 1272     failures = clusterNodeFailureReportsCount(node);
 1273     /* Also count myself as a voter if I'm a master. */
 1274     if (nodeIsMaster(myself)) failures++;
 1275     if (failures < needed_quorum) return; /* No weak agreement from masters. */
 1276 
 1277     serverLog(LL_NOTICE,
 1278         "Marking node %.40s as failing (quorum reached).", node->name);
 1279 
 1280     /* Mark the node as failing. */
 1281     node->flags &= ~CLUSTER_NODE_PFAIL;
 1282     node->flags |= CLUSTER_NODE_FAIL;
 1283     node->fail_time = mstime();
 1284 
 1285     /* Broadcast the failing node name to everybody, forcing all the other
 1286      * reachable nodes to flag the node as FAIL.
 1287      * We do that even if this node is a replica and not a master: anyway
 1288      * the failing state is triggered by collecting failure reports from
 1289      * masters, so here the replica is only helping to propagate this status. */
 1290     clusterSendFail(node->name);
 1291     clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
 1292 }
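
      /* Worked example of the quorum math above (an illustration, not code
       * from the original file): with server.cluster->size == 5 masters,
       * needed_quorum is (5/2)+1 = 3. If we hold two fresh failure reports
       * for the node and we are a master ourselves, failures becomes
       * 2+1 = 3 >= 3, so the node is flagged FAIL and the FAIL message is
       * broadcast to the cluster. */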
 1293 
 1294 /* This function is called only if a node is marked as FAIL, but we are able
 1295  * to reach it again. It checks whether the conditions to undo the FAIL
 1296  * state are met. */
 1297 void clearNodeFailureIfNeeded(clusterNode *node) {
 1298     mstime_t now = mstime();
 1299 
 1300     serverAssert(nodeFailed(node));
 1301 
 1302     /* For slaves we always clear the FAIL flag if we can contact the
 1303      * node again. */
 1304     if (nodeIsSlave(node) || node->numslots == 0) {
 1305         serverLog(LL_NOTICE,
 1306             "Clear FAIL state for node %.40s: %s is reachable again.",
 1307                 node->name,
 1308                 nodeIsSlave(node) ? "replica" : "master without slots");
 1309         node->flags &= ~CLUSTER_NODE_FAIL;
 1310         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
 1311     }
 1312 
 1313     /* If it is a master and...
 1314      * 1) The FAIL state is old enough.
 1315      * 2) It is still serving slots from our point of view (not failed over).
 1316      * Apparently nobody is going to fix these slots, so clear the FAIL flag. */
 1317     if (nodeIsMaster(node) && node->numslots > 0 &&
 1318         (now - node->fail_time) >
 1319         (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
 1320     {
 1321         serverLog(LL_NOTICE,
 1322             "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
 1323                 node->name);
 1324         node->flags &= ~CLUSTER_NODE_FAIL;
 1325         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
 1326     }
 1327 }
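
      /* Timing example for the undo window above, assuming
       * CLUSTER_FAIL_UNDO_TIME_MULT is 2 (its value in cluster.h) and a
       * cluster-node-timeout of 15000 milliseconds: a master still claiming
       * slots gets its FAIL flag cleared only if it becomes reachable again
       * more than 30 seconds after fail_time was set. */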
 1328 
 1329 /* Return true if we already have a node in HANDSHAKE state matching the
 1330  * specified ip address and port number. This function is used in order to
 1331  * avoid adding a new handshake node for the same address multiple times. */
 1332 int clusterHandshakeInProgress(char *ip, int port, int cport) {
 1333     dictIterator *di;
 1334     dictEntry *de;
 1335 
 1336     di = dictGetSafeIterator(server.cluster->nodes);
 1337     while((de = dictNext(di)) != NULL) {
 1338         clusterNode *node = dictGetVal(de);
 1339 
 1340         if (!nodeInHandshake(node)) continue;
 1341         if (!strcasecmp(node->ip,ip) &&
 1342             node->port == port &&
 1343             node->cport == cport) break;
 1344     }
 1345     dictReleaseIterator(di);
 1346     return de != NULL;
 1347 }
 1348 
 1349 /* Start a handshake with the specified address if there is not one
 1350  * already in progress. Returns non-zero if the handshake was actually
 1351  * started. On error zero is returned and errno is set to one of the
 1352  * following values:
 1353  *
 1354  * EAGAIN - There is already a handshake in progress for this address.
 1355  * EINVAL - IP or port are not valid. */
 1356 int clusterStartHandshake(char *ip, int port, int cport) {
 1357     clusterNode *n;
 1358     char norm_ip[NET_IP_STR_LEN];
 1359     struct sockaddr_storage sa;
 1360 
 1361     /* IP sanity check */
 1362     if (inet_pton(AF_INET,ip,
 1363             &(((struct sockaddr_in *)&sa)->sin_addr)))
 1364     {
 1365         sa.ss_family = AF_INET;
 1366     } else if (inet_pton(AF_INET6,ip,
 1367             &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
 1368     {
 1369         sa.ss_family = AF_INET6;
 1370     } else {
 1371         errno = EINVAL;
 1372         return 0;
 1373     }
 1374 
 1375     /* Port sanity check */
 1376     if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
 1377         errno = EINVAL;
 1378         return 0;
 1379     }
 1380 
 1381     /* Set norm_ip as the normalized string representation of the node
 1382      * IP address. */
 1383     memset(norm_ip,0,NET_IP_STR_LEN);
 1384     if (sa.ss_family == AF_INET)
 1385         inet_ntop(AF_INET,
 1386             (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
 1387             norm_ip,NET_IP_STR_LEN);
 1388     else
 1389         inet_ntop(AF_INET6,
 1390             (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
 1391             norm_ip,NET_IP_STR_LEN);
 1392 
 1393     if (clusterHandshakeInProgress(norm_ip,port,cport)) {
 1394         errno = EAGAIN;
 1395         return 0;
 1396     }
 1397 
 1398     /* Add the node with a random address (NULL as first argument to
 1399      * createClusterNode()). Everything will be fixed during the
 1400      * handshake. */
 1401     n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
 1402     memcpy(n->ip,norm_ip,sizeof(n->ip));
 1403     n->port = port;
 1404     n->cport = cport;
 1405     clusterAddNode(n);
 1406     return 1;
 1407 }
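
      /* Usage sketch: CLUSTER MEET is built on top of this function. A
       * minimal caller, assuming 'ip', 'port' and 'cport' were parsed from
       * the client arguments (hypothetical variable names):
       *
       *   if (clusterStartHandshake(ip,port,cport) == 0 && errno == EINVAL) {
       *       addReplyErrorFormat(c,"Invalid node address specified: %s:%d",
       *                           ip, port);
       *   } else {
       *       addReply(c,shared.ok);
       *   }
       */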
 1408 
 1409 /* Process the gossip section of PING or PONG packets.
 1410  * Note that this function assumes that the packet has already been
 1411  * sanity-checked by the caller for its length, though not for the content
 1412  * of the gossip section. */
 1413 void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
 1414     uint16_t count = ntohs(hdr->count);
 1415     clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
 1416     clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
 1417 
 1418     while(count--) {
 1419         uint16_t flags = ntohs(g->flags);
 1420         clusterNode *node;
 1421         sds ci;
 1422 
 1423         if (server.verbosity == LL_DEBUG) {
 1424             ci = representClusterNodeFlags(sdsempty(), flags);
 1425             serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
 1426                 g->nodename,
 1427                 g->ip,
 1428                 ntohs(g->port),
 1429                 ntohs(g->cport),
 1430                 ci);
 1431             sdsfree(ci);
 1432         }
 1433 
 1434         /* Update our state according to the gossip sections. */
 1435         node = clusterLookupNode(g->nodename);
 1436         if (node) {
 1437             /* We already know this node.
 1438                Handle failure reports only when the sender is a master. */
 1439             if (sender && nodeIsMaster(sender) && node != myself) {
 1440                 if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
 1441                     if (clusterNodeAddFailureReport(node,sender)) {
 1442                         serverLog(LL_VERBOSE,
 1443                             "Node %.40s reported node %.40s as not reachable.",
 1444                             sender->name, node->name);
 1445                     }
 1446                     markNodeAsFailingIfNeeded(node);
 1447                 } else {
 1448                     if (clusterNodeDelFailureReport(node,sender)) {
 1449                         serverLog(LL_VERBOSE,
 1450                             "Node %.40s reported node %.40s is back online.",
 1451                             sender->name, node->name);
 1452                     }
 1453                 }
 1454             }
 1455 
 1456             /* If from our POV the node is up (no failure flags are set),
 1457              * we have no pending ping for the node, and we have no failure
 1458              * reports for this node, update the last pong time with the
 1459              * one we see from the other nodes. */
 1460             if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
 1461                 node->ping_sent == 0 &&
 1462                 clusterNodeFailureReportsCount(node) == 0)
 1463             {
 1464                 mstime_t pongtime = ntohl(g->pong_received);
 1465                 pongtime *= 1000; /* Convert back to milliseconds. */
 1466 
 1467                 /* Replace the pong time with the received one only if
 1468                  * it's greater than our view but is not in the future
 1469                  * (with 500 milliseconds tolerance) from the POV of our
 1470                  * clock. */
 1471                 if (pongtime <= (server.mstime+500) &&
 1472                     pongtime > node->pong_received)
 1473                 {
 1474                     node->pong_received = pongtime;
 1475                 }
 1476             }
 1477 
 1478             /* If we already know this node, but it is not reachable, and
 1479              * we see a different address in the gossip section of a node that
 1480              * can talk with it, update the address and disconnect the old
 1481              * link if any, so that we'll attempt to connect to the new
 1482              * address. */
 1483             if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
 1484                 !(flags & CLUSTER_NODE_NOADDR) &&
 1485                 !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
 1486                 (strcasecmp(node->ip,g->ip) ||
 1487                  node->port != ntohs(g->port) ||
 1488                  node->cport != ntohs(g->cport)))
 1489             {
 1490                 if (node->link) freeClusterLink(node->link);
 1491                 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
 1492                 node->port = ntohs(g->port);
 1493                 node->cport = ntohs(g->cport);
 1494                 node->flags &= ~CLUSTER_NODE_NOADDR;
 1495             }
 1496         } else {
 1497             /* If it's not in NOADDR state and we don't have it, we
 1498              * add it to our trusted dict with the exact node ID and flags.
 1499              * Note that we cannot simply start a handshake against
 1500              * this IP/PORT pair, since the IP/PORT may already have been
 1501              * reused by a different node.
 1502              *
 1503              * Note that we require the sender of this gossip message to
 1504              * be a well known node in our cluster, otherwise we risk
 1505              * joining another cluster. */
 1506             if (sender &&
 1507                 !(flags & CLUSTER_NODE_NOADDR) &&
 1508                 !clusterBlacklistExists(g->nodename))
 1509             {
 1510                 clusterNode *node;
 1511                 node = createClusterNode(g->nodename, flags);
 1512                 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
 1513                 node->port = ntohs(g->port);
 1514                 node->cport = ntohs(g->cport);
 1515                 clusterAddNode(node);
 1516             }
 1517         }
 1518 
 1519         /* Next node */
 1520         g++;
 1521     }
 1522 }
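
      /* For reference, each gossip entry consumed above is a
       * clusterMsgDataGossip record; a sketch of its layout follows
       * (cluster.h holds the authoritative definition):
       *
       *   typedef struct {
       *       char nodename[CLUSTER_NAMELEN]; // 40 byte node ID.
       *       uint32_t ping_sent;             // Unix time in seconds, net order.
       *       uint32_t pong_received;         // Unix time in seconds, net order.
       *       char ip[NET_IP_STR_LEN];        // IP address, as a string.
       *       uint16_t port;                  // Base port, network order.
       *       uint16_t cport;                 // Cluster bus port, network order.
       *       uint16_t flags;                 // Copy of node->flags.
       *       uint16_t notused1;
       *   } clusterMsgDataGossip;
       */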
 1523 
 1524 /* IP -> string conversion. 'buf' is supposed to be at least 46 bytes.
 1525  * If 'announced_ip' length is non-zero, it is used instead of extracting
 1526  * the IP from the socket peer address. */
 1527 void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
 1528     if (announced_ip[0] != '\0') {
 1529         memcpy(buf,announced_ip,NET_IP_STR_LEN);
 1530         buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
 1531     } else {
 1532         connPeerToString(link->conn, buf, NET_IP_STR_LEN, NULL);
 1533     }
 1534 }
 1535 
 1536 /* Update the node address to the IP address that can be extracted
 1537  * from link->fd, or, if hdr->myip is non-empty, to the address the node
 1538  * is announcing to us. The port is taken from the packet header as well.
 1539  *
 1540  * If the address or port changed, disconnect the node link so that we'll
 1541  * connect again to the new address.
 1542  *
 1543  * If the ip/port pair are already correct no operation is performed at
 1544  * all.
 1545  *
 1546  * The function returns 0 if the node address is still the same,
 1547  * otherwise 1 is returned. */
 1548 int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
 1549                               clusterMsg *hdr)
 1550 {
 1551     char ip[NET_IP_STR_LEN] = {0};
 1552     int port = ntohs(hdr->port);
 1553     int cport = ntohs(hdr->cport);
 1554 
 1555     /* We don't proceed if the link is the same as the sender link, as this
 1556      * function is designed to see if the node link is consistent with the
 1557      * symmetric link that is used to receive PINGs from the node.
 1558      *
 1559      * As a side effect this function never frees the passed 'link', so
 1560      * it is safe to call during packet processing. */
 1561     if (link == node->link) return 0;
 1562 
 1563     nodeIp2String(ip,link,hdr->myip);
 1564     if (node->port == port && node->cport == cport &&
 1565         strcmp(ip,node->ip) == 0) return 0;
 1566 
 1567     /* IP / port is different, update it. */
 1568     memcpy(node->ip,ip,sizeof(ip));
 1569     node->port = port;
 1570     node->cport = cport;
 1571     if (node->link) freeClusterLink(node->link);
 1572     node->flags &= ~CLUSTER_NODE_NOADDR;
 1573     serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
 1574         node->name, node->ip, node->port);
 1575 
 1576     /* Check if this is our master and we have to change the
 1577      * replication target as well. */
 1578     if (nodeIsSlave(myself) && myself->slaveof == node)
 1579         replicationSetMaster(node->ip, node->port);
 1580     return 1;
 1581 }
 1582 
 1583 /* Reconfigure the specified node 'n' as a master. This function is called,
 1584  * in order to update the node's state, when a node that we believed to be
 1585  * a slave is now acting as a master. */
 1586 void clusterSetNodeAsMaster(clusterNode *n) {
 1587     if (nodeIsMaster(n)) return;
 1588 
 1589     if (n->slaveof) {
 1590         clusterNodeRemoveSlave(n->slaveof,n);
 1591         if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
 1592     }
 1593     n->flags &= ~CLUSTER_NODE_SLAVE;
 1594     n->flags |= CLUSTER_NODE_MASTER;
 1595     n->slaveof = NULL;
 1596 
 1597     /* Update config and state. */
 1598     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 1599                          CLUSTER_TODO_UPDATE_STATE);
 1600 }
 1601 
 1602 /* This function is called when we receive a master configuration via a
 1603  * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
 1604  * node, and the set of slots claimed under this configEpoch.
 1605  *
 1606  * What we do is rebind the slots carrying a newer configuration than our
 1607  * local one and, if needed, turn ourselves into a replica of the
 1608  * node (see the function comments for more info).
 1609  *
 1610  * The 'sender' is the node for which we received a configuration update.
 1611  * Sometimes it is not actually the "Sender" of the information, like in the
 1612  * case we receive the info via an UPDATE packet. */
 1613 void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
 1614     int j;
 1615     clusterNode *curmaster, *newmaster = NULL;
 1616     /* The dirty slots list is a list of slots for which we lose ownership
 1617      * while still having keys inside. This usually happens after a failover
 1618      * or after a manual cluster reconfiguration operated by the admin.
 1619      *
 1620      * If the update message is not able to demote a master to slave (in this
 1621      * case we'll resync with the master updating the whole key space), we
 1622      * need to delete all the keys in the slots we lost ownership of. */
 1623     uint16_t dirty_slots[CLUSTER_SLOTS];
 1624     int dirty_slots_count = 0;
 1625 
 1626     /* Here we set curmaster to this node, or to the master it
 1627      * replicates from if it's a slave. In the for loop we are
 1628      * interested in checking whether slots are taken away from curmaster. */
 1629     curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
 1630 
 1631     if (sender == myself) {
 1632         serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
 1633         return;
 1634     }
 1635 
 1636     for (j = 0; j < CLUSTER_SLOTS; j++) {
 1637         if (bitmapTestBit(slots,j)) {
 1638             /* The slot is already bound to the sender of this message. */
 1639             if (server.cluster->slots[j] == sender) continue;
 1640 
 1641             /* The slot is in importing state; it should be modified only
 1642              * manually via redis-trib (example: a resharding is in progress
 1643              * and the migrating side slot was already closed and is advertising
 1644              * a new config. We still want the slot to be closed manually). */
 1645             if (server.cluster->importing_slots_from[j]) continue;
 1646 
 1647             /* We rebind the slot to the new node claiming it if:
 1648              * 1) The slot was unassigned or the new node claims it with a
 1649              *    greater configEpoch.
 1650              * 2) We are not currently importing the slot. */
 1651             if (server.cluster->slots[j] == NULL ||
 1652                 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
 1653             {
 1654                 /* Was this slot mine, and still contains keys? Mark it as
 1655                  * a dirty slot. */
 1656                 if (server.cluster->slots[j] == myself &&
 1657                     countKeysInSlot(j) &&
 1658                     sender != myself)
 1659                 {
 1660                     dirty_slots[dirty_slots_count] = j;
 1661                     dirty_slots_count++;
 1662                 }
 1663 
 1664                 if (server.cluster->slots[j] == curmaster)
 1665                     newmaster = sender;
 1666                 clusterDelSlot(j);
 1667                 clusterAddSlot(sender,j);
 1668                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 1669                                      CLUSTER_TODO_UPDATE_STATE|
 1670                                      CLUSTER_TODO_FSYNC_CONFIG);
 1671             }
 1672         }
 1673     }
 1674 
 1675     /* After updating the slots configuration, don't do any actual change
 1676      * in the state of the server if a module disabled Redis Cluster
 1677      * keys redirections. */
 1678     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
 1679         return;
 1680 
 1681     /* If at least one slot was reassigned from a node to another node
 1682      * with a greater configEpoch, it is possible that:
 1683      * 1) We are a master left without slots. This means that we were
 1684      *    failed over and we should turn into a replica of the new
 1685      *    master.
 1686      * 2) We are a slave and our master is left without slots. We need
 1687      *    to replicate to the new slots owner. */
 1688     if (newmaster && curmaster->numslots == 0) {
 1689         serverLog(LL_WARNING,
 1690             "Configuration change detected. Reconfiguring myself "
 1691             "as a replica of %.40s", sender->name);
 1692         clusterSetMaster(sender);
 1693         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 1694                              CLUSTER_TODO_UPDATE_STATE|
 1695                              CLUSTER_TODO_FSYNC_CONFIG);
 1696     } else if (dirty_slots_count) {
 1697         /* If we are here, we received an update message which removed
 1698          * ownership of certain slots we still have keys in, but we are
 1699          * still serving some slots, so this master node was not demoted to
 1700          * a slave.
 1701          *
 1702          * In order to maintain a consistent state between keys and slots
 1703          * we need to remove all the keys from the slots we lost. */
 1704         for (j = 0; j < dirty_slots_count; j++)
 1705             delKeysInSlot(dirty_slots[j]);
 1706     }
 1707 }
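
      /* Concrete scenario for the logic above (illustration only): master A
       * serves slots 1-100 with configEpoch 5 and we are a replica of A.
       * B fails over A and claims the same slots with configEpoch 6. When a
       * packet from B reaches us, every slot bound to A is rebound to B
       * since 6 > 5, so curmaster ends up with numslots == 0, newmaster is
       * B, and we reconfigure ourselves as replicas of B via
       * clusterSetMaster(). */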
 1708 
 1709 /* When this function is called, there is a packet to process starting
 1710  * at link->rcvbuf. Releasing the buffer is up to the caller, so this
 1711  * function should just handle the higher level stuff of processing the
 1712  * packet, modifying the cluster state if needed.
 1713  *
 1714  * The function returns 1 if the link is still valid after the packet
 1715  * was processed, otherwise 0 if the link was freed since the packet
 1716  * processing led to some inconsistency error (for instance a PONG
 1717  * received from the wrong sender ID). */
 1718 int clusterProcessPacket(clusterLink *link) {
 1719     clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
 1720     uint32_t totlen = ntohl(hdr->totlen);
 1721     uint16_t type = ntohs(hdr->type);
 1722     mstime_t now = mstime();
 1723 
 1724     if (type < CLUSTERMSG_TYPE_COUNT)
 1725         server.cluster->stats_bus_messages_received[type]++;
 1726     serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
 1727         type, (unsigned long) totlen);
 1728 
 1729     /* Perform sanity checks */
 1730     if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
 1731     if (totlen > sdslen(link->rcvbuf)) return 1;
 1732 
 1733     if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
 1734         /* Can't handle messages of different versions. */
 1735         return 1;
 1736     }
 1737 
 1738     uint16_t flags = ntohs(hdr->flags);
 1739     uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
 1740     clusterNode *sender;
 1741 
 1742     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
 1743         type == CLUSTERMSG_TYPE_MEET)
 1744     {
 1745         uint16_t count = ntohs(hdr->count);
 1746         uint32_t explen; /* expected length of this packet */
 1747 
 1748         explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 1749         explen += (sizeof(clusterMsgDataGossip)*count);
 1750         if (totlen != explen) return 1;
 1751     } else if (type == CLUSTERMSG_TYPE_FAIL) {
 1752         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 1753 
 1754         explen += sizeof(clusterMsgDataFail);
 1755         if (totlen != explen) return 1;
 1756     } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
 1757         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 1758 
 1759         explen += sizeof(clusterMsgDataPublish) -
 1760                 8 +
 1761                 ntohl(hdr->data.publish.msg.channel_len) +
 1762                 ntohl(hdr->data.publish.msg.message_len);
 1763         if (totlen != explen) return 1;
 1764     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
 1765                type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
 1766                type == CLUSTERMSG_TYPE_MFSTART)
 1767     {
 1768         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 1769 
 1770         if (totlen != explen) return 1;
 1771     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
 1772         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 1773 
 1774         explen += sizeof(clusterMsgDataUpdate);
 1775         if (totlen != explen) return 1;
 1776     } else if (type == CLUSTERMSG_TYPE_MODULE) {
 1777         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 1778 
 1779         explen += sizeof(clusterMsgModule) -
 1780                 3 + ntohl(hdr->data.module.msg.len);
 1781         if (totlen != explen) return 1;
 1782     }
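
          /* Example of the fixed-length checks above (illustrative): a PING
           * carrying 3 gossip entries is valid only if
           *
           *   totlen == sizeof(clusterMsg) - sizeof(union clusterMsgData)
           *             + 3*sizeof(clusterMsgDataGossip)
           *
           * Any other value makes us ignore the packet, returning 1 so that
           * the link stays alive. */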
 1783 
 1784     /* Check if the sender is a known node. Note that for incoming connections
 1785      * we don't store link->node information, but resolve the node by the
 1786      * ID in the header each time in the current implementation. */
 1787     sender = clusterLookupNode(hdr->sender);
 1788 
 1789     /* Update the last time we saw any data from this node. We
 1790      * use this in order to avoid detecting a timeout from a node that
 1791      * is just sending a lot of data in the cluster bus, for instance
 1792      * because of Pub/Sub. */
 1793     if (sender) sender->data_received = now;
 1794 
 1795     if (sender && !nodeInHandshake(sender)) {
 1796         /* Update our currentEpoch if we see a newer epoch in the cluster. */
 1797         senderCurrentEpoch = ntohu64(hdr->currentEpoch);
 1798         senderConfigEpoch = ntohu64(hdr->configEpoch);
 1799         if (senderCurrentEpoch > server.cluster->currentEpoch)
 1800             server.cluster->currentEpoch = senderCurrentEpoch;
 1801         /* Update the sender configEpoch if it is publishing a newer one. */
 1802         if (senderConfigEpoch > sender->configEpoch) {
 1803             sender->configEpoch = senderConfigEpoch;
 1804             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 1805                                  CLUSTER_TODO_FSYNC_CONFIG);
 1806         }
 1807         /* Update the replication offset info for this node. */
 1808         sender->repl_offset = ntohu64(hdr->offset);
 1809         sender->repl_offset_time = now;
 1810         /* If we are a slave performing a manual failover and our master
 1811          * sent its offset while already paused, populate the MF state. */
 1812         if (server.cluster->mf_end &&
 1813             nodeIsSlave(myself) &&
 1814             myself->slaveof == sender &&
 1815             hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
 1816             server.cluster->mf_master_offset == 0)
 1817         {
 1818             server.cluster->mf_master_offset = sender->repl_offset;
 1819             serverLog(LL_WARNING,
 1820                 "Received replication offset for paused "
 1821                 "master manual failover: %lld",
 1822                 server.cluster->mf_master_offset);
 1823         }
 1824     }
 1825 
 1826     /* Initial processing of PING and MEET requests, replying with a PONG. */
 1827     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
 1828         serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
 1829 
 1830         /* We use incoming MEET messages in order to set the address
 1831          * for 'myself', since only other cluster nodes will send us
 1832          * MEET messages on handshakes, when the cluster joins, or
 1833          * later if we changed address, and those nodes will use our
 1834          * official address to connect to us. So obtaining this address
 1835          * from the socket is a simple way to discover / update our own
 1836          * address in the cluster without it being hardcoded in the config.
 1837          *
 1838          * However if we don't have an address at all, we update the address
 1839          * even with a normal PING packet. If it's wrong it will be fixed
 1840          * by MEET later. */
 1841         if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
 1842             server.cluster_announce_ip == NULL)
 1843         {
 1844             char ip[NET_IP_STR_LEN];
 1845 
 1846             if (connSockName(link->conn,ip,sizeof(ip),NULL) != -1 &&
 1847                 strcmp(ip,myself->ip))
 1848             {
 1849                 memcpy(myself->ip,ip,NET_IP_STR_LEN);
 1850                 serverLog(LL_WARNING,"IP address for this node updated to %s",
 1851                     myself->ip);
 1852                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
 1853             }
 1854         }
 1855 
 1856         /* Add this node if it is new for us and the msg type is MEET.
 1857          * In this stage we don't try to add the node with the right
 1858          * flags, slaveof pointer, and so forth, as these details will be
 1859          * resolved when we receive PONGs from the node. */
 1860         if (!sender && type == CLUSTERMSG_TYPE_MEET) {
 1861             clusterNode *node;
 1862 
 1863             node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
 1864             nodeIp2String(node->ip,link,hdr->myip);
 1865             node->port = ntohs(hdr->port);
 1866             node->cport = ntohs(hdr->cport);
 1867             clusterAddNode(node);
 1868             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
 1869         }
 1870 
 1871         /* If this is a MEET packet from an unknown node, we still process
 1872          * the gossip section here since we have to trust the sender because
 1873          * of the message type. */
 1874         if (!sender && type == CLUSTERMSG_TYPE_MEET)
 1875             clusterProcessGossipSection(hdr,link);
 1876 
 1877         /* Anyway reply with a PONG */
 1878         clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
 1879     }
 1880 
 1881     /* PING, PONG, MEET: process config information. */
 1882     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
 1883         type == CLUSTERMSG_TYPE_MEET)
 1884     {
 1885         serverLog(LL_DEBUG,"%s packet received: %p",
 1886             type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
 1887             (void*)link->node);
 1888         if (link->node) {
 1889             if (nodeInHandshake(link->node)) {
 1890                 /* If we already have this node, try to change the
 1891                  * IP/port of the node with the new one. */
 1892                 if (sender) {
 1893                     serverLog(LL_VERBOSE,
 1894                         "Handshake: we already know node %.40s, "
 1895                         "updating the address if needed.", sender->name);
 1896                     if (nodeUpdateAddressIfNeeded(sender,link,hdr))
 1897                     {
 1898                         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 1899                                              CLUSTER_TODO_UPDATE_STATE);
 1900                     }
 1901                     /* Free this node as we already have it. This will
 1902                      * cause the link to be freed as well. */
 1903                     clusterDelNode(link->node);
 1904                     return 0;
 1905                 }
 1906 
 1907                 /* The first thing to do is to replace the random name with
 1908                  * the right node name if this was a handshake stage. */
 1909                 clusterRenameNode(link->node, hdr->sender);
 1910                 serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
 1911                     link->node->name);
 1912                 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
 1913                 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
 1914                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
 1915             } else if (memcmp(link->node->name,hdr->sender,
 1916                         CLUSTER_NAMELEN) != 0)
 1917             {
 1918                 /* If the reply has a non matching node ID we
 1919                  * disconnect this node and set it as not having an associated
 1920                  * address. */
 1921                 serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
 1922                     link->node->name,
 1923                     (int)(now-(link->node->ctime)),
 1924                     link->node->flags);
 1925                 link->node->flags |= CLUSTER_NODE_NOADDR;
 1926                 link->node->ip[0] = '\0';
 1927                 link->node->port = 0;
 1928                 link->node->cport = 0;
 1929                 freeClusterLink(link);
 1930                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
 1931                 return 0;
 1932             }
 1933         }
 1934 
 1935         /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
 1936          * announced. This is a dynamic flag that we receive from the
 1937          * sender, and the latest status must be trusted. We need it to
 1938          * be propagated because the slave ranking, used to understand
 1939          * the delay of each slave in the voting process, needs to know
 1940          * which instances are really competing. */
 1941         if (sender) {
 1942             int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
 1943             sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
 1944             sender->flags |= nofailover;
 1945         }
 1946 
 1947         /* Update the node address if it changed. */
 1948         if (sender && type == CLUSTERMSG_TYPE_PING &&
 1949             !nodeInHandshake(sender) &&
 1950             nodeUpdateAddressIfNeeded(sender,link,hdr))
 1951         {
 1952             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 1953                                  CLUSTER_TODO_UPDATE_STATE);
 1954         }
 1955 
 1956         /* Update our info about the node */
 1957         if (link->node && type == CLUSTERMSG_TYPE_PONG) {
 1958             link->node->pong_received = now;
 1959             link->node->ping_sent = 0;
 1960 
 1961             /* The PFAIL condition can be reversed without external
 1962              * help if it is momentary (that is, if it does not
 1963              * turn into a FAIL state).
 1964              *
 1965              * The FAIL condition is also reversible under specific
 1966              * conditions detected by clearNodeFailureIfNeeded(). */
 1967             if (nodeTimedOut(link->node)) {
 1968                 link->node->flags &= ~CLUSTER_NODE_PFAIL;
 1969                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 1970                                      CLUSTER_TODO_UPDATE_STATE);
 1971             } else if (nodeFailed(link->node)) {
 1972                 clearNodeFailureIfNeeded(link->node);
 1973             }
 1974         }
 1975 
 1976         /* Check for role switch: slave -> master or master -> slave. */
 1977         if (sender) {
 1978             if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
 1979                 sizeof(hdr->slaveof)))
 1980             {
 1981                 /* Node is a master. */
 1982                 clusterSetNodeAsMaster(sender);
 1983             } else {
 1984                 /* Node is a slave. */
 1985                 clusterNode *master = clusterLookupNode(hdr->slaveof);
 1986 
 1987                 if (nodeIsMaster(sender)) {
 1988                     /* Master turned into a slave! Reconfigure the node. */
 1989                     clusterDelNodeSlots(sender);
 1990                     sender->flags &= ~(CLUSTER_NODE_MASTER|
 1991                                        CLUSTER_NODE_MIGRATE_TO);
 1992                     sender->flags |= CLUSTER_NODE_SLAVE;
 1993 
 1994                     /* Update config and state. */
 1995                     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 1996                                          CLUSTER_TODO_UPDATE_STATE);
 1997                 }
 1998 
 1999                 /* Master node changed for this slave? */
 2000                 if (master && sender->slaveof != master) {
 2001                     if (sender->slaveof)
 2002                         clusterNodeRemoveSlave(sender->slaveof,sender);
 2003                     clusterNodeAddSlave(master,sender);
 2004                     sender->slaveof = master;
 2005 
 2006                     /* Update config. */
 2007                     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
 2008                 }
 2009             }
 2010         }
 2011 
 2012         /* Update our info about served slots.
 2013          *
 2014          * Note: this MUST happen after we update the master/slave state
 2015          * so that the CLUSTER_NODE_MASTER flag will be set. */
 2016 
 2017         /* Many checks are only needed if the set of served slots this
 2018          * instance claims is different from the set of slots we have
 2019          * for it. Check this ASAP to avoid other computationally expensive
 2020          * checks later. */
 2021         clusterNode *sender_master = NULL; /* Sender or its master if slave. */
 2022         int dirty_slots = 0; /* Sender claimed slots don't match my view? */
 2023 
 2024         if (sender) {
 2025             sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
 2026             if (sender_master) {
 2027                 dirty_slots = memcmp(sender_master->slots,
 2028                         hdr->myslots,sizeof(hdr->myslots)) != 0;
 2029             }
 2030         }
 2031 
 2032         /* 1) If the sender of the message is a master, and we detected that
 2033          *    the set of slots it claims changed, scan the slots to see if we
 2034          *    need to update our configuration. */
 2035         if (sender && nodeIsMaster(sender) && dirty_slots)
 2036             clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
 2037 
 2038         /* 2) We also check for the reverse condition, that is, the sender
 2039          *    claims to serve slots we know are served by a master with a
 2040          *    greater configEpoch. If this happens we inform the sender.
 2041          *
 2042          * This is useful because sometimes after a partition heals, a
 2043          * reappearing master may be the last one to claim a given set of
 2044          * hash slots, but with a configuration that other instances know to
 2045          * be deprecated. Example:
 2046          *
 2047          * A and B are master and slave for slots 1,2,3.
 2048          * A is partitioned away, B gets promoted.
 2049          * B is partitioned away, and A returns available.
 2050          *
 2051          * Usually B would PING A publishing its set of served slots and its
 2052          * configEpoch, but because of the partition B can't inform A of the
 2053          * new configuration, so other nodes that have an updated table must
 2054          * do it. In this way A will stop acting as a master (or can try to
 2055          * failover if the conditions to win the election are met). */
 2056         if (sender && dirty_slots) {
 2057             int j;
 2058 
 2059             for (j = 0; j < CLUSTER_SLOTS; j++) {
 2060                 if (bitmapTestBit(hdr->myslots,j)) {
 2061                     if (server.cluster->slots[j] == sender ||
 2062                         server.cluster->slots[j] == NULL) continue;
 2063                     if (server.cluster->slots[j]->configEpoch >
 2064                         senderConfigEpoch)
 2065                     {
 2066                         serverLog(LL_VERBOSE,
 2067                             "Node %.40s has old slots configuration, sending "
 2068                             "an UPDATE message about %.40s",
 2069                                 sender->name, server.cluster->slots[j]->name);
 2070                         clusterSendUpdate(sender->link,
 2071                             server.cluster->slots[j]);
 2072 
 2073                         /* TODO: instead of exiting the loop send every other
 2074                          * UPDATE packet for other nodes that are the new owner
 2075                          * of sender's slots. */
 2076                         break;
 2077                     }
 2078                 }
 2079             }
 2080         }
 2081 
 2082         /* If our config epoch collides with the sender's try to fix
 2083          * the problem. */
 2084         if (sender &&
 2085             nodeIsMaster(myself) && nodeIsMaster(sender) &&
 2086             senderConfigEpoch == myself->configEpoch)
 2087         {
 2088             clusterHandleConfigEpochCollision(sender);
 2089         }
 2090 
 2091         /* Get info from the gossip section */
 2092         if (sender) clusterProcessGossipSection(hdr,link);
 2093     } else if (type == CLUSTERMSG_TYPE_FAIL) {
 2094         clusterNode *failing;
 2095 
 2096         if (sender) {
 2097             failing = clusterLookupNode(hdr->data.fail.about.nodename);
 2098             if (failing &&
 2099                 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
 2100             {
 2101                 serverLog(LL_NOTICE,
 2102                     "FAIL message received from %.40s about %.40s",
 2103                     hdr->sender, hdr->data.fail.about.nodename);
 2104                 failing->flags |= CLUSTER_NODE_FAIL;
 2105                 failing->fail_time = now;
 2106                 failing->flags &= ~CLUSTER_NODE_PFAIL;
 2107                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 2108                                      CLUSTER_TODO_UPDATE_STATE);
 2109             }
 2110         } else {
 2111             serverLog(LL_NOTICE,
 2112                 "Ignoring FAIL message from unknown node %.40s about %.40s",
 2113                 hdr->sender, hdr->data.fail.about.nodename);
 2114         }
 2115     } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
 2116         robj *channel, *message;
 2117         uint32_t channel_len, message_len;
 2118 
 2119         /* Don't bother creating useless objects if there are no
 2120          * Pub/Sub subscribers. */
 2121         if (dictSize(server.pubsub_channels) ||
 2122            listLength(server.pubsub_patterns))
 2123         {
 2124             channel_len = ntohl(hdr->data.publish.msg.channel_len);
 2125             message_len = ntohl(hdr->data.publish.msg.message_len);
 2126             channel = createStringObject(
 2127                         (char*)hdr->data.publish.msg.bulk_data,channel_len);
 2128             message = createStringObject(
 2129                         (char*)hdr->data.publish.msg.bulk_data+channel_len,
 2130                         message_len);
 2131             pubsubPublishMessage(channel,message);
 2132             decrRefCount(channel);
 2133             decrRefCount(message);
 2134         }
 2135     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
 2136         if (!sender) return 1;  /* We don't know that node. */
 2137         clusterSendFailoverAuthIfNeeded(sender,hdr);
 2138     } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
 2139         if (!sender) return 1;  /* We don't know that node. */
 2140         /* We consider this vote only if the sender is a master serving
 2141          * a non-zero number of slots, and its currentEpoch is greater than
 2142          * or equal to the epoch in which this node started the election. */
 2143         if (nodeIsMaster(sender) && sender->numslots > 0 &&
 2144             senderCurrentEpoch >= server.cluster->failover_auth_epoch)
 2145         {
 2146             server.cluster->failover_auth_count++;
 2147             /* Maybe we reached a quorum here, set a flag to make sure
 2148              * we check ASAP. */
 2149             clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
 2150         }
 2151     } else if (type == CLUSTERMSG_TYPE_MFSTART) {
 2152         /* This message is acceptable only if I'm a master and the sender
 2153          * is one of my slaves. */
 2154         if (!sender || sender->slaveof != myself) return 1;
 2155         /* Manual failover requested from slaves. Initialize the state
 2156          * accordingly. */
 2157         resetManualFailover();
 2158         server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
 2159         server.cluster->mf_slave = sender;
 2160         pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
 2161         serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
 2162             sender->name);
 2163     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
 2164         clusterNode *n; /* The node the update is about. */
 2165         uint64_t reportedConfigEpoch =
 2166                     ntohu64(hdr->data.update.nodecfg.configEpoch);
 2167 
 2168         if (!sender) return 1;  /* We don't know the sender. */
 2169         n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
 2170         if (!n) return 1;   /* We don't know the reported node. */
 2171         if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
 2172 
 2173         /* If in our current config the node is a slave, set it as a master. */
 2174         if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
 2175 
 2176         /* Update the node's configEpoch. */
 2177         n->configEpoch = reportedConfigEpoch;
 2178         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 2179                              CLUSTER_TODO_FSYNC_CONFIG);
 2180 
 2181         /* Check the bitmap of served slots and update our
 2182          * config accordingly. */
 2183         clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
 2184             hdr->data.update.nodecfg.slots);
 2185     } else if (type == CLUSTERMSG_TYPE_MODULE) {
 2186         if (!sender) return 1;  /* Protect the module from unknown nodes. */
 2187         /* We need to route this message back to the right module subscribed
 2188          * for the right message type. */
 2189         uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
 2190         uint32_t len = ntohl(hdr->data.module.msg.len);
 2191         uint8_t type = hdr->data.module.msg.type;
 2192         unsigned char *payload = hdr->data.module.msg.bulk_data;
 2193         moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
 2194     } else {
 2195         serverLog(LL_WARNING,"Received unknown packet type: %d", type);
 2196     }
 2197     return 1;
 2198 }
 2199 
 2200 /* This function is called when we detect that the link with this node is
 2201    lost. We set the node as no longer connected. The cluster cron will
 2202    detect this condition and will try to get it connected again.
 2203 
 2204    Instead, if the node is a temporary node used to accept a query, we
 2205    completely free the node on error. */
 2206 void handleLinkIOError(clusterLink *link) {
 2207     freeClusterLink(link);
 2208 }
 2209 
 2210 /* Send data. This is handled using a trivial send buffer that gets
 2211  * consumed by write(). We don't try to optimize this for speed too much
 2212  * as this is a very low traffic channel. */
 2213 void clusterWriteHandler(connection *conn) {
 2214     clusterLink *link = connGetPrivateData(conn);
 2215     ssize_t nwritten;
 2216 
 2217     nwritten = connWrite(conn, link->sndbuf, sdslen(link->sndbuf));
 2218     if (nwritten <= 0) {
 2219         serverLog(LL_DEBUG,"I/O error writing to node link: %s",
 2220             (nwritten == -1) ? connGetLastError(conn) : "short write");
 2221         handleLinkIOError(link);
 2222         return;
 2223     }
 2224     sdsrange(link->sndbuf,nwritten,-1);
 2225     if (sdslen(link->sndbuf) == 0)
 2226         connSetWriteHandler(link->conn, NULL);
 2227 }
 2228 
 2229 /* A connect handler that gets called when a connection to another node
 2230  * gets established.
 2231  */
 2232 void clusterLinkConnectHandler(connection *conn) {
 2233     clusterLink *link = connGetPrivateData(conn);
 2234     clusterNode *node = link->node;
 2235 
 2236     /* Check if connection succeeded */
 2237     if (connGetState(conn) != CONN_STATE_CONNECTED) {
 2238         serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s",
 2239                 node->name, node->ip, node->cport,
 2240                 connGetLastError(conn));
 2241         freeClusterLink(link);
 2242         return;
 2243     }
 2244 
 2245     /* Register a read handler from now on */
 2246     connSetReadHandler(conn, clusterReadHandler);
 2247 
 2248     /* Queue a PING in the new connection ASAP: this is crucial
 2249      * to avoid false positives in failure detection.
 2250      *
 2251      * If the node is flagged as MEET, we send a MEET message instead
 2252      * of a PING one, to force the receiver to add us in its node
 2253      * table. */
 2254     mstime_t old_ping_sent = node->ping_sent;
 2255     clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
 2256             CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
 2257     if (old_ping_sent) {
 2258         /* If there was an active ping before the link was
 2259          * disconnected, we want to restore the ping time, which was
 2260          * otherwise replaced by the clusterSendPing() call above. */
 2261         node->ping_sent = old_ping_sent;
 2262     }
 2263     /* We can clear the flag after the first packet is sent.
 2264      * If we'll never receive a PONG, we'll never send new packets
 2265      * to this node. Instead after the PONG is received and we
 2266      * are no longer in meet/handshake status, we want to send
 2267      * normal PING packets. */
 2268     node->flags &= ~CLUSTER_NODE_MEET;
 2269 
 2270     serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
 2271             node->name, node->ip, node->cport);
 2272 }
 2273 
 2274 /* Read data. Try to read the first fields of the header to learn the
 2275  * full length of the packet. When a whole packet is in memory this function
 2276  * will call the function that processes the packet. And so forth. */
 2277 void clusterReadHandler(connection *conn) {
 2278     clusterMsg buf[1];
 2279     ssize_t nread;
 2280     clusterMsg *hdr;
 2281     clusterLink *link = connGetPrivateData(conn);
 2282     unsigned int readlen, rcvbuflen;
 2283 
 2284     while(1) { /* Read as long as there is data to read. */
 2285         rcvbuflen = sdslen(link->rcvbuf);
 2286         if (rcvbuflen < 8) {
 2287             /* First, obtain the first 8 bytes to get the full message
 2288              * length. */
 2289             readlen = 8 - rcvbuflen;
 2290         } else {
 2291             /* Finally read the full message. */
 2292             hdr = (clusterMsg*) link->rcvbuf;
 2293             if (rcvbuflen == 8) {
 2294                 /* Perform some sanity check on the message signature
 2295                  * and length. */
 2296                 if (memcmp(hdr->sig,"RCmb",4) != 0 ||
 2297                     ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
 2298                 {
 2299                     serverLog(LL_WARNING,
 2300                         "Bad message length or signature received "
 2301                         "from Cluster bus.");
 2302                     handleLinkIOError(link);
 2303                     return;
 2304                 }
 2305             }
 2306             readlen = ntohl(hdr->totlen) - rcvbuflen;
 2307             if (readlen > sizeof(buf)) readlen = sizeof(buf);
 2308         }
 2309 
 2310         nread = connRead(conn,buf,readlen);
 2311         if (nread == -1 && (connGetState(conn) == CONN_STATE_CONNECTED)) return; /* No more data ready. */
 2312 
 2313         if (nread <= 0) {
 2314             /* I/O error... */
 2315             serverLog(LL_DEBUG,"I/O error reading from node link: %s",
 2316                 (nread == 0) ? "connection closed" : connGetLastError(conn));
 2317             handleLinkIOError(link);
 2318             return;
 2319         } else {
 2320             /* Read data and recast the pointer to the new buffer. */
 2321             link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
 2322             hdr = (clusterMsg*) link->rcvbuf;
 2323             rcvbuflen += nread;
 2324         }
 2325 
 2326         /* Total length obtained? Process this packet. */
 2327         if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
 2328             if (clusterProcessPacket(link)) {
 2329                 sdsfree(link->rcvbuf);
 2330                 link->rcvbuf = sdsempty();
 2331             } else {
 2332                 return; /* Link no longer valid. */
 2333             }
 2334         }
 2335     }
 2336 }
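
      /* The framing implemented above, summarized: every cluster bus message
       * starts with the 4 byte signature "RCmb" followed by a 4 byte totlen
       * in network byte order, so the handler first accumulates 8 bytes to
       * learn and validate the total length, then keeps reading (at most
       * sizeof(clusterMsg) bytes per connRead() call) until rcvbuf holds
       * totlen bytes, at which point clusterProcessPacket() is invoked. */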
 2337 
 2338 /* Put stuff into the send buffer.
 2339  *
 2340  * It is guaranteed that this function will never have the side effect
 2341  * of invalidating the link, so it is safe to call this function
 2342  * from event handlers that will do stuff with the same link later. */
 2343 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
 2344     if (sdslen(link->sndbuf) == 0 && msglen != 0)
 2345         connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);
 2346 
 2347     link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
 2348 
 2349     /* Populate sent messages stats. */
 2350     clusterMsg *hdr = (clusterMsg*) msg;
 2351     uint16_t type = ntohs(hdr->type);
 2352     if (type < CLUSTERMSG_TYPE_COUNT)
 2353         server.cluster->stats_bus_messages_sent[type]++;
 2354 }
 2355 
 2356 /* Send a message to all the nodes that are part of the cluster having
 2357  * a connected link.
 2358  *
 2359  * It is guaranteed that this function will never have the side effect
 2360  * of invalidating some node->link, so it is safe to call this function
 2361  * from event handlers that will do stuff with node links later. */
 2362 void clusterBroadcastMessage(void *buf, size_t len) {
 2363     dictIterator *di;
 2364     dictEntry *de;
 2365 
 2366     di = dictGetSafeIterator(server.cluster->nodes);
 2367     while((de = dictNext(di)) != NULL) {
 2368         clusterNode *node = dictGetVal(de);
 2369 
 2370         if (!node->link) continue;
 2371         if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
 2372             continue;
 2373         clusterSendMessage(node->link,buf,len);
 2374     }
 2375     dictReleaseIterator(di);
 2376 }
 2377 
 2378 /* Build the message header. hdr must point to a buffer of at least
 2379  * sizeof(clusterMsg) bytes. */
 2380 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
 2381     int totlen = 0;
 2382     uint64_t offset;
 2383     clusterNode *master;
 2384 
 2385     /* If this node is a master, we send its slots bitmap and configEpoch.
 2386      * If this node is a slave we send the master's information instead (the
 2387      * node is flagged as slave so the receiver knows that it is NOT really
 2388      * in charge of these slots). */
 2389     master = (nodeIsSlave(myself) && myself->slaveof) ?
 2390               myself->slaveof : myself;
 2391 
 2392     memset(hdr,0,sizeof(*hdr));
 2393     hdr->ver = htons(CLUSTER_PROTO_VER);
 2394     hdr->sig[0] = 'R';
 2395     hdr->sig[1] = 'C';
 2396     hdr->sig[2] = 'm';
 2397     hdr->sig[3] = 'b';
 2398     hdr->type = htons(type);
 2399     memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
 2400 
 2401     /* If cluster-announce-ip option is enabled, force the receivers of our
 2402      * packets to use the specified address for this node. Otherwise if the
 2403      * first byte is zero, they'll do auto discovery. */
 2404     memset(hdr->myip,0,NET_IP_STR_LEN);
 2405     if (server.cluster_announce_ip) {
 2406         strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
 2407         hdr->myip[NET_IP_STR_LEN-1] = '\0';
 2408     }
 2409 
 2410     /* Handle cluster-announce-port as well. */
 2411     int port = server.tls_cluster ? server.tls_port : server.port;
 2412     int announced_port = server.cluster_announce_port ?
 2413                          server.cluster_announce_port : port;
 2414     int announced_cport = server.cluster_announce_bus_port ?
 2415                           server.cluster_announce_bus_port :
 2416                           (port + CLUSTER_PORT_INCR);
 2417 
 2418     memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
 2419     memset(hdr->slaveof,0,CLUSTER_NAMELEN);
 2420     if (myself->slaveof != NULL)
 2421         memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
 2422     hdr->port = htons(announced_port);
 2423     hdr->cport = htons(announced_cport);
 2424     hdr->flags = htons(myself->flags);
 2425     hdr->state = server.cluster->state;
 2426 
 2427     /* Set the currentEpoch and configEpochs. */
 2428     hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
 2429     hdr->configEpoch = htonu64(master->configEpoch);
 2430 
 2431     /* Set the replication offset. */
 2432     if (nodeIsSlave(myself))
 2433         offset = replicationGetSlaveOffset();
 2434     else
 2435         offset = server.master_repl_offset;
 2436     hdr->offset = htonu64(offset);
 2437 
 2438     /* Set the message flags. */
 2439     if (nodeIsMaster(myself) && server.cluster->mf_end)
 2440         hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
 2441 
 2442     /* Compute the message length for certain messages. For other messages
 2443      * this is up to the caller. */
 2444     if (type == CLUSTERMSG_TYPE_FAIL) {
 2445         totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 2446         totlen += sizeof(clusterMsgDataFail);
 2447     } else if (type == CLUSTERMSG_TYPE_UPDATE) {
 2448         totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 2449         totlen += sizeof(clusterMsgDataUpdate);
 2450     }
 2451     hdr->totlen = htonl(totlen);
 2452     /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
 2453 }
 2454 
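      /* Illustrative sketch (added commentary, not part of the original
       * source): callers sending fixed-size messages pair this function with
       * a totlen fix-up, following the same pattern used for example by
       * clusterSendMFStart() later in this file:
       *
       *   clusterMsg buf[1];
       *   clusterMsg *hdr = (clusterMsg*) buf;
       *   uint32_t totlen;
       *
       *   clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
       *   totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
       *   hdr->totlen = htonl(totlen);
       *   clusterSendMessage(link,(unsigned char*)buf,totlen);
       */
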
 2455 /* Return non-zero if the node is already present in the gossip section of
 2456  * the message pointed to by 'hdr', which has 'count' gossip entries.
 2457  * Otherwise zero is returned. Helper for clusterSendPing(). */
 2458 int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
 2459     int j;
 2460     for (j = 0; j < count; j++) {
 2461         if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
 2462                 CLUSTER_NAMELEN) == 0) break;
 2463     }
 2464     return j != count;
 2465 }
 2466 
 2467 /* Set the i-th entry of the gossip section in the message pointed to by 'hdr'
 2468  * to the info of the specified node 'n'. */
 2469 void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
 2470     clusterMsgDataGossip *gossip;
 2471     gossip = &(hdr->data.ping.gossip[i]);
 2472     memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
 2473     gossip->ping_sent = htonl(n->ping_sent/1000);
 2474     gossip->pong_received = htonl(n->pong_received/1000);
 2475     memcpy(gossip->ip,n->ip,sizeof(n->ip));
 2476     gossip->port = htons(n->port);
 2477     gossip->cport = htons(n->cport);
 2478     gossip->flags = htons(n->flags);
 2479     gossip->notused1 = 0;
 2480 }
 2481 
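      /* Wire-format note (added commentary): all multi-byte gossip fields are
       * stored in network byte order, and the millisecond timestamps are
       * truncated to seconds so they fit the 32-bit wire fields. A receiver
       * reverses the conversions, for example (illustrative):
       *
       *   mstime_t pong = (mstime_t) ntohl(gossip->pong_received) * 1000;
       *   uint16_t port = ntohs(gossip->port);
       */
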
 2482 /* Send a PING or PONG packet to the specified node, making sure to add enough
 2483  * gossip information. */
 2484 void clusterSendPing(clusterLink *link, int type) {
 2485     unsigned char *buf;
 2486     clusterMsg *hdr;
 2487     int gossipcount = 0; /* Number of gossip sections added so far. */
 2488     int wanted; /* Number of gossip sections we want to append if possible. */
 2489     int totlen; /* Total packet length. */
 2490     /* freshnodes is the max number of nodes we can hope to append at all:
 2491      * nodes available minus two (ourself and the node we are sending the
 2492      * message to). However in practice there may be fewer valid nodes, since
 2493      * nodes in handshake state or disconnected nodes are not considered. */
 2494     int freshnodes = dictSize(server.cluster->nodes)-2;
 2495 
 2496     /* How many gossip sections do we want to add? 1/10 of the number of
 2497      * nodes, and in any case at least 3. Why 1/10?
 2498      *
 2499      * If we have N masters, with N/10 entries, and we consider that in
 2500      * node_timeout we exchange with each other node at least 4 packets
 2501      * (we ping in the worst case in node_timeout/2 time, and we also
 2502      * receive two pings from the host), we have a total of 8 packets
 2503      * in the node_timeout*2 failure reports validity time. So we have
 2504      * that, for a single PFAIL node, we can expect to receive the following
 2505      * number of failure reports (in the specified window of time):
 2506      *
 2507      * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
 2508      *
 2509      * PROB = probability of being featured in a single gossip entry,
 2510      *        which is 1 / NUM_OF_NODES.
 2511      * ENTRIES = 10.
 2512      * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
 2513      *
 2514      * If we assume we have just masters (so num of nodes and num of masters
 2515      * is the same), with 1/10 we always get over the majority, and specifically
 2516      * 80% of the number of nodes, to account for many masters failing at the
 2517      * same time.
 2518      *
 2519      * Since we have non-voting slaves that lower the probability of an entry
 2520      * to feature our node, we set the number of entries per packet as
 2521      * 10% of the total nodes we have. */
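          /* Worked example (added for clarity): with N = 100 master-only
           * nodes, each packet carries N/10 = 10 gossip entries, and in the
           * validity window the cluster exchanges TOTAL_PACKETS = 2*4*100 =
           * 800 packets, so the expected number of failure reports for a
           * single PFAIL node is (1/100) * 10 * 800 = 80, i.e. 80% of the
           * nodes, well over the majority needed to flag it as FAIL. */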
 2522     wanted = floor(dictSize(server.cluster->nodes)/10);
 2523     if (wanted < 3) wanted = 3;
 2524     if (wanted > freshnodes) wanted = freshnodes;
 2525 
 2526     /* Include all the nodes in PFAIL state, so that failure reports
 2527      * propagate faster and nodes go from PFAIL to FAIL state sooner. */
 2528     int pfail_wanted = server.cluster->stats_pfail_nodes;
 2529 
 2530     /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
 2531      * later according to the number of gossip sections we really were able
 2532      * to put inside the packet. */
 2533     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 2534     totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
 2535     /* Note: clusterBuildMessageHdr() expects the buffer to always be at
 2536      * least sizeof(clusterMsg) in size. */
 2537     if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
 2538     buf = zcalloc(totlen);
 2539     hdr = (clusterMsg*) buf;
 2540 
 2541     /* Populate the header. */
 2542     if (link->node && type == CLUSTERMSG_TYPE_PING)
 2543         link->node->ping_sent = mstime();
 2544     clusterBuildMessageHdr(hdr,type);
 2545 
 2546     /* Populate the gossip fields */
 2547     int maxiterations = wanted*3;
 2548     while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
 2549         dictEntry *de = dictGetRandomKey(server.cluster->nodes);
 2550         clusterNode *this = dictGetVal(de);
 2551 
 2552         /* Don't include this node: the whole packet header is about us
 2553          * already, so we just gossip about other nodes. */
 2554         if (this == myself) continue;
 2555 
 2556         /* PFAIL nodes will be added later. */
 2557         if (this->flags & CLUSTER_NODE_PFAIL) continue;
 2558 
 2559         /* In the gossip section don't include:
 2560          * 1) Nodes in HANDSHAKE state.
 2561          * 2) Nodes with the NOADDR flag set.
 2562          * 3) Disconnected nodes if they don't have configured slots.
 2563          */
 2564         if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
 2565             (this->link == NULL && this->numslots == 0))
 2566         {
 2567             freshnodes--; /* Technically not correct, but saves CPU. */
 2568             continue;
 2569         }
 2570 
 2571         /* Do not add a node we already have. */
 2572         if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
 2573 
 2574         /* Add it */
 2575         clusterSetGossipEntry(hdr,gossipcount,this);
 2576         freshnodes--;
 2577         gossipcount++;
 2578     }
 2579 
 2580     /* If there are PFAIL nodes, add them at the end. */
 2581     if (pfail_wanted) {
 2582         dictIterator *di;
 2583         dictEntry *de;
 2584 
 2585         di = dictGetSafeIterator(server.cluster->nodes);
 2586         while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
 2587             clusterNode *node = dictGetVal(de);
 2588             if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
 2589             if (node->flags & CLUSTER_NODE_NOADDR) continue;
 2590             if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
 2591             clusterSetGossipEntry(hdr,gossipcount,node);
 2592             freshnodes--;
 2593             gossipcount++;
 2594             /* We take the count of the entries we allocated, since the
 2595              * PFAIL stats may not match perfectly with the current number
 2596              * of PFAIL nodes. */
 2597             pfail_wanted--;
 2598         }
 2599         dictReleaseIterator(di);
 2600     }
 2601 
 2602     /* Ready to send... fix the totlen field and queue the message in the
 2603      * output buffer. */
 2604     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 2605     totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
 2606     hdr->count = htons(gossipcount);
 2607     hdr->totlen = htonl(totlen);
 2608     clusterSendMessage(link,buf,totlen);
 2609     zfree(buf);
 2610 }
 2611 
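      /* Sizing note (added commentary): the buffer is allocated for the worst
       * case (wanted + pfail_wanted gossip entries), while the final totlen
       * only counts the entries actually written. For example, with wanted = 3
       * and pfail_wanted = 1, the allocation covers 4 entries, but if only 2
       * fresh nodes qualify the packet goes out with gossipcount = 3 (2 fresh
       * + 1 PFAIL) and totlen computed from that count. */
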
 2612 /* Send a PONG packet to every connected node that's not in handshake state
 2613  * and for which we have a valid link.
 2614  *
 2615  * In Redis Cluster pongs are not used just for failure detection, but also
 2616  * to carry important configuration information. So broadcasting a pong is
 2617  * useful when something changes in the configuration and we want to make
 2618  * the cluster aware ASAP (for instance after a slave promotion).
 2619  *
 2620  * The 'target' argument specifies the receiving instances using the
 2621  * defines below:
 2622  *
 2623  * CLUSTER_BROADCAST_ALL -> All known instances.
 2624  * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
 2625  */
 2626 #define CLUSTER_BROADCAST_ALL 0
 2627 #define CLUSTER_BROADCAST_LOCAL_SLAVES 1
 2628 void clusterBroadcastPong(int target) {
 2629     dictIterator *di;
 2630     dictEntry *de;
 2631 
 2632     di = dictGetSafeIterator(server.cluster->nodes);
 2633     while((de = dictNext(di)) != NULL) {
 2634         clusterNode *node = dictGetVal(de);
 2635 
 2636         if (!node->link) continue;
 2637         if (node == myself || nodeInHandshake(node)) continue;
 2638         if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
 2639             int local_slave =
 2640                 nodeIsSlave(node) && node->slaveof &&
 2641                 (node->slaveof == myself || node->slaveof == myself->slaveof);
 2642             if (!local_slave) continue;
 2643         }
 2644         clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
 2645     }
 2646     dictReleaseIterator(di);
 2647 }
 2648 
 2649 /* Send a PUBLISH message.
 2650  *
 2651  * If link is NULL, then the message is broadcasted to the whole cluster. */
 2652 void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
 2653     unsigned char *payload;
 2654     clusterMsg buf[1];
 2655     clusterMsg *hdr = (clusterMsg*) buf;
 2656     uint32_t totlen;
 2657     uint32_t channel_len, message_len;
 2658 
 2659     channel = getDecodedObject(channel);
 2660     message = getDecodedObject(message);
 2661     channel_len = sdslen(channel->ptr);
 2662     message_len = sdslen(message->ptr);
 2663 
 2664     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
 2665     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 2666     totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
 2667 
 2668     hdr->data.publish.msg.channel_len = htonl(channel_len);
 2669     hdr->data.publish.msg.message_len = htonl(message_len);
 2670     hdr->totlen = htonl(totlen);
 2671 
 2672     /* Try to use the local buffer if possible */
 2673     if (totlen < sizeof(buf)) {
 2674         payload = (unsigned char*)buf;
 2675     } else {
 2676         payload = zmalloc(totlen);
 2677         memcpy(payload,hdr,sizeof(*hdr));
 2678         hdr = (clusterMsg*) payload;
 2679     }
 2680     memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
 2681     memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
 2682         message->ptr,sdslen(message->ptr));
 2683 
 2684     if (link)
 2685         clusterSendMessage(link,payload,totlen);
 2686     else
 2687         clusterBroadcastMessage(payload,totlen);
 2688 
 2689     decrRefCount(channel);
 2690     decrRefCount(message);
 2691     if (payload != (unsigned char*)buf) zfree(payload);
 2692 }
 2693 
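      /* Layout note (added commentary, assuming the clusterMsgDataPublish
       * definition in cluster.h, where bulk_data is declared as an 8-byte
       * placeholder): the "- 8" above removes that placeholder before adding
       * the real payload sizes. Channel and message travel back to back, so
       * for PUBLISH news hello the receiver finds:
       *
       *   channel_len = 4, message_len = 5
       *   bulk_data   = "newshello"   (no separator, the lengths delimit it)
       */
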
 2694 /* Send a FAIL message to all the nodes we are able to contact.
 2695  * The FAIL message is sent when we detect that a node is failing
 2696  * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
 2697  * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
 2698  * nodes to do the same ASAP. */
 2699 void clusterSendFail(char *nodename) {
 2700     clusterMsg buf[1];
 2701     clusterMsg *hdr = (clusterMsg*) buf;
 2702 
 2703     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
 2704     memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
 2705     clusterBroadcastMessage(buf,ntohl(hdr->totlen));
 2706 }
 2707 
 2708 /* Send an UPDATE message to the specified link carrying the specified 'node'
 2709  * slots configuration. The node name, slots bitmap, and configEpoch info
 2710  * are included. */
 2711 void clusterSendUpdate(clusterLink *link, clusterNode *node) {
 2712     clusterMsg buf[1];
 2713     clusterMsg *hdr = (clusterMsg*) buf;
 2714 
 2715     if (link == NULL) return;
 2716     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
 2717     memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
 2718     hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
 2719     memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
 2720     clusterSendMessage(link,(unsigned char*)buf,ntohl(hdr->totlen));
 2721 }
 2722 
 2723 /* Send a MODULE message.
 2724  *
 2725  * If link is NULL, then the message is broadcasted to the whole cluster. */
 2726 void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
 2727                        unsigned char *payload, uint32_t len) {
 2728     unsigned char *heapbuf;
 2729     clusterMsg buf[1];
 2730     clusterMsg *hdr = (clusterMsg*) buf;
 2731     uint32_t totlen;
 2732 
 2733     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
 2734     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 2735     totlen += sizeof(clusterMsgModule) - 3 + len;
 2736 
 2737     hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
 2738     hdr->data.module.msg.type = type;
 2739     hdr->data.module.msg.len = htonl(len);
 2740     hdr->totlen = htonl(totlen);
 2741 
 2742     /* Try to use the local buffer if possible */
 2743     if (totlen < sizeof(buf)) {
 2744         heapbuf = (unsigned char*)buf;
 2745     } else {
 2746         heapbuf = zmalloc(totlen);
 2747         memcpy(heapbuf,hdr,sizeof(*hdr));
 2748         hdr = (clusterMsg*) heapbuf;
 2749     }
 2750     memcpy(hdr->data.module.msg.bulk_data,payload,len);
 2751 
 2752     if (link)
 2753         clusterSendMessage(link,heapbuf,totlen);
 2754     else
 2755         clusterBroadcastMessage(heapbuf,totlen);
 2756 
 2757     if (heapbuf != (unsigned char*)buf) zfree(heapbuf);
 2758 }
 2759 
 2760 /* This function gets a cluster node ID string as target, the same way node
 2761  * addresses are represented on the modules side, resolves the node, and sends
 2762  * the message. If the target is NULL the message is broadcasted.
 2763  *
 2764  * The function returns C_OK if the target is valid, otherwise C_ERR is
 2765  * returned. */
 2766 int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
 2767     clusterNode *node = NULL;
 2768 
 2769     if (target != NULL) {
 2770         node = clusterLookupNode(target);
 2771         if (node == NULL || node->link == NULL) return C_ERR;
 2772     }
 2773 
 2774     clusterSendModule(target ? node->link : NULL,
 2775                       module_id, type, payload, len);
 2776     return C_OK;
 2777 }
 2778 
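      /* Usage sketch (illustrative, with hypothetical node_id/module_id
       * variables): module-originated messages are unicast by 40-char hex
       * node ID, or broadcast by passing NULL:
       *
       *   unsigned char payload[] = "hello";
       *   clusterSendModuleMessageToTarget(node_id, module_id, 1,
       *                                    payload, sizeof(payload));
       *   clusterSendModuleMessageToTarget(NULL, module_id, 1,
       *                                    payload, sizeof(payload));
       */
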
 2779 /* -----------------------------------------------------------------------------
 2780  * CLUSTER Pub/Sub support
 2781  *
 2782  * For now we do very little, just propagating PUBLISH messages across the whole
 2783  * cluster. In the future we'll try to get smarter and avoid propagating those
 2784  * messages to hosts without receivers for a given channel.
 2785  * -------------------------------------------------------------------------- */
 2786 void clusterPropagatePublish(robj *channel, robj *message) {
 2787     clusterSendPublish(NULL, channel, message);
 2788 }
 2789 
 2790 /* -----------------------------------------------------------------------------
 2791  * SLAVE node specific functions
 2792  * -------------------------------------------------------------------------- */
 2793 
 2794 /* This function sends a FAILOVER_AUTH_REQUEST message to every node in order to
 2795  * see if there is the quorum for this slave instance to failover its failing
 2796  * master.
 2797  *
 2798  * Note that we send the failover request to everybody, master and slave nodes,
 2799  * but only the masters are supposed to reply to our query. */
 2800 void clusterRequestFailoverAuth(void) {
 2801     clusterMsg buf[1];
 2802     clusterMsg *hdr = (clusterMsg*) buf;
 2803     uint32_t totlen;
 2804 
 2805     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
 2806     /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
 2807      * in the header to communicate to the nodes receiving the message that
 2808      * they should authorize the failover even if the master is working. */
 2809     if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
 2810     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 2811     hdr->totlen = htonl(totlen);
 2812     clusterBroadcastMessage(buf,totlen);
 2813 }
 2814 
 2815 /* Send a FAILOVER_AUTH_ACK message to the specified node. */
 2816 void clusterSendFailoverAuth(clusterNode *node) {
 2817     clusterMsg buf[1];
 2818     clusterMsg *hdr = (clusterMsg*) buf;
 2819     uint32_t totlen;
 2820 
 2821     if (!node->link) return;
 2822     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
 2823     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 2824     hdr->totlen = htonl(totlen);
 2825     clusterSendMessage(node->link,(unsigned char*)buf,totlen);
 2826 }
 2827 
 2828 /* Send a MFSTART message to the specified node. */
 2829 void clusterSendMFStart(clusterNode *node) {
 2830     clusterMsg buf[1];
 2831     clusterMsg *hdr = (clusterMsg*) buf;
 2832     uint32_t totlen;
 2833 
 2834     if (!node->link) return;
 2835     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
 2836     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
 2837     hdr->totlen = htonl(totlen);
 2838     clusterSendMessage(node->link,(unsigned char*)buf,totlen);
 2839 }
 2840 
 2841 /* Vote for the node asking for our vote if the conditions are met. */
 2842 void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
 2843     clusterNode *master = node->slaveof;
 2844     uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
 2845     uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
 2846     unsigned char *claimed_slots = request->myslots;
 2847     int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
 2848     int j;
 2849 
 2850     /* If we are not a master serving at least 1 slot, we don't have the
 2851      * right to vote, as the cluster size in Redis Cluster is the number
 2852      * of masters serving at least one slot, and the quorum is half the
 2853      * cluster size + 1. */
 2854     if (nodeIsSlave(myself) || myself->numslots == 0) return;
 2855 
 2856     /* Request epoch must be >= our currentEpoch.
 2857      * Note that it is impossible for it to actually be greater since
 2858      * our currentEpoch was updated as a side effect of receiving this
 2859      * request, if the request epoch was greater. */
 2860     if (requestCurrentEpoch < server.cluster->currentEpoch) {
 2861         serverLog(LL_WARNING,
 2862             "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
 2863             node->name,
 2864             (unsigned long long) requestCurrentEpoch,
 2865             (unsigned long long) server.cluster->currentEpoch);
 2866         return;
 2867     }
 2868 
 2869     /* Did I already vote for this epoch? Return ASAP. */
 2870     if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
 2871         serverLog(LL_WARNING,
 2872                 "Failover auth denied to %.40s: already voted for epoch %llu",
 2873                 node->name,
 2874                 (unsigned long long) server.cluster->currentEpoch);
 2875         return;
 2876     }
 2877 
 2878     /* Node must be a slave and its master down.
 2879      * The master can be non failing if the request is flagged
 2880      * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
 2881     if (nodeIsMaster(node) || master == NULL ||
 2882         (!nodeFailed(master) && !force_ack))
 2883     {
 2884         if (nodeIsMaster(node)) {
 2885             serverLog(LL_WARNING,
 2886                     "Failover auth denied to %.40s: it is a master node",
 2887                     node->name);
 2888         } else if (master == NULL) {
 2889             serverLog(LL_WARNING,
 2890                     "Failover auth denied to %.40s: I don't know its master",
 2891                     node->name);
 2892         } else if (!nodeFailed(master)) {
 2893             serverLog(LL_WARNING,
 2894                     "Failover auth denied to %.40s: its master is up",
 2895                     node->name);
 2896         }
 2897         return;
 2898     }
 2899 
 2900     /* We did not vote for a slave of this master within two
 2901      * times the node timeout. This is not strictly needed for correctness
 2902      * of the algorithm, but makes the base case more linear. */
 2903     if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
 2904     {
 2905         serverLog(LL_WARNING,
 2906                 "Failover auth denied to %.40s: "
 2907                 "can't vote about this master before %lld milliseconds",
 2908                 node->name,
 2909                 (long long) ((server.cluster_node_timeout*2)-
 2910                              (mstime() - node->slaveof->voted_time)));
 2911         return;
 2912     }
 2913 
 2914     /* The slave requesting the vote must have a configEpoch for the claimed
 2915      * slots that is >= the one of the masters currently serving the same
 2916      * slots in the current configuration. */
 2917     for (j = 0; j < CLUSTER_SLOTS; j++) {
 2918         if (bitmapTestBit(claimed_slots, j) == 0) continue;
 2919         if (server.cluster->slots[j] == NULL ||
 2920             server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
 2921         {
 2922             continue;
 2923         }
 2924         /* If we reached this point we found a slot that in our current slots
 2925          * is served by a master with a greater configEpoch than the one claimed
 2926          * by the slave requesting our vote. Refuse to vote for this slave. */
 2927         serverLog(LL_WARNING,
 2928                 "Failover auth denied to %.40s: "
 2929                 "slot %d epoch (%llu) > reqEpoch (%llu)",
 2930                 node->name, j,
 2931                 (unsigned long long) server.cluster->slots[j]->configEpoch,
 2932                 (unsigned long long) requestConfigEpoch);
 2933         return;
 2934     }
 2935 
 2936     /* We can vote for this slave. */
 2937     server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
 2938     node->slaveof->voted_time = mstime();
 2939     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
 2940     clusterSendFailoverAuth(node);
 2941     serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
 2942         node->name, (unsigned long long) server.cluster->currentEpoch);
 2943 }
 2944 
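      /* Recap (added commentary): a vote is granted only if every check above
       * passed: we are a master serving slots, the request epoch is current,
       * we did not already vote in this epoch, the requester is a slave of a
       * failed (or manually failing-over) master, the per-master cooldown of
       * node_timeout*2 has elapsed, and no claimed slot is served by a master
       * with a configEpoch greater than the one declared in the request. */
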
 2945 /* This function returns the "rank" of this instance, a slave, in the context
 2946  * of its master-slaves ring. The rank of the slave is given by the number of
 2947  * other slaves for the same master that have a better replication offset
 2948  * compared to the local one (better means greater, so they claim more data).
 2949  *
 2950  * A slave with rank 0 is the one with the greatest (most up to date)
 2951  * replication offset, and so forth. Note that because of how the rank is
 2952  * computed, multiple slaves may have the same rank if they have the same offset.
 2953  *
 2954  * The slave rank is used to add a delay to start an election in order to
 2955  * get voted and replace a failing master. Slaves with better replication
 2956  * offsets are more likely to win. */
 2957 int clusterGetSlaveRank(void) {
 2958     long long myoffset;
 2959     int j, rank = 0;
 2960     clusterNode *master;
 2961 
 2962     serverAssert(nodeIsSlave(myself));
 2963     master = myself->slaveof;
 2964     if (master == NULL) return 0; /* Never called by slaves without master. */
 2965 
 2966     myoffset = replicationGetSlaveOffset();
 2967     for (j = 0; j < master->numslaves; j++)
 2968         if (master->slaves[j] != myself &&
 2969             !nodeCantFailover(master->slaves[j]) &&
 2970             master->slaves[j]->repl_offset > myoffset) rank++;
 2971     return rank;
 2972 }
 2973 
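      /* Worked example (added for clarity): with three slaves of the same
       * master reporting replication offsets 200, 150 and 150, the first
       * slave has rank 0 while the other two both have rank 1: only slaves
       * with a strictly greater offset (and able to failover) increase the
       * rank. */
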
 2974 /* This function is called by clusterHandleSlaveFailover() in order to
 2975  * let the slave log why it is not able to failover. Sometimes the
 2976  * conditions are not met, but since the failover function is called again
 2977  * and again, we can't log the same things continuously.
 2978  *
 2979  * This function works by logging only if a given set of conditions are
 2980  * true:
 2981  *
 2982  * 1) The reason for which the failover can't be initiated changed.
 2983  *    The reasons also include a NONE reason, which the state is reset to
 2984  *    when the slave finds that its master is fine (no FAIL flag).
 2985  * 2) Also, the log is emitted again if the master is still down and
 2986  *    the reason for not failing over is still the same, but more than
 2987  *    CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
 2988  * 3) Finally, the function only logs if the master is down for more than
 2989  *    five seconds + NODE_TIMEOUT. This way nothing is logged when a
 2990  *    failover starts in a reasonable time.
 2991  *
 2992  * The function is called with the reason why the slave can't failover
 2993  * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
 2994  *
 2995  * The function is guaranteed to be called only if 'myself' is a slave. */
 2996 void clusterLogCantFailover(int reason) {
 2997     char *msg;
 2998     static time_t lastlog_time = 0;
 2999     mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
 3000 
 3001     /* Don't log if we have the same reason for some time. */
 3002     if (reason == server.cluster->cant_failover_reason &&
 3003         time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
 3004         return;
 3005 
 3006     server.cluster->cant_failover_reason = reason;
 3007 
 3008     /* We also don't emit any log if the master failed not long ago: the
 3009      * goal of this function is to log slaves in a stalled condition for
 3010      * a long time. */
 3011     if (myself->slaveof &&
 3012         nodeFailed(myself->slaveof) &&
 3013         (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
 3014 
 3015     switch(reason) {
 3016     case CLUSTER_CANT_FAILOVER_DATA_AGE:
 3017         msg = "Disconnected from master for longer than allowed. "
 3018               "Please check the 'cluster-replica-validity-factor' configuration "
 3019               "option.";
 3020         break;
 3021     case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
 3022         msg = "Waiting the delay before I can start a new failover.";
 3023         break;
 3024     case CLUSTER_CANT_FAILOVER_EXPIRED:
 3025         msg = "Failover attempt expired.";
 3026         break;
 3027     case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
 3028         msg = "Waiting for votes, but majority still not reached.";
 3029         break;
 3030     default:
 3031         msg = "Unknown reason code.";
 3032         break;
 3033     }
 3034     lastlog_time = time(NULL);
 3035     serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
 3036 }
 3037 
 3038 /* This function implements the final part of automatic and manual failovers,
 3039  * where the slave grabs its master's hash slots, and propagates the new
 3040  * configuration.
 3041  *
 3042  * Note that it's up to the caller to be sure that the node got a new
 3043  * configuration epoch already. */
 3044 void clusterFailoverReplaceYourMaster(void) {
 3045     int j;
 3046     clusterNode *oldmaster = myself->slaveof;
 3047 
 3048     if (nodeIsMaster(myself) || oldmaster == NULL) return;
 3049 
 3050     /* 1) Turn this node into a master. */
 3051     clusterSetNodeAsMaster(myself);
 3052     replicationUnsetMaster();
 3053 
 3054     /* 2) Claim all the slots assigned to our master. */
 3055     for (j = 0; j < CLUSTER_SLOTS; j++) {
 3056         if (clusterNodeGetSlotBit(oldmaster,j)) {
 3057             clusterDelSlot(j);
 3058             clusterAddSlot(myself,j);
 3059         }
 3060     }
 3061 
 3062     /* 3) Update state and save config. */
 3063     clusterUpdateState();
 3064     clusterSaveConfigOrDie(1);
 3065 
 3066     /* 4) Pong all the other nodes so that they can update the state
 3067      *    accordingly and detect that we switched to master role. */
 3068     clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
 3069 
 3070     /* 5) If there was a manual failover in progress, clear the state. */
 3071     resetManualFailover();
 3072 }
 3073 
 3074 /* This function is called if we are a slave node and our master serving
 3075  * a non-zero number of hash slots is in FAIL state.
 3076  *
 3077  * The goal of this function is:
 3078  * 1) To check if we are able to perform a failover: is our data up to date?
 3079  * 2) Try to get elected by masters.
 3080  * 3) Perform the failover informing all the other nodes.
 3081  */
 3082 void clusterHandleSlaveFailover(void) {
 3083     mstime_t data_age;
 3084     mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
 3085     int needed_quorum = (server.cluster->size / 2) + 1;
 3086     int manual_failover = server.cluster->mf_end != 0 &&
 3087                           server.cluster->mf_can_start;
 3088     mstime_t auth_timeout, auth_retry_time;
 3089 
 3090     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
 3091 
 3092     /* Compute the failover timeout (the max time we have to send votes
 3093      * and wait for replies), and the failover retry time (the time to wait
 3094      * before trying to get voted again).
 3095      *
 3096      * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
 3097      * Retry is two times the Timeout.
 3098      */
 3099     auth_timeout = server.cluster_node_timeout*2;
 3100     if (auth_timeout < 2000) auth_timeout = 2000;
 3101     auth_retry_time = auth_timeout*2;
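          /* Worked example (added for clarity): with cluster-node-timeout set
           * to 5000 ms, auth_timeout = max(10000,2000) = 10000 ms and
           * auth_retry_time = 20000 ms, so a new election can only be
           * scheduled 20 seconds after the previous attempt started. */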
 3102 
 3103     /* Preconditions to run the function, that must be met both in case
 3104      * of an automatic or manual failover:
 3105      * 1) We are a slave.
 3106      * 2) Our master is flagged as FAIL, or this is a manual failover.
 3107      * 3) The no-failover configuration is not set, or this is a manual
 3108      *    failover.
 3109      * 4) Our master is serving slots. */
 3110     if (nodeIsMaster(myself) ||
 3111         myself->slaveof == NULL ||
 3112         (!nodeFailed(myself->slaveof) && !manual_failover) ||
 3113         (server.cluster_slave_no_failover && !manual_failover) ||
 3114         myself->slaveof->numslots == 0)
 3115     {
 3116         /* There are no reasons to failover, so we set the reason why we
 3117          * are returning without failing over to NONE. */
 3118         server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
 3119         return;
 3120     }
 3121 
 3122     /* Set data_age to the number of milliseconds we are disconnected from
 3123      * the master. */
 3124     if (server.repl_state == REPL_STATE_CONNECTED) {
 3125         data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
 3126                    * 1000;
 3127     } else {
 3128         data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
 3129     }
 3130 
 3131     /* Remove the node timeout from the data age as it is fine that we are
 3132      * disconnected from our master at least for the time it was down to be
 3133      * flagged as FAIL: that's the baseline. */
 3134     if (data_age > server.cluster_node_timeout)
 3135         data_age -= server.cluster_node_timeout;
 3136 
 3137     /* Check if our data is recent enough according to the slave validity
 3138      * factor configured by the user.
 3139      *
 3140      * Check bypassed for manual failovers. */
 3141     if (server.cluster_slave_validity_factor &&
 3142         data_age >
 3143         (((mstime_t)server.repl_ping_slave_period * 1000) +
 3144          (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
 3145     {
 3146         if (!manual_failover) {
 3147             clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
 3148             return;
 3149         }
 3150     }
 3151 
 3152     /* If the previous failover attempt timed out and the retry time has
 3153      * elapsed, we can set up a new one. */
 3154     if (auth_age > auth_retry_time) {
 3155         server.cluster->failover_auth_time = mstime() +
 3156             500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
 3157             random() % 500; /* Random delay between 0 and 500 milliseconds. */
 3158         server.cluster->failover_auth_count = 0;
 3159         server.cluster->failover_auth_sent = 0;
 3160         server.cluster->failover_auth_rank = clusterGetSlaveRank();
 3161         /* We add another delay that is proportional to the slave rank.
 3162          * Specifically 1 second * rank. This way slaves that probably have
 3163          * a less updated replication offset are penalized. */
 3164         server.cluster->failover_auth_time +=
 3165             server.cluster->failover_auth_rank * 1000;
 3166         /* However if this is a manual failover, no delay is needed. */
 3167         if (server.cluster->mf_end) {
 3168             server.cluster->failover_auth_time = mstime();
 3169             server.cluster->failover_auth_rank = 0;
 3170             clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
 3171         }
 3172         serverLog(LL_WARNING,
 3173             "Start of election delayed for %lld milliseconds "
 3174             "(rank #%d, offset %lld).",
 3175             server.cluster->failover_auth_time - mstime(),
 3176             server.cluster->failover_auth_rank,
 3177             replicationGetSlaveOffset());
 3178         /* Now that we have a scheduled election, broadcast our offset
 3179          * to all the other slaves so that they'll update their offsets
 3180          * if our offset is better. */
 3181         clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
 3182         return;
 3183     }
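          /* Delay example (added for clarity): a slave with rank 2 schedules
           * its election between 2500 and 3000 ms in the future: a fixed
           * 500 ms for FAIL propagation, a random 0-500 ms to desynchronize
           * the slaves, plus rank * 1000 = 2000 ms. Better replicated slaves
           * therefore usually ask for votes first; manual failovers skip the
           * delay entirely. */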
 3184 
 3185     /* It is possible that we received more up-to-date offsets from other
 3186      * slaves for the same master since we computed our election delay.
 3187      * Update the delay if our rank changed.
 3188      *
 3189      * Not performed if this is a manual failover. */
 3190     if (server.cluster->failover_auth_sent == 0 &&
 3191         server.cluster->mf_end == 0)
 3192     {
 3193         int newrank = clusterGetSlaveRank();
 3194         if (newrank > server.cluster->failover_auth_rank) {
 3195             long long added_delay =
 3196                 (newrank - server.cluster->failover_auth_rank) * 1000;
 3197             server.cluster->failover_auth_time += added_delay;
 3198             server.cluster->failover_auth_rank = newrank;
 3199             serverLog(LL_WARNING,
 3200                 "Replica rank updated to #%d, added %lld milliseconds of delay.",
 3201                 newrank, added_delay);
 3202         }
 3203     }
 3204 
 3205     /* Return ASAP if we still can't start the election. */
 3206     if (mstime() < server.cluster->failover_auth_time) {
 3207         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
 3208         return;
 3209     }
 3210 
 3211     /* Return ASAP if the election is too old to be valid. */
 3212     if (auth_age > auth_timeout) {
 3213         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
 3214         return;
 3215     }
 3216 
 3217     /* Ask for votes if needed. */
 3218     if (server.cluster->failover_auth_sent == 0) {
 3219         server.cluster->currentEpoch++;
 3220         server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
 3221         serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
 3222             (unsigned long long) server.cluster->currentEpoch);
 3223         clusterRequestFailoverAuth();
 3224         server.cluster->failover_auth_sent = 1;
 3225         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
 3226                              CLUSTER_TODO_UPDATE_STATE|
 3227                              CLUSTER_TODO_FSYNC_CONFIG);
 3228         return; /* Wait for replies. */
 3229     }
 3230 
 3231     /* Check if we reached the quorum. */
 3232     if (server.cluster->failover_auth_count >= needed_quorum) {
 3233         /* We have the quorum, we can finally failover the master. */
 3234 
 3235         serverLog(LL_WARNING,
 3236             "Failover election won: I'm the new master.");
 3237 
 3238         /* Update my configEpoch to the epoch of the election. */
 3239         if (myself->configEpoch < server.cluster->failover_auth_epoch) {
 3240             myself->configEpoch = server.cluster->failover_auth_epoch;
 3241             serverLog(LL_WARNING,
 3242                 "configEpoch set to %llu after successful failover",
 3243                 (unsigned long long) myself->configEpoch);
 3244         }
 3245 
 3246         /* Take responsibility for the cluster slots. */
 3247         clusterFailoverReplaceYourMaster();
 3248     } else {
 3249         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
 3250     }
 3251 }
 3252 
 3253 /* -----------------------------------------------------------------------------
 3254  * CLUSTER slave migration
 3255  *
 3256  * Slave migration is the process that allows a slave of a master that is
 3257  * already covered by at least another slave, to "migrate" to a master that
 3258  * is orphaned, that is, left with no working slaves.
 3259  * ------------------------------------------------------------------------- */
 3260 
 3261 /* This function is responsible for deciding if this replica should be migrated
 3262  * to a different (orphaned) master. It is called by the clusterCron() function
 3263  * only if:
 3264  *
 3265  * 1) We are a slave node.
 3266  * 2) It was detected that there is at least one orphaned master in
 3267  *    the cluster.
 3268  * 3) We are a slave of one of the masters with the greatest number of
 3269  *    slaves.
 3270  *
 3271  * These checks are performed by the caller, since it needs to iterate over
 3272  * the nodes anyway, so we only spend time in clusterHandleSlaveMigration()
 3273  * if definitely needed.
 3274  *
 3275  * The function is called with a pre-computed max_slaves, that is the max
 3276  * number of working (not in FAIL state) slaves for a single master.
 3277  *
 3278  * Additional conditions for migration are examined inside the function.
 3279  */
 3280 void clusterHandleSlaveMigration(int max_slaves) {
 3281     int j, okslaves = 0;
 3282     clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
 3283     dictIterator *di;
 3284     dictEntry *de;
 3285 
 3286     /* Step 1: Don't migrate if the cluster state is not ok. */
 3287     if (server.cluster->state != CLUSTER_OK) return;
 3288 
 3289     /* Step 2: Don't migrate if my master will not be left with at least
 3290      *         'migration-barrier' slaves after my migration. */
 3291     if (mymaster == NULL) return;
 3292     for (j = 0; j < mymaster->numslaves; j++)
 3293         if (!nodeFailed(mymaster->slaves[j]) &&
 3294             !nodeTimedOut(mymaster->slaves[j])) okslaves++;
 3295     if (okslaves <= server.cluster_migration_barrier) return;
 3296 
 3297     /* Step 3: Identify a candidate for migration, and check if among the
 3298      * slaves of the masters with the greatest number of ok slaves, I'm the
 3299      * one with the smallest node ID (the "candidate slave").
 3300      *
 3301      * Note: this means that eventually a replica migration will occur
 3302      * since slaves that are reachable again always have their FAIL flag
 3303      * cleared, so eventually there must be a candidate. At the same time
 3304      * this does not mean that there are no race conditions possible (two
 3305      * slaves migrating at the same time), but this is unlikely to
 3306      * happen, and harmless when it happens. */
 3307     candidate = myself;
 3308     di = dictGetSafeIterator(server.cluster->nodes);
 3309     while((de = dictNext(di)) != NULL) {
 3310         clusterNode *node = dictGetVal(de);
 3311         int okslaves = 0, is_orphaned = 1;
 3312 
 3313         /* We want to migrate only if this master is working, orphaned, and
 3314          * used to have slaves, or failed over a master that had slaves
 3315          * (MIGRATE_TO flag). This way we only migrate to instances that were
 3316          * supposed to have replicas. */
 3317         if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
 3318         if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
 3319 
 3320         /* Check number of working slaves. */
 3321         if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
 3322         if (okslaves > 0) is_orphaned = 0;
 3323 
 3324         if (is_orphaned) {
 3325             if (!target && node->numslots > 0) target = node;
 3326 
 3327             /* Track the starting time of the orphaned condition for this
 3328              * master. */
 3329             if (!node->orphaned_time) node->orphaned_time = mstime();
 3330         } else {
 3331             node->orphaned_time = 0;
 3332         }
 3333 
 3334         /* Check if I'm the slave candidate for the migration: attached
 3335          * to a master with the maximum number of slaves and with the smallest
 3336          * node ID. */
 3337         if (okslaves == max_slaves) {
 3338             for (j = 0; j < node->numslaves; j++) {
 3339                 if (memcmp(node->slaves[j]->name,
 3340                            candidate->name,
 3341                            CLUSTER_NAMELEN) < 0)
 3342                 {
 3343                     candidate = node->slaves[j];
 3344                 }
 3345             }
 3346         }
 3347     }
 3348     dictReleaseIterator(di);
 3349 
 3350     /* Step 4: perform the migration if there is a target, and if I'm the
 3351      * candidate, but only if the master is continuously orphaned for a
 3352      * couple of seconds, so that during failovers, we give some time to
 3353      * the natural slaves of this instance to advertise their switch from
 3354      * the old master to the new one. */
 3355     if (target && candidate == myself &&
 3356         (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
 3357        !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
 3358     {
 3359         serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
 3360             target->name);
 3361         clusterSetMaster(target);
 3362     }
 3363 }
 3364 
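      /* Tie-break example (added for clarity): the candidate is selected with
       * a plain memcmp() over the 40-char node IDs, so among the slaves of
       * the most-replicated masters the lexicographically smallest ID wins,
       * e.g. "3a9c..." < "7f02..." means the slave named "3a9c..." migrates.
       * Since every node evaluates the same deterministic rule, normally only
       * one slave attempts the migration. */
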
 3365 /* -----------------------------------------------------------------------------
 3366  * CLUSTER manual failover
 3367  *
 3368  * These are the important steps performed by slaves during a manual failover:
 3369  * 1) User sends the CLUSTER FAILOVER command. The failover state is initialized
 3370  *    setting mf_end to the millisecond unix time at which we'll abort the
 3371  *    attempt.
 3372  * 2) Slave sends a MFSTART message to the master requesting to pause clients
 3373  *    for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
 3374  *    When master is paused for manual failover, it also starts to flag
 3375  *    packets with CLUSTERMSG_FLAG0_PAUSED.
 3376  * 3) Slave waits for master to send its replication offset flagged as PAUSED.
 3377  * 4) If the slave received the offset from the master, and its offset matches,
 3378  *    mf_can_start is set to 1, and clusterHandleSlaveFailover() will perform
 3379  *    the failover as usual, with the difference that the vote request
 3380  *    will be modified to force masters to vote for a slave that has a
 3381  *    working master.
 3382  *
 3383  * From the point of view of the master things are simpler: when an
 3384  * MFSTART packet is received the master sets mf_end as well and stores
 3385  * the sender in mf_slave. During the time limit for the manual failover
 3386  * the master will just send PINGs more often to this slave, flagged with
 3387  * the PAUSED flag, so that the slave will set mf_master_offset when receiving
 3388  * a packet from the master with this flag set.
 3389  *
 3390  * The goal of the manual failover is to perform a fast failover without
 3391  * data loss due to the asynchronous master-slave replication.
 3392  * -------------------------------------------------------------------------- */
 3393 
 3394 /* Reset the manual failover state. This works for both masters and slaves,
 3395  * as all the state about manual failover is cleared.
 3396  *
 3397  * The function can be used either to initialize the manual failover state at
 3398  * startup or to abort a manual failover in progress. */
 3399 void resetManualFailover(void) {
 3400     if (server.cluster->mf_end && clientsArePaused()) {
 3401         server.clients_pause_end_time = 0;
 3402         clientsArePaused(); /* Just use the side effect of the function. */
 3403     }
 3404     server.cluster->mf_end = 0; /* No manual failover in progress. */
 3405     server.cluster->mf_can_start = 0;
 3406     server.cluster->mf_slave = NULL;
 3407     server.cluster->mf_master_offset = 0;
 3408 }
 3409 
 3410 /* If a manual failover timed out, abort it. */
 3411 void manualFailoverCheckTimeout(void) {
 3412     if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
 3413         serverLog(LL_WARNING,"Manual failover timed out.");
 3414         resetManualFailover();
 3415     }
 3416 }
 3417 
 3418 /* This function is called from the cluster cron function in order to go
 3419  * forward with the manual failover state machine. */
 3420 void clusterHandleManualFailover(void) {
 3421     /* Return ASAP if no manual failover is in progress. */
 3422     if (server.cluster->mf_end == 0) return;
 3423 
 3424     /* If mf_can_start is non-zero, the failover was already triggered so the
 3425      * next steps are performed by clusterHandleSlaveFailover(). */
 3426     if (server.cluster->mf_can_start) return;
 3427 
 3428     if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
 3429 
 3430     if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
 3431         /* Our replication offset matches the master replication offset
 3432          * announced after clients were paused. We can start the failover. */
 3433         server.cluster->mf_can_start = 1;
 3434         serverLog(LL_WARNING,
 3435             "All master replication stream processed, "
 3436             "manual failover can start.");
 3437     }
 3438 }
 3439 
 3440 /* -----------------------------------------------------------------------------
 3441  * CLUSTER cron job
 3442  * -------------------------------------------------------------------------- */
 3443 
 3444 /* This is executed 10 times every second */
 3445 void clusterCron(void) {
 3446     dictIterator *di;
 3447     dictEntry *de;
 3448     int update_state = 0;
 3449     int orphaned_masters; /* How many masters there are without ok slaves. */
 3450     int max_slaves; /* Max number of ok slaves for a single master. */
 3451     int this_slaves; /* Number of ok slaves for our master (if we are slave). */
 3452     mstime_t min_pong = 0, now = mstime();
 3453     clusterNode *min_pong_node = NULL;
 3454     static unsigned long long iteration = 0;
 3455     mstime_t handshake_timeout;
 3456 
 3457     iteration++; /* Number of times this function was called so far. */
 3458 
 3459     /* We want to keep myself->ip in sync with the cluster-announce-ip option.
 3460      * The option can be set at runtime via CONFIG SET, so we periodically check
 3461      * if the option changed, and reflect this into myself->ip. */
 3462     {
 3463         static char *prev_ip = NULL;
 3464         char *curr_ip = server.cluster_announce_ip;
 3465         int changed = 0;
 3466 
 3467         if (prev_ip == NULL && curr_ip != NULL) changed = 1;
 3468         else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
 3469         else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;
 3470 
 3471         if (changed) {
 3472             if (prev_ip) zfree(prev_ip);
 3473             prev_ip = curr_ip;
 3474 
 3475             if (curr_ip) {
 3476                 /* We keep a private copy of the new IP address, by
 3477                  * duplicating the string, so that next time we can check
 3478                  * if the address really changed. */
 3479                 prev_ip = zstrdup(prev_ip);
 3480                 strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
 3481                 myself->ip[NET_IP_STR_LEN-1] = '\0';
 3482             } else {
 3483                 myself->ip[0] = '\0'; /* Force autodetection. */
 3484             }
 3485         }
 3486     }
 3487 
 3488     /* The handshake timeout is the time after which a handshake node that was
 3489      * not turned into a normal node is removed from the nodes table. Usually
 3490      * it is just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small
 3491      * we use the value of 1 second. */
 3492     handshake_timeout = server.cluster_node_timeout;
 3493     if (handshake_timeout < 1000) handshake_timeout = 1000;
 3494 
 3495     /* Update myself flags. */
 3496     clusterUpdateMyselfFlags();
 3497 
 3498     /* Check if we have disconnected nodes and re-establish the connection.
 3499      * Also update a few stats while we are here, that can be used to make
 3500      * better decisions in other parts of the code. */
 3501     di = dictGetSafeIterator(server.cluster->nodes);
 3502     server.cluster->stats_pfail_nodes = 0;
 3503     while((de = dictNext(di)) != NULL) {
 3504         clusterNode *node = dictGetVal(de);
 3505 
 3506         /* Not interested in reconnecting the link with myself or nodes
 3507          * for which we have no address. */
 3508         if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
 3509 
 3510         if (node->flags & CLUSTER_NODE_PFAIL)
 3511             server.cluster->stats_pfail_nodes++;
 3512 
 3513         /* A Node in HANDSHAKE state has a limited lifespan equal to the
 3514          * configured node timeout. */
 3515         if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
 3516             clusterDelNode(node);
 3517             continue;
 3518         }
 3519 
 3520         if (node->link == NULL) {
 3521             clusterLink *link = createClusterLink(node);
 3522             link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
 3523             connSetPrivateData(link->conn, link);
 3524             if (connConnect(link->conn, node->ip, node->cport, NET_FIRST_BIND_ADDR,
 3525                         clusterLinkConnectHandler) == -1) {
 3526                 /* We got a synchronous error from connect before
 3527                  * clusterSendPing() had a chance to be called.
 3528                  * If node->ping_sent is zero, failure detection can't work,
 3529                  * so we claim we actually sent a ping now (that will
 3530                  * be really sent as soon as the link is obtained). */
 3531                 if (node->ping_sent == 0) node->ping_sent = mstime();
 3532                 serverLog(LL_DEBUG, "Unable to connect to "
 3533                     "Cluster Node [%s]:%d -> %s", node->ip,
 3534                     node->cport, server.neterr);
 3535 
 3536                 freeClusterLink(link);
 3537                 continue;
 3538             }
 3539             node->link = link;
 3540         }
 3541     }
 3542     dictReleaseIterator(di);
 3543 
 3544     /* Ping some random node once every 10 iterations, so that we usually ping
 3545      * one random node every second. */
 3546     if (!(iteration % 10)) {
 3547         int j;
 3548 
 3549         /* Check a few random nodes and ping the one with the oldest
 3550          * pong_received time. */
 3551         for (j = 0; j < 5; j++) {
 3552             de = dictGetRandomKey(server.cluster->nodes);
 3553             clusterNode *this = dictGetVal(de);
 3554 
 3555             /* Don't ping nodes disconnected or with a ping currently active. */
 3556             if (this->link == NULL || this->ping_sent != 0) continue;
 3557             if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
 3558                 continue;
 3559             if (min_pong_node == NULL || min_pong > this->pong_received) {
 3560                 min_pong_node = this;
 3561                 min_pong = this->pong_received;
 3562             }
 3563         }
 3564         if (min_pong_node) {
 3565             serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
 3566             clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
 3567         }
 3568     }
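          /* Sampling note (added commentary): since clusterCron() runs 10
           * times per second, this branch fires about once per second.
           * Checking 5 random nodes and pinging the one with the oldest
           * pong_received cheaply approximates "ping the least recently
           * contacted node" without scanning the whole nodes table. */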
 3569 
 3570     /* Iterate nodes to check if we need to flag something as failing.
 3571      * This loop is also responsible for the following:
 3572      * 1) Checking if there are orphaned masters (masters without non-failing
 3573      *    slaves).
 3574      * 2) Counting the max number of non-failing slaves for a single master.
 3575      * 3) Counting the number of slaves for our master, if we are a slave. */
 3576     orphaned_masters = 0;
 3577     max_slaves = 0;
 3578     this_slaves = 0;
 3579     di = dictGetSafeIterator(server.cluster->nodes);
 3580     while((de = dictNext(di)) != NULL) {
 3581         clusterNode *node = dictGetVal(de);
 3582         now = mstime(); /* Use an updated time at every iteration. */
 3583 
 3584         if (node->flags &
 3585             (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
 3586                 continue;
 3587 
 3588         /* Orphaned master check, useful only if the current instance
 3589          * is a slave that may migrate to another master. */
 3590         if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
 3591             int okslaves = clusterCountNonFailingSlaves(node);
 3592 
 3593             /* A master is orphaned if it is serving a non-zero number of
 3594              * slots, has no working slaves, but used to have at least one
 3595              * slave, or failed over a master that used to have slaves. */
 3596             if (okslaves == 0 && node->numslots > 0 &&
 3597                 node->flags & CLUSTER_NODE_MIGRATE_TO)
 3598             {
 3599                 orphaned_masters++;
 3600             }
 3601             if (okslaves > max_slaves) max_slaves = okslaves;
 3602             if (nodeIsSlave(myself) && myself->slaveof == node)
 3603                 this_slaves = okslaves;
 3604         }
 3605 
 3606         /* If we are not receiving any data for more than half the cluster
 3607          * timeout, reconnect the link: maybe there is a connection
 3608          * issue even if the node is alive. */
 3609         mstime_t ping_delay = now - node->ping_sent;
 3610         mstime_t data_delay = now - node->data_received;
 3611         if (node->link && /* is connected */
 3612             now - node->link->ctime >
 3613             server.cluster_node_timeout && /* was not already reconnected */
 3614             node->ping_sent && /* we already sent a ping */
 3615             node->pong_received < node->ping_sent && /* still waiting pong */
 3616             /* and we are waiting for the pong more than timeout/2 */
 3617             ping_delay > server.cluster_node_timeout/2 &&
 3618             /* and in such interval we are not seeing any traffic at all. */
 3619             data_delay > server.cluster_node_timeout/2)
 3620         {
 3621             /* Disconnect the link, it will be reconnected automatically. */
 3622             freeClusterLink(node->link);
 3623         }
 3624 
 3625         /* If we have currently no active ping in this instance, and the
 3626          * received PONG is older than half the cluster timeout, send
 3627          * a new ping now, to ensure all the nodes are pinged without
 3628          * too big a delay. */
 3629         if (node->link &&
 3630             node->ping_sent == 0 &&
 3631             (now - node->pong_received) > server.cluster_node_timeout/2)
 3632         {
 3633             clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
 3634             continue;
 3635         }
 3636 
 3637         /* If we are a master and one of the slaves requested a manual
 3638          * failover, ping it continuously. */
 3639         if (server.cluster->mf_end &&
 3640             nodeIsMaster(myself) &&
 3641             server.cluster->mf_slave == node &&
 3642             node->link)
 3643         {
 3644             clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
 3645             continue;
 3646         }
 3647 
 3648         /* Check only if we have an active ping for this instance. */
 3649         if (node->ping_sent == 0) continue;
 3650 
 3651         /* Check if this node looks unreachable.
 3652          * Note that if we already received the PONG, then node->ping_sent
 3653          * is zero, so we can't reach this code at all, and we don't risk
 3654          * checking for a PONG delay if we never sent the PING.
 3655          *
 3656          * We also consider every incoming data as proof of liveness, since
 3657          * our cluster bus link is also used for data: under heavy data
 3658          * load pong delays are possible. */
 3659         mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
 3660                                                           data_delay;
 3661 
 3662         if (node_delay > server.cluster_node_timeout) {
 3663             /* Timeout reached. Set the node as possibly failing if it is
 3664              * not already in this state. */
 3665             if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
 3666                 serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
 3667                     node->name);
 3668                 node->flags |= CLUSTER_NODE_PFAIL;
 3669                 update_state = 1;
 3670             }
 3671         }
 3672     }
 3673     dictReleaseIterator(di);
 3674 
 3675     /* If we are a slave node but the replication is still turned off,
 3676      * enable it if we know the address of our master and it appears to
 3677      * be up. */
 3678     if (nodeIsSlave(myself) &&
 3679         server.masterhost == NULL &&
 3680         myself->slaveof &&
 3681         nodeHasAddr(myself->slaveof))
 3682     {
 3683         replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
 3684     }
 3685 
 3686     /* Abort a manual failover if the timeout is reached. */
 3687     manualFailoverCheckTimeout();
 3688 
 3689     if (nodeIsSlave(myself)) {
 3690         clusterHandleManualFailover();
 3691         if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
 3692             clusterHandleSlaveFailover();
 3693         /* If there are orphaned masters, and we are a slave of one of the
 3694          * masters with the max number of non-failing slaves, consider
 3695          * migrating to the orphaned masters. Note that it makes no sense to try
 3696          * a migration if there is no master with at least *two* working
 3697          * slaves. */
 3698         if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
 3699             clusterHandleSlaveMigration(max_slaves);
 3700     }
 3701 
 3702     if (update_state || server.cluster->state == CLUSTER_FAIL)
 3703         clusterUpdateState();
 3704 }
 3705 
 3706 /* This function is called before the event handler returns to sleep for
 3707  * events. It is useful to perform operations that must be done ASAP in
 3708  * reaction to events fired but that are not safe to perform inside event
 3709  * handlers, or to perform potentially expensive tasks that we need to do
 3710  * a single time before replying to clients. */
 3711 void clusterBeforeSleep(void) {
 3712     /* Handle failover, this is needed when it is likely that there is already
 3713      * the quorum from masters in order to react fast. */
 3714     if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
 3715         clusterHandleSlaveFailover();
 3716 
 3717     /* Update the cluster state. */
 3718     if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
 3719         clusterUpdateState();
 3720 
 3721     /* Save the config, possibly using fsync. */
 3722     if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
 3723         int fsync = server.cluster->todo_before_sleep &
 3724                     CLUSTER_TODO_FSYNC_CONFIG;
 3725         clusterSaveConfigOrDie(fsync);
 3726     }
 3727 
 3728     /* Reset our flags (not strictly needed since every single function
 3729      * called for flags set should be able to clear its flag). */
 3730     server.cluster->todo_before_sleep = 0;
 3731 }
 3732 
 3733 void clusterDoBeforeSleep(int flags) {
 3734     server.cluster->todo_before_sleep |= flags;
 3735 }
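/* Illustrative note, not part of the upstream file: callers batch work for
 * the next beforeSleep call by OR-ing flags together, for example:
 *
 *     clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
 *                          CLUSTER_TODO_SAVE_CONFIG);
 *
 * This is exactly how commands like CLUSTER ADDSLOTS below defer the state
 * update and the config save to a single point before replying to clients. */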
 3736 
 3737 /* -----------------------------------------------------------------------------
 3738  * Slots management
 3739  * -------------------------------------------------------------------------- */
 3740 
 3741 /* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
 3742  * otherwise 0. */
 3743 int bitmapTestBit(unsigned char *bitmap, int pos) {
 3744     off_t byte = pos/8;
 3745     int bit = pos&7;
 3746     return (bitmap[byte] & (1<<bit)) != 0;
 3747 }
 3748 
 3749 /* Set the bit at position 'pos' in a bitmap. */
 3750 void bitmapSetBit(unsigned char *bitmap, int pos) {
 3751     off_t byte = pos/8;
 3752     int bit = pos&7;
 3753     bitmap[byte] |= 1<<bit;
 3754 }
 3755 
 3756 /* Clear the bit at position 'pos' in a bitmap. */
 3757 void bitmapClearBit(unsigned char *bitmap, int pos) {
 3758     off_t byte = pos/8;
 3759     int bit = pos&7;
 3760     bitmap[byte] &= ~(1<<bit);
 3761 }
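/* Illustrative example, not part of the upstream file: a quick sanity check
 * of the three helpers above. Position 11 lives in byte 11/8 == 1 under
 * mask 1<<(11&7) == 0x08. The function name is hypothetical. */
static void bitmapExample(void) {
    unsigned char map[CLUSTER_SLOTS/8] = {0};

    bitmapSetBit(map,11);                     /* map[1] is now 0x08. */
    serverAssert(bitmapTestBit(map,11) == 1);
    bitmapClearBit(map,11);                   /* map[1] is 0x00 again. */
    serverAssert(bitmapTestBit(map,11) == 0);
}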
 3762 
 3763 /* Return non-zero if there is at least one master with slaves in the cluster.
 3764  * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
 3765  * MIGRATE_TO flag when a master gets its first slot. */
 3766 int clusterMastersHaveSlaves(void) {
 3767     dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
 3768     dictEntry *de;
 3769     int slaves = 0;
 3770     while((de = dictNext(di)) != NULL) {
 3771         clusterNode *node = dictGetVal(de);
 3772 
 3773         if (nodeIsSlave(node)) continue;
 3774         slaves += node->numslaves;
 3775     }
 3776     dictReleaseIterator(di);
 3777     return slaves != 0;
 3778 }
 3779 
 3780 /* Set the slot bit and return the old value. */
 3781 int clusterNodeSetSlotBit(clusterNode *n, int slot) {
 3782     int old = bitmapTestBit(n->slots,slot);
 3783     bitmapSetBit(n->slots,slot);
 3784     if (!old) {
 3785         n->numslots++;
 3786         /* When a master gets its first slot, even if it has no slaves,
 3787          * it gets flagged with MIGRATE_TO, that is, the master is a valid
 3788          * target for replicas migration, if and only if at least one of
 3789          * the other masters has slaves right now.
 3790          *
 3791          * Normally masters are valid targets of replica migration if:
 3792          * 1. They used to have slaves (but no longer do).
 3793          * 2. They are slaves that failed over a master that used to have slaves.
 3794          *
 3795          * However new masters with slots assigned are considered valid
 3796          * migration targets if the rest of the cluster is not slave-less.
 3797          *
 3798          * See https://github.com/antirez/redis/issues/3043 for more info. */
 3799         if (n->numslots == 1 && clusterMastersHaveSlaves())
 3800             n->flags |= CLUSTER_NODE_MIGRATE_TO;
 3801     }
 3802     return old;
 3803 }
 3804 
 3805 /* Clear the slot bit and return the old value. */
 3806 int clusterNodeClearSlotBit(clusterNode *n, int slot) {
 3807     int old = bitmapTestBit(n->slots,slot);
 3808     bitmapClearBit(n->slots,slot);
 3809     if (old) n->numslots--;
 3810     return old;
 3811 }
 3812 
 3813 /* Return the slot bit from the cluster node structure. */
 3814 int clusterNodeGetSlotBit(clusterNode *n, int slot) {
 3815     return bitmapTestBit(n->slots,slot);
 3816 }
 3817 
 3818 /* Add the specified slot to the list of slots that node 'n' will
 3819  * serve. Return C_OK if the operation ended with success.
 3820  * If the slot is already assigned to another instance this is considered
 3821  * an error and C_ERR is returned. */
 3822 int clusterAddSlot(clusterNode *n, int slot) {
 3823     if (server.cluster->slots[slot]) return C_ERR;
 3824     clusterNodeSetSlotBit(n,slot);
 3825     server.cluster->slots[slot] = n;
 3826     return C_OK;
 3827 }
 3828 
 3829 /* Delete the specified slot marking it as unassigned.
 3830  * Returns C_OK if the slot was assigned, otherwise if the slot was
 3831  * already unassigned C_ERR is returned. */
 3832 int clusterDelSlot(int slot) {
 3833     clusterNode *n = server.cluster->slots[slot];
 3834 
 3835     if (!n) return C_ERR;
 3836     serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
 3837     server.cluster->slots[slot] = NULL;
 3838     return C_OK;
 3839 }
 3840 
 3841 /* Delete all the slots associated with the specified node.
 3842  * The number of deleted slots is returned. */
 3843 int clusterDelNodeSlots(clusterNode *node) {
 3844     int deleted = 0, j;
 3845 
 3846     for (j = 0; j < CLUSTER_SLOTS; j++) {
 3847         if (clusterNodeGetSlotBit(node,j)) {
 3848             clusterDelSlot(j);
 3849             deleted++;
 3850         }
 3851     }
 3852     return deleted;
 3853 }
 3854 
 3855 /* Clear the migrating / importing state for all the slots.
 3856  * This is useful at initialization and when turning a master into a slave. */
 3857 void clusterCloseAllSlots(void) {
 3858     memset(server.cluster->migrating_slots_to,0,
 3859         sizeof(server.cluster->migrating_slots_to));
 3860     memset(server.cluster->importing_slots_from,0,
 3861         sizeof(server.cluster->importing_slots_from));
 3862 }
 3863 
 3864 /* -----------------------------------------------------------------------------
 3865  * Cluster state evaluation function
 3866  * -------------------------------------------------------------------------- */
 3867 
 3868 /* The following defines are only used in the evaluation function
 3869  * and are based on heuristics. The main point about the rejoin and
 3870  * writable delay is that they should be a few orders of magnitude larger
 3871  * than the network latency. */
 3872 #define CLUSTER_MAX_REJOIN_DELAY 5000
 3873 #define CLUSTER_MIN_REJOIN_DELAY 500
 3874 #define CLUSTER_WRITABLE_DELAY 2000
 3875 
 3876 void clusterUpdateState(void) {
 3877     int j, new_state;
 3878     int reachable_masters = 0;
 3879     static mstime_t among_minority_time;
 3880     static mstime_t first_call_time = 0;
 3881 
 3882     server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
 3883 
 3884     /* If this is a master node, wait some time before turning the state
 3885      * into OK, since it is not a good idea to rejoin the cluster as a writable
 3886      * master, after a reboot, without giving the cluster a chance to
 3887      * reconfigure this node. Note that the delay is calculated starting from
 3888      * the first call to this function and not from the server start, so
 3889      * that the DB loading time is not counted. */
 3890     if (first_call_time == 0) first_call_time = mstime();
 3891     if (nodeIsMaster(myself) &&
 3892         server.cluster->state == CLUSTER_FAIL &&
 3893         mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
 3894 
 3895     /* Start assuming the state is OK. We'll turn it into FAIL if there
 3896      * are the right conditions. */
 3897     new_state = CLUSTER_OK;
 3898 
 3899     /* Check if all the slots are covered. */
 3900     if (server.cluster_require_full_coverage) {
 3901         for (j = 0; j < CLUSTER_SLOTS; j++) {
 3902             if (server.cluster->slots[j] == NULL ||
 3903                 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
 3904             {
 3905                 new_state = CLUSTER_FAIL;
 3906                 break;
 3907             }
 3908         }
 3909     }
 3910 
 3911     /* Compute the cluster size, that is, the number of master nodes
 3912      * serving at least one slot.
 3913      *
 3914      * At the same time count the number of reachable masters having
 3915      * at least one slot. */
 3916     {
 3917         dictIterator *di;
 3918         dictEntry *de;
 3919 
 3920         server.cluster->size = 0;
 3921         di = dictGetSafeIterator(server.cluster->nodes);
 3922         while((de = dictNext(di)) != NULL) {
 3923             clusterNode *node = dictGetVal(de);
 3924 
 3925             if (nodeIsMaster(node) && node->numslots) {
 3926                 server.cluster->size++;
 3927                 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
 3928                     reachable_masters++;
 3929             }
 3930         }
 3931         dictReleaseIterator(di);
 3932     }
 3933 
 3934     /* If we are in a minority partition, change the cluster state
 3935      * to FAIL. */
 3936     {
 3937         int needed_quorum = (server.cluster->size / 2) + 1;
 3938 
 3939         if (reachable_masters < needed_quorum) {
 3940             new_state = CLUSTER_FAIL;
 3941             among_minority_time = mstime();
 3942         }
 3943     }
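    /* Illustrative note, not part of the upstream file: needed_quorum is a
     * strict majority of the masters serving slots. For example, with
     * size == 5 the quorum is 5/2 + 1 == 3, so a partition that can only
     * reach 2 non-failing masters turns its local state to FAIL. */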
 3944 
 3945     /* Log a state change */
 3946     if (new_state != server.cluster->state) {
 3947         mstime_t rejoin_delay = server.cluster_node_timeout;
 3948 
 3949         /* If the instance is a master and was partitioned away with the
 3950          * minority, don't let it accept queries for some time after the
 3951          * partition heals, to make sure there is enough time to receive
 3952          * a configuration update. */
 3953         if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
 3954             rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
 3955         if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
 3956             rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
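        /* Illustrative note, not part of the upstream file: the two checks
         * above clamp the delay into [CLUSTER_MIN_REJOIN_DELAY,
         * CLUSTER_MAX_REJOIN_DELAY]. For example, with the default
         * cluster-node-timeout of 15000 ms the rejoin delay is capped at
         * 5000 ms. */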
 3957 
 3958         if (new_state == CLUSTER_OK &&
 3959             nodeIsMaster(myself) &&
 3960             mstime() - among_minority_time < rejoin_delay)
 3961         {
 3962             return;
 3963         }
 3964 
 3965         /* Change the state and log the event. */
 3966         serverLog(LL_WARNING,"Cluster state changed: %s",
 3967             new_state == CLUSTER_OK ? "ok" : "fail");
 3968         server.cluster->state = new_state;
 3969     }
 3970 }
 3971 
 3972 /* This function is called after the node startup in order to verify that data
 3973  * loaded from disk is in agreement with the cluster configuration:
 3974  *
 3975  * 1) If we find keys about hash slots we have no responsibility for, the
 3976  *    following happens:
 3977  *    A) If no other node is in charge according to the current cluster
 3978  *       configuration, we add these slots to our node.
 3979  *    B) If according to our config other nodes are already in charge of
 3980  *       these slots, we set the slots as IMPORTING from our point of view
 3981  *       in order to justify why we have those slots, and in order to make
 3982  *       redis-trib aware of the issue, so that it can try to fix it.
 3983  * 2) If we find data in a DB different than DB0 we return C_ERR to
 3984  *    signal the caller it should quit the server with an error message
 3985  *    or take other actions.
 3986  *
 3987  * The function returns C_OK even when it has to correct the error
 3988  * described in "1". However if data is found in a DB different
 3989  * from DB0, C_ERR is returned.
 3990  *
 3991  * The function also uses the logging facility in order to warn the user
 3992  * about desynchronizations between the data we have in memory and the
 3993  * cluster configuration. */
 3994 int verifyClusterConfigWithData(void) {
 3995     int j;
 3996     int update_config = 0;
 3997 
 3998     /* Return ASAP if a module disabled cluster redirections. In that case
 3999      * every master can store keys about every possible hash slot. */
 4000     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
 4001         return C_OK;
 4002 
 4003     /* If this node is a slave, don't perform the check at all as we
 4004      * completely depend on the replication stream. */
 4005     if (nodeIsSlave(myself)) return C_OK;
 4006 
 4007     /* Make sure we only have keys in DB0. */
 4008     for (j = 1; j < server.dbnum; j++) {
 4009         if (dictSize(server.db[j].dict)) return C_ERR;
 4010     }
 4011 
 4012     /* Check that all the slots we see populated in memory have a corresponding
 4013      * entry in the cluster table. Otherwise fix the table. */
 4014     for (j = 0; j < CLUSTER_SLOTS; j++) {
 4015         if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
 4016         /* Check if we are assigned to this slot or if we are importing it.
 4017          * In both cases move on to the next slot, as the configuration
 4018          * makes sense. */
 4019         if (server.cluster->slots[j] == myself ||
 4020             server.cluster->importing_slots_from[j] != NULL) continue;
 4021 
 4022         /* If we are here, data and cluster config don't agree: we have
 4023          * slot 'j' populated even though we are neither importing it nor
 4024          * assigned to it. Fix this condition. */
 4025 
 4026         update_config++;
 4027         /* Case A: slot is unassigned. Take responsibility for it. */
 4028         if (server.cluster->slots[j] == NULL) {
 4029             serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
 4030                                     "Taking responsibility for it.",j);
 4031             clusterAddSlot(myself,j);
 4032         } else {
 4033             serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
 4034                                     "assigned to another node. "
 4035                                     "Setting it to importing state.",j);
 4036             server.cluster->importing_slots_from[j] = server.cluster->slots[j];
 4037         }
 4038     }
 4039     if (update_config) clusterSaveConfigOrDie(1);
 4040     return C_OK;
 4041 }
 4042 
 4043 /* -----------------------------------------------------------------------------
 4044  * SLAVE nodes handling
 4045  * -------------------------------------------------------------------------- */
 4046 
 4047 /* Set the specified node 'n' as master for this node.
 4048  * If this node is currently a master, it is turned into a slave. */
 4049 void clusterSetMaster(clusterNode *n) {
 4050     serverAssert(n != myself);
 4051     serverAssert(myself->numslots == 0);
 4052 
 4053     if (nodeIsMaster(myself)) {
 4054         myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
 4055         myself->flags |= CLUSTER_NODE_SLAVE;
 4056         clusterCloseAllSlots();
 4057     } else {
 4058         if (myself->slaveof)
 4059             clusterNodeRemoveSlave(myself->slaveof,myself);
 4060     }
 4061     myself->slaveof = n;
 4062     clusterNodeAddSlave(n,myself);
 4063     replicationSetMaster(n->ip, n->port);
 4064     resetManualFailover();
 4065 }
 4066 
 4067 /* -----------------------------------------------------------------------------
 4068  * Nodes to string representation functions.
 4069  * -------------------------------------------------------------------------- */
 4070 
 4071 struct redisNodeFlags {
 4072     uint16_t flag;
 4073     char *name;
 4074 };
 4075 
 4076 static struct redisNodeFlags redisNodeFlagsTable[] = {
 4077     {CLUSTER_NODE_MYSELF,       "myself,"},
 4078     {CLUSTER_NODE_MASTER,       "master,"},
 4079     {CLUSTER_NODE_SLAVE,        "slave,"},
 4080     {CLUSTER_NODE_PFAIL,        "fail?,"},
 4081     {CLUSTER_NODE_FAIL,         "fail,"},
 4082     {CLUSTER_NODE_HANDSHAKE,    "handshake,"},
 4083     {CLUSTER_NODE_NOADDR,       "noaddr,"},
 4084     {CLUSTER_NODE_NOFAILOVER,   "nofailover,"}
 4085 };
 4086 
 4087 /* Concatenate the comma-separated list of node flags to the given SDS
 4088  * string 'ci'. */
 4089 sds representClusterNodeFlags(sds ci, uint16_t flags) {
 4090     size_t orig_len = sdslen(ci);
 4091     int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
 4092     for (i = 0; i < size; i++) {
 4093         struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
 4094         if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
 4095     }
 4096     /* If no flag was added, add the "noflags" special flag. */
 4097     if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
 4098     sdsIncrLen(ci,-1); /* Remove trailing comma. */
 4099     return ci;
 4100 }
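/* Illustrative example, not part of the upstream file: flags are emitted in
 * table order and the trailing comma is stripped, so a possibly-failing
 * replica renders as "slave,fail?". The function name is hypothetical. */
static void nodeFlagsExample(void) {
    sds ci = representClusterNodeFlags(sdsempty(),
                                       CLUSTER_NODE_SLAVE|CLUSTER_NODE_PFAIL);
    serverAssert(strcmp(ci,"slave,fail?") == 0);
    sdsfree(ci);
}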
 4101 
 4102 /* Generate a CSV-like representation of the specified cluster node.
 4103  * See clusterGenNodesDescription() top comment for more information.
 4104  *
 4105  * The function returns the string representation as an SDS string. */
 4106 sds clusterGenNodeDescription(clusterNode *node) {
 4107     int j, start;
 4108     sds ci;
 4109 
 4110     /* Node coordinates */
 4111     ci = sdscatprintf(sdsempty(),"%.40s %s:%d@%d ",
 4112         node->name,
 4113         node->ip,
 4114         node->port,
 4115         node->cport);
 4116 
 4117     /* Flags */
 4118     ci = representClusterNodeFlags(ci, node->flags);
 4119 
 4120     /* Slave of... or just "-" */
 4121     if (node->slaveof)
 4122         ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
 4123     else
 4124         ci = sdscatlen(ci," - ",3);
 4125 
 4126     unsigned long long nodeEpoch = node->configEpoch;
 4127     if (nodeIsSlave(node) && node->slaveof) {
 4128         nodeEpoch = node->slaveof->configEpoch;
 4129     }
 4130     /* Latency from the POV of this node, config epoch, link status */
 4131     ci = sdscatprintf(ci,"%lld %lld %llu %s",
 4132         (long long) node->ping_sent,
 4133         (long long) node->pong_received,
 4134         nodeEpoch,
 4135         (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
 4136                     "connected" : "disconnected");
 4137 
 4138     /* Slots served by this instance */
 4139     start = -1;
 4140     for (j = 0; j < CLUSTER_SLOTS; j++) {
 4141         int bit;
 4142 
 4143         if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
 4144             if (start == -1) start = j;
 4145         }
 4146         if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
 4147             if (bit && j == CLUSTER_SLOTS-1) j++;
 4148 
 4149             if (start == j-1) {
 4150                 ci = sdscatprintf(ci," %d",start);
 4151             } else {
 4152                 ci = sdscatprintf(ci," %d-%d",start,j-1);
 4153             }
 4154             start = -1;
 4155         }
 4156     }
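    /* Illustrative note, not part of the upstream file: the loop above
     * compresses consecutive slots into ranges. For example, a node serving
     * slots 0, 1, 2 and 5 yields " 0-2 5". The j == CLUSTER_SLOTS-1 special
     * case makes sure a range ending at the very last slot (16383) is
     * closed and printed as well. */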
 4157 
 4158     /* For the MYSELF node only, we also dump info about slots that
 4159      * we are migrating to other instances or importing from other
 4160      * instances. */
 4161     if (node->flags & CLUSTER_NODE_MYSELF) {
 4162         for (j = 0; j < CLUSTER_SLOTS; j++) {
 4163             if (server.cluster->migrating_slots_to[j]) {
 4164                 ci = sdscatprintf(ci," [%d->-%.40s]",j,
 4165                     server.cluster->migrating_slots_to[j]->name);
 4166             } else if (server.cluster->importing_slots_from[j]) {
 4167                 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
 4168                     server.cluster->importing_slots_from[j]->name);
 4169             }
 4170         }
 4171     }
 4172     return ci;
 4173 }
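/* Illustrative example, not part of the upstream file, with a made-up node
 * ID: a typical line produced by the function above for a master serving
 * the first part of the key space (fields: id, ip:port@cport, flags,
 * master, ping-sent, pong-received, config-epoch, link-state, slots):
 *
 *   9fd8800b31d569538917c0aaeaa5588e2f9c6edf 127.0.0.1:7000@17000
 *       myself,master - 0 1599900000000 1 connected 0-5460
 */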
 4174 
 4175 /* Generate a CSV-like representation of the nodes we are aware of,
 4176  * including the "myself" node, and return an SDS string containing the
 4177  * representation (it is up to the caller to free it).
 4178  *
 4179  * All the nodes matching at least one of the node flags specified in
 4180  * "filter" are excluded from the output, so using zero as a filter will
 4181  * include all the known nodes in the representation, including nodes in
 4182  * the HANDSHAKE state.
 4183  *
 4184  * The representation obtained using this function is used for the output
 4185  * of the CLUSTER NODES function, and as format for the cluster
 4186  * configuration file (nodes.conf) for a given node. */
 4187 sds clusterGenNodesDescription(int filter) {
 4188     sds ci = sdsempty(), ni;
 4189     dictIterator *di;
 4190     dictEntry *de;
 4191 
 4192     di = dictGetSafeIterator(server.cluster->nodes);
 4193     while((de = dictNext(di)) != NULL) {
 4194         clusterNode *node = dictGetVal(de);
 4195 
 4196         if (node->flags & filter) continue;
 4197         ni = clusterGenNodeDescription(node);
 4198         ci = sdscatsds(ci,ni);
 4199         sdsfree(ni);
 4200         ci = sdscatlen(ci,"\n",1);
 4201     }
 4202     dictReleaseIterator(di);
 4203     return ci;
 4204 }
 4205 
 4206 /* -----------------------------------------------------------------------------
 4207  * CLUSTER command
 4208  * -------------------------------------------------------------------------- */
 4209 
 4210 const char *clusterGetMessageTypeString(int type) {
 4211     switch(type) {
 4212     case CLUSTERMSG_TYPE_PING: return "ping";
 4213     case CLUSTERMSG_TYPE_PONG: return "pong";
 4214     case CLUSTERMSG_TYPE_MEET: return "meet";
 4215     case CLUSTERMSG_TYPE_FAIL: return "fail";
 4216     case CLUSTERMSG_TYPE_PUBLISH: return "publish";
 4217     case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
 4218     case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
 4219     case CLUSTERMSG_TYPE_UPDATE: return "update";
 4220     case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
 4221     case CLUSTERMSG_TYPE_MODULE: return "module";
 4222     }
 4223     return "unknown";
 4224 }
 4225 
 4226 int getSlotOrReply(client *c, robj *o) {
 4227     long long slot;
 4228 
 4229     if (getLongLongFromObject(o,&slot) != C_OK ||
 4230         slot < 0 || slot >= CLUSTER_SLOTS)
 4231     {
 4232         addReplyError(c,"Invalid or out of range slot");
 4233         return -1;
 4234     }
 4235     return (int) slot;
 4236 }
 4237 
 4238 void clusterReplyMultiBulkSlots(client *c) {
 4239     /* Format: 1) 1) start slot
 4240      *            2) end slot
 4241      *            3) 1) master IP
 4242      *               2) master port
 4243      *               3) node ID
 4244      *            4) 1) replica IP
 4245      *               2) replica port
 4246      *               3) node ID
 4247      *           ... continued until done
 4248      */
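    /* Illustrative example, not part of the upstream file, with made-up
     * node IDs: for a three-master cluster with one replica per master,
     * redis-cli renders the first of the three top-level entries roughly
     * as:
     *
     *   1) 1) (integer) 0
     *      2) (integer) 5460
     *      3) 1) "127.0.0.1"
     *         2) (integer) 7000
     *         3) "9fd8800b31d569538917c0aaeaa5588e2f9c6edf"
     *      4) 1) "127.0.0.1"
     *         2) (integer) 7003
     *         3) "f7e95c57eca267428f1c7474c07f4f77b0ad4d2f"
     */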
 4249 
 4250     int num_masters = 0;
 4251     void *slot_replylen = addReplyDeferredLen(c);
 4252 
 4253     dictEntry *de;
 4254     dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
 4255     while((de = dictNext(di)) != NULL) {
 4256         clusterNode *node = dictGetVal(de);
 4257         int j = 0, start = -1;
 4258         int i, nested_elements = 0;
 4259 
 4260         /* Skip slaves (that are iterated when producing the output of their
 4261          * master) and masters not serving any slot. */
 4262         if (!nodeIsMaster(node) || node->numslots == 0) continue;
 4263 
 4264         for (i = 0; i < node->numslaves; i++) {
 4265             if (nodeFailed(node->slaves[i])) continue;
 4266             nested_elements++;
 4267         }
 4268 
 4269         for (j = 0; j < CLUSTER_SLOTS; j++) {
 4270             int bit, i;
 4271 
 4272             if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
 4273                 if (start == -1) start = j;
 4274             }
 4275             if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
 4276                 addReplyArrayLen(c, nested_elements + 3); /* slots (2) + master addr (1). */
 4277 
 4278                 if (bit && j == CLUSTER_SLOTS-1) j++;
 4279 
 4280                 /* If the slot exists in the output map, add to its list;
 4281                  * else, create a new output map for this slot. */
 4282                 if (start == j-1) {
 4283                     addReplyLongLong(c, start); /* only one slot; low==high */
 4284                     addReplyLongLong(c, start);
 4285                 } else {
 4286                     addReplyLongLong(c, start); /* low */
 4287                     addReplyLongLong(c, j-1);   /* high */
 4288                 }
 4289                 start = -1;
 4290 
 4291                 /* First node reply position is always the master */
 4292                 addReplyArrayLen(c, 3);
 4293                 addReplyBulkCString(c, node->ip);
 4294                 addReplyLongLong(c, node->port);
 4295                 addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
 4296 
 4297                 /* Remaining nodes in reply are replicas for slot range */
 4298                 for (i = 0; i < node->numslaves; i++) {
 4299                     /* This loop is copy/pasted from clusterGenNodeDescription()
 4300                      * with modifications for per-slot node aggregation */
 4301                     if (nodeFailed(node->slaves[i])) continue;
 4302                     addReplyArrayLen(c, 3);
 4303                     addReplyBulkCString(c, node->slaves[i]->ip);
 4304                     addReplyLongLong(c, node->slaves[i]->port);
 4305                     addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
 4306                 }
 4307                 num_masters++;
 4308             }
 4309         }
 4310     }
 4311     dictReleaseIterator(di);
 4312     setDeferredArrayLen(c, slot_replylen, num_masters);
 4313 }
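/* Illustrative note, not part of the upstream file: despite its name,
 * num_masters above is incremented once per emitted slot range, so a master
 * serving two disjoint ranges contributes two top-level entries (and two
 * increments), which is what setDeferredArrayLen() needs anyway. */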
 4314 
 4315 void clusterCommand(client *c) {
 4316     if (server.cluster_enabled == 0) {
 4317         addReplyError(c,"This instance has cluster support disabled");
 4318         return;
 4319     }
 4320 
 4321     if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
 4322         const char *help[] = {
 4323 "ADDSLOTS <slot> [slot ...] -- Assign slots to current node.",
 4324 "BUMPEPOCH -- Advance the cluster config epoch.",
 4325 "COUNT-failure-reports <node-id> -- Return number of failure reports for <node-id>.",
 4326 "COUNTKEYSINSLOT <slot> - Return the number of keys in <slot>.",
 4327 "DELSLOTS <slot> [slot ...] -- Delete slots information from current node.",
 4328 "FAILOVER [force|takeover] -- Promote current replica node to being a master.",
 4329 "FORGET <node-id> -- Remove a node from the cluster.",
 4330 "GETKEYSINSLOT <slot> <count> -- Return key names stored by current node in a slot.",
 4331 "FLUSHSLOTS -- Delete current node own slots information.",
 4332 "INFO - Return information about the cluster.",
 4333 "KEYSLOT <key> -- Return the hash slot for <key>.",
 4334 "MEET <ip> <port> [bus-port] -- Connect nodes into a working cluster.",
 4335 "MYID -- Return the node id.",
 4336 "NODES -- Return cluster configuration seen by node. Output format:",
 4337 "    <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ... <slot>",
 4338 "REPLICATE <node-id> -- Configure current node as replica to <node-id>.",
 4339 "RESET [hard|soft] -- Reset current node (default: soft).",
 4340 "SET-config-epoch <epoch> - Set config epoch of current node.",
 4341 "SETSLOT <slot> (importing|migrating|stable|node <node-id>) -- Set slot state.",
 4342 "REPLICAS <node-id> -- Return <node-id> replicas.",
 4343 "SAVECONFIG - Force saving cluster configuration on disk.",
 4344 "SLOTS -- Return information about slots range mappings. Each range is made of:",
 4345 "    start, end, master and replicas IP addresses, ports and ids",
 4346 NULL
 4347         };
 4348         addReplyHelp(c, help);
 4349     } else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
 4350         /* CLUSTER MEET <ip> <port> [cport] */
 4351         long long port, cport;
 4352 
 4353         if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
 4354             addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
 4355                                 (char*)c->argv[3]->ptr);
 4356             return;
 4357         }
 4358 
 4359         if (c->argc == 5) {
 4360             if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
 4361                 addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
 4362                                     (char*)c->argv[4]->ptr);
 4363                 return;
 4364             }
 4365         } else {
 4366             cport = port + CLUSTER_PORT_INCR;
 4367         }
 4368 
 4369         if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
 4370             errno == EINVAL)
 4371         {
 4372             addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
 4373                             (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
 4374         } else {
 4375             addReply(c,shared.ok);
 4376         }
 4377     } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
 4378         /* CLUSTER NODES */
 4379         sds nodes = clusterGenNodesDescription(0);
 4380         addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
 4381         sdsfree(nodes);
 4382     } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
 4383         /* CLUSTER MYID */
 4384         addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
 4385     } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
 4386         /* CLUSTER SLOTS */
 4387         clusterReplyMultiBulkSlots(c);
 4388     } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
 4389         /* CLUSTER FLUSHSLOTS */
 4390         if (dictSize(server.db[0].dict) != 0) {
 4391             addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
 4392             return;
 4393         }
 4394         clusterDelNodeSlots(myself);
 4395         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
 4396         addReply(c,shared.ok);
 4397     } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
 4398                !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
 4399     {
 4400         /* CLUSTER ADDSLOTS <slot> [slot] ... */
 4401         /* CLUSTER DELSLOTS <slot> [slot] ... */
 4402         int j, slot;
 4403         unsigned char *slots = zmalloc(CLUSTER_SLOTS);
 4404         int del = !strcasecmp(c->argv[1]->ptr,"delslots");
 4405 
 4406         memset(slots,0,CLUSTER_SLOTS);
 4407         /* Check that all the arguments are parseable and that all the
 4408          * slots are not already busy. */
 4409         for (j = 2; j < c->argc; j++) {
 4410             if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
 4411                 zfree(slots);
 4412                 return;
 4413             }
 4414             if (del && server.cluster->slots[slot] == NULL) {
 4415                 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
 4416                 zfree(slots);
 4417                 return;
 4418             } else if (!del && server.cluster->slots[slot]) {
 4419                 addReplyErrorFormat(c,"Slot %d is already busy", slot);
 4420                 zfree(slots);
 4421                 return;
 4422             }
 4423             if (slots[slot]++ == 1) {
 4424                 addReplyErrorFormat(c,"Slot %d specified multiple times",
 4425                     (int)slot);
 4426                 zfree(slots);
 4427                 return;
 4428             }
 4429         }
 4430         for (j = 0; j < CLUSTER_SLOTS; j++) {
 4431             if (slots[j]) {
 4432                 int retval;
 4433 
 4434                 /* If this slot was set as importing we can clear this
 4435                  * state as now we are the real owner of the slot. */
 4436                 if (server.cluster->importing_slots_from[j])
 4437                     server.cluster->importing_slots_from[j] = NULL;
 4438 
 4439                 retval = del ? clusterDelSlot(j) :
 4440                                clusterAddSlot(myself,j);
 4441                 serverAssertWithInfo(c,NULL,retval == C_OK);
 4442             }
 4443         }
 4444         zfree(slots);
 4445         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
 4446         addReply(c,shared.ok);
 4447     } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
 4448         /* SETSLOT 10 MIGRATING <node ID> */
 4449         /* SETSLOT 10 IMPORTING <node ID> */
 4450         /* SETSLOT 10 STABLE */
 4451         /* SETSLOT 10 NODE <node ID> */
 4452         int slot;
 4453         clusterNode *n;
 4454 
 4455         if (nodeIsSlave(myself)) {
 4456             addReplyError(c,"Please use SETSLOT only with masters.");
 4457             return;
 4458         }
 4459 
 4460         if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
 4461 
 4462         if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
 4463             if (server.cluster->slots[slot] != myself) {
 4464                 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
 4465                 return;
 4466             }
 4467             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
 4468                 addReplyErrorFormat(c,"I don't know about node %s",
 4469                     (char*)c->argv[4]->ptr);
 4470                 return;
 4471             }
 4472             server.cluster->migrating_slots_to[slot] = n;
 4473         } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
 4474             if (server.cluster->slots[slot] == myself) {
 4475                 addReplyErrorFormat(c,
 4476                     "I'm already the owner of hash slot %u",slot);
 4477                 return;
 4478             }
 4479             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
 4480                 addReplyErrorFormat(c,"I don't know about node %s",
 4481                     (char*)c->argv[4]->ptr);
 4482                 return;
 4483             }
 4484             server.cluster->importing_slots_from[slot] = n;
 4485         } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
 4486             /* CLUSTER SETSLOT <SLOT> STABLE */
 4487             server.cluster->importing_slots_from[slot] = NULL;
 4488             server.cluster->migrating_slots_to[slot] = NULL;
 4489         } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
 4490             /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
 4491             clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
 4492 
 4493             if (!n) {
 4494                 addReplyErrorFormat(c,"Unknown node %s",
 4495                     (char*)c->argv[4]->ptr);
 4496                 return;
 4497             }
 4498             /* If this hash slot was served by 'myself' before the switch,
 4499              * make sure there are no longer local keys for this hash slot. */
 4500             if (server.cluster->slots[slot] == myself && n != myself) {
 4501                 if (countKeysInSlot(slot) != 0) {
 4502                     addReplyErrorFormat(c,
 4503                         "Can't assign hashslot %d to a different node "
 4504                         "while I still hold keys for this hash slot.", slot);
 4505                     return;
 4506                 }
 4507             }
 4508             /* If this slot is in migrating status but we have no keys
 4509              * for it, assigning the slot to another node will clear
 4510              * the migrating status. */
 4511             if (countKeysInSlot(slot) == 0 &&
 4512                 server.cluster->migrating_slots_to[slot])
 4513                 server.cluster->migrating_slots_to[slot] = NULL;
 4514 
 4515             /* If this node was importing this slot, assigning the slot to
 4516              * itself also clears the importing status. */
 4517             if (n == myself &&
 4518                 server.cluster->importing_slots_from[slot])
 4519             {
 4520                 /* This slot was manually migrated, set this node configEpoch
 4521                  * to a new epoch so that the new version can be propagated
 4522                  * by the cluster.
 4523                  *
 4524                  * Note that if this ever results in a collision with another
 4525                  * node getting the same configEpoch, for example because a
 4526                  * failover happens at the same time we close the slot, the
 4527                  * configEpoch collision resolution will fix it assigning
 4528                  * a different epoch to each node. */
 4529                 if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
 4530                     serverLog(LL_WARNING,
 4531                         "configEpoch updated after importing slot %d", slot);
 4532                 }
 4533                 server.cluster->importing_slots_from[slot] = NULL;
 4534             }
 4535             clusterDelSlot(slot);
 4536             clusterAddSlot(n,slot);
 4537         } else {
 4538             addReplyError(c,
 4539                 "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
 4540             return;
 4541         }
 4542         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
 4543         addReply(c,shared.ok);
 4544     } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
 4545         /* CLUSTER BUMPEPOCH */
 4546         int retval = clusterBumpConfigEpochWithoutConsensus();
 4547         sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
 4548                 (retval == C_OK) ? "BUMPED" : "STILL",
 4549                 (unsigned long long) myself->configEpoch);
 4550         addReplySds(c,reply);
 4551     } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
 4552         /* CLUSTER INFO */
 4553         char *statestr[] = {"ok","fail","needhelp"};
 4554         int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
 4555         uint64_t myepoch;
 4556         int j;
 4557 
 4558         for (j = 0; j < CLUSTER_SLOTS; j++) {
 4559             clusterNode *n = server.cluster->slots[j];
 4560 
 4561             if (n == NULL) continue;
 4562             slots_assigned++;
 4563             if (nodeFailed(n)) {
 4564                 slots_fail++;
 4565             } else if (nodeTimedOut(n)) {
 4566                 slots_pfail++;
 4567             } else {
 4568                 slots_ok++;
 4569             }
 4570         }
 4571 
 4572         myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
 4573                   myself->slaveof->configEpoch : myself->configEpoch;
 4574 
 4575         sds info = sdscatprintf(sdsempty(),
 4576             "cluster_state:%s\r\n"
 4577             "cluster_slots_assigned:%d\r\n"
 4578             "cluster_slots_ok:%d\r\n"
 4579             "cluster_slots_pfail:%d\r\n"
 4580             "cluster_slots_fail:%d\r\n"
 4581             "cluster_known_nodes:%lu\r\n"
 4582             "cluster_size:%d\r\n"
 4583             "cluster_current_epoch:%llu\r\n"
 4584             "cluster_my_epoch:%llu\r\n"
 4585             , statestr[server.cluster->state],
 4586             slots_assigned,
 4587             slots_ok,
 4588             slots_pfail,
 4589             slots_fail,
 4590             dictSize(server.cluster->nodes),
 4591             server.cluster->size,
 4592             (unsigned long long) server.cluster->currentEpoch,
 4593             (unsigned long long) myepoch
 4594         );
 4595 
 4596         /* Show stats about messages sent and received. */
 4597         long long tot_msg_sent = 0;
 4598         long long tot_msg_received = 0;
 4599 
 4600         for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
 4601             if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
 4602             tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
 4603             info = sdscatprintf(info,
 4604                 "cluster_stats_messages_%s_sent:%lld\r\n",
 4605                 clusterGetMessageTypeString(i),
 4606                 server.cluster->stats_bus_messages_sent[i]);
 4607         }
 4608         info = sdscatprintf(info,
 4609             "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
 4610 
 4611         for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
 4612             if (server.cluster->stats_bus_messages_received[i] == 0) continue;
 4613             tot_msg_received += server.cluster->stats_bus_messages_received[i];
 4614             info = sdscatprintf(info,
 4615                 "cluster_stats_messages_%s_received:%lld\r\n",
 4616                 clusterGetMessageTypeString(i),
 4617                 server.cluster->stats_bus_messages_received[i]);
 4618         }
 4619         info = sdscatprintf(info,
 4620             "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
 4621 
 4622         /* Produce the reply protocol. */
 4623         addReplyVerbatim(c,info,sdslen(info),"txt");
 4624         sdsfree(info);
 4625     } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
 4626         int retval = clusterSaveConfig(1);
 4627 
 4628         if (retval == 0)
 4629             addReply(c,shared.ok);
 4630         else
 4631             addReplyErrorFormat(c,"error saving the cluster node config: %s",
 4632                 strerror(errno));
 4633     } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
 4634         /* CLUSTER KEYSLOT <key> */
 4635         sds key = c->argv[2]->ptr;
 4636 
 4637         addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
 4638     } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
 4639         /* CLUSTER COUNTKEYSINSLOT <slot> */
 4640         long long slot;
 4641 
 4642         if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
 4643             return;
 4644         if (slot < 0 || slot >= CLUSTER_SLOTS) {
 4645             addReplyError(c,"Invalid slot");
 4646             return;
 4647         }
 4648         addReplyLongLong(c,countKeysInSlot(slot));
 4649     } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
 4650         /* CLUSTER GETKEYSINSLOT <slot> <count> */
 4651         long long maxkeys, slot;
 4652         unsigned int numkeys, j;
 4653         robj **keys;
 4654 
 4655         if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
 4656             return;
 4657         if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
 4658             != C_OK)
 4659             return;
 4660         if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
 4661             addReplyError(c,"Invalid slot or number of keys");
 4662             return;
 4663         }
 4664 
 4665         /* Avoid allocating more than needed in case of large COUNT argument
 4666          * and smaller actual number of keys. */
 4667         unsigned int keys_in_slot = countKeysInSlot(slot);
 4668         if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;
 4669 
 4670         keys = zmalloc(sizeof(robj*)*maxkeys);
 4671         numkeys = getKeysInSlot(slot, keys, maxkeys);
 4672         addReplyArrayLen(c,numkeys);
 4673         for (j = 0; j < numkeys; j++) {
 4674             addReplyBulk(c,keys[j]);
 4675             decrRefCount(keys[j]);
 4676         }
 4677         zfree(keys);
 4678     } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
 4679         /* CLUSTER FORGET <NODE ID> */
 4680         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
 4681 
 4682         if (!n) {
 4683             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
 4684             return;
 4685         } else if (n == myself) {
 4686             addReplyError(c,"I tried hard but I can't forget myself...");
 4687             return;
 4688         } else if (nodeIsSlave(myself) && myself->slaveof == n) {
 4689             addReplyError(c,"Can't forget my master!");
 4690             return;
 4691         }
 4692         clusterBlacklistAddNode(n);
 4693         clusterDelNode(n);
 4694         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
 4695                              CLUSTER_TODO_SAVE_CONFIG);
 4696         addReply(c,shared.ok);
 4697     } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
 4698         /* CLUSTER REPLICATE <NODE ID> */
 4699         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
 4700 
 4701         /* Lookup the specified node in our table. */
 4702         if (!n) {
 4703             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
 4704             return;
 4705         }
 4706 
 4707         /* I can't replicate myself. */
 4708         if (n == myself) {
 4709             addReplyError(c,"Can't replicate myself");
 4710             return;
 4711         }
 4712 
 4713         /* Can't replicate a slave. */
 4714         if (nodeIsSlave(n)) {
 4715             addReplyError(c,"I can only replicate a master, not a replica.");
 4716             return;
 4717         }
 4718 
 4719         /* If the instance is currently a master, it must have no assigned
 4720          * slots and no keys in order to start replicating some other node.
 4721          * Slaves can switch to another master without issues. */
 4722         if (nodeIsMaster(myself) &&
 4723             (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
 4724             addReplyError(c,
 4725                 "To set a master the node must be empty and "
 4726                 "without assigned slots.");
 4727             return;
 4728         }
 4729 
 4730         /* Set the master. */
 4731         clusterSetMaster(n);
 4732         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
 4733         addReply(c,shared.ok);
 4734     } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
 4735                 !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
 4736         /* CLUSTER SLAVES <NODE ID> */
 4737         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
 4738         int j;
 4739 
 4740         /* Lookup the specified node in our table. */
 4741         if (!n) {
 4742             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
 4743             return;
 4744         }
 4745 
 4746         if (nodeIsSlave(n)) {
 4747             addReplyError(c,"The specified node is not a master");
 4748             return;
 4749         }
 4750 
 4751         addReplyArrayLen(c,n->numslaves);
 4752         for (j = 0; j < n->numslaves; j++) {
 4753             sds ni = clusterGenNodeDescription(n->slaves[j]);
 4754             addReplyBulkCString(c,ni);
 4755             sdsfree(ni);
 4756         }
 4757     } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
 4758                c->argc == 3)
 4759     {
 4760         /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
 4761         clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
 4762 
 4763         if (!n) {
 4764             addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
 4765             return;
 4766         } else {
 4767             addReplyLongLong(c,clusterNodeFailureReportsCount(n));
 4768         }
 4769     } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
 4770                (c->argc == 2 || c->argc == 3))
 4771     {
 4772         /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
 4773         int force = 0, takeover = 0;
 4774 
 4775         if (c->argc == 3) {
 4776             if (!strcasecmp(c->argv[2]->ptr,"force")) {
 4777                 force = 1;
 4778             } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
 4779                 takeover = 1;
 4780                 force = 1; /* Takeover also implies force. */
 4781             } else {
 4782                 addReply(c,shared.syntaxerr);
 4783                 return;
 4784             }
 4785         }
 4786 
 4787         /* Check preconditions. */
 4788         if (nodeIsMaster(myself)) {
 4789             addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
 4790             return;
 4791         } else if (myself->slaveof == NULL) {
 4792             addReplyError(c,"I'm a replica but my master is unknown to me");
 4793             return;
 4794         } else if (!force &&
 4795                    (nodeFailed(myself->slaveof) ||
 4796                     myself->slaveof->link == NULL))
 4797         {
 4798             addReplyError(c,"Master is down or failed, "
 4799                             "please use CLUSTER FAILOVER FORCE");
 4800             return;
 4801         }
 4802         resetManualFailover();
 4803         server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
 4804 
 4805         if (takeover) {
 4806             /* A takeover does not perform any initial check. It just
 4807              * generates a new configuration epoch for this node without
 4808              * consensus, claims the master's slots, and broadcasts the new
 4809              * configuration. */
 4810             serverLog(LL_WARNING,"Taking over the master (user request).");
 4811             clusterBumpConfigEpochWithoutConsensus();
 4812             clusterFailoverReplaceYourMaster();
 4813         } else if (force) {
 4814             /* If this is a forced failover, we don't need to talk with our
 4815              * master to agree about the offset. We just failover taking over
 4816              * it without coordination. */
 4817             serverLog(LL_WARNING,"Forced failover user request accepted.");
 4818             server.cluster->mf_can_start = 1;
 4819         } else {
 4820             serverLog(LL_WARNING,"Manual failover user request accepted.");
 4821             clusterSendMFStart(myself->slaveof);
 4822         }
 4823         addReply(c,shared.ok);
 4824     } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
 4825     {
 4826         /* CLUSTER SET-CONFIG-EPOCH <epoch>
 4827          *
 4828          * The user is allowed to set the config epoch only when a node is
 4829          * totally fresh: no config epoch, no other known node, and so forth.
 4830          * This happens at cluster creation time to start with a cluster where
 4831          * every node has a different node ID, without relying on the conflict
 4832          * resolution system, which is too slow when a big cluster is created. */
 4833         long long epoch;
 4834 
 4835         if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
 4836             return;
 4837 
 4838         if (epoch < 0) {
 4839             addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
 4840         } else if (dictSize(server.cluster->nodes) > 1) {
 4841             addReplyError(c,"The user can assign a config epoch only when the "
 4842                             "node does not know any other node.");
 4843         } else if (myself->configEpoch != 0) {
 4844             addReplyError(c,"Node config epoch is already non-zero");
 4845         } else {
 4846             myself->configEpoch = epoch;
 4847             serverLog(LL_WARNING,
 4848                 "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
 4849                 (unsigned long long) myself->configEpoch);
 4850 
 4851             if (server.cluster->currentEpoch < (uint64_t)epoch)
 4852                 server.cluster->currentEpoch = epoch;
 4853             /* No need to fsync the config here since in the unlucky event
 4854              * of a failure to persist the config, the conflict resolution code
 4855              * will assign a unique config epoch to this node. */
 4856             clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
 4857                                  CLUSTER_TODO_SAVE_CONFIG);
 4858             addReply(c,shared.ok);
 4859         }
 4860     } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
 4861                (c->argc == 2 || c->argc == 3))
 4862     {
 4863         /* CLUSTER RESET [SOFT|HARD] */
 4864         int hard = 0;
 4865 
 4866         /* Parse soft/hard argument. Default is soft. */
 4867         if (c->argc == 3) {
 4868             if (!strcasecmp(c->argv[2]->ptr,"hard")) {
 4869                 hard = 1;
 4870             } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
 4871                 hard = 0;
 4872             } else {
 4873                 addReply(c,shared.syntaxerr);
 4874                 return;
 4875             }
 4876         }
 4877 
 4878         /* Slaves can be reset while containing data, but not master nodes
 4879          * that must be empty. */
 4880         if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
 4881             addReplyError(c,"CLUSTER RESET can't be called with "
 4882                             "master nodes containing keys");
 4883             return;
 4884         }
 4885         clusterReset(hard);
 4886         addReply(c,shared.ok);
 4887     } else {
 4888         addReplySubcommandSyntaxError(c);
 4889         return;
 4890     }
 4891 }
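      /* Usage sketch for the RESET subcommand handled above (the ports are
       * hypothetical): a master still holding keys must be emptied first,
       * and a HARD reset additionally regenerates the node ID and zeroes
       * the epochs, while SOFT keeps both:
       *
       *   redis-cli -p 7000 FLUSHALL
       *   redis-cli -p 7000 CLUSTER RESET HARD
       */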
 4892 
 4893 /* -----------------------------------------------------------------------------
 4894  * DUMP, RESTORE and MIGRATE commands
 4895  * -------------------------------------------------------------------------- */
 4896 
 4897 /* Generates a DUMP-format representation of the object 'o', adding it to the
 4898  * io stream pointed to by 'payload'. This function can't fail. */
 4899 void createDumpPayload(rio *payload, robj *o, robj *key) {
 4900     unsigned char buf[2];
 4901     uint64_t crc;
 4902 
 4903     /* Serialize the object in an RDB-like format. It consists of an object
 4904      * type byte followed by the serialized object. This is understood by RESTORE. */
 4905     rioInitWithBuffer(payload,sdsempty());
 4906     serverAssert(rdbSaveObjectType(payload,o));
 4907     serverAssert(rdbSaveObject(payload,o,key));
 4908 
 4909     /* Write the footer; this is how it looks:
 4910      * ----------------+---------------------+---------------+
 4911      * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
 4912      * ----------------+---------------------+---------------+
 4913      * RDB version and CRC are both in little endian.
 4914      */
 4915 
 4916     /* RDB version */
 4917     buf[0] = RDB_VERSION & 0xff;
 4918     buf[1] = (RDB_VERSION >> 8) & 0xff;
 4919     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
 4920 
 4921     /* CRC64 */
 4922     crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
 4923                 sdslen(payload->io.buffer.ptr));
 4924     memrev64ifbe(&crc);
 4925     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
 4926 }
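      /* A minimal client-side sketch of how the footer written above can be
       * read back. The helper below is hypothetical (not part of Redis): it
       * extracts the 2-byte little endian RDB version that precedes the
       * trailing 8-byte CRC64, mirroring the layout produced by
       * createDumpPayload(). */
      #include <stddef.h>
      #include <stdint.h>

      static int dumpPayloadRdbVersion(const unsigned char *p, size_t len) {
          if (len < 10) return -1; /* Too short for the 10-byte footer. */
          const unsigned char *footer = p + (len - 10);
          /* The version bytes are little endian, as written above. */
          return (int)(((uint16_t)footer[1] << 8) | (uint16_t)footer[0]);
      }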
 4927 
 4928 /* Verify that the RDB version of the dump payload is not newer than the one
 4929  * supported by this Redis instance and that the checksum is ok.
 4930  * If the DUMP payload looks valid, C_OK is returned, otherwise C_ERR
 4931  * is returned. */
 4932 int verifyDumpPayload(unsigned char *p, size_t len) {
 4933     unsigned char *footer;
 4934     uint16_t rdbver;
 4935     uint64_t crc;
 4936 
 4937     /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
 4938     if (len < 10) return C_ERR;
 4939     footer = p+(len-10);
 4940 
 4941     /* Verify RDB version */
 4942     rdbver = (footer[1] << 8) | footer[0];
 4943     if (rdbver > RDB_VERSION) return C_ERR;
 4944 
 4945     /* Verify CRC64 */
 4946     crc = crc64(0,p,len-8);
 4947     memrev64ifbe(&crc);
 4948     return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
 4949 }
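      /* Worked example: assuming RDB_VERSION is 9 (the value used by this
       * release), a locally produced payload carries the version bytes
       * 0x09 0x00, so rdbver == 9 and the version check passes, while a
       * payload generated by a newer RDB format has rdbver > RDB_VERSION and
       * is rejected before any parsing is attempted. Note that the CRC
       * covers len-8 bytes, i.e. the serialized object plus the two version
       * bytes, but not the CRC itself. */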
 4950 
 4951 /* DUMP keyname
 4952  * DUMP is actually not used by Redis Cluster but it is the obvious
 4953  * complement of RESTORE and can be useful for other applications. */
 4954 void dumpCommand(client *c) {
 4955     robj *o;
 4956     rio payload;
 4957 
 4958     /* Check if the key is here. */
 4959     if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
 4960         addReplyNull(c);
 4961         return;
 4962     }
 4963 
 4964     /* Create the DUMP encoded representation. */
 4965     createDumpPayload(&payload,o,c->argv[1]);
 4966 
 4967     /* Transfer to the client */
 4968     addReplyBulkSds(c,payload.io.buffer.ptr);
 4969     return;
 4970 }
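      /* Example round trip from a client (a sketch; the trailing 8-byte CRC
       * depends on the exact payload bytes, so it is shown as <crc>):
       *
       *   > SET mykey "hello"
       *   > DUMP mykey                  -> "\x00\x05hello\x09\x00<crc>"
       *   > RESTORE mykey2 0 "\x00\x05hello\x09\x00<crc>"
       *   > GET mykey2                  -> "hello"
       */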
 4971 
 4972 /* RESTORE key ttl serialized-value [REPLACE] */
 4973 void restoreCommand(client *c) {
 4974     long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
 4975     rio payload;
 4976     int j, type, replace = 0, absttl = 0;
 4977     robj *obj;
 4978 
 4979     /* Parse additional options */
 4980     for (j = 4; j < c->argc; j++) {
 4981         int additional = c->argc-j-1;
 4982         if (!strcasecmp(c->argv[j]->ptr,"replace")) {
 4983             replace = 1;
 4984         } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
 4985             absttl = 1;
 4986         } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
 4987                    lfu_freq == -1)
 4988         {
 4989             if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
 4990                     != C_OK) return;
 4991             if (lru_idle < 0) {
 4992                 addReplyError(c,"Invalid IDLETIME value, must be >= 0");
 4993                 return;
 4994             }
 4995             lru_clock = LRU_CLOCK();
 4996             j++; /* Consume additional arg. */
 4997         } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
 4998                    lru_idle == -1)
 4999         {
 5000             if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
 5001                     != C_OK) return;
 5002             if (lfu_freq < 0 || lfu_freq > 255) {
 5003                 addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
 5004                 return;
 5005             }
 5006             j++; /* Consume additional arg. */
 5007         } else {
 5008             addReply(c,shared.syntaxerr);
 5009             return;
 5010         }
 5011     }
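          /* Examples of the accepted grammar (IDLETIME and FREQ are mutually
           * exclusive above, since a key carries either an LRU idle time or
           * an LFU counter, never both):
           *
           *   RESTORE key 0 <payload> REPLACE
           *   RESTORE key <abs-ms-unix-time> <payload> ABSTTL
           *   RESTORE key 0 <payload> IDLETIME 100
           *   RESTORE key 0 <payload> FREQ 10
           */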
 5012 
 5013     /* Make sure this key does not already exist here... */
 5014     robj *key = c->argv[1];
 5015     if (!replace && lookupKeyWrite(c->db,key) != NULL) {
 5016         addReply(c,shared.busykeyerr);
 5017         return;
 5018     }
 5019 
 5020     /* Check if the TTL value makes sense */
 5021     if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
 5022         return;
 5023     } else if (ttl < 0) {
 5024         addReplyError(c,"Invalid TTL value, must be >= 0");
 5025         return;
 5026     }
 5027 
 5028     /* Verify RDB version and data checksum. */
 5029     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
 5030     {
 5031         addReplyError(c,"DUMP payload version or checksum are wrong");
 5032         return;
 5033     }
 5034 
 5035     rioInitWithBuffer(&payload,c->argv[3]->ptr);
 5036     if (((type = rdbLoadObjectType(&payload)) == -1) ||
 5037         ((obj = rdbLoadObject(type,&payload,key->ptr)) == NULL))
 5038     {
 5039         addReplyError(c,"Bad data format");
 5040         return;
 5041     }
 5042 
 5043     /* Remove the old key if needed. */
 5044     int deleted = 0;
 5045     if (replace)
 5046         deleted = dbDelete(c->db,key);
 5047 
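          /* Unless ABSTTL was given, the TTL argument is relative: convert it
           * to an absolute Unix time in milliseconds. */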
 5048     if (ttl && !absttl) ttl+=mstime();
 5049     if (ttl && checkAlreadyExpired(ttl)) {
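              /* The key would be born already expired: instead of creating it
               * and waiting for the expire cycle to remove it, reply OK
               * without adding it, and if REPLACE deleted a previous value,
               * propagate that deletion as a DEL to replicas and the AOF. */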
 5050         if (deleted) {
 5051             rewriteClientCommandVector(c,2,shared.del,key);
 5052             signalModifiedKey(c,c->db,key);
 5053             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
 5054             server.dirty++;
 5055         }
 5056         decrRefCount(obj);
 5057         addReply(c, shared.ok);
 5058         return;
 5059     }
 5060 
 5061     /* Create the key and set the TTL if any */
 5062     dbAdd(c->db,key,obj);
 5063     if (ttl) {
 5064         setExpire(c,c->db,key,ttl);
 5065     }
 5066     objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
 5067     signalModifiedKey(c,c->db,key);
 5068     notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
 5069     addReply(c,shared.ok);
 5070     server.dirty++;