"Fossies" - the Fresh Open Source Software Archive

Member "redis-6.0.8/src/sentinel.c" (10 Sep 2020, 180788 Bytes) of package /linux/misc/redis-6.0.8.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "sentinel.c", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 6.0.7_vs_6.0.8.

    1 /* Redis Sentinel implementation
    2  *
    3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
    4  * All rights reserved.
    5  *
    6  * Redistribution and use in source and binary forms, with or without
    7  * modification, are permitted provided that the following conditions are met:
    8  *
    9  *   * Redistributions of source code must retain the above copyright notice,
   10  *     this list of conditions and the following disclaimer.
   11  *   * Redistributions in binary form must reproduce the above copyright
   12  *     notice, this list of conditions and the following disclaimer in the
   13  *     documentation and/or other materials provided with the distribution.
   14  *   * Neither the name of Redis nor the names of its contributors may be used
   15  *     to endorse or promote products derived from this software without
   16  *     specific prior written permission.
   17  *
   18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
   19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
   22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   28  * POSSIBILITY OF SUCH DAMAGE.
   29  */
   30 
   31 #include "server.h"
   32 #include "hiredis.h"
   33 #ifdef USE_OPENSSL
   34 #include "openssl/ssl.h"
   35 #include "hiredis_ssl.h"
   36 #endif
   37 #include "async.h"
   38 
   39 #include <ctype.h>
   40 #include <arpa/inet.h>
   41 #include <sys/socket.h>
   42 #include <sys/wait.h>
   43 #include <fcntl.h>
   44 
   45 extern char **environ;
   46 
   47 #ifdef USE_OPENSSL
   48 extern SSL_CTX *redis_tls_ctx;
   49 #endif
   50 
   /* Default TCP port for Sentinel mode: initSentinelConfig() assigns it to
    * server.port. */
   #define REDIS_SENTINEL_PORT 26379
   52 
   53 /* ======================== Sentinel global state =========================== */
   54 
   /* Address object, used to describe an ip:port pair. */
   typedef struct sentinelAddr {
       char *ip;   /* IP address as a string. The helpers in this file create it
                      with sdsnew() and release it with sdsfree(). */
       int port;   /* TCP port; createSentinelAddr() accepts the range 0-65535. */
   } sentinelAddr;
   60 
   /* Bit flags stored in the 'flags' field of a sentinelRedisInstance, the
    * object describing an instance this Sentinel is monitoring. */
   #define SRI_MASTER  (1<<0)
   #define SRI_SLAVE   (1<<1)
   #define SRI_SENTINEL (1<<2)
   #define SRI_S_DOWN (1<<3)   /* Subjectively down (no quorum). */
   #define SRI_O_DOWN (1<<4)   /* Objectively down (confirmed by others). */
   #define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that
                                     its master is down. */
   #define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for
                                              this master. */
   #define SRI_PROMOTED (1<<7)            /* Slave selected for promotion. */
   #define SRI_RECONF_SENT (1<<8)     /* SLAVEOF <newmaster> sent. */
   #define SRI_RECONF_INPROG (1<<9)   /* Slave synchronization in progress. */
   #define SRI_RECONF_DONE (1<<10)     /* Slave synchronized with new master. */
   #define SRI_FORCE_FAILOVER (1<<11)  /* Force failover with master up. */
   #define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */
   77 
   /* Periods, timeouts and default values.
    * Note: times are in milliseconds. A few entries in this group are not
    * times (hello channel name, slave priority, parallel syncs, pending
    * commands limit, deny-scripts-reconfig default). */
   #define SENTINEL_INFO_PERIOD 10000
   #define SENTINEL_PING_PERIOD 1000
   #define SENTINEL_ASK_PERIOD 1000
   #define SENTINEL_PUBLISH_PERIOD 2000
   #define SENTINEL_DEFAULT_DOWN_AFTER 30000
   #define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
   #define SENTINEL_TILT_TRIGGER 2000
   #define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
   #define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
   #define SENTINEL_SLAVE_RECONF_TIMEOUT 10000
   #define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
   #define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
   #define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*3*1000)
   #define SENTINEL_MAX_PENDING_COMMANDS 100
   #define SENTINEL_ELECTION_TIMEOUT 10000
   #define SENTINEL_MAX_DESYNC 1000
   #define SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG 1
   96 
   /* Failover state machine: values of sentinelRedisInstance.failover_state. */
   #define SENTINEL_FAILOVER_STATE_NONE 0  /* No failover in progress. */
   #define SENTINEL_FAILOVER_STATE_WAIT_START 1  /* Wait for failover_start_time*/
   #define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
   #define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
   #define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
   #define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
   #define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Monitor promoted slave. */

   /* Replication link status of a slave, as reported by its INFO output
    * (see sentinelRedisInstance.slave_master_link_status). */
   #define SENTINEL_MASTER_LINK_STATUS_UP 0
   #define SENTINEL_MASTER_LINK_STATUS_DOWN 1
  108 
  /* Generic flags that can be used with different functions.
   * They use higher bits (16 and above) to avoid colliding with the
   * function-specific flags. */
  #define SENTINEL_NO_FLAGS 0
  #define SENTINEL_GENERATE_EVENT (1<<16)
  #define SENTINEL_LEADER (1<<17)
  #define SENTINEL_OBSERVER (1<<18)
  116 
  /* Script execution flags (sentinelScriptJob.flags) and limits. */
  #define SENTINEL_SCRIPT_NONE 0
  #define SENTINEL_SCRIPT_RUNNING 1
  #define SENTINEL_SCRIPT_MAX_QUEUE 256
  #define SENTINEL_SCRIPT_MAX_RUNNING 16
  #define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
  #define SENTINEL_SCRIPT_MAX_RETRY 10
  #define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
  125 
  /* SENTINEL SIMULATE-FAILURE command flags, stored in
   * sentinel.simfailure_flags. */
  #define SENTINEL_SIMFAILURE_NONE 0
  #define SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION (1<<0)
  #define SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION (1<<1)
  130 
  131 /* The link to a sentinelRedisInstance. When we have the same set of Sentinels
  132  * monitoring many masters, we have different instances representing the
  133  * same Sentinels, one per master, and we need to share the hiredis connections
  134  * among them. Oherwise if 5 Sentinels are monitoring 100 masters we create
  135  * 500 outgoing connections instead of 5.
  136  *
  137  * So this structure represents a reference counted link in terms of the two
  138  * hiredis connections for commands and Pub/Sub, and the fields needed for
  139  * failure detection, since the ping/pong time are now local to the link: if
  140  * the link is available, the instance is avaialbe. This way we don't just
  141  * have 5 connections instead of 500, we also send 5 pings instead of 500.
  142  *
  143  * Links are shared only for Sentinels: master and slave instances have
  144  * a link with refcount = 1, always. */
  145 typedef struct instanceLink {
  146     int refcount;          /* Number of sentinelRedisInstance owners. */
  147     int disconnected;      /* Non-zero if we need to reconnect cc or pc. */
  148     int pending_commands;  /* Number of commands sent waiting for a reply. */
  149     redisAsyncContext *cc; /* Hiredis context for commands. */
  150     redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
  151     mstime_t cc_conn_time; /* cc connection time. */
  152     mstime_t pc_conn_time; /* pc connection time. */
  153     mstime_t pc_last_activity; /* Last time we received any message. */
  154     mstime_t last_avail_time; /* Last time the instance replied to ping with
  155                                  a reply we consider valid. */
  156     mstime_t act_ping_time;   /* Time at which the last pending ping (no pong
  157                                  received after it) was sent. This field is
  158                                  set to 0 when a pong is received, and set again
  159                                  to the current time if the value is 0 and a new
  160                                  ping is sent. */
  161     mstime_t last_ping_time;  /* Time at which we sent the last ping. This is
  162                                  only used to avoid sending too many pings
  163                                  during failure. Idle time is computed using
  164                                  the act_ping_time field. */
  165     mstime_t last_pong_time;  /* Last time the instance replied to ping,
  166                                  whatever the reply was. That's used to check
  167                                  if the link is idle and must be reconnected. */
  168     mstime_t last_reconn_time;  /* Last reconnection attempt performed when
  169                                    the link was down. */
  170 } instanceLink;
  171 
  172 typedef struct sentinelRedisInstance {
  173     int flags;      /* See SRI_... defines */
  174     char *name;     /* Master name from the point of view of this sentinel. */
  175     char *runid;    /* Run ID of this instance, or unique ID if is a Sentinel.*/
  176     uint64_t config_epoch;  /* Configuration epoch. */
  177     sentinelAddr *addr; /* Master host. */
  178     instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
  179     mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
  180     mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
  181                                  we received a hello from this Sentinel
  182                                  via Pub/Sub. */
  183     mstime_t last_master_down_reply_time; /* Time of last reply to
  184                                              SENTINEL is-master-down command. */
  185     mstime_t s_down_since_time; /* Subjectively down since time. */
  186     mstime_t o_down_since_time; /* Objectively down since time. */
  187     mstime_t down_after_period; /* Consider it down after that period. */
  188     mstime_t info_refresh;  /* Time at which we received INFO output from it. */
  189     dict *renamed_commands;     /* Commands renamed in this instance:
  190                                    Sentinel will use the alternative commands
  191                                    mapped on this table to send things like
  192                                    SLAVEOF, CONFING, INFO, ... */
  193 
  194     /* Role and the first time we observed it.
  195      * This is useful in order to delay replacing what the instance reports
  196      * with our own configuration. We need to always wait some time in order
  197      * to give a chance to the leader to report the new configuration before
  198      * we do silly things. */
  199     int role_reported;
  200     mstime_t role_reported_time;
  201     mstime_t slave_conf_change_time; /* Last time slave master addr changed. */
  202 
  203     /* Master specific. */
  204     dict *sentinels;    /* Other sentinels monitoring the same master. */
  205     dict *slaves;       /* Slaves for this master instance. */
  206     unsigned int quorum;/* Number of sentinels that need to agree on failure. */
  207     int parallel_syncs; /* How many slaves to reconfigure at same time. */
  208     char *auth_pass;    /* Password to use for AUTH against master & replica. */
  209     char *auth_user;    /* Username for ACLs AUTH against master & replica. */
  210 
  211     /* Slave specific. */
  212     mstime_t master_link_down_time; /* Slave replication link down time. */
  213     int slave_priority; /* Slave priority according to its INFO output. */
  214     mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
  215     struct sentinelRedisInstance *master; /* Master instance if it's slave. */
  216     char *slave_master_host;    /* Master host as reported by INFO */
  217     int slave_master_port;      /* Master port as reported by INFO */
  218     int slave_master_link_status; /* Master link status as reported by INFO */
  219     unsigned long long slave_repl_offset; /* Slave replication offset. */
  220     /* Failover */
  221     char *leader;       /* If this is a master instance, this is the runid of
  222                            the Sentinel that should perform the failover. If
  223                            this is a Sentinel, this is the runid of the Sentinel
  224                            that this Sentinel voted as leader. */
  225     uint64_t leader_epoch; /* Epoch of the 'leader' field. */
  226     uint64_t failover_epoch; /* Epoch of the currently started failover. */
  227     int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
  228     mstime_t failover_state_change_time;
  229     mstime_t failover_start_time;   /* Last failover attempt start time. */
  230     mstime_t failover_timeout;      /* Max time to refresh failover state. */
  231     mstime_t failover_delay_logged; /* For what failover_start_time value we
  232                                        logged the failover delay. */
  233     struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
  234     /* Scripts executed to notify admin or reconfigure clients: when they
  235      * are set to NULL no script is executed. */
  236     char *notification_script;
  237     char *client_reconfig_script;
  238     sds info; /* cached INFO output */
  239 } sentinelRedisInstance;
  240 
  241 /* Main state. */
  242 struct sentinelState {
  243     char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
  244     uint64_t current_epoch;         /* Current epoch. */
  245     dict *masters;      /* Dictionary of master sentinelRedisInstances.
  246                            Key is the instance name, value is the
  247                            sentinelRedisInstance structure pointer. */
  248     int tilt;           /* Are we in TILT mode? */
  249     int running_scripts;    /* Number of scripts in execution right now. */
  250     mstime_t tilt_start_time;       /* When TITL started. */
  251     mstime_t previous_time;         /* Last time we ran the time handler. */
  252     list *scripts_queue;            /* Queue of user scripts to execute. */
  253     char *announce_ip;  /* IP addr that is gossiped to other sentinels if
  254                            not NULL. */
  255     int announce_port;  /* Port that is gossiped to other sentinels if
  256                            non zero. */
  257     unsigned long simfailure_flags; /* Failures simulation. */
  258     int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script
  259                                   paths at runtime? */
  260 } sentinel;
  261 
  262 /* A script execution job. */
  263 typedef struct sentinelScriptJob {
  264     int flags;              /* Script job flags: SENTINEL_SCRIPT_* */
  265     int retry_num;          /* Number of times we tried to execute it. */
  266     char **argv;            /* Arguments to call the script. */
  267     mstime_t start_time;    /* Script execution time if the script is running,
  268                                otherwise 0 if we are allowed to retry the
  269                                execution at any time. If the script is not
  270                                running and it's not 0, it means: do not run
  271                                before the specified time. */
  272     pid_t pid;              /* Script execution pid. */
  273 } sentinelScriptJob;
  274 
  275 /* ======================= hiredis ae.c adapters =============================
  276  * Note: this implementation is taken from hiredis/adapters/ae.h, however
  277  * we have our modified copy for Sentinel in order to use our allocator
  278  * and to have full control over how the adapter works. */
  279 
  280 typedef struct redisAeEvents {
  281     redisAsyncContext *context;
  282     aeEventLoop *loop;
  283     int fd;
  284     int reading, writing;
  285 } redisAeEvents;
  286 
  287 static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
  288     ((void)el); ((void)fd); ((void)mask);
  289 
  290     redisAeEvents *e = (redisAeEvents*)privdata;
  291     redisAsyncHandleRead(e->context);
  292 }
  293 
  294 static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
  295     ((void)el); ((void)fd); ((void)mask);
  296 
  297     redisAeEvents *e = (redisAeEvents*)privdata;
  298     redisAsyncHandleWrite(e->context);
  299 }
  300 
  301 static void redisAeAddRead(void *privdata) {
  302     redisAeEvents *e = (redisAeEvents*)privdata;
  303     aeEventLoop *loop = e->loop;
  304     if (!e->reading) {
  305         e->reading = 1;
  306         aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
  307     }
  308 }
  309 
  310 static void redisAeDelRead(void *privdata) {
  311     redisAeEvents *e = (redisAeEvents*)privdata;
  312     aeEventLoop *loop = e->loop;
  313     if (e->reading) {
  314         e->reading = 0;
  315         aeDeleteFileEvent(loop,e->fd,AE_READABLE);
  316     }
  317 }
  318 
  319 static void redisAeAddWrite(void *privdata) {
  320     redisAeEvents *e = (redisAeEvents*)privdata;
  321     aeEventLoop *loop = e->loop;
  322     if (!e->writing) {
  323         e->writing = 1;
  324         aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
  325     }
  326 }
  327 
  328 static void redisAeDelWrite(void *privdata) {
  329     redisAeEvents *e = (redisAeEvents*)privdata;
  330     aeEventLoop *loop = e->loop;
  331     if (e->writing) {
  332         e->writing = 0;
  333         aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
  334     }
  335 }
  336 
  337 static void redisAeCleanup(void *privdata) {
  338     redisAeEvents *e = (redisAeEvents*)privdata;
  339     redisAeDelRead(privdata);
  340     redisAeDelWrite(privdata);
  341     zfree(e);
  342 }
  343 
  344 static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
  345     redisContext *c = &(ac->c);
  346     redisAeEvents *e;
  347 
  348     /* Nothing should be attached when something is already attached */
  349     if (ac->ev.data != NULL)
  350         return C_ERR;
  351 
  352     /* Create container for context and r/w events */
  353     e = (redisAeEvents*)zmalloc(sizeof(*e));
  354     e->context = ac;
  355     e->loop = loop;
  356     e->fd = c->fd;
  357     e->reading = e->writing = 0;
  358 
  359     /* Register functions to start/stop listening for events */
  360     ac->ev.addRead = redisAeAddRead;
  361     ac->ev.delRead = redisAeDelRead;
  362     ac->ev.addWrite = redisAeAddWrite;
  363     ac->ev.delWrite = redisAeDelWrite;
  364     ac->ev.cleanup = redisAeCleanup;
  365     ac->ev.data = e;
  366 
  367     return C_OK;
  368 }
  369 
  370 /* ============================= Prototypes ================================= */
  371 
  372 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
  373 void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
  374 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
  375 sentinelRedisInstance *sentinelGetMasterByName(char *name);
  376 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
  377 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
  378 int yesnotoi(char *s);
  379 void instanceLinkConnectionError(const redisAsyncContext *c);
  380 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
  381 void sentinelAbortFailover(sentinelRedisInstance *ri);
  382 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
  383 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
  384 void sentinelScheduleScriptExecution(char *path, ...);
  385 void sentinelStartFailover(sentinelRedisInstance *master);
  386 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
  387 int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port);
  388 char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch);
  389 void sentinelFlushConfig(void);
  390 void sentinelGenerateInitialMonitorEvents(void);
  391 int sentinelSendPing(sentinelRedisInstance *ri);
  392 int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master);
  393 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid);
  394 void sentinelSimFailureCrash(void);
  395 
  396 /* ========================= Dictionary types =============================== */
  397 
  398 uint64_t dictSdsHash(const void *key);
  399 uint64_t dictSdsCaseHash(const void *key);
  400 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
  401 int dictSdsKeyCaseCompare(void *privdata, const void *key1, const void *key2);
  402 void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
  403 
  /* Value destructor for instancesDictType: releases the
   * sentinelRedisInstance stored as the dictionary value. 'privdata' is
   * unused. */
  void dictInstancesValDestructor(void *privdata, void *obj) {
      UNUSED(privdata);
      releaseSentinelRedisInstance(obj);
  }
  408 
  409 /* Instance name (sds) -> instance (sentinelRedisInstance pointer)
  410  *
  411  * also used for: sentinelRedisInstance->sentinels dictionary that maps
  412  * sentinels ip:port to last seen time in Pub/Sub hello message. */
  413 dictType instancesDictType = {
  414     dictSdsHash,               /* hash function */
  415     NULL,                      /* key dup */
  416     NULL,                      /* val dup */
  417     dictSdsKeyCompare,         /* key compare */
  418     NULL,                      /* key destructor */
  419     dictInstancesValDestructor /* val destructor */
  420 };
  421 
  422 /* Instance runid (sds) -> votes (long casted to void*)
  423  *
  424  * This is useful into sentinelGetObjectiveLeader() function in order to
  425  * count the votes and understand who is the leader. */
  426 dictType leaderVotesDictType = {
  427     dictSdsHash,               /* hash function */
  428     NULL,                      /* key dup */
  429     NULL,                      /* val dup */
  430     dictSdsKeyCompare,         /* key compare */
  431     NULL,                      /* key destructor */
  432     NULL                       /* val destructor */
  433 };
  434 
  435 /* Instance renamed commands table. */
  436 dictType renamedCommandsDictType = {
  437     dictSdsCaseHash,           /* hash function */
  438     NULL,                      /* key dup */
  439     NULL,                      /* val dup */
  440     dictSdsKeyCaseCompare,     /* key compare */
  441     dictSdsDestructor,         /* key destructor */
  442     dictSdsDestructor          /* val destructor */
  443 };
  444 
  445 /* =========================== Initialization =============================== */
  446 
  447 void sentinelCommand(client *c);
  448 void sentinelInfoCommand(client *c);
  449 void sentinelSetCommand(client *c);
  450 void sentinelPublishCommand(client *c);
  451 void sentinelRoleCommand(client *c);
  452 
  453 struct redisCommand sentinelcmds[] = {
  454     {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
  455     {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
  456     {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
  457     {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
  458     {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
  459     {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
  460     {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
  461     {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
  462     {"role",sentinelRoleCommand,1,"ok-loading",0,NULL,0,0,0,0,0},
  463     {"client",clientCommand,-2,"read-only no-script",0,NULL,0,0,0,0,0},
  464     {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0},
  465     {"auth",authCommand,2,"no-auth no-script ok-loading ok-stale fast",0,NULL,0,0,0,0,0},
  466     {"hello",helloCommand,-2,"no-auth no-script fast",0,NULL,0,0,0,0,0}
  467 };
  468 
  469 /* This function overwrites a few normal Redis config default with Sentinel
  470  * specific defaults. */
  471 void initSentinelConfig(void) {
  472     server.port = REDIS_SENTINEL_PORT;
  473     server.protected_mode = 0; /* Sentinel must be exposed. */
  474 }
  475 
  476 /* Perform the Sentinel mode initialization. */
  477 void initSentinel(void) {
  478     unsigned int j;
  479 
  480     /* Remove usual Redis commands from the command table, then just add
  481      * the SENTINEL command. */
  482     dictEmpty(server.commands,NULL);
  483     for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
  484         int retval;
  485         struct redisCommand *cmd = sentinelcmds+j;
  486 
  487         retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
  488         serverAssert(retval == DICT_OK);
  489 
  490         /* Translate the command string flags description into an actual
  491          * set of flags. */
  492         if (populateCommandTableParseFlags(cmd,cmd->sflags) == C_ERR)
  493             serverPanic("Unsupported command flag");
  494     }
  495 
  496     /* Initialize various data structures. */
  497     sentinel.current_epoch = 0;
  498     sentinel.masters = dictCreate(&instancesDictType,NULL);
  499     sentinel.tilt = 0;
  500     sentinel.tilt_start_time = 0;
  501     sentinel.previous_time = mstime();
  502     sentinel.running_scripts = 0;
  503     sentinel.scripts_queue = listCreate();
  504     sentinel.announce_ip = NULL;
  505     sentinel.announce_port = 0;
  506     sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
  507     sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG;
  508     memset(sentinel.myid,0,sizeof(sentinel.myid));
  509 }
  510 
  511 /* This function gets called when the server is in Sentinel mode, started,
  512  * loaded the configuration, and is ready for normal operations. */
  513 void sentinelIsRunning(void) {
  514     int j;
  515 
  516     if (server.configfile == NULL) {
  517         serverLog(LL_WARNING,
  518             "Sentinel started without a config file. Exiting...");
  519         exit(1);
  520     } else if (access(server.configfile,W_OK) == -1) {
  521         serverLog(LL_WARNING,
  522             "Sentinel config file %s is not writable: %s. Exiting...",
  523             server.configfile,strerror(errno));
  524         exit(1);
  525     }
  526 
  527     /* If this Sentinel has yet no ID set in the configuration file, we
  528      * pick a random one and persist the config on disk. From now on this
  529      * will be this Sentinel ID across restarts. */
  530     for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
  531         if (sentinel.myid[j] != 0) break;
  532 
  533     if (j == CONFIG_RUN_ID_SIZE) {
  534         /* Pick ID and persist the config. */
  535         getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
  536         sentinelFlushConfig();
  537     }
  538 
  539     /* Log its ID to make debugging of issues simpler. */
  540     serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);
  541 
  542     /* We want to generate a +monitor event for every configured master
  543      * at startup. */
  544     sentinelGenerateInitialMonitorEvents();
  545 }
  546 
  547 /* ============================== sentinelAddr ============================== */
  548 
  549 /* Create a sentinelAddr object and return it on success.
  550  * On error NULL is returned and errno is set to:
  551  *  ENOENT: Can't resolve the hostname.
  552  *  EINVAL: Invalid port number.
  553  */
  554 sentinelAddr *createSentinelAddr(char *hostname, int port) {
  555     char ip[NET_IP_STR_LEN];
  556     sentinelAddr *sa;
  557 
  558     if (port < 0 || port > 65535) {
  559         errno = EINVAL;
  560         return NULL;
  561     }
  562     if (anetResolve(NULL,hostname,ip,sizeof(ip)) == ANET_ERR) {
  563         errno = ENOENT;
  564         return NULL;
  565     }
  566     sa = zmalloc(sizeof(*sa));
  567     sa->ip = sdsnew(ip);
  568     sa->port = port;
  569     return sa;
  570 }
  571 
  572 /* Return a duplicate of the source address. */
  573 sentinelAddr *dupSentinelAddr(sentinelAddr *src) {
  574     sentinelAddr *sa;
  575 
  576     sa = zmalloc(sizeof(*sa));
  577     sa->ip = sdsnew(src->ip);
  578     sa->port = src->port;
  579     return sa;
  580 }
  581 
  582 /* Free a Sentinel address. Can't fail. */
  583 void releaseSentinelAddr(sentinelAddr *sa) {
  584     sdsfree(sa->ip);
  585     zfree(sa);
  586 }
  587 
  588 /* Return non-zero if two addresses are equal. */
  589 int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) {
  590     return a->port == b->port && !strcasecmp(a->ip,b->ip);
  591 }
  592 
  593 /* =========================== Events notification ========================== */
  594 
  595 /* Send an event to log, pub/sub, user notification script.
  596  *
  597  * 'level' is the log level for logging. Only LL_WARNING events will trigger
  598  * the execution of the user notification script.
  599  *
  600  * 'type' is the message type, also used as a pub/sub channel name.
  601  *
  602  * 'ri', is the redis instance target of this event if applicable, and is
  603  * used to obtain the path of the notification script to execute.
  604  *
  605  * The remaining arguments are printf-alike.
  606  * If the format specifier starts with the two characters "%@" then ri is
  607  * not NULL, and the message is prefixed with an instance identifier in the
  608  * following format:
  609  *
  610  *  <instance type> <instance name> <ip> <port>
  611  *
  612  *  If the instance type is not master, than the additional string is
  613  *  added to specify the originating master:
  614  *
  615  *  @ <master name> <master ip> <master port>
  616  *
  617  *  Any other specifier after "%@" is processed by printf itself.
  618  */
  619 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
  620                    const char *fmt, ...) {
  621     va_list ap;
  622     char msg[LOG_MAX_LEN];
  623     robj *channel, *payload;
  624 
  625     /* Handle %@ */
  626     if (fmt[0] == '%' && fmt[1] == '@') {
  627         sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
  628                                          NULL : ri->master;
  629 
  630         if (master) {
  631             snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
  632                 sentinelRedisInstanceTypeStr(ri),
  633                 ri->name, ri->addr->ip, ri->addr->port,
  634                 master->name, master->addr->ip, master->addr->port);
  635         } else {
  636             snprintf(msg, sizeof(msg), "%s %s %s %d",
  637                 sentinelRedisInstanceTypeStr(ri),
  638                 ri->name, ri->addr->ip, ri->addr->port);
  639         }
  640         fmt += 2;
  641     } else {
  642         msg[0] = '\0';
  643     }
  644 
  645     /* Use vsprintf for the rest of the formatting if any. */
  646     if (fmt[0] != '\0') {
  647         va_start(ap, fmt);
  648         vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
  649         va_end(ap);
  650     }
  651 
  652     /* Log the message if the log level allows it to be logged. */
  653     if (level >= server.verbosity)
  654         serverLog(level,"%s %s",type,msg);
  655 
  656     /* Publish the message via Pub/Sub if it's not a debugging one. */
  657     if (level != LL_DEBUG) {
  658         channel = createStringObject(type,strlen(type));
  659         payload = createStringObject(msg,strlen(msg));
  660         pubsubPublishMessage(channel,payload);
  661         decrRefCount(channel);
  662         decrRefCount(payload);
  663     }
  664 
  665     /* Call the notification script if applicable. */
  666     if (level == LL_WARNING && ri != NULL) {
  667         sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
  668                                          ri : ri->master;
  669         if (master && master->notification_script) {
  670             sentinelScheduleScriptExecution(master->notification_script,
  671                 type,msg,NULL);
  672         }
  673     }
  674 }
  675 
  676 /* This function is called only at startup and is used to generate a
  677  * +monitor event for every configured master. The same events are also
  678  * generated when a master to monitor is added at runtime via the
  679  * SENTINEL MONITOR command. */
  680 void sentinelGenerateInitialMonitorEvents(void) {
  681     dictIterator *di;
  682     dictEntry *de;
  683 
  684     di = dictGetIterator(sentinel.masters);
  685     while((de = dictNext(di)) != NULL) {
  686         sentinelRedisInstance *ri = dictGetVal(de);
  687         sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
  688     }
  689     dictReleaseIterator(di);
  690 }
  691 
  692 /* ============================ script execution ============================ */
  693 
  694 /* Release a script job structure and all the associated data. */
  695 void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
  696     int j = 0;
  697 
  698     while(sj->argv[j]) sdsfree(sj->argv[j++]);
  699     zfree(sj->argv);
  700     zfree(sj);
  701 }
  702 
  703 #define SENTINEL_SCRIPT_MAX_ARGS 16
  704 void sentinelScheduleScriptExecution(char *path, ...) {
  705     va_list ap;
  706     char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
  707     int argc = 1;
  708     sentinelScriptJob *sj;
  709 
  710     va_start(ap, path);
  711     while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
  712         argv[argc] = va_arg(ap,char*);
  713         if (!argv[argc]) break;
  714         argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
  715         argc++;
  716     }
  717     va_end(ap);
  718     argv[0] = sdsnew(path);
  719 
  720     sj = zmalloc(sizeof(*sj));
  721     sj->flags = SENTINEL_SCRIPT_NONE;
  722     sj->retry_num = 0;
  723     sj->argv = zmalloc(sizeof(char*)*(argc+1));
  724     sj->start_time = 0;
  725     sj->pid = 0;
  726     memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
  727 
  728     listAddNodeTail(sentinel.scripts_queue,sj);
  729 
  730     /* Remove the oldest non running script if we already hit the limit. */
  731     if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
  732         listNode *ln;
  733         listIter li;
  734 
  735         listRewind(sentinel.scripts_queue,&li);
  736         while ((ln = listNext(&li)) != NULL) {
  737             sj = ln->value;
  738 
  739             if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
  740             /* The first node is the oldest as we add on tail. */
  741             listDelNode(sentinel.scripts_queue,ln);
  742             sentinelReleaseScriptJob(sj);
  743             break;
  744         }
  745         serverAssert(listLength(sentinel.scripts_queue) <=
  746                     SENTINEL_SCRIPT_MAX_QUEUE);
  747     }
  748 }
  749 
  750 /* Lookup a script in the scripts queue via pid, and returns the list node
  751  * (so that we can easily remove it from the queue if needed). */
  752 listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
  753     listNode *ln;
  754     listIter li;
  755 
  756     listRewind(sentinel.scripts_queue,&li);
  757     while ((ln = listNext(&li)) != NULL) {
  758         sentinelScriptJob *sj = ln->value;
  759 
  760         if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
  761             return ln;
  762     }
  763     return NULL;
  764 }
  765 
  766 /* Run pending scripts if we are not already at max number of running
  767  * scripts. */
  768 void sentinelRunPendingScripts(void) {
  769     listNode *ln;
  770     listIter li;
  771     mstime_t now = mstime();
  772 
  773     /* Find jobs that are not running and run them, from the top to the
  774      * tail of the queue, so we run older jobs first. */
  775     listRewind(sentinel.scripts_queue,&li);
  776     while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
  777            (ln = listNext(&li)) != NULL)
  778     {
  779         sentinelScriptJob *sj = ln->value;
  780         pid_t pid;
  781 
  782         /* Skip if already running. */
  783         if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
  784 
  785         /* Skip if it's a retry, but not enough time has elapsed. */
  786         if (sj->start_time && sj->start_time > now) continue;
  787 
  788         sj->flags |= SENTINEL_SCRIPT_RUNNING;
  789         sj->start_time = mstime();
  790         sj->retry_num++;
  791         pid = fork();
  792 
  793         if (pid == -1) {
  794             /* Parent (fork error).
  795              * We report fork errors as signal 99, in order to unify the
  796              * reporting with other kind of errors. */
  797             sentinelEvent(LL_WARNING,"-script-error",NULL,
  798                           "%s %d %d", sj->argv[0], 99, 0);
  799             sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
  800             sj->pid = 0;
  801         } else if (pid == 0) {
  802             /* Child */
  803             execve(sj->argv[0],sj->argv,environ);
  804             /* If we are here an error occurred. */
  805             _exit(2); /* Don't retry execution. */
  806         } else {
  807             sentinel.running_scripts++;
  808             sj->pid = pid;
  809             sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid);
  810         }
  811     }
  812 }
  813 
  814 /* How much to delay the execution of a script that we need to retry after
  815  * an error?
  816  *
  817  * We double the retry delay for every further retry we do. So for instance
  818  * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10
  819  * starting from the second attempt to execute the script the delays are:
  820  * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
  821 mstime_t sentinelScriptRetryDelay(int retry_num) {
  822     mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
  823 
  824     while (retry_num-- > 1) delay *= 2;
  825     return delay;
  826 }
  827 
  828 /* Check for scripts that terminated, and remove them from the queue if the
  829  * script terminated successfully. If instead the script was terminated by
  830  * a signal, or returned exit code "1", it is scheduled to run again if
  831  * the max number of retries did not already elapsed. */
  832 void sentinelCollectTerminatedScripts(void) {
  833     int statloc;
  834     pid_t pid;
  835 
  836     while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
  837         int exitcode = WEXITSTATUS(statloc);
  838         int bysignal = 0;
  839         listNode *ln;
  840         sentinelScriptJob *sj;
  841 
  842         if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
  843         sentinelEvent(LL_DEBUG,"-script-child",NULL,"%ld %d %d",
  844             (long)pid, exitcode, bysignal);
  845 
  846         ln = sentinelGetScriptListNodeByPid(pid);
  847         if (ln == NULL) {
  848             serverLog(LL_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
  849             continue;
  850         }
  851         sj = ln->value;
  852 
  853         /* If the script was terminated by a signal or returns an
  854          * exit code of "1" (that means: please retry), we reschedule it
  855          * if the max number of retries is not already reached. */
  856         if ((bysignal || exitcode == 1) &&
  857             sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
  858         {
  859             sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
  860             sj->pid = 0;
  861             sj->start_time = mstime() +
  862                              sentinelScriptRetryDelay(sj->retry_num);
  863         } else {
  864             /* Otherwise let's remove the script, but log the event if the
  865              * execution did not terminated in the best of the ways. */
  866             if (bysignal || exitcode != 0) {
  867                 sentinelEvent(LL_WARNING,"-script-error",NULL,
  868                               "%s %d %d", sj->argv[0], bysignal, exitcode);
  869             }
  870             listDelNode(sentinel.scripts_queue,ln);
  871             sentinelReleaseScriptJob(sj);
  872         }
  873         sentinel.running_scripts--;
  874     }
  875 }
  876 
  877 /* Kill scripts in timeout, they'll be collected by the
  878  * sentinelCollectTerminatedScripts() function. */
  879 void sentinelKillTimedoutScripts(void) {
  880     listNode *ln;
  881     listIter li;
  882     mstime_t now = mstime();
  883 
  884     listRewind(sentinel.scripts_queue,&li);
  885     while ((ln = listNext(&li)) != NULL) {
  886         sentinelScriptJob *sj = ln->value;
  887 
  888         if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
  889             (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
  890         {
  891             sentinelEvent(LL_WARNING,"-script-timeout",NULL,"%s %ld",
  892                 sj->argv[0], (long)sj->pid);
  893             kill(sj->pid,SIGKILL);
  894         }
  895     }
  896 }
  897 
  898 /* Implements SENTINEL PENDING-SCRIPTS command. */
  899 void sentinelPendingScriptsCommand(client *c) {
  900     listNode *ln;
  901     listIter li;
  902 
  903     addReplyArrayLen(c,listLength(sentinel.scripts_queue));
  904     listRewind(sentinel.scripts_queue,&li);
  905     while ((ln = listNext(&li)) != NULL) {
  906         sentinelScriptJob *sj = ln->value;
  907         int j = 0;
  908 
  909         addReplyMapLen(c,5);
  910 
  911         addReplyBulkCString(c,"argv");
  912         while (sj->argv[j]) j++;
  913         addReplyArrayLen(c,j);
  914         j = 0;
  915         while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
  916 
  917         addReplyBulkCString(c,"flags");
  918         addReplyBulkCString(c,
  919             (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
  920 
  921         addReplyBulkCString(c,"pid");
  922         addReplyBulkLongLong(c,sj->pid);
  923 
  924         if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
  925             addReplyBulkCString(c,"run-time");
  926             addReplyBulkLongLong(c,mstime() - sj->start_time);
  927         } else {
  928             mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
  929             if (delay < 0) delay = 0;
  930             addReplyBulkCString(c,"run-delay");
  931             addReplyBulkLongLong(c,delay);
  932         }
  933 
  934         addReplyBulkCString(c,"retry-num");
  935         addReplyBulkLongLong(c,sj->retry_num);
  936     }
  937 }
  938 
  939 /* This function calls, if any, the client reconfiguration script with the
  940  * following parameters:
  941  *
  942  * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
  943  *
  944  * It is called every time a failover is performed.
  945  *
  946  * <state> is currently always "failover".
  947  * <role> is either "leader" or "observer".
  948  *
  949  * from/to fields are respectively master -> promoted slave addresses for
  950  * "start" and "end". */
  951 void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
  952     char fromport[32], toport[32];
  953 
  954     if (master->client_reconfig_script == NULL) return;
  955     ll2string(fromport,sizeof(fromport),from->port);
  956     ll2string(toport,sizeof(toport),to->port);
  957     sentinelScheduleScriptExecution(master->client_reconfig_script,
  958         master->name,
  959         (role == SENTINEL_LEADER) ? "leader" : "observer",
  960         state, from->ip, fromport, to->ip, toport, NULL);
  961 }
  962 
  963 /* =============================== instanceLink ============================= */
  964 
  965 /* Create a not yet connected link object. */
  966 instanceLink *createInstanceLink(void) {
  967     instanceLink *link = zmalloc(sizeof(*link));
  968 
  969     link->refcount = 1;
  970     link->disconnected = 1;
  971     link->pending_commands = 0;
  972     link->cc = NULL;
  973     link->pc = NULL;
  974     link->cc_conn_time = 0;
  975     link->pc_conn_time = 0;
  976     link->last_reconn_time = 0;
  977     link->pc_last_activity = 0;
  978     /* We set the act_ping_time to "now" even if we actually don't have yet
  979      * a connection with the node, nor we sent a ping.
  980      * This is useful to detect a timeout in case we'll not be able to connect
  981      * with the node at all. */
  982     link->act_ping_time = mstime();
  983     link->last_ping_time = 0;
  984     link->last_avail_time = mstime();
  985     link->last_pong_time = mstime();
  986     return link;
  987 }
  988 
  989 /* Disconnect an hiredis connection in the context of an instance link. */
  990 void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
  991     if (c == NULL) return;
  992 
  993     if (link->cc == c) {
  994         link->cc = NULL;
  995         link->pending_commands = 0;
  996     }
  997     if (link->pc == c) link->pc = NULL;
  998     c->data = NULL;
  999     link->disconnected = 1;
 1000     redisAsyncFree(c);
 1001 }
 1002 
 1003 /* Decrement the refcount of a link object, if it drops to zero, actually
 1004  * free it and return NULL. Otherwise don't do anything and return the pointer
 1005  * to the object.
 1006  *
 1007  * If we are not going to free the link and ri is not NULL, we rebind all the
 1008  * pending requests in link->cc (hiredis connection for commands) to a
 1009  * callback that will just ignore them. This is useful to avoid processing
 1010  * replies for an instance that no longer exists. */
 1011 instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri)
 1012 {
 1013     serverAssert(link->refcount > 0);
 1014     link->refcount--;
 1015     if (link->refcount != 0) {
 1016         if (ri && ri->link->cc) {
 1017             /* This instance may have pending callbacks in the hiredis async
 1018              * context, having as 'privdata' the instance that we are going to
 1019              * free. Let's rewrite the callback list, directly exploiting
 1020              * hiredis internal data structures, in order to bind them with
 1021              * a callback that will ignore the reply at all. */
 1022             redisCallback *cb;
 1023             redisCallbackList *callbacks = &link->cc->replies;
 1024 
 1025             cb = callbacks->head;
 1026             while(cb) {
 1027                 if (cb->privdata == ri) {
 1028                     cb->fn = sentinelDiscardReplyCallback;
 1029                     cb->privdata = NULL; /* Not strictly needed. */
 1030                 }
 1031                 cb = cb->next;
 1032             }
 1033         }
 1034         return link; /* Other active users. */
 1035     }
 1036 
 1037     instanceLinkCloseConnection(link,link->cc);
 1038     instanceLinkCloseConnection(link,link->pc);
 1039     zfree(link);
 1040     return NULL;
 1041 }
 1042 
 1043 /* This function will attempt to share the instance link we already have
 1044  * for the same Sentinel in the context of a different master, with the
 1045  * instance we are passing as argument.
 1046  *
 1047  * This way multiple Sentinel objects that refer all to the same physical
 1048  * Sentinel instance but in the context of different masters will use
 1049  * a single connection, will send a single PING per second for failure
 1050  * detection and so forth.
 1051  *
 1052  * Return C_OK if a matching Sentinel was found in the context of a
 1053  * different master and sharing was performed. Otherwise C_ERR
 1054  * is returned. */
 1055 int sentinelTryConnectionSharing(sentinelRedisInstance *ri) {
 1056     serverAssert(ri->flags & SRI_SENTINEL);
 1057     dictIterator *di;
 1058     dictEntry *de;
 1059 
 1060     if (ri->runid == NULL) return C_ERR; /* No way to identify it. */
 1061     if (ri->link->refcount > 1) return C_ERR; /* Already shared. */
 1062 
 1063     di = dictGetIterator(sentinel.masters);
 1064     while((de = dictNext(di)) != NULL) {
 1065         sentinelRedisInstance *master = dictGetVal(de), *match;
 1066         /* We want to share with the same physical Sentinel referenced
 1067          * in other masters, so skip our master. */
 1068         if (master == ri->master) continue;
 1069         match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
 1070                                                        NULL,0,ri->runid);
 1071         if (match == NULL) continue; /* No match. */
 1072         if (match == ri) continue; /* Should never happen but... safer. */
 1073 
 1074         /* We identified a matching Sentinel, great! Let's free our link
 1075          * and use the one of the matching Sentinel. */
 1076         releaseInstanceLink(ri->link,NULL);
 1077         ri->link = match->link;
 1078         match->link->refcount++;
 1079         dictReleaseIterator(di);
 1080         return C_OK;
 1081     }
 1082     dictReleaseIterator(di);
 1083     return C_ERR;
 1084 }
 1085 
 1086 /* When we detect a Sentinel to switch address (reporting a different IP/port
 1087  * pair in Hello messages), let's update all the matching Sentinels in the
 1088  * context of other masters as well and disconnect the links, so that everybody
 1089  * will be updated.
 1090  *
 1091  * Return the number of updated Sentinel addresses. */
 1092 int sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance *ri) {
 1093     serverAssert(ri->flags & SRI_SENTINEL);
 1094     dictIterator *di;
 1095     dictEntry *de;
 1096     int reconfigured = 0;
 1097 
 1098     di = dictGetIterator(sentinel.masters);
 1099     while((de = dictNext(di)) != NULL) {
 1100         sentinelRedisInstance *master = dictGetVal(de), *match;
 1101         match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
 1102                                                        NULL,0,ri->runid);
 1103         /* If there is no match, this master does not know about this
 1104          * Sentinel, try with the next one. */
 1105         if (match == NULL) continue;
 1106 
 1107         /* Disconnect the old links if connected. */
 1108         if (match->link->cc != NULL)
 1109             instanceLinkCloseConnection(match->link,match->link->cc);
 1110         if (match->link->pc != NULL)
 1111             instanceLinkCloseConnection(match->link,match->link->pc);
 1112 
 1113         if (match == ri) continue; /* Address already updated for it. */
 1114 
 1115         /* Update the address of the matching Sentinel by copying the address
 1116          * of the Sentinel object that received the address update. */
 1117         releaseSentinelAddr(match->addr);
 1118         match->addr = dupSentinelAddr(ri->addr);
 1119         reconfigured++;
 1120     }
 1121     dictReleaseIterator(di);
 1122     if (reconfigured)
 1123         sentinelEvent(LL_NOTICE,"+sentinel-address-update", ri,
 1124                     "%@ %d additional matching instances", reconfigured);
 1125     return reconfigured;
 1126 }
 1127 
 1128 /* This function is called when an hiredis connection reported an error.
 1129  * We set it to NULL and mark the link as disconnected so that it will be
 1130  * reconnected again.
 1131  *
 1132  * Note: we don't free the hiredis context as hiredis will do it for us
 1133  * for async connections. */
 1134 void instanceLinkConnectionError(const redisAsyncContext *c) {
 1135     instanceLink *link = c->data;
 1136     int pubsub;
 1137 
 1138     if (!link) return;
 1139 
 1140     pubsub = (link->pc == c);
 1141     if (pubsub)
 1142         link->pc = NULL;
 1143     else
 1144         link->cc = NULL;
 1145     link->disconnected = 1;
 1146 }
 1147 
 1148 /* Hiredis connection established / disconnected callbacks. We need them
 1149  * just to cleanup our link state. */
 1150 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
 1151     if (status != C_OK) instanceLinkConnectionError(c);
 1152 }
 1153 
 1154 void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
 1155     UNUSED(status);
 1156     instanceLinkConnectionError(c);
 1157 }
 1158 
 1159 /* ========================== sentinelRedisInstance ========================= */
 1160 
 1161 /* Create a redis instance, the following fields must be populated by the
 1162  * caller if needed:
 1163  * runid: set to NULL but will be populated once INFO output is received.
 1164  * info_refresh: is set to 0 to mean that we never received INFO so far.
 1165  *
 1166  * If SRI_MASTER is set into initial flags the instance is added to
 1167  * sentinel.masters table.
 1168  *
 1169  * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
 1170  * instance is added into master->slaves or master->sentinels table.
 1171  *
 1172  * If the instance is a slave or sentinel, the name parameter is ignored and
 1173  * is created automatically as hostname:port.
 1174  *
 1175  * The function fails if hostname can't be resolved or port is out of range.
 1176  * When this happens NULL is returned and errno is set accordingly to the
 1177  * createSentinelAddr() function.
 1178  *
 1179  * The function may also fail and return NULL with errno set to EBUSY if
 1180  * a master with the same name, a slave with the same address, or a sentinel
 1181  * with the same ID already exists. */
 1182 
 1183 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
 1184     sentinelRedisInstance *ri;
 1185     sentinelAddr *addr;
 1186     dict *table = NULL;
 1187     char slavename[NET_PEER_ID_LEN], *sdsname;
 1188 
 1189     serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
 1190     serverAssert((flags & SRI_MASTER) || master != NULL);
 1191 
 1192     /* Check address validity. */
 1193     addr = createSentinelAddr(hostname,port);
 1194     if (addr == NULL) return NULL;
 1195 
 1196     /* For slaves use ip:port as name. */
 1197     if (flags & SRI_SLAVE) {
 1198         anetFormatAddr(slavename, sizeof(slavename), hostname, port);
 1199         name = slavename;
 1200     }
 1201 
 1202     /* Make sure the entry is not duplicated. This may happen when the same
 1203      * name for a master is used multiple times inside the configuration or
 1204      * if we try to add multiple times a slave or sentinel with same ip/port
 1205      * to a master. */
 1206     if (flags & SRI_MASTER) table = sentinel.masters;
 1207     else if (flags & SRI_SLAVE) table = master->slaves;
 1208     else if (flags & SRI_SENTINEL) table = master->sentinels;
 1209     sdsname = sdsnew(name);
 1210     if (dictFind(table,sdsname)) {
 1211         releaseSentinelAddr(addr);
 1212         sdsfree(sdsname);
 1213         errno = EBUSY;
 1214         return NULL;
 1215     }
 1216 
 1217     /* Create the instance object. */
 1218     ri = zmalloc(sizeof(*ri));
 1219     /* Note that all the instances are started in the disconnected state,
 1220      * the event loop will take care of connecting them. */
 1221     ri->flags = flags;
 1222     ri->name = sdsname;
 1223     ri->runid = NULL;
 1224     ri->config_epoch = 0;
 1225     ri->addr = addr;
 1226     ri->link = createInstanceLink();
 1227     ri->last_pub_time = mstime();
 1228     ri->last_hello_time = mstime();
 1229     ri->last_master_down_reply_time = mstime();
 1230     ri->s_down_since_time = 0;
 1231     ri->o_down_since_time = 0;
 1232     ri->down_after_period = master ? master->down_after_period :
 1233                             SENTINEL_DEFAULT_DOWN_AFTER;
 1234     ri->master_link_down_time = 0;
 1235     ri->auth_pass = NULL;
 1236     ri->auth_user = NULL;
 1237     ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
 1238     ri->slave_reconf_sent_time = 0;
 1239     ri->slave_master_host = NULL;
 1240     ri->slave_master_port = 0;
 1241     ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
 1242     ri->slave_repl_offset = 0;
 1243     ri->sentinels = dictCreate(&instancesDictType,NULL);
 1244     ri->quorum = quorum;
 1245     ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
 1246     ri->master = master;
 1247     ri->slaves = dictCreate(&instancesDictType,NULL);
 1248     ri->info_refresh = 0;
 1249     ri->renamed_commands = dictCreate(&renamedCommandsDictType,NULL);
 1250 
 1251     /* Failover state. */
 1252     ri->leader = NULL;
 1253     ri->leader_epoch = 0;
 1254     ri->failover_epoch = 0;
 1255     ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
 1256     ri->failover_state_change_time = 0;
 1257     ri->failover_start_time = 0;
 1258     ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
 1259     ri->failover_delay_logged = 0;
 1260     ri->promoted_slave = NULL;
 1261     ri->notification_script = NULL;
 1262     ri->client_reconfig_script = NULL;
 1263     ri->info = NULL;
 1264 
 1265     /* Role */
 1266     ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE);
 1267     ri->role_reported_time = mstime();
 1268     ri->slave_conf_change_time = mstime();
 1269 
 1270     /* Add into the right table. */
 1271     dictAdd(table, ri->name, ri);
 1272     return ri;
 1273 }
 1274 
 1275 /* Release this instance and all its slaves, sentinels, hiredis connections.
 1276  * This function does not take care of unlinking the instance from the main
 1277  * masters table (if it is a master) or from its master sentinels/slaves table
 1278  * if it is a slave or sentinel. */
 1279 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
 1280     /* Release all its slaves or sentinels if any. */
 1281     dictRelease(ri->sentinels);
 1282     dictRelease(ri->slaves);
 1283 
 1284     /* Disconnect the instance. */
 1285     releaseInstanceLink(ri->link,ri);
 1286 
 1287     /* Free other resources. */
 1288     sdsfree(ri->name);
 1289     sdsfree(ri->runid);
 1290     sdsfree(ri->notification_script);
 1291     sdsfree(ri->client_reconfig_script);
 1292     sdsfree(ri->slave_master_host);
 1293     sdsfree(ri->leader);
 1294     sdsfree(ri->auth_pass);
 1295     sdsfree(ri->auth_user);
 1296     sdsfree(ri->info);
 1297     releaseSentinelAddr(ri->addr);
 1298     dictRelease(ri->renamed_commands);
 1299 
 1300     /* Clear state into the master if needed. */
 1301     if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
 1302         ri->master->promoted_slave = NULL;
 1303 
 1304     zfree(ri);
 1305 }
 1306 
 1307 /* Lookup a slave in a master Redis instance, by ip and port. */
 1308 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
 1309                 sentinelRedisInstance *ri, char *ip, int port)
 1310 {
 1311     sds key;
 1312     sentinelRedisInstance *slave;
 1313     char buf[NET_PEER_ID_LEN];
 1314 
 1315     serverAssert(ri->flags & SRI_MASTER);
 1316     anetFormatAddr(buf,sizeof(buf),ip,port);
 1317     key = sdsnew(buf);
 1318     slave = dictFetchValue(ri->slaves,key);
 1319     sdsfree(key);
 1320     return slave;
 1321 }
 1322 
 1323 /* Return the name of the type of the instance as a string. */
 1324 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
 1325     if (ri->flags & SRI_MASTER) return "master";
 1326     else if (ri->flags & SRI_SLAVE) return "slave";
 1327     else if (ri->flags & SRI_SENTINEL) return "sentinel";
 1328     else return "unknown";
 1329 }
 1330 
 1331 /* This function remove the Sentinel with the specified ID from the
 1332  * specified master.
 1333  *
 1334  * If "runid" is NULL the function returns ASAP.
 1335  *
 1336  * This function is useful because on Sentinels address switch, we want to
 1337  * remove our old entry and add a new one for the same ID but with the new
 1338  * address.
 1339  *
 1340  * The function returns 1 if the matching Sentinel was removed, otherwise
 1341  * 0 if there was no Sentinel with this ID. */
 1342 int removeMatchingSentinelFromMaster(sentinelRedisInstance *master, char *runid) {
 1343     dictIterator *di;
 1344     dictEntry *de;
 1345     int removed = 0;
 1346 
 1347     if (runid == NULL) return 0;
 1348 
 1349     di = dictGetSafeIterator(master->sentinels);
 1350     while((de = dictNext(di)) != NULL) {
 1351         sentinelRedisInstance *ri = dictGetVal(de);
 1352 
 1353         if (ri->runid && strcmp(ri->runid,runid) == 0) {
 1354             dictDelete(master->sentinels,ri->name);
 1355             removed++;
 1356         }
 1357     }
 1358     dictReleaseIterator(di);
 1359     return removed;
 1360 }
 1361 
 1362 /* Search an instance with the same runid, ip and port into a dictionary
 1363  * of instances. Return NULL if not found, otherwise return the instance
 1364  * pointer.
 1365  *
 1366  * runid or ip can be NULL. In such a case the search is performed only
 1367  * by the non-NULL field. */
 1368 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
 1369     dictIterator *di;
 1370     dictEntry *de;
 1371     sentinelRedisInstance *instance = NULL;
 1372 
 1373     serverAssert(ip || runid);   /* User must pass at least one search param. */
 1374     di = dictGetIterator(instances);
 1375     while((de = dictNext(di)) != NULL) {
 1376         sentinelRedisInstance *ri = dictGetVal(de);
 1377 
 1378         if (runid && !ri->runid) continue;
 1379         if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
 1380             (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
 1381                             ri->addr->port == port)))
 1382         {
 1383             instance = ri;
 1384             break;
 1385         }
 1386     }
 1387     dictReleaseIterator(di);
 1388     return instance;
 1389 }
 1390 
 1391 /* Master lookup by name */
 1392 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
 1393     sentinelRedisInstance *ri;
 1394     sds sdsname = sdsnew(name);
 1395 
 1396     ri = dictFetchValue(sentinel.masters,sdsname);
 1397     sdsfree(sdsname);
 1398     return ri;
 1399 }
 1400 
 1401 /* Add the specified flags to all the instances in the specified dictionary. */
 1402 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
 1403     dictIterator *di;
 1404     dictEntry *de;
 1405 
 1406     di = dictGetIterator(instances);
 1407     while((de = dictNext(di)) != NULL) {
 1408         sentinelRedisInstance *ri = dictGetVal(de);
 1409         ri->flags |= flags;
 1410     }
 1411     dictReleaseIterator(di);
 1412 }
 1413 
 1414 /* Remove the specified flags to all the instances in the specified
 1415  * dictionary. */
 1416 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
 1417     dictIterator *di;
 1418     dictEntry *de;
 1419 
 1420     di = dictGetIterator(instances);
 1421     while((de = dictNext(di)) != NULL) {
 1422         sentinelRedisInstance *ri = dictGetVal(de);
 1423         ri->flags &= ~flags;
 1424     }
 1425     dictReleaseIterator(di);
 1426 }
 1427 
 1428 /* Reset the state of a monitored master:
 1429  * 1) Remove all slaves.
 1430  * 2) Remove all sentinels.
 1431  * 3) Remove most of the flags resulting from runtime operations.
 1432  * 4) Reset timers to their default value. For example after a reset it will be
 1433  *    possible to failover again the same master ASAP, without waiting the
 1434  *    failover timeout delay.
 1435  * 5) In the process of doing this undo the failover if in progress.
 1436  * 6) Disconnect the connections with the master (will reconnect automatically).
 1437  */
 1438 
 1439 #define SENTINEL_RESET_NO_SENTINELS (1<<0)
 1440 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
 1441     serverAssert(ri->flags & SRI_MASTER);
 1442     dictRelease(ri->slaves);
 1443     ri->slaves = dictCreate(&instancesDictType,NULL);
 1444     if (!(flags & SENTINEL_RESET_NO_SENTINELS)) {
 1445         dictRelease(ri->sentinels);
 1446         ri->sentinels = dictCreate(&instancesDictType,NULL);
 1447     }
 1448     instanceLinkCloseConnection(ri->link,ri->link->cc);
 1449     instanceLinkCloseConnection(ri->link,ri->link->pc);
 1450     ri->flags &= SRI_MASTER;
 1451     if (ri->leader) {
 1452         sdsfree(ri->leader);
 1453         ri->leader = NULL;
 1454     }
 1455     ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
 1456     ri->failover_state_change_time = 0;
 1457     ri->failover_start_time = 0; /* We can failover again ASAP. */
 1458     ri->promoted_slave = NULL;
 1459     sdsfree(ri->runid);
 1460     sdsfree(ri->slave_master_host);
 1461     ri->runid = NULL;
 1462     ri->slave_master_host = NULL;
 1463     ri->link->act_ping_time = mstime();
 1464     ri->link->last_ping_time = 0;
 1465     ri->link->last_avail_time = mstime();
 1466     ri->link->last_pong_time = mstime();
 1467     ri->role_reported_time = mstime();
 1468     ri->role_reported = SRI_MASTER;
 1469     if (flags & SENTINEL_GENERATE_EVENT)
 1470         sentinelEvent(LL_WARNING,"+reset-master",ri,"%@");
 1471 }
 1472 
 1473 /* Call sentinelResetMaster() on every master with a name matching the specified
 1474  * pattern. */
 1475 int sentinelResetMastersByPattern(char *pattern, int flags) {
 1476     dictIterator *di;
 1477     dictEntry *de;
 1478     int reset = 0;
 1479 
 1480     di = dictGetIterator(sentinel.masters);
 1481     while((de = dictNext(di)) != NULL) {
 1482         sentinelRedisInstance *ri = dictGetVal(de);
 1483 
 1484         if (ri->name) {
 1485             if (stringmatch(pattern,ri->name,0)) {
 1486                 sentinelResetMaster(ri,flags);
 1487                 reset++;
 1488             }
 1489         }
 1490     }
 1491     dictReleaseIterator(di);
 1492     return reset;
 1493 }
 1494 
 1495 /* Reset the specified master with sentinelResetMaster(), and also change
 1496  * the ip:port address, but take the name of the instance unmodified.
 1497  *
 1498  * This is used to handle the +switch-master event.
 1499  *
 1500  * The function returns C_ERR if the address can't be resolved for some
 1501  * reason. Otherwise C_OK is returned.  */
 1502 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
 1503     sentinelAddr *oldaddr, *newaddr;
 1504     sentinelAddr **slaves = NULL;
 1505     int numslaves = 0, j;
 1506     dictIterator *di;
 1507     dictEntry *de;
 1508 
 1509     newaddr = createSentinelAddr(ip,port);
 1510     if (newaddr == NULL) return C_ERR;
 1511 
 1512     /* Make a list of slaves to add back after the reset.
 1513      * Don't include the one having the address we are switching to. */
 1514     di = dictGetIterator(master->slaves);
 1515     while((de = dictNext(di)) != NULL) {
 1516         sentinelRedisInstance *slave = dictGetVal(de);
 1517 
 1518         if (sentinelAddrIsEqual(slave->addr,newaddr)) continue;
 1519         slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
 1520         slaves[numslaves++] = createSentinelAddr(slave->addr->ip,
 1521                                                  slave->addr->port);
 1522     }
 1523     dictReleaseIterator(di);
 1524 
 1525     /* If we are switching to a different address, include the old address
 1526      * as a slave as well, so that we'll be able to sense / reconfigure
 1527      * the old master. */
 1528     if (!sentinelAddrIsEqual(newaddr,master->addr)) {
 1529         slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
 1530         slaves[numslaves++] = createSentinelAddr(master->addr->ip,
 1531                                                  master->addr->port);
 1532     }
 1533 
 1534     /* Reset and switch address. */
 1535     sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
 1536     oldaddr = master->addr;
 1537     master->addr = newaddr;
 1538     master->o_down_since_time = 0;
 1539     master->s_down_since_time = 0;
 1540 
 1541     /* Add slaves back. */
 1542     for (j = 0; j < numslaves; j++) {
 1543         sentinelRedisInstance *slave;
 1544 
 1545         slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
 1546                     slaves[j]->port, master->quorum, master);
 1547         releaseSentinelAddr(slaves[j]);
 1548         if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
 1549     }
 1550     zfree(slaves);
 1551 
 1552     /* Release the old address at the end so we are safe even if the function
 1553      * gets the master->addr->ip and master->addr->port as arguments. */
 1554     releaseSentinelAddr(oldaddr);
 1555     sentinelFlushConfig();
 1556     return C_OK;
 1557 }
 1558 
 1559 /* Return non-zero if there was no SDOWN or ODOWN error associated to this
 1560  * instance in the latest 'ms' milliseconds. */
 1561 int sentinelRedisInstanceNoDownFor(sentinelRedisInstance *ri, mstime_t ms) {
 1562     mstime_t most_recent;
 1563 
 1564     most_recent = ri->s_down_since_time;
 1565     if (ri->o_down_since_time > most_recent)
 1566         most_recent = ri->o_down_since_time;
 1567     return most_recent == 0 || (mstime() - most_recent) > ms;
 1568 }
 1569 
 1570 /* Return the current master address, that is, its address or the address
 1571  * of the promoted slave if already operational. */
 1572 sentinelAddr *sentinelGetCurrentMasterAddress(sentinelRedisInstance *master) {
 1573     /* If we are failing over the master, and the state is already
 1574      * SENTINEL_FAILOVER_STATE_RECONF_SLAVES or greater, it means that we
 1575      * already have the new configuration epoch in the master, and the
 1576      * slave acknowledged the configuration switch. Advertise the new
 1577      * address. */
 1578     if ((master->flags & SRI_FAILOVER_IN_PROGRESS) &&
 1579         master->promoted_slave &&
 1580         master->failover_state >= SENTINEL_FAILOVER_STATE_RECONF_SLAVES)
 1581     {
 1582         return master->promoted_slave->addr;
 1583     } else {
 1584         return master->addr;
 1585     }
 1586 }
 1587 
 1588 /* This function sets the down_after_period field value in 'master' to all
 1589  * the slaves and sentinel instances connected to this master. */
 1590 void sentinelPropagateDownAfterPeriod(sentinelRedisInstance *master) {
 1591     dictIterator *di;
 1592     dictEntry *de;
 1593     int j;
 1594     dict *d[] = {master->slaves, master->sentinels, NULL};
 1595 
 1596     for (j = 0; d[j]; j++) {
 1597         di = dictGetIterator(d[j]);
 1598         while((de = dictNext(di)) != NULL) {
 1599             sentinelRedisInstance *ri = dictGetVal(de);
 1600             ri->down_after_period = master->down_after_period;
 1601         }
 1602         dictReleaseIterator(di);
 1603     }
 1604 }
 1605 
 1606 char *sentinelGetInstanceTypeString(sentinelRedisInstance *ri) {
 1607     if (ri->flags & SRI_MASTER) return "master";
 1608     else if (ri->flags & SRI_SLAVE) return "slave";
 1609     else if (ri->flags & SRI_SENTINEL) return "sentinel";
 1610     else return "unknown";
 1611 }
 1612 
 1613 /* This function is used in order to send commands to Redis instances: the
 1614  * commands we send from Sentinel may be renamed, a common case is a master
 1615  * with CONFIG and SLAVEOF commands renamed for security concerns. In that
 1616  * case we check the ri->renamed_command table (or if the instance is a slave,
 1617  * we check the one of the master), and map the command that we should send
 1618  * to the set of renamed commads. However, if the command was not renamed,
 1619  * we just return "command" itself. */
 1620 char *sentinelInstanceMapCommand(sentinelRedisInstance *ri, char *command) {
 1621     sds sc = sdsnew(command);
 1622     if (ri->master) ri = ri->master;
 1623     char *retval = dictFetchValue(ri->renamed_commands, sc);
 1624     sdsfree(sc);
 1625     return retval ? retval : command;
 1626 }
 1627 
 1628 /* ============================ Config handling ============================= */
 1629 char *sentinelHandleConfiguration(char **argv, int argc) {
 1630     sentinelRedisInstance *ri;
 1631 
 1632     if (!strcasecmp(argv[0],"monitor") && argc == 5) {
 1633         /* monitor <name> <host> <port> <quorum> */
 1634         int quorum = atoi(argv[4]);
 1635 
 1636         if (quorum <= 0) return "Quorum must be 1 or greater.";
 1637         if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
 1638                                         atoi(argv[3]),quorum,NULL) == NULL)
 1639         {
 1640             switch(errno) {
 1641             case EBUSY: return "Duplicated master name.";
 1642             case ENOENT: return "Can't resolve master instance hostname.";
 1643             case EINVAL: return "Invalid port number";
 1644             }
 1645         }
 1646     } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
 1647         /* down-after-milliseconds <name> <milliseconds> */
 1648         ri = sentinelGetMasterByName(argv[1]);
 1649         if (!ri) return "No such master with specified name.";
 1650         ri->down_after_period = atoi(argv[2]);
 1651         if (ri->down_after_period <= 0)
 1652             return "negative or zero time parameter.";
 1653         sentinelPropagateDownAfterPeriod(ri);
 1654     } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
 1655         /* failover-timeout <name> <milliseconds> */
 1656         ri = sentinelGetMasterByName(argv[1]);
 1657         if (!ri) return "No such master with specified name.";
 1658         ri->failover_timeout = atoi(argv[2]);
 1659         if (ri->failover_timeout <= 0)
 1660             return "negative or zero time parameter.";
 1661     } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
 1662         /* parallel-syncs <name> <milliseconds> */
 1663         ri = sentinelGetMasterByName(argv[1]);
 1664         if (!ri) return "No such master with specified name.";
 1665         ri->parallel_syncs = atoi(argv[2]);
 1666     } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
 1667         /* notification-script <name> <path> */
 1668         ri = sentinelGetMasterByName(argv[1]);
 1669         if (!ri) return "No such master with specified name.";
 1670         if (access(argv[2],X_OK) == -1)
 1671             return "Notification script seems non existing or non executable.";
 1672         ri->notification_script = sdsnew(argv[2]);
 1673     } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
 1674         /* client-reconfig-script <name> <path> */
 1675         ri = sentinelGetMasterByName(argv[1]);
 1676         if (!ri) return "No such master with specified name.";
 1677         if (access(argv[2],X_OK) == -1)
 1678             return "Client reconfiguration script seems non existing or "
 1679                    "non executable.";
 1680         ri->client_reconfig_script = sdsnew(argv[2]);
 1681     } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
 1682         /* auth-pass <name> <password> */
 1683         ri = sentinelGetMasterByName(argv[1]);
 1684         if (!ri) return "No such master with specified name.";
 1685         ri->auth_pass = sdsnew(argv[2]);
 1686     } else if (!strcasecmp(argv[0],"auth-user") && argc == 3) {
 1687         /* auth-user <name> <username> */
 1688         ri = sentinelGetMasterByName(argv[1]);
 1689         if (!ri) return "No such master with specified name.";
 1690         ri->auth_user = sdsnew(argv[2]);
 1691     } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) {
 1692         /* current-epoch <epoch> */
 1693         unsigned long long current_epoch = strtoull(argv[1],NULL,10);
 1694         if (current_epoch > sentinel.current_epoch)
 1695             sentinel.current_epoch = current_epoch;
 1696     } else if (!strcasecmp(argv[0],"myid") && argc == 2) {
 1697         if (strlen(argv[1]) != CONFIG_RUN_ID_SIZE)
 1698             return "Malformed Sentinel id in myid option.";
 1699         memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE);
 1700     } else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) {
 1701         /* config-epoch <name> <epoch> */
 1702         ri = sentinelGetMasterByName(argv[1]);
 1703         if (!ri) return "No such master with specified name.";
 1704         ri->config_epoch = strtoull(argv[2],NULL,10);
 1705         /* The following update of current_epoch is not really useful as
 1706          * now the current epoch is persisted on the config file, but
 1707          * we leave this check here for redundancy. */
 1708         if (ri->config_epoch > sentinel.current_epoch)
 1709             sentinel.current_epoch = ri->config_epoch;
 1710     } else if (!strcasecmp(argv[0],"leader-epoch") && argc == 3) {
 1711         /* leader-epoch <name> <epoch> */
 1712         ri = sentinelGetMasterByName(argv[1]);
 1713         if (!ri) return "No such master with specified name.";
 1714         ri->leader_epoch = strtoull(argv[2],NULL,10);
 1715     } else if ((!strcasecmp(argv[0],"known-slave") ||
 1716                 !strcasecmp(argv[0],"known-replica")) && argc == 4)
 1717     {
 1718         sentinelRedisInstance *slave;
 1719 
 1720         /* known-replica <name> <ip> <port> */
 1721         ri = sentinelGetMasterByName(argv[1]);
 1722         if (!ri) return "No such master with specified name.";
 1723         if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
 1724                     atoi(argv[3]), ri->quorum, ri)) == NULL)
 1725         {
 1726             return "Wrong hostname or port for replica.";
 1727         }
 1728     } else if (!strcasecmp(argv[0],"known-sentinel") &&
 1729                (argc == 4 || argc == 5)) {
 1730         sentinelRedisInstance *si;
 1731 
 1732         if (argc == 5) { /* Ignore the old form without runid. */
 1733             /* known-sentinel <name> <ip> <port> [runid] */
 1734             ri = sentinelGetMasterByName(argv[1]);
 1735             if (!ri) return "No such master with specified name.";
 1736             if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
 1737                         atoi(argv[3]), ri->quorum, ri)) == NULL)
 1738             {
 1739                 return "Wrong hostname or port for sentinel.";
 1740             }
 1741             si->runid = sdsnew(argv[4]);
 1742             sentinelTryConnectionSharing(si);
 1743         }
 1744     } else if (!strcasecmp(argv[0],"rename-command") && argc == 4) {
 1745         /* rename-command <name> <command> <renamed-command> */
 1746         ri = sentinelGetMasterByName(argv[1]);
 1747         if (!ri) return "No such master with specified name.";
 1748         sds oldcmd = sdsnew(argv[2]);
 1749         sds newcmd = sdsnew(argv[3]);
 1750         if (dictAdd(ri->renamed_commands,oldcmd,newcmd) != DICT_OK) {
 1751             sdsfree(oldcmd);
 1752             sdsfree(newcmd);
 1753             return "Same command renamed multiple times with rename-command.";
 1754         }
 1755     } else if (!strcasecmp(argv[0],"announce-ip") && argc == 2) {
 1756         /* announce-ip <ip-address> */
 1757         if (strlen(argv[1]))
 1758             sentinel.announce_ip = sdsnew(argv[1]);
 1759     } else if (!strcasecmp(argv[0],"announce-port") && argc == 2) {
 1760         /* announce-port <port> */
 1761         sentinel.announce_port = atoi(argv[1]);
 1762     } else if (!strcasecmp(argv[0],"deny-scripts-reconfig") && argc == 2) {
 1763         /* deny-scripts-reconfig <yes|no> */
 1764         if ((sentinel.deny_scripts_reconfig = yesnotoi(argv[1])) == -1) {
 1765             return "Please specify yes or no for the "
 1766                    "deny-scripts-reconfig options.";
 1767         }
 1768     } else {
 1769         return "Unrecognized sentinel configuration statement.";
 1770     }
 1771     return NULL;
 1772 }
 1773 
 1774 /* Implements CONFIG REWRITE for "sentinel" option.
 1775  * This is used not just to rewrite the configuration given by the user
 1776  * (the configured masters) but also in order to retain the state of
 1777  * Sentinel across restarts: config epoch of masters, associated slaves
 1778  * and sentinel instances, and so forth. */
 1779 void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
 1780     dictIterator *di, *di2;
 1781     dictEntry *de;
 1782     sds line;
 1783 
 1784     /* sentinel unique ID. */
 1785     line = sdscatprintf(sdsempty(), "sentinel myid %s", sentinel.myid);
 1786     rewriteConfigRewriteLine(state,"sentinel",line,1);
 1787 
 1788     /* sentinel deny-scripts-reconfig. */
 1789     line = sdscatprintf(sdsempty(), "sentinel deny-scripts-reconfig %s",
 1790         sentinel.deny_scripts_reconfig ? "yes" : "no");
 1791     rewriteConfigRewriteLine(state,"sentinel",line,
 1792         sentinel.deny_scripts_reconfig != SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG);
 1793 
 1794     /* For every master emit a "sentinel monitor" config entry. */
 1795     di = dictGetIterator(sentinel.masters);
 1796     while((de = dictNext(di)) != NULL) {
 1797         sentinelRedisInstance *master, *ri;
 1798         sentinelAddr *master_addr;
 1799 
 1800         /* sentinel monitor */
 1801         master = dictGetVal(de);
 1802         master_addr = sentinelGetCurrentMasterAddress(master);
 1803         line = sdscatprintf(sdsempty(),"sentinel monitor %s %s %d %d",
 1804             master->name, master_addr->ip, master_addr->port,
 1805             master->quorum);
 1806         rewriteConfigRewriteLine(state,"sentinel",line,1);
 1807 
 1808         /* sentinel down-after-milliseconds */
 1809         if (master->down_after_period != SENTINEL_DEFAULT_DOWN_AFTER) {
 1810             line = sdscatprintf(sdsempty(),
 1811                 "sentinel down-after-milliseconds %s %ld",
 1812                 master->name, (long) master->down_after_period);
 1813             rewriteConfigRewriteLine(state,"sentinel",line,1);
 1814         }
 1815 
 1816         /* sentinel failover-timeout */
 1817         if (master->failover_timeout != SENTINEL_DEFAULT_FAILOVER_TIMEOUT) {
 1818             line = sdscatprintf(sdsempty(),
 1819                 "sentinel failover-timeout %s %ld",
 1820                 master->name, (long) master->failover_timeout);
 1821             rewriteConfigRewriteLine(state,"sentinel",line,1);
 1822         }
 1823 
 1824         /* sentinel parallel-syncs */
 1825         if (master->parallel_syncs != SENTINEL_DEFAULT_PARALLEL_SYNCS) {
 1826             line = sdscatprintf(sdsempty(),
 1827                 "sentinel parallel-syncs %s %d",
 1828                 master->name, master->parallel_syncs);
 1829             rewriteConfigRewriteLine(state,"sentinel",line,1);
 1830         }
 1831 
 1832         /* sentinel notification-script */
 1833         if (master->notification_script) {
 1834             line = sdscatprintf(sdsempty(),
 1835                 "sentinel notification-script %s %s",
 1836                 master->name, master->notification_script);
 1837             rewriteConfigRewriteLine(state,"sentinel",line,1);
 1838         }
 1839 
 1840         /* sentinel client-reconfig-script */
 1841         if (master->client_reconfig_script) {
 1842             line = sdscatprintf(sdsempty(),
 1843                 "sentinel client-reconfig-script %s %s",
 1844                 master->name, master->client_reconfig_script);
 1845             rewriteConfigRewriteLine(state,"sentinel",line,1);
 1846         }
 1847 
 1848         /* sentinel auth-pass & auth-user */
 1849         if (master->auth_pass) {
 1850             line = sdscatprintf(sdsempty(),
 1851                 "sentinel auth-pass %s %s",
 1852                 master->name, master->auth_pass);
 1853             rewriteConfigRewriteLine(state,"sentinel",line,1);
 1854         }
 1855 
 1856         if (master->auth_user) {
 1857             line = sdscatprintf(sdsempty(),
 1858                 "sentinel auth-user %s %s",
 1859                 master->name, master->auth_user);
 1860             rewriteConfigRewriteLine(state,"sentinel",line,1);
 1861         }
 1862 
 1863         /* sentinel config-epoch */
 1864         line = sdscatprintf(sdsempty(),
 1865             "sentinel config-epoch %s %llu",
 1866             master->name, (unsigned long long) master->config_epoch);
 1867         rewriteConfigRewriteLine(state,"sentinel",line,1);
 1868 
 1869         /* sentinel leader-epoch */
 1870         line = sdscatprintf(sdsempty(),
 1871             "sentinel leader-epoch %s %llu",
 1872             master->name, (unsigned long long) master->leader_epoch);
 1873         rewriteConfigRewriteLine(state,"sentinel",line,1);
 1874 
 1875         /* sentinel known-slave */
 1876         di2 = dictGetIterator(master->slaves);
 1877         while((de = dictNext(di2)) != NULL) {
 1878             sentinelAddr *slave_addr;
 1879 
 1880             ri = dictGetVal(de);
 1881             slave_addr = ri->addr;
 1882 
 1883             /* If master_addr (obtained using sentinelGetCurrentMasterAddress()
 1884              * so it may be the address of the promoted slave) is equal to this
 1885              * slave's address, a failover is in progress and the slave was
 1886              * already successfully promoted. So as the address of this slave
 1887              * we use the old master address instead. */
 1888             if (sentinelAddrIsEqual(slave_addr,master_addr))
 1889                 slave_addr = master->addr;
 1890             line = sdscatprintf(sdsempty(),
 1891                 "sentinel known-replica %s %s %d",
 1892                 master->name, slave_addr->ip, slave_addr->port);
 1893             rewriteConfigRewriteLine(state,"sentinel",line,1);
 1894         }
 1895         dictReleaseIterator(di2);
 1896 
 1897         /* sentinel known-sentinel */
 1898         di2 = dictGetIterator(master->sentinels);
 1899         while((de = dictNext(di2)) != NULL) {
 1900             ri = dictGetVal(de);
 1901             if (ri->runid == NULL) continue;
 1902             line = sdscatprintf(sdsempty(),
 1903                 "sentinel known-sentinel %s %s %d %s",
 1904                 master->name, ri->addr->ip, ri->addr->port, ri->runid);
 1905             rewriteConfigRewriteLine(state,"sentinel",line,1);
 1906         }
 1907         dictReleaseIterator(di2);
 1908 
 1909         /* sentinel rename-command */
 1910         di2 = dictGetIterator(master->renamed_commands);
 1911         while((de = dictNext(di2)) != NULL) {
 1912             sds oldname = dictGetKey(de);
 1913             sds newname = dictGetVal(de);
 1914             line = sdscatprintf(sdsempty(),
 1915                 "sentinel rename-command %s %s %s",
 1916                 master->name, oldname, newname);
 1917             rewriteConfigRewriteLine(state,"sentinel",line,1);
 1918         }
 1919         dictReleaseIterator(di2);
 1920     }
 1921 
 1922     /* sentinel current-epoch is a global state valid for all the masters. */
 1923     line = sdscatprintf(sdsempty(),
 1924         "sentinel current-epoch %llu", (unsigned long long) sentinel.current_epoch);
 1925     rewriteConfigRewriteLine(state,"sentinel",line,1);
 1926 
 1927     /* sentinel announce-ip. */
 1928     if (sentinel.announce_ip) {
 1929         line = sdsnew("sentinel announce-ip ");
 1930         line = sdscatrepr(line, sentinel.announce_ip, sdslen(sentinel.announce_ip));
 1931         rewriteConfigRewriteLine(state,"sentinel",line,1);
 1932     }
 1933 
 1934     /* sentinel announce-port. */
 1935     if (sentinel.announce_port) {
 1936         line = sdscatprintf(sdsempty(),"sentinel announce-port %d",
 1937                             sentinel.announce_port);
 1938         rewriteConfigRewriteLine(state,"sentinel",line,1);
 1939     }
 1940 
 1941     dictReleaseIterator(di);
 1942 }
 1943 
 1944 /* This function uses the config rewriting Redis engine in order to persist
 1945  * the state of the Sentinel in the current configuration file.
 1946  *
 1947  * Before returning the function calls fsync() against the generated
 1948  * configuration file to make sure changes are committed to disk.
 1949  *
 1950  * On failure the function logs a warning on the Redis log. */
 1951 void sentinelFlushConfig(void) {
 1952     int fd = -1;
 1953     int saved_hz = server.hz;
 1954     int rewrite_status;
 1955 
 1956     server.hz = CONFIG_DEFAULT_HZ;
 1957     rewrite_status = rewriteConfig(server.configfile, 0);
 1958     server.hz = saved_hz;
 1959 
 1960     if (rewrite_status == -1) goto werr;
 1961     if ((fd = open(server.configfile,O_RDONLY)) == -1) goto werr;
 1962     if (fsync(fd) == -1) goto werr;
 1963     if (close(fd) == EOF) goto werr;
 1964     return;
 1965 
 1966 werr:
 1967     if (fd != -1) close(fd);
 1968     serverLog(LL_WARNING,"WARNING: Sentinel was not able to save the new configuration on disk!!!: %s", strerror(errno));
 1969 }
 1970 
 1971 /* ====================== hiredis connection handling ======================= */
 1972 
 1973 /* Send the AUTH command with the specified master password if needed.
 1974  * Note that for slaves the password set for the master is used.
 1975  *
 1976  * In case this Sentinel requires a password as well, via the "requirepass"
 1977  * configuration directive, we assume we should use the local password in
 1978  * order to authenticate when connecting with the other Sentinels as well.
 1979  * So basically all the Sentinels share the same password and use it to
 1980  * authenticate reciprocally.
 1981  *
 1982  * We don't check at all if the command was successfully transmitted
 1983  * to the instance as if it fails Sentinel will detect the instance down,
 1984  * will disconnect and reconnect the link and so forth. */
 1985 void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
 1986     char *auth_pass = NULL;
 1987     char *auth_user = NULL;
 1988 
 1989     if (ri->flags & SRI_MASTER) {
 1990         auth_pass = ri->auth_pass;
 1991         auth_user = ri->auth_user;
 1992     } else if (ri->flags & SRI_SLAVE) {
 1993         auth_pass = ri->master->auth_pass;
 1994         auth_user = ri->master->auth_user;
 1995     } else if (ri->flags & SRI_SENTINEL) {
 1996         auth_pass = server.requirepass;
 1997         auth_user = NULL;
 1998     }
 1999 
 2000     if (auth_pass && auth_user == NULL) {
 2001         if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s",
 2002             sentinelInstanceMapCommand(ri,"AUTH"),
 2003             auth_pass) == C_OK) ri->link->pending_commands++;
 2004     } else if (auth_pass && auth_user) {
 2005         /* If we also have an username, use the ACL-style AUTH command
 2006          * with two arguments, username and password. */
 2007         if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s %s",
 2008             sentinelInstanceMapCommand(ri,"AUTH"),
 2009             auth_user, auth_pass) == C_OK) ri->link->pending_commands++;
 2010     }
 2011 }
 2012 
 2013 /* Use CLIENT SETNAME to name the connection in the Redis instance as
 2014  * sentinel-<first_8_chars_of_runid>-<connection_type>
 2015  * The connection type is "cmd" or "pubsub" as specified by 'type'.
 2016  *
 2017  * This makes it possible to list all the sentinel instances connected
 2018  * to a Redis servewr with CLIENT LIST, grepping for a specific name format. */
 2019 void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) {
 2020     char name[64];
 2021 
 2022     snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
 2023     if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
 2024         "%s SETNAME %s",
 2025         sentinelInstanceMapCommand(ri,"CLIENT"),
 2026         name) == C_OK)
 2027     {
 2028         ri->link->pending_commands++;
 2029     }
 2030 }
 2031 
 2032 static int instanceLinkNegotiateTLS(redisAsyncContext *context) {
 2033 #ifndef USE_OPENSSL
 2034     (void) context;
 2035 #else
 2036     if (!redis_tls_ctx) return C_ERR;
 2037     SSL *ssl = SSL_new(redis_tls_ctx);
 2038     if (!ssl) return C_ERR;
 2039 
 2040     if (redisInitiateSSL(&context->c, ssl) == REDIS_ERR) return C_ERR;
 2041 #endif
 2042     return C_OK;
 2043 }
 2044 
 2045 /* Create the async connections for the instance link if the link
 2046  * is disconnected. Note that link->disconnected is true even if just
 2047  * one of the two links (commands and pub/sub) is missing. */
 2048 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
 2049     if (ri->link->disconnected == 0) return;
 2050     if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
 2051     instanceLink *link = ri->link;
 2052     mstime_t now = mstime();
 2053 
 2054     if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
 2055     ri->link->last_reconn_time = now;
 2056 
 2057     /* Commands connection. */
 2058     if (link->cc == NULL) {
 2059         link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
 2060         if (!link->cc->err && server.tls_replication &&
 2061                 (instanceLinkNegotiateTLS(link->cc) == C_ERR)) {
 2062             sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to initialize TLS");
 2063             instanceLinkCloseConnection(link,link->cc);
 2064         } else if (link->cc->err) {
 2065             sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
 2066                 link->cc->errstr);
 2067             instanceLinkCloseConnection(link,link->cc);
 2068         } else {
 2069             link->pending_commands = 0;
 2070             link->cc_conn_time = mstime();
 2071             link->cc->data = link;
 2072             redisAeAttach(server.el,link->cc);
 2073             redisAsyncSetConnectCallback(link->cc,
 2074                     sentinelLinkEstablishedCallback);
 2075             redisAsyncSetDisconnectCallback(link->cc,
 2076                     sentinelDisconnectCallback);
 2077             sentinelSendAuthIfNeeded(ri,link->cc);
 2078             sentinelSetClientName(ri,link->cc,"cmd");
 2079 
 2080             /* Send a PING ASAP when reconnecting. */
 2081             sentinelSendPing(ri);
 2082         }
 2083     }
 2084     /* Pub / Sub */
 2085     if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
 2086         link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
 2087         if (!link->pc->err && server.tls_replication &&
 2088                 (instanceLinkNegotiateTLS(link->pc) == C_ERR)) {
 2089             sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to initialize TLS");
 2090         } else if (link->pc->err) {
 2091             sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
 2092                 link->pc->errstr);
 2093             instanceLinkCloseConnection(link,link->pc);
 2094         } else {
 2095             int retval;
 2096 
 2097             link->pc_conn_time = mstime();
 2098             link->pc->data = link;
 2099             redisAeAttach(server.el,link->pc);
 2100             redisAsyncSetConnectCallback(link->pc,
 2101                     sentinelLinkEstablishedCallback);
 2102             redisAsyncSetDisconnectCallback(link->pc,
 2103                     sentinelDisconnectCallback);
 2104             sentinelSendAuthIfNeeded(ri,link->pc);
 2105             sentinelSetClientName(ri,link->pc,"pubsub");
 2106             /* Now we subscribe to the Sentinels "Hello" channel. */
 2107             retval = redisAsyncCommand(link->pc,
 2108                 sentinelReceiveHelloMessages, ri, "%s %s",
 2109                 sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
 2110                 SENTINEL_HELLO_CHANNEL);
 2111             if (retval != C_OK) {
 2112                 /* If we can't subscribe, the Pub/Sub connection is useless
 2113                  * and we can simply disconnect it and try again. */
 2114                 instanceLinkCloseConnection(link,link->pc);
 2115                 return;
 2116             }
 2117         }
 2118     }
 2119     /* Clear the disconnected status only if we have both the connections
 2120      * (or just the commands connection if this is a sentinel instance). */
 2121     if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
 2122         link->disconnected = 0;
 2123 }
 2124 
 2125 /* ======================== Redis instances pinging  ======================== */
 2126 
 2127 /* Return true if master looks "sane", that is:
 2128  * 1) It is actually a master in the current configuration.
 2129  * 2) It reports itself as a master.
 2130  * 3) It is not SDOWN or ODOWN.
 2131  * 4) We obtained last INFO no more than two times the INFO period time ago. */
 2132 int sentinelMasterLooksSane(sentinelRedisInstance *master) {
 2133     return
 2134         master->flags & SRI_MASTER &&
 2135         master->role_reported == SRI_MASTER &&
 2136         (master->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0 &&
 2137         (mstime() - master->info_refresh) < SENTINEL_INFO_PERIOD*2;
 2138 }
 2139 
 2140 /* Process the INFO output from masters. */
 2141 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
 2142     sds *lines;
 2143     int numlines, j;
 2144     int role = 0;
 2145 
 2146     /* cache full INFO output for instance */
 2147     sdsfree(ri->info);
 2148     ri->info = sdsnew(info);
 2149 
 2150     /* The following fields must be reset to a given value in the case they
 2151      * are not found at all in the INFO output. */
 2152     ri->master_link_down_time = 0;
 2153 
 2154     /* Process line by line. */
 2155     lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
 2156     for (j = 0; j < numlines; j++) {
 2157         sentinelRedisInstance *slave;
 2158         sds l = lines[j];
 2159 
 2160         /* run_id:<40 hex chars>*/
 2161         if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
 2162             if (ri->runid == NULL) {
 2163                 ri->runid = sdsnewlen(l+7,40);
 2164             } else {
 2165                 if (strncmp(ri->runid,l+7,40) != 0) {
 2166                     sentinelEvent(LL_NOTICE,"+reboot",ri,"%@");
 2167                     sdsfree(ri->runid);
 2168                     ri->runid = sdsnewlen(l+7,40);
 2169                 }
 2170             }
 2171         }
 2172 
 2173         /* old versions: slave0:<ip>,<port>,<state>
 2174          * new versions: slave0:ip=127.0.0.1,port=9999,... */
 2175         if ((ri->flags & SRI_MASTER) &&
 2176             sdslen(l) >= 7 &&
 2177             !memcmp(l,"slave",5) && isdigit(l[5]))
 2178         {
 2179             char *ip, *port, *end;
 2180 
 2181             if (strstr(l,"ip=") == NULL) {
 2182                 /* Old format. */
 2183                 ip = strchr(l,':'); if (!ip) continue;
 2184                 ip++; /* Now ip points to start of ip address. */
 2185                 port = strchr(ip,','); if (!port) continue;
 2186                 *port = '\0'; /* nul term for easy access. */
 2187                 port++; /* Now port points to start of port number. */
 2188                 end = strchr(port,','); if (!end) continue;
 2189                 *end = '\0'; /* nul term for easy access. */
 2190             } else {
 2191                 /* New format. */
 2192                 ip = strstr(l,"ip="); if (!ip) continue;
 2193                 ip += 3; /* Now ip points to start of ip address. */
 2194                 port = strstr(l,"port="); if (!port) continue;
 2195                 port += 5; /* Now port points to start of port number. */
 2196                 /* Nul term both fields for easy access. */
 2197                 end = strchr(ip,','); if (end) *end = '\0';
 2198                 end = strchr(port,','); if (end) *end = '\0';
 2199             }
 2200 
 2201             /* Check if we already have this slave into our table,
 2202              * otherwise add it. */
 2203             if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
 2204                 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
 2205                             atoi(port), ri->quorum, ri)) != NULL)
 2206                 {
 2207                     sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
 2208                     sentinelFlushConfig();
 2209                 }
 2210             }
 2211         }
 2212 
 2213         /* master_link_down_since_seconds:<seconds> */
 2214         if (sdslen(l) >= 32 &&
 2215             !memcmp(l,"master_link_down_since_seconds",30))
 2216         {
 2217             ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
 2218         }
 2219 
 2220         /* role:<role> */
 2221         if (sdslen(l) >= 11 && !memcmp(l,"role:master",11)) role = SRI_MASTER;
 2222         else if (sdslen(l) >= 10 && !memcmp(l,"role:slave",10)) role = SRI_SLAVE;
 2223 
 2224         if (role == SRI_SLAVE) {
 2225             /* master_host:<host> */
 2226             if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
 2227                 if (ri->slave_master_host == NULL ||
 2228                     strcasecmp(l+12,ri->slave_master_host))
 2229                 {
 2230                     sdsfree(ri->slave_master_host);
 2231                     ri->slave_master_host = sdsnew(l+12);
 2232                     ri->slave_conf_change_time = mstime();
 2233                 }
 2234             }
 2235 
 2236             /* master_port:<port> */
 2237             if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) {
 2238                 int slave_master_port = atoi(l+12);
 2239 
 2240                 if (ri->slave_master_port != slave_master_port) {
 2241                     ri->slave_master_port = slave_master_port;
 2242                     ri->slave_conf_change_time = mstime();
 2243                 }
 2244             }
 2245 
 2246             /* master_link_status:<status> */
 2247             if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
 2248                 ri->slave_master_link_status =
 2249                     (strcasecmp(l+19,"up") == 0) ?
 2250                     SENTINEL_MASTER_LINK_STATUS_UP :
 2251                     SENTINEL_MASTER_LINK_STATUS_DOWN;
 2252             }
 2253 
 2254             /* slave_priority:<priority> */
 2255             if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
 2256                 ri->slave_priority = atoi(l+15);
 2257 
 2258             /* slave_repl_offset:<offset> */
 2259             if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18))
 2260                 ri->slave_repl_offset = strtoull(l+18,NULL,10);
 2261         }
 2262     }
 2263     ri->info_refresh = mstime();
 2264     sdsfreesplitres(lines,numlines);
 2265 
 2266     /* ---------------------------- Acting half -----------------------------
 2267      * Some things will not happen if sentinel.tilt is true, but some will
 2268      * still be processed. */
 2269 
 2270     /* Remember when the role changed. */
 2271     if (role != ri->role_reported) {
 2272         ri->role_reported_time = mstime();
 2273         ri->role_reported = role;
 2274         if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
 2275         /* Log the event with +role-change if the new role is coherent or
 2276          * with -role-change if there is a mismatch with the current config. */
 2277         sentinelEvent(LL_VERBOSE,
 2278             ((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
 2279             "+role-change" : "-role-change",
 2280             ri, "%@ new reported role is %s",
 2281             role == SRI_MASTER ? "master" : "slave",
 2282             ri->flags & SRI_MASTER ? "master" : "slave");
 2283     }
 2284 
 2285     /* None of the following conditions are processed when in tilt mode, so
 2286      * return asap. */
 2287     if (sentinel.tilt) return;
 2288 
 2289     /* Handle master -> slave role switch. */
 2290     if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
 2291         /* Nothing to do, but masters claiming to be slaves are
 2292          * considered to be unreachable by Sentinel, so eventually
 2293          * a failover will be triggered. */
 2294     }
 2295 
 2296     /* Handle slave -> master role switch. */
 2297     if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
 2298         /* If this is a promoted slave we can change state to the
 2299          * failover state machine. */
 2300         if ((ri->flags & SRI_PROMOTED) &&
 2301             (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
 2302             (ri->master->failover_state ==
 2303                 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
 2304         {
 2305             /* Now that we are sure the slave was reconfigured as a master
 2306              * set the master configuration epoch to the epoch we won the
 2307              * election to perform this failover. This will force the other
 2308              * Sentinels to update their config (assuming there is not
 2309              * a newer one already available). */
 2310             ri->master->config_epoch = ri->master->failover_epoch;
 2311             ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
 2312             ri->master->failover_state_change_time = mstime();
 2313             sentinelFlushConfig();
 2314             sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
 2315             if (sentinel.simfailure_flags &
 2316                 SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
 2317                 sentinelSimFailureCrash();
 2318             sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
 2319                 ri->master,"%@");
 2320             sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
 2321                 "start",ri->master->addr,ri->addr);
 2322             sentinelForceHelloUpdateForMaster(ri->master);
 2323         } else {
 2324             /* A slave turned into a master. We want to force our view and
 2325              * reconfigure as slave. Wait some time after the change before
 2326              * going forward, to receive new configs if any. */
 2327             mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;
 2328 
 2329             if (!(ri->flags & SRI_PROMOTED) &&
 2330                  sentinelMasterLooksSane(ri->master) &&
 2331                  sentinelRedisInstanceNoDownFor(ri,wait_time) &&
 2332                  mstime() - ri->role_reported_time > wait_time)
 2333             {
 2334                 int retval = sentinelSendSlaveOf(ri,
 2335                         ri->master->addr->ip,
 2336                         ri->master->addr->port);
 2337                 if (retval == C_OK)
 2338                     sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
 2339             }
 2340         }
 2341     }
 2342 
 2343     /* Handle slaves replicating to a different master address. */
 2344     if ((ri->flags & SRI_SLAVE) &&
 2345         role == SRI_SLAVE &&
 2346         (ri->slave_master_port != ri->master->addr->port ||
 2347          strcasecmp(ri->slave_master_host,ri->master->addr->ip)))
 2348     {
 2349         mstime_t wait_time = ri->master->failover_timeout;
 2350 
 2351         /* Make sure the master is sane before reconfiguring this instance
 2352          * into a slave. */
 2353         if (sentinelMasterLooksSane(ri->master) &&
 2354             sentinelRedisInstanceNoDownFor(ri,wait_time) &&
 2355             mstime() - ri->slave_conf_change_time > wait_time)
 2356         {
 2357             int retval = sentinelSendSlaveOf(ri,
 2358                     ri->master->addr->ip,
 2359                     ri->master->addr->port);
 2360             if (retval == C_OK)
 2361                 sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@");
 2362         }
 2363     }
 2364 
 2365     /* Detect if the slave that is in the process of being reconfigured
 2366      * changed state. */
 2367     if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
 2368         (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
 2369     {
 2370         /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
 2371         if ((ri->flags & SRI_RECONF_SENT) &&
 2372             ri->slave_master_host &&
 2373             strcmp(ri->slave_master_host,
 2374                     ri->master->promoted_slave->addr->ip) == 0 &&
 2375             ri->slave_master_port == ri->master->promoted_slave->addr->port)
 2376         {
 2377             ri->flags &= ~SRI_RECONF_SENT;
 2378             ri->flags |= SRI_RECONF_INPROG;
 2379             sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
 2380         }
 2381 
 2382         /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
 2383         if ((ri->flags & SRI_RECONF_INPROG) &&
 2384             ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
 2385         {
 2386             ri->flags &= ~SRI_RECONF_INPROG;
 2387             ri->flags |= SRI_RECONF_DONE;
 2388             sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
 2389         }
 2390     }
 2391 }
 2392 
 2393 void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
 2394     sentinelRedisInstance *ri = privdata;
 2395     instanceLink *link = c->data;
 2396     redisReply *r;
 2397 
 2398     if (!reply || !link) return;
 2399     link->pending_commands--;
 2400     r = reply;
 2401 
 2402     if (r->type == REDIS_REPLY_STRING)
 2403         sentinelRefreshInstanceInfo(ri,r->str);
 2404 }
 2405 
 2406 /* Just discard the reply. We use this when we are not monitoring the return
 2407  * value of the command but its effects directly. */
 2408 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
 2409     instanceLink *link = c->data;
 2410     UNUSED(reply);
 2411     UNUSED(privdata);
 2412 
 2413     if (link) link->pending_commands--;
 2414 }
 2415 
 2416 void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
 2417     sentinelRedisInstance *ri = privdata;
 2418     instanceLink *link = c->data;
 2419     redisReply *r;
 2420 
 2421     if (!reply || !link) return;
 2422     link->pending_commands--;
 2423     r = reply;
 2424 
 2425     if (r->type == REDIS_REPLY_STATUS ||
 2426         r->type == REDIS_REPLY_ERROR) {
 2427         /* Update the "instance available" field only if this is an
 2428          * acceptable reply. */
 2429         if (strncmp(r->str,"PONG",4) == 0 ||
 2430             strncmp(r->str,"LOADING",7) == 0 ||
 2431             strncmp(r->str,"MASTERDOWN",10) == 0)
 2432         {
 2433             link->last_avail_time = mstime();
 2434             link->act_ping_time = 0; /* Flag the pong as received. */
 2435         } else {
 2436             /* Send a SCRIPT KILL command if the instance appears to be
 2437              * down because of a busy script. */
 2438             if (strncmp(r->str,"BUSY",4) == 0 &&
 2439                 (ri->flags & SRI_S_DOWN) &&
 2440                 !(ri->flags & SRI_SCRIPT_KILL_SENT))
 2441             {
 2442                 if (redisAsyncCommand(ri->link->cc,
 2443                         sentinelDiscardReplyCallback, ri,
 2444                         "%s KILL",
 2445                         sentinelInstanceMapCommand(ri,"SCRIPT")) == C_OK)
 2446                 {
 2447                     ri->link->pending_commands++;
 2448                 }
 2449                 ri->flags |= SRI_SCRIPT_KILL_SENT;
 2450             }
 2451         }
 2452     }
 2453     link->last_pong_time = mstime();
 2454 }
 2455 
 2456 /* This is called when we get the reply about the PUBLISH command we send
 2457  * to the master to advertise this sentinel. */
 2458 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
 2459     sentinelRedisInstance *ri = privdata;
 2460     instanceLink *link = c->data;
 2461     redisReply *r;
 2462 
 2463     if (!reply || !link) return;
 2464     link->pending_commands--;
 2465     r = reply;
 2466 
 2467     /* Only update pub_time if we actually published our message. Otherwise
 2468      * we'll retry again in 100 milliseconds. */
 2469     if (r->type != REDIS_REPLY_ERROR)
 2470         ri->last_pub_time = mstime();
 2471 }
 2472 
 2473 /* Process an hello message received via Pub/Sub in master or slave instance,
 2474  * or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel.
 2475  *
 2476  * If the master name specified in the message is not known, the message is
 2477  * discarded. */
 2478 void sentinelProcessHelloMessage(char *hello, int hello_len) {
 2479     /* Format is composed of 8 tokens:
 2480      * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
 2481      * 5=master_ip,6=master_port,7=master_config_epoch. */
 2482     int numtokens, port, removed, master_port;
 2483     uint64_t current_epoch, master_config_epoch;
 2484     char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
 2485     sentinelRedisInstance *si, *master;
 2486 
 2487     if (numtokens == 8) {
 2488         /* Obtain a reference to the master this hello message is about */
 2489         master = sentinelGetMasterByName(token[4]);
 2490         if (!master) goto cleanup; /* Unknown master, skip the message. */
 2491 
 2492         /* First, try to see if we already have this sentinel. */
 2493         port = atoi(token[1]);
 2494         master_port = atoi(token[6]);
 2495         si = getSentinelRedisInstanceByAddrAndRunID(
 2496                         master->sentinels,token[0],port,token[2]);
 2497         current_epoch = strtoull(token[3],NULL,10);
 2498         master_config_epoch = strtoull(token[7],NULL,10);
 2499 
 2500         if (!si) {
 2501             /* If not, remove all the sentinels that have the same runid
 2502              * because there was an address change, and add the same Sentinel
 2503              * with the new address back. */
 2504             removed = removeMatchingSentinelFromMaster(master,token[2]);
 2505             if (removed) {
 2506                 sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
 2507                     "%@ ip %s port %d for %s", token[0],port,token[2]);
 2508             } else {
 2509                 /* Check if there is another Sentinel with the same address this
 2510                  * new one is reporting. What we do if this happens is to set its
 2511                  * port to 0, to signal the address is invalid. We'll update it
 2512                  * later if we get an HELLO message. */
 2513                 sentinelRedisInstance *other =
 2514                     getSentinelRedisInstanceByAddrAndRunID(
 2515                         master->sentinels, token[0],port,NULL);
 2516                 if (other) {
 2517                     sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
 2518                     other->addr->port = 0; /* It means: invalid address. */
 2519                     sentinelUpdateSentinelAddressInAllMasters(other);
 2520                 }
 2521             }
 2522 
 2523             /* Add the new sentinel. */
 2524             si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
 2525                             token[0],port,master->quorum,master);
 2526 
 2527             if (si) {
 2528                 if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
 2529                 /* The runid is NULL after a new instance creation and
 2530                  * for Sentinels we don't have a later chance to fill it,
 2531                  * so do it now. */
 2532                 si->runid = sdsnew(token[2]);
 2533                 sentinelTryConnectionSharing(si);
 2534                 if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
 2535                 sentinelFlushConfig();
 2536             }
 2537         }
 2538 
 2539         /* Update local current_epoch if received current_epoch is greater.*/
 2540         if (current_epoch > sentinel.current_epoch) {
 2541             sentinel.current_epoch = current_epoch;
 2542             sentinelFlushConfig();
 2543             sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
 2544                 (unsigned long long) sentinel.current_epoch);
 2545         }
 2546 
 2547         /* Update master info if received configuration is newer. */
 2548         if (si && master->config_epoch < master_config_epoch) {
 2549             master->config_epoch = master_config_epoch;
 2550             if (master_port != master->addr->port ||
 2551                 strcmp(master->addr->ip, token[5]))
 2552             {
 2553                 sentinelAddr *old_addr;
 2554 
 2555                 sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
 2556                 sentinelEvent(LL_WARNING,"+switch-master",
 2557                     master,"%s %s %d %s %d",
 2558                     master->name,
 2559                     master->addr->ip, master->addr->port,
 2560                     token[5], master_port);
 2561 
 2562                 old_addr = dupSentinelAddr(master->addr);
 2563                 sentinelResetMasterAndChangeAddress(master, token[5], master_port);
 2564                 sentinelCallClientReconfScript(master,
 2565                     SENTINEL_OBSERVER,"start",
 2566                     old_addr,master->addr);
 2567                 releaseSentinelAddr(old_addr);
 2568             }
 2569         }
 2570 
 2571         /* Update the state of the Sentinel. */
 2572         if (si) si->last_hello_time = mstime();
 2573     }
 2574 
 2575 cleanup:
 2576     sdsfreesplitres(token,numtokens);
 2577 }
 2578 
 2579 
 2580 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
 2581  * to discover other sentinels attached at the same master. */
 2582 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
 2583     sentinelRedisInstance *ri = privdata;
 2584     redisReply *r;
 2585     UNUSED(c);
 2586 
 2587     if (!reply || !ri) return;
 2588     r = reply;
 2589 
 2590     /* Update the last activity in the pubsub channel. Note that since we
 2591      * receive our messages as well this timestamp can be used to detect
 2592      * if the link is probably disconnected even if it seems otherwise. */
 2593     ri->link->pc_last_activity = mstime();
 2594 
 2595     /* Sanity check in the reply we expect, so that the code that follows
 2596      * can avoid to check for details. */
 2597     if (r->type != REDIS_REPLY_ARRAY ||
 2598         r->elements != 3 ||
 2599         r->element[0]->type != REDIS_REPLY_STRING ||
 2600         r->element[1]->type != REDIS_REPLY_STRING ||
 2601         r->element[2]->type != REDIS_REPLY_STRING ||
 2602         strcmp(r->element[0]->str,"message") != 0) return;
 2603 
 2604     /* We are not interested in meeting ourselves */
 2605     if (strstr(r->element[2]->str,sentinel.myid) != NULL) return;
 2606 
 2607     sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
 2608 }
 2609 
 2610 /* Send an "Hello" message via Pub/Sub to the specified 'ri' Redis
 2611  * instance in order to broadcast the current configuration for this
 2612  * master, and to advertise the existence of this Sentinel at the same time.
 2613  *
 2614  * The message has the following format:
 2615  *
 2616  * sentinel_ip,sentinel_port,sentinel_runid,current_epoch,
 2617  * master_name,master_ip,master_port,master_config_epoch.
 2618  *
 2619  * Returns C_OK if the PUBLISH was queued correctly, otherwise
 2620  * C_ERR is returned. */
 2621 int sentinelSendHello(sentinelRedisInstance *ri) {
 2622     char ip[NET_IP_STR_LEN];
 2623     char payload[NET_IP_STR_LEN+1024];
 2624     int retval;
 2625     char *announce_ip;
 2626     int announce_port;
 2627     sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
 2628     sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);
 2629 
 2630     if (ri->link->disconnected) return C_ERR;
 2631 
 2632     /* Use the specified announce address if specified, otherwise try to
 2633      * obtain our own IP address. */
 2634     if (sentinel.announce_ip) {
 2635         announce_ip = sentinel.announce_ip;
 2636     } else {
 2637         if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
 2638             return C_ERR;
 2639         announce_ip = ip;
 2640     }
 2641     if (sentinel.announce_port) announce_port = sentinel.announce_port;
 2642     else if (server.tls_replication && server.tls_port) announce_port = server.tls_port;
 2643     else announce_port = server.port;
 2644 
 2645     /* Format and send the Hello message. */
 2646     snprintf(payload,sizeof(payload),
 2647         "%s,%d,%s,%llu," /* Info about this sentinel. */
 2648         "%s,%s,%d,%llu", /* Info about current master. */
 2649         announce_ip, announce_port, sentinel.myid,
 2650         (unsigned long long) sentinel.current_epoch,
 2651         /* --- */
 2652         master->name,master_addr->ip,master_addr->port,
 2653         (unsigned long long) master->config_epoch);
 2654     retval = redisAsyncCommand(ri->link->cc,
 2655         sentinelPublishReplyCallback, ri, "%s %s %s",
 2656         sentinelInstanceMapCommand(ri,"PUBLISH"),
 2657         SENTINEL_HELLO_CHANNEL,payload);
 2658     if (retval != C_OK) return C_ERR;
 2659     ri->link->pending_commands++;
 2660     return C_OK;
 2661 }
 2662 
 2663 /* Reset last_pub_time in all the instances in the specified dictionary
 2664  * in order to force the delivery of an Hello update ASAP. */
 2665 void sentinelForceHelloUpdateDictOfRedisInstances(dict *instances) {
 2666     dictIterator *di;
 2667     dictEntry *de;
 2668 
 2669     di = dictGetSafeIterator(instances);
 2670     while((de = dictNext(di)) != NULL) {
 2671         sentinelRedisInstance *ri = dictGetVal(de);
 2672         if (ri->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1))
 2673             ri->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
 2674     }
 2675     dictReleaseIterator(di);
 2676 }
 2677 
 2678 /* This function forces the delivery of an "Hello" message (see
 2679  * sentinelSendHello() top comment for further information) to all the Redis
 2680  * and Sentinel instances related to the specified 'master'.
 2681  *
 2682  * It is technically not needed since we send an update to every instance
 2683  * with a period of SENTINEL_PUBLISH_PERIOD milliseconds, however when a
 2684  * Sentinel upgrades a configuration it is a good idea to deliever an update
 2685  * to the other Sentinels ASAP. */
 2686 int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) {
 2687     if (!(master->flags & SRI_MASTER)) return C_ERR;
 2688     if (master->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1))
 2689         master->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
 2690     sentinelForceHelloUpdateDictOfRedisInstances(master->sentinels);
 2691     sentinelForceHelloUpdateDictOfRedisInstances(master->slaves);
 2692     return C_OK;
 2693 }
 2694 
 2695 /* Send a PING to the specified instance and refresh the act_ping_time
 2696  * if it is zero (that is, if we received a pong for the previous ping).
 2697  *
 2698  * On error zero is returned, and we can't consider the PING command
 2699  * queued in the connection. */
 2700 int sentinelSendPing(sentinelRedisInstance *ri) {
 2701     int retval = redisAsyncCommand(ri->link->cc,
 2702         sentinelPingReplyCallback, ri, "%s",
 2703         sentinelInstanceMapCommand(ri,"PING"));
 2704     if (retval == C_OK) {
 2705         ri->link->pending_commands++;
 2706         ri->link->last_ping_time = mstime();
 2707         /* We update the active ping time only if we received the pong for
 2708          * the previous ping, otherwise we are technically waiting since the
 2709          * first ping that did not receive a reply. */
 2710         if (ri->link->act_ping_time == 0)
 2711             ri->link->act_ping_time = ri->link->last_ping_time;
 2712         return 1;
 2713     } else {
 2714         return 0;
 2715     }
 2716 }
 2717 
 2718 /* Send periodic PING, INFO, and PUBLISH to the Hello channel to
 2719  * the specified master or slave instance. */
 2720 void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
 2721     mstime_t now = mstime();
 2722     mstime_t info_period, ping_period;
 2723     int retval;
 2724 
 2725     /* Return ASAP if we have already a PING or INFO already pending, or
 2726      * in the case the instance is not properly connected. */
 2727     if (ri->link->disconnected) return;
 2728 
 2729     /* For INFO, PING, PUBLISH that are not critical commands to send we
 2730      * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
 2731      * want to use a lot of memory just because a link is not working
 2732      * properly (note that anyway there is a redundant protection about this,
 2733      * that is, the link will be disconnected and reconnected if a long
 2734      * timeout condition is detected. */
 2735     if (ri->link->pending_commands >=
 2736         SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
 2737 
 2738     /* If this is a slave of a master in O_DOWN condition we start sending
 2739      * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
 2740      * period. In this state we want to closely monitor slaves in case they
 2741      * are turned into masters by another Sentinel, or by the sysadmin.
 2742      *
 2743      * Similarly we monitor the INFO output more often if the slave reports
 2744      * to be disconnected from the master, so that we can have a fresh
 2745      * disconnection time figure. */
 2746     if ((ri->flags & SRI_SLAVE) &&
 2747         ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
 2748          (ri->master_link_down_time != 0)))
 2749     {
 2750         info_period = 1000;
 2751     } else {
 2752         info_period = SENTINEL_INFO_PERIOD;
 2753     }
 2754 
 2755     /* We ping instances every time the last received pong is older than
 2756      * the configured 'down-after-milliseconds' time, but every second
 2757      * anyway if 'down-after-milliseconds' is greater than 1 second. */
 2758     ping_period = ri->down_after_period;
 2759     if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
 2760 
 2761     /* Send INFO to masters and slaves, not sentinels. */
 2762     if ((ri->flags & SRI_SENTINEL) == 0 &&
 2763         (ri->info_refresh == 0 ||
 2764         (now - ri->info_refresh) > info_period))
 2765     {
 2766         retval = redisAsyncCommand(ri->link->cc,
 2767             sentinelInfoReplyCallback, ri, "%s",
 2768             sentinelInstanceMapCommand(ri,"INFO"));
 2769         if (retval == C_OK) ri->link->pending_commands++;
 2770     }
 2771 
 2772     /* Send PING to all the three kinds of instances. */
 2773     if ((now - ri->link->last_pong_time) > ping_period &&
 2774                (now - ri->link->last_ping_time) > ping_period/2) {
 2775         sentinelSendPing(ri);
 2776     }
 2777 
 2778     /* PUBLISH hello messages to all the three kinds of instances. */
 2779     if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
 2780         sentinelSendHello(ri);
 2781     }
 2782 }
 2783 
 2784 /* =========================== SENTINEL command ============================= */
 2785 
 2786 const char *sentinelFailoverStateStr(int state) {
 2787     switch(state) {
 2788     case SENTINEL_FAILOVER_STATE_NONE: return "none";
 2789     case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
 2790     case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
 2791     case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
 2792     case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
 2793     case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
 2794     case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
 2795     default: return "unknown";
 2796     }
 2797 }
 2798 
 2799 /* Redis instance to Redis protocol representation. */
 2800 void addReplySentinelRedisInstance(client *c, sentinelRedisInstance *ri) {
 2801     char *flags = sdsempty();
 2802     void *mbl;
 2803     int fields = 0;
 2804 
 2805     mbl = addReplyDeferredLen(c);
 2806 
 2807     addReplyBulkCString(c,"name");
 2808     addReplyBulkCString(c,ri->name);
 2809     fields++;
 2810 
 2811     addReplyBulkCString(c,"ip");
 2812     addReplyBulkCString(c,ri->addr->ip);
 2813     fields++;
 2814 
 2815     addReplyBulkCString(c,"port");
 2816     addReplyBulkLongLong(c,ri->addr->port);
 2817     fields++;
 2818 
 2819     addReplyBulkCString(c,"runid");
 2820     addReplyBulkCString(c,ri->runid ? ri->runid : "");
 2821     fields++;
 2822 
 2823     addReplyBulkCString(c,"flags");
 2824     if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
 2825     if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
 2826     if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
 2827     if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
 2828     if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
 2829     if (ri->link->disconnected) flags = sdscat(flags,"disconnected,");
 2830     if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
 2831     if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
 2832         flags = sdscat(flags,"failover_in_progress,");
 2833     if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
 2834     if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
 2835     if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
 2836     if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
 2837 
 2838     if (sdslen(flags) != 0) sdsrange(flags,0,-2); /* remove last "," */
 2839     addReplyBulkCString(c,flags);
 2840     sdsfree(flags);
 2841     fields++;
 2842 
 2843     addReplyBulkCString(c,"link-pending-commands");
 2844     addReplyBulkLongLong(c,ri->link->pending_commands);
 2845     fields++;
 2846 
 2847     addReplyBulkCString(c,"link-refcount");
 2848     addReplyBulkLongLong(c,ri->link->refcount);
 2849     fields++;
 2850 
 2851     if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
 2852         addReplyBulkCString(c,"failover-state");
 2853         addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
 2854         fields++;
 2855     }
 2856 
 2857     addReplyBulkCString(c,"last-ping-sent");
 2858     addReplyBulkLongLong(c,
 2859         ri->link->act_ping_time ? (mstime() - ri->link->act_ping_time) : 0);
 2860     fields++;
 2861 
 2862     addReplyBulkCString(c,"last-ok-ping-reply");
 2863     addReplyBulkLongLong(c,mstime() - ri->link->last_avail_time);
 2864     fields++;
 2865 
 2866     addReplyBulkCString(c,"last-ping-reply");
 2867     addReplyBulkLongLong(c,mstime() - ri->link->last_pong_time);
 2868     fields++;
 2869 
 2870     if (ri->flags & SRI_S_DOWN) {
 2871         addReplyBulkCString(c,"s-down-time");
 2872         addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
 2873         fields++;
 2874     }
 2875 
 2876     if (ri->flags & SRI_O_DOWN) {
 2877         addReplyBulkCString(c,"o-down-time");
 2878         addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
 2879         fields++;
 2880     }
 2881 
 2882     addReplyBulkCString(c,"down-after-milliseconds");
 2883     addReplyBulkLongLong(c,ri->down_after_period);
 2884     fields++;
 2885 
 2886     /* Masters and Slaves */
 2887     if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
 2888         addReplyBulkCString(c,"info-refresh");
 2889         addReplyBulkLongLong(c,mstime() - ri->info_refresh);
 2890         fields++;
 2891 
 2892         addReplyBulkCString(c,"role-reported");
 2893         addReplyBulkCString(c, (ri->role_reported == SRI_MASTER) ? "master" :
 2894                                                                    "slave");
 2895         fields++;
 2896 
 2897         addReplyBulkCString(c,"role-reported-time");
 2898         addReplyBulkLongLong(c,mstime() - ri->role_reported_time);
 2899         fields++;
 2900     }
 2901 
 2902     /* Only masters */
 2903     if (ri->flags & SRI_MASTER) {
 2904         addReplyBulkCString(c,"config-epoch");
 2905         addReplyBulkLongLong(c,ri->config_epoch);
 2906         fields++;
 2907 
 2908         addReplyBulkCString(c,"num-slaves");
 2909         addReplyBulkLongLong(c,dictSize(ri->slaves));
 2910         fields++;
 2911 
 2912         addReplyBulkCString(c,"num-other-sentinels");
 2913         addReplyBulkLongLong(c,dictSize(ri->sentinels));
 2914         fields++;
 2915 
 2916         addReplyBulkCString(c,"quorum");
 2917         addReplyBulkLongLong(c,ri->quorum);
 2918         fields++;
 2919 
 2920         addReplyBulkCString(c,"failover-timeout");
 2921         addReplyBulkLongLong(c,ri->failover_timeout);
 2922         fields++;
 2923 
 2924         addReplyBulkCString(c,"parallel-syncs");
 2925         addReplyBulkLongLong(c,ri->parallel_syncs);
 2926         fields++;
 2927 
 2928         if (ri->notification_script) {
 2929             addReplyBulkCString(c,"notification-script");
 2930             addReplyBulkCString(c,ri->notification_script);
 2931             fields++;
 2932         }
 2933 
 2934         if (ri->client_reconfig_script) {
 2935             addReplyBulkCString(c,"client-reconfig-script");
 2936             addReplyBulkCString(c,ri->client_reconfig_script);
 2937             fields++;
 2938         }
 2939     }
 2940 
 2941     /* Only slaves */
 2942     if (ri->flags & SRI_SLAVE) {
 2943         addReplyBulkCString(c,"master-link-down-time");
 2944         addReplyBulkLongLong(c,ri->master_link_down_time);
 2945         fields++;
 2946 
 2947         addReplyBulkCString(c,"master-link-status");
 2948         addReplyBulkCString(c,
 2949             (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
 2950             "ok" : "err");
 2951         fields++;
 2952 
 2953         addReplyBulkCString(c,"master-host");
 2954         addReplyBulkCString(c,
 2955             ri->slave_master_host ? ri->slave_master_host : "?");
 2956         fields++;
 2957 
 2958         addReplyBulkCString(c,"master-port");
 2959         addReplyBulkLongLong(c,ri->slave_master_port);
 2960         fields++;
 2961 
 2962         addReplyBulkCString(c,"slave-priority");
 2963         addReplyBulkLongLong(c,ri->slave_priority);
 2964         fields++;
 2965 
 2966         addReplyBulkCString(c,"slave-repl-offset");
 2967         addReplyBulkLongLong(c,ri->slave_repl_offset);
 2968         fields++;
 2969     }
 2970 
 2971     /* Only sentinels */
 2972     if (ri->flags & SRI_SENTINEL) {
 2973         addReplyBulkCString(c,"last-hello-message");
 2974         addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
 2975         fields++;
 2976 
 2977         addReplyBulkCString(c,"voted-leader");
 2978         addReplyBulkCString(c,ri->leader ? ri->leader : "?");
 2979         fields++;
 2980 
 2981         addReplyBulkCString(c,"voted-leader-epoch");
 2982         addReplyBulkLongLong(c,ri->leader_epoch);
 2983         fields++;
 2984     }
 2985 
 2986     setDeferredMapLen(c,mbl,fields);
 2987 }
 2988 
 2989 /* Output a number of instances contained inside a dictionary as
 2990  * Redis protocol. */
 2991 void addReplyDictOfRedisInstances(client *c, dict *instances) {
 2992     dictIterator *di;
 2993     dictEntry *de;
 2994 
 2995     di = dictGetIterator(instances);
 2996     addReplyArrayLen(c,dictSize(instances));
 2997     while((de = dictNext(di)) != NULL) {
 2998         sentinelRedisInstance *ri = dictGetVal(de);
 2999 
 3000         addReplySentinelRedisInstance(c,ri);
 3001     }
 3002     dictReleaseIterator(di);
 3003 }
 3004 
 3005 /* Lookup the named master into sentinel.masters.
 3006  * If the master is not found reply to the client with an error and returns
 3007  * NULL. */
 3008 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(client *c,
 3009                         robj *name)
 3010 {
 3011     sentinelRedisInstance *ri;
 3012 
 3013     ri = dictFetchValue(sentinel.masters,name->ptr);
 3014     if (!ri) {
 3015         addReplyError(c,"No such master with that name");
 3016         return NULL;
 3017     }
 3018     return ri;
 3019 }
 3020 
 3021 #define SENTINEL_ISQR_OK 0
 3022 #define SENTINEL_ISQR_NOQUORUM (1<<0)
 3023 #define SENTINEL_ISQR_NOAUTH (1<<1)
 3024 int sentinelIsQuorumReachable(sentinelRedisInstance *master, int *usableptr) {
 3025     dictIterator *di;
 3026     dictEntry *de;
 3027     int usable = 1; /* Number of usable Sentinels. Init to 1 to count myself. */
 3028     int result = SENTINEL_ISQR_OK;
 3029     int voters = dictSize(master->sentinels)+1; /* Known Sentinels + myself. */
 3030 
 3031     di = dictGetIterator(master->sentinels);
 3032     while((de = dictNext(di)) != NULL) {
 3033         sentinelRedisInstance *ri = dictGetVal(de);
 3034 
 3035         if (ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
 3036         usable++;
 3037     }
 3038     dictReleaseIterator(di);
 3039 
 3040     if (usable < (int)master->quorum) result |= SENTINEL_ISQR_NOQUORUM;
 3041     if (usable < voters/2+1) result |= SENTINEL_ISQR_NOAUTH;
 3042     if (usableptr) *usableptr = usable;
 3043     return result;
 3044 }
 3045 
 3046 void sentinelCommand(client *c) {
 3047     if (!strcasecmp(c->argv[1]->ptr,"masters")) {
 3048         /* SENTINEL MASTERS */
 3049         if (c->argc != 2) goto numargserr;
 3050         addReplyDictOfRedisInstances(c,sentinel.masters);
 3051     } else if (!strcasecmp(c->argv[1]->ptr,"master")) {
 3052         /* SENTINEL MASTER <name> */
 3053         sentinelRedisInstance *ri;
 3054 
 3055         if (c->argc != 3) goto numargserr;
 3056         if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
 3057             == NULL) return;
 3058         addReplySentinelRedisInstance(c,ri);
 3059     } else if (!strcasecmp(c->argv[1]->ptr,"slaves") ||
 3060                !strcasecmp(c->argv[1]->ptr,"replicas"))
 3061     {
 3062         /* SENTINEL REPLICAS <master-name> */
 3063         sentinelRedisInstance *ri;
 3064 
 3065         if (c->argc != 3) goto numargserr;
 3066         if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
 3067             return;
 3068         addReplyDictOfRedisInstances(c,ri->slaves);
 3069     } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
 3070         /* SENTINEL SENTINELS <master-name> */
 3071         sentinelRedisInstance *ri;
 3072 
 3073         if (c->argc != 3) goto numargserr;
 3074         if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
 3075             return;
 3076         addReplyDictOfRedisInstances(c,ri->sentinels);
 3077     } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
 3078         /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>
 3079          *
 3080          * Arguments:
 3081          *
 3082          * ip and port are the ip and port of the master we want to be
 3083          * checked by Sentinel. Note that the command will not check by
 3084          * name but just by master, in theory different Sentinels may monitor
 3085          * differnet masters with the same name.
 3086          *
 3087          * current-epoch is needed in order to understand if we are allowed
 3088          * to vote for a failover leader or not. Each Sentinel can vote just
 3089          * one time per epoch.
 3090          *
 3091          * runid is "*" if we are not seeking for a vote from the Sentinel
 3092          * in order to elect the failover leader. Otherwise it is set to the
 3093          * runid we want the Sentinel to vote if it did not already voted.
 3094          */
 3095         sentinelRedisInstance *ri;
 3096         long long req_epoch;
 3097         uint64_t leader_epoch = 0;
 3098         char *leader = NULL;
 3099         long port;
 3100         int isdown = 0;
 3101 
 3102         if (c->argc != 6) goto numargserr;
 3103         if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK ||
 3104             getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
 3105                                                               != C_OK)
 3106             return;
 3107         ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
 3108             c->argv[2]->ptr,port,NULL);
 3109 
 3110         /* It exists? Is actually a master? Is subjectively down? It's down.
 3111          * Note: if we are in tilt mode we always reply with "0". */
 3112         if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
 3113                                     (ri->flags & SRI_MASTER))
 3114             isdown = 1;
 3115 
 3116         /* Vote for the master (or fetch the previous vote) if the request
 3117          * includes a runid, otherwise the sender is not seeking for a vote. */
 3118         if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
 3119             leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
 3120                                             c->argv[5]->ptr,
 3121                                             &leader_epoch);
 3122         }
 3123 
 3124         /* Reply with a three-elements multi-bulk reply:
 3125          * down state, leader, vote epoch. */
 3126         addReplyArrayLen(c,3);
 3127         addReply(c, isdown ? shared.cone : shared.czero);
 3128         addReplyBulkCString(c, leader ? leader : "*");
 3129         addReplyLongLong(c, (long long)leader_epoch);
 3130         if (leader) sdsfree(leader);
 3131     } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
 3132         /* SENTINEL RESET <pattern> */
 3133         if (c->argc != 3) goto numargserr;
 3134         addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
 3135     } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
 3136         /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
 3137         sentinelRedisInstance *ri;
 3138 
 3139         if (c->argc != 3) goto numargserr;
 3140         ri = sentinelGetMasterByName(c->argv[2]->ptr);
 3141         if (ri == NULL) {
 3142             addReplyNullArray(c);
 3143         } else {
 3144             sentinelAddr *addr = sentinelGetCurrentMasterAddress(ri);
 3145 
 3146             addReplyArrayLen(c,2);
 3147             addReplyBulkCString(c,addr->ip);
 3148             addReplyBulkLongLong(c,addr->port);
 3149         }
 3150     } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
 3151         /* SENTINEL FAILOVER <master-name> */
 3152         sentinelRedisInstance *ri;
 3153 
 3154         if (c->argc != 3) goto numargserr;
 3155         if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
 3156             return;
 3157         if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
 3158             addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
 3159             return;
 3160         }
 3161         if (sentinelSelectSlave(ri) == NULL) {
 3162             addReplySds(c,sdsnew("-NOGOODSLAVE No suitable replica to promote\r\n"));
 3163             return;
 3164         }
 3165         serverLog(LL_WARNING,"Executing user requested FAILOVER of '%s'",
 3166             ri->name);
 3167         sentinelStartFailover(ri);
 3168         ri->flags |= SRI_FORCE_FAILOVER;
 3169         addReply(c,shared.ok);
 3170     } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
 3171         /* SENTINEL PENDING-SCRIPTS */
 3172 
 3173         if (c->argc != 2) goto numargserr;
 3174         sentinelPendingScriptsCommand(c);
 3175     } else if (!strcasecmp(c->argv[1]->ptr,"monitor")) {
 3176         /* SENTINEL MONITOR <name> <ip> <port> <quorum> */
 3177         sentinelRedisInstance *ri;
 3178         long quorum, port;
 3179         char ip[NET_IP_STR_LEN];
 3180 
 3181         if (c->argc != 6) goto numargserr;
 3182         if (getLongFromObjectOrReply(c,c->argv[5],&quorum,"Invalid quorum")
 3183             != C_OK) return;
 3184         if (getLongFromObjectOrReply(c,c->argv[4],&port,"Invalid port")
 3185             != C_OK) return;
 3186 
 3187         if (quorum <= 0) {
 3188             addReplyError(c, "Quorum must be 1 or greater.");
 3189             return;
 3190         }
 3191 
 3192         /* Make sure the IP field is actually a valid IP before passing it
 3193          * to createSentinelRedisInstance(), otherwise we may trigger a
 3194          * DNS lookup at runtime. */
 3195         if (anetResolveIP(NULL,c->argv[3]->ptr,ip,sizeof(ip)) == ANET_ERR) {
 3196             addReplyError(c,"Invalid IP address specified");
 3197             return;
 3198         }
 3199 
 3200         /* Parameters are valid. Try to create the master instance. */
 3201         ri = createSentinelRedisInstance(c->argv[2]->ptr,SRI_MASTER,
 3202                 c->argv[3]->ptr,port,quorum,NULL);
 3203         if (ri == NULL) {
 3204             switch(errno) {
 3205             case EBUSY:
 3206                 addReplyError(c,"Duplicated master name");
 3207                 break;
 3208             case EINVAL:
 3209                 addReplyError(c,"Invalid port number");
 3210                 break;
 3211             default:
 3212                 addReplyError(c,"Unspecified error adding the instance");
 3213                 break;
 3214             }
 3215         } else {
 3216             sentinelFlushConfig();
 3217             sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
 3218             addReply(c,shared.ok);
 3219         }
 3220     } else if (!strcasecmp(c->argv[1]->ptr,"flushconfig")) {
 3221         if (c->argc != 2) goto numargserr;
 3222         sentinelFlushConfig();
 3223         addReply(c,shared.ok);
 3224         return;
 3225     } else if (!strcasecmp(c->argv[1]->ptr,"remove")) {
 3226         /* SENTINEL REMOVE <name> */
 3227         sentinelRedisInstance *ri;
 3228 
 3229         if (c->argc != 3) goto numargserr;
 3230         if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
 3231             == NULL) return;
 3232         sentinelEvent(LL_WARNING,"-monitor",ri,"%@");
 3233         dictDelete(sentinel.masters,c->argv[2]->ptr);
 3234         sentinelFlushConfig();
 3235         addReply(c,shared.ok);
 3236     } else if (!strcasecmp(c->argv[1]->ptr,"ckquorum")) {
 3237         /* SENTINEL CKQUORUM <name> */
 3238         sentinelRedisInstance *ri;
 3239         int usable;
 3240 
 3241         if (c->argc != 3) goto numargserr;
 3242         if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
 3243             == NULL) return;
 3244         int result = sentinelIsQuorumReachable(ri,&usable);
 3245         if (result == SENTINEL_ISQR_OK) {
 3246             addReplySds(c, sdscatfmt(sdsempty(),
 3247                 "+OK %i usable Sentinels. Quorum and failover authorization "
 3248                 "can be reached\r\n",usable));
 3249         } else {
 3250             sds e = sdscatfmt(sdsempty(),
 3251                 "-NOQUORUM %i usable Sentinels. ",usable);
 3252             if (result & SENTINEL_ISQR_NOQUORUM)
 3253                 e = sdscat(e,"Not enough available Sentinels to reach the"
 3254                              " specified quorum for this master");
 3255             if (result & SENTINEL_ISQR_NOAUTH) {
 3256                 if (result & SENTINEL_ISQR_NOQUORUM) e = sdscat(e,". ");
 3257                 e = sdscat(e, "Not enough available Sentinels to reach the"
 3258                               " majority and authorize a failover");
 3259             }
 3260             e = sdscat(e,"\r\n");
 3261             addReplySds(c,e);
 3262         }
 3263     } else if (!strcasecmp(c->argv[1]->ptr,"set")) {
 3264         if (c->argc < 3) goto numargserr;
 3265         sentinelSetCommand(c);
 3266     } else if (!strcasecmp(c->argv[1]->ptr,"info-cache")) {
 3267         /* SENTINEL INFO-CACHE <name> */
 3268         if (c->argc < 2) goto numargserr;
 3269         mstime_t now = mstime();
 3270 
 3271         /* Create an ad-hoc dictionary type so that we can iterate
 3272          * a dictionary composed of just the master groups the user
 3273          * requested. */
 3274         dictType copy_keeper = instancesDictType;
 3275         copy_keeper.valDestructor = NULL;
 3276         dict *masters_local = sentinel.masters;
 3277         if (c->argc > 2) {
 3278             masters_local = dictCreate(&copy_keeper, NULL);
 3279 
 3280             for (int i = 2; i < c->argc; i++) {
 3281                 sentinelRedisInstance *ri;
 3282                 ri = sentinelGetMasterByName(c->argv[i]->ptr);
 3283                 if (!ri) continue; /* ignore non-existing names */
 3284                 dictAdd(masters_local, ri->name, ri);
 3285             }
 3286         }
 3287 
 3288         /* Reply format:
 3289          *   1.) master name
 3290          *   2.) 1.) info from master
 3291          *       2.) info from replica
 3292          *       ...
 3293          *   3.) other master name
 3294          *   ...
 3295          */
 3296         addReplyArrayLen(c,dictSize(masters_local) * 2);
 3297 
 3298         dictIterator  *di;
 3299         dictEntry *de;
 3300         di = dictGetIterator(masters_local);
 3301         while ((de = dictNext(di)) != NULL) {
 3302             sentinelRedisInstance *ri = dictGetVal(de);
 3303             addReplyBulkCBuffer(c,ri->name,strlen(ri->name));
 3304             addReplyArrayLen(c,dictSize(ri->slaves) + 1); /* +1 for self */
 3305             addReplyArrayLen(c,2);
 3306             addReplyLongLong(c, now - ri->info_refresh);
 3307             if (ri->info)
 3308                 addReplyBulkCBuffer(c,ri->info,sdslen(ri->info));
 3309             else
 3310                 addReplyNull(c);
 3311 
 3312             dictIterator *sdi;
 3313             dictEntry *sde;
 3314             sdi = dictGetIterator(ri->slaves);
 3315             while ((sde = dictNext(sdi)) != NULL) {
 3316                 sentinelRedisInstance *sri = dictGetVal(sde);
 3317                 addReplyArrayLen(c,2);
 3318                 addReplyLongLong(c, now - sri->info_refresh);
 3319                 if (sri->info)
 3320                     addReplyBulkCBuffer(c,sri->info,sdslen(sri->info));
 3321                 else
 3322                     addReplyNull(c);
 3323             }
 3324             dictReleaseIterator(sdi);
 3325         }
 3326         dictReleaseIterator(di);
 3327         if (masters_local != sentinel.masters) dictRelease(masters_local);
 3328     } else if (!strcasecmp(c->argv[1]->ptr,"simulate-failure")) {
 3329         /* SENTINEL SIMULATE-FAILURE <flag> <flag> ... <flag> */
 3330         int j;
 3331 
 3332         sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
 3333         for (j = 2; j < c->argc; j++) {
 3334             if (!strcasecmp(c->argv[j]->ptr,"crash-after-election")) {
 3335                 sentinel.simfailure_flags |=
 3336                     SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION;
 3337                 serverLog(LL_WARNING,"Failure simulation: this Sentinel "
 3338                     "will crash after being successfully elected as failover "
 3339                     "leader");
 3340             } else if (!strcasecmp(c->argv[j]->ptr,"crash-after-promotion")) {
 3341                 sentinel.simfailure_flags |=
 3342                     SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION;
 3343                 serverLog(LL_WARNING,"Failure simulation: this Sentinel "
 3344                     "will crash after promoting the selected replica to master");
 3345             } else if (!strcasecmp(c->argv[j]->ptr,"help")) {
 3346                 addReplyArrayLen(c,2);
 3347                 addReplyBulkCString(c,"crash-after-election");
 3348                 addReplyBulkCString(c,"crash-after-promotion");
 3349             } else {
 3350                 addReplyError(c,"Unknown failure simulation specified");
 3351                 return;
 3352             }
 3353         }
 3354         addReply(c,shared.ok);
 3355     } else {
 3356         addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
 3357                                (char*)c->argv[1]->ptr);
 3358     }
 3359     return;
 3360 
 3361 numargserr:
 3362     addReplyErrorFormat(c,"Wrong number of arguments for 'sentinel %s'",
 3363                           (char*)c->argv[1]->ptr);
 3364 }
 3365 
 3366 #define info_section_from_redis(section_name) do { \
 3367     if (defsections || allsections || !strcasecmp(section,section_name)) { \
 3368         sds redissection; \
 3369         if (sections++) info = sdscat(info,"\r\n"); \
 3370         redissection = genRedisInfoString(section_name); \
 3371         info = sdscatlen(info,redissection,sdslen(redissection)); \
 3372         sdsfree(redissection); \
 3373     } \
 3374 } while(0)
 3375 
 3376 /* SENTINEL INFO [section] */
 3377 void sentinelInfoCommand(client *c) {
 3378     if (c->argc > 2) {
 3379         addReply(c,shared.syntaxerr);
 3380         return;
 3381     }
 3382 
 3383     int defsections = 0, allsections = 0;
 3384     char *section = c->argc == 2 ? c->argv[1]->ptr : NULL;
 3385     if (section) {
 3386         allsections = !strcasecmp(section,"all");
 3387         defsections = !strcasecmp(section,"default");
 3388     } else {
 3389         defsections = 1;
 3390     }
 3391 
 3392     int sections = 0;
 3393     sds info = sdsempty();
 3394 
 3395     info_section_from_redis("server");
 3396     info_section_from_redis("clients");
 3397     info_section_from_redis("cpu");
 3398     info_section_from_redis("stats");
 3399 
 3400     if (defsections || allsections || !strcasecmp(section,"sentinel")) {
 3401         dictIterator *di;
 3402         dictEntry *de;
 3403         int master_id = 0;
 3404 
 3405         if (sections++) info = sdscat(info,"\r\n");
 3406         info = sdscatprintf(info,
 3407             "# Sentinel\r\n"
 3408             "sentinel_masters:%lu\r\n"
 3409             "sentinel_tilt:%d\r\n"
 3410             "sentinel_running_scripts:%d\r\n"
 3411             "sentinel_scripts_queue_length:%ld\r\n"
 3412             "sentinel_simulate_failure_flags:%lu\r\n",
 3413             dictSize(sentinel.masters),
 3414             sentinel.tilt,
 3415             sentinel.running_scripts,
 3416             listLength(sentinel.scripts_queue),
 3417             sentinel.simfailure_flags);
 3418 
 3419         di = dictGetIterator(sentinel.masters);
 3420         while((de = dictNext(di)) != NULL) {
 3421             sentinelRedisInstance *ri = dictGetVal(de);
 3422             char *status = "ok";
 3423 
 3424             if (ri->flags & SRI_O_DOWN) status = "odown";
 3425             else if (ri->flags & SRI_S_DOWN) status = "sdown";
 3426             info = sdscatprintf(info,
 3427                 "master%d:name=%s,status=%s,address=%s:%d,"
 3428                 "slaves=%lu,sentinels=%lu\r\n",
 3429                 master_id++, ri->name, status,
 3430                 ri->addr->ip, ri->addr->port,
 3431                 dictSize(ri->slaves),
 3432                 dictSize(ri->sentinels)+1);
 3433         }
 3434         dictReleaseIterator(di);
 3435     }
 3436 
 3437     addReplyBulkSds(c, info);
 3438 }
 3439 
 3440 /* Implements Sentinel version of the ROLE command. The output is
 3441  * "sentinel" and the list of currently monitored master names. */
 3442 void sentinelRoleCommand(client *c) {
 3443     dictIterator *di;
 3444     dictEntry *de;
 3445 
 3446     addReplyArrayLen(c,2);
 3447     addReplyBulkCBuffer(c,"sentinel",8);
 3448     addReplyArrayLen(c,dictSize(sentinel.masters));
 3449 
 3450     di = dictGetIterator(sentinel.masters);
 3451     while((de = dictNext(di)) != NULL) {
 3452         sentinelRedisInstance *ri = dictGetVal(de);
 3453 
 3454         addReplyBulkCString(c,ri->name);
 3455     }
 3456     dictReleaseIterator(di);
 3457 }
 3458 
 3459 /* SENTINEL SET <mastername> [<option> <value> ...] */
 3460 void sentinelSetCommand(client *c) {
 3461     sentinelRedisInstance *ri;
 3462     int j, changes = 0;
 3463     int badarg = 0; /* Bad argument position for error reporting. */
 3464     char *option;
 3465 
 3466     if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
 3467         == NULL) return;
 3468 
 3469     /* Process option - value pairs. */
 3470     for (j = 3; j < c->argc; j++) {
 3471         int moreargs = (c->argc-1) - j;
 3472         option = c->argv[j]->ptr;
 3473         long long ll;
 3474         int old_j = j; /* Used to know what to log as an event. */
 3475 
 3476         if (!strcasecmp(option,"down-after-milliseconds") && moreargs > 0) {
 3477             /* down-after-milliseconds <milliseconds> */
 3478             robj *o = c->argv[++j];
 3479             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
 3480                 badarg = j;
 3481                 goto badfmt;
 3482             }
 3483             ri->down_after_period = ll;
 3484             sentinelPropagateDownAfterPeriod(ri);
 3485             changes++;
 3486         } else if (!strcasecmp(option,"failover-timeout") && moreargs > 0) {
 3487             /* failover-timeout <milliseconds> */
 3488             robj *o = c->argv[++j];
 3489             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
 3490                 badarg = j;
 3491                 goto badfmt;
 3492             }
 3493             ri->failover_timeout = ll;
 3494             changes++;
 3495         } else if (!strcasecmp(option,"parallel-syncs") && moreargs > 0) {
 3496             /* parallel-syncs <number of replicas> */
 3497             robj *o = c->argv[++j];
 3498             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
 3499                 badarg = j;
 3500                 goto badfmt;
 3501             }
 3502             ri->parallel_syncs = ll;
 3503             changes++;
 3504         } else if (!strcasecmp(option,"notification-script") && moreargs > 0) {
 3505             /* notification-script <path> */
 3506             char *value = c->argv[++j]->ptr;
 3507             if (sentinel.deny_scripts_reconfig) {
 3508                 addReplyError(c,
 3509                     "Reconfiguration of scripts path is denied for "
 3510                     "security reasons. Check the deny-scripts-reconfig "
 3511                     "configuration directive in your Sentinel configuration");
 3512                 return;
 3513             }
 3514 
 3515             if (strlen(value) && access(value,X_OK) == -1) {
 3516                 addReplyError(c,
 3517                     "Notification script seems non existing or non executable");
 3518                 if (changes) sentinelFlushConfig();
 3519                 return;
 3520             }
 3521             sdsfree(ri->notification_script);
 3522             ri->notification_script = strlen(value) ? sdsnew(value) : NULL;
 3523             changes++;
 3524         } else if (!strcasecmp(option,"client-reconfig-script") && moreargs > 0) {
 3525             /* client-reconfig-script <path> */
 3526             char *value = c->argv[++j]->ptr;
 3527             if (sentinel.deny_scripts_reconfig) {
 3528                 addReplyError(c,
 3529                     "Reconfiguration of scripts path is denied for "
 3530                     "security reasons. Check the deny-scripts-reconfig "
 3531                     "configuration directive in your Sentinel configuration");
 3532                 return;
 3533             }
 3534 
 3535             if (strlen(value) && access(value,X_OK) == -1) {
 3536                 addReplyError(c,
 3537                     "Client reconfiguration script seems non existing or "
 3538                     "non executable");
 3539                 if (changes) sentinelFlushConfig();
 3540                 return;
 3541             }
 3542             sdsfree(ri->client_reconfig_script);
 3543             ri->client_reconfig_script = strlen(value) ? sdsnew(value) : NULL;
 3544             changes++;
 3545         } else if (!strcasecmp(option,"auth-pass") && moreargs > 0) {
 3546             /* auth-pass <password> */
 3547             char *value = c->argv[++j]->ptr;
 3548             sdsfree(ri->auth_pass);
 3549             ri->auth_pass = strlen(value) ? sdsnew(value) : NULL;
 3550             changes++;
 3551         } else if (!strcasecmp(option,"auth-user") && moreargs > 0) {
 3552             /* auth-user <username> */
 3553             char *value = c->argv[++j]->ptr;
 3554             sdsfree(ri->auth_user);
 3555             ri->auth_user = strlen(value) ? sdsnew(value) : NULL;
 3556             changes++;
 3557         } else if (!strcasecmp(option,"quorum") && moreargs > 0) {
 3558             /* quorum <count> */
 3559             robj *o = c->argv[++j];
 3560             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
 3561                 badarg = j;
 3562                 goto badfmt;
 3563             }
 3564             ri->quorum = ll;
 3565             changes++;
 3566         } else if (!strcasecmp(option,"rename-command") && moreargs > 1) {
 3567             /* rename-command <oldname> <newname> */
 3568             sds oldname = c->argv[++j]->ptr;
 3569             sds newname = c->argv[++j]->ptr;
 3570 
 3571             if ((sdslen(oldname) == 0) || (sdslen(newname) == 0)) {
 3572                 badarg = sdslen(newname) ? j-1 : j;
 3573                 goto badfmt;
 3574             }
 3575 
 3576             /* Remove any older renaming for this command. */
 3577             dictDelete(ri->renamed_commands,oldname);
 3578 
 3579             /* If the target name is the same as the source name there
 3580              * is no need to add an entry mapping to itself. */
 3581             if (!dictSdsKeyCaseCompare(NULL,oldname,newname)) {
 3582                 oldname = sdsdup(oldname);
 3583                 newname = sdsdup(newname);
 3584                 dictAdd(ri->renamed_commands,oldname,newname);
 3585             }
 3586             changes++;
 3587         } else {
 3588             addReplyErrorFormat(c,"Unknown option or number of arguments for "
 3589                                   "SENTINEL SET '%s'", option);
 3590             if (changes) sentinelFlushConfig();
 3591             return;
 3592         }
 3593 
 3594         /* Log the event. */
 3595         int numargs = j-old_j+1;
 3596         switch(numargs) {
 3597         case 2:
 3598             sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",c->argv[old_j]->ptr,
 3599                                                           c->argv[old_j+1]->ptr);
 3600             break;
 3601         case 3:
 3602             sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",c->argv[old_j]->ptr,
 3603                                                              c->argv[old_j+1]->ptr,
 3604                                                              c->argv[old_j+2]->ptr);
 3605             break;
 3606         default:
 3607             sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",c->argv[old_j]->ptr);
 3608             break;
 3609         }
 3610     }
 3611 
 3612     if (changes) sentinelFlushConfig();
 3613     addReply(c,shared.ok);
 3614     return;
 3615 
 3616 badfmt: /* Bad format errors */
 3617     if (changes) sentinelFlushConfig();
 3618     addReplyErrorFormat(c,"Invalid argument '%s' for SENTINEL SET '%s'",
 3619         (char*)c->argv[badarg]->ptr,option);
 3620 }
 3621 
 3622 /* Our fake PUBLISH command: it is actually useful only to receive hello messages
 3623  * from the other sentinel instances, and publishing to a channel other than
 3624  * SENTINEL_HELLO_CHANNEL is forbidden.
 3625  *
 3626  * Because we have a Sentinel PUBLISH, the code to send hello messages is the same
 3627  * for all the three kind of instances: masters, slaves, sentinels. */
 3628 void sentinelPublishCommand(client *c) {
 3629     if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL)) {
 3630         addReplyError(c, "Only HELLO messages are accepted by Sentinel instances.");
 3631         return;
 3632     }
 3633     sentinelProcessHelloMessage(c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
 3634     addReplyLongLong(c,1);
 3635 }
 3636 
 3637 /* ===================== SENTINEL availability checks ======================= */
 3638 
 3639 /* Is this instance down from our point of view? */
 3640 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
 3641     mstime_t elapsed = 0;
 3642 
 3643     if (ri->link->act_ping_time)
 3644         elapsed = mstime() - ri->link->act_ping_time;
 3645     else if (ri->link->disconnected)
 3646         elapsed = mstime() - ri->link->last_avail_time;
 3647 
 3648     /* Check if we are in need for a reconnection of one of the
 3649      * links, because we are detecting low activity.
 3650      *
 3651      * 1) Check if the command link seems connected, was connected not less
 3652      *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
 3653      *    pending ping for more than half the timeout. */
 3654     if (ri->link->cc &&
 3655         (mstime() - ri->link->cc_conn_time) >
 3656         SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
 3657         ri->link->act_ping_time != 0 && /* There is a pending ping... */
 3658         /* The pending ping is delayed, and we did not receive
 3659          * error replies as well. */
 3660         (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
 3661         (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
 3662     {
 3663         instanceLinkCloseConnection(ri->link,ri->link->cc);
 3664     }
 3665 
 3666     /* 2) Check if the pubsub link seems connected, was connected not less
 3667      *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
 3668      *    activity in the Pub/Sub channel for more than
 3669      *    SENTINEL_PUBLISH_PERIOD * 3.
 3670      */
 3671     if (ri->link->pc &&
 3672         (mstime() - ri->link->pc_conn_time) >
 3673          SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
 3674         (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
 3675     {
 3676         instanceLinkCloseConnection(ri->link,ri->link->pc);
 3677     }
 3678 
 3679     /* Update the SDOWN flag. We believe the instance is SDOWN if:
 3680      *
 3681      * 1) It is not replying.
 3682      * 2) We believe it is a master, it reports to be a slave for enough time
 3683      *    to meet the down_after_period, plus enough time to get two times
 3684      *    INFO report from the instance. */
 3685     if (elapsed > ri->down_after_period ||
 3686         (ri->flags & SRI_MASTER &&
 3687          ri->role_reported == SRI_SLAVE &&
 3688          mstime() - ri->role_reported_time >
 3689           (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
 3690     {
 3691         /* Is subjectively down */
 3692         if ((ri->flags & SRI_S_DOWN) == 0) {
 3693             sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
 3694             ri->s_down_since_time = mstime();
 3695             ri->flags |= SRI_S_DOWN;
 3696         }
 3697     } else {
 3698         /* Is subjectively up */
 3699         if (ri->flags & SRI_S_DOWN) {
 3700             sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
 3701             ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
 3702         }
 3703     }
 3704 }
 3705 
 3706 /* Is this instance down according to the configured quorum?
 3707  *
 3708  * Note that ODOWN is a weak quorum, it only means that enough Sentinels
 3709  * reported in a given time range that the instance was not reachable.
 3710  * However messages can be delayed so there are no strong guarantees about
 3711  * N instances agreeing at the same time about the down state. */
 3712 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
 3713     dictIterator *di;
 3714     dictEntry *de;
 3715     unsigned int quorum = 0, odown = 0;
 3716 
 3717     if (master->flags & SRI_S_DOWN) {
 3718         /* Is down for enough sentinels? */
 3719         quorum = 1; /* the current sentinel. */
 3720         /* Count all the other sentinels. */
 3721         di = dictGetIterator(master->sentinels);
 3722         while((de = dictNext(di)) != NULL) {
 3723             sentinelRedisInstance *ri = dictGetVal(de);
 3724 
 3725             if (ri->flags & SRI_MASTER_DOWN) quorum++;
 3726         }
 3727         dictReleaseIterator(di);
 3728         if (quorum >= master->quorum) odown = 1;
 3729     }
 3730 
 3731     /* Set the flag accordingly to the outcome. */
 3732     if (odown) {
 3733         if ((master->flags & SRI_O_DOWN) == 0) {
 3734             sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
 3735                 quorum, master->quorum);
 3736             master->flags |= SRI_O_DOWN;
 3737             master->o_down_since_time = mstime();
 3738         }
 3739     } else {
 3740         if (master->flags & SRI_O_DOWN) {
 3741             sentinelEvent(LL_WARNING,"-odown",master,"%@");
 3742             master->flags &= ~SRI_O_DOWN;
 3743         }
 3744     }
 3745 }
 3746 
 3747 /* Receive the SENTINEL is-master-down-by-addr reply, see the
 3748  * sentinelAskMasterStateToOtherSentinels() function for more information. */
 3749 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
 3750     sentinelRedisInstance *ri = privdata;
 3751     instanceLink *link = c->data;
 3752     redisReply *r;
 3753 
 3754     if (!reply || !link) return;
 3755     link->pending_commands--;
 3756     r = reply;
 3757 
 3758     /* Ignore every error or unexpected reply.
 3759      * Note that if the command returns an error for any reason we'll
 3760      * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
 3761     if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
 3762         r->element[0]->type == REDIS_REPLY_INTEGER &&
 3763         r->element[1]->type == REDIS_REPLY_STRING &&
 3764         r->element[2]->type == REDIS_REPLY_INTEGER)
 3765     {
 3766         ri->last_master_down_reply_time = mstime();
 3767         if (r->element[0]->integer == 1) {
 3768             ri->flags |= SRI_MASTER_DOWN;
 3769         } else {
 3770             ri->flags &= ~SRI_MASTER_DOWN;
 3771         }
 3772         if (strcmp(r->element[1]->str,"*")) {
 3773             /* If the runid in the reply is not "*" the Sentinel actually
 3774              * replied with a vote. */
 3775             sdsfree(ri->leader);
 3776             if ((long long)ri->leader_epoch != r->element[2]->integer)
 3777                 serverLog(LL_WARNING,
 3778                     "%s voted for %s %llu", ri->name,
 3779                     r->element[1]->str,
 3780                     (unsigned long long) r->element[2]->integer);
 3781             ri->leader = sdsnew(r->element[1]->str);
 3782             ri->leader_epoch = r->element[2]->integer;
 3783         }
 3784     }
 3785 }
 3786 
 3787 /* If we think the master is down, we start sending
 3788  * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
 3789  * in order to get the replies that allow to reach the quorum
 3790  * needed to mark the master in ODOWN state and trigger a failover. */
 3791 #define SENTINEL_ASK_FORCED (1<<0)
 3792 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
 3793     dictIterator *di;
 3794     dictEntry *de;
 3795 
 3796     di = dictGetIterator(master->sentinels);
 3797     while((de = dictNext(di)) != NULL) {
 3798         sentinelRedisInstance *ri = dictGetVal(de);
 3799         mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
 3800         char port[32];
 3801         int retval;
 3802 
 3803         /* If the master state from other sentinel is too old, we clear it. */
 3804         if (elapsed > SENTINEL_ASK_PERIOD*5) {
 3805             ri->flags &= ~SRI_MASTER_DOWN;
 3806             sdsfree(ri->leader);
 3807             ri->leader = NULL;
 3808         }
 3809 
 3810         /* Only ask if master is down to other sentinels if:
 3811          *
 3812          * 1) We believe it is down, or there is a failover in progress.
 3813          * 2) Sentinel is connected.
 3814          * 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */
 3815         if ((master->flags & SRI_S_DOWN) == 0) continue;
 3816         if (ri->link->disconnected) continue;
 3817         if (!(flags & SENTINEL_ASK_FORCED) &&
 3818             mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
 3819             continue;
 3820 
 3821         /* Ask */
 3822         ll2string(port,sizeof(port),master->addr->port);
 3823         retval = redisAsyncCommand(ri->link->cc,
 3824                     sentinelReceiveIsMasterDownReply, ri,
 3825                     "%s is-master-down-by-addr %s %s %llu %s",
 3826                     sentinelInstanceMapCommand(ri,"SENTINEL"),
 3827                     master->addr->ip, port,
 3828                     sentinel.current_epoch,
 3829                     (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
 3830                     sentinel.myid : "*");
 3831         if (retval == C_OK) ri->link->pending_commands++;
 3832     }
 3833     dictReleaseIterator(di);
 3834 }
 3835 
 3836 /* =============================== FAILOVER ================================= */
 3837 
 3838 /* Crash because of user request via SENTINEL simulate-failure command. */
 3839 void sentinelSimFailureCrash(void) {
 3840     serverLog(LL_WARNING,
 3841         "Sentinel CRASH because of SENTINEL simulate-failure");
 3842     exit(99);
 3843 }
 3844 
 3845 /* Vote for the sentinel with 'req_runid' or return the old vote if already
 3846  * voted for the specified 'req_epoch' or one greater.
 3847  *
 3848  * If a vote is not available returns NULL, otherwise return the Sentinel
 3849  * runid and populate the leader_epoch with the epoch of the vote. */
 3850 char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
 3851     if (req_epoch > sentinel.current_epoch) {
 3852         sentinel.current_epoch = req_epoch;
 3853         sentinelFlushConfig();
 3854         sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
 3855             (unsigned long long) sentinel.current_epoch);
 3856     }
 3857 
 3858     if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
 3859     {
 3860         sdsfree(master->leader);
 3861         master->leader = sdsnew(req_runid);
 3862         master->leader_epoch = sentinel.current_epoch;
 3863         sentinelFlushConfig();
 3864         sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
 3865             master->leader, (unsigned long long) master->leader_epoch);
 3866         /* If we did not voted for ourselves, set the master failover start
 3867          * time to now, in order to force a delay before we can start a
 3868          * failover for the same master. */
 3869         if (strcasecmp(master->leader,sentinel.myid))
 3870             master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
 3871     }
 3872 
 3873     *leader_epoch = master->leader_epoch;
 3874     return master->leader ? sdsnew(master->leader) : NULL;
 3875 }
 3876 
/* A runid paired with a vote counter.
 * NOTE(review): the leader election helpers right below keep their tallies
 * in a dict and do not reference this struct; confirm whether it is still
 * used anywhere before relying on it. */
struct sentinelLeader {
    char *runid;            /* Presumably the run ID of a candidate leader. */
    unsigned long votes;    /* Presumably the votes counted for that run ID. */
};
 3881 
 3882 /* Helper function for sentinelGetLeader, increment the counter
 3883  * relative to the specified runid. */
 3884 int sentinelLeaderIncr(dict *counters, char *runid) {
 3885     dictEntry *existing, *de;
 3886     uint64_t oldval;
 3887 
 3888     de = dictAddRaw(counters,runid,&existing);
 3889     if (existing) {
 3890         oldval = dictGetUnsignedIntegerVal(existing);
 3891         dictSetUnsignedIntegerVal(existing,oldval+1);
 3892         return oldval+1;
 3893     } else {
 3894         serverAssert(de != NULL);
 3895         dictSetUnsignedIntegerVal(de,1);
 3896         return 1;
 3897     }
 3898 }
 3899 
 3900 /* Scan all the Sentinels attached to this master to check if there
 3901  * is a leader for the specified epoch.
 3902  *
 3903  * To be a leader for a given epoch, we should have the majority of
 3904  * the Sentinels we know (ever seen since the last SENTINEL RESET) that
 3905  * reported the same instance as leader for the same epoch. */
 3906 char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
 3907     dict *counters;
 3908     dictIterator *di;
 3909     dictEntry *de;
 3910     unsigned int voters = 0, voters_quorum;
 3911     char *myvote;
 3912     char *winner = NULL;
 3913     uint64_t leader_epoch;
 3914     uint64_t max_votes = 0;
 3915 
 3916     serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
 3917     counters = dictCreate(&leaderVotesDictType,NULL);
 3918 
 3919     voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/
 3920 
 3921     /* Count other sentinels votes */
 3922     di = dictGetIterator(master->sentinels);
 3923     while((de = dictNext(di)) != NULL) {
 3924         sentinelRedisInstance *ri = dictGetVal(de);
 3925         if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
 3926             sentinelLeaderIncr(counters,ri->leader);
 3927     }
 3928     dictReleaseIterator(di);
 3929 
 3930     /* Check what's the winner. For the winner to win, it needs two conditions:
 3931      * 1) Absolute majority between voters (50% + 1).
 3932      * 2) And anyway at least master->quorum votes. */
 3933     di = dictGetIterator(counters);
 3934     while((de = dictNext(di)) != NULL) {
 3935         uint64_t votes = dictGetUnsignedIntegerVal(de);
 3936 
 3937         if (votes > max_votes) {
 3938             max_votes = votes;
 3939             winner = dictGetKey(de);
 3940         }
 3941     }
 3942     dictReleaseIterator(di);
 3943 
 3944     /* Count this Sentinel vote:
 3945      * if this Sentinel did not voted yet, either vote for the most
 3946      * common voted sentinel, or for itself if no vote exists at all. */
 3947     if (winner)
 3948         myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
 3949     else
 3950         myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);
 3951 
 3952     if (myvote && leader_epoch == epoch) {
 3953         uint64_t votes = sentinelLeaderIncr(counters,myvote);
 3954 
 3955         if (votes > max_votes) {
 3956             max_votes = votes;
 3957             winner = myvote;
 3958         }
 3959     }
 3960 
 3961     voters_quorum = voters/2+1;
 3962     if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
 3963         winner = NULL;
 3964 
 3965     winner = winner ? sdsnew(winner) : NULL;
 3966     sdsfree(myvote);
 3967     dictRelease(counters);
 3968     return winner;
 3969 }
 3970 
 3971 /* Send SLAVEOF to the specified instance, always followed by a
 3972  * CONFIG REWRITE command in order to store the new configuration on disk
 3973  * when possible (that is, if the Redis instance is recent enough to support
 3974  * config rewriting, and if the server was started with a configuration file).
 3975  *
 3976  * If Host is NULL the function sends "SLAVEOF NO ONE".
 3977  *
 3978  * The command returns C_OK if the SLAVEOF command was accepted for
 3979  * (later) delivery otherwise C_ERR. The command replies are just
 3980  * discarded. */
 3981 int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
 3982     char portstr[32];
 3983     int retval;
 3984 
 3985     ll2string(portstr,sizeof(portstr),port);
 3986 
 3987     /* If host is NULL we send SLAVEOF NO ONE that will turn the instance
 3988      * into a master. */
 3989     if (host == NULL) {
 3990         host = "NO";
 3991         memcpy(portstr,"ONE",4);
 3992     }
 3993 
 3994     /* In order to send SLAVEOF in a safe way, we send a transaction performing
 3995      * the following tasks:
 3996      * 1) Reconfigure the instance according to the specified host/port params.
 3997      * 2) Rewrite the configuration.
 3998      * 3) Disconnect all clients (but this one sending the commnad) in order
 3999      *    to trigger the ask-master-on-reconnection protocol for connected
 4000      *    clients.
 4001      *
 4002      * Note that we don't check the replies returned by commands, since we
 4003      * will observe instead the effects in the next INFO output. */
 4004     retval = redisAsyncCommand(ri->link->cc,
 4005         sentinelDiscardReplyCallback, ri, "%s",
 4006         sentinelInstanceMapCommand(ri,"MULTI"));
 4007     if (retval == C_ERR) return retval;
 4008     ri->link->pending_commands++;
 4009 
 4010     retval = redisAsyncCommand(ri->link->cc,
 4011         sentinelDiscardReplyCallback, ri, "%s %s %s",
 4012         sentinelInstanceMapCommand(ri,"SLAVEOF"),
 4013         host, portstr);
 4014     if (retval == C_ERR) return retval;
 4015     ri->link->pending_commands++;
 4016 
 4017     retval = redisAsyncCommand(ri->link->cc,
 4018         sentinelDiscardReplyCallback, ri, "%s REWRITE",
 4019         sentinelInstanceMapCommand(ri,"CONFIG"));
 4020     if (retval == C_ERR) return retval;
 4021     ri->link->pending_commands++;
 4022 
 4023     /* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12,
 4024      * however sending it to an instance not understanding this command is not
 4025      * an issue because CLIENT is variadic command, so Redis will not
 4026      * recognized as a syntax error, and the transaction will not fail (but
 4027      * only the unsupported command will fail). */
 4028     for (int type = 0; type < 2; type++) {
 4029         retval = redisAsyncCommand(ri->link->cc,
 4030             sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",
 4031             sentinelInstanceMapCommand(ri,"CLIENT"),
 4032             type == 0 ? "normal" : "pubsub");
 4033         if (retval == C_ERR) return retval;
 4034         ri->link->pending_commands++;
 4035     }
 4036 
 4037     retval = redisAsyncCommand(ri->link->cc,
 4038         sentinelDiscardReplyCallback, ri, "%s",
 4039         sentinelInstanceMapCommand(ri,"EXEC"));
 4040     if (retval == C_ERR) return retval;
 4041     ri->link->pending_commands++;
 4042 
 4043     return C_OK;
 4044 }
 4045 
 4046 /* Setup the master state to start a failover. */
 4047 void sentinelStartFailover(sentinelRedisInstance *master) {
 4048     serverAssert(master->flags & SRI_MASTER);
 4049 
 4050     master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
 4051     master->flags |= SRI_FAILOVER_IN_PROGRESS;
 4052     master->failover_epoch = ++sentinel.current_epoch;
 4053     sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
 4054         (unsigned long long) sentinel.current_epoch);
 4055     sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
 4056     master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
 4057     master->failover_state_change_time = mstime();
 4058 }
 4059 
 4060 /* This function checks if there are the conditions to start the failover,
 4061  * that is:
 4062  *
 4063  * 1) Master must be in ODOWN condition.
 4064  * 2) No failover already in progress.
 4065  * 3) No failover already attempted recently.
 4066  *
 4067  * We still don't know if we'll win the election so it is possible that we
 4068  * start the failover but that we'll not be able to act.
 4069  *
 4070  * Return non-zero if a failover was started. */
 4071 int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
 4072     /* We can't failover if the master is not in O_DOWN state. */
 4073     if (!(master->flags & SRI_O_DOWN)) return 0;
 4074 
 4075     /* Failover already in progress? */
 4076     if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
 4077 
 4078     /* Last failover attempt started too little time ago? */
 4079     if (mstime() - master->failover_start_time <
 4080         master->failover_timeout*2)
 4081     {
 4082         if (master->failover_delay_logged != master->failover_start_time) {
 4083             time_t clock = (master->failover_start_time +
 4084                             master->failover_timeout*2) / 1000;
 4085             char ctimebuf[26];
 4086 
 4087             ctime_r(&clock,ctimebuf);
 4088             ctimebuf[24] = '\0'; /* Remove newline. */
 4089             master->failover_delay_logged = master->failover_start_time;
 4090             serverLog(LL_WARNING,
 4091                 "Next failover delay: I will not start a failover before %s",
 4092                 ctimebuf);
 4093         }
 4094         return 0;
 4095     }
 4096 
 4097     sentinelStartFailover(master);
 4098     return 1;
 4099 }
 4100 
 4101 /* Select a suitable slave to promote. The current algorithm only uses
 4102  * the following parameters:
 4103  *
 4104  * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
 4105  * 2) Last time the slave replied to ping no more than 5 times the PING period.
 4106  * 3) info_refresh not older than 3 times the INFO refresh period.
 4107  * 4) master_link_down_time no more than:
 4108  *     (now - master->s_down_since_time) + (master->down_after_period * 10).
 4109  *    Basically since the master is down from our POV, the slave reports
 4110  *    to be disconnected no more than 10 times the configured down-after-period.
 4111  *    This is pretty much black magic but the idea is, the master was not
 4112  *    available so the slave may be lagging, but not over a certain time.
 4113  *    Anyway we'll select the best slave according to replication offset.
 4114  * 5) Slave priority can't be zero, otherwise the slave is discarded.
 4115  *
 4116  * Among all the slaves matching the above conditions we select the slave
 4117  * with, in order of sorting key:
 4118  *
 4119  * - lower slave_priority.
 4120  * - bigger processed replication offset.
 4121  * - lexicographically smaller runid.
 4122  *
 * Basically if the priority is the same, the slave that processed more
 * commands from the master is selected.
 4125  *
 4126  * The function returns the pointer to the selected slave, otherwise
 4127  * NULL if no suitable slave was found.
 4128  */
 4129 
 4130 /* Helper for sentinelSelectSlave(). This is used by qsort() in order to
 4131  * sort suitable slaves in a "better first" order, to take the first of
 4132  * the list. */
 4133 int compareSlavesForPromotion(const void *a, const void *b) {
 4134     sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
 4135                           **sb = (sentinelRedisInstance **)b;
 4136     char *sa_runid, *sb_runid;
 4137 
 4138     if ((*sa)->slave_priority != (*sb)->slave_priority)
 4139         return (*sa)->slave_priority - (*sb)->slave_priority;
 4140 
 4141     /* If priority is the same, select the slave with greater replication
 4142      * offset (processed more data from the master). */
 4143     if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
 4144         return -1; /* a < b */
 4145     } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
 4146         return 1; /* a > b */
 4147     }
 4148 
 4149     /* If the replication offset is the same select the slave with that has
 4150      * the lexicographically smaller runid. Note that we try to handle runid
 4151      * == NULL as there are old Redis versions that don't publish runid in
 4152      * INFO. A NULL runid is considered bigger than any other runid. */
 4153     sa_runid = (*sa)->runid;
 4154     sb_runid = (*sb)->runid;
 4155     if (sa_runid == NULL && sb_runid == NULL) return 0;
 4156     else if (sa_runid == NULL) return 1;  /* a > b */
 4157     else if (sb_runid == NULL) return -1; /* a < b */
 4158     return strcasecmp(sa_runid, sb_runid);
 4159 }
 4160 
 4161 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
 4162     sentinelRedisInstance **instance =
 4163         zmalloc(sizeof(instance[0])*dictSize(master->slaves));
 4164     sentinelRedisInstance *selected = NULL;
 4165     int instances = 0;
 4166     dictIterator *di;
 4167     dictEntry *de;
 4168     mstime_t max_master_down_time = 0;
 4169 
 4170     if (master->flags & SRI_S_DOWN)
 4171         max_master_down_time += mstime() - master->s_down_since_time;
 4172     max_master_down_time += master->down_after_period * 10;
 4173 
 4174     di = dictGetIterator(master->slaves);
 4175     while((de = dictNext(di)) != NULL) {
 4176         sentinelRedisInstance *slave = dictGetVal(de);
 4177         mstime_t info_validity_time;
 4178 
 4179         if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
 4180         if (slave->link->disconnected) continue;
 4181         if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
 4182         if (slave->slave_priority == 0) continue;
 4183 
 4184         /* If the master is in SDOWN state we get INFO for slaves every second.
 4185          * Otherwise we get it with the usual period so we need to account for
 4186          * a larger delay. */
 4187         if (master->flags & SRI_S_DOWN)
 4188             info_validity_time = SENTINEL_PING_PERIOD*5;
 4189         else
 4190             info_validity_time = SENTINEL_INFO_PERIOD*3;
 4191         if (mstime() - slave->info_refresh > info_validity_time) continue;
 4192         if (slave->master_link_down_time > max_master_down_time) continue;
 4193         instance[instances++] = slave;
 4194     }
 4195     dictReleaseIterator(di);
 4196     if (instances) {
 4197         qsort(instance,instances,sizeof(sentinelRedisInstance*),
 4198             compareSlavesForPromotion);
 4199         selected = instance[0];
 4200     }
 4201     zfree(instance);
 4202     return selected;
 4203 }
 4204 
 4205 /* ---------------- Failover state machine implementation ------------------- */
 4206 void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
 4207     char *leader;
 4208     int isleader;
 4209 
 4210     /* Check if we are the leader for the failover epoch. */
 4211     leader = sentinelGetLeader(ri, ri->failover_epoch);
 4212     isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
 4213     sdsfree(leader);
 4214 
 4215     /* If I'm not the leader, and it is not a forced failover via
 4216      * SENTINEL FAILOVER, then I can't continue with the failover. */
 4217     if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
 4218         int election_timeout = SENTINEL_ELECTION_TIMEOUT;
 4219 
 4220         /* The election timeout is the MIN between SENTINEL_ELECTION_TIMEOUT
 4221          * and the configured failover timeout. */
 4222         if (election_timeout > ri->failover_timeout)
 4223             election_timeout = ri->failover_timeout;
 4224         /* Abort the failover if I'm not the leader after some time. */
 4225         if (mstime() - ri->failover_start_time > election_timeout) {
 4226             sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
 4227             sentinelAbortFailover(ri);
 4228         }
 4229         return;
 4230     }
 4231     sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
 4232     if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
 4233         sentinelSimFailureCrash();
 4234     ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
 4235     ri->failover_state_change_time = mstime();
 4236     sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
 4237 }
 4238 
 4239 void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
 4240     sentinelRedisInstance *slave = sentinelSelectSlave(ri);
 4241 
 4242     /* We don't handle the timeout in this state as the function aborts
 4243      * the failover or go forward in the next state. */
 4244     if (slave == NULL) {
 4245         sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
 4246         sentinelAbortFailover(ri);
 4247     } else {
 4248         sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
 4249         slave->flags |= SRI_PROMOTED;
 4250         ri->promoted_slave = slave;
 4251         ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
 4252         ri->failover_state_change_time = mstime();
 4253         sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
 4254             slave, "%@");
 4255     }
 4256 }
 4257 
 4258 void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
 4259     int retval;
 4260 
 4261     /* We can't send the command to the promoted slave if it is now
 4262      * disconnected. Retry again and again with this state until the timeout
 4263      * is reached, then abort the failover. */
 4264     if (ri->promoted_slave->link->disconnected) {
 4265         if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
 4266             sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
 4267             sentinelAbortFailover(ri);
 4268         }
 4269         return;
 4270     }
 4271 
 4272     /* Send SLAVEOF NO ONE command to turn the slave into a master.
 4273      * We actually register a generic callback for this command as we don't
 4274      * really care about the reply. We check if it worked indirectly observing
 4275      * if INFO returns a different role (master instead of slave). */
 4276     retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
 4277     if (retval != C_OK) return;
 4278     sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
 4279         ri->promoted_slave,"%@");
 4280     ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
 4281     ri->failover_state_change_time = mstime();
 4282 }
 4283 
 4284 /* We actually wait for promotion indirectly checking with INFO when the
 4285  * slave turns into a master. */
 4286 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
 4287     /* Just handle the timeout. Switching to the next state is handled
 4288      * by the function parsing the INFO command of the promoted slave. */
 4289     if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
 4290         sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
 4291         sentinelAbortFailover(ri);
 4292     }
 4293 }
 4294 
 4295 void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
 4296     int not_reconfigured = 0, timeout = 0;
 4297     dictIterator *di;
 4298     dictEntry *de;
 4299     mstime_t elapsed = mstime() - master->failover_state_change_time;
 4300 
 4301     /* We can't consider failover finished if the promoted slave is
 4302      * not reachable. */
 4303     if (master->promoted_slave == NULL ||
 4304         master->promoted_slave->flags & SRI_S_DOWN) return;
 4305 
 4306     /* The failover terminates once all the reachable slaves are properly
 4307      * configured. */
 4308     di = dictGetIterator(master->slaves);
 4309     while((de = dictNext(di)) != NULL) {
 4310         sentinelRedisInstance *slave = dictGetVal(de);
 4311 
 4312         if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
 4313         if (slave->flags & SRI_S_DOWN) continue;
 4314         not_reconfigured++;
 4315     }
 4316     dictReleaseIterator(di);
 4317 
 4318     /* Force end of failover on timeout. */
 4319     if (elapsed > master->failover_timeout) {
 4320         not_reconfigured = 0;
 4321         timeout = 1;
 4322         sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
 4323     }
 4324 
 4325     if (not_reconfigured == 0) {
 4326         sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
 4327         master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
 4328         master->failover_state_change_time = mstime();
 4329     }
 4330 
 4331     /* If I'm the leader it is a good idea to send a best effort SLAVEOF
 4332      * command to all the slaves still not reconfigured to replicate with
 4333      * the new master. */
 4334     if (timeout) {
 4335         dictIterator *di;
 4336         dictEntry *de;
 4337 
 4338         di = dictGetIterator(master->slaves);
 4339         while((de = dictNext(di)) != NULL) {
 4340             sentinelRedisInstance *slave = dictGetVal(de);
 4341             int retval;
 4342 
 4343             if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
 4344             if (slave->link->disconnected) continue;
 4345 
 4346             retval = sentinelSendSlaveOf(slave,
 4347                     master->promoted_slave->addr->ip,
 4348                     master->promoted_slave->addr->port);
 4349             if (retval == C_OK) {
 4350                 sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
 4351                 slave->flags |= SRI_RECONF_SENT;
 4352             }
 4353         }
 4354         dictReleaseIterator(di);
 4355     }
 4356 }
 4357 
 4358 /* Send SLAVE OF <new master address> to all the remaining slaves that
 4359  * still don't appear to have the configuration updated. */
 4360 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
 4361     dictIterator *di;
 4362     dictEntry *de;
 4363     int in_progress = 0;
 4364 
 4365     di = dictGetIterator(master->slaves);
 4366     while((de = dictNext(di)) != NULL) {
 4367         sentinelRedisInstance *slave = dictGetVal(de);
 4368 
 4369         if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
 4370             in_progress++;
 4371     }
 4372     dictReleaseIterator(di);
 4373 
 4374     di = dictGetIterator(master->slaves);
 4375     while(in_progress < master->parallel_syncs &&
 4376           (de = dictNext(di)) != NULL)
 4377     {
 4378         sentinelRedisInstance *slave = dictGetVal(de);
 4379         int retval;
 4380 
 4381         /* Skip the promoted slave, and already configured slaves. */
 4382         if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
 4383 
 4384         /* If too much time elapsed without the slave moving forward to
 4385          * the next state, consider it reconfigured even if it is not.
 4386          * Sentinels will detect the slave as misconfigured and fix its
 4387          * configuration later. */
 4388         if ((slave->flags & SRI_RECONF_SENT) &&
 4389             (mstime() - slave->slave_reconf_sent_time) >
 4390             SENTINEL_SLAVE_RECONF_TIMEOUT)
 4391         {
 4392             sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
 4393             slave->flags &= ~SRI_RECONF_SENT;
 4394             slave->flags |= SRI_RECONF_DONE;
 4395         }
 4396 
 4397         /* Nothing to do for instances that are disconnected or already
 4398          * in RECONF_SENT state. */
 4399         if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
 4400         if (slave->link->disconnected) continue;
 4401 
 4402         /* Send SLAVEOF <new master>. */
 4403         retval = sentinelSendSlaveOf(slave,
 4404                 master->promoted_slave->addr->ip,
 4405                 master->promoted_slave->addr->port);
 4406         if (retval == C_OK) {
 4407             slave->flags |= SRI_RECONF_SENT;
 4408             slave->slave_reconf_sent_time = mstime();
 4409             sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
 4410             in_progress++;
 4411         }
 4412     }
 4413     dictReleaseIterator(di);
 4414 
 4415     /* Check if all the slaves are reconfigured and handle timeout. */
 4416     sentinelFailoverDetectEnd(master);
 4417 }
 4418 
 4419 /* This function is called when the slave is in
 4420  * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
 4421  * to remove it from the master table and add the promoted slave instead. */
 4422 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
 4423     sentinelRedisInstance *ref = master->promoted_slave ?
 4424                                  master->promoted_slave : master;
 4425 
 4426     sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d",
 4427         master->name, master->addr->ip, master->addr->port,
 4428         ref->addr->ip, ref->addr->port);
 4429 
 4430     sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
 4431 }
 4432 
 4433 void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
 4434     serverAssert(ri->flags & SRI_MASTER);
 4435 
 4436     if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
 4437 
 4438     switch(ri->failover_state) {
 4439         case SENTINEL_FAILOVER_STATE_WAIT_START:
 4440             sentinelFailoverWaitStart(ri);
 4441             break;
 4442         case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
 4443             sentinelFailoverSelectSlave(ri);
 4444             break;
 4445         case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
 4446             sentinelFailoverSendSlaveOfNoOne(ri);
 4447             break;
 4448         case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
 4449             sentinelFailoverWaitPromotion(ri);
 4450             break;
 4451         case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
 4452             sentinelFailoverReconfNextSlave(ri);
 4453             break;
 4454     }
 4455 }
 4456 
 4457 /* Abort a failover in progress:
 4458  *
 4459  * This function can only be called before the promoted slave acknowledged
 4460  * the slave -> master switch. Otherwise the failover can't be aborted and
 4461  * will reach its end (possibly by timeout). */
 4462 void sentinelAbortFailover(sentinelRedisInstance *ri) {
 4463     serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
 4464     serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION);
 4465 
 4466     ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_FORCE_FAILOVER);
 4467     ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
 4468     ri->failover_state_change_time = mstime();
 4469     if (ri->promoted_slave) {
 4470         ri->promoted_slave->flags &= ~SRI_PROMOTED;
 4471         ri->promoted_slave = NULL;
 4472     }
 4473 }
 4474 
/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, Sentinel being completely non blocking
 * in design. The function is called every second.
 * -------------------------------------------------------------------------- */
 4479 
 4480 /* Perform scheduled operations for the specified Redis instance. */
 4481 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
 4482     /* ========== MONITORING HALF ============ */
 4483     /* Every kind of instance */
 4484     sentinelReconnectInstance(ri);
 4485     sentinelSendPeriodicCommands(ri);
 4486 
 4487     /* ============== ACTING HALF ============= */
 4488     /* We don't proceed with the acting half if we are in TILT mode.
 4489      * TILT happens when we find something odd with the time, like a
 4490      * sudden change in the clock. */
 4491     if (sentinel.tilt) {
 4492         if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
 4493         sentinel.tilt = 0;
 4494         sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
 4495     }
 4496 
 4497     /* Every kind of instance */
 4498     sentinelCheckSubjectivelyDown(ri);
 4499 
 4500     /* Masters and slaves */
 4501     if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
 4502         /* Nothing so far. */
 4503     }
 4504 
 4505     /* Only masters */
 4506     if (ri->flags & SRI_MASTER) {
 4507         sentinelCheckObjectivelyDown(ri);
 4508         if (sentinelStartFailoverIfNeeded(ri))
 4509             sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
 4510         sentinelFailoverStateMachine(ri);
 4511         sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
 4512     }
 4513 }
 4514 
 4515 /* Perform scheduled operations for all the instances in the dictionary.
 4516  * Recursively call the function against dictionaries of slaves. */
 4517 void sentinelHandleDictOfRedisInstances(dict *instances) {
 4518     dictIterator *di;
 4519     dictEntry *de;
 4520     sentinelRedisInstance *switch_to_promoted = NULL;
 4521 
 4522     /* There are a number of things we need to perform against every master. */
 4523     di = dictGetIterator(instances);
 4524     while((de = dictNext(di)) != NULL) {
 4525         sentinelRedisInstance *ri = dictGetVal(de);
 4526 
 4527         sentinelHandleRedisInstance(ri);
 4528         if (ri->flags & SRI_MASTER) {
 4529             sentinelHandleDictOfRedisInstances(ri->slaves);
 4530             sentinelHandleDictOfRedisInstances(ri->sentinels);
 4531             if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
 4532                 switch_to_promoted = ri;
 4533             }
 4534         }
 4535     }
 4536     if (switch_to_promoted)
 4537         sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
 4538     dictReleaseIterator(di);
 4539 }
 4540 
 4541 /* This function checks if we need to enter the TITL mode.
 4542  *
 4543  * The TILT mode is entered if we detect that between two invocations of the
 4544  * timer interrupt, a negative amount of time, or too much time has passed.
 4545  * Note that we expect that more or less just 100 milliseconds will pass
 4546  * if everything is fine. However we'll see a negative number or a
 4547  * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
 4548  * following conditions happen:
 4549  *
 4550  * 1) The Sentiel process for some time is blocked, for every kind of
 4551  * random reason: the load is huge, the computer was frozen for some time
 4552  * in I/O or alike, the process was stopped by a signal. Everything.
 4553  * 2) The system clock was altered significantly.
 4554  *
 4555  * Under both this conditions we'll see everything as timed out and failing
 4556  * without good reasons. Instead we enter the TILT mode and wait
 4557  * for SENTINEL_TILT_PERIOD to elapse before starting to act again.
 4558  *
 4559  * During TILT time we still collect information, we just do not act. */
 4560 void sentinelCheckTiltCondition(void) {
 4561     mstime_t now = mstime();
 4562     mstime_t delta = now - sentinel.previous_time;
 4563 
 4564     if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
 4565         sentinel.tilt = 1;
 4566         sentinel.tilt_start_time = mstime();
 4567         sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
 4568     }
 4569     sentinel.previous_time = mstime();
 4570 }
 4571 
 4572 void sentinelTimer(void) {
 4573     sentinelCheckTiltCondition();
 4574     sentinelHandleDictOfRedisInstances(sentinel.masters);
 4575     sentinelRunPendingScripts();
 4576     sentinelCollectTerminatedScripts();
 4577     sentinelKillTimedoutScripts();
 4578 
 4579     /* We continuously change the frequency of the Redis "timer interrupt"
 4580      * in order to desynchronize every Sentinel from every other.
 4581      * This non-determinism avoids that Sentinels started at the same time
 4582      * exactly continue to stay synchronized asking to be voted at the
 4583      * same time again and again (resulting in nobody likely winning the
 4584      * election because of split brain voting). */
 4585     server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
 4586 }
 4587