"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/networking.c" between
redis-6.2-rc3.tar.gz and redis-6.2.0.tar.gz

About: redis is an advanced key-value store. It is often referred to as a data structure server since keys can contain strings, hashes, lists, sets and sorted sets.

networking.c  (redis-6.2-rc3):networking.c  (redis-6.2.0)
skipping to change at line 158 skipping to change at line 158
c->flags = 0; c->flags = 0;
c->ctime = c->lastinteraction = server.unixtime; c->ctime = c->lastinteraction = server.unixtime;
clientSetDefaultAuth(c); clientSetDefaultAuth(c);
c->replstate = REPL_STATE_NONE; c->replstate = REPL_STATE_NONE;
c->repl_put_online_on_ack = 0; c->repl_put_online_on_ack = 0;
c->reploff = 0; c->reploff = 0;
c->read_reploff = 0; c->read_reploff = 0;
c->repl_ack_off = 0; c->repl_ack_off = 0;
c->repl_ack_time = 0; c->repl_ack_time = 0;
c->slave_listening_port = 0; c->slave_listening_port = 0;
c->slave_ip[0] = '\0'; c->slave_addr = NULL;
c->slave_capa = SLAVE_CAPA_NONE; c->slave_capa = SLAVE_CAPA_NONE;
c->reply = listCreate(); c->reply = listCreate();
c->reply_bytes = 0; c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0; c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue); listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue);
c->btype = BLOCKED_NONE; c->btype = BLOCKED_NONE;
c->bpop.timeout = 0; c->bpop.timeout = 0;
c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL); c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
c->bpop.target = NULL; c->bpop.target = NULL;
skipping to change at line 530 skipping to change at line 530
sdsfree(s); sdsfree(s);
} }
/* Sometimes we are forced to create a new reply node, and we can't append to /* Sometimes we are forced to create a new reply node, and we can't append to
* the previous one, when that happens, we wanna try to trim the unused space * the previous one, when that happens, we wanna try to trim the unused space
* at the end of the last reply node which we won't use anymore. */ * at the end of the last reply node which we won't use anymore. */
void trimReplyUnusedTailSpace(client *c) { void trimReplyUnusedTailSpace(client *c) {
listNode *ln = listLast(c->reply); listNode *ln = listLast(c->reply);
clientReplyBlock *tail = ln? listNodeValue(ln): NULL; clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
/* Note that 'tail' may be NULL even if we have a tail node, becuase when /* Note that 'tail' may be NULL even if we have a tail node, because when
* addReplyDeferredLen() is used */ * addReplyDeferredLen() is used */
if (!tail) return; if (!tail) return;
/* We only try to trim the space is relatively high (more than a 1/4 of the /* We only try to trim the space is relatively high (more than a 1/4 of the
* allocation), otherwise there's a high chance realloc will NOP. * allocation), otherwise there's a high chance realloc will NOP.
* Also, to avoid large memmove which happens as part of realloc, we only do * Also, to avoid large memmove which happens as part of realloc, we only do
* that if the used part is small. */ * that if the used part is small. */
if (tail->size - tail->used > tail->size / 4 && if (tail->size - tail->used > tail->size / 4 &&
tail->used < PROTO_REPLY_CHUNK_BYTES) tail->used < PROTO_REPLY_CHUNK_BYTES)
{ {
skipping to change at line 720 skipping to change at line 720
if (ll == 0) if (ll == 0)
addReply(c,shared.czero); addReply(c,shared.czero);
else if (ll == 1) else if (ll == 1)
addReply(c,shared.cone); addReply(c,shared.cone);
else else
addReplyLongLongWithPrefix(c,ll,':'); addReplyLongLongWithPrefix(c,ll,':');
} }
void addReplyAggregateLen(client *c, long length, int prefix) { void addReplyAggregateLen(client *c, long length, int prefix) {
serverAssert(length >= 0); serverAssert(length >= 0);
if (prefix == '*' && length < OBJ_SHARED_BULKHDR_LEN) addReplyLongLongWithPrefix(c,length,prefix);
addReply(c,shared.mbulkhdr[length]);
else
addReplyLongLongWithPrefix(c,length,prefix);
} }
void addReplyArrayLen(client *c, long length) { void addReplyArrayLen(client *c, long length) {
addReplyAggregateLen(c,length,'*'); addReplyAggregateLen(c,length,'*');
} }
void addReplyMapLen(client *c, long length) { void addReplyMapLen(client *c, long length) {
int prefix = c->resp == 2 ? '*' : '%'; int prefix = c->resp == 2 ? '*' : '%';
if (c->resp == 2) length *= 2; if (c->resp == 2) length *= 2;
addReplyAggregateLen(c,length,prefix); addReplyAggregateLen(c,length,prefix);
skipping to change at line 784 skipping to change at line 781
addReplyProto(c,"*-1\r\n",5); addReplyProto(c,"*-1\r\n",5);
} else { } else {
addReplyProto(c,"_\r\n",3); addReplyProto(c,"_\r\n",3);
} }
} }
/* Create the length prefix of a bulk reply, example: $2234 */ /* Create the length prefix of a bulk reply, example: $2234 */
void addReplyBulkLen(client *c, robj *obj) { void addReplyBulkLen(client *c, robj *obj) {
size_t len = stringObjectLen(obj); size_t len = stringObjectLen(obj);
if (len < OBJ_SHARED_BULKHDR_LEN) addReplyLongLongWithPrefix(c,len,'$');
addReply(c,shared.bulkhdr[len]);
else
addReplyLongLongWithPrefix(c,len,'$');
} }
/* Add a Redis Object as a bulk reply */ /* Add a Redis Object as a bulk reply */
void addReplyBulk(client *c, robj *obj) { void addReplyBulk(client *c, robj *obj) {
addReplyBulkLen(c,obj); addReplyBulkLen(c,obj);
addReply(c,obj); addReply(c,obj);
addReply(c,shared.crlf); addReply(c,shared.crlf);
} }
/* Add a C buffer as bulk reply */ /* Add a C buffer as bulk reply */
skipping to change at line 1409 skipping to change at line 1403
c->client_cron_last_memory_usage; c->client_cron_last_memory_usage;
/* Release other dynamically allocated client structure fields, /* Release other dynamically allocated client structure fields,
* and finally release the client structure itself. */ * and finally release the client structure itself. */
if (c->name) decrRefCount(c->name); if (c->name) decrRefCount(c->name);
zfree(c->argv); zfree(c->argv);
c->argv_len_sum = 0; c->argv_len_sum = 0;
freeClientMultiState(c); freeClientMultiState(c);
sdsfree(c->peerid); sdsfree(c->peerid);
sdsfree(c->sockname); sdsfree(c->sockname);
sdsfree(c->slave_addr);
zfree(c); zfree(c);
} }
/* Schedule a client to free it at a safe time in the serverCron() function. /* Schedule a client to free it at a safe time in the serverCron() function.
* This function is useful when we need to terminate a client but we are in * This function is useful when we need to terminate a client but we are in
* a context where calling freeClient() is not possible, because the client * a context where calling freeClient() is not possible, because the client
* should be valid for the continuation of the flow of the program. */ * should be valid for the continuation of the flow of the program. */
void freeClientAsync(client *c) { void freeClientAsync(client *c) {
/* We need to handle concurrent access to the server.clients_to_close list /* We need to handle concurrent access to the server.clients_to_close list
* only in the freeClientAsync() function, since it's the only function that * only in the freeClientAsync() function, since it's the only function that
skipping to change at line 1435 skipping to change at line 1430
/* no need to bother with locking if there's just one thread (the main t hread) */ /* no need to bother with locking if there's just one thread (the main t hread) */
listAddNodeTail(server.clients_to_close,c); listAddNodeTail(server.clients_to_close,c);
return; return;
} }
static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&async_free_queue_mutex); pthread_mutex_lock(&async_free_queue_mutex);
listAddNodeTail(server.clients_to_close,c); listAddNodeTail(server.clients_to_close,c);
pthread_mutex_unlock(&async_free_queue_mutex); pthread_mutex_unlock(&async_free_queue_mutex);
} }
/* Free the clietns marked as CLOSE_ASAP, return the number of clients /* Free the clients marked as CLOSE_ASAP, return the number of clients
* freed. */ * freed. */
int freeClientsInAsyncFreeQueue(void) { int freeClientsInAsyncFreeQueue(void) {
int freed = 0; int freed = 0;
listIter li; listIter li;
listNode *ln; listNode *ln;
listRewind(server.clients_to_close,&li); listRewind(server.clients_to_close,&li);
while ((ln = listNext(&li)) != NULL) { while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln); client *c = listNodeValue(ln);
skipping to change at line 3527 skipping to change at line 3522
} else { } else {
return 0; return 0;
} }
} }
int handleClientsWithPendingWritesUsingThreads(void) { int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write); int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */ if (processed == 0) return 0; /* Return ASAP if there are no clients. */
/* If I/O threads are disabled or we have few clients to serve, don't /* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but thejboring synchronous code. */ * use I/O threads, but the boring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) { if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites(); return handleClientsWithPendingWrites();
} }
/* Start threads if needed. */ /* Start threads if needed. */
if (!server.io_threads_active) startThreadedIO(); if (!server.io_threads_active) startThreadedIO();
/* Distribute the clients across N different lists. */ /* Distribute the clients across N different lists. */
listIter li; listIter li;
listNode *ln; listNode *ln;
 End of changes. 7 change blocks. 
12 lines changed or deleted 7 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)