"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/t_stream.c" between
redis-6.2-rc3.tar.gz and redis-6.2.0.tar.gz

About: redis is an advanced key-value store. It is often referred to as a data structure server since keys can contain strings, hashes, lists, sets and sorted sets.

t_stream.c  (redis-6.2-rc3):t_stream.c  (redis-6.2.0)
skipping to change at line 46 skipping to change at line 46
* entry at the start of the listpack> */ * entry at the start of the listpack> */
#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */ #define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */
#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */ #define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
/* For stream commands that require multiple IDs /* For stream commands that require multiple IDs
* when the number of IDs is less than 'STREAMID_STATIC_VECTOR_LEN', * when the number of IDs is less than 'STREAMID_STATIC_VECTOR_LEN',
* avoid malloc allocation.*/ * avoid malloc allocation.*/
#define STREAMID_STATIC_VECTOR_LEN 8 #define STREAMID_STATIC_VECTOR_LEN 8
/* Max pre-allocation for listpack. This is done to avoid abuse of a user
* setting stream_node_max_bytes to a huge number. */
#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096
void streamFreeCG(streamCG *cg); void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na); void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start , streamID *end, size_t count, streamConsumer *consumer); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start , streamID *end, size_t count, streamConsumer *consumer);
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin g_seq); int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin g_seq);
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) ; int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) ;
/* ----------------------------------------------------------------------- /* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks. * Low level stream encoding: a radix tree of listpacks.
* ----------------------------------------------------------------------- */ * ----------------------------------------------------------------------- */
skipping to change at line 511 skipping to change at line 515
/* First of all, check if we can append to the current macro node or /* First of all, check if we can append to the current macro node or
* if we need to switch to the next one. 'lp' will be set to NULL if * if we need to switch to the next one. 'lp' will be set to NULL if
* the current node is full. */ * the current node is full. */
if (lp != NULL) { if (lp != NULL) {
if (server.stream_node_max_bytes && if (server.stream_node_max_bytes &&
lp_bytes >= server.stream_node_max_bytes) lp_bytes >= server.stream_node_max_bytes)
{ {
lp = NULL; lp = NULL;
} else if (server.stream_node_max_entries) { } else if (server.stream_node_max_entries) {
int64_t count = lpGetInteger(lpFirst(lp)); unsigned char *lp_ele = lpFirst(lp);
if (count >= server.stream_node_max_entries) lp = NULL; /* Count both live entries and deleted ones. */
int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp,lp_ele
));
if (count >= server.stream_node_max_entries) {
/* Shrink extra pre-allocated memory */
lp = lpShrinkToFit(lp);
if (ri.data != lp)
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
lp = NULL;
}
} }
} }
int flags = STREAM_ITEM_FLAG_NONE; int flags = STREAM_ITEM_FLAG_NONE;
if (lp == NULL) { if (lp == NULL) {
master_id = id; master_id = id;
streamEncodeID(rax_key,&id); streamEncodeID(rax_key,&id);
/* Create the listpack having the master entry ID and fields. */ /* Create the listpack having the master entry ID and fields.
lp = lpNew(); * Pre-allocate some bytes when creating listpack to avoid realloc on
* every XADD. Since listpack.c uses malloc_size, it'll grow in steps,
* and won't realloc on every XADD.
* When listpack reaches max number of entries, we'll shrink the
* allocation to fit the data. */
size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE;
if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < p
realloc) {
prealloc = server.stream_node_max_bytes;
}
lp = lpNew(prealloc);
lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */ lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
lp = lpAppendInteger(lp,0); /* Zero deleted so far. */ lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
lp = lpAppendInteger(lp,numfields); lp = lpAppendInteger(lp,numfields);
for (int64_t i = 0; i < numfields; i++) { for (int64_t i = 0; i < numfields; i++) {
sds field = argv[i*2]->ptr; sds field = argv[i*2]->ptr;
lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
} }
lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */ lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
/* The first entry we insert, has obviously the same fields of the /* The first entry we insert, has obviously the same fields of the
skipping to change at line 1331 skipping to change at line 1352
* to propagate this changes in the form of XCLAIM commands. */ * to propagate this changes in the form of XCLAIM commands. */
void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam e, robj *id, streamNACK *nack) { void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam e, robj *id, streamNACK *nack) {
/* We need to generate an XCLAIM that will work in a idempotent fashion: /* We need to generate an XCLAIM that will work in a idempotent fashion:
* *
* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time> * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
* RETRYCOUNT <count> FORCE JUSTID LASTID <id>. * RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
* *
* Note that JUSTID is useful in order to avoid that XCLAIM will do * Note that JUSTID is useful in order to avoid that XCLAIM will do
* useless work in the slave side, trying to fetch the stream item. */ * useless work in the slave side, trying to fetch the stream item. */
robj *argv[14]; robj *argv[14];
argv[0] = createStringObject("XCLAIM",6); argv[0] = shared.xclaim;
argv[1] = key; argv[1] = key;
argv[2] = groupname; argv[2] = groupname;
argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->nam e)); argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->nam e));
argv[4] = createStringObjectFromLongLong(0); argv[4] = shared.integers[0];
argv[5] = id; argv[5] = id;
argv[6] = createStringObject("TIME",4); argv[6] = shared.time;
argv[7] = createStringObjectFromLongLong(nack->delivery_time); argv[7] = createStringObjectFromLongLong(nack->delivery_time);
argv[8] = createStringObject("RETRYCOUNT",10); argv[8] = shared.retrycount;
argv[9] = createStringObjectFromLongLong(nack->delivery_count); argv[9] = createStringObjectFromLongLong(nack->delivery_count);
argv[10] = createStringObject("FORCE",5); argv[10] = shared.force;
argv[11] = createStringObject("JUSTID",6); argv[11] = shared.justid;
argv[12] = createStringObject("LASTID",6); argv[12] = shared.lastid;
argv[13] = createObjectFromStreamID(&group->last_id); argv[13] = createObjectFromStreamID(&group->last_id);
/* We use progagate() because this code path is not always called from /* We use progagate() because this code path is not always called from
* the command execution context. Moreover this will just alter the * the command execution context. Moreover this will just alter the
* consumer group state, and we don't need MULTI/EXEC wrapping because * consumer group state, and we don't need MULTI/EXEC wrapping because
* there is no message state cross-message atomicity required. */ * there is no message state cross-message atomicity required. */
propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REP L); propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REP L);
decrRefCount(argv[0]);
decrRefCount(argv[3]); decrRefCount(argv[3]);
decrRefCount(argv[4]);
decrRefCount(argv[6]);
decrRefCount(argv[7]); decrRefCount(argv[7]);
decrRefCount(argv[8]);
decrRefCount(argv[9]); decrRefCount(argv[9]);
decrRefCount(argv[10]);
decrRefCount(argv[11]);
decrRefCount(argv[12]);
decrRefCount(argv[13]); decrRefCount(argv[13]);
} }
/* We need this when we want to propoagate the new last-id of a consumer group /* We need this when we want to propoagate the new last-id of a consumer group
* that was consumed by XREADGROUP with the NOACK option: in that case we can't * that was consumed by XREADGROUP with the NOACK option: in that case we can't
* propagate the last ID just using the XCLAIM LASTID option, so we emit * propagate the last ID just using the XCLAIM LASTID option, so we emit
* *
* XGROUP SETID <key> <groupname> <id> * XGROUP SETID <key> <groupname> <id>
*/ */
void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna me) { void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna me) {
robj *argv[5]; robj *argv[5];
argv[0] = createStringObject("XGROUP",6); argv[0] = shared.xgroup;
argv[1] = createStringObject("SETID",5); argv[1] = shared.setid;
argv[2] = key; argv[2] = key;
argv[3] = groupname; argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id); argv[4] = createObjectFromStreamID(&group->last_id);
/* We use progagate() because this code path is not always called from /* We use progagate() because this code path is not always called from
* the command execution context. Moreover this will just alter the * the command execution context. Moreover this will just alter the
* consumer group state, and we don't need MULTI/EXEC wrapping because * consumer group state, and we don't need MULTI/EXEC wrapping because
* there is no message state cross-message atomicity required. */ * there is no message state cross-message atomicity required. */
propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL ); propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL );
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[4]); decrRefCount(argv[4]);
} }
/* We need this when we want to propagate creation of consumer that was created /* We need this when we want to propagate creation of consumer that was created
* by XREADGROUP with the NOACK option. In that case, the only way to create * by XREADGROUP with the NOACK option. In that case, the only way to create
* the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #714 0) * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #714 0)
* *
* XGROUP CREATECONSUMER <key> <groupname> <consumername> * XGROUP CREATECONSUMER <key> <groupname> <consumername>
*/ */
void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) { void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) {
robj *argv[5]; robj *argv[5];
argv[0] = createStringObject("XGROUP",6); argv[0] = shared.xgroup;
argv[1] = createStringObject("CREATECONSUMER",14); argv[1] = shared.createconsumer;
argv[2] = key; argv[2] = key;
argv[3] = groupname; argv[3] = groupname;
argv[4] = createObject(OBJ_STRING,sdsdup(consumername)); argv[4] = createObject(OBJ_STRING,sdsdup(consumername));
/* We use progagate() because this code path is not always called from /* We use progagate() because this code path is not always called from
* the command execution context. Moreover this will just alter the * the command execution context. Moreover this will just alter the
* consumer group state, and we don't need MULTI/EXEC wrapping because * consumer group state, and we don't need MULTI/EXEC wrapping because
* there is no message state cross-message atomicity required. */ * there is no message state cross-message atomicity required. */
propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL ); propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL );
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[4]); decrRefCount(argv[4]);
} }
/* Send the stream items in the specified range to the client 'c'. The range /* Send the stream items in the specified range to the client 'c'. The range
* the client will receive is between start and end inclusive, if 'count' is * the client will receive is between start and end inclusive, if 'count' is
* non zero, no more than 'count' elements are sent. * non zero, no more than 'count' elements are sent.
* *
* The 'end' pointer can be NULL to mean that we want all the elements from * The 'end' pointer can be NULL to mean that we want all the elements from
* 'start' till the end of the stream. If 'rev' is non zero, elements are * 'start' till the end of the stream. If 'rev' is non zero, elements are
* produced in reversed order from end to start. * produced in reversed order from end to start.
skipping to change at line 1728 skipping to change at line 1738
invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq) == C_ERR); invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq) == C_ERR);
decrRefCount(t); decrRefCount(t);
} else } else
invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR); invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR);
if (invalid) if (invalid)
return C_ERR; return C_ERR;
return C_OK; return C_OK;
} }
void streamRewriteApproxSpecifier(client *c, int idx) { void streamRewriteApproxSpecifier(client *c, int idx) {
robj *equal_obj = createStringObject("=",1); rewriteClientCommandArgument(c,idx,shared.special_equals);
rewriteClientCommandArgument(c,idx,equal_obj);
decrRefCount(equal_obj);
} }
/* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-strea m> /* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-strea m>
* otherwise trimming is no longer deterministic on replicas / AOF. */ * otherwise trimming is no longer deterministic on replicas / AOF. */
void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) { void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) {
robj *arg; robj *arg;
if (trim_strategy == TRIM_STRATEGY_MAXLEN) { if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
arg = createStringObjectFromLongLong(s->length); arg = createStringObjectFromLongLong(s->length);
} else { } else {
streamID first_id; streamID first_id;
skipping to change at line 3474 skipping to change at line 3482
addReplySubcommandSyntaxError(c); addReplySubcommandSyntaxError(c);
return; return;
} }
/* With the exception of HELP handled before any other sub commands, all /* With the exception of HELP handled before any other sub commands, all
* the ones are in the form of "<subcommand> <key>". */ * the ones are in the form of "<subcommand> <key>". */
opt = c->argv[1]->ptr; opt = c->argv[1]->ptr;
key = c->argv[2]; key = c->argv[2];
/* Lookup the key now, this is common for all the subcommands but HELP. */ /* Lookup the key now, this is common for all the subcommands but HELP. */
robj *o = lookupKeyWriteOrReply(c,key,shared.nokeyerr); robj *o = lookupKeyReadOrReply(c,key,shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return; if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
s = o->ptr; s = o->ptr;
/* Dispatch the different subcommands. */ /* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) { if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
/* XINFO CONSUMERS <key> <group>. */ /* XINFO CONSUMERS <key> <group>. */
streamCG *cg = streamLookupCG(s,c->argv[3]->ptr); streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
if (cg == NULL) { if (cg == NULL) {
addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
"for key name '%s'", "for key name '%s'",
 End of changes. 18 change blocks. 
30 lines changed or deleted 40 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)