"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "tests/modules/blockonbackground.c" between
redis-6.2-rc3.tar.gz and redis-6.2.0.tar.gz

About: redis is an advanced key-value store. It is often referred to as a data structure server since keys can contain strings, hashes, lists, sets and sorted sets.

blockonbackground.c  (redis-6.2-rc3):blockonbackground.c  (redis-6.2.0)
#define REDISMODULE_EXPERIMENTAL_API #define REDISMODULE_EXPERIMENTAL_API
#define _XOPEN_SOURCE 700
#include "redismodule.h" #include "redismodule.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <pthread.h> #include <pthread.h>
#include <time.h> #include <time.h>
#include "assert.h"
#define UNUSED(x) (void)(x) #define UNUSED(x) (void)(x)
/* Reply callback for blocking command BLOCK.DEBUG */ /* Reply callback for blocking command BLOCK.DEBUG */
int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
UNUSED(argv); UNUSED(argv);
UNUSED(argc); UNUSED(argc);
int *myint = RedisModule_GetBlockedClientPrivateData(ctx); int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
return RedisModule_ReplyWithLongLong(ctx,*myint); return RedisModule_ReplyWithLongLong(ctx,*myint);
} }
/* Timeout callback for blocking command BLOCK.DEBUG */ /* Timeout callback for blocking command BLOCK.DEBUG */
int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
UNUSED(argv); UNUSED(argv);
UNUSED(argc); UNUSED(argc);
RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx); RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx);
assert(RedisModule_BlockedClientMeasureTimeEnd(bc)==REDISMODULE_OK); RedisModule_BlockedClientMeasureTimeEnd(bc);
return RedisModule_ReplyWithSimpleString(ctx,"Request timedout"); return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
} }
/* Private data freeing callback for BLOCK.DEBUG command. */ /* Private data freeing callback for BLOCK.DEBUG command. */
void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) { void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
UNUSED(ctx); UNUSED(ctx);
RedisModule_Free(privdata); RedisModule_Free(privdata);
} }
/* The thread entry point that actually executes the blocking part /* The thread entry point that actually executes the blocking part
* of the command BLOCK.DEBUG. */ * of the command BLOCK.DEBUG. */
void *BlockDebug_ThreadMain(void *arg) { void *BlockDebug_ThreadMain(void *arg) {
void **targ = arg; void **targ = arg;
RedisModuleBlockedClient *bc = targ[0]; RedisModuleBlockedClient *bc = targ[0];
long long delay = (unsigned long)targ[1]; long long delay = (unsigned long)targ[1];
long long enable_time_track = (unsigned long)targ[2]; long long enable_time_track = (unsigned long)targ[2];
if (enable_time_track) if (enable_time_track)
assert(RedisModule_BlockedClientMeasureTimeStart(bc)==REDISMODULE_OK); RedisModule_BlockedClientMeasureTimeStart(bc);
RedisModule_Free(targ); RedisModule_Free(targ);
struct timespec ts; struct timespec ts;
ts.tv_sec = delay / 1000; ts.tv_sec = delay / 1000;
ts.tv_nsec = (delay % 1000) * 1000000; ts.tv_nsec = (delay % 1000) * 1000000;
nanosleep(&ts, NULL); nanosleep(&ts, NULL);
int *r = RedisModule_Alloc(sizeof(int)); int *r = RedisModule_Alloc(sizeof(int));
*r = rand(); *r = rand();
if (enable_time_track) if (enable_time_track)
assert(RedisModule_BlockedClientMeasureTimeEnd(bc)==REDISMODULE_OK); RedisModule_BlockedClientMeasureTimeEnd(bc);
RedisModule_UnblockClient(bc,r); RedisModule_UnblockClient(bc,r);
return NULL; return NULL;
} }
/* The thread entry point that actually executes the blocking part /* The thread entry point that actually executes the blocking part
* of the command BLOCK.DEBUG. */ * of the command BLOCK.DOUBLE_DEBUG. */
void *DoubleBlock_ThreadMain(void *arg) { void *DoubleBlock_ThreadMain(void *arg) {
void **targ = arg; void **targ = arg;
RedisModuleBlockedClient *bc = targ[0]; RedisModuleBlockedClient *bc = targ[0];
long long delay = (unsigned long)targ[1]; long long delay = (unsigned long)targ[1];
assert(RedisModule_BlockedClientMeasureTimeStart(bc)==REDISMODULE_OK); RedisModule_BlockedClientMeasureTimeStart(bc);
RedisModule_Free(targ); RedisModule_Free(targ);
struct timespec ts; struct timespec ts;
ts.tv_sec = delay / 1000; ts.tv_sec = delay / 1000;
ts.tv_nsec = (delay % 1000) * 1000000; ts.tv_nsec = (delay % 1000) * 1000000;
nanosleep(&ts, NULL); nanosleep(&ts, NULL);
int *r = RedisModule_Alloc(sizeof(int)); int *r = RedisModule_Alloc(sizeof(int));
*r = rand(); *r = rand();
RedisModule_BlockedClientMeasureTimeEnd(bc); RedisModule_BlockedClientMeasureTimeEnd(bc);
/* call again RedisModule_BlockedClientMeasureTimeStart() and /* call again RedisModule_BlockedClientMeasureTimeStart() and
* RedisModule_BlockedClientMeasureTimeEnd and ensure that the * RedisModule_BlockedClientMeasureTimeEnd and ensure that the
* total execution time is 2x the delay. */ * total execution time is 2x the delay. */
assert(RedisModule_BlockedClientMeasureTimeStart(bc)==REDISMODULE_OK); RedisModule_BlockedClientMeasureTimeStart(bc);
nanosleep(&ts, NULL); nanosleep(&ts, NULL);
RedisModule_BlockedClientMeasureTimeEnd(bc); RedisModule_BlockedClientMeasureTimeEnd(bc);
RedisModule_UnblockClient(bc,r); RedisModule_UnblockClient(bc,r);
return NULL; return NULL;
} }
void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) { void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
RedisModule_Log(ctx,"warning","Blocked client %p disconnected!", RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",
(void*)bc); (void*)bc);
skipping to change at line 176 skipping to change at line 176
return REDISMODULE_OK; return REDISMODULE_OK;
} }
/* BLOCK.DOUBLE_DEBUG <delay_ms> -- Block for 2 x <count> milliseconds, /* BLOCK.DOUBLE_DEBUG <delay_ms> -- Block for 2 x <count> milliseconds,
* then reply with a random number. * then reply with a random number.
* This command is used to test multiple calls to RedisModule_BlockedClientMeasu reTimeStart() * This command is used to test multiple calls to RedisModule_BlockedClientMeasu reTimeStart()
* and RedisModule_BlockedClientMeasureTimeEnd() within the same execution. */ * and RedisModule_BlockedClientMeasureTimeEnd() within the same execution. */
int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 2) return RedisModule_WrongArity(ctx); if (argc != 2) return RedisModule_WrongArity(ctx);
long long delay; long long delay;
long long timeout;
if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) { if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx,"ERR invalid count"); return RedisModule_ReplyWithError(ctx,"ERR invalid count");
} }
pthread_t tid; pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply, HelloBlock_Timeout,HelloBlock_FreeData,timeout); RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply, HelloBlock_Timeout,HelloBlock_FreeData,0);
/* Now that we setup a blocking client, we need to pass the control /* Now that we setup a blocking client, we need to pass the control
* to the thread. However we need to pass arguments to the thread: * to the thread. However we need to pass arguments to the thread:
* the delay and a reference to the blocked client handle. */ * the delay and a reference to the blocked client handle. */
void **targ = RedisModule_Alloc(sizeof(void*)*2); void **targ = RedisModule_Alloc(sizeof(void*)*2);
targ[0] = bc; targ[0] = bc;
targ[1] = (void*)(unsigned long) delay; targ[1] = (void*)(unsigned long) delay;
if (pthread_create(&tid,NULL,DoubleBlock_ThreadMain,targ) != 0) { if (pthread_create(&tid,NULL,DoubleBlock_ThreadMain,targ) != 0) {
RedisModule_AbortBlock(bc); RedisModule_AbortBlock(bc);
 End of changes. 10 change blocks. 
9 lines changed or deleted 8 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)