"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/replication.c" between
redis-6.2-rc3.tar.gz and redis-6.2.0.tar.gz

About: redis is an advanced key-value store. It is often referred to as a data structure server since keys can contain strings, hashes, lists, sets and sorted sets.

replication.c  (redis-6.2-rc3):replication.c  (redis-6.2.0)
skipping to change at line 59 skipping to change at line 59
* the instance is configured to have no persistence. */ * the instance is configured to have no persistence. */
int RDBGeneratedByReplication = 0; int RDBGeneratedByReplication = 0;
/* --------------------------- Utility functions ---------------------------- */ /* --------------------------- Utility functions ---------------------------- */
/* Return the pointer to a string representing the slave ip:listening_port /* Return the pointer to a string representing the slave ip:listening_port
* pair. Mostly useful for logging, since we want to log a slave using its * pair. Mostly useful for logging, since we want to log a slave using its
* IP address and its listening port which is more clear for the user, for * IP address and its listening port which is more clear for the user, for
* example: "Closing connection with replica 10.1.2.3:6380". */ * example: "Closing connection with replica 10.1.2.3:6380". */
char *replicationGetSlaveName(client *c) { char *replicationGetSlaveName(client *c) {
static char buf[NET_ADDR_STR_LEN]; static char buf[NET_HOST_PORT_STR_LEN];
char ip[NET_IP_STR_LEN]; char ip[NET_IP_STR_LEN];
ip[0] = '\0'; ip[0] = '\0';
buf[0] = '\0'; buf[0] = '\0';
if (c->slave_ip[0] != '\0' || if (c->slave_addr ||
connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1) connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1)
{ {
/* Note that the 'ip' buffer is always larger than 'c->slave_ip' */ char *addr = c->slave_addr ? c->slave_addr : ip;
if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip));
if (c->slave_listening_port) if (c->slave_listening_port)
anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port); anetFormatAddr(buf,sizeof(buf),addr,c->slave_listening_port);
else else
snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",ip); snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",addr);
} else { } else {
snprintf(buf,sizeof(buf),"client id #%llu", snprintf(buf,sizeof(buf),"client id #%llu",
(unsigned long long) c->id); (unsigned long long) c->id);
} }
return buf; return buf;
} }
/* Plain unlink() can block for quite some time in order to actually apply /* Plain unlink() can block for quite some time in order to actually apply
* the file deletion to the filesystem. This call removes the file in a * the file deletion to the filesystem. This call removes the file in a
* background thread instead. We actually just do close() in the thread, * background thread instead. We actually just do close() in the thread,
skipping to change at line 927 skipping to change at line 925
/* Process every option-value pair. */ /* Process every option-value pair. */
for (j = 1; j < c->argc; j+=2) { for (j = 1; j < c->argc; j+=2) {
if (!strcasecmp(c->argv[j]->ptr,"listening-port")) { if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
long port; long port;
if ((getLongFromObjectOrReply(c,c->argv[j+1], if ((getLongFromObjectOrReply(c,c->argv[j+1],
&port,NULL) != C_OK)) &port,NULL) != C_OK))
return; return;
c->slave_listening_port = port; c->slave_listening_port = port;
} else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) { } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
sds ip = c->argv[j+1]->ptr; sds addr = c->argv[j+1]->ptr;
if (sdslen(ip) < sizeof(c->slave_ip)) { if (sdslen(addr) < NET_HOST_STR_LEN) {
memcpy(c->slave_ip,ip,sdslen(ip)+1); if (c->slave_addr) sdsfree(c->slave_addr);
c->slave_addr = sdsdup(addr);
} else { } else {
addReplyErrorFormat(c,"REPLCONF ip-address provided by " addReplyErrorFormat(c,"REPLCONF ip-address provided by "
"replica instance is too long: %zd bytes", sdslen(ip)); "replica instance is too long: %zd bytes", sdslen(addr));
return; return;
} }
} else if (!strcasecmp(c->argv[j]->ptr,"capa")) { } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
/* Ignore capabilities not understood by this master. */ /* Ignore capabilities not understood by this master. */
if (!strcasecmp(c->argv[j+1]->ptr,"eof")) if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
c->slave_capa |= SLAVE_CAPA_EOF; c->slave_capa |= SLAVE_CAPA_EOF;
else if (!strcasecmp(c->argv[j+1]->ptr,"psync2")) else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
c->slave_capa |= SLAVE_CAPA_PSYNC2; c->slave_capa |= SLAVE_CAPA_PSYNC2;
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) { } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
/* REPLCONF ACK is used by slave to inform the master the amount /* REPLCONF ACK is used by slave to inform the master the amount
skipping to change at line 2798 skipping to change at line 2797
void *mbcount; void *mbcount;
int slaves = 0; int slaves = 0;
addReplyArrayLen(c,3); addReplyArrayLen(c,3);
addReplyBulkCBuffer(c,"master",6); addReplyBulkCBuffer(c,"master",6);
addReplyLongLong(c,server.master_repl_offset); addReplyLongLong(c,server.master_repl_offset);
mbcount = addReplyDeferredLen(c); mbcount = addReplyDeferredLen(c);
listRewind(server.slaves,&li); listRewind(server.slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = ln->value; client *slave = ln->value;
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip; char ip[NET_IP_STR_LEN], *slaveaddr = slave->slave_addr;
if (slaveip[0] == '\0') { if (!slaveaddr) {
if (connPeerToString(slave->conn,ip,sizeof(ip),NULL) == -1) if (connPeerToString(slave->conn,ip,sizeof(ip),NULL) == -1)
continue; continue;
slaveip = ip; slaveaddr = ip;
} }
if (slave->replstate != SLAVE_STATE_ONLINE) continue; if (slave->replstate != SLAVE_STATE_ONLINE) continue;
addReplyArrayLen(c,3); addReplyArrayLen(c,3);
addReplyBulkCString(c,slaveip); addReplyBulkCString(c,slaveaddr);
addReplyBulkLongLong(c,slave->slave_listening_port); addReplyBulkLongLong(c,slave->slave_listening_port);
addReplyBulkLongLong(c,slave->repl_ack_off); addReplyBulkLongLong(c,slave->repl_ack_off);
slaves++; slaves++;
} }
setDeferredArrayLen(c,mbcount,slaves); setDeferredArrayLen(c,mbcount,slaves);
} else { } else {
char *slavestate = NULL; char *slavestate = NULL;
addReplyArrayLen(c,5); addReplyArrayLen(c,5);
addReplyBulkCBuffer(c,"slave",5); addReplyBulkCBuffer(c,"slave",5);
skipping to change at line 3334 skipping to change at line 3333
* a Redis Cluster manual failover: the PING we send will otherwise * a Redis Cluster manual failover: the PING we send will otherwise
* alter the replication offsets of master and slave, and will no longer * alter the replication offsets of master and slave, and will no longer
* match the one stored into 'mf_master_offset' state. */ * match the one stored into 'mf_master_offset' state. */
int manual_failover_in_progress = int manual_failover_in_progress =
((server.cluster_enabled && ((server.cluster_enabled &&
server.cluster->mf_end) || server.cluster->mf_end) ||
server.failover_end_time) && server.failover_end_time) &&
checkClientPauseTimeoutAndReturnIfPaused(); checkClientPauseTimeoutAndReturnIfPaused();
if (!manual_failover_in_progress) { if (!manual_failover_in_progress) {
ping_argv[0] = createStringObject("PING",4); ping_argv[0] = shared.ping;
replicationFeedSlaves(server.slaves, server.slaveseldb, replicationFeedSlaves(server.slaves, server.slaveseldb,
ping_argv, 1); ping_argv, 1);
decrRefCount(ping_argv[0]);
} }
} }
/* Second, send a newline to all the slaves in pre-synchronization /* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file. * stage, that is, slaves waiting for the master to create the RDB file.
* *
* Also send the a newline to all the chained slaves we have, if we lost * Also send the a newline to all the chained slaves we have, if we lost
* connection from our master, to keep the slaves aware that their * connection from our master, to keep the slaves aware that their
* master is online. This is needed since sub-slaves only receive proxied * master is online. This is needed since sub-slaves only receive proxied
* data from top-level masters, so there is no explicit pinging in order * data from top-level masters, so there is no explicit pinging in order
skipping to change at line 3494 skipping to change at line 3492
/* Find replica at IP:PORT from replica list */ /* Find replica at IP:PORT from replica list */
static client *findReplica(char *host, int port) { static client *findReplica(char *host, int port) {
listIter li; listIter li;
listNode *ln; listNode *ln;
client *replica; client *replica;
listRewind(server.slaves,&li); listRewind(server.slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
replica = ln->value; replica = ln->value;
char ip[NET_IP_STR_LEN], *replicaip = replica->slave_ip; char ip[NET_IP_STR_LEN], *replicaip = replica->slave_addr;
if (replicaip[0] == '\0') { if (!replicaip) {
if (connPeerToString(replica->conn, ip, sizeof(ip), NULL) == -1) if (connPeerToString(replica->conn, ip, sizeof(ip), NULL) == -1)
continue; continue;
replicaip = ip; replicaip = ip;
} }
if (!strcasecmp(host, replicaip) && if (!strcasecmp(host, replicaip) &&
(port == replica->slave_listening_port)) (port == replica->slave_listening_port))
return replica; return replica;
} }
skipping to change at line 3725 skipping to change at line 3723
server.target_replica_port); server.target_replica_port);
} else { } else {
listIter li; listIter li;
listNode *ln; listNode *ln;
listRewind(server.slaves,&li); listRewind(server.slaves,&li);
/* Find any replica that has matched our repl_offset */ /* Find any replica that has matched our repl_offset */
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
replica = ln->value; replica = ln->value;
if (replica->repl_ack_off == server.master_repl_offset) { if (replica->repl_ack_off == server.master_repl_offset) {
char ip[NET_IP_STR_LEN], *replicaip = replica->slave_ip; char ip[NET_IP_STR_LEN], *replicaaddr = replica->slave_addr;
if (replicaip[0] == '\0') { if (!replicaaddr) {
if (connPeerToString(replica->conn,ip,sizeof(ip),NULL) == -1 ) if (connPeerToString(replica->conn,ip,sizeof(ip),NULL) == -1 )
continue; continue;
replicaip = ip; replicaaddr = ip;
} }
/* We are now failing over to this specific node */ /* We are now failing over to this specific node */
server.target_replica_host = zstrdup(replicaip); server.target_replica_host = zstrdup(replicaaddr);
server.target_replica_port = replica->slave_listening_port; server.target_replica_port = replica->slave_listening_port;
break; break;
} }
} }
} }
/* We've found a replica that is caught up */ /* We've found a replica that is caught up */
if (replica && (replica->repl_ack_off == server.master_repl_offset)) { if (replica && (replica->repl_ack_off == server.master_repl_offset)) {
server.failover_state = FAILOVER_IN_PROGRESS; server.failover_state = FAILOVER_IN_PROGRESS;
serverLog(LL_NOTICE, serverLog(LL_NOTICE,
 End of changes. 19 change blocks. 
23 lines changed or deleted 21 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)