"Fossies" - the Fresh Open Source Software Archive

Member "redis-6.2.5/tests/modules/blockonbackground.c" (21 Jul 2021, 11303 Bytes) of package /linux/misc/redis-6.2.5.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "blockonbackground.c": 6.2.4_vs_6.2.5.

    1 #define REDISMODULE_EXPERIMENTAL_API
    2 #define _XOPEN_SOURCE 700
    3 #include "redismodule.h"
    4 #include <stdio.h>
    5 #include <stdlib.h>
    6 #include <pthread.h>
    7 #include <time.h>
    8 
    9 #define UNUSED(x) (void)(x)
   10 
   11 /* Reply callback for blocking command BLOCK.DEBUG */
   12 int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
   13     UNUSED(argv);
   14     UNUSED(argc);
   15     int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
   16     return RedisModule_ReplyWithLongLong(ctx,*myint);
   17 }
   18 
   19 /* Timeout callback for blocking command BLOCK.DEBUG */
   20 int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
   21     UNUSED(argv);
   22     UNUSED(argc);
   23     RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx);
   24     RedisModule_BlockedClientMeasureTimeEnd(bc);
   25     return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
   26 }
   27 
   28 /* Private data freeing callback for BLOCK.DEBUG command. */
   29 void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
   30     UNUSED(ctx);
   31     RedisModule_Free(privdata);
   32 }
   33 
   34 /* The thread entry point that actually executes the blocking part
   35  * of the command BLOCK.DEBUG. */
   36 void *BlockDebug_ThreadMain(void *arg) {
   37     void **targ = arg;
   38     RedisModuleBlockedClient *bc = targ[0];
   39     long long delay = (unsigned long)targ[1];
   40     long long enable_time_track = (unsigned long)targ[2];
   41     if (enable_time_track)
   42         RedisModule_BlockedClientMeasureTimeStart(bc);
   43     RedisModule_Free(targ);
   44 
   45     struct timespec ts;
   46     ts.tv_sec = delay / 1000;
   47     ts.tv_nsec = (delay % 1000) * 1000000;
   48     nanosleep(&ts, NULL);
   49     int *r = RedisModule_Alloc(sizeof(int));
   50     *r = rand();
   51     if (enable_time_track)
   52         RedisModule_BlockedClientMeasureTimeEnd(bc);
   53     RedisModule_UnblockClient(bc,r);
   54     return NULL;
   55 }
   56 
   57 /* The thread entry point that actually executes the blocking part
   58  * of the command BLOCK.DOUBLE_DEBUG. */
   59 void *DoubleBlock_ThreadMain(void *arg) {
   60     void **targ = arg;
   61     RedisModuleBlockedClient *bc = targ[0];
   62     long long delay = (unsigned long)targ[1];
   63     RedisModule_BlockedClientMeasureTimeStart(bc);
   64     RedisModule_Free(targ);
   65     struct timespec ts;
   66     ts.tv_sec = delay / 1000;
   67     ts.tv_nsec = (delay % 1000) * 1000000;
   68     nanosleep(&ts, NULL);
   69     int *r = RedisModule_Alloc(sizeof(int));
   70     *r = rand();
   71     RedisModule_BlockedClientMeasureTimeEnd(bc);
   72     /* call again RedisModule_BlockedClientMeasureTimeStart() and
   73      * RedisModule_BlockedClientMeasureTimeEnd and ensure that the
   74      * total execution time is 2x the delay. */
   75     RedisModule_BlockedClientMeasureTimeStart(bc);
   76     nanosleep(&ts, NULL);
   77     RedisModule_BlockedClientMeasureTimeEnd(bc);
   78 
   79     RedisModule_UnblockClient(bc,r);
   80     return NULL;
   81 }
   82 
   83 void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
   84     RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",
   85         (void*)bc);
   86 }
   87 
   88 /* BLOCK.DEBUG <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
   89  * a random number. Timeout is the command timeout, so that you can test
   90  * what happens when the delay is greater than the timeout. */
   91 int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
   92     if (argc != 3) return RedisModule_WrongArity(ctx);
   93     long long delay;
   94     long long timeout;
   95 
   96     if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
   97         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
   98     }
   99 
  100     if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
  101         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
  102     }
  103 
  104     pthread_t tid;
  105     RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
  106 
  107     /* Here we set a disconnection handler, however since this module will
  108      * block in sleep() in a thread, there is not much we can do in the
  109      * callback, so this is just to show you the API. */
  110     RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
  111 
  112     /* Now that we setup a blocking client, we need to pass the control
  113      * to the thread. However we need to pass arguments to the thread:
  114      * the delay and a reference to the blocked client handle. */
  115     void **targ = RedisModule_Alloc(sizeof(void*)*3);
  116     targ[0] = bc;
  117     targ[1] = (void*)(unsigned long) delay;
  118     // pass 1 as flag to enable time tracking
  119     targ[2] = (void*)(unsigned long) 1;
  120 
  121     if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
  122         RedisModule_AbortBlock(bc);
  123         return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
  124     }
  125     return REDISMODULE_OK;
  126 }
  127 
  128 /* BLOCK.DEBUG_NOTRACKING <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
  129  * a random number. Timeout is the command timeout, so that you can test
  130  * what happens when the delay is greater than the timeout.
  131  * this command does not track background time so the background time should no appear in stats*/
  132 int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  133     if (argc != 3) return RedisModule_WrongArity(ctx);
  134     long long delay;
  135     long long timeout;
  136 
  137     if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
  138         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
  139     }
  140 
  141     if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
  142         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
  143     }
  144 
  145     pthread_t tid;
  146     RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
  147 
  148     /* Here we set a disconnection handler, however since this module will
  149      * block in sleep() in a thread, there is not much we can do in the
  150      * callback, so this is just to show you the API. */
  151     RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
  152 
  153     /* Now that we setup a blocking client, we need to pass the control
  154      * to the thread. However we need to pass arguments to the thread:
  155      * the delay and a reference to the blocked client handle. */
  156     void **targ = RedisModule_Alloc(sizeof(void*)*3);
  157     targ[0] = bc;
  158     targ[1] = (void*)(unsigned long) delay;
  159     // pass 0 as flag to enable time tracking
  160     targ[2] = (void*)(unsigned long) 0;
  161 
  162     if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
  163         RedisModule_AbortBlock(bc);
  164         return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
  165     }
  166     return REDISMODULE_OK;
  167 }
  168 
  169 /* BLOCK.DOUBLE_DEBUG <delay_ms> -- Block for 2 x <count> milliseconds,
  170  * then reply with a random number.
  171  * This command is used to test multiple calls to RedisModule_BlockedClientMeasureTimeStart()
  172  * and RedisModule_BlockedClientMeasureTimeEnd() within the same execution. */
  173 int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  174     if (argc != 2) return RedisModule_WrongArity(ctx);
  175     long long delay;
  176 
  177     if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
  178         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
  179     }
  180 
  181     pthread_t tid;
  182     RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0);
  183 
  184     /* Now that we setup a blocking client, we need to pass the control
  185      * to the thread. However we need to pass arguments to the thread:
  186      * the delay and a reference to the blocked client handle. */
  187     void **targ = RedisModule_Alloc(sizeof(void*)*2);
  188     targ[0] = bc;
  189     targ[1] = (void*)(unsigned long) delay;
  190 
  191     if (pthread_create(&tid,NULL,DoubleBlock_ThreadMain,targ) != 0) {
  192         RedisModule_AbortBlock(bc);
  193         return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
  194     }
  195     return REDISMODULE_OK;
  196 }
  197 
  198 RedisModuleBlockedClient *blocked_client = NULL;
  199 
  200 /* BLOCK.BLOCK [TIMEOUT] -- Blocks the current client until released
  201  * or TIMEOUT seconds. If TIMEOUT is zero, no timeout function is
  202  * registered.
  203  */
  204 int Block_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  205     if (RedisModule_IsBlockedReplyRequest(ctx)) {
  206         RedisModuleString *r = RedisModule_GetBlockedClientPrivateData(ctx);
  207         return RedisModule_ReplyWithString(ctx, r);
  208     } else if (RedisModule_IsBlockedTimeoutRequest(ctx)) {
  209         RedisModule_UnblockClient(blocked_client, NULL); /* Must be called to avoid leaks. */
  210         blocked_client = NULL;
  211         return RedisModule_ReplyWithSimpleString(ctx, "Timed out");
  212     }
  213 
  214     if (argc != 2) return RedisModule_WrongArity(ctx);
  215     long long timeout;
  216 
  217     if (RedisModule_StringToLongLong(argv[1], &timeout) != REDISMODULE_OK) {
  218         return RedisModule_ReplyWithError(ctx, "ERR invalid timeout");
  219     }
  220     if (blocked_client) {
  221         return RedisModule_ReplyWithError(ctx, "ERR another client already blocked");
  222     }
  223 
  224     /* Block client. We use this function as both a reply and optional timeout
  225      * callback and differentiate the different code flows above.
  226      */
  227     blocked_client = RedisModule_BlockClient(ctx, Block_RedisCommand,
  228             timeout > 0 ? Block_RedisCommand : NULL, NULL, timeout);
  229     return REDISMODULE_OK;
  230 }
  231 
  232 /* BLOCK.IS_BLOCKED -- Returns 1 if we have a blocked client, or 0 otherwise.
  233  */
  234 int IsBlocked_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  235     UNUSED(argv);
  236     UNUSED(argc);
  237     RedisModule_ReplyWithLongLong(ctx, blocked_client ? 1 : 0);
  238     return REDISMODULE_OK;
  239 }
  240 
  241 /* BLOCK.RELEASE [reply] -- Releases the blocked client and produce the specified reply.
  242  */
  243 int Release_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  244     if (argc != 2) return RedisModule_WrongArity(ctx);
  245     if (!blocked_client) {
  246         return RedisModule_ReplyWithError(ctx, "ERR No blocked client");
  247     }
  248 
  249     RedisModuleString *replystr = argv[1];
  250     RedisModule_RetainString(ctx, replystr);
  251     int err = RedisModule_UnblockClient(blocked_client, replystr);
  252     blocked_client = NULL;
  253 
  254     RedisModule_ReplyWithSimpleString(ctx, "OK");
  255 
  256     return REDISMODULE_OK;
  257 }
  258 
  259 int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
  260     UNUSED(argv);
  261     UNUSED(argc);
  262 
  263     if (RedisModule_Init(ctx,"block",1,REDISMODULE_APIVER_1)
  264         == REDISMODULE_ERR) return REDISMODULE_ERR;
  265 
  266     if (RedisModule_CreateCommand(ctx,"block.debug",
  267         HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
  268         return REDISMODULE_ERR;
  269 
  270     if (RedisModule_CreateCommand(ctx,"block.double_debug",
  271         HelloDoubleBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
  272         return REDISMODULE_ERR;
  273 
  274     if (RedisModule_CreateCommand(ctx,"block.debug_no_track",
  275         HelloBlockNoTracking_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
  276         return REDISMODULE_ERR;
  277 
  278     if (RedisModule_CreateCommand(ctx, "block.block",
  279         Block_RedisCommand, "", 0, 0, 0) == REDISMODULE_ERR)
  280         return REDISMODULE_ERR;
  281 
  282     if (RedisModule_CreateCommand(ctx,"block.is_blocked",
  283         IsBlocked_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
  284         return REDISMODULE_ERR;
  285 
  286     if (RedisModule_CreateCommand(ctx,"block.release",
  287         Release_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
  288         return REDISMODULE_ERR;
  289 
  290     return REDISMODULE_OK;
  291 }