"Fossies" - the Fresh Open Source Software Archive

Member "redis-6.0.8/src/redis-cli.c" (10 Sep 2020, 309007 Bytes) of package /linux/misc/redis-6.0.8.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "redis-cli.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 6.0.7_vs_6.0.8.

    1 /* Redis CLI (command line interface)
    2  *
    3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
    4  * All rights reserved.
    5  *
    6  * Redistribution and use in source and binary forms, with or without
    7  * modification, are permitted provided that the following conditions are met:
    8  *
    9  *   * Redistributions of source code must retain the above copyright notice,
   10  *     this list of conditions and the following disclaimer.
   11  *   * Redistributions in binary form must reproduce the above copyright
   12  *     notice, this list of conditions and the following disclaimer in the
   13  *     documentation and/or other materials provided with the distribution.
   14  *   * Neither the name of Redis nor the names of its contributors may be used
   15  *     to endorse or promote products derived from this software without
   16  *     specific prior written permission.
   17  *
   18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
   19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
   22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   28  * POSSIBILITY OF SUCH DAMAGE.
   29  */
   30 
   31 #include "fmacros.h"
   32 #include "version.h"
   33 
   34 #include <stdio.h>
   35 #include <string.h>
   36 #include <stdlib.h>
   37 #include <signal.h>
   38 #include <unistd.h>
   39 #include <time.h>
   40 #include <ctype.h>
   41 #include <errno.h>
   42 #include <sys/stat.h>
   43 #include <sys/time.h>
   44 #include <assert.h>
   45 #include <fcntl.h>
   46 #include <limits.h>
   47 #include <math.h>
   48 
   49 #include <hiredis.h>
   50 #ifdef USE_OPENSSL
   51 #include <openssl/ssl.h>
   52 #include <openssl/err.h>
   53 #include <hiredis_ssl.h>
   54 #endif
   55 #include <sds.h> /* use sds.h from hiredis, so that only one set of sds functions will be present in the binary */
   56 #include "dict.h"
   57 #include "adlist.h"
   58 #include "zmalloc.h"
   59 #include "linenoise.h"
   60 #include "help.h"
   61 #include "anet.h"
   62 #include "ae.h"
   63 
   64 #define UNUSED(V) ((void) V)
   65 
   66 #define OUTPUT_STANDARD 0
   67 #define OUTPUT_RAW 1
   68 #define OUTPUT_CSV 2
   69 #define REDIS_CLI_KEEPALIVE_INTERVAL 15 /* seconds */
   70 #define REDIS_CLI_DEFAULT_PIPE_TIMEOUT 30 /* seconds */
   71 #define REDIS_CLI_HISTFILE_ENV "REDISCLI_HISTFILE"
   72 #define REDIS_CLI_HISTFILE_DEFAULT ".rediscli_history"
   73 #define REDIS_CLI_RCFILE_ENV "REDISCLI_RCFILE"
   74 #define REDIS_CLI_RCFILE_DEFAULT ".redisclirc"
   75 #define REDIS_CLI_AUTH_ENV "REDISCLI_AUTH"
   76 #define REDIS_CLI_CLUSTER_YES_ENV "REDISCLI_CLUSTER_YES"
   77 
   78 #define CLUSTER_MANAGER_SLOTS               16384
   79 #define CLUSTER_MANAGER_MIGRATE_TIMEOUT     60000
   80 #define CLUSTER_MANAGER_MIGRATE_PIPELINE    10
   81 #define CLUSTER_MANAGER_REBALANCE_THRESHOLD 2
   82 
   83 #define CLUSTER_MANAGER_INVALID_HOST_ARG \
   84     "[ERR] Invalid arguments: you need to pass either a valid " \
   85     "address (ie. 120.0.0.1:7000) or space separated IP " \
   86     "and port (ie. 120.0.0.1 7000)\n"
   87 #define CLUSTER_MANAGER_MODE() (config.cluster_manager_command.name != NULL)
   88 #define CLUSTER_MANAGER_MASTERS_COUNT(nodes, replicas) (nodes/(replicas + 1))
   89 #define CLUSTER_MANAGER_COMMAND(n,...) \
   90         (redisCommand(n->context, __VA_ARGS__))
   91 
   92 #define CLUSTER_MANAGER_NODE_ARRAY_FREE(array) zfree(array->alloc)
   93 
   94 #define CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err) \
   95     clusterManagerLogErr("Node %s:%d replied with error:\n%s\n", \
   96                          n->ip, n->port, err);
   97 
   98 #define clusterManagerLogInfo(...) \
   99     clusterManagerLog(CLUSTER_MANAGER_LOG_LVL_INFO,__VA_ARGS__)
  100 
  101 #define clusterManagerLogErr(...) \
  102     clusterManagerLog(CLUSTER_MANAGER_LOG_LVL_ERR,__VA_ARGS__)
  103 
  104 #define clusterManagerLogWarn(...) \
  105     clusterManagerLog(CLUSTER_MANAGER_LOG_LVL_WARN,__VA_ARGS__)
  106 
  107 #define clusterManagerLogOk(...) \
  108     clusterManagerLog(CLUSTER_MANAGER_LOG_LVL_SUCCESS,__VA_ARGS__)
  109 
  110 #define CLUSTER_MANAGER_FLAG_MYSELF     1 << 0
  111 #define CLUSTER_MANAGER_FLAG_SLAVE      1 << 1
  112 #define CLUSTER_MANAGER_FLAG_FRIEND     1 << 2
  113 #define CLUSTER_MANAGER_FLAG_NOADDR     1 << 3
  114 #define CLUSTER_MANAGER_FLAG_DISCONNECT 1 << 4
  115 #define CLUSTER_MANAGER_FLAG_FAIL       1 << 5
  116 
  117 #define CLUSTER_MANAGER_CMD_FLAG_FIX            1 << 0
  118 #define CLUSTER_MANAGER_CMD_FLAG_SLAVE          1 << 1
  119 #define CLUSTER_MANAGER_CMD_FLAG_YES            1 << 2
  120 #define CLUSTER_MANAGER_CMD_FLAG_AUTOWEIGHTS    1 << 3
  121 #define CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER    1 << 4
  122 #define CLUSTER_MANAGER_CMD_FLAG_SIMULATE       1 << 5
  123 #define CLUSTER_MANAGER_CMD_FLAG_REPLACE        1 << 6
  124 #define CLUSTER_MANAGER_CMD_FLAG_COPY           1 << 7
  125 #define CLUSTER_MANAGER_CMD_FLAG_COLOR          1 << 8
  126 #define CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS   1 << 9
  127 #define CLUSTER_MANAGER_CMD_FLAG_FIX_WITH_UNREACHABLE_MASTERS 1 << 10
  128 #define CLUSTER_MANAGER_CMD_FLAG_MASTERS_ONLY   1 << 11
  129 #define CLUSTER_MANAGER_CMD_FLAG_SLAVES_ONLY    1 << 12
  130 
  131 #define CLUSTER_MANAGER_OPT_GETFRIENDS  1 << 0
  132 #define CLUSTER_MANAGER_OPT_COLD        1 << 1
  133 #define CLUSTER_MANAGER_OPT_UPDATE      1 << 2
  134 #define CLUSTER_MANAGER_OPT_QUIET       1 << 6
  135 #define CLUSTER_MANAGER_OPT_VERBOSE     1 << 7
  136 
  137 #define CLUSTER_MANAGER_LOG_LVL_INFO    1
  138 #define CLUSTER_MANAGER_LOG_LVL_WARN    2
  139 #define CLUSTER_MANAGER_LOG_LVL_ERR     3
  140 #define CLUSTER_MANAGER_LOG_LVL_SUCCESS 4
  141 
  142 #define CLUSTER_JOIN_CHECK_AFTER        20
  143 
  144 #define LOG_COLOR_BOLD      "29;1m"
  145 #define LOG_COLOR_RED       "31;1m"
  146 #define LOG_COLOR_GREEN     "32;1m"
  147 #define LOG_COLOR_YELLOW    "33;1m"
  148 #define LOG_COLOR_RESET     "0m"
  149 
  150 /* cliConnect() flags. */
  151 #define CC_FORCE (1<<0)         /* Re-connect if already connected. */
  152 #define CC_QUIET (1<<1)         /* Don't log connecting errors. */
  153 
  154 /* --latency-dist palettes. */
  155 int spectrum_palette_color_size = 19;
  156 int spectrum_palette_color[] = {0,233,234,235,237,239,241,243,245,247,144,143,142,184,226,214,208,202,196};
  157 
  158 int spectrum_palette_mono_size = 13;
  159 int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253};
  160 
  161 /* The actual palette in use. */
  162 int *spectrum_palette;
  163 int spectrum_palette_size;
  164 
  165 /* Dict Helpers */
  166 
  167 static uint64_t dictSdsHash(const void *key);
  168 static int dictSdsKeyCompare(void *privdata, const void *key1,
  169     const void *key2);
  170 static void dictSdsDestructor(void *privdata, void *val);
  171 static void dictListDestructor(void *privdata, void *val);
  172 
  173 /* Cluster Manager Command Info */
  174 typedef struct clusterManagerCommand {
  175     char *name;
  176     int argc;
  177     char **argv;
  178     int flags;
  179     int replicas;
  180     char *from;
  181     char *to;
  182     char **weight;
  183     int weight_argc;
  184     char *master_id;
  185     int slots;
  186     int timeout;
  187     int pipeline;
  188     float threshold;
  189     char *backup_dir;
  190 } clusterManagerCommand;
  191 
  192 static void createClusterManagerCommand(char *cmdname, int argc, char **argv);
  193 
  194 
  195 static redisContext *context;
  196 static struct config {
  197     char *hostip;
  198     int hostport;
  199     char *hostsocket;
  200     int tls;
  201     char *sni;
  202     char *cacert;
  203     char *cacertdir;
  204     char *cert;
  205     char *key;
  206     long repeat;
  207     long interval;
  208     int dbnum;
  209     int interactive;
  210     int shutdown;
  211     int monitor_mode;
  212     int pubsub_mode;
  213     int latency_mode;
  214     int latency_dist_mode;
  215     int latency_history;
  216     int lru_test_mode;
  217     long long lru_test_sample_size;
  218     int cluster_mode;
  219     int cluster_reissue_command;
  220     int slave_mode;
  221     int pipe_mode;
  222     int pipe_timeout;
  223     int getrdb_mode;
  224     int stat_mode;
  225     int scan_mode;
  226     int intrinsic_latency_mode;
  227     int intrinsic_latency_duration;
  228     char *pattern;
  229     char *rdb_filename;
  230     int bigkeys;
  231     int memkeys;
  232     unsigned memkeys_samples;
  233     int hotkeys;
  234     int stdinarg; /* get last arg from stdin. (-x option) */
  235     char *auth;
  236     int askpass;
  237     char *user;
  238     int output; /* output mode, see OUTPUT_* defines */
  239     sds mb_delim;
  240     char prompt[128];
  241     char *eval;
  242     int eval_ldb;
  243     int eval_ldb_sync;  /* Ask for synchronous mode of the Lua debugger. */
  244     int eval_ldb_end;   /* Lua debugging session ended. */
  245     int enable_ldb_on_eval; /* Handle manual SCRIPT DEBUG + EVAL commands. */
  246     int last_cmd_type;
  247     int verbose;
  248     clusterManagerCommand cluster_manager_command;
  249     int no_auth_warning;
  250     int resp3;
  251 } config;
  252 
  253 /* User preferences. */
  254 static struct pref {
  255     int hints;
  256 } pref;
  257 
  258 static volatile sig_atomic_t force_cancel_loop = 0;
  259 static void usage(void);
  260 static void slaveMode(void);
  261 char *redisGitSHA1(void);
  262 char *redisGitDirty(void);
  263 static int cliConnect(int force);
  264 
  265 static char *getInfoField(char *info, char *field);
  266 static long getLongInfoField(char *info, char *field);
  267 
  268 /*------------------------------------------------------------------------------
  269  * Utility functions
  270  *--------------------------------------------------------------------------- */
  271 
  272 uint16_t crc16(const char *buf, int len);
  273 
  274 static long long ustime(void) {
  275     struct timeval tv;
  276     long long ust;
  277 
  278     gettimeofday(&tv, NULL);
  279     ust = ((long long)tv.tv_sec)*1000000;
  280     ust += tv.tv_usec;
  281     return ust;
  282 }
  283 
  284 static long long mstime(void) {
  285     return ustime()/1000;
  286 }
  287 
  288 static void cliRefreshPrompt(void) {
  289     if (config.eval_ldb) return;
  290 
  291     sds prompt = sdsempty();
  292     if (config.hostsocket != NULL) {
  293         prompt = sdscatfmt(prompt,"redis %s",config.hostsocket);
  294     } else {
  295         char addr[256];
  296         anetFormatAddr(addr, sizeof(addr), config.hostip, config.hostport);
  297         prompt = sdscatlen(prompt,addr,strlen(addr));
  298     }
  299 
  300     /* Add [dbnum] if needed */
  301     if (config.dbnum != 0)
  302         prompt = sdscatfmt(prompt,"[%i]",config.dbnum);
  303 
  304     /* Copy the prompt in the static buffer. */
  305     prompt = sdscatlen(prompt,"> ",2);
  306     snprintf(config.prompt,sizeof(config.prompt),"%s",prompt);
  307     sdsfree(prompt);
  308 }
  309 
  310 /* Return the name of the dotfile for the specified 'dotfilename'.
  311  * Normally it just concatenates user $HOME to the file specified
  312  * in 'dotfilename'. However if the environment varialbe 'envoverride'
  313  * is set, its value is taken as the path.
  314  *
  315  * The function returns NULL (if the file is /dev/null or cannot be
  316  * obtained for some error), or an SDS string that must be freed by
  317  * the user. */
  318 static sds getDotfilePath(char *envoverride, char *dotfilename) {
  319     char *path = NULL;
  320     sds dotPath = NULL;
  321 
  322     /* Check the env for a dotfile override. */
  323     path = getenv(envoverride);
  324     if (path != NULL && *path != '\0') {
  325         if (!strcmp("/dev/null", path)) {
  326             return NULL;
  327         }
  328 
  329         /* If the env is set, return it. */
  330         dotPath = sdsnew(path);
  331     } else {
  332         char *home = getenv("HOME");
  333         if (home != NULL && *home != '\0') {
  334             /* If no override is set use $HOME/<dotfilename>. */
  335             dotPath = sdscatprintf(sdsempty(), "%s/%s", home, dotfilename);
  336         }
  337     }
  338     return dotPath;
  339 }
  340 
  341 /* URL-style percent decoding. */
  342 #define isHexChar(c) (isdigit(c) || (c >= 'a' && c <= 'f'))
  343 #define decodeHexChar(c) (isdigit(c) ? c - '0' : c - 'a' + 10)
  344 #define decodeHex(h, l) ((decodeHexChar(h) << 4) + decodeHexChar(l))
  345 
  346 static sds percentDecode(const char *pe, size_t len) {
  347     const char *end = pe + len;
  348     sds ret = sdsempty();
  349     const char *curr = pe;
  350 
  351     while (curr < end) {
  352         if (*curr == '%') {
  353             if ((end - curr) < 2) {
  354                 fprintf(stderr, "Incomplete URI encoding\n");
  355                 exit(1);
  356             }
  357 
  358             char h = tolower(*(++curr));
  359             char l = tolower(*(++curr));
  360             if (!isHexChar(h) || !isHexChar(l)) {
  361                 fprintf(stderr, "Illegal character in URI encoding\n");
  362                 exit(1);
  363             }
  364             char c = decodeHex(h, l);
  365             ret = sdscatlen(ret, &c, 1);
  366             curr++;
  367         } else {
  368             ret = sdscatlen(ret, curr++, 1);
  369         }
  370     }
  371 
  372     return ret;
  373 }
  374 
  375 /* Parse a URI and extract the server connection information.
  376  * URI scheme is based on the the provisional specification[1] excluding support
  377  * for query parameters. Valid URIs are:
  378  *   scheme:    "redis://"
  379  *   authority: [[<username> ":"] <password> "@"] [<hostname> [":" <port>]]
  380  *   path:      ["/" [<db>]]
  381  *
  382  *  [1]: https://www.iana.org/assignments/uri-schemes/prov/redis */
  383 static void parseRedisUri(const char *uri) {
  384 
  385     const char *scheme = "redis://";
  386     const char *curr = uri;
  387     const char *end = uri + strlen(uri);
  388     const char *userinfo, *username, *port, *host, *path;
  389 
  390     /* URI must start with a valid scheme. */
  391     if (strncasecmp(scheme, curr, strlen(scheme))) {
  392         fprintf(stderr,"Invalid URI scheme\n");
  393         exit(1);
  394     }
  395     curr += strlen(scheme);
  396     if (curr == end) return;
  397 
  398     /* Extract user info. */
  399     if ((userinfo = strchr(curr,'@'))) {
  400         if ((username = strchr(curr, ':')) && username < userinfo) {
  401             /* If provided, username is ignored. */
  402             curr = username + 1;
  403         }
  404 
  405         config.auth = percentDecode(curr, userinfo - curr);
  406         curr = userinfo + 1;
  407     }
  408     if (curr == end) return;
  409 
  410     /* Extract host and port. */
  411     path = strchr(curr, '/');
  412     if (*curr != '/') {
  413         host = path ? path - 1 : end;
  414         if ((port = strchr(curr, ':'))) {
  415             config.hostport = atoi(port + 1);
  416             host = port - 1;
  417         }
  418         config.hostip = sdsnewlen(curr, host - curr + 1);
  419     }
  420     curr = path ? path + 1 : end;
  421     if (curr == end) return;
  422 
  423     /* Extract database number. */
  424     config.dbnum = atoi(curr);
  425 }
  426 
  427 static uint64_t dictSdsHash(const void *key) {
  428     return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
  429 }
  430 
  431 static int dictSdsKeyCompare(void *privdata, const void *key1,
  432         const void *key2)
  433 {
  434     int l1,l2;
  435     DICT_NOTUSED(privdata);
  436 
  437     l1 = sdslen((sds)key1);
  438     l2 = sdslen((sds)key2);
  439     if (l1 != l2) return 0;
  440     return memcmp(key1, key2, l1) == 0;
  441 }
  442 
  443 static void dictSdsDestructor(void *privdata, void *val)
  444 {
  445     DICT_NOTUSED(privdata);
  446     sdsfree(val);
  447 }
  448 
  449 void dictListDestructor(void *privdata, void *val)
  450 {
  451     DICT_NOTUSED(privdata);
  452     listRelease((list*)val);
  453 }
  454 
  455 /* _serverAssert is needed by dict */
  456 void _serverAssert(const char *estr, const char *file, int line) {
  457     fprintf(stderr, "=== ASSERTION FAILED ===");
  458     fprintf(stderr, "==> %s:%d '%s' is not true",file,line,estr);
  459     *((char*)-1) = 'x';
  460 }
  461 
  462 /*------------------------------------------------------------------------------
  463  * Help functions
  464  *--------------------------------------------------------------------------- */
  465 
  466 #define CLI_HELP_COMMAND 1
  467 #define CLI_HELP_GROUP 2
  468 
  469 typedef struct {
  470     int type;
  471     int argc;
  472     sds *argv;
  473     sds full;
  474 
  475     /* Only used for help on commands */
  476     struct commandHelp *org;
  477 } helpEntry;
  478 
  479 static helpEntry *helpEntries;
  480 static int helpEntriesLen;
  481 
  482 static sds cliVersion(void) {
  483     sds version;
  484     version = sdscatprintf(sdsempty(), "%s", REDIS_VERSION);
  485 
  486     /* Add git commit and working tree status when available */
  487     if (strtoll(redisGitSHA1(),NULL,16)) {
  488         version = sdscatprintf(version, " (git:%s", redisGitSHA1());
  489         if (strtoll(redisGitDirty(),NULL,10))
  490             version = sdscatprintf(version, "-dirty");
  491         version = sdscat(version, ")");
  492     }
  493     return version;
  494 }
  495 
  496 static void cliInitHelp(void) {
  497     int commandslen = sizeof(commandHelp)/sizeof(struct commandHelp);
  498     int groupslen = sizeof(commandGroups)/sizeof(char*);
  499     int i, len, pos = 0;
  500     helpEntry tmp;
  501 
  502     helpEntriesLen = len = commandslen+groupslen;
  503     helpEntries = zmalloc(sizeof(helpEntry)*len);
  504 
  505     for (i = 0; i < groupslen; i++) {
  506         tmp.argc = 1;
  507         tmp.argv = zmalloc(sizeof(sds));
  508         tmp.argv[0] = sdscatprintf(sdsempty(),"@%s",commandGroups[i]);
  509         tmp.full = tmp.argv[0];
  510         tmp.type = CLI_HELP_GROUP;
  511         tmp.org = NULL;
  512         helpEntries[pos++] = tmp;
  513     }
  514 
  515     for (i = 0; i < commandslen; i++) {
  516         tmp.argv = sdssplitargs(commandHelp[i].name,&tmp.argc);
  517         tmp.full = sdsnew(commandHelp[i].name);
  518         tmp.type = CLI_HELP_COMMAND;
  519         tmp.org = &commandHelp[i];
  520         helpEntries[pos++] = tmp;
  521     }
  522 }
  523 
  524 /* cliInitHelp() setups the helpEntries array with the command and group
  525  * names from the help.h file. However the Redis instance we are connecting
  526  * to may support more commands, so this function integrates the previous
  527  * entries with additional entries obtained using the COMMAND command
  528  * available in recent versions of Redis. */
  529 static void cliIntegrateHelp(void) {
  530     if (cliConnect(CC_QUIET) == REDIS_ERR) return;
  531 
  532     redisReply *reply = redisCommand(context, "COMMAND");
  533     if(reply == NULL || reply->type != REDIS_REPLY_ARRAY) return;
  534 
  535     /* Scan the array reported by COMMAND and fill only the entries that
  536      * don't already match what we have. */
  537     for (size_t j = 0; j < reply->elements; j++) {
  538         redisReply *entry = reply->element[j];
  539         if (entry->type != REDIS_REPLY_ARRAY || entry->elements < 4 ||
  540             entry->element[0]->type != REDIS_REPLY_STRING ||
  541             entry->element[1]->type != REDIS_REPLY_INTEGER ||
  542             entry->element[3]->type != REDIS_REPLY_INTEGER) return;
  543         char *cmdname = entry->element[0]->str;
  544         int i;
  545 
  546         for (i = 0; i < helpEntriesLen; i++) {
  547             helpEntry *he = helpEntries+i;
  548             if (!strcasecmp(he->argv[0],cmdname))
  549                 break;
  550         }
  551         if (i != helpEntriesLen) continue;
  552 
  553         helpEntriesLen++;
  554         helpEntries = zrealloc(helpEntries,sizeof(helpEntry)*helpEntriesLen);
  555         helpEntry *new = helpEntries+(helpEntriesLen-1);
  556 
  557         new->argc = 1;
  558         new->argv = zmalloc(sizeof(sds));
  559         new->argv[0] = sdsnew(cmdname);
  560         new->full = new->argv[0];
  561         new->type = CLI_HELP_COMMAND;
  562         sdstoupper(new->argv[0]);
  563 
  564         struct commandHelp *ch = zmalloc(sizeof(*ch));
  565         ch->name = new->argv[0];
  566         ch->params = sdsempty();
  567         int args = llabs(entry->element[1]->integer);
  568         args--; /* Remove the command name itself. */
  569         if (entry->element[3]->integer == 1) {
  570             ch->params = sdscat(ch->params,"key ");
  571             args--;
  572         }
  573         while(args-- > 0) ch->params = sdscat(ch->params,"arg ");
  574         if (entry->element[1]->integer < 0)
  575             ch->params = sdscat(ch->params,"...options...");
  576         ch->summary = "Help not available";
  577         ch->group = 0;
  578         ch->since = "not known";
  579         new->org = ch;
  580     }
  581     freeReplyObject(reply);
  582 }
  583 
  584 /* Output command help to stdout. */
  585 static void cliOutputCommandHelp(struct commandHelp *help, int group) {
  586     printf("\r\n  \x1b[1m%s\x1b[0m \x1b[90m%s\x1b[0m\r\n", help->name, help->params);
  587     printf("  \x1b[33msummary:\x1b[0m %s\r\n", help->summary);
  588     printf("  \x1b[33msince:\x1b[0m %s\r\n", help->since);
  589     if (group) {
  590         printf("  \x1b[33mgroup:\x1b[0m %s\r\n", commandGroups[help->group]);
  591     }
  592 }
  593 
  594 /* Print generic help. */
  595 static void cliOutputGenericHelp(void) {
  596     sds version = cliVersion();
  597     printf(
  598         "redis-cli %s\n"
  599         "To get help about Redis commands type:\n"
  600         "      \"help @<group>\" to get a list of commands in <group>\n"
  601         "      \"help <command>\" for help on <command>\n"
  602         "      \"help <tab>\" to get a list of possible help topics\n"
  603         "      \"quit\" to exit\n"
  604         "\n"
  605         "To set redis-cli preferences:\n"
  606         "      \":set hints\" enable online hints\n"
  607         "      \":set nohints\" disable online hints\n"
  608         "Set your preferences in ~/.redisclirc\n",
  609         version
  610     );
  611     sdsfree(version);
  612 }
  613 
  614 /* Output all command help, filtering by group or command name. */
  615 static void cliOutputHelp(int argc, char **argv) {
  616     int i, j, len;
  617     int group = -1;
  618     helpEntry *entry;
  619     struct commandHelp *help;
  620 
  621     if (argc == 0) {
  622         cliOutputGenericHelp();
  623         return;
  624     } else if (argc > 0 && argv[0][0] == '@') {
  625         len = sizeof(commandGroups)/sizeof(char*);
  626         for (i = 0; i < len; i++) {
  627             if (strcasecmp(argv[0]+1,commandGroups[i]) == 0) {
  628                 group = i;
  629                 break;
  630             }
  631         }
  632     }
  633 
  634     assert(argc > 0);
  635     for (i = 0; i < helpEntriesLen; i++) {
  636         entry = &helpEntries[i];
  637         if (entry->type != CLI_HELP_COMMAND) continue;
  638 
  639         help = entry->org;
  640         if (group == -1) {
  641             /* Compare all arguments */
  642             if (argc == entry->argc) {
  643                 for (j = 0; j < argc; j++) {
  644                     if (strcasecmp(argv[j],entry->argv[j]) != 0) break;
  645                 }
  646                 if (j == argc) {
  647                     cliOutputCommandHelp(help,1);
  648                 }
  649             }
  650         } else {
  651             if (group == help->group) {
  652                 cliOutputCommandHelp(help,0);
  653             }
  654         }
  655     }
  656     printf("\r\n");
  657 }
  658 
  659 /* Linenoise completion callback. */
  660 static void completionCallback(const char *buf, linenoiseCompletions *lc) {
  661     size_t startpos = 0;
  662     int mask;
  663     int i;
  664     size_t matchlen;
  665     sds tmp;
  666 
  667     if (strncasecmp(buf,"help ",5) == 0) {
  668         startpos = 5;
  669         while (isspace(buf[startpos])) startpos++;
  670         mask = CLI_HELP_COMMAND | CLI_HELP_GROUP;
  671     } else {
  672         mask = CLI_HELP_COMMAND;
  673     }
  674 
  675     for (i = 0; i < helpEntriesLen; i++) {
  676         if (!(helpEntries[i].type & mask)) continue;
  677 
  678         matchlen = strlen(buf+startpos);
  679         if (strncasecmp(buf+startpos,helpEntries[i].full,matchlen) == 0) {
  680             tmp = sdsnewlen(buf,startpos);
  681             tmp = sdscat(tmp,helpEntries[i].full);
  682             linenoiseAddCompletion(lc,tmp);
  683             sdsfree(tmp);
  684         }
  685     }
  686 }
  687 
  688 /* Linenoise hints callback. */
  689 static char *hintsCallback(const char *buf, int *color, int *bold) {
  690     if (!pref.hints) return NULL;
  691 
  692     int i, argc, buflen = strlen(buf);
  693     sds *argv = sdssplitargs(buf,&argc);
  694     int endspace = buflen && isspace(buf[buflen-1]);
  695 
  696     /* Check if the argument list is empty and return ASAP. */
  697     if (argc == 0) {
  698         sdsfreesplitres(argv,argc);
  699         return NULL;
  700     }
  701 
  702     for (i = 0; i < helpEntriesLen; i++) {
  703         if (!(helpEntries[i].type & CLI_HELP_COMMAND)) continue;
  704 
  705         if (strcasecmp(argv[0],helpEntries[i].full) == 0 ||
  706             strcasecmp(buf,helpEntries[i].full) == 0)
  707         {
  708             *color = 90;
  709             *bold = 0;
  710             sds hint = sdsnew(helpEntries[i].org->params);
  711 
  712             /* Remove arguments from the returned hint to show only the
  713              * ones the user did not yet typed. */
  714             int toremove = argc-1;
  715             while(toremove > 0 && sdslen(hint)) {
  716                 if (hint[0] == '[') break;
  717                 if (hint[0] == ' ') toremove--;
  718                 sdsrange(hint,1,-1);
  719             }
  720 
  721             /* Add an initial space if needed. */
  722             if (!endspace) {
  723                 sds newhint = sdsnewlen(" ",1);
  724                 newhint = sdscatsds(newhint,hint);
  725                 sdsfree(hint);
  726                 hint = newhint;
  727             }
  728 
  729             sdsfreesplitres(argv,argc);
  730             return hint;
  731         }
  732     }
  733     sdsfreesplitres(argv,argc);
  734     return NULL;
  735 }
  736 
  737 static void freeHintsCallback(void *ptr) {
  738     sdsfree(ptr);
  739 }
  740 
  741 /*------------------------------------------------------------------------------
  742  * Networking / parsing
  743  *--------------------------------------------------------------------------- */
  744 
  745 /* Send AUTH command to the server */
  746 static int cliAuth(void) {
  747     redisReply *reply;
  748     if (config.auth == NULL) return REDIS_OK;
  749 
  750     if (config.user == NULL)
  751         reply = redisCommand(context,"AUTH %s",config.auth);
  752     else
  753         reply = redisCommand(context,"AUTH %s %s",config.user,config.auth);
  754     if (reply != NULL) {
  755         if (reply->type == REDIS_REPLY_ERROR)
  756             fprintf(stderr,"Warning: AUTH failed\n");
  757         freeReplyObject(reply);
  758         return REDIS_OK;
  759     }
  760     return REDIS_ERR;
  761 }
  762 
  763 /* Send SELECT dbnum to the server */
  764 static int cliSelect(void) {
  765     redisReply *reply;
  766     if (config.dbnum == 0) return REDIS_OK;
  767 
  768     reply = redisCommand(context,"SELECT %d",config.dbnum);
  769     if (reply != NULL) {
  770         int result = REDIS_OK;
  771         if (reply->type == REDIS_REPLY_ERROR) result = REDIS_ERR;
  772         freeReplyObject(reply);
  773         return result;
  774     }
  775     return REDIS_ERR;
  776 }
  777 
  778 /* Wrapper around redisSecureConnection to avoid hiredis_ssl dependencies if
  779  * not building with TLS support.
  780  */
  781 static int cliSecureConnection(redisContext *c, const char **err) {
  782 #ifdef USE_OPENSSL
  783     static SSL_CTX *ssl_ctx = NULL;
  784 
  785     if (!ssl_ctx) {
  786         ssl_ctx = SSL_CTX_new(SSLv23_client_method());
  787         if (!ssl_ctx) {
  788             *err = "Failed to create SSL_CTX";
  789             goto error;
  790         }
  791 
  792         SSL_CTX_set_options(ssl_ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3);
  793         SSL_CTX_set_verify(ssl_ctx, SSL_VERIFY_PEER, NULL);
  794 
  795         if (config.cacert || config.cacertdir) {
  796             if (!SSL_CTX_load_verify_locations(ssl_ctx, config.cacert, config.cacertdir)) {
  797                 *err = "Invalid CA Certificate File/Directory";
  798                 goto error;
  799             }
  800         } else {
  801             if (!SSL_CTX_set_default_verify_paths(ssl_ctx)) {
  802                 *err = "Failed to use default CA paths";
  803                 goto error;
  804             }
  805         }
  806 
  807         if (config.cert && !SSL_CTX_use_certificate_chain_file(ssl_ctx, config.cert)) {
  808             *err = "Invalid client certificate";
  809             goto error;
  810         }
  811 
  812         if (config.key && !SSL_CTX_use_PrivateKey_file(ssl_ctx, config.key, SSL_FILETYPE_PEM)) {
  813             *err = "Invalid private key";
  814             goto error;
  815         }
  816     }
  817 
  818     SSL *ssl = SSL_new(ssl_ctx);
  819     if (!ssl) {
  820         *err = "Failed to create SSL object";
  821         return REDIS_ERR;
  822     }
  823 
  824     if (config.sni && !SSL_set_tlsext_host_name(ssl, config.sni)) {
  825         *err = "Failed to configure SNI";
  826         SSL_free(ssl);
  827         return REDIS_ERR;
  828     }
  829 
  830     return redisInitiateSSL(c, ssl);
  831 
  832 error:
  833     SSL_CTX_free(ssl_ctx);
  834     ssl_ctx = NULL;
  835     return REDIS_ERR;
  836 #else
  837     (void) c;
  838     (void) err;
  839     return REDIS_OK;
  840 #endif
  841 }
  842 
  843 /* Select RESP3 mode if redis-cli was started with the -3 option.  */
  844 static int cliSwitchProto(void) {
  845     redisReply *reply;
  846     if (config.resp3 == 0) return REDIS_OK;
  847 
  848     reply = redisCommand(context,"HELLO 3");
  849     if (reply != NULL) {
  850         int result = REDIS_OK;
  851         if (reply->type == REDIS_REPLY_ERROR) result = REDIS_ERR;
  852         freeReplyObject(reply);
  853         return result;
  854     }
  855     return REDIS_ERR;
  856 }
  857 
  858 /* Connect to the server. It is possible to pass certain flags to the function:
  859  *      CC_FORCE: The connection is performed even if there is already
  860  *                a connected socket.
  861  *      CC_QUIET: Don't print errors if connection fails. */
  862 static int cliConnect(int flags) {
  863     if (context == NULL || flags & CC_FORCE) {
  864         if (context != NULL) {
  865             redisFree(context);
  866         }
  867 
  868         if (config.hostsocket == NULL) {
  869             context = redisConnect(config.hostip,config.hostport);
  870         } else {
  871             context = redisConnectUnix(config.hostsocket);
  872         }
  873 
  874         if (!context->err && config.tls) {
  875             const char *err = NULL;
  876             if (cliSecureConnection(context, &err) == REDIS_ERR && err) {
  877                 fprintf(stderr, "Could not negotiate a TLS connection: %s\n", err);
  878                 context = NULL;
  879                 redisFree(context);
  880                 return REDIS_ERR;
  881             }
  882         }
  883 
  884         if (context->err) {
  885             if (!(flags & CC_QUIET)) {
  886                 fprintf(stderr,"Could not connect to Redis at ");
  887                 if (config.hostsocket == NULL)
  888                     fprintf(stderr,"%s:%d: %s\n",
  889                         config.hostip,config.hostport,context->errstr);
  890                 else
  891                     fprintf(stderr,"%s: %s\n",
  892                         config.hostsocket,context->errstr);
  893             }
  894             redisFree(context);
  895             context = NULL;
  896             return REDIS_ERR;
  897         }
  898 
  899 
  900         /* Set aggressive KEEP_ALIVE socket option in the Redis context socket
  901          * in order to prevent timeouts caused by the execution of long
  902          * commands. At the same time this improves the detection of real
  903          * errors. */
  904         anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
  905 
  906         /* Do AUTH, select the right DB, switch to RESP3 if needed. */
  907         if (cliAuth() != REDIS_OK)
  908             return REDIS_ERR;
  909         if (cliSelect() != REDIS_OK)
  910             return REDIS_ERR;
  911         if (cliSwitchProto() != REDIS_OK)
  912             return REDIS_ERR;
  913     }
  914     return REDIS_OK;
  915 }
  916 
  917 static void cliPrintContextError(void) {
  918     if (context == NULL) return;
  919     fprintf(stderr,"Error: %s\n",context->errstr);
  920 }
  921 
  922 static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
  923     sds out = sdsempty();
  924     switch (r->type) {
  925     case REDIS_REPLY_ERROR:
  926         out = sdscatprintf(out,"(error) %s\n", r->str);
  927     break;
  928     case REDIS_REPLY_STATUS:
  929         out = sdscat(out,r->str);
  930         out = sdscat(out,"\n");
  931     break;
  932     case REDIS_REPLY_INTEGER:
  933         out = sdscatprintf(out,"(integer) %lld\n",r->integer);
  934     break;
  935     case REDIS_REPLY_DOUBLE:
  936         out = sdscatprintf(out,"(double) %s\n",r->str);
  937     break;
  938     case REDIS_REPLY_STRING:
  939     case REDIS_REPLY_VERB:
  940         /* If you are producing output for the standard output we want
  941         * a more interesting output with quoted characters and so forth,
  942         * unless it's a verbatim string type. */
  943         if (r->type == REDIS_REPLY_STRING) {
  944             out = sdscatrepr(out,r->str,r->len);
  945             out = sdscat(out,"\n");
  946         } else {
  947             out = sdscatlen(out,r->str,r->len);
  948             out = sdscat(out,"\n");
  949         }
  950     break;
  951     case REDIS_REPLY_NIL:
  952         out = sdscat(out,"(nil)\n");
  953     break;
  954     case REDIS_REPLY_BOOL:
  955         out = sdscat(out,r->integer ? "(true)\n" : "(false)\n");
  956     break;
  957     case REDIS_REPLY_ARRAY:
  958     case REDIS_REPLY_MAP:
  959     case REDIS_REPLY_SET:
  960         if (r->elements == 0) {
  961             if (r->type == REDIS_REPLY_ARRAY)
  962                 out = sdscat(out,"(empty array)\n");
  963             else if (r->type == REDIS_REPLY_MAP)
  964                 out = sdscat(out,"(empty hash)\n");
  965             else if (r->type == REDIS_REPLY_SET)
  966                 out = sdscat(out,"(empty set)\n");
  967             else
  968                 out = sdscat(out,"(empty aggregate type)\n");
  969         } else {
  970             unsigned int i, idxlen = 0;
  971             char _prefixlen[16];
  972             char _prefixfmt[16];
  973             sds _prefix;
  974             sds tmp;
  975 
  976             /* Calculate chars needed to represent the largest index */
  977             i = r->elements;
  978             if (r->type == REDIS_REPLY_MAP) i /= 2;
  979             do {
  980                 idxlen++;
  981                 i /= 10;
  982             } while(i);
  983 
  984             /* Prefix for nested multi bulks should grow with idxlen+2 spaces */
  985             memset(_prefixlen,' ',idxlen+2);
  986             _prefixlen[idxlen+2] = '\0';
  987             _prefix = sdscat(sdsnew(prefix),_prefixlen);
  988 
  989             /* Setup prefix format for every entry */
  990             char numsep;
  991             if (r->type == REDIS_REPLY_SET) numsep = '~';
  992             else if (r->type == REDIS_REPLY_MAP) numsep = '#';
  993             else numsep = ')';
  994             snprintf(_prefixfmt,sizeof(_prefixfmt),"%%s%%%ud%c ",idxlen,numsep);
  995 
  996             for (i = 0; i < r->elements; i++) {
  997                 unsigned int human_idx = (r->type == REDIS_REPLY_MAP) ?
  998                                          i/2 : i;
  999                 human_idx++; /* Make it 1-based. */
 1000 
 1001                 /* Don't use the prefix for the first element, as the parent
 1002                  * caller already prepended the index number. */
 1003                 out = sdscatprintf(out,_prefixfmt,i == 0 ? "" : prefix,human_idx);
 1004 
 1005                 /* Format the multi bulk entry */
 1006                 tmp = cliFormatReplyTTY(r->element[i],_prefix);
 1007                 out = sdscatlen(out,tmp,sdslen(tmp));
 1008                 sdsfree(tmp);
 1009 
 1010                 /* For maps, format the value as well. */
 1011                 if (r->type == REDIS_REPLY_MAP) {
 1012                     i++;
 1013                     sdsrange(out,0,-2);
 1014                     out = sdscat(out," => ");
 1015                     tmp = cliFormatReplyTTY(r->element[i],_prefix);
 1016                     out = sdscatlen(out,tmp,sdslen(tmp));
 1017                     sdsfree(tmp);
 1018                 }
 1019             }
 1020             sdsfree(_prefix);
 1021         }
 1022     break;
 1023     default:
 1024         fprintf(stderr,"Unknown reply type: %d\n", r->type);
 1025         exit(1);
 1026     }
 1027     return out;
 1028 }
 1029 
 1030 int isColorTerm(void) {
 1031     char *t = getenv("TERM");
 1032     return t != NULL && strstr(t,"xterm") != NULL;
 1033 }
 1034 
 1035 /* Helper  function for sdsCatColorizedLdbReply() appending colorize strings
 1036  * to an SDS string. */
 1037 sds sdscatcolor(sds o, char *s, size_t len, char *color) {
 1038     if (!isColorTerm()) return sdscatlen(o,s,len);
 1039 
 1040     int bold = strstr(color,"bold") != NULL;
 1041     int ccode = 37; /* Defaults to white. */
 1042     if (strstr(color,"red")) ccode = 31;
 1043     else if (strstr(color,"green")) ccode = 32;
 1044     else if (strstr(color,"yellow")) ccode = 33;
 1045     else if (strstr(color,"blue")) ccode = 34;
 1046     else if (strstr(color,"magenta")) ccode = 35;
 1047     else if (strstr(color,"cyan")) ccode = 36;
 1048     else if (strstr(color,"white")) ccode = 37;
 1049 
 1050     o = sdscatfmt(o,"\033[%i;%i;49m",bold,ccode);
 1051     o = sdscatlen(o,s,len);
 1052     o = sdscat(o,"\033[0m");
 1053     return o;
 1054 }
 1055 
 1056 /* Colorize Lua debugger status replies according to the prefix they
 1057  * have. */
 1058 sds sdsCatColorizedLdbReply(sds o, char *s, size_t len) {
 1059     char *color = "white";
 1060 
 1061     if (strstr(s,"<debug>")) color = "bold";
 1062     if (strstr(s,"<redis>")) color = "green";
 1063     if (strstr(s,"<reply>")) color = "cyan";
 1064     if (strstr(s,"<error>")) color = "red";
 1065     if (strstr(s,"<hint>")) color = "bold";
 1066     if (strstr(s,"<value>") || strstr(s,"<retval>")) color = "magenta";
 1067     if (len > 4 && isdigit(s[3])) {
 1068         if (s[1] == '>') color = "yellow"; /* Current line. */
 1069         else if (s[2] == '#') color = "bold"; /* Break point. */
 1070     }
 1071     return sdscatcolor(o,s,len,color);
 1072 }
 1073 
 1074 static sds cliFormatReplyRaw(redisReply *r) {
 1075     sds out = sdsempty(), tmp;
 1076     size_t i;
 1077 
 1078     switch (r->type) {
 1079     case REDIS_REPLY_NIL:
 1080         /* Nothing... */
 1081         break;
 1082     case REDIS_REPLY_ERROR:
 1083         out = sdscatlen(out,r->str,r->len);
 1084         out = sdscatlen(out,"\n",1);
 1085         break;
 1086     case REDIS_REPLY_STATUS:
 1087     case REDIS_REPLY_STRING:
 1088     case REDIS_REPLY_VERB:
 1089         if (r->type == REDIS_REPLY_STATUS && config.eval_ldb) {
 1090             /* The Lua debugger replies with arrays of simple (status)
 1091              * strings. We colorize the output for more fun if this
 1092              * is a debugging session. */
 1093 
 1094             /* Detect the end of a debugging session. */
 1095             if (strstr(r->str,"<endsession>") == r->str) {
 1096                 config.enable_ldb_on_eval = 0;
 1097                 config.eval_ldb = 0;
 1098                 config.eval_ldb_end = 1; /* Signal the caller session ended. */
 1099                 config.output = OUTPUT_STANDARD;
 1100                 cliRefreshPrompt();
 1101             } else {
 1102                 out = sdsCatColorizedLdbReply(out,r->str,r->len);
 1103             }
 1104         } else {
 1105             out = sdscatlen(out,r->str,r->len);
 1106         }
 1107         break;
 1108     case REDIS_REPLY_BOOL:
 1109         out = sdscat(out,r->integer ? "(true)" : "(false)");
 1110     break;
 1111     case REDIS_REPLY_INTEGER:
 1112         out = sdscatprintf(out,"%lld",r->integer);
 1113         break;
 1114     case REDIS_REPLY_DOUBLE:
 1115         out = sdscatprintf(out,"%s",r->str);
 1116         break;
 1117     case REDIS_REPLY_ARRAY:
 1118         for (i = 0; i < r->elements; i++) {
 1119             if (i > 0) out = sdscat(out,config.mb_delim);
 1120             tmp = cliFormatReplyRaw(r->element[i]);
 1121             out = sdscatlen(out,tmp,sdslen(tmp));
 1122             sdsfree(tmp);
 1123         }
 1124         break;
 1125     case REDIS_REPLY_MAP:
 1126         for (i = 0; i < r->elements; i += 2) {
 1127             if (i > 0) out = sdscat(out,config.mb_delim);
 1128             tmp = cliFormatReplyRaw(r->element[i]);
 1129             out = sdscatlen(out,tmp,sdslen(tmp));
 1130             sdsfree(tmp);
 1131 
 1132             out = sdscatlen(out," ",1);
 1133             tmp = cliFormatReplyRaw(r->element[i+1]);
 1134             out = sdscatlen(out,tmp,sdslen(tmp));
 1135             sdsfree(tmp);
 1136         }
 1137         break;
 1138     default:
 1139         fprintf(stderr,"Unknown reply type: %d\n", r->type);
 1140         exit(1);
 1141     }
 1142     return out;
 1143 }
 1144 
 1145 static sds cliFormatReplyCSV(redisReply *r) {
 1146     unsigned int i;
 1147 
 1148     sds out = sdsempty();
 1149     switch (r->type) {
 1150     case REDIS_REPLY_ERROR:
 1151         out = sdscat(out,"ERROR,");
 1152         out = sdscatrepr(out,r->str,strlen(r->str));
 1153     break;
 1154     case REDIS_REPLY_STATUS:
 1155         out = sdscatrepr(out,r->str,r->len);
 1156     break;
 1157     case REDIS_REPLY_INTEGER:
 1158         out = sdscatprintf(out,"%lld",r->integer);
 1159     break;
 1160     case REDIS_REPLY_DOUBLE:
 1161         out = sdscatprintf(out,"%s",r->str);
 1162         break;
 1163     case REDIS_REPLY_STRING:
 1164     case REDIS_REPLY_VERB:
 1165         out = sdscatrepr(out,r->str,r->len);
 1166     break;
 1167     case REDIS_REPLY_NIL:
 1168         out = sdscat(out,"NULL");
 1169     break;
 1170     case REDIS_REPLY_BOOL:
 1171         out = sdscat(out,r->integer ? "true" : "false");
 1172     break;
 1173     case REDIS_REPLY_ARRAY:
 1174     case REDIS_REPLY_MAP: /* CSV has no map type, just output flat list. */
 1175         for (i = 0; i < r->elements; i++) {
 1176             sds tmp = cliFormatReplyCSV(r->element[i]);
 1177             out = sdscatlen(out,tmp,sdslen(tmp));
 1178             if (i != r->elements-1) out = sdscat(out,",");
 1179             sdsfree(tmp);
 1180         }
 1181     break;
 1182     default:
 1183         fprintf(stderr,"Unknown reply type: %d\n", r->type);
 1184         exit(1);
 1185     }
 1186     return out;
 1187 }
 1188 
 1189 static int cliReadReply(int output_raw_strings) {
 1190     void *_reply;
 1191     redisReply *reply;
 1192     sds out = NULL;
 1193     int output = 1;
 1194 
 1195     if (redisGetReply(context,&_reply) != REDIS_OK) {
 1196         if (config.shutdown) {
 1197             redisFree(context);
 1198             context = NULL;
 1199             return REDIS_OK;
 1200         }
 1201         if (config.interactive) {
 1202             /* Filter cases where we should reconnect */
 1203             if (context->err == REDIS_ERR_IO &&
 1204                 (errno == ECONNRESET || errno == EPIPE))
 1205                 return REDIS_ERR;
 1206             if (context->err == REDIS_ERR_EOF)
 1207                 return REDIS_ERR;
 1208         }
 1209         cliPrintContextError();
 1210         exit(1);
 1211         return REDIS_ERR; /* avoid compiler warning */
 1212     }
 1213 
 1214     reply = (redisReply*)_reply;
 1215 
 1216     config.last_cmd_type = reply->type;
 1217 
 1218     /* Check if we need to connect to a different node and reissue the
 1219      * request. */
 1220     if (config.cluster_mode && reply->type == REDIS_REPLY_ERROR &&
 1221         (!strncmp(reply->str,"MOVED",5) || !strcmp(reply->str,"ASK")))
 1222     {
 1223         char *p = reply->str, *s;
 1224         int slot;
 1225 
 1226         output = 0;
 1227         /* Comments show the position of the pointer as:
 1228          *
 1229          * [S] for pointer 's'
 1230          * [P] for pointer 'p'
 1231          */
 1232         s = strchr(p,' ');      /* MOVED[S]3999 127.0.0.1:6381 */
 1233         p = strchr(s+1,' ');    /* MOVED[S]3999[P]127.0.0.1:6381 */
 1234         *p = '\0';
 1235         slot = atoi(s+1);
 1236         s = strrchr(p+1,':');    /* MOVED 3999[P]127.0.0.1[S]6381 */
 1237         *s = '\0';
 1238         sdsfree(config.hostip);
 1239         config.hostip = sdsnew(p+1);
 1240         config.hostport = atoi(s+1);
 1241         if (config.interactive)
 1242             printf("-> Redirected to slot [%d] located at %s:%d\n",
 1243                 slot, config.hostip, config.hostport);
 1244         config.cluster_reissue_command = 1;
 1245         cliRefreshPrompt();
 1246     }
 1247 
 1248     if (output) {
 1249         if (output_raw_strings) {
 1250             out = cliFormatReplyRaw(reply);
 1251         } else {
 1252             if (config.output == OUTPUT_RAW) {
 1253                 out = cliFormatReplyRaw(reply);
 1254                 out = sdscat(out,"\n");
 1255             } else if (config.output == OUTPUT_STANDARD) {
 1256                 out = cliFormatReplyTTY(reply,"");
 1257             } else if (config.output == OUTPUT_CSV) {
 1258                 out = cliFormatReplyCSV(reply);
 1259                 out = sdscat(out,"\n");
 1260             }
 1261         }
 1262         fwrite(out,sdslen(out),1,stdout);
 1263         sdsfree(out);
 1264     }
 1265     freeReplyObject(reply);
 1266     return REDIS_OK;
 1267 }
 1268 
 1269 static int cliSendCommand(int argc, char **argv, long repeat) {
 1270     char *command = argv[0];
 1271     size_t *argvlen;
 1272     int j, output_raw;
 1273 
 1274     if (!config.eval_ldb && /* In debugging mode, let's pass "help" to Redis. */
 1275         (!strcasecmp(command,"help") || !strcasecmp(command,"?"))) {
 1276         cliOutputHelp(--argc, ++argv);
 1277         return REDIS_OK;
 1278     }
 1279 
 1280     if (context == NULL) return REDIS_ERR;
 1281 
 1282     output_raw = 0;
 1283     if (!strcasecmp(command,"info") ||
 1284         !strcasecmp(command,"lolwut") ||
 1285         (argc >= 2 && !strcasecmp(command,"debug") &&
 1286                        !strcasecmp(argv[1],"htstats")) ||
 1287         (argc >= 2 && !strcasecmp(command,"debug") &&
 1288                        !strcasecmp(argv[1],"htstats-key")) ||
 1289         (argc >= 2 && !strcasecmp(command,"memory") &&
 1290                       (!strcasecmp(argv[1],"malloc-stats") ||
 1291                        !strcasecmp(argv[1],"doctor"))) ||
 1292         (argc == 2 && !strcasecmp(command,"cluster") &&
 1293                       (!strcasecmp(argv[1],"nodes") ||
 1294                        !strcasecmp(argv[1],"info"))) ||
 1295         (argc >= 2 && !strcasecmp(command,"client") &&
 1296                        !strcasecmp(argv[1],"list")) ||
 1297         (argc == 3 && !strcasecmp(command,"latency") &&
 1298                        !strcasecmp(argv[1],"graph")) ||
 1299         (argc == 2 && !strcasecmp(command,"latency") &&
 1300                        !strcasecmp(argv[1],"doctor")) ||
 1301         /* Format PROXY INFO command for Redis Cluster Proxy:
 1302          * https://github.com/artix75/redis-cluster-proxy */
 1303         (argc >= 2 && !strcasecmp(command,"proxy") &&
 1304                        !strcasecmp(argv[1],"info")))
 1305     {
 1306         output_raw = 1;
 1307     }
 1308 
 1309     if (!strcasecmp(command,"shutdown")) config.shutdown = 1;
 1310     if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
 1311     if (!strcasecmp(command,"subscribe") ||
 1312         !strcasecmp(command,"psubscribe")) config.pubsub_mode = 1;
 1313     if (!strcasecmp(command,"sync") ||
 1314         !strcasecmp(command,"psync")) config.slave_mode = 1;
 1315 
 1316     /* When the user manually calls SCRIPT DEBUG, setup the activation of
 1317      * debugging mode on the next eval if needed. */
 1318     if (argc == 3 && !strcasecmp(argv[0],"script") &&
 1319                      !strcasecmp(argv[1],"debug"))
 1320     {
 1321         if (!strcasecmp(argv[2],"yes") || !strcasecmp(argv[2],"sync")) {
 1322             config.enable_ldb_on_eval = 1;
 1323         } else {
 1324             config.enable_ldb_on_eval = 0;
 1325         }
 1326     }
 1327 
 1328     /* Actually activate LDB on EVAL if needed. */
 1329     if (!strcasecmp(command,"eval") && config.enable_ldb_on_eval) {
 1330         config.eval_ldb = 1;
 1331         config.output = OUTPUT_RAW;
 1332     }
 1333 
 1334     /* Setup argument length */
 1335     argvlen = zmalloc(argc*sizeof(size_t));
 1336     for (j = 0; j < argc; j++)
 1337         argvlen[j] = sdslen(argv[j]);
 1338 
 1339     /* Negative repeat is allowed and causes infinite loop,
 1340        works well with the interval option. */
 1341     while(repeat < 0 || repeat-- > 0) {
 1342         redisAppendCommandArgv(context,argc,(const char**)argv,argvlen);
 1343         while (config.monitor_mode) {
 1344             if (cliReadReply(output_raw) != REDIS_OK) exit(1);
 1345             fflush(stdout);
 1346         }
 1347 
 1348         if (config.pubsub_mode) {
 1349             if (config.output != OUTPUT_RAW)
 1350                 printf("Reading messages... (press Ctrl-C to quit)\n");
 1351             while (1) {
 1352                 if (cliReadReply(output_raw) != REDIS_OK) exit(1);
 1353             }
 1354         }
 1355 
 1356         if (config.slave_mode) {
 1357             printf("Entering replica output mode...  (press Ctrl-C to quit)\n");
 1358             slaveMode();
 1359             config.slave_mode = 0;
 1360             zfree(argvlen);
 1361             return REDIS_ERR;  /* Error = slaveMode lost connection to master */
 1362         }
 1363 
 1364         if (cliReadReply(output_raw) != REDIS_OK) {
 1365             zfree(argvlen);
 1366             return REDIS_ERR;
 1367         } else {
 1368             /* Store database number when SELECT was successfully executed. */
 1369             if (!strcasecmp(command,"select") && argc == 2 && config.last_cmd_type != REDIS_REPLY_ERROR) {
 1370                 config.dbnum = atoi(argv[1]);
 1371                 cliRefreshPrompt();
 1372             } else if (!strcasecmp(command,"auth") && (argc == 2 || argc == 3))
 1373             {
 1374                 cliSelect();
 1375             }
 1376         }
 1377         if (config.cluster_reissue_command){
 1378             /* If we need to reissue the command, break to prevent a
 1379                further 'repeat' number of dud interations */
 1380             break;
 1381         }
 1382         if (config.interval) usleep(config.interval);
 1383         fflush(stdout); /* Make it grep friendly */
 1384     }
 1385 
 1386     zfree(argvlen);
 1387     return REDIS_OK;
 1388 }
 1389 
 1390 /* Send a command reconnecting the link if needed. */
 1391 static redisReply *reconnectingRedisCommand(redisContext *c, const char *fmt, ...) {
 1392     redisReply *reply = NULL;
 1393     int tries = 0;
 1394     va_list ap;
 1395 
 1396     assert(!c->err);
 1397     while(reply == NULL) {
 1398         while (c->err & (REDIS_ERR_IO | REDIS_ERR_EOF)) {
 1399             printf("\r\x1b[0K"); /* Cursor to left edge + clear line. */
 1400             printf("Reconnecting... %d\r", ++tries);
 1401             fflush(stdout);
 1402 
 1403             redisFree(c);
 1404             c = redisConnect(config.hostip,config.hostport);
 1405             if (!c->err && config.tls) {
 1406                 const char *err = NULL;
 1407                 if (cliSecureConnection(c, &err) == REDIS_ERR && err) {
 1408                     fprintf(stderr, "TLS Error: %s\n", err);
 1409                     exit(1);
 1410                 }
 1411             }
 1412             usleep(1000000);
 1413         }
 1414 
 1415         va_start(ap,fmt);
 1416         reply = redisvCommand(c,fmt,ap);
 1417         va_end(ap);
 1418 
 1419         if (c->err && !(c->err & (REDIS_ERR_IO | REDIS_ERR_EOF))) {
 1420             fprintf(stderr, "Error: %s\n", c->errstr);
 1421             exit(1);
 1422         } else if (tries > 0) {
 1423             printf("\r\x1b[0K"); /* Cursor to left edge + clear line. */
 1424         }
 1425     }
 1426 
 1427     context = c;
 1428     return reply;
 1429 }
 1430 
 1431 /*------------------------------------------------------------------------------
 1432  * User interface
 1433  *--------------------------------------------------------------------------- */
 1434 
 1435 static int parseOptions(int argc, char **argv) {
 1436     int i;
 1437 
 1438     for (i = 1; i < argc; i++) {
 1439         int lastarg = i==argc-1;
 1440 
 1441         if (!strcmp(argv[i],"-h") && !lastarg) {
 1442             sdsfree(config.hostip);
 1443             config.hostip = sdsnew(argv[++i]);
 1444         } else if (!strcmp(argv[i],"-h") && lastarg) {
 1445             usage();
 1446         } else if (!strcmp(argv[i],"--help")) {
 1447             usage();
 1448         } else if (!strcmp(argv[i],"-x")) {
 1449             config.stdinarg = 1;
 1450         } else if (!strcmp(argv[i],"-p") && !lastarg) {
 1451             config.hostport = atoi(argv[++i]);
 1452         } else if (!strcmp(argv[i],"-s") && !lastarg) {
 1453             config.hostsocket = argv[++i];
 1454         } else if (!strcmp(argv[i],"-r") && !lastarg) {
 1455             config.repeat = strtoll(argv[++i],NULL,10);
 1456         } else if (!strcmp(argv[i],"-i") && !lastarg) {
 1457             double seconds = atof(argv[++i]);
 1458             config.interval = seconds*1000000;
 1459         } else if (!strcmp(argv[i],"-n") && !lastarg) {
 1460             config.dbnum = atoi(argv[++i]);
 1461         } else if (!strcmp(argv[i], "--no-auth-warning")) {
 1462             config.no_auth_warning = 1;
 1463         } else if (!strcmp(argv[i], "--askpass")) {
 1464             config.askpass = 1;
 1465         } else if ((!strcmp(argv[i],"-a") || !strcmp(argv[i],"--pass"))
 1466                    && !lastarg)
 1467         {
 1468             config.auth = argv[++i];
 1469         } else if (!strcmp(argv[i],"--user") && !lastarg) {
 1470             config.user = argv[++i];
 1471         } else if (!strcmp(argv[i],"-u") && !lastarg) {
 1472             parseRedisUri(argv[++i]);
 1473         } else if (!strcmp(argv[i],"--raw")) {
 1474             config.output = OUTPUT_RAW;
 1475         } else if (!strcmp(argv[i],"--no-raw")) {
 1476             config.output = OUTPUT_STANDARD;
 1477         } else if (!strcmp(argv[i],"--csv")) {
 1478             config.output = OUTPUT_CSV;
 1479         } else if (!strcmp(argv[i],"--latency")) {
 1480             config.latency_mode = 1;
 1481         } else if (!strcmp(argv[i],"--latency-dist")) {
 1482             config.latency_dist_mode = 1;
 1483         } else if (!strcmp(argv[i],"--mono")) {
 1484             spectrum_palette = spectrum_palette_mono;
 1485             spectrum_palette_size = spectrum_palette_mono_size;
 1486         } else if (!strcmp(argv[i],"--latency-history")) {
 1487             config.latency_mode = 1;
 1488             config.latency_history = 1;
 1489         } else if (!strcmp(argv[i],"--lru-test") && !lastarg) {
 1490             config.lru_test_mode = 1;
 1491             config.lru_test_sample_size = strtoll(argv[++i],NULL,10);
 1492         } else if (!strcmp(argv[i],"--slave")) {
 1493             config.slave_mode = 1;
 1494         } else if (!strcmp(argv[i],"--replica")) {
 1495             config.slave_mode = 1;
 1496         } else if (!strcmp(argv[i],"--stat")) {
 1497             config.stat_mode = 1;
 1498         } else if (!strcmp(argv[i],"--scan")) {
 1499             config.scan_mode = 1;
 1500         } else if (!strcmp(argv[i],"--pattern") && !lastarg) {
 1501             config.pattern = argv[++i];
 1502         } else if (!strcmp(argv[i],"--intrinsic-latency") && !lastarg) {
 1503             config.intrinsic_latency_mode = 1;
 1504             config.intrinsic_latency_duration = atoi(argv[++i]);
 1505         } else if (!strcmp(argv[i],"--rdb") && !lastarg) {
 1506             config.getrdb_mode = 1;
 1507             config.rdb_filename = argv[++i];
 1508         } else if (!strcmp(argv[i],"--pipe")) {
 1509             config.pipe_mode = 1;
 1510         } else if (!strcmp(argv[i],"--pipe-timeout") && !lastarg) {
 1511             config.pipe_timeout = atoi(argv[++i]);
 1512         } else if (!strcmp(argv[i],"--bigkeys")) {
 1513             config.bigkeys = 1;
 1514         } else if (!strcmp(argv[i],"--memkeys")) {
 1515             config.memkeys = 1;
 1516             config.memkeys_samples = 0; /* use redis default */
 1517         } else if (!strcmp(argv[i],"--memkeys-samples")) {
 1518             config.memkeys = 1;
 1519             config.memkeys_samples = atoi(argv[++i]);
 1520         } else if (!strcmp(argv[i],"--hotkeys")) {
 1521             config.hotkeys = 1;
 1522         } else if (!strcmp(argv[i],"--eval") && !lastarg) {
 1523             config.eval = argv[++i];
 1524         } else if (!strcmp(argv[i],"--ldb")) {
 1525             config.eval_ldb = 1;
 1526             config.output = OUTPUT_RAW;
 1527         } else if (!strcmp(argv[i],"--ldb-sync-mode")) {
 1528             config.eval_ldb = 1;
 1529             config.eval_ldb_sync = 1;
 1530             config.output = OUTPUT_RAW;
 1531         } else if (!strcmp(argv[i],"-c")) {
 1532             config.cluster_mode = 1;
 1533         } else if (!strcmp(argv[i],"-d") && !lastarg) {
 1534             sdsfree(config.mb_delim);
 1535             config.mb_delim = sdsnew(argv[++i]);
 1536         } else if (!strcmp(argv[i],"--verbose")) {
 1537             config.verbose = 1;
 1538         } else if (!strcmp(argv[i],"--cluster") && !lastarg) {
 1539             if (CLUSTER_MANAGER_MODE()) usage();
 1540             char *cmd = argv[++i];
 1541             int j = i;
 1542             while (j < argc && argv[j][0] != '-') j++;
 1543             if (j > i) j--;
 1544             createClusterManagerCommand(cmd, j - i, argv + i + 1);
 1545             i = j;
 1546         } else if (!strcmp(argv[i],"--cluster") && lastarg) {
 1547             usage();
 1548         } else if ((!strcmp(argv[i],"--cluster-only-masters"))) {
 1549             config.cluster_manager_command.flags |=
 1550                     CLUSTER_MANAGER_CMD_FLAG_MASTERS_ONLY;
 1551         } else if ((!strcmp(argv[i],"--cluster-only-replicas"))) {
 1552             config.cluster_manager_command.flags |=
 1553                     CLUSTER_MANAGER_CMD_FLAG_SLAVES_ONLY;
 1554         } else if (!strcmp(argv[i],"--cluster-replicas") && !lastarg) {
 1555             config.cluster_manager_command.replicas = atoi(argv[++i]);
 1556         } else if (!strcmp(argv[i],"--cluster-master-id") && !lastarg) {
 1557             config.cluster_manager_command.master_id = argv[++i];
 1558         } else if (!strcmp(argv[i],"--cluster-from") && !lastarg) {
 1559             config.cluster_manager_command.from = argv[++i];
 1560         } else if (!strcmp(argv[i],"--cluster-to") && !lastarg) {
 1561             config.cluster_manager_command.to = argv[++i];
 1562         } else if (!strcmp(argv[i],"--cluster-weight") && !lastarg) {
 1563             if (config.cluster_manager_command.weight != NULL) {
 1564                 fprintf(stderr, "WARNING: you cannot use --cluster-weight "
 1565                                 "more than once.\n"
 1566                                 "You can set more weights by adding them "
 1567                                 "as a space-separated list, ie:\n"
 1568                                 "--cluster-weight n1=w n2=w\n");
 1569                 exit(1);
 1570             }
 1571             int widx = i + 1;
 1572             char **weight = argv + widx;
 1573             int wargc = 0;
 1574             for (; widx < argc; widx++) {
 1575                 if (strstr(argv[widx], "--") == argv[widx]) break;
 1576                 if (strchr(argv[widx], '=') == NULL) break;
 1577                 wargc++;
 1578             }
 1579             if (wargc > 0) {
 1580                 config.cluster_manager_command.weight = weight;
 1581                 config.cluster_manager_command.weight_argc = wargc;
 1582                 i += wargc;
 1583             }
 1584         } else if (!strcmp(argv[i],"--cluster-slots") && !lastarg) {
 1585             config.cluster_manager_command.slots = atoi(argv[++i]);
 1586         } else if (!strcmp(argv[i],"--cluster-timeout") && !lastarg) {
 1587             config.cluster_manager_command.timeout = atoi(argv[++i]);
 1588         } else if (!strcmp(argv[i],"--cluster-pipeline") && !lastarg) {
 1589             config.cluster_manager_command.pipeline = atoi(argv[++i]);
 1590         } else if (!strcmp(argv[i],"--cluster-threshold") && !lastarg) {
 1591             config.cluster_manager_command.threshold = atof(argv[++i]);
 1592         } else if (!strcmp(argv[i],"--cluster-yes")) {
 1593             config.cluster_manager_command.flags |=
 1594                 CLUSTER_MANAGER_CMD_FLAG_YES;
 1595         } else if (!strcmp(argv[i],"--cluster-simulate")) {
 1596             config.cluster_manager_command.flags |=
 1597                 CLUSTER_MANAGER_CMD_FLAG_SIMULATE;
 1598         } else if (!strcmp(argv[i],"--cluster-replace")) {
 1599             config.cluster_manager_command.flags |=
 1600                 CLUSTER_MANAGER_CMD_FLAG_REPLACE;
 1601         } else if (!strcmp(argv[i],"--cluster-copy")) {
 1602             config.cluster_manager_command.flags |=
 1603                 CLUSTER_MANAGER_CMD_FLAG_COPY;
 1604         } else if (!strcmp(argv[i],"--cluster-slave")) {
 1605             config.cluster_manager_command.flags |=
 1606                 CLUSTER_MANAGER_CMD_FLAG_SLAVE;
 1607         } else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
 1608             config.cluster_manager_command.flags |=
 1609                 CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
 1610         } else if (!strcmp(argv[i],"--cluster-search-multiple-owners")) {
 1611             config.cluster_manager_command.flags |=
 1612                 CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS;
 1613         } else if (!strcmp(argv[i],"--cluster-fix-with-unreachable-masters")) {
 1614             config.cluster_manager_command.flags |=
 1615                 CLUSTER_MANAGER_CMD_FLAG_FIX_WITH_UNREACHABLE_MASTERS;
 1616 #ifdef USE_OPENSSL
 1617         } else if (!strcmp(argv[i],"--tls")) {
 1618             config.tls = 1;
 1619         } else if (!strcmp(argv[i],"--sni") && !lastarg) {
 1620             config.sni = argv[++i];
 1621         } else if (!strcmp(argv[i],"--cacertdir") && !lastarg) {
 1622             config.cacertdir = argv[++i];
 1623         } else if (!strcmp(argv[i],"--cacert") && !lastarg) {
 1624             config.cacert = argv[++i];
 1625         } else if (!strcmp(argv[i],"--cert") && !lastarg) {
 1626             config.cert = argv[++i];
 1627         } else if (!strcmp(argv[i],"--key") && !lastarg) {
 1628             config.key = argv[++i];
 1629 #endif
 1630         } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
 1631             sds version = cliVersion();
 1632             printf("redis-cli %s\n", version);
 1633             sdsfree(version);
 1634             exit(0);
 1635         } else if (!strcmp(argv[i],"-3")) {
 1636             config.resp3 = 1;
 1637         } else if (CLUSTER_MANAGER_MODE() && argv[i][0] != '-') {
 1638             if (config.cluster_manager_command.argc == 0) {
 1639                 int j = i + 1;
 1640                 while (j < argc && argv[j][0] != '-') j++;
 1641                 int cmd_argc = j - i;
 1642                 config.cluster_manager_command.argc = cmd_argc;
 1643                 config.cluster_manager_command.argv = argv + i;
 1644                 if (cmd_argc > 1) i = j - 1;
 1645             }
 1646         } else {
 1647             if (argv[i][0] == '-') {
 1648                 fprintf(stderr,
 1649                     "Unrecognized option or bad number of args for: '%s'\n",
 1650                     argv[i]);
 1651                 exit(1);
 1652             } else {
 1653                 /* Likely the command name, stop here. */
 1654                 break;
 1655             }
 1656         }
 1657     }
 1658 
 1659     /* --ldb requires --eval. */
 1660     if (config.eval_ldb && config.eval == NULL) {
 1661         fprintf(stderr,"Options --ldb and --ldb-sync-mode require --eval.\n");
 1662         fprintf(stderr,"Try %s --help for more information.\n", argv[0]);
 1663         exit(1);
 1664     }
 1665 
 1666     if (!config.no_auth_warning && config.auth != NULL) {
 1667         fputs("Warning: Using a password with '-a' or '-u' option on the command"
 1668               " line interface may not be safe.\n", stderr);
 1669     }
 1670 
 1671     return i;
 1672 }
 1673 
 1674 static void parseEnv() {
 1675     /* Set auth from env, but do not overwrite CLI arguments if passed */
 1676     char *auth = getenv(REDIS_CLI_AUTH_ENV);
 1677     if (auth != NULL && config.auth == NULL) {
 1678         config.auth = auth;
 1679     }
 1680 
 1681     char *cluster_yes = getenv(REDIS_CLI_CLUSTER_YES_ENV);
 1682     if (cluster_yes != NULL && !strcmp(cluster_yes, "1")) {
 1683         config.cluster_manager_command.flags |= CLUSTER_MANAGER_CMD_FLAG_YES;
 1684     }
 1685 }
 1686 
 1687 static sds readArgFromStdin(void) {
 1688     char buf[1024];
 1689     sds arg = sdsempty();
 1690 
 1691     while(1) {
 1692         int nread = read(fileno(stdin),buf,1024);
 1693 
 1694         if (nread == 0) break;
 1695         else if (nread == -1) {
 1696             perror("Reading from standard input");
 1697             exit(1);
 1698         }
 1699         arg = sdscatlen(arg,buf,nread);
 1700     }
 1701     return arg;
 1702 }
 1703 
 1704 static void usage(void) {
 1705     sds version = cliVersion();
 1706     fprintf(stderr,
 1707 "redis-cli %s\n"
 1708 "\n"
 1709 "Usage: redis-cli [OPTIONS] [cmd [arg [arg ...]]]\n"
 1710 "  -h <hostname>      Server hostname (default: 127.0.0.1).\n"
 1711 "  -p <port>          Server port (default: 6379).\n"
 1712 "  -s <socket>        Server socket (overrides hostname and port).\n"
 1713 "  -a <password>      Password to use when connecting to the server.\n"
 1714 "                     You can also use the " REDIS_CLI_AUTH_ENV " environment\n"
 1715 "                     variable to pass this password more safely\n"
 1716 "                     (if both are used, this argument takes predecence).\n"
 1717 "  --user <username>  Used to send ACL style 'AUTH username pass'. Needs -a.\n"
 1718 "  --pass <password>  Alias of -a for consistency with the new --user option.\n"
 1719 "  --askpass          Force user to input password with mask from STDIN.\n"
 1720 "                     If this argument is used, '-a' and " REDIS_CLI_AUTH_ENV "\n"
 1721 "                     environment variable will be ignored.\n"
 1722 "  -u <uri>           Server URI.\n"
 1723 "  -r <repeat>        Execute specified command N times.\n"
 1724 "  -i <interval>      When -r is used, waits <interval> seconds per command.\n"
 1725 "                     It is possible to specify sub-second times like -i 0.1.\n"
 1726 "  -n <db>            Database number.\n"
 1727 "  -3                 Start session in RESP3 protocol mode.\n"
 1728 "  -x                 Read last argument from STDIN.\n"
 1729 "  -d <delimiter>     Multi-bulk delimiter in for raw formatting (default: \\n).\n"
 1730 "  -c                 Enable cluster mode (follow -ASK and -MOVED redirections).\n"
 1731 #ifdef USE_OPENSSL
 1732 "  --tls              Establish a secure TLS connection.\n"
 1733 "  --sni <host>       Server name indication for TLS.\n"
 1734 "  --cacert <file>    CA Certificate file to verify with.\n"
 1735 "  --cacertdir <dir>  Directory where trusted CA certificates are stored.\n"
 1736 "                     If neither cacert nor cacertdir are specified, the default\n"
 1737 "                     system-wide trusted root certs configuration will apply.\n"
 1738 "  --cert <file>      Client certificate to authenticate with.\n"
 1739 "  --key <file>       Private key file to authenticate with.\n"
 1740 #endif
 1741 "  --raw              Use raw formatting for replies (default when STDOUT is\n"
 1742 "                     not a tty).\n"
 1743 "  --no-raw           Force formatted output even when STDOUT is not a tty.\n"
 1744 "  --csv              Output in CSV format.\n"
 1745 "  --stat             Print rolling stats about server: mem, clients, ...\n"
 1746 "  --latency          Enter a special mode continuously sampling latency.\n"
 1747 "                     If you use this mode in an interactive session it runs\n"
 1748 "                     forever displaying real-time stats. Otherwise if --raw or\n"
 1749 "                     --csv is specified, or if you redirect the output to a non\n"
 1750 "                     TTY, it samples the latency for 1 second (you can use\n"
 1751 "                     -i to change the interval), then produces a single output\n"
 1752 "                     and exits.\n",version);
 1753 
 1754     fprintf(stderr,
 1755 "  --latency-history  Like --latency but tracking latency changes over time.\n"
 1756 "                     Default time interval is 15 sec. Change it using -i.\n"
 1757 "  --latency-dist     Shows latency as a spectrum, requires xterm 256 colors.\n"
 1758 "                     Default time interval is 1 sec. Change it using -i.\n"
 1759 "  --lru-test <keys>  Simulate a cache workload with an 80-20 distribution.\n"
 1760 "  --replica          Simulate a replica showing commands received from the master.\n"
 1761 "  --rdb <filename>   Transfer an RDB dump from remote server to local file.\n"
 1762 "  --pipe             Transfer raw Redis protocol from stdin to server.\n"
 1763 "  --pipe-timeout <n> In --pipe mode, abort with error if after sending all data.\n"
 1764 "                     no reply is received within <n> seconds.\n"
 1765 "                     Default timeout: %d. Use 0 to wait forever.\n",
 1766     REDIS_CLI_DEFAULT_PIPE_TIMEOUT);
 1767     fprintf(stderr,
 1768 "  --bigkeys          Sample Redis keys looking for keys with many elements (complexity).\n"
 1769 "  --memkeys          Sample Redis keys looking for keys consuming a lot of memory.\n"
 1770 "  --memkeys-samples <n> Sample Redis keys looking for keys consuming a lot of memory.\n"
 1771 "                     And define number of key elements to sample\n"
 1772 "  --hotkeys          Sample Redis keys looking for hot keys.\n"
 1773 "                     only works when maxmemory-policy is *lfu.\n"
 1774 "  --scan             List all keys using the SCAN command.\n"
 1775 "  --pattern <pat>    Keys pattern when using the --scan, --bigkeys or --hotkeys\n"
 1776 "                     options (default: *).\n"
 1777 "  --intrinsic-latency <sec> Run a test to measure intrinsic system latency.\n"
 1778 "                     The test will run for the specified amount of seconds.\n"
 1779 "  --eval <file>      Send an EVAL command using the Lua script at <file>.\n"
 1780 "  --ldb              Used with --eval enable the Redis Lua debugger.\n"
 1781 "  --ldb-sync-mode    Like --ldb but uses the synchronous Lua debugger, in\n"
 1782 "                     this mode the server is blocked and script changes are\n"
 1783 "                     not rolled back from the server memory.\n"
 1784 "  --cluster <command> [args...] [opts...]\n"
 1785 "                     Cluster Manager command and arguments (see below).\n"
 1786 "  --verbose          Verbose mode.\n"
 1787 "  --no-auth-warning  Don't show warning message when using password on command\n"
 1788 "                     line interface.\n"
 1789 "  --help             Output this help and exit.\n"
 1790 "  --version          Output version and exit.\n"
 1791 "\n");
 1792     /* Using another fprintf call to avoid -Woverlength-strings compile warning */
 1793     fprintf(stderr,
 1794 "Cluster Manager Commands:\n"
 1795 "  Use --cluster help to list all available cluster manager commands.\n"
 1796 "\n"
 1797 "Examples:\n"
 1798 "  cat /etc/passwd | redis-cli -x set mypasswd\n"
 1799 "  redis-cli get mypasswd\n"
 1800 "  redis-cli -r 100 lpush mylist x\n"
 1801 "  redis-cli -r 100 -i 1 info | grep used_memory_human:\n"
 1802 "  redis-cli --eval myscript.lua key1 key2 , arg1 arg2 arg3\n"
 1803 "  redis-cli --scan --pattern '*:12345*'\n"
 1804 "\n"
 1805 "  (Note: when using --eval the comma separates KEYS[] from ARGV[] items)\n"
 1806 "\n"
 1807 "When no command is given, redis-cli starts in interactive mode.\n"
 1808 "Type \"help\" in interactive mode for information on available commands\n"
 1809 "and settings.\n"
 1810 "\n");
 1811     sdsfree(version);
 1812     exit(1);
 1813 }
 1814 
 1815 static int confirmWithYes(char *msg, int ignore_force) {
 1816     /* if --cluster-yes option is set and ignore_force is false,
 1817      * do not prompt for an answer */
 1818     if (!ignore_force &&
 1819         (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_YES)) {
 1820         return 1;
 1821     }
 1822 
 1823     printf("%s (type 'yes' to accept): ", msg);
 1824     fflush(stdout);
 1825     char buf[4];
 1826     int nread = read(fileno(stdin),buf,4);
 1827     buf[3] = '\0';
 1828     return (nread != 0 && !strcmp("yes", buf));
 1829 }
 1830 
 1831 /* Turn the plain C strings into Sds strings */
 1832 static char **convertToSds(int count, char** args) {
 1833   int j;
 1834   char **sds = zmalloc(sizeof(char*)*count);
 1835 
 1836   for(j = 0; j < count; j++)
 1837     sds[j] = sdsnew(args[j]);
 1838 
 1839   return sds;
 1840 }
 1841 
 1842 static int issueCommandRepeat(int argc, char **argv, long repeat) {
 1843     while (1) {
 1844         config.cluster_reissue_command = 0;
 1845         if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
 1846             cliConnect(CC_FORCE);
 1847 
 1848             /* If we still cannot send the command print error.
 1849              * We'll try to reconnect the next time. */
 1850             if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
 1851                 cliPrintContextError();
 1852                 return REDIS_ERR;
 1853             }
 1854         }
 1855         /* Issue the command again if we got redirected in cluster mode */
 1856         if (config.cluster_mode && config.cluster_reissue_command) {
 1857             cliConnect(CC_FORCE);
 1858         } else {
 1859             break;
 1860         }
 1861     }
 1862     return REDIS_OK;
 1863 }
 1864 
 1865 static int issueCommand(int argc, char **argv) {
 1866     return issueCommandRepeat(argc, argv, config.repeat);
 1867 }
 1868 
 1869 /* Split the user provided command into multiple SDS arguments.
 1870  * This function normally uses sdssplitargs() from sds.c which is able
 1871  * to understand "quoted strings", escapes and so forth. However when
 1872  * we are in Lua debugging mode and the "eval" command is used, we want
 1873  * the remaining Lua script (after "e " or "eval ") to be passed verbatim
 1874  * as a single big argument. */
 1875 static sds *cliSplitArgs(char *line, int *argc) {
 1876     if (config.eval_ldb && (strstr(line,"eval ") == line ||
 1877                             strstr(line,"e ") == line))
 1878     {
 1879         sds *argv = sds_malloc(sizeof(sds)*2);
 1880         *argc = 2;
 1881         int len = strlen(line);
 1882         int elen = line[1] == ' ' ? 2 : 5; /* "e " or "eval "? */
 1883         argv[0] = sdsnewlen(line,elen-1);
 1884         argv[1] = sdsnewlen(line+elen,len-elen);
 1885         return argv;
 1886     } else {
 1887         return sdssplitargs(line,argc);
 1888     }
 1889 }
 1890 
 1891 /* Set the CLI preferences. This function is invoked when an interactive
 1892  * ":command" is called, or when reading ~/.redisclirc file, in order to
 1893  * set user preferences. */
 1894 void cliSetPreferences(char **argv, int argc, int interactive) {
 1895     if (!strcasecmp(argv[0],":set") && argc >= 2) {
 1896         if (!strcasecmp(argv[1],"hints")) pref.hints = 1;
 1897         else if (!strcasecmp(argv[1],"nohints")) pref.hints = 0;
 1898         else {
 1899             printf("%sunknown redis-cli preference '%s'\n",
 1900                 interactive ? "" : ".redisclirc: ",
 1901                 argv[1]);
 1902         }
 1903     } else {
 1904         printf("%sunknown redis-cli internal command '%s'\n",
 1905             interactive ? "" : ".redisclirc: ",
 1906             argv[0]);
 1907     }
 1908 }
 1909 
 1910 /* Load the ~/.redisclirc file if any. */
 1911 void cliLoadPreferences(void) {
 1912     sds rcfile = getDotfilePath(REDIS_CLI_RCFILE_ENV,REDIS_CLI_RCFILE_DEFAULT);
 1913     if (rcfile == NULL) return;
 1914     FILE *fp = fopen(rcfile,"r");
 1915     char buf[1024];
 1916 
 1917     if (fp) {
 1918         while(fgets(buf,sizeof(buf),fp) != NULL) {
 1919             sds *argv;
 1920             int argc;
 1921 
 1922             argv = sdssplitargs(buf,&argc);
 1923             if (argc > 0) cliSetPreferences(argv,argc,0);
 1924             sdsfreesplitres(argv,argc);
 1925         }
 1926         fclose(fp);
 1927     }
 1928     sdsfree(rcfile);
 1929 }
 1930 
 1931 static void repl(void) {
 1932     sds historyfile = NULL;
 1933     int history = 0;
 1934     char *line;
 1935     int argc;
 1936     sds *argv;
 1937 
 1938     /* Initialize the help and, if possible, use the COMMAND command in order
 1939      * to retrieve missing entries. */
 1940     cliInitHelp();
 1941     cliIntegrateHelp();
 1942 
 1943     config.interactive = 1;
 1944     linenoiseSetMultiLine(1);
 1945     linenoiseSetCompletionCallback(completionCallback);
 1946     linenoiseSetHintsCallback(hintsCallback);
 1947     linenoiseSetFreeHintsCallback(freeHintsCallback);
 1948 
 1949     /* Only use history and load the rc file when stdin is a tty. */
 1950     if (isatty(fileno(stdin))) {
 1951         historyfile = getDotfilePath(REDIS_CLI_HISTFILE_ENV,REDIS_CLI_HISTFILE_DEFAULT);
 1952         //keep in-memory history always regardless if history file can be determined
 1953         history = 1;
 1954         if (historyfile != NULL) {
 1955             linenoiseHistoryLoad(historyfile);
 1956         }
 1957         cliLoadPreferences();
 1958     }
 1959 
 1960     cliRefreshPrompt();
 1961     while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
 1962         if (line[0] != '\0') {
 1963             long repeat = 1;
 1964             int skipargs = 0;
 1965             char *endptr = NULL;
 1966 
 1967             argv = cliSplitArgs(line,&argc);
 1968 
 1969             /* check if we have a repeat command option and
 1970              * need to skip the first arg */
 1971             if (argv && argc > 0) {
 1972                 errno = 0;
 1973                 repeat = strtol(argv[0], &endptr, 10);
 1974                 if (argc > 1 && *endptr == '\0') {
 1975                     if (errno == ERANGE || errno == EINVAL || repeat <= 0) {
 1976                         fputs("Invalid redis-cli repeat command option value.\n", stdout);
 1977                         sdsfreesplitres(argv, argc);
 1978                         linenoiseFree(line);
 1979                         continue;
 1980                     }
 1981                     skipargs = 1;
 1982                 } else {
 1983                     repeat = 1;
 1984                 }
 1985             }
 1986 
 1987             /* Won't save auth or acl setuser commands in history file */
 1988             int dangerous = 0;
 1989             if (argv && argc > 0) {
 1990                 if (!strcasecmp(argv[skipargs], "auth")) {
 1991                     dangerous = 1;
 1992                 } else if (skipargs+1 < argc &&
 1993                            !strcasecmp(argv[skipargs], "acl") &&
 1994                            !strcasecmp(argv[skipargs+1], "setuser"))
 1995                 {
 1996                     dangerous = 1;
 1997                 }
 1998             }
 1999 
 2000             if (!dangerous) {
 2001                 if (history) linenoiseHistoryAdd(line);
 2002                 if (historyfile) linenoiseHistorySave(historyfile);
 2003             }
 2004 
 2005             if (argv == NULL) {
 2006                 printf("Invalid argument(s)\n");
 2007                 fflush(stdout);
 2008                 linenoiseFree(line);
 2009                 continue;
 2010             } else if (argc > 0) {
 2011                 if (strcasecmp(argv[0],"quit") == 0 ||
 2012                     strcasecmp(argv[0],"exit") == 0)
 2013                 {
 2014                     exit(0);
 2015                 } else if (argv[0][0] == ':') {
 2016                     cliSetPreferences(argv,argc,1);
 2017                     sdsfreesplitres(argv,argc);
 2018                     linenoiseFree(line);
 2019                     continue;
 2020                 } else if (strcasecmp(argv[0],"restart") == 0) {
 2021                     if (config.eval) {
 2022                         config.eval_ldb = 1;
 2023                         config.output = OUTPUT_RAW;
 2024                         sdsfreesplitres(argv,argc);
 2025                         linenoiseFree(line);
 2026                         return; /* Return to evalMode to restart the session. */
 2027                     } else {
 2028                         printf("Use 'restart' only in Lua debugging mode.");
 2029                     }
 2030                 } else if (argc == 3 && !strcasecmp(argv[0],"connect")) {
 2031                     sdsfree(config.hostip);
 2032                     config.hostip = sdsnew(argv[1]);
 2033                     config.hostport = atoi(argv[2]);
 2034                     cliRefreshPrompt();
 2035                     cliConnect(CC_FORCE);
 2036                 } else if (argc == 1 && !strcasecmp(argv[0],"clear")) {
 2037                     linenoiseClearScreen();
 2038                 } else {
 2039                     long long start_time = mstime(), elapsed;
 2040 
 2041                     issueCommandRepeat(argc-skipargs, argv+skipargs, repeat);
 2042 
 2043                     /* If our debugging session ended, show the EVAL final
 2044                      * reply. */
 2045                     if (config.eval_ldb_end) {
 2046                         config.eval_ldb_end = 0;
 2047                         cliReadReply(0);
 2048                         printf("\n(Lua debugging session ended%s)\n\n",
 2049                             config.eval_ldb_sync ? "" :
 2050                             " -- dataset changes rolled back");
 2051                     }
 2052 
 2053                     elapsed = mstime()-start_time;
 2054                     if (elapsed >= 500 &&
 2055                         config.output == OUTPUT_STANDARD)
 2056                     {
 2057                         printf("(%.2fs)\n",(double)elapsed/1000);
 2058                     }
 2059                 }
 2060             }
 2061             /* Free the argument vector */
 2062             sdsfreesplitres(argv,argc);
 2063         }
 2064         /* linenoise() returns malloc-ed lines like readline() */
 2065         linenoiseFree(line);
 2066     }
 2067     exit(0);
 2068 }
 2069 
 2070 static int noninteractive(int argc, char **argv) {
 2071     int retval = 0;
 2072     if (config.stdinarg) {
 2073         argv = zrealloc(argv, (argc+1)*sizeof(char*));
 2074         argv[argc] = readArgFromStdin();
 2075         retval = issueCommand(argc+1, argv);
 2076     } else {
 2077         retval = issueCommand(argc, argv);
 2078     }
 2079     return retval;
 2080 }
 2081 
 2082 /*------------------------------------------------------------------------------
 2083  * Eval mode
 2084  *--------------------------------------------------------------------------- */
 2085 
 2086 static int evalMode(int argc, char **argv) {
 2087     sds script = NULL;
 2088     FILE *fp;
 2089     char buf[1024];
 2090     size_t nread;
 2091     char **argv2;
 2092     int j, got_comma, keys;
 2093     int retval = REDIS_OK;
 2094 
 2095     while(1) {
 2096         if (config.eval_ldb) {
 2097             printf(
 2098             "Lua debugging session started, please use:\n"
 2099             "quit    -- End the session.\n"
 2100             "restart -- Restart the script in debug mode again.\n"
 2101             "help    -- Show Lua script debugging commands.\n\n"
 2102             );
 2103         }
 2104 
 2105         sdsfree(script);
 2106         script = sdsempty();
 2107         got_comma = 0;
 2108         keys = 0;
 2109 
 2110         /* Load the script from the file, as an sds string. */
 2111         fp = fopen(config.eval,"r");
 2112         if (!fp) {
 2113             fprintf(stderr,
 2114                 "Can't open file '%s': %s\n", config.eval, strerror(errno));
 2115             exit(1);
 2116         }
 2117         while((nread = fread(buf,1,sizeof(buf),fp)) != 0) {
 2118             script = sdscatlen(script,buf,nread);
 2119         }
 2120         fclose(fp);
 2121 
 2122         /* If we are debugging a script, enable the Lua debugger. */
 2123         if (config.eval_ldb) {
 2124             redisReply *reply = redisCommand(context,
 2125                     config.eval_ldb_sync ?
 2126                     "SCRIPT DEBUG sync": "SCRIPT DEBUG yes");
 2127             if (reply) freeReplyObject(reply);
 2128         }
 2129 
 2130         /* Create our argument vector */
 2131         argv2 = zmalloc(sizeof(sds)*(argc+3));
 2132         argv2[0] = sdsnew("EVAL");
 2133         argv2[1] = script;
 2134         for (j = 0; j < argc; j++) {
 2135             if (!got_comma && argv[j][0] == ',' && argv[j][1] == 0) {
 2136                 got_comma = 1;
 2137                 continue;
 2138             }
 2139             argv2[j+3-got_comma] = sdsnew(argv[j]);
 2140             if (!got_comma) keys++;
 2141         }
 2142         argv2[2] = sdscatprintf(sdsempty(),"%d",keys);
 2143 
 2144         /* Call it */
 2145         int eval_ldb = config.eval_ldb; /* Save it, may be reverteed. */
 2146         retval = issueCommand(argc+3-got_comma, argv2);
 2147         if (eval_ldb) {
 2148             if (!config.eval_ldb) {
 2149                 /* If the debugging session ended immediately, there was an
 2150                  * error compiling the script. Show it and they don't enter
 2151                  * the REPL at all. */
 2152                 printf("Eval debugging session can't start:\n");
 2153                 cliReadReply(0);
 2154                 break; /* Return to the caller. */
 2155             } else {
 2156                 strncpy(config.prompt,"lua debugger> ",sizeof(config.prompt));
 2157                 repl();
 2158                 /* Restart the session if repl() returned. */
 2159                 cliConnect(CC_FORCE);
 2160                 printf("\n");
 2161             }
 2162         } else {
 2163             break; /* Return to the caller. */
 2164         }
 2165     }
 2166     return retval;
 2167 }
 2168 
 2169 /*------------------------------------------------------------------------------
 2170  * Cluster Manager
 2171  *--------------------------------------------------------------------------- */
 2172 
 2173 /* The Cluster Manager global structure */
 2174 static struct clusterManager {
 2175     list *nodes;    /* List of nodes in the configuration. */
 2176     list *errors;
 2177     int unreachable_masters;    /* Masters we are not able to reach. */
 2178 } cluster_manager;
 2179 
 2180 /* Used by clusterManagerFixSlotsCoverage */
 2181 dict *clusterManagerUncoveredSlots = NULL;
 2182 
 2183 typedef struct clusterManagerNode {
 2184     redisContext *context;
 2185     sds name;
 2186     char *ip;
 2187     int port;
 2188     uint64_t current_epoch;
 2189     time_t ping_sent;
 2190     time_t ping_recv;
 2191     int flags;
 2192     list *flags_str; /* Flags string representations */
 2193     sds replicate;  /* Master ID if node is a slave */
 2194     int dirty;      /* Node has changes that can be flushed */
 2195     uint8_t slots[CLUSTER_MANAGER_SLOTS];
 2196     int slots_count;
 2197     int replicas_count;
 2198     list *friends;
 2199     sds *migrating; /* An array of sds where even strings are slots and odd
 2200                      * strings are the destination node IDs. */
 2201     sds *importing; /* An array of sds where even strings are slots and odd
 2202                      * strings are the source node IDs. */
 2203     int migrating_count; /* Length of the migrating array (migrating slots*2) */
 2204     int importing_count; /* Length of the importing array (importing slots*2) */
 2205     float weight;   /* Weight used by rebalance */
 2206     int balance;    /* Used by rebalance */
 2207 } clusterManagerNode;
 2208 
 2209 /* Data structure used to represent a sequence of cluster nodes. */
 2210 typedef struct clusterManagerNodeArray {
 2211     clusterManagerNode **nodes; /* Actual nodes array */
 2212     clusterManagerNode **alloc; /* Pointer to the allocated memory */
 2213     int len;                    /* Actual length of the array */
 2214     int count;                  /* Non-NULL nodes count */
 2215 } clusterManagerNodeArray;
 2216 
 2217 /* Used for the reshard table. */
 2218 typedef struct clusterManagerReshardTableItem {
 2219     clusterManagerNode *source;
 2220     int slot;
 2221 } clusterManagerReshardTableItem;
 2222 
 2223 /* Info about a cluster internal link. */
 2224 
 2225 typedef struct clusterManagerLink {
 2226     sds node_name;
 2227     sds node_addr;
 2228     int connected;
 2229     int handshaking;
 2230 } clusterManagerLink;
 2231 
 2232 static dictType clusterManagerDictType = {
 2233     dictSdsHash,               /* hash function */
 2234     NULL,                      /* key dup */
 2235     NULL,                      /* val dup */
 2236     dictSdsKeyCompare,         /* key compare */
 2237     NULL,                      /* key destructor */
 2238     dictSdsDestructor          /* val destructor */
 2239 };
 2240 
 2241 static dictType clusterManagerLinkDictType = {
 2242     dictSdsHash,               /* hash function */
 2243     NULL,                      /* key dup */
 2244     NULL,                      /* val dup */
 2245     dictSdsKeyCompare,         /* key compare */
 2246     dictSdsDestructor,         /* key destructor */
 2247     dictListDestructor         /* val destructor */
 2248 };
 2249 
 2250 typedef int clusterManagerCommandProc(int argc, char **argv);
 2251 typedef int (*clusterManagerOnReplyError)(redisReply *reply,
 2252     clusterManagerNode *n, int bulk_idx);
 2253 
 2254 /* Cluster Manager helper functions */
 2255 
 2256 static clusterManagerNode *clusterManagerNewNode(char *ip, int port);
 2257 static clusterManagerNode *clusterManagerNodeByName(const char *name);
 2258 static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char *n);
 2259 static void clusterManagerNodeResetSlots(clusterManagerNode *node);
 2260 static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err);
 2261 static void clusterManagerPrintNotClusterNodeError(clusterManagerNode *node,
 2262                                                    char *err);
 2263 static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
 2264                                       char **err);
 2265 static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts);
 2266 static int clusterManagerNodeIsEmpty(clusterManagerNode *node, char **err);
 2267 static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
 2268     int ip_count, clusterManagerNode ***offending, int *offending_len);
 2269 static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes,
 2270     int ip_count);
 2271 static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent);
 2272 static void clusterManagerShowNodes(void);
 2273 static void clusterManagerShowClusterInfo(void);
 2274 static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err);
 2275 static void clusterManagerWaitForClusterJoin(void);
 2276 static int clusterManagerCheckCluster(int quiet);
 2277 static void clusterManagerLog(int level, const char* fmt, ...);
 2278 static int clusterManagerIsConfigConsistent(void);
 2279 static dict *clusterManagerGetLinkStatus(void);
 2280 static void clusterManagerOnError(sds err);
 2281 static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array,
 2282                                         int len);
 2283 static void clusterManagerNodeArrayReset(clusterManagerNodeArray *array);
 2284 static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array,
 2285                                          clusterManagerNode **nodeptr);
 2286 static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
 2287                                        clusterManagerNode *node);
 2288 
 2289 /* Cluster Manager commands. */
 2290 
 2291 static int clusterManagerCommandCreate(int argc, char **argv);
 2292 static int clusterManagerCommandAddNode(int argc, char **argv);
 2293 static int clusterManagerCommandDeleteNode(int argc, char **argv);
 2294 static int clusterManagerCommandInfo(int argc, char **argv);
 2295 static int clusterManagerCommandCheck(int argc, char **argv);
 2296 static int clusterManagerCommandFix(int argc, char **argv);
 2297 static int clusterManagerCommandReshard(int argc, char **argv);
 2298 static int clusterManagerCommandRebalance(int argc, char **argv);
 2299 static int clusterManagerCommandSetTimeout(int argc, char **argv);
 2300 static int clusterManagerCommandImport(int argc, char **argv);
 2301 static int clusterManagerCommandCall(int argc, char **argv);
 2302 static int clusterManagerCommandHelp(int argc, char **argv);
 2303 static int clusterManagerCommandBackup(int argc, char **argv);
 2304 
 2305 typedef struct clusterManagerCommandDef {
 2306     char *name;
 2307     clusterManagerCommandProc *proc;
 2308     int arity;
 2309     char *args;
 2310     char *options;
 2311 } clusterManagerCommandDef;
 2312 
 2313 clusterManagerCommandDef clusterManagerCommands[] = {
 2314     {"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
 2315      "replicas <arg>"},
 2316     {"check", clusterManagerCommandCheck, -1, "host:port",
 2317      "search-multiple-owners"},
 2318     {"info", clusterManagerCommandInfo, -1, "host:port", NULL},
 2319     {"fix", clusterManagerCommandFix, -1, "host:port",
 2320      "search-multiple-owners,fix-with-unreachable-masters"},
 2321     {"reshard", clusterManagerCommandReshard, -1, "host:port",
 2322      "from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>,"
 2323      "replace"},
 2324     {"rebalance", clusterManagerCommandRebalance, -1, "host:port",
 2325      "weight <node1=w1...nodeN=wN>,use-empty-masters,"
 2326      "timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"},
 2327     {"add-node", clusterManagerCommandAddNode, 2,
 2328      "new_host:new_port existing_host:existing_port", "slave,master-id <arg>"},
 2329     {"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL},
 2330     {"call", clusterManagerCommandCall, -2,
 2331         "host:port command arg arg .. arg", "only-masters,only-replicas"},
 2332     {"set-timeout", clusterManagerCommandSetTimeout, 2,
 2333      "host:port milliseconds", NULL},
 2334     {"import", clusterManagerCommandImport, 1, "host:port",
 2335      "from <arg>,copy,replace"},
 2336     {"backup", clusterManagerCommandBackup, 2,  "host:port backup_directory",
 2337      NULL},
 2338     {"help", clusterManagerCommandHelp, 0, NULL, NULL}
 2339 };
 2340 
 2341 static void getRDB(clusterManagerNode *node);
 2342 
 2343 static void createClusterManagerCommand(char *cmdname, int argc, char **argv) {
 2344     clusterManagerCommand *cmd = &config.cluster_manager_command;
 2345     cmd->name = cmdname;
 2346     cmd->argc = argc;
 2347     cmd->argv = argc ? argv : NULL;
 2348     if (isColorTerm()) cmd->flags |= CLUSTER_MANAGER_CMD_FLAG_COLOR;
 2349 }
 2350 
 2351 
 2352 static clusterManagerCommandProc *validateClusterManagerCommand(void) {
 2353     int i, commands_count = sizeof(clusterManagerCommands) /
 2354                             sizeof(clusterManagerCommandDef);
 2355     clusterManagerCommandProc *proc = NULL;
 2356     char *cmdname = config.cluster_manager_command.name;
 2357     int argc = config.cluster_manager_command.argc;
 2358     for (i = 0; i < commands_count; i++) {
 2359         clusterManagerCommandDef cmddef = clusterManagerCommands[i];
 2360         if (!strcmp(cmddef.name, cmdname)) {
 2361             if ((cmddef.arity > 0 && argc != cmddef.arity) ||
 2362                 (cmddef.arity < 0 && argc < (cmddef.arity * -1))) {
 2363                 fprintf(stderr, "[ERR] Wrong number of arguments for "
 2364                                 "specified --cluster sub command\n");
 2365                 return NULL;
 2366             }
 2367             proc = cmddef.proc;
 2368         }
 2369     }
 2370     if (!proc) fprintf(stderr, "Unknown --cluster subcommand\n");
 2371     return proc;
 2372 }
 2373 
 2374 static int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr,
 2375                                    int *bus_port_ptr)
 2376 {
 2377     char *c = strrchr(addr, '@');
 2378     if (c != NULL) {
 2379         *c = '\0';
 2380         if (bus_port_ptr != NULL)
 2381             *bus_port_ptr = atoi(c + 1);
 2382     }
 2383     c = strrchr(addr, ':');
 2384     if (c != NULL) {
 2385         *c = '\0';
 2386         *ip_ptr = addr;
 2387         *port_ptr = atoi(++c);
 2388     } else return 0;
 2389     return 1;
 2390 }
 2391 
 2392 /* Get host ip and port from command arguments. If only one argument has
 2393  * been provided it must be in the form of 'ip:port', elsewhere
 2394  * the first argument must be the ip and the second one the port.
 2395  * If host and port can be detected, it returns 1 and it stores host and
 2396  * port into variables referenced by'ip_ptr' and 'port_ptr' pointers,
 2397  * elsewhere it returns 0. */
 2398 static int getClusterHostFromCmdArgs(int argc, char **argv,
 2399                                      char **ip_ptr, int *port_ptr) {
 2400     int port = 0;
 2401     char *ip = NULL;
 2402     if (argc == 1) {
 2403         char *addr = argv[0];
 2404         if (!parseClusterNodeAddress(addr, &ip, &port, NULL)) return 0;
 2405     } else {
 2406         ip = argv[0];
 2407         port = atoi(argv[1]);
 2408     }
 2409     if (!ip || !port) return 0;
 2410     else {
 2411         *ip_ptr = ip;
 2412         *port_ptr = port;
 2413     }
 2414     return 1;
 2415 }
 2416 
 2417 static void freeClusterManagerNodeFlags(list *flags) {
 2418     listIter li;
 2419     listNode *ln;
 2420     listRewind(flags, &li);
 2421     while ((ln = listNext(&li)) != NULL) {
 2422         sds flag = ln->value;
 2423         sdsfree(flag);
 2424     }
 2425     listRelease(flags);
 2426 }
 2427 
 2428 static void freeClusterManagerNode(clusterManagerNode *node) {
 2429     if (node->context != NULL) redisFree(node->context);
 2430     if (node->friends != NULL) {
 2431         listIter li;
 2432         listNode *ln;
 2433         listRewind(node->friends,&li);
 2434         while ((ln = listNext(&li)) != NULL) {
 2435             clusterManagerNode *fn = ln->value;
 2436             freeClusterManagerNode(fn);
 2437         }
 2438         listRelease(node->friends);
 2439         node->friends = NULL;
 2440     }
 2441     if (node->name != NULL) sdsfree(node->name);
 2442     if (node->replicate != NULL) sdsfree(node->replicate);
 2443     if ((node->flags & CLUSTER_MANAGER_FLAG_FRIEND) && node->ip)
 2444         sdsfree(node->ip);
 2445     int i;
 2446     if (node->migrating != NULL) {
 2447         for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]);
 2448         zfree(node->migrating);
 2449     }
 2450     if (node->importing != NULL) {
 2451         for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]);
 2452         zfree(node->importing);
 2453     }
 2454     if (node->flags_str != NULL) {
 2455         freeClusterManagerNodeFlags(node->flags_str);
 2456         node->flags_str = NULL;
 2457     }
 2458     zfree(node);
 2459 }
 2460 
 2461 static void freeClusterManager(void) {
 2462     listIter li;
 2463     listNode *ln;
 2464     if (cluster_manager.nodes != NULL) {
 2465         listRewind(cluster_manager.nodes,&li);
 2466         while ((ln = listNext(&li)) != NULL) {
 2467             clusterManagerNode *n = ln->value;
 2468             freeClusterManagerNode(n);
 2469         }
 2470         listRelease(cluster_manager.nodes);
 2471         cluster_manager.nodes = NULL;
 2472     }
 2473     if (cluster_manager.errors != NULL) {
 2474         listRewind(cluster_manager.errors,&li);
 2475         while ((ln = listNext(&li)) != NULL) {
 2476             sds err = ln->value;
 2477             sdsfree(err);
 2478         }
 2479         listRelease(cluster_manager.errors);
 2480         cluster_manager.errors = NULL;
 2481     }
 2482     if (clusterManagerUncoveredSlots != NULL)
 2483         dictRelease(clusterManagerUncoveredSlots);
 2484 }
 2485 
 2486 static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
 2487     clusterManagerNode *node = zmalloc(sizeof(*node));
 2488     node->context = NULL;
 2489     node->name = NULL;
 2490     node->ip = ip;
 2491     node->port = port;
 2492     node->current_epoch = 0;
 2493     node->ping_sent = 0;
 2494     node->ping_recv = 0;
 2495     node->flags = 0;
 2496     node->flags_str = NULL;
 2497     node->replicate = NULL;
 2498     node->dirty = 0;
 2499     node->friends = NULL;
 2500     node->migrating = NULL;
 2501     node->importing = NULL;
 2502     node->migrating_count = 0;
 2503     node->importing_count = 0;
 2504     node->replicas_count = 0;
 2505     node->weight = 1.0f;
 2506     node->balance = 0;
 2507     clusterManagerNodeResetSlots(node);
 2508     return node;
 2509 }
 2510 
 2511 static sds clusterManagerGetNodeRDBFilename(clusterManagerNode *node) {
 2512     assert(config.cluster_manager_command.backup_dir);
 2513     sds filename = sdsnew(config.cluster_manager_command.backup_dir);
 2514     if (filename[sdslen(filename) - 1] != '/')
 2515         filename = sdscat(filename, "/");
 2516     filename = sdscatprintf(filename, "redis-node-%s-%d-%s.rdb", node->ip,
 2517                             node->port, node->name);
 2518     return filename;
 2519 }
 2520 
 2521 /* Check whether reply is NULL or its type is REDIS_REPLY_ERROR. In the
 2522  * latest case, if the 'err' arg is not NULL, it gets allocated with a copy
 2523  * of reply error (it's up to the caller function to free it), elsewhere
 2524  * the error is directly printed. */
 2525 static int clusterManagerCheckRedisReply(clusterManagerNode *n,
 2526                                          redisReply *r, char **err)
 2527 {
 2528     int is_err = 0;
 2529     if (!r || (is_err = (r->type == REDIS_REPLY_ERROR))) {
 2530         if (is_err) {
 2531             if (err != NULL) {
 2532                 *err = zmalloc((r->len + 1) * sizeof(char));
 2533                 strcpy(*err, r->str);
 2534             } else CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, r->str);
 2535         }
 2536         return 0;
 2537     }
 2538     return 1;
 2539 }
 2540 
 2541 /* Call MULTI command on a cluster node. */
 2542 static int clusterManagerStartTransaction(clusterManagerNode *node) {
 2543     redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
 2544     int success = clusterManagerCheckRedisReply(node, reply, NULL);
 2545     if (reply) freeReplyObject(reply);
 2546     return success;
 2547 }
 2548 
 2549 /* Call EXEC command on a cluster node. */
 2550 static int clusterManagerExecTransaction(clusterManagerNode *node,
 2551                                          clusterManagerOnReplyError onerror)
 2552 {
 2553     redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC");
 2554     int success = clusterManagerCheckRedisReply(node, reply, NULL);
 2555     if (success) {
 2556         if (reply->type != REDIS_REPLY_ARRAY) {
 2557             success = 0;
 2558             goto cleanup;
 2559         }
 2560         size_t i;
 2561         for (i = 0; i < reply->elements; i++) {
 2562             redisReply *r = reply->element[i];
 2563             char *err = NULL;
 2564             success = clusterManagerCheckRedisReply(node, r, &err);
 2565             if (!success && onerror) success = onerror(r, node, i);
 2566             if (err) {
 2567                 if (!success)
 2568                     CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
 2569                 zfree(err);
 2570             }
 2571             if (!success) break;
 2572         }
 2573     }
 2574 cleanup:
 2575     if (reply) freeReplyObject(reply);
 2576     return success;
 2577 }
 2578 
 2579 static int clusterManagerNodeConnect(clusterManagerNode *node) {
 2580     if (node->context) redisFree(node->context);
 2581     node->context = redisConnect(node->ip, node->port);
 2582     if (!node->context->err && config.tls) {
 2583         const char *err = NULL;
 2584         if (cliSecureConnection(node->context, &err) == REDIS_ERR && err) {
 2585             fprintf(stderr,"TLS Error: %s\n", err);
 2586             redisFree(node->context);
 2587             node->context = NULL;
 2588             return 0;
 2589         }
 2590     }
 2591     if (node->context->err) {
 2592         fprintf(stderr,"Could not connect to Redis at ");
 2593         fprintf(stderr,"%s:%d: %s\n", node->ip, node->port,
 2594                 node->context->errstr);
 2595         redisFree(node->context);
 2596         node->context = NULL;
 2597         return 0;
 2598     }
 2599     /* Set aggressive KEEP_ALIVE socket option in the Redis context socket
 2600      * in order to prevent timeouts caused by the execution of long
 2601      * commands. At the same time this improves the detection of real
 2602      * errors. */
 2603     anetKeepAlive(NULL, node->context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
 2604     if (config.auth) {
 2605         redisReply *reply;
 2606         if (config.user == NULL)
 2607             reply = redisCommand(node->context,"AUTH %s", config.auth);
 2608         else
 2609             reply = redisCommand(node->context,"AUTH %s %s",
 2610                                  config.user,config.auth);
 2611         int ok = clusterManagerCheckRedisReply(node, reply, NULL);
 2612         if (reply != NULL) freeReplyObject(reply);
 2613         if (!ok) return 0;
 2614     }
 2615     return 1;
 2616 }
 2617 
 2618 static void clusterManagerRemoveNodeFromList(list *nodelist,
 2619                                              clusterManagerNode *node) {
 2620     listIter li;
 2621     listNode *ln;
 2622     listRewind(nodelist, &li);
 2623     while ((ln = listNext(&li)) != NULL) {
 2624         if (node == ln->value) {
 2625             listDelNode(nodelist, ln);
 2626             break;
 2627         }
 2628     }
 2629 }
 2630 
 2631 /* Return the node with the specified name (ID) or NULL. */
 2632 static clusterManagerNode *clusterManagerNodeByName(const char *name) {
 2633     if (cluster_manager.nodes == NULL) return NULL;
 2634     clusterManagerNode *found = NULL;
 2635     sds lcname = sdsempty();
 2636     lcname = sdscpy(lcname, name);
 2637     sdstolower(lcname);
 2638     listIter li;
 2639     listNode *ln;
 2640     listRewind(cluster_manager.nodes, &li);
 2641     while ((ln = listNext(&li)) != NULL) {
 2642         clusterManagerNode *n = ln->value;
 2643         if (n->name && !sdscmp(n->name, lcname)) {
 2644             found = n;
 2645             break;
 2646         }
 2647     }
 2648     sdsfree(lcname);
 2649     return found;
 2650 }
 2651 
 2652 /* Like clusterManagerNodeByName but the specified name can be just the first
 2653  * part of the node ID as long as the prefix in unique across the
 2654  * cluster.
 2655  */
 2656 static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char*name)
 2657 {
 2658     if (cluster_manager.nodes == NULL) return NULL;
 2659     clusterManagerNode *found = NULL;
 2660     sds lcname = sdsempty();
 2661     lcname = sdscpy(lcname, name);
 2662     sdstolower(lcname);
 2663     listIter li;
 2664     listNode *ln;
 2665     listRewind(cluster_manager.nodes, &li);
 2666     while ((ln = listNext(&li)) != NULL) {
 2667         clusterManagerNode *n = ln->value;
 2668         if (n->name &&
 2669             strstr(n->name, lcname) == n->name) {
 2670             found = n;
 2671             break;
 2672         }
 2673     }
 2674     sdsfree(lcname);
 2675     return found;
 2676 }
 2677 
 2678 static void clusterManagerNodeResetSlots(clusterManagerNode *node) {
 2679     memset(node->slots, 0, sizeof(node->slots));
 2680     node->slots_count = 0;
 2681 }
 2682 
 2683 /* Call "INFO" redis command on the specified node and return the reply. */
 2684 static redisReply *clusterManagerGetNodeRedisInfo(clusterManagerNode *node,
 2685                                                   char **err)
 2686 {
 2687     redisReply *info = CLUSTER_MANAGER_COMMAND(node, "INFO");
 2688     if (err != NULL) *err = NULL;
 2689     if (info == NULL) return NULL;
 2690     if (info->type == REDIS_REPLY_ERROR) {
 2691         if (err != NULL) {
 2692             *err = zmalloc((info->len + 1) * sizeof(char));
 2693             strcpy(*err, info->str);
 2694         }
 2695         freeReplyObject(info);
 2696         return  NULL;
 2697     }
 2698     return info;
 2699 }
 2700 
 2701 static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err) {
 2702     redisReply *info = clusterManagerGetNodeRedisInfo(node, err);
 2703     if (info == NULL) return 0;
 2704     int is_cluster = (int) getLongInfoField(info->str, "cluster_enabled");
 2705     freeReplyObject(info);
 2706     return is_cluster;
 2707 }
 2708 
 2709 /* Checks whether the node is empty. Node is considered not-empty if it has
 2710  * some key or if it already knows other nodes */
 2711 static int clusterManagerNodeIsEmpty(clusterManagerNode *node, char **err) {
 2712     redisReply *info = clusterManagerGetNodeRedisInfo(node, err);
 2713     int is_empty = 1;
 2714     if (info == NULL) return 0;
 2715     if (strstr(info->str, "db0:") != NULL) {
 2716         is_empty = 0;
 2717         goto result;
 2718     }
 2719     freeReplyObject(info);
 2720     info = CLUSTER_MANAGER_COMMAND(node, "CLUSTER INFO");
 2721     if (err != NULL) *err = NULL;
 2722     if (!clusterManagerCheckRedisReply(node, info, err)) {
 2723         is_empty = 0;
 2724         goto result;
 2725     }
 2726     long known_nodes = getLongInfoField(info->str, "cluster_known_nodes");
 2727     is_empty = (known_nodes == 1);
 2728 result:
 2729     freeReplyObject(info);
 2730     return is_empty;
 2731 }
 2732 
 2733 /* Return the anti-affinity score, which is a measure of the amount of
 2734  * violations of anti-affinity in the current cluster layout, that is, how
 2735  * badly the masters and slaves are distributed in the different IP
 2736  * addresses so that slaves of the same master are not in the master
 2737  * host and are also in different hosts.
 2738  *
 2739  * The score is calculated as follows:
 2740  *
 2741  * SAME_AS_MASTER = 10000 * each slave in the same IP of its master.
 2742  * SAME_AS_SLAVE  = 1 * each slave having the same IP as another slave
 2743                       of the same master.
 2744  * FINAL_SCORE = SAME_AS_MASTER + SAME_AS_SLAVE
 2745  *
 2746  * So a greater score means a worse anti-affinity level, while zero
 2747  * means perfect anti-affinity.
 2748  *
 2749  * The anti affinity optimizator will try to get a score as low as
 2750  * possible. Since we do not want to sacrifice the fact that slaves should
 2751  * not be in the same host as the master, we assign 10000 times the score
 2752  * to this violation, so that we'll optimize for the second factor only
 2753  * if it does not impact the first one.
 2754  *
 2755  * The ipnodes argument is an array of clusterManagerNodeArray, one for
 2756  * each IP, while ip_count is the total number of IPs in the configuration.
 2757  *
 2758  * The function returns the above score, and the list of
 2759  * offending slaves can be stored into the 'offending' argument,
 2760  * so that the optimizer can try changing the configuration of the
 2761  * slaves violating the anti-affinity goals. */
 2762 static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
 2763     int ip_count, clusterManagerNode ***offending, int *offending_len)
 2764 {
 2765     int score = 0, i, j;
 2766     int node_len = cluster_manager.nodes->len;
 2767     clusterManagerNode **offending_p = NULL;
 2768     if (offending != NULL) {
 2769         *offending = zcalloc(node_len * sizeof(clusterManagerNode*));
 2770         offending_p = *offending;
 2771     }
 2772     /* For each set of nodes in the same host, split by
 2773      * related nodes (masters and slaves which are involved in
 2774      * replication of each other) */
 2775     for (i = 0; i < ip_count; i++) {
 2776         clusterManagerNodeArray *node_array = &(ipnodes[i]);
 2777         dict *related = dictCreate(&clusterManagerDictType, NULL);
 2778         char *ip = NULL;
 2779         for (j = 0; j < node_array->len; j++) {
 2780             clusterManagerNode *node = node_array->nodes[j];
 2781             if (node == NULL) continue;
 2782             if (!ip) ip = node->ip;
 2783             sds types;
 2784             /* We always use the Master ID as key. */
 2785             sds key = (!node->replicate ? node->name : node->replicate);
 2786             assert(key != NULL);
 2787             dictEntry *entry = dictFind(related, key);
 2788             if (entry) types = sdsdup((sds) dictGetVal(entry));
 2789             else types = sdsempty();
 2790             /* Master type 'm' is always set as the first character of the
 2791              * types string. */
 2792             if (!node->replicate) types = sdscatprintf(types, "m%s", types);
 2793             else types = sdscat(types, "s");
 2794             dictReplace(related, key, types);
 2795         }
 2796         /* Now it's trivial to check, for each related group having the
 2797          * same host, what is their local score. */
 2798         dictIterator *iter = dictGetIterator(related);
 2799         dictEntry *entry;
 2800         while ((entry = dictNext(iter)) != NULL) {
 2801             sds types = (sds) dictGetVal(entry);
 2802             sds name = (sds) dictGetKey(entry);
 2803             int typeslen = sdslen(types);
 2804             if (typeslen < 2) continue;
 2805             if (types[0] == 'm') score += (10000 * (typeslen - 1));
 2806             else score += (1 * typeslen);
 2807             if (offending == NULL) continue;
 2808             /* Populate the list of offending nodes. */
 2809             listIter li;
 2810             listNode *ln;
 2811             listRewind(cluster_manager.nodes, &li);
 2812             while ((ln = listNext(&li)) != NULL) {
 2813                 clusterManagerNode *n = ln->value;
 2814                 if (n->replicate == NULL) continue;
 2815                 if (!strcmp(n->replicate, name) && !strcmp(n->ip, ip)) {
 2816                     *(offending_p++) = n;
 2817                     if (offending_len != NULL) (*offending_len)++;
 2818                     break;
 2819                 }
 2820             }
 2821         }
 2822         //if (offending_len != NULL) *offending_len = offending_p - *offending;
 2823         dictReleaseIterator(iter);
 2824         dictRelease(related);
 2825     }
 2826     return score;
 2827 }
 2828 
 2829 static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes,
 2830     int ip_count)
 2831 {
 2832     clusterManagerNode **offenders = NULL;
 2833     int score = clusterManagerGetAntiAffinityScore(ipnodes, ip_count,
 2834                                                    NULL, NULL);
 2835     if (score == 0) goto cleanup;
 2836     clusterManagerLogInfo(">>> Trying to optimize slaves allocation "
 2837                           "for anti-affinity\n");
 2838     int node_len = cluster_manager.nodes->len;
 2839     int maxiter = 500 * node_len; // Effort is proportional to cluster size...
 2840     srand(time(NULL));
 2841     while (maxiter > 0) {
 2842         int offending_len = 0;
 2843         if (offenders != NULL) {
 2844             zfree(offenders);
 2845             offenders = NULL;
 2846         }
 2847         score = clusterManagerGetAntiAffinityScore(ipnodes,
 2848                                                    ip_count,
 2849                                                    &offenders,
 2850                                                    &offending_len);
 2851         if (score == 0) break; // Optimal anti affinity reached
 2852         /* We'll try to randomly swap a slave's assigned master causing
 2853          * an affinity problem with another random slave, to see if we
 2854          * can improve the affinity. */
 2855         int rand_idx = rand() % offending_len;
 2856         clusterManagerNode *first = offenders[rand_idx],
 2857                            *second = NULL;
 2858         clusterManagerNode **other_replicas = zcalloc((node_len - 1) *
 2859                                                       sizeof(*other_replicas));
 2860         int other_replicas_count = 0;
 2861         listIter li;
 2862         listNode *ln;
 2863         listRewind(cluster_manager.nodes, &li);
 2864         while ((ln = listNext(&li)) != NULL) {
 2865             clusterManagerNode *n = ln->value;
 2866             if (n != first && n->replicate != NULL)
 2867                 other_replicas[other_replicas_count++] = n;
 2868         }
 2869         if (other_replicas_count == 0) {
 2870             zfree(other_replicas);
 2871             break;
 2872         }
 2873         rand_idx = rand() % other_replicas_count;
 2874         second = other_replicas[rand_idx];
 2875         char *first_master = first->replicate,
 2876              *second_master = second->replicate;
 2877         first->replicate = second_master, first->dirty = 1;
 2878         second->replicate = first_master, second->dirty = 1;
 2879         int new_score = clusterManagerGetAntiAffinityScore(ipnodes,
 2880                                                            ip_count,
 2881                                                            NULL, NULL);
 2882         /* If the change actually makes thing worse, revert. Otherwise
 2883          * leave as it is because the best solution may need a few
 2884          * combined swaps. */
 2885         if (new_score > score) {
 2886             first->replicate = first_master;
 2887             second->replicate = second_master;
 2888         }
 2889         zfree(other_replicas);
 2890         maxiter--;
 2891     }
 2892     score = clusterManagerGetAntiAffinityScore(ipnodes, ip_count, NULL, NULL);
 2893     char *msg;
 2894     int perfect = (score == 0);
 2895     int log_level = (perfect ? CLUSTER_MANAGER_LOG_LVL_SUCCESS :
 2896                                CLUSTER_MANAGER_LOG_LVL_WARN);
 2897     if (perfect) msg = "[OK] Perfect anti-affinity obtained!";
 2898     else if (score >= 10000)
 2899         msg = ("[WARNING] Some slaves are in the same host as their master");
 2900     else
 2901         msg=("[WARNING] Some slaves of the same master are in the same host");
 2902     clusterManagerLog(log_level, "%s\n", msg);
 2903 cleanup:
 2904     zfree(offenders);
 2905 }
 2906 
 2907 /* Return a representable string of the node's flags */
 2908 static sds clusterManagerNodeFlagString(clusterManagerNode *node) {
 2909     sds flags = sdsempty();
 2910     if (!node->flags_str) return flags;
 2911     int empty = 1;
 2912     listIter li;
 2913     listNode *ln;
 2914     listRewind(node->flags_str, &li);
 2915     while ((ln = listNext(&li)) != NULL) {
 2916         sds flag = ln->value;
 2917         if (strcmp(flag, "myself") == 0) continue;
 2918         if (!empty) flags = sdscat(flags, ",");
 2919         flags = sdscatfmt(flags, "%S", flag);
 2920         empty = 0;
 2921     }
 2922     return flags;
 2923 }
 2924 
 2925 /* Return a representable string of the node's slots */
 2926 static sds clusterManagerNodeSlotsString(clusterManagerNode *node) {
 2927     sds slots = sdsempty();
 2928     int first_range_idx = -1, last_slot_idx = -1, i;
 2929     for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) {
 2930         int has_slot = node->slots[i];
 2931         if (has_slot) {
 2932             if (first_range_idx == -1) {
 2933                 if (sdslen(slots)) slots = sdscat(slots, ",");
 2934                 first_range_idx = i;
 2935                 slots = sdscatfmt(slots, "[%u", i);
 2936             }
 2937             last_slot_idx = i;
 2938         } else {
 2939             if (last_slot_idx >= 0) {
 2940                 if (first_range_idx == last_slot_idx)
 2941                     slots = sdscat(slots, "]");
 2942                 else slots = sdscatfmt(slots, "-%u]", last_slot_idx);
 2943             }
 2944             last_slot_idx = -1;
 2945             first_range_idx = -1;
 2946         }
 2947     }
 2948     if (last_slot_idx >= 0) {
 2949         if (first_range_idx == last_slot_idx) slots = sdscat(slots, "]");
 2950         else slots = sdscatfmt(slots, "-%u]", last_slot_idx);
 2951     }
 2952     return slots;
 2953 }
 2954 
 2955 static sds clusterManagerNodeGetJSON(clusterManagerNode *node,
 2956                                      unsigned long error_count)
 2957 {
 2958     sds json = sdsempty();
 2959     sds replicate = sdsempty();
 2960     if (node->replicate)
 2961         replicate = sdscatprintf(replicate, "\"%s\"", node->replicate);
 2962     else
 2963         replicate = sdscat(replicate, "null");
 2964     sds slots = clusterManagerNodeSlotsString(node);
 2965     sds flags = clusterManagerNodeFlagString(node);
 2966     char *p = slots;
 2967     while ((p = strchr(p, '-')) != NULL)
 2968         *(p++) = ',';
 2969     json = sdscatprintf(json,
 2970         "  {\n"
 2971         "    \"name\": \"%s\",\n"
 2972         "    \"host\": \"%s\",\n"
 2973         "    \"port\": %d,\n"
 2974         "    \"replicate\": %s,\n"
 2975         "    \"slots\": [%s],\n"
 2976         "    \"slots_count\": %d,\n"
 2977         "    \"flags\": \"%s\",\n"
 2978         "    \"current_epoch\": %llu",
 2979         node->name,
 2980         node->ip,
 2981         node->port,
 2982         replicate,
 2983         slots,
 2984         node->slots_count,
 2985         flags,
 2986         (unsigned long long)node->current_epoch
 2987     );
 2988     if (error_count > 0) {
 2989         json = sdscatprintf(json, ",\n    \"cluster_errors\": %lu",
 2990                             error_count);
 2991     }
 2992     if (node->migrating_count > 0 && node->migrating != NULL) {
 2993         int i = 0;
 2994         sds migrating = sdsempty();
 2995         for (; i < node->migrating_count; i += 2) {
 2996             sds slot = node->migrating[i];
 2997             sds dest = node->migrating[i + 1];
 2998             if (slot && dest) {
 2999                 if (sdslen(migrating) > 0) migrating = sdscat(migrating, ",");
 3000                 migrating = sdscatfmt(migrating, "\"%S\": \"%S\"", slot, dest);
 3001             }
 3002         }
 3003         if (sdslen(migrating) > 0)
 3004             json = sdscatfmt(json, ",\n    \"migrating\": {%S}", migrating);
 3005         sdsfree(migrating);
 3006     }
 3007     if (node->importing_count > 0 && node->importing != NULL) {
 3008         int i = 0;
 3009         sds importing = sdsempty();
 3010         for (; i < node->importing_count; i += 2) {
 3011             sds slot = node->importing[i];
 3012             sds from = node->importing[i + 1];
 3013             if (slot && from) {
 3014                 if (sdslen(importing) > 0) importing = sdscat(importing, ",");
 3015                 importing = sdscatfmt(importing, "\"%S\": \"%S\"", slot, from);
 3016             }
 3017         }
 3018         if (sdslen(importing) > 0)
 3019             json = sdscatfmt(json, ",\n    \"importing\": {%S}", importing);
 3020         sdsfree(importing);
 3021     }
 3022     json = sdscat(json, "\n  }");
 3023     sdsfree(replicate);
 3024     sdsfree(slots);
 3025     sdsfree(flags);
 3026     return json;
 3027 }
 3028 
 3029 
 3030 /* -----------------------------------------------------------------------------
 3031  * Key space handling
 3032  * -------------------------------------------------------------------------- */
 3033 
 3034 /* We have 16384 hash slots. The hash slot of a given key is obtained
 3035  * as the least significant 14 bits of the crc16 of the key.
 3036  *
 3037  * However if the key contains the {...} pattern, only the part between
 3038  * { and } is hashed. This may be useful in the future to force certain
 3039  * keys to be in the same node (assuming no resharding is in progress). */
 3040 static unsigned int clusterManagerKeyHashSlot(char *key, int keylen) {
 3041     int s, e; /* start-end indexes of { and } */
 3042 
 3043     for (s = 0; s < keylen; s++)
 3044         if (key[s] == '{') break;
 3045 
 3046     /* No '{' ? Hash the whole key. This is the base case. */
 3047     if (s == keylen) return crc16(key,keylen) & 0x3FFF;
 3048 
 3049     /* '{' found? Check if we have the corresponding '}'. */
 3050     for (e = s+1; e < keylen; e++)
 3051         if (key[e] == '}') break;
 3052 
 3053     /* No '}' or nothing between {} ? Hash the whole key. */
 3054     if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
 3055 
 3056     /* If we are here there is both a { and a } on its right. Hash
 3057      * what is in the middle between { and }. */
 3058     return crc16(key+s+1,e-s-1) & 0x3FFF;
 3059 }
 3060 
 3061 /* Return a string representation of the cluster node. */
 3062 static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent) {
 3063     sds info = sdsempty();
 3064     sds spaces = sdsempty();
 3065     int i;
 3066     for (i = 0; i < indent; i++) spaces = sdscat(spaces, " ");
 3067     if (indent) info = sdscat(info, spaces);
 3068     int is_master = !(node->flags & CLUSTER_MANAGER_FLAG_SLAVE);
 3069     char *role = (is_master ? "M" : "S");
 3070     sds slots = NULL;
 3071     if (node->dirty && node->replicate != NULL)
 3072         info = sdscatfmt(info, "S: %S %s:%u", node->name, node->ip, node->port);
 3073     else {
 3074         slots = clusterManagerNodeSlotsString(node);
 3075         sds flags = clusterManagerNodeFlagString(node);
 3076         info = sdscatfmt(info, "%s: %S %s:%u\n"
 3077                                "%s   slots:%S (%u slots) "
 3078                                "%S",
 3079                                role, node->name, node->ip, node->port, spaces,
 3080                                slots, node->slots_count, flags);
 3081         sdsfree(slots);
 3082         sdsfree(flags);
 3083     }
 3084     if (node->replicate != NULL)
 3085         info = sdscatfmt(info, "\n%s   replicates %S", spaces, node->replicate);
 3086     else if (node->replicas_count)
 3087         info = sdscatfmt(info, "\n%s   %U additional replica(s)",
 3088                          spaces, node->replicas_count);
 3089     sdsfree(spaces);
 3090     return info;
 3091 }
 3092 
 3093 static void clusterManagerShowNodes(void) {
 3094     listIter li;
 3095     listNode *ln;
 3096     listRewind(cluster_manager.nodes, &li);
 3097     while ((ln = listNext(&li)) != NULL) {
 3098         clusterManagerNode *node = ln->value;
 3099         sds info = clusterManagerNodeInfo(node, 0);
 3100         printf("%s\n", (char *) info);
 3101         sdsfree(info);
 3102     }
 3103 }
 3104 
 3105 static void clusterManagerShowClusterInfo(void) {
 3106     int masters = 0;
 3107     int keys = 0;
 3108     listIter li;
 3109     listNode *ln;
 3110     listRewind(cluster_manager.nodes, &li);
 3111     while ((ln = listNext(&li)) != NULL) {
 3112         clusterManagerNode *node = ln->value;
 3113         if (!(node->flags & CLUSTER_MANAGER_FLAG_SLAVE)) {
 3114             if (!node->name) continue;
 3115             int replicas = 0;
 3116             int dbsize = -1;
 3117             char name[9];
 3118             memcpy(name, node->name, 8);
 3119             name[8] = '\0';
 3120             listIter ri;
 3121             listNode *rn;
 3122             listRewind(cluster_manager.nodes, &ri);
 3123             while ((rn = listNext(&ri)) != NULL) {
 3124                 clusterManagerNode *n = rn->value;
 3125                 if (n == node || !(n->flags & CLUSTER_MANAGER_FLAG_SLAVE))
 3126                     continue;
 3127                 if (n->replicate && !strcmp(n->replicate, node->name))
 3128                     replicas++;
 3129             }
 3130             redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "DBSIZE");
 3131             if (reply != NULL && reply->type == REDIS_REPLY_INTEGER)
 3132                 dbsize = reply->integer;
 3133             if (dbsize < 0) {
 3134                 char *err = "";
 3135                 if (reply != NULL && reply->type == REDIS_REPLY_ERROR)
 3136                     err = reply->str;
 3137                 CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
 3138                 if (reply != NULL) freeReplyObject(reply);
 3139                 return;
 3140             };
 3141             if (reply != NULL) freeReplyObject(reply);
 3142             printf("%s:%d (%s...) -> %d keys | %d slots | %d slaves.\n",
 3143                    node->ip, node->port, name, dbsize,
 3144                    node->slots_count, replicas);
 3145             masters++;
 3146             keys += dbsize;
 3147         }
 3148     }
 3149     clusterManagerLogOk("[OK] %d keys in %d masters.\n", keys, masters);
 3150     float keys_per_slot = keys / (float) CLUSTER_MANAGER_SLOTS;
 3151     printf("%.2f keys per slot on average.\n", keys_per_slot);
 3152 }
 3153 
 3154 /* Flush dirty slots configuration of the node by calling CLUSTER ADDSLOTS */
 3155 static int clusterManagerAddSlots(clusterManagerNode *node, char**err)
 3156 {
 3157     redisReply *reply = NULL;
 3158     void *_reply = NULL;
 3159     int success = 1;
 3160     /* First two args are used for the command itself. */
 3161     int argc = node->slots_count + 2;
 3162     sds *argv = zmalloc(argc * sizeof(*argv));
 3163     size_t *argvlen = zmalloc(argc * sizeof(*argvlen));
 3164     argv[0] = "CLUSTER";
 3165     argv[1] = "ADDSLOTS";
 3166     argvlen[0] = 7;
 3167     argvlen[1] = 8;
 3168     *err = NULL;
 3169     int i, argv_idx = 2;
 3170     for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) {
 3171         if (argv_idx >= argc) break;
 3172         if (node->slots[i]) {
 3173             argv[argv_idx] = sdsfromlonglong((long long) i);
 3174             argvlen[argv_idx] = sdslen(argv[argv_idx]);
 3175             argv_idx++;
 3176         }
 3177     }
 3178     if (!argv_idx) {
 3179         success = 0;
 3180         goto cleanup;
 3181     }
 3182     redisAppendCommandArgv(node->context,argc,(const char**)argv,argvlen);
 3183     if (redisGetReply(node->context, &_reply) != REDIS_OK) {
 3184         success = 0;
 3185         goto cleanup;
 3186     }
 3187     reply = (redisReply*) _reply;
 3188     success = clusterManagerCheckRedisReply(node, reply, err);
 3189 cleanup:
 3190     zfree(argvlen);
 3191     if (argv != NULL) {
 3192         for (i = 2; i < argc; i++) sdsfree(argv[i]);
 3193         zfree(argv);
 3194     }
 3195     if (reply != NULL) freeReplyObject(reply);
 3196     return success;
 3197 }
 3198 
 3199 /* Get the node the slot is assigned to from the point of view of node *n.
 3200  * If the slot is unassigned or if the reply is an error, return NULL.
 3201  * Use the **err argument in order to check wether the slot is unassigned
 3202  * or the reply resulted in an error. */
 3203 static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n,
 3204                                                       int slot, char **err)
 3205 {
 3206     assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS);
 3207     clusterManagerNode *owner = NULL;
 3208     redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS");
 3209     if (clusterManagerCheckRedisReply(n, reply, err)) {
 3210         assert(reply->type == REDIS_REPLY_ARRAY);
 3211         size_t i;
 3212         for (i = 0; i < reply->elements; i++) {
 3213             redisReply *r = reply->element[i];
 3214             assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3);
 3215             int from, to;
 3216             from = r->element[0]->integer;
 3217             to = r->element[1]->integer;
 3218             if (slot < from || slot > to) continue;
 3219             redisReply *nr =  r->element[2];
 3220             assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2);
 3221             char *name = NULL;
 3222             if (nr->elements >= 3)
 3223                 name =  nr->element[2]->str;
 3224             if (name != NULL)
 3225                 owner = clusterManagerNodeByName(name);
 3226             else {
 3227                 char *ip = nr->element[0]->str;
 3228                 assert(ip != NULL);
 3229                 int port = (int) nr->element[1]->integer;
 3230                 listIter li;
 3231                 listNode *ln;
 3232                 listRewind(cluster_manager.nodes, &li);
 3233                 while ((ln = listNext(&li)) != NULL) {
 3234                     clusterManagerNode *nd = ln->value;
 3235                     if (strcmp(nd->ip, ip) == 0 && port == nd->port) {
 3236                         owner = nd;
 3237                         break;
 3238                     }
 3239                 }
 3240             }
 3241             if (owner) break;
 3242         }
 3243     }
 3244     if (reply) freeReplyObject(reply);
 3245     return owner;
 3246 }
 3247 
 3248 /* Set slot status to "importing" or "migrating" */
 3249 static int clusterManagerSetSlot(clusterManagerNode *node1,
 3250                                  clusterManagerNode *node2,
 3251                                  int slot, const char *status, char **err) {
 3252     redisReply *reply = CLUSTER_MANAGER_COMMAND(node1, "CLUSTER "
 3253                                                 "SETSLOT %d %s %s",
 3254                                                 slot, status,
 3255                                                 (char *) node2->name);
 3256     if (err != NULL) *err = NULL;
 3257     if (!reply) return 0;
 3258     int success = 1;
 3259     if (reply->type == REDIS_REPLY_ERROR) {
 3260         success = 0;
 3261         if (err != NULL) {
 3262             *err = zmalloc((reply->len + 1) * sizeof(char));
 3263             strcpy(*err, reply->str);
 3264         } else CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, reply->str);
 3265         goto cleanup;
 3266     }
 3267 cleanup:
 3268     freeReplyObject(reply);
 3269     return success;
 3270 }
 3271 
 3272 static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) {
 3273     redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
 3274         "CLUSTER SETSLOT %d %s", slot, "STABLE");
 3275     int success = clusterManagerCheckRedisReply(node, reply, NULL);
 3276     if (reply) freeReplyObject(reply);
 3277     return success;
 3278 }
 3279 
 3280 static int clusterManagerDelSlot(clusterManagerNode *node, int slot,
 3281                                  int ignore_unassigned_err)
 3282 {
 3283     redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
 3284         "CLUSTER DELSLOTS %d", slot);
 3285     char *err = NULL;
 3286     int success = clusterManagerCheckRedisReply(node, reply, &err);
 3287     if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
 3288         ignore_unassigned_err)
 3289     {
 3290         char *get_owner_err = NULL;
 3291         clusterManagerNode *assigned_to =
 3292             clusterManagerGetSlotOwner(node, slot, &get_owner_err);
 3293         if (!assigned_to) {
 3294             if (get_owner_err == NULL) success = 1;
 3295             else {
 3296                 CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err);
 3297                 zfree(get_owner_err);
 3298             }
 3299         }
 3300     }
 3301     if (!success && err != NULL) {
 3302         CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
 3303         zfree(err);
 3304     }
 3305     if (reply) freeReplyObject(reply);
 3306     return success;
 3307 }
 3308 
 3309 static int clusterManagerAddSlot(clusterManagerNode *node, int slot) {
 3310     redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
 3311         "CLUSTER ADDSLOTS %d", slot);
 3312     int success = clusterManagerCheckRedisReply(node, reply, NULL);
 3313     if (reply) freeReplyObject(reply);
 3314     return success;
 3315 }
 3316 
 3317 static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node,
 3318                                                 int slot)
 3319 {
 3320     redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
 3321         "CLUSTER COUNTKEYSINSLOT %d", slot);
 3322     int count = -1;
 3323     int success = clusterManagerCheckRedisReply(node, reply, NULL);
 3324     if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer;
 3325     if (reply) freeReplyObject(reply);
 3326     return count;
 3327 }
 3328 
 3329 static int clusterManagerBumpEpoch(clusterManagerNode *node) {
 3330     redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH");
 3331     int success = clusterManagerCheckRedisReply(node, reply, NULL);
 3332     if (reply) freeReplyObject(reply);
 3333     return success;
 3334 }
 3335 
 3336 /* Callback used by clusterManagerSetSlotOwner transaction. It should ignore
 3337  * errors except for ADDSLOTS errors.
 3338  * Return 1 if the error should be ignored. */
 3339 static int clusterManagerOnSetOwnerErr(redisReply *reply,
 3340     clusterManagerNode *n, int bulk_idx)
 3341 {
 3342     UNUSED(reply);
 3343     UNUSED(n);
 3344     /* Only raise error when ADDSLOTS fail (bulk_idx == 1). */
 3345     return (bulk_idx != 1);
 3346 }
 3347 
 3348 static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
 3349                                       int slot,
 3350                                       int do_clear)
 3351 {
 3352     int success = clusterManagerStartTransaction(owner);
 3353     if (!success) return 0;
 3354     /* Ensure the slot is not already assigned. */
 3355     clusterManagerDelSlot(owner, slot, 1);
 3356     /* Add the slot and bump epoch. */
 3357     clusterManagerAddSlot(owner, slot);
 3358     if (do_clear) clusterManagerClearSlotStatus(owner, slot);
 3359     clusterManagerBumpEpoch(owner);
 3360     success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr);
 3361     return success;
 3362 }
 3363 
 3364 /* Get the hash for the values of the specified keys in *keys_reply for the
 3365  * specified nodes *n1 and *n2, by calling DEBUG DIGEST-VALUE redis command
 3366  * on both nodes. Every key with same name on both nodes but having different
 3367  * values will be added to the *diffs list. Return 0 in case of reply
 3368  * error. */
 3369 static int clusterManagerCompareKeysValues(clusterManagerNode *n1,
 3370                                           clusterManagerNode *n2,
 3371                                           redisReply *keys_reply,
 3372                                           list *diffs)
 3373 {
 3374     size_t i, argc = keys_reply->elements + 2;
 3375     static const char *hash_zero = "0000000000000000000000000000000000000000";
 3376     char **argv = zcalloc(argc * sizeof(char *));
 3377     size_t  *argv_len = zcalloc(argc * sizeof(size_t));
 3378     argv[0] = "DEBUG";
 3379     argv_len[0] = 5;
 3380     argv[1] = "DIGEST-VALUE";
 3381     argv_len[1] = 12;
 3382     for (i = 0; i < keys_reply->elements; i++) {
 3383         redisReply *entry = keys_reply->element[i];
 3384         int idx = i + 2;
 3385         argv[idx] = entry->str;
 3386         argv_len[idx] = entry->len;
 3387     }
 3388     int success = 0;
 3389     void *_reply1 = NULL, *_reply2 = NULL;
 3390     redisReply *r1 = NULL, *r2 = NULL;
 3391     redisAppendCommandArgv(n1->context,argc, (const char**)argv,argv_len);
 3392     success = (redisGetReply(n1->context, &_reply1) == REDIS_OK);
 3393     if (!success) goto cleanup;
 3394     r1 = (redisReply *) _reply1;
 3395     redisAppendCommandArgv(n2->context,argc, (const char**)argv,argv_len);
 3396     success = (redisGetReply(n2->context, &_reply2) == REDIS_OK);
 3397     if (!success) goto cleanup;
 3398     r2 = (redisReply *) _reply2;
 3399     success = (r1->type != REDIS_REPLY_ERROR && r2->type != REDIS_REPLY_ERROR);
 3400     if (r1->type == REDIS_REPLY_ERROR) {
 3401         CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str);
 3402         success = 0;
 3403     }
 3404     if (r2->type == REDIS_REPLY_ERROR) {
 3405         CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str);
 3406         success = 0;
 3407     }
 3408     if (!success) goto cleanup;
 3409     assert(keys_reply->elements == r1->elements &&
 3410            keys_reply->elements == r2->elements);
 3411     for (i = 0; i < keys_reply->elements; i++) {
 3412         char *key = keys_reply->element[i]->str;
 3413         char *hash1 = r1->element[i]->str;
 3414         char *hash2 = r2->element[i]->str;
 3415         /* Ignore keys that don't exist in both nodes. */
 3416         if (strcmp(hash1, hash_zero) == 0 || strcmp(hash2, hash_zero) == 0)
 3417             continue;
 3418         if (strcmp(hash1, hash2) != 0) listAddNodeTail(diffs, key);
 3419     }
 3420 cleanup:
 3421     if (r1) freeReplyObject(r1);
 3422     if (r2) freeReplyObject(r2);
 3423     zfree(argv);
 3424     zfree(argv_len);
 3425     return success;
 3426 }
 3427 
 3428 /* Migrate keys taken from reply->elements. It returns the reply from the
 3429  * MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
 3430  * is not NULL, a dot will be printed for every migrated key. */
 3431 static redisReply *clusterManagerMigrateKeysInReply(clusterManagerNode *source,
 3432                                                     clusterManagerNode *target,
 3433                                                     redisReply *reply,
 3434                                                     int replace, int timeout,
 3435                                                     char *dots)
 3436 {
 3437     redisReply *migrate_reply = NULL;
 3438     char **argv = NULL;
 3439     size_t *argv_len = NULL;
 3440     int c = (replace ? 8 : 7);
 3441     if (config.auth) c += 2;
 3442     if (config.user) c += 1;
 3443     size_t argc = c + reply->elements;
 3444     size_t i, offset = 6; // Keys Offset
 3445     argv = zcalloc(argc * sizeof(char *));
 3446     argv_len = zcalloc(argc * sizeof(size_t));
 3447     char portstr[255];
 3448     char timeoutstr[255];
 3449     snprintf(portstr, 10, "%d", target->port);
 3450     snprintf(timeoutstr, 10, "%d", timeout);
 3451     argv[0] = "MIGRATE";
 3452     argv_len[0] = 7;
 3453     argv[1] = target->ip;
 3454     argv_len[1] = strlen(target->ip);
 3455     argv[2] = portstr;
 3456     argv_len[2] = strlen(portstr);
 3457     argv[3] = "";
 3458     argv_len[3] = 0;
 3459     argv[4] = "0";
 3460     argv_len[4] = 1;
 3461     argv[5] = timeoutstr;
 3462     argv_len[5] = strlen(timeoutstr);
 3463     if (replace) {
 3464         argv[offset] = "REPLACE";
 3465         argv_len[offset] = 7;
 3466         offset++;
 3467     }
 3468     if (config.auth) {
 3469         if (config.user) {
 3470             argv[offset] = "AUTH2";
 3471             argv_len[offset] = 5;
 3472             offset++;
 3473             argv[offset] = config.user;
 3474             argv_len[offset] = strlen(config.user);
 3475             offset++;
 3476             argv[offset] = config.auth;
 3477             argv_len[offset] = strlen(config.auth);
 3478             offset++;
 3479         } else {
 3480             argv[offset] = "AUTH";
 3481             argv_len[offset] = 4;
 3482             offset++;
 3483             argv[offset] = config.auth;
 3484             argv_len[offset] = strlen(config.auth);
 3485             offset++;
 3486         }
 3487     }
 3488     argv[offset] = "KEYS";
 3489     argv_len[offset] = 4;
 3490     offset++;
 3491     for (i = 0; i < reply->elements; i++) {
 3492         redisReply *entry = reply->element[i];
 3493         size_t idx = i + offset;
 3494         assert(entry->type == REDIS_REPLY_STRING);
 3495         argv[idx] = (char *) sdsnewlen(entry->str, entry->len);
 3496         argv_len[idx] = entry->len;
 3497         if (dots) dots[i] = '.';
 3498     }
 3499     if (dots) dots[reply->elements] = '\0';
 3500     void *_reply = NULL;
 3501     redisAppendCommandArgv(source->context,argc,
 3502                            (const char**)argv,argv_len);
 3503     int success = (redisGetReply(source->context, &_reply) == REDIS_OK);
 3504     for (i = 0; i < reply->elements; i++) sdsfree(argv[i + offset]);
 3505     if (!success) goto cleanup;
 3506     migrate_reply = (redisReply *) _reply;
 3507 cleanup:
 3508     zfree(argv);
 3509     zfree(argv_len);
 3510     return migrate_reply;
 3511 }
 3512 
 3513 /* Migrate all keys in the given slot from source to target.*/
 3514 static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
 3515                                            clusterManagerNode *target,
 3516                                            int slot, int timeout,
 3517                                            int pipeline, int verbose,
 3518                                            char **err)
 3519 {
 3520     int success = 1;
 3521     int do_fix = config.cluster_manager_command.flags &
 3522                  CLUSTER_MANAGER_CMD_FLAG_FIX;
 3523     int do_replace = config.cluster_manager_command.flags &
 3524                      CLUSTER_MANAGER_CMD_FLAG_REPLACE;
 3525     while (1) {
 3526         char *dots = NULL;
 3527         redisReply *reply = NULL, *migrate_reply = NULL;
 3528         reply = CLUSTER_MANAGER_COMMAND(source, "CLUSTER "
 3529                                         "GETKEYSINSLOT %d %d", slot,
 3530                                         pipeline);
 3531         success = (reply != NULL);
 3532         if (!success) return 0;
 3533         if (reply->type == REDIS_REPLY_ERROR) {
 3534             success = 0;
 3535             if (err != NULL) {
 3536                 *err = zmalloc((reply->len + 1) * sizeof(char));
 3537                 strcpy(*err, reply->str);
 3538                 CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, *err);
 3539             }
 3540             goto next;
 3541         }
 3542         assert(reply->type == REDIS_REPLY_ARRAY);
 3543         size_t count = reply->elements;
 3544         if (count == 0) {
 3545             freeReplyObject(reply);
 3546             break;
 3547         }
 3548         if (verbose) dots = zmalloc((count+1) * sizeof(char));
 3549         /* Calling MIGRATE command. */
 3550         migrate_reply = clusterManagerMigrateKeysInReply(source, target,
 3551                                                          reply, 0, timeout,
 3552                                                          dots);
 3553         if (migrate_reply == NULL) goto next;
 3554         if (migrate_reply->type == REDIS_REPLY_ERROR) {
 3555             int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
 3556             int not_served = 0;
 3557             if (!is_busy) {
 3558                 /* Check if the slot is unassigned (not served) in the
 3559                  * source node's configuration. */
 3560                 char *get_owner_err = NULL;
 3561                 clusterManagerNode *served_by =
 3562                     clusterManagerGetSlotOwner(source, slot, &get_owner_err);
 3563                 if (!served_by) {
 3564                     if (get_owner_err == NULL) not_served = 1;
 3565                     else {
 3566                         CLUSTER_MANAGER_PRINT_REPLY_ERROR(source,
 3567                                                           get_owner_err);
 3568                         zfree(get_owner_err);
 3569                     }
 3570                 }
 3571             }
 3572             /* Try to handle errors. */
 3573             if (is_busy || not_served) {
 3574                 /* If the key's slot is not served, try to assign slot
 3575                  * to the target node. */
 3576                 if (do_fix && not_served) {
 3577                     clusterManagerLogWarn("*** Slot was not served, setting "
 3578                                           "owner to node %s:%d.\n",
 3579                                           target->ip, target->port);
 3580                     clusterManagerSetSlot(source, target, slot, "node", NULL);
 3581                 }
 3582                 /* If the key already exists in the target node (BUSYKEY),
 3583                  * check whether its value is the same in both nodes.
 3584                  * In case of equal values, retry migration with the
 3585                  * REPLACE option.
 3586                  * In case of different values:
 3587                  *  - If the migration is requested by the fix command, stop
 3588                  *    and warn the user.
 3589                  *  - In other cases (ie. reshard), proceed only if the user
 3590                  *    launched the command with the --cluster-replace option.*/
 3591                 if (is_busy) {
 3592                     clusterManagerLogWarn("\n*** Target key exists\n");
 3593                     if (!do_replace) {
 3594                         clusterManagerLogWarn("*** Checking key values on "
 3595                                               "both nodes...\n");
 3596                         list *diffs = listCreate();
 3597                         success = clusterManagerCompareKeysValues(source,
 3598                             target, reply, diffs);
 3599                         if (!success) {
 3600                             clusterManagerLogErr("*** Value check failed!\n");
 3601                             listRelease(diffs);
 3602                             goto next;
 3603                         }
 3604                         if (listLength(diffs) > 0) {
 3605                             success = 0;
 3606                             clusterManagerLogErr(
 3607                                 "*** Found %d key(s) in both source node and "
 3608                                 "target node having different values.\n"
 3609                                 "    Source node: %s:%d\n"
 3610                                 "    Target node: %s:%d\n"
 3611                                 "    Keys(s):\n",
 3612                                 listLength(diffs),
 3613                                 source->ip, source->port,
 3614                                 target->ip, target->port);
 3615                             listIter dli;
 3616                             listNode *dln;
 3617                             listRewind(diffs, &dli);
 3618                             while((dln = listNext(&dli)) != NULL) {
 3619                                 char *k = dln->value;
 3620                                 clusterManagerLogErr("    - %s\n", k);
 3621                             }
 3622                             clusterManagerLogErr("Please fix the above key(s) "
 3623                                                  "manually and try again "
 3624                                                  "or relaunch the command \n"
 3625                                                  "with --cluster-replace "
 3626                                                  "option to force key "
 3627                                                  "overriding.\n");
 3628                             listRelease(diffs);
 3629                             goto next;
 3630                         }
 3631                         listRelease(diffs);
 3632                     }
 3633                     clusterManagerLogWarn("*** Replacing target keys...\n");
 3634                 }
 3635                 freeReplyObject(migrate_reply);
 3636                 migrate_reply = clusterManagerMigrateKeysInReply(source,
 3637                                                                  target,
 3638                                                                  reply,
 3639                                                                  is_busy,
 3640                                                                  timeout,
 3641                                                                  NULL);
 3642                 success = (migrate_reply != NULL &&
 3643                            migrate_reply->type != REDIS_REPLY_ERROR);
 3644             } else success = 0;
 3645             if (!success) {
 3646                 if (migrate_reply != NULL) {
 3647                     if (err) {
 3648                         *err = zmalloc((migrate_reply->len + 1) * sizeof(char));
 3649                         strcpy(*err, migrate_reply->str);
 3650                     }
 3651                     printf("\n");
 3652                     CLUSTER_MANAGER_PRINT_REPLY_ERROR(source,
 3653                                                       migrate_reply->str);
 3654                 }
 3655                 goto next;
 3656             }
 3657         }
 3658         if (verbose) {
 3659             printf("%s", dots);
 3660             fflush(stdout);
 3661         }
 3662 next:
 3663         if (reply != NULL) freeReplyObject(reply);
 3664         if (migrate_reply != NULL) freeReplyObject(migrate_reply);
 3665         if (dots) zfree(dots);
 3666         if (!success) break;
 3667     }
 3668     return success;
 3669 }
 3670 
 3671 /* Move slots between source and target nodes using MIGRATE.
 3672  *
 3673  * Options:
 3674  * CLUSTER_MANAGER_OPT_VERBOSE -- Print a dot for every moved key.
 3675  * CLUSTER_MANAGER_OPT_COLD    -- Move keys without opening slots /
 3676  *                                reconfiguring the nodes.
 3677  * CLUSTER_MANAGER_OPT_UPDATE  -- Update node->slots for source/target nodes.
 3678  * CLUSTER_MANAGER_OPT_QUIET   -- Don't print info messages.
 3679 */
 3680 static int clusterManagerMoveSlot(clusterManagerNode *source,
 3681                                   clusterManagerNode *target,
 3682                                   int slot, int opts,  char**err)
 3683 {
 3684     if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) {
 3685         printf("Moving slot %d from %s:%d to %s:%d: ", slot, source->ip,
 3686                source->port, target->ip, target->port);
 3687         fflush(stdout);
 3688     }
 3689     if (err != NULL) *err = NULL;
 3690     int pipeline = config.cluster_manager_command.pipeline,
 3691         timeout = config.cluster_manager_command.timeout,
 3692         print_dots = (opts & CLUSTER_MANAGER_OPT_VERBOSE),
 3693         option_cold = (opts & CLUSTER_MANAGER_OPT_COLD),
 3694         success = 1;
 3695     if (!option_cold) {
 3696         success = clusterManagerSetSlot(target, source, slot,
 3697                                         "importing", err);
 3698         if (!success) return 0;
 3699         success = clusterManagerSetSlot(source, target, slot,
 3700                                         "migrating", err);
 3701         if (!success) return 0;
 3702     }
 3703     success = clusterManagerMigrateKeysInSlot(source, target, slot, timeout,
 3704                                               pipeline, print_dots, err);
 3705     if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) printf("\n");
 3706     if (!success) return 0;
 3707     /* Set the new node as the owner of the slot in all the known nodes. */
 3708     if (!option_cold) {
 3709         listIter li;
 3710         listNode *ln;
 3711         listRewind(cluster_manager.nodes, &li);
 3712         while ((ln = listNext(&li)) != NULL) {
 3713             clusterManagerNode *n = ln->value;
 3714             if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
 3715             redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER "
 3716                                                     "SETSLOT %d %s %s",
 3717                                                     slot, "node",
 3718                                                     target->name);
 3719             success = (r != NULL);
 3720             if (!success) return 0;
 3721             if (r->type == REDIS_REPLY_ERROR) {
 3722                 success = 0;
 3723                 if (err != NULL) {
 3724                     *err = zmalloc((r->len + 1) * sizeof(char));
 3725                     strcpy(*err, r->str);
 3726                     CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, *err);
 3727                 }
 3728             }
 3729             freeReplyObject(r);
 3730             if (!success) return 0;
 3731         }
 3732     }
 3733     /* Update the node logical config */
 3734     if (opts & CLUSTER_MANAGER_OPT_UPDATE) {
 3735         source->slots[slot] = 0;
 3736         target->slots[slot] = 1;
 3737     }
 3738     return 1;
 3739 }
 3740 
 3741 /* Flush the dirty node configuration by calling replicate for slaves or
 3742  * adding the slots defined in the masters. */
 3743 static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err) {
 3744     if (!node->dirty) return 0;
 3745     redisReply *reply = NULL;
 3746     int is_err = 0, success = 1;
 3747     if (err != NULL) *err = NULL;
 3748     if (node->replicate != NULL) {
 3749         reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER REPLICATE %s",
 3750                                         node->replicate);
 3751         if (reply == NULL || (is_err = (reply->type == REDIS_REPLY_ERROR))) {
 3752             if (is_err && err != NULL) {
 3753                 *err = zmalloc((reply->len + 1) * sizeof(char));
 3754                 strcpy(*err, reply->str);
 3755             }
 3756             success = 0;
 3757             /* If the cluster did not already joined it is possible that
 3758              * the slave does not know the master node yet. So on errors
 3759              * we return ASAP leaving the dirty flag set, to flush the
 3760              * config later. */
 3761             goto cleanup;
 3762         }
 3763     } else {
 3764         int added = clusterManagerAddSlots(node, err);
 3765         if (!added || *err != NULL) success = 0;
 3766     }
 3767     node->dirty = 0;
 3768 cleanup:
 3769     if (reply != NULL) freeReplyObject(reply);
 3770     return success;
 3771 }
 3772 
 3773 /* Wait until the cluster configuration is consistent. */
 3774 static void clusterManagerWaitForClusterJoin(void) {
 3775     printf("Waiting for the cluster to join\n");
 3776     int counter = 0,
 3777         check_after = CLUSTER_JOIN_CHECK_AFTER +
 3778                       (int)(listLength(cluster_manager.nodes) * 0.15f);
 3779     while(!clusterManagerIsConfigConsistent()) {
 3780         printf(".");
 3781         fflush(stdout);
 3782         sleep(1);
 3783         if (++counter > check_after) {
 3784             dict *status = clusterManagerGetLinkStatus();
 3785             dictIterator *iter = NULL;
 3786             if (status != NULL && dictSize(status) > 0) {
 3787                 printf("\n");
 3788                 clusterManagerLogErr("Warning: %d node(s) may "
 3789                                      "be unreachable\n", dictSize(status));
 3790                 iter = dictGetIterator(status);
 3791                 dictEntry *entry;
 3792                 while ((entry = dictNext(iter)) != NULL) {
 3793                     sds nodeaddr = (sds) dictGetKey(entry);
 3794                     char *node_ip = NULL;
 3795                     int node_port = 0, node_bus_port = 0;
 3796                     list *from = (list *) dictGetVal(entry);
 3797                     if (parseClusterNodeAddress(nodeaddr, &node_ip,
 3798                         &node_port, &node_bus_port) && node_bus_port) {
 3799                         clusterManagerLogErr(" - The port %d of node %s may "
 3800                                              "be unreachable from:\n",
 3801                                              node_bus_port, node_ip);
 3802                     } else {
 3803                         clusterManagerLogErr(" - Node %s may be unreachable "
 3804                                              "from:\n", nodeaddr);
 3805                     }
 3806                     listIter li;
 3807                     listNode *ln;
 3808                     listRewind(from, &li);
 3809                     while ((ln = listNext(&li)) != NULL) {
 3810                         sds from_addr = ln->value;
 3811                         clusterManagerLogErr("   %s\n", from_addr);
 3812                         sdsfree(from_addr);
 3813                     }
 3814                     clusterManagerLogErr("Cluster bus ports must be reachable "
 3815                                          "by every node.\nRemember that "
 3816                                          "cluster bus ports are different "
 3817                                          "from standard instance ports.\n");
 3818                     listEmpty(from);
 3819                 }
 3820             }
 3821             if (iter != NULL) dictReleaseIterator(iter);
 3822             if (status != NULL) dictRelease(status);
 3823             counter = 0;
 3824         }
 3825     }
 3826     printf("\n");
 3827 }
 3828 
 3829 /* Load node's cluster configuration by calling "CLUSTER NODES" command.
 3830  * Node's configuration (name, replicate, slots, ...) is then updated.
 3831  * If CLUSTER_MANAGER_OPT_GETFRIENDS flag is set into 'opts' argument,
 3832  * and node already knows other nodes, the node's friends list is populated
 3833  * with the other nodes info. */
 3834 static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
 3835                                       char **err)
 3836 {
 3837     redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES");
 3838     int success = 1;
 3839     *err = NULL;
 3840     if (!clusterManagerCheckRedisReply(node, reply, err)) {
 3841         success = 0;
 3842         goto cleanup;
 3843     }
 3844     int getfriends = (opts & CLUSTER_MANAGER_OPT_GETFRIENDS);
 3845     char *lines = reply->str, *p, *line;
 3846     while ((p = strstr(lines, "\n")) != NULL) {
 3847         *p = '\0';
 3848         line = lines;
 3849         lines = p + 1;
 3850         char *name = NULL, *addr = NULL, *flags = NULL, *master_id = NULL,
 3851              *ping_sent = NULL, *ping_recv = NULL, *config_epoch = NULL,
 3852              *link_status = NULL;
 3853         UNUSED(link_status);
 3854         int i = 0;
 3855         while ((p = strchr(line, ' ')) != NULL) {
 3856             *p = '\0';
 3857             char *token = line;
 3858             line = p + 1;
 3859             switch(i++){
 3860             case 0: name = token; break;
 3861             case 1: addr = token; break;
 3862             case 2: flags = token; break;
 3863             case 3: master_id = token; break;
 3864             case 4: ping_sent = token; break;
 3865             case 5: ping_recv = token; break;
 3866             case 6: config_epoch = token; break;
 3867             case 7: link_status = token; break;
 3868             }
 3869             if (i == 8) break; // Slots
 3870         }
 3871         if (!flags) {
 3872             success = 0;
 3873             goto cleanup;
 3874         }
 3875         int myself = (strstr(flags, "myself") != NULL);
 3876         clusterManagerNode *currentNode = NULL;
 3877         if (myself) {
 3878             node->flags |= CLUSTER_MANAGER_FLAG_MYSELF;
 3879             currentNode = node;
 3880             clusterManagerNodeResetSlots(node);
 3881             if (i == 8) {
 3882                 int remaining = strlen(line);
 3883                 while (remaining > 0) {
 3884                     p = strchr(line, ' ');
 3885                     if (p == NULL) p = line + remaining;
 3886                     remaining -= (p - line);
 3887 
 3888                     char *slotsdef = line;
 3889                     *p = '\0';
 3890                     if (remaining) {
 3891                         line = p + 1;
 3892                         remaining--;
 3893                     } else line = p;
 3894                     char *dash = NULL;
 3895                     if (slotsdef[0] == '[') {
 3896                         slotsdef++;
 3897                         if ((p = strstr(slotsdef, "->-"))) { // Migrating
 3898                             *p = '\0';
 3899                             p += 3;
 3900                             char *closing_bracket = strchr(p, ']');
 3901                             if (closing_bracket) *closing_bracket = '\0';
 3902                             sds slot = sdsnew(slotsdef);
 3903                             sds dst = sdsnew(p);
 3904                             node->migrating_count += 2;
 3905                             node->migrating = zrealloc(node->migrating,
 3906                                 (node->migrating_count * sizeof(sds)));
 3907                             node->migrating[node->migrating_count - 2] =
 3908                                 slot;
 3909                             node->migrating[node->migrating_count - 1] =
 3910                                 dst;
 3911                         }  else if ((p = strstr(slotsdef, "-<-"))) {//Importing
 3912                             *p = '\0';
 3913                             p += 3;
 3914                             char *closing_bracket = strchr(p, ']');
 3915                             if (closing_bracket) *closing_bracket = '\0';
 3916                             sds slot = sdsnew(slotsdef);
 3917                             sds src = sdsnew(p);
 3918                             node->importing_count += 2;
 3919                             node->importing = zrealloc(node->importing,
 3920                                 (node->importing_count * sizeof(sds)));
 3921                             node->importing[node->importing_count - 2] =
 3922                                 slot;
 3923                             node->importing[node->importing_count - 1] =
 3924                                 src;
 3925                         }
 3926                     } else if ((dash = strchr(slotsdef, '-')) != NULL) {
 3927                         p = dash;
 3928                         int start, stop;
 3929                         *p = '\0';
 3930                         start = atoi(slotsdef);
 3931                         stop = atoi(p + 1);
 3932                         node->slots_count += (stop - (start - 1));
 3933                         while (start <= stop) node->slots[start++] = 1;
 3934                     } else if (p > slotsdef) {
 3935                         node->slots[atoi(slotsdef)] = 1;
 3936                         node->slots_count++;
 3937                     }
 3938                 }
 3939             }
 3940             node->dirty = 0;
 3941         } else if (!getfriends) {
 3942             if (!(node->flags & CLUSTER_MANAGER_FLAG_MYSELF)) continue;
 3943             else break;
 3944         } else {
 3945             if (addr == NULL) {
 3946                 fprintf(stderr, "Error: invalid CLUSTER NODES reply\n");
 3947                 success = 0;
 3948                 goto cleanup;
 3949             }
 3950             char *c = strrchr(addr, '@');
 3951             if (c != NULL) *c = '\0';
 3952             c = strrchr(addr, ':');
 3953             if (c == NULL) {
 3954                 fprintf(stderr, "Error: invalid CLUSTER NODES reply\n");
 3955                 success = 0;
 3956                 goto cleanup;
 3957             }
 3958             *c = '\0';
 3959             int port = atoi(++c);
 3960             currentNode = clusterManagerNewNode(sdsnew(addr), port);
 3961             currentNode->flags |= CLUSTER_MANAGER_FLAG_FRIEND;
 3962             if (node->friends == NULL) node->friends = listCreate();
 3963             listAddNodeTail(node->friends, currentNode);
 3964         }
 3965         if (name != NULL) {
 3966             if (currentNode->name) sdsfree(currentNode->name);
 3967             currentNode->name = sdsnew(name);
 3968         }
 3969         if (currentNode->flags_str != NULL)
 3970             freeClusterManagerNodeFlags(currentNode->flags_str);
 3971         currentNode->flags_str = listCreate();
 3972         int flag_len;
 3973         while ((flag_len = strlen(flags)) > 0) {
 3974             sds flag = NULL;
 3975             char *fp = strchr(flags, ',');
 3976             if (fp) {
 3977                 *fp = '\0';
 3978                 flag = sdsnew(flags);
 3979                 flags = fp + 1;
 3980             } else {
 3981                 flag = sdsnew(flags);
 3982                 flags += flag_len;
 3983             }
 3984             if (strcmp(flag, "noaddr") == 0)
 3985                 currentNode->flags |= CLUSTER_MANAGER_FLAG_NOADDR;
 3986             else if (strcmp(flag, "disconnected") == 0)
 3987                 currentNode->flags |= CLUSTER_MANAGER_FLAG_DISCONNECT;
 3988             else if (strcmp(flag, "fail") == 0)
 3989                 currentNode->flags |= CLUSTER_MANAGER_FLAG_FAIL;
 3990             else if (strcmp(flag, "slave") == 0) {
 3991                 currentNode->flags |= CLUSTER_MANAGER_FLAG_SLAVE;
 3992                 if (master_id != NULL) {
 3993                     if (currentNode->replicate) sdsfree(currentNode->replicate);
 3994                     currentNode->replicate = sdsnew(master_id);
 3995                 }
 3996             }
 3997             listAddNodeTail(currentNode->flags_str, flag);
 3998         }
 3999         if (config_epoch != NULL)
 4000             currentNode->current_epoch = atoll(config_epoch);
 4001         if (ping_sent != NULL) currentNode->ping_sent = atoll(ping_sent);
 4002         if (ping_recv != NULL) currentNode->ping_recv = atoll(ping_recv);
 4003         if (!getfriends && myself) break;
 4004     }
 4005 cleanup:
 4006     if (reply) freeReplyObject(reply);
 4007     return success;
 4008 }
 4009 
 4010 /* Retrieves info about the cluster using argument 'node' as the starting
 4011  * point. All nodes will be loaded inside the cluster_manager.nodes list.
 4012  * Warning: if something goes wrong, it will free the starting node before
 4013  * returning 0. */
 4014 static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts) {
 4015     if (node->context == NULL && !clusterManagerNodeConnect(node)) {
 4016         freeClusterManagerNode(node);
 4017         return 0;
 4018     }
 4019     opts |= CLUSTER_MANAGER_OPT_GETFRIENDS;
 4020     char *e = NULL;
 4021     if (!clusterManagerNodeIsCluster(node, &e)) {
 4022         clusterManagerPrintNotClusterNodeError(node, e);
 4023         if (e) zfree(e);
 4024         freeClusterManagerNode(node);
 4025         return 0;
 4026     }
 4027     e = NULL;
 4028     if (!clusterManagerNodeLoadInfo(node, opts, &e)) {
 4029         if (e) {
 4030             CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, e);
 4031             zfree(e);
 4032         }
 4033         freeClusterManagerNode(node);
 4034         return 0;
 4035     }
 4036     listIter li;
 4037     listNode *ln;
 4038     if (cluster_manager.nodes != NULL) {
 4039         listRewind(cluster_manager.nodes, &li);
 4040         while ((ln = listNext(&li)) != NULL)
 4041             freeClusterManagerNode((clusterManagerNode *) ln->value);
 4042         listRelease(cluster_manager.nodes);
 4043     }
 4044     cluster_manager.nodes = listCreate();
 4045     listAddNodeTail(cluster_manager.nodes, node);
 4046     if (node->friends != NULL) {
 4047         listRewind(node->friends, &li);
 4048         while ((ln = listNext(&li)) != NULL) {
 4049             clusterManagerNode *friend = ln->value;
 4050             if (!friend->ip || !friend->port) goto invalid_friend;
 4051             if (!friend->context && !clusterManagerNodeConnect(friend))
 4052                 goto invalid_friend;
 4053             e = NULL;
 4054             if (clusterManagerNodeLoadInfo(friend, 0, &e)) {
 4055                 if (friend->flags & (CLUSTER_MANAGER_FLAG_NOADDR |
 4056                                      CLUSTER_MANAGER_FLAG_DISCONNECT |
 4057                                      CLUSTER_MANAGER_FLAG_FAIL))
 4058                 {
 4059                     goto invalid_friend;
 4060                 }
 4061                 listAddNodeTail(cluster_manager.nodes, friend);
 4062             } else {
 4063                 clusterManagerLogErr("[ERR] Unable to load info for "
 4064                                      "node %s:%d\n",
 4065                                      friend->ip, friend->port);
 4066                 goto invalid_friend;
 4067             }
 4068             continue;
 4069 invalid_friend:
 4070             if (!(friend->flags & CLUSTER_MANAGER_FLAG_SLAVE))
 4071                 cluster_manager.unreachable_masters++;
 4072             freeClusterManagerNode(friend);
 4073         }
 4074         listRelease(node->friends);
 4075         node->friends = NULL;
 4076     }
 4077     // Count replicas for each node
 4078     listRewind(cluster_manager.nodes, &li);
 4079     while ((ln = listNext(&li)) != NULL) {
 4080         clusterManagerNode *n = ln->value;
 4081         if (n->replicate != NULL) {
 4082             clusterManagerNode *master = clusterManagerNodeByName(n->replicate);
 4083             if (master == NULL) {
 4084                 clusterManagerLogWarn("*** WARNING: %s:%d claims to be "
 4085                                       "slave of unknown node ID %s.\n",
 4086                                       n->ip, n->port, n->replicate);
 4087             } else master->replicas_count++;
 4088         }
 4089     }
 4090     return 1;
 4091 }
 4092 
 4093 /* Compare functions used by various sorting operations. */
 4094 int clusterManagerSlotCompare(const void *slot1, const void *slot2) {
 4095     const char **i1 = (const char **)slot1;
 4096     const char **i2 = (const char **)slot2;
 4097     return strcmp(*i1, *i2);
 4098 }
 4099 
 4100 int clusterManagerSlotCountCompareDesc(const void *n1, const void *n2) {
 4101     clusterManagerNode *node1 = *((clusterManagerNode **) n1);
 4102     clusterManagerNode *node2 = *((clusterManagerNode **) n2);
 4103     return node2->slots_count - node1->slots_count;
 4104 }
 4105 
 4106 int clusterManagerCompareNodeBalance(const void *n1, const void *n2) {
 4107     clusterManagerNode *node1 = *((clusterManagerNode **) n1);
 4108     clusterManagerNode *node2 = *((clusterManagerNode **) n2);
 4109     return node1->balance - node2->balance;
 4110 }
 4111 
 4112 static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
 4113     sds signature = NULL;
 4114     int node_count = 0, i = 0, name_len = 0;
 4115     char **node_configs = NULL;
 4116     redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES");
 4117     if (reply == NULL || reply->type == REDIS_REPLY_ERROR)
 4118         goto cleanup;
 4119     char *lines = reply->str, *p, *line;
 4120     while ((p = strstr(lines, "\n")) != NULL) {
 4121         i = 0;
 4122         *p = '\0';
 4123         line = lines;
 4124         lines = p + 1;
 4125         char *nodename = NULL;
 4126         int tot_size = 0;
 4127         while ((p = strchr(line, ' ')) != NULL) {
 4128             *p = '\0';
 4129             char *token = line;
 4130             line = p + 1;
 4131             if (i == 0) {
 4132                 nodename = token;
 4133                 tot_size = (p - token);
 4134                 name_len = tot_size++; // Make room for ':' in tot_size
 4135             }
 4136             if (++i == 8) break;
 4137         }
 4138         if (i != 8) continue;
 4139         if (nodename == NULL) continue;
 4140         int remaining = strlen(line);
 4141         if (remaining == 0) continue;
 4142         char **slots = NULL;
 4143         int c = 0;
 4144         while (remaining > 0) {
 4145             p = strchr(line, ' ');
 4146             if (p == NULL) p = line + remaining;
 4147             int size = (p - line);
 4148             remaining -= size;
 4149             tot_size += size;
 4150             char *slotsdef = line;
 4151             *p = '\0';
 4152             if (remaining) {
 4153                 line = p + 1;
 4154                 remaining--;
 4155             } else line = p;
 4156             if (slotsdef[0] != '[') {
 4157                 c++;
 4158                 slots = zrealloc(slots, (c * sizeof(char *)));
 4159                 slots[c - 1] = slotsdef;
 4160             }
 4161         }
 4162         if (c > 0) {
 4163             if (c > 1)
 4164                 qsort(slots, c, sizeof(char *), clusterManagerSlotCompare);
 4165             node_count++;
 4166             node_configs =
 4167                 zrealloc(node_configs, (node_count * sizeof(char *)));
 4168             /* Make room for '|' separators. */
 4169             tot_size += (sizeof(char) * (c - 1));
 4170             char *cfg = zmalloc((sizeof(char) * tot_size) + 1);
 4171             memcpy(cfg, nodename, name_len);
 4172             char *sp = cfg + name_len;
 4173             *(sp++) = ':';
 4174             for (i = 0; i < c; i++) {
 4175                 if (i > 0) *(sp++) = ',';
 4176                 int slen = strlen(slots[i]);
 4177                 memcpy(sp, slots[i], slen);
 4178                 sp += slen;
 4179             }
 4180             *(sp++) = '\0';
 4181             node_configs[node_count - 1] = cfg;
 4182         }
 4183         zfree(slots);
 4184     }
 4185     if (node_count > 0) {
 4186         if (node_count > 1) {
 4187             qsort(node_configs, node_count, sizeof(char *),
 4188                   clusterManagerSlotCompare);
 4189         }
 4190         signature = sdsempty();
 4191         for (i = 0; i < node_count; i++) {
 4192             if (i > 0) signature = sdscatprintf(signature, "%c", '|');
 4193             signature = sdscatfmt(signature, "%s", node_configs[i]);
 4194         }
 4195     }
 4196 cleanup:
 4197     if (reply != NULL) freeReplyObject(reply);
 4198     if (node_configs != NULL) {
 4199         for (i = 0; i < node_count; i++) zfree(node_configs[i]);
 4200         zfree(node_configs);
 4201     }
 4202     return signature;
 4203 }
 4204 
 4205 static int clusterManagerIsConfigConsistent(void) {
 4206     if (cluster_manager.nodes == NULL) return 0;
 4207     int consistent = (listLength(cluster_manager.nodes) <= 1);
 4208     // If the Cluster has only one node, it's always consistent
 4209     if (consistent) return 1;
 4210     sds first_cfg = NULL;
 4211     listIter li;
 4212     listNode *ln;
 4213     listRewind(cluster_manager.nodes, &li);
 4214     while ((ln = listNext(&li)) != NULL) {
 4215         clusterManagerNode *node = ln->value;
 4216         sds cfg = clusterManagerGetConfigSignature(node);
 4217         if (cfg == NULL) {
 4218             consistent = 0;
 4219             break;
 4220         }
 4221         if (first_cfg == NULL) first_cfg = cfg;
 4222         else {
 4223             consistent = !sdscmp(first_cfg, cfg);
 4224             sdsfree(cfg);
 4225             if (!consistent) break;
 4226         }
 4227     }
 4228     if (first_cfg != NULL) sdsfree(first_cfg);
 4229     return consistent;
 4230 }
 4231 
 4232 static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) {
 4233     list *links = NULL;
 4234     redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES");
 4235     if (!clusterManagerCheckRedisReply(node, reply, NULL)) goto cleanup;
 4236     links = listCreate();
 4237     char *lines = reply->str, *p, *line;
 4238     while ((p = strstr(lines, "\n")) != NULL) {
 4239         int i = 0;
 4240         *p = '\0';
 4241         line = lines;
 4242         lines = p + 1;
 4243         char *nodename = NULL, *addr = NULL, *flags = NULL, *link_status = NULL;
 4244         while ((p = strchr(line, ' ')) != NULL) {
 4245             *p = '\0';
 4246             char *token = line;
 4247             line = p + 1;
 4248             if (i == 0) nodename = token;
 4249             else if (i == 1) addr = token;
 4250             else if (i == 2) flags = token;
 4251             else if (i == 7) link_status = token;
 4252             else if (i == 8) break;
 4253             i++;
 4254         }
 4255         if (i == 7) link_status = line;
 4256         if (nodename == NULL || addr == NULL || flags == NULL ||
 4257             link_status == NULL) continue;
 4258         if (strstr(flags, "myself") != NULL) continue;
 4259         int disconnected = ((strstr(flags, "disconnected") != NULL) ||
 4260                             (strstr(link_status, "disconnected")));
 4261         int handshaking = (strstr(flags, "handshake") != NULL);
 4262         if (disconnected || handshaking) {
 4263             clusterManagerLink *link = zmalloc(sizeof(*link));
 4264             link->node_name = sdsnew(nodename);
 4265             link->node_addr = sdsnew(addr);
 4266             link->connected = 0;
 4267             link->handshaking = handshaking;
 4268             listAddNodeTail(links, link);
 4269         }
 4270     }
 4271 cleanup:
 4272     if (reply != NULL) freeReplyObject(reply);
 4273     return links;
 4274 }
 4275 
 4276 /* Check for disconnected cluster links. It returns a dict whose keys
 4277  * are the unreachable node addresses and the values are lists of
 4278  * node addresses that cannot reach the unreachable node. */
 4279 static dict *clusterManagerGetLinkStatus(void) {
 4280     if (cluster_manager.nodes == NULL) return NULL;
 4281     dict *status = dictCreate(&clusterManagerLinkDictType, NULL);
 4282     listIter li;
 4283     listNode *ln;
 4284     listRewind(cluster_manager.nodes, &li);
 4285     while ((ln = listNext(&li)) != NULL) {
 4286         clusterManagerNode *node = ln->value;
 4287         list *links = clusterManagerGetDisconnectedLinks(node);
 4288         if (links) {
 4289             listIter lli;
 4290             listNode *lln;
 4291             listRewind(links, &lli);
 4292             while ((lln = listNext(&lli)) != NULL) {
 4293                 clusterManagerLink *link = lln->value;
 4294                 list *from = NULL;
 4295                 dictEntry *entry = dictFind(status, link->node_addr);
 4296                 if (entry) from = dictGetVal(entry);
 4297                 else {
 4298                     from = listCreate();
 4299                     dictAdd(status, sdsdup(link->node_addr), from);
 4300                 }
 4301                 sds myaddr = sdsempty();
 4302                 myaddr = sdscatfmt(myaddr, "%s:%u", node->ip, node->port);
 4303                 listAddNodeTail(from, myaddr);
 4304                 sdsfree(link->node_name);
 4305                 sdsfree(link->node_addr);
 4306                 zfree(link);
 4307             }
 4308             listRelease(links);
 4309         }
 4310     }
 4311     return status;
 4312 }
 4313 
 4314 /* Add the error string to cluster_manager.errors and print it. */
 4315 static void clusterManagerOnError(sds err) {
 4316     if (cluster_manager.errors == NULL)
 4317         cluster_manager.errors = listCreate();
 4318     listAddNodeTail(cluster_manager.errors, err);
 4319     clusterManagerLogErr("%s\n", (char *) err);
 4320 }
 4321 
 4322 /* Check the slots coverage of the cluster. The 'all_slots' argument must be
 4323  * and array of 16384 bytes. Every covered slot will be set to 1 in the
 4324  * 'all_slots' array. The function returns the total number if covered slots.*/
 4325 static int clusterManagerGetCoveredSlots(char *all_slots) {
 4326     if (cluster_manager.nodes == NULL) return 0;
 4327     listIter li;
 4328     listNode *ln;
 4329     listRewind(cluster_manager.nodes, &li);
 4330     int totslots = 0, i;
 4331     while ((ln = listNext(&li)) != NULL) {
 4332         clusterManagerNode *node = ln->value;
 4333         for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) {
 4334             if (node->slots[i] && !all_slots[i]) {
 4335                 all_slots[i] = 1;
 4336                 totslots++;
 4337             }
 4338         }
 4339     }
 4340     return totslots;
 4341 }
 4342 
 4343 static void clusterManagerPrintSlotsList(list *slots) {
 4344     clusterManagerNode n = {0};
 4345     listIter li;
 4346     listNode *ln;
 4347     listRewind(slots, &li);
 4348     while ((ln = listNext(&li)) != NULL) {
 4349         int slot = atoi(ln->value);
 4350         if (slot >= 0 && slot < CLUSTER_MANAGER_SLOTS)
 4351             n.slots[slot] = 1;
 4352     }
 4353     sds nodeslist = clusterManagerNodeSlotsString(&n);
 4354     printf("%s\n", nodeslist);
 4355     sdsfree(nodeslist);
 4356 }
 4357 
 4358 /* Return the node, among 'nodes' with the greatest number of keys
 4359  * in the specified slot. */
 4360 static clusterManagerNode * clusterManagerGetNodeWithMostKeysInSlot(list *nodes,
 4361                                                                     int slot,
 4362                                                                     char **err)
 4363 {
 4364     clusterManagerNode *node = NULL;
 4365     int numkeys = 0;
 4366     listIter li;
 4367     listNode *ln;
 4368     listRewind(nodes, &li);
 4369     if (err) *err = NULL;
 4370     while ((ln = listNext(&li)) != NULL) {
 4371         clusterManagerNode *n = ln->value;
 4372         if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
 4373             continue;
 4374         redisReply *r =
 4375             CLUSTER_MANAGER_COMMAND(n, "CLUSTER COUNTKEYSINSLOT %d", slot);
 4376         int success = clusterManagerCheckRedisReply(n, r, err);
 4377         if (success) {
 4378             if (r->integer > numkeys || node == NULL) {
 4379                 numkeys = r->integer;
 4380                 node = n;
 4381             }
 4382         }
 4383         if (r != NULL) freeReplyObject(r);
 4384         /* If the reply contains errors */
 4385         if (!success) {
 4386             if (err != NULL && *err != NULL)
 4387                 CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err);
 4388             node = NULL;
 4389             break;
 4390         }
 4391     }
 4392     return node;
 4393 }
 4394 
 4395 /* This function returns the master that has the least number of replicas
 4396  * in the cluster. If there are multiple masters with the same smaller
 4397  * number of replicas, one at random is returned. */
 4398 
 4399 static clusterManagerNode *clusterManagerNodeWithLeastReplicas() {
 4400     clusterManagerNode *node = NULL;
 4401     int lowest_count = 0;
 4402     listIter li;
 4403     listNode *ln;
 4404     listRewind(cluster_manager.nodes, &li);
 4405     while ((ln = listNext(&li)) != NULL) {
 4406         clusterManagerNode *n = ln->value;
 4407         if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
 4408         if (node == NULL || n->replicas_count < lowest_count) {
 4409             node = n;
 4410             lowest_count = n->replicas_count;
 4411         }
 4412     }
 4413     return node;
 4414 }
 4415 
 4416 /* This function returns a random master node, return NULL if none */
 4417 
 4418 static clusterManagerNode *clusterManagerNodeMasterRandom() {
 4419     int master_count = 0;
 4420     int idx;
 4421     listIter li;
 4422     listNode *ln;
 4423     listRewind(cluster_manager.nodes, &li);
 4424     while ((ln = listNext(&li)) != NULL) {
 4425         clusterManagerNode *n = ln->value;
 4426         if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
 4427         master_count++;
 4428     }
 4429 
 4430     srand(time(NULL));
 4431     idx = rand() % master_count;
 4432     listRewind(cluster_manager.nodes, &li);
 4433     while ((ln = listNext(&li)) != NULL) {
 4434         clusterManagerNode *n = ln->value;
 4435         if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
 4436         if (!idx--) {
 4437             return n;
 4438         }
 4439     }
 4440     /* Can not be reached */
 4441     return NULL;
 4442 }
 4443 
 4444 static int clusterManagerFixSlotsCoverage(char *all_slots) {
 4445     int force_fix = config.cluster_manager_command.flags &
 4446                     CLUSTER_MANAGER_CMD_FLAG_FIX_WITH_UNREACHABLE_MASTERS;
 4447 
 4448     if (cluster_manager.unreachable_masters > 0 && !force_fix) {
 4449         clusterManagerLogWarn("*** Fixing slots coverage with %d unreachable masters is dangerous: redis-cli will assume that slots about masters that are not reachable are not covered, and will try to reassign them to the reachable nodes. This can cause data loss and is rarely what you want to do. If you really want to proceed use the --cluster-fix-with-unreachable-masters option.\n", cluster_manager.unreachable_masters);
 4450         exit(1);
 4451     }
 4452 
 4453     int i, fixed = 0;
 4454     list *none = NULL, *single = NULL, *multi = NULL;
 4455     clusterManagerLogInfo(">>> Fixing slots coverage...\n");
 4456     for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) {
 4457         int covered = all_slots[i];
 4458         if (!covered) {
 4459             sds slot = sdsfromlonglong((long long) i);
 4460             list *slot_nodes = listCreate();
 4461             sds slot_nodes_str = sdsempty();
 4462             listIter li;
 4463             listNode *ln;
 4464             listRewind(cluster_manager.nodes, &li);
 4465             while ((ln = listNext(&li)) != NULL) {
 4466                 clusterManagerNode *n = ln->value;
 4467                 if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
 4468                     continue;
 4469                 redisReply *reply = CLUSTER_MANAGER_COMMAND(n,
 4470                     "CLUSTER GETKEYSINSLOT %d %d", i, 1);
 4471                 if (!clusterManagerCheckRedisReply(n, reply, NULL)) {
 4472                     fixed = -1;
 4473                     if (reply) freeReplyObject(reply);
 4474                     goto cleanup;
 4475                 }
 4476                 assert(reply->type == REDIS_REPLY_ARRAY);
 4477                 if (reply->elements > 0) {
 4478                     listAddNodeTail(slot_nodes, n);
 4479                     if (listLength(slot_nodes) > 1)
 4480                         slot_nodes_str = sdscat(slot_nodes_str, ", ");
 4481                     slot_nodes_str = sdscatfmt(slot_nodes_str,
 4482                                                "%s:%u", n->ip, n->port);
 4483                 }
 4484                 freeReplyObject(reply);
 4485             }
 4486             sdsfree(slot_nodes_str);
 4487             dictAdd(clusterManagerUncoveredSlots, slot, slot_nodes);
 4488         }
 4489     }
 4490 
 4491     /* For every slot, take action depending on the actual condition:
 4492      * 1) No node has keys for this slot.
 4493      * 2) A single node has keys for this slot.
 4494      * 3) Multiple nodes have keys for this slot. */
 4495     none = listCreate();
 4496     single = listCreate();
 4497     multi = listCreate();
 4498     dictIterator *iter = dictGetIterator(clusterManagerUncoveredSlots);
 4499     dictEntry *entry;
 4500     while ((entry = dictNext(iter)) != NULL) {
 4501         sds slot = (sds) dictGetKey(entry);
 4502         list *nodes = (list *) dictGetVal(entry);
 4503         switch (listLength(nodes)){
 4504         case 0: listAddNodeTail(none, slot); break;
 4505         case 1: listAddNodeTail(single, slot); break;
 4506         default: listAddNodeTail(multi, slot); break;
 4507         }
 4508     }
 4509     dictReleaseIterator(iter);
 4510 
 4511     /* we want explicit manual confirmation from users for all the fix cases */
 4512     int ignore_force = 1;
 4513 
 4514     /*  Handle case "1": keys in no node. */
 4515     if (listLength(none) > 0) {
 4516         printf("The following uncovered slots have no keys "
 4517                "across the cluster:\n");
 4518         clusterManagerPrintSlotsList(none);
 4519         if (confirmWithYes("Fix these slots by covering with a random node?",
 4520                            ignore_force)) {
 4521             listIter li;
 4522             listNode *ln;
 4523             listRewind(none, &li);
 4524             while ((ln = listNext(&li)) != NULL) {
 4525                 sds slot = ln->value;
 4526                 int s = atoi(slot);
 4527                 clusterManagerNode *n = clusterManagerNodeMasterRandom();
 4528                 clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
 4529                                       slot, n->ip, n->port);
 4530                 if (!clusterManagerSetSlotOwner(n, s, 0)) {
 4531                     fixed = -1;
 4532                     goto cleanup;
 4533                 }
 4534                 /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
 4535                  * info into the node struct, in order to keep it synced */
 4536                 n->slots[s] = 1;
 4537                 fixed++;
 4538             }
 4539         }
 4540     }
 4541 
 4542     /*  Handle case "2": keys only in one node. */
 4543     if (listLength(single) > 0) {
 4544         printf("The following uncovered slots have keys in just one node:\n");
 4545         clusterManagerPrintSlotsList(single);
 4546         if (confirmWithYes("Fix these slots by covering with those nodes?",
 4547                            ignore_force)) {
 4548             listIter li;
 4549             listNode *ln;
 4550             listRewind(single, &li);
 4551             while ((ln = listNext(&li)) != NULL) {
 4552                 sds slot