"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/pubsub.c" between
redis-6.2-rc3.tar.gz and redis-6.2.0.tar.gz

About: redis is an advanced key-value store. It is often referred to as a data structure server since keys can contain strings, hashes, lists, sets and sorted sets.

pubsub.c  (redis-6.2-rc3):pubsub.c  (redis-6.2.0)
skipping to change at line 127 skipping to change at line 127
addReplyBulk(c,pattern); addReplyBulk(c,pattern);
else else
addReplyNull(c); addReplyNull(c);
addReplyLongLong(c,clientSubscriptionsCount(c)); addReplyLongLong(c,clientSubscriptionsCount(c));
} }
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* Pubsub low level API * Pubsub low level API
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
void freePubsubPattern(void *p) {
pubsubPattern *pat = p;
decrRefCount(pat->pattern);
zfree(pat);
}
int listMatchPubsubPattern(void *a, void *b) {
pubsubPattern *pa = a, *pb = b;
return (pa->client == pb->client) &&
(equalStringObjects(pa->pattern,pb->pattern));
}
/* Return the number of channels + patterns a client is subscribed to. */ /* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) { int clientSubscriptionsCount(client *c) {
return dictSize(c->pubsub_channels)+ return dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns); listLength(c->pubsub_patterns);
} }
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */ * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) { int pubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de; dictEntry *de;
skipping to change at line 215 skipping to change at line 201
} }
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 i f the client was already subscribed to that pattern. */ /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 i f the client was already subscribed to that pattern. */
int pubsubSubscribePattern(client *c, robj *pattern) { int pubsubSubscribePattern(client *c, robj *pattern) {
dictEntry *de; dictEntry *de;
list *clients; list *clients;
int retval = 0; int retval = 0;
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
retval = 1; retval = 1;
pubsubPattern *pat;
listAddNodeTail(c->pubsub_patterns,pattern); listAddNodeTail(c->pubsub_patterns,pattern);
incrRefCount(pattern); incrRefCount(pattern);
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat);
/* Add the client to the pattern -> list of clients hash table */ /* Add the client to the pattern -> list of clients hash table */
de = dictFind(server.pubsub_patterns_dict,pattern); de = dictFind(server.pubsub_patterns,pattern);
if (de == NULL) { if (de == NULL) {
clients = listCreate(); clients = listCreate();
dictAdd(server.pubsub_patterns_dict,pattern,clients); dictAdd(server.pubsub_patterns,pattern,clients);
incrRefCount(pattern); incrRefCount(pattern);
} else { } else {
clients = dictGetVal(de); clients = dictGetVal(de);
} }
listAddNodeTail(clients,c); listAddNodeTail(clients,c);
} }
/* Notify the client */ /* Notify the client */
addReplyPubsubPatSubscribed(c,pattern); addReplyPubsubPatSubscribed(c,pattern);
return retval; return retval;
} }
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */ * 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
dictEntry *de; dictEntry *de;
list *clients; list *clients;
listNode *ln; listNode *ln;
pubsubPattern pat;
int retval = 0; int retval = 0;
incrRefCount(pattern); /* Protect the object. May be the same we remove */ incrRefCount(pattern); /* Protect the object. May be the same we remove */
if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
retval = 1; retval = 1;
listDelNode(c->pubsub_patterns,ln); listDelNode(c->pubsub_patterns,ln);
pat.client = c;
pat.pattern = pattern;
ln = listSearchKey(server.pubsub_patterns,&pat);
listDelNode(server.pubsub_patterns,ln);
/* Remove the client from the pattern -> clients list hash table */ /* Remove the client from the pattern -> clients list hash table */
de = dictFind(server.pubsub_patterns_dict,pattern); de = dictFind(server.pubsub_patterns,pattern);
serverAssertWithInfo(c,NULL,de != NULL); serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de); clients = dictGetVal(de);
ln = listSearchKey(clients,c); ln = listSearchKey(clients,c);
serverAssertWithInfo(c,NULL,ln != NULL); serverAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln); listDelNode(clients,ln);
if (listLength(clients) == 0) { if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was /* Free the list and associated hash entry at all if this was
* the latest client. */ * the latest client. */
dictDelete(server.pubsub_patterns_dict,pattern); dictDelete(server.pubsub_patterns,pattern);
} }
} }
/* Notify the client */ /* Notify the client */
if (notify) addReplyPubsubPatUnsubscribed(c,pattern); if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
decrRefCount(pattern); decrRefCount(pattern);
return retval; return retval;
} }
/* Unsubscribe from all the channels. Return the number of channels the /* Unsubscribe from all the channels. Return the number of channels the
* client was subscribed to. */ * client was subscribed to. */
skipping to change at line 332 skipping to change at line 308
listIter li; listIter li;
listRewind(list,&li); listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) { while ((ln = listNext(&li)) != NULL) {
client *c = ln->value; client *c = ln->value;
addReplyPubsubMessage(c,channel,message); addReplyPubsubMessage(c,channel,message);
receivers++; receivers++;
} }
} }
/* Send to clients listening to matching channels */ /* Send to clients listening to matching channels */
di = dictGetIterator(server.pubsub_patterns_dict); di = dictGetIterator(server.pubsub_patterns);
if (di) { if (di) {
channel = getDecodedObject(channel); channel = getDecodedObject(channel);
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
robj *pattern = dictGetKey(de); robj *pattern = dictGetKey(de);
list *clients = dictGetVal(de); list *clients = dictGetVal(de);
if (!stringmatchlen((char*)pattern->ptr, if (!stringmatchlen((char*)pattern->ptr,
sdslen(pattern->ptr), sdslen(pattern->ptr),
(char*)channel->ptr, (char*)channel->ptr,
sdslen(channel->ptr),0)) continue; sdslen(channel->ptr),0)) continue;
skipping to change at line 505 skipping to change at line 481
addReplyArrayLen(c,(c->argc-2)*2); addReplyArrayLen(c,(c->argc-2)*2);
for (j = 2; j < c->argc; j++) { for (j = 2; j < c->argc; j++) {
list *l = dictFetchValue(server.pubsub_channels,c->argv[j]); list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
addReplyBulk(c,c->argv[j]); addReplyBulk(c,c->argv[j]);
addReplyLongLong(c,l ? listLength(l) : 0); addReplyLongLong(c,l ? listLength(l) : 0);
} }
} else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) { } else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
/* PUBSUB NUMPAT */ /* PUBSUB NUMPAT */
addReplyLongLong(c,listLength(server.pubsub_patterns)); addReplyLongLong(c,dictSize(server.pubsub_patterns));
} else { } else {
addReplySubcommandSyntaxError(c); addReplySubcommandSyntaxError(c);
} }
} }
 End of changes. 11 change blocks. 
30 lines changed or deleted 6 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)