"Fossies" - the Fresh Open Source Software Archive

Member "redis-6.0.8/src/module.c" (10 Sep 2020, 323294 Bytes) of package /linux/misc/redis-6.0.8.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "module.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 6.0.7_vs_6.0.8.

    1 /*
    2  * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions are met:
    7  *
    8  *   * Redistributions of source code must retain the above copyright notice,
    9  *     this list of conditions and the following disclaimer.
   10  *   * Redistributions in binary form must reproduce the above copyright
   11  *     notice, this list of conditions and the following disclaimer in the
   12  *     documentation and/or other materials provided with the distribution.
   13  *   * Neither the name of Redis nor the names of its contributors may be used
   14  *     to endorse or promote products derived from this software without
   15  *     specific prior written permission.
   16  *
   17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
   18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
   21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   27  * POSSIBILITY OF SUCH DAMAGE.
   28  */
   29 
   30 #include "server.h"
   31 #include "cluster.h"
   32 #include "rdb.h"
   33 #include <dlfcn.h>
   34 #include <sys/stat.h>
   35 #include <sys/wait.h>
   36 
   37 /* --------------------------------------------------------------------------
   38  * Private data structures used by the modules system. Those are data
   39  * structures that are never exposed to Redis Modules, if not as void
   40  * pointers that have an API the module can call with them)
   41  * -------------------------------------------------------------------------- */
   42 
   43 typedef struct RedisModuleInfoCtx {
   44     struct RedisModule *module;
   45     const char *requested_section;
   46     sds info;           /* info string we collected so far */
   47     int sections;       /* number of sections we collected so far */
   48     int in_section;     /* indication if we're in an active section or not */
   49     int in_dict_field;  /* indication that we're curreintly appending to a dict */
   50 } RedisModuleInfoCtx;
   51 
   52 typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report);
   53 
   54 /* This structure represents a module inside the system. */
   55 struct RedisModule {
   56     void *handle;   /* Module dlopen() handle. */
   57     char *name;     /* Module name. */
   58     int ver;        /* Module version. We use just progressive integers. */
   59     int apiver;     /* Module API version as requested during initialization.*/
   60     list *types;    /* Module data types. */
   61     list *usedby;   /* List of modules using APIs from this one. */
   62     list *using;    /* List of modules we use some APIs of. */
   63     list *filters;  /* List of filters the module has registered. */
   64     int in_call;    /* RM_Call() nesting level */
   65     int in_hook;    /* Hooks callback nesting level for this module (0 or 1). */
   66     int options;    /* Module options and capabilities. */
   67     int blocked_clients;         /* Count of RedisModuleBlockedClient in this module. */
   68     RedisModuleInfoFunc info_cb; /* Callback for module to add INFO fields. */
   69 };
   70 typedef struct RedisModule RedisModule;
   71 
   72 /* This represents a shared API. Shared APIs will be used to populate
   73  * the server.sharedapi dictionary, mapping names of APIs exported by
   74  * modules for other modules to use, to their structure specifying the
   75  * function pointer that can be called. */
   76 struct RedisModuleSharedAPI {
   77     void *func;
   78     RedisModule *module;
   79 };
   80 typedef struct RedisModuleSharedAPI RedisModuleSharedAPI;
   81 
   82 static dict *modules; /* Hash table of modules. SDS -> RedisModule ptr.*/
   83 
   84 /* Entries in the context->amqueue array, representing objects to free
   85  * when the callback returns. */
   86 struct AutoMemEntry {
   87     void *ptr;
   88     int type;
   89 };
   90 
   91 /* AutMemEntry type field values. */
   92 #define REDISMODULE_AM_KEY 0
   93 #define REDISMODULE_AM_STRING 1
   94 #define REDISMODULE_AM_REPLY 2
   95 #define REDISMODULE_AM_FREED 3 /* Explicitly freed by user already. */
   96 #define REDISMODULE_AM_DICT 4
   97 #define REDISMODULE_AM_INFO 5
   98 
   99 /* The pool allocator block. Redis Modules can allocate memory via this special
  100  * allocator that will automatically release it all once the callback returns.
  101  * This means that it can only be used for ephemeral allocations. However
  102  * there are two advantages for modules to use this API:
  103  *
  104  * 1) The memory is automatically released when the callback returns.
  105  * 2) This allocator is faster for many small allocations since whole blocks
  106  *    are allocated, and small pieces returned to the caller just advancing
  107  *    the index of the allocation.
  108  *
  109  * Allocations are always rounded to the size of the void pointer in order
  110  * to always return aligned memory chunks. */
  111 
  112 #define REDISMODULE_POOL_ALLOC_MIN_SIZE (1024*8)
  113 #define REDISMODULE_POOL_ALLOC_ALIGN (sizeof(void*))
  114 
  115 typedef struct RedisModulePoolAllocBlock {
  116     uint32_t size;
  117     uint32_t used;
  118     struct RedisModulePoolAllocBlock *next;
  119     char memory[];
  120 } RedisModulePoolAllocBlock;
  121 
  122 /* This structure represents the context in which Redis modules operate.
  123  * Most APIs module can access, get a pointer to the context, so that the API
  124  * implementation can hold state across calls, or remember what to free after
  125  * the call and so forth.
  126  *
  127  * Note that not all the context structure is always filled with actual values
  128  * but only the fields needed in a given context. */
  129 
  130 struct RedisModuleBlockedClient;
  131 
  132 struct RedisModuleCtx {
  133     void *getapifuncptr;            /* NOTE: Must be the first field. */
  134     struct RedisModule *module;     /* Module reference. */
  135     client *client;                 /* Client calling a command. */
  136     struct RedisModuleBlockedClient *blocked_client; /* Blocked client for
  137                                                         thread safe context. */
  138     struct AutoMemEntry *amqueue;   /* Auto memory queue of objects to free. */
  139     int amqueue_len;                /* Number of slots in amqueue. */
  140     int amqueue_used;               /* Number of used slots in amqueue. */
  141     int flags;                      /* REDISMODULE_CTX_... flags. */
  142     void **postponed_arrays;        /* To set with RM_ReplySetArrayLength(). */
  143     int postponed_arrays_count;     /* Number of entries in postponed_arrays. */
  144     void *blocked_privdata;         /* Privdata set when unblocking a client. */
  145     RedisModuleString *blocked_ready_key; /* Key ready when the reply callback
  146                                              gets called for clients blocked
  147                                              on keys. */
  148 
  149     /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
  150     int *keys_pos;
  151     int keys_count;
  152 
  153     struct RedisModulePoolAllocBlock *pa_head;
  154     redisOpArray saved_oparray;    /* When propagating commands in a callback
  155                                       we reallocate the "also propagate" op
  156                                       array. Here we save the old one to
  157                                       restore it later. */
  158 };
  159 typedef struct RedisModuleCtx RedisModuleCtx;
  160 
  161 #define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}}
  162 #define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
  163 #define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
  164 #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
  165 #define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
  166 #define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
  167 #define REDISMODULE_CTX_THREAD_SAFE (1<<5)
  168 #define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<6)
  169 #define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<7)
  170 
  171 /* This represents a Redis key opened with RM_OpenKey(). */
  172 struct RedisModuleKey {
  173     RedisModuleCtx *ctx;
  174     redisDb *db;
  175     robj *key;      /* Key name object. */
  176     robj *value;    /* Value object, or NULL if the key was not found. */
  177     void *iter;     /* Iterator. */
  178     int mode;       /* Opening mode. */
  179 
  180     /* Zset iterator. */
  181     uint32_t ztype;         /* REDISMODULE_ZSET_RANGE_* */
  182     zrangespec zrs;         /* Score range. */
  183     zlexrangespec zlrs;     /* Lex range. */
  184     uint32_t zstart;        /* Start pos for positional ranges. */
  185     uint32_t zend;          /* End pos for positional ranges. */
  186     void *zcurrent;         /* Zset iterator current node. */
  187     int zer;                /* Zset iterator end reached flag
  188                                (true if end was reached). */
  189 };
  190 typedef struct RedisModuleKey RedisModuleKey;
  191 
  192 /* RedisModuleKey 'ztype' values. */
  193 #define REDISMODULE_ZSET_RANGE_NONE 0       /* This must always be 0. */
  194 #define REDISMODULE_ZSET_RANGE_LEX 1
  195 #define REDISMODULE_ZSET_RANGE_SCORE 2
  196 #define REDISMODULE_ZSET_RANGE_POS 3
  197 
  198 /* Function pointer type of a function representing a command inside
  199  * a Redis module. */
  200 struct RedisModuleBlockedClient;
  201 typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, void **argv, int argc);
  202 typedef void (*RedisModuleDisconnectFunc) (RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc);
  203 
  204 /* This struct holds the information about a command registered by a module.*/
  205 struct RedisModuleCommandProxy {
  206     struct RedisModule *module;
  207     RedisModuleCmdFunc func;
  208     struct redisCommand *rediscmd;
  209 };
  210 typedef struct RedisModuleCommandProxy RedisModuleCommandProxy;
  211 
  212 #define REDISMODULE_REPLYFLAG_NONE 0
  213 #define REDISMODULE_REPLYFLAG_TOPARSE (1<<0) /* Protocol must be parsed. */
  214 #define REDISMODULE_REPLYFLAG_NESTED (1<<1)  /* Nested reply object. No proto
  215                                                 or struct free. */
  216 
  217 /* Reply of RM_Call() function. The function is filled in a lazy
  218  * way depending on the function called on the reply structure. By default
  219  * only the type, proto and protolen are filled. */
  220 typedef struct RedisModuleCallReply {
  221     RedisModuleCtx *ctx;
  222     int type;       /* REDISMODULE_REPLY_... */
  223     int flags;      /* REDISMODULE_REPLYFLAG_...  */
  224     size_t len;     /* Len of strings or num of elements of arrays. */
  225     char *proto;    /* Raw reply protocol. An SDS string at top-level object. */
  226     size_t protolen;/* Length of protocol. */
  227     union {
  228         const char *str; /* String pointer for string and error replies. This
  229                             does not need to be freed, always points inside
  230                             a reply->proto buffer of the reply object or, in
  231                             case of array elements, of parent reply objects. */
  232         long long ll;    /* Reply value for integer reply. */
  233         struct RedisModuleCallReply *array; /* Array of sub-reply elements. */
  234     } val;
  235 } RedisModuleCallReply;
  236 
  237 /* Structure representing a blocked client. We get a pointer to such
  238  * an object when blocking from modules. */
  239 typedef struct RedisModuleBlockedClient {
  240     client *client;  /* Pointer to the blocked client. or NULL if the client
  241                         was destroyed during the life of this object. */
  242     RedisModule *module;    /* Module blocking the client. */
  243     RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
  244     RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
  245     RedisModuleDisconnectFunc disconnect_callback; /* Called on disconnection.*/
  246     void (*free_privdata)(RedisModuleCtx*,void*);/* privdata cleanup callback.*/
  247     void *privdata;     /* Module private data that may be used by the reply
  248                            or timeout callback. It is set via the
  249                            RedisModule_UnblockClient() API. */
  250     client *reply_client;           /* Fake client used to accumulate replies
  251                                        in thread safe contexts. */
  252     int dbid;           /* Database number selected by the original client. */
  253     int blocked_on_keys;    /* If blocked via RM_BlockClientOnKeys(). */
  254     int unblocked;          /* Already on the moduleUnblocked list. */
  255 } RedisModuleBlockedClient;
  256 
  257 static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
  258 static list *moduleUnblockedClients;
  259 
  260 /* We need a mutex that is unlocked / relocked in beforeSleep() in order to
  261  * allow thread safe contexts to execute commands at a safe moment. */
  262 static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
  263 
  264 
  265 /* Function pointer type for keyspace event notification subscriptions from modules. */
  266 typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
  267 
  268 /* Keyspace notification subscriber information.
  269  * See RM_SubscribeToKeyspaceEvents() for more information. */
  270 typedef struct RedisModuleKeyspaceSubscriber {
  271     /* The module subscribed to the event */
  272     RedisModule *module;
  273     /* Notification callback in the module*/
  274     RedisModuleNotificationFunc notify_callback;
  275     /* A bit mask of the events the module is interested in */
  276     int event_mask;
  277     /* Active flag set on entry, to avoid reentrant subscribers
  278      * calling themselves */
  279     int active;
  280 } RedisModuleKeyspaceSubscriber;
  281 
  282 /* The module keyspace notification subscribers list */
  283 static list *moduleKeyspaceSubscribers;
  284 
  285 /* Static client recycled for when we need to provide a context with a client
  286  * in a situation where there is no client to provide. This avoidsallocating
  287  * a new client per round. For instance this is used in the keyspace
  288  * notifications, timers and cluster messages callbacks. */
  289 static client *moduleFreeContextReusedClient;
  290 
  291 /* Data structures related to the exported dictionary data structure. */
  292 typedef struct RedisModuleDict {
  293     rax *rax;                       /* The radix tree. */
  294 } RedisModuleDict;
  295 
  296 typedef struct RedisModuleDictIter {
  297     RedisModuleDict *dict;
  298     raxIterator ri;
  299 } RedisModuleDictIter;
  300 
  301 typedef struct RedisModuleCommandFilterCtx {
  302     RedisModuleString **argv;
  303     int argc;
  304 } RedisModuleCommandFilterCtx;
  305 
  306 typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
  307 
  308 typedef struct RedisModuleCommandFilter {
  309     /* The module that registered the filter */
  310     RedisModule *module;
  311     /* Filter callback function */
  312     RedisModuleCommandFilterFunc callback;
  313     /* REDISMODULE_CMDFILTER_* flags */
  314     int flags;
  315 } RedisModuleCommandFilter;
  316 
  317 /* Registered filters */
  318 static list *moduleCommandFilters;
  319 
  320 typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
  321 
  322 static struct RedisModuleForkInfo {
  323     RedisModuleForkDoneHandler done_handler;
  324     void* done_handler_user_data;
  325 } moduleForkInfo = {0};
  326 
  327 typedef struct RedisModuleServerInfoData {
  328     rax *rax;                       /* parsed info data. */
  329 } RedisModuleServerInfoData;
  330 
  331 /* Flags for moduleCreateArgvFromUserFormat(). */
  332 #define REDISMODULE_ARGV_REPLICATE (1<<0)
  333 #define REDISMODULE_ARGV_NO_AOF (1<<1)
  334 #define REDISMODULE_ARGV_NO_REPLICAS (1<<2)
  335 
  336 /* Determine whether Redis should signalModifiedKey implicitly.
  337  * In case 'ctx' has no 'module' member (and therefore no module->options),
  338  * we assume default behavior, that is, Redis signals.
  339  * (see RM_GetThreadSafeContext) */
  340 #define SHOULD_SIGNAL_MODIFIED_KEYS(ctx) \
  341     ctx->module? !(ctx->module->options & REDISMODULE_OPTION_NO_IMPLICIT_SIGNAL_MODIFIED) : 1
  342 
  343 /* Server events hooks data structures and defines: this modules API
  344  * allow modules to subscribe to certain events in Redis, such as
  345  * the start and end of an RDB or AOF save, the change of role in replication,
  346  * and similar other events. */
  347 
  348 typedef struct RedisModuleEventListener {
  349     RedisModule *module;
  350     RedisModuleEvent event;
  351     RedisModuleEventCallback callback;
  352 } RedisModuleEventListener;
  353 
  354 list *RedisModule_EventListeners; /* Global list of all the active events. */
  355 unsigned long long ModulesInHooks = 0; /* Total number of modules in hooks
  356                                           callbacks right now. */
  357 
  358 /* Data structures related to the redis module users */
  359 
  360 /* This callback type is called by moduleNotifyUserChanged() every time
  361  * a user authenticated via the module API is associated with a different
  362  * user or gets disconnected. */
  363 typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
  364 
  365 /* This is the object returned by RM_CreateModuleUser(). The module API is
  366  * able to create users, set ACLs to such users, and later authenticate
  367  * clients using such newly created users. */
  368 typedef struct RedisModuleUser {
  369     user *user; /* Reference to the real redis user */
  370 } RedisModuleUser;
  371 
  372 
  373 /* --------------------------------------------------------------------------
  374  * Prototypes
  375  * -------------------------------------------------------------------------- */
  376 
  377 void RM_FreeCallReply(RedisModuleCallReply *reply);
  378 void RM_CloseKey(RedisModuleKey *key);
  379 void autoMemoryCollect(RedisModuleCtx *ctx);
  380 robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap);
  381 void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx);
  382 void RM_ZsetRangeStop(RedisModuleKey *kp);
  383 static void zsetKeyReset(RedisModuleKey *key);
  384 void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d);
  385 void RM_FreeServerInfo(RedisModuleCtx *ctx, RedisModuleServerInfoData *data);
  386 
  387 /* --------------------------------------------------------------------------
  388  * Heap allocation raw functions
  389  * -------------------------------------------------------------------------- */
  390 
  391 /* Use like malloc(). Memory allocated with this function is reported in
  392  * Redis INFO memory, used for keys eviction according to maxmemory settings
  393  * and in general is taken into account as memory allocated by Redis.
  394  * You should avoid using malloc(). */
  395 void *RM_Alloc(size_t bytes) {
  396     return zmalloc(bytes);
  397 }
  398 
  399 /* Use like calloc(). Memory allocated with this function is reported in
  400  * Redis INFO memory, used for keys eviction according to maxmemory settings
  401  * and in general is taken into account as memory allocated by Redis.
  402  * You should avoid using calloc() directly. */
  403 void *RM_Calloc(size_t nmemb, size_t size) {
  404     return zcalloc(nmemb*size);
  405 }
  406 
  407 /* Use like realloc() for memory obtained with RedisModule_Alloc(). */
  408 void* RM_Realloc(void *ptr, size_t bytes) {
  409     return zrealloc(ptr,bytes);
  410 }
  411 
  412 /* Use like free() for memory obtained by RedisModule_Alloc() and
  413  * RedisModule_Realloc(). However you should never try to free with
  414  * RedisModule_Free() memory allocated with malloc() inside your module. */
  415 void RM_Free(void *ptr) {
  416     zfree(ptr);
  417 }
  418 
  419 /* Like strdup() but returns memory allocated with RedisModule_Alloc(). */
  420 char *RM_Strdup(const char *str) {
  421     return zstrdup(str);
  422 }
  423 
  424 /* --------------------------------------------------------------------------
  425  * Pool allocator
  426  * -------------------------------------------------------------------------- */
  427 
  428 /* Release the chain of blocks used for pool allocations. */
  429 void poolAllocRelease(RedisModuleCtx *ctx) {
  430     RedisModulePoolAllocBlock *head = ctx->pa_head, *next;
  431 
  432     while(head != NULL) {
  433         next = head->next;
  434         zfree(head);
  435         head = next;
  436     }
  437     ctx->pa_head = NULL;
  438 }
  439 
  440 /* Return heap allocated memory that will be freed automatically when the
  441  * module callback function returns. Mostly suitable for small allocations
  442  * that are short living and must be released when the callback returns
  443  * anyway. The returned memory is aligned to the architecture word size
  444  * if at least word size bytes are requested, otherwise it is just
  445  * aligned to the next power of two, so for example a 3 bytes request is
  446  * 4 bytes aligned while a 2 bytes request is 2 bytes aligned.
  447  *
  448  * There is no realloc style function since when this is needed to use the
  449  * pool allocator is not a good idea.
  450  *
  451  * The function returns NULL if `bytes` is 0. */
  452 void *RM_PoolAlloc(RedisModuleCtx *ctx, size_t bytes) {
  453     if (bytes == 0) return NULL;
  454     RedisModulePoolAllocBlock *b = ctx->pa_head;
  455     size_t left = b ? b->size - b->used : 0;
  456 
  457     /* Fix alignment. */
  458     if (left >= bytes) {
  459         size_t alignment = REDISMODULE_POOL_ALLOC_ALIGN;
  460         while (bytes < alignment && alignment/2 >= bytes) alignment /= 2;
  461         if (b->used % alignment)
  462             b->used += alignment - (b->used % alignment);
  463         left = (b->used > b->size) ? 0 : b->size - b->used;
  464     }
  465 
  466     /* Create a new block if needed. */
  467     if (left < bytes) {
  468         size_t blocksize = REDISMODULE_POOL_ALLOC_MIN_SIZE;
  469         if (blocksize < bytes) blocksize = bytes;
  470         b = zmalloc(sizeof(*b) + blocksize);
  471         b->size = blocksize;
  472         b->used = 0;
  473         b->next = ctx->pa_head;
  474         ctx->pa_head = b;
  475     }
  476 
  477     char *retval = b->memory + b->used;
  478     b->used += bytes;
  479     return retval;
  480 }
  481 
  482 /* --------------------------------------------------------------------------
  483  * Helpers for modules API implementation
  484  * -------------------------------------------------------------------------- */
  485 
  486 /* Create an empty key of the specified type. 'kp' must point to a key object
  487  * opened for writing where the .value member is set to NULL because the
  488  * key was found to be non existing.
  489  *
  490  * On success REDISMODULE_OK is returned and the key is populated with
  491  * the value of the specified type. The function fails and returns
  492  * REDISMODULE_ERR if:
  493  *
  494  * 1) The key is not open for writing.
  495  * 2) The key is not empty.
  496  * 3) The specified type is unknown.
  497  */
  498 int moduleCreateEmptyKey(RedisModuleKey *key, int type) {
  499     robj *obj;
  500 
  501     /* The key must be open for writing and non existing to proceed. */
  502     if (!(key->mode & REDISMODULE_WRITE) || key->value)
  503         return REDISMODULE_ERR;
  504 
  505     switch(type) {
  506     case REDISMODULE_KEYTYPE_LIST:
  507         obj = createQuicklistObject();
  508         quicklistSetOptions(obj->ptr, server.list_max_ziplist_size,
  509                             server.list_compress_depth);
  510         break;
  511     case REDISMODULE_KEYTYPE_ZSET:
  512         obj = createZsetZiplistObject();
  513         break;
  514     case REDISMODULE_KEYTYPE_HASH:
  515         obj = createHashObject();
  516         break;
  517     default: return REDISMODULE_ERR;
  518     }
  519     dbAdd(key->db,key->key,obj);
  520     key->value = obj;
  521     return REDISMODULE_OK;
  522 }
  523 
  524 /* This function is called in low-level API implementation functions in order
  525  * to check if the value associated with the key remained empty after an
  526  * operation that removed elements from an aggregate data type.
  527  *
  528  * If this happens, the key is deleted from the DB and the key object state
  529  * is set to the right one in order to be targeted again by write operations
  530  * possibly recreating the key if needed.
  531  *
  532  * The function returns 1 if the key value object is found empty and is
  533  * deleted, otherwise 0 is returned. */
  534 int moduleDelKeyIfEmpty(RedisModuleKey *key) {
  535     if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL) return 0;
  536     int isempty;
  537     robj *o = key->value;
  538 
  539     switch(o->type) {
  540     case OBJ_LIST: isempty = listTypeLength(o) == 0; break;
  541     case OBJ_SET: isempty = setTypeSize(o) == 0; break;
  542     case OBJ_ZSET: isempty = zsetLength(o) == 0; break;
  543     case OBJ_HASH: isempty = hashTypeLength(o) == 0; break;
  544     case OBJ_STREAM: isempty = streamLength(o) == 0; break;
  545     default: isempty = 0;
  546     }
  547 
  548     if (isempty) {
  549         dbDelete(key->db,key->key);
  550         key->value = NULL;
  551         return 1;
  552     } else {
  553         return 0;
  554     }
  555 }
  556 
  557 /* --------------------------------------------------------------------------
  558  * Service API exported to modules
  559  *
  560  * Note that all the exported APIs are called RM_<funcname> in the core
  561  * and RedisModule_<funcname> in the module side (defined as function
  562  * pointers in redismodule.h). In this way the dynamic linker does not
  563  * mess with our global function pointers, overriding it with the symbols
  564  * defined in the main executable having the same names.
  565  * -------------------------------------------------------------------------- */
  566 
  567 /* Lookup the requested module API and store the function pointer into the
  568  * target pointer. The function returns REDISMODULE_ERR if there is no such
  569  * named API, otherwise REDISMODULE_OK.
  570  *
  571  * This function is not meant to be used by modules developer, it is only
  572  * used implicitly by including redismodule.h. */
  573 int RM_GetApi(const char *funcname, void **targetPtrPtr) {
  574     dictEntry *he = dictFind(server.moduleapi, funcname);
  575     if (!he) return REDISMODULE_ERR;
  576     *targetPtrPtr = dictGetVal(he);
  577     return REDISMODULE_OK;
  578 }
  579 
  580 /* Helper function for when a command callback is called, in order to handle
  581  * details needed to correctly replicate commands. */
  582 void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
  583     client *c = ctx->client;
  584 
  585     /* We don't need to do anything here if the context was never used
  586      * in order to propagate commands. */
  587     if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return;
  588 
  589     if (c->flags & CLIENT_LUA) return;
  590 
  591     /* Handle the replication of the final EXEC, since whatever a command
  592      * emits is always wrapped around MULTI/EXEC. */
  593     alsoPropagate(server.execCommand,c->db->id,&shared.exec,1,
  594         PROPAGATE_AOF|PROPAGATE_REPL);
  595 
  596     /* If this is not a module command context (but is instead a simple
  597      * callback context), we have to handle directly the "also propagate"
  598      * array and emit it. In a module command call this will be handled
  599      * directly by call(). */
  600     if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL) &&
  601         server.also_propagate.numops)
  602     {
  603         for (int j = 0; j < server.also_propagate.numops; j++) {
  604             redisOp *rop = &server.also_propagate.ops[j];
  605             int target = rop->target;
  606             if (target)
  607                 propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
  608         }
  609         redisOpArrayFree(&server.also_propagate);
  610         /* Restore the previous oparray in case of nexted use of the API. */
  611         server.also_propagate = ctx->saved_oparray;
  612         /* We're done with saved_oparray, let's invalidate it. */
  613         redisOpArrayInit(&ctx->saved_oparray);
  614     }
  615 }
  616 
  617 /* Free the context after the user function was called. */
  618 void moduleFreeContext(RedisModuleCtx *ctx) {
  619     moduleHandlePropagationAfterCommandCallback(ctx);
  620     autoMemoryCollect(ctx);
  621     poolAllocRelease(ctx);
  622     if (ctx->postponed_arrays) {
  623         zfree(ctx->postponed_arrays);
  624         ctx->postponed_arrays_count = 0;
  625         serverLog(LL_WARNING,
  626             "API misuse detected in module %s: "
  627             "RedisModule_ReplyWithArray(REDISMODULE_POSTPONED_ARRAY_LEN) "
  628             "not matched by the same number of RedisModule_SetReplyArrayLen() "
  629             "calls.",
  630             ctx->module->name);
  631     }
  632     if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client);
  633 }
  634 
  635 /* This Redis command binds the normal Redis command invocation with commands
  636  * exported by modules. */
  637 void RedisModuleCommandDispatcher(client *c) {
  638     RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
  639     RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
  640 
  641     ctx.flags |= REDISMODULE_CTX_MODULE_COMMAND_CALL;
  642     ctx.module = cp->module;
  643     ctx.client = c;
  644     cp->func(&ctx,(void**)c->argv,c->argc);
  645     moduleFreeContext(&ctx);
  646 
  647     /* In some cases processMultibulkBuffer uses sdsMakeRoomFor to
  648      * expand the query buffer, and in order to avoid a big object copy
  649      * the query buffer SDS may be used directly as the SDS string backing
  650      * the client argument vectors: sometimes this will result in the SDS
  651      * string having unused space at the end. Later if a module takes ownership
  652      * of the RedisString, such space will be wasted forever. Inside the
  653      * Redis core this is not a problem because tryObjectEncoding() is called
  654      * before storing strings in the key space. Here we need to do it
  655      * for the module. */
  656     for (int i = 0; i < c->argc; i++) {
  657         /* Only do the work if the module took ownership of the object:
  658          * in that case the refcount is no longer 1. */
  659         if (c->argv[i]->refcount > 1)
  660             trimStringObjectIfNeeded(c->argv[i]);
  661     }
  662 }
  663 
  664 /* This function returns the list of keys, with the same interface as the
  665  * 'getkeys' function of the native commands, for module commands that exported
  666  * the "getkeys-api" flag during the registration. This is done when the
  667  * list of keys are not at fixed positions, so that first/last/step cannot
  668  * be used.
  669  *
  670  * In order to accomplish its work, the module command is called, flagging
  671  * the context in a way that the command can recognize this is a special
  672  * "get keys" call by calling RedisModule_IsKeysPositionRequest(ctx). */
  673 int *moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
  674     RedisModuleCommandProxy *cp = (void*)(unsigned long)cmd->getkeys_proc;
  675     RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
  676 
  677     ctx.module = cp->module;
  678     ctx.client = NULL;
  679     ctx.flags |= REDISMODULE_CTX_KEYS_POS_REQUEST;
  680     cp->func(&ctx,(void**)argv,argc);
  681     int *res = ctx.keys_pos;
  682     if (numkeys) *numkeys = ctx.keys_count;
  683     moduleFreeContext(&ctx);
  684     return res;
  685 }
  686 
  687 /* Return non-zero if a module command, that was declared with the
  688  * flag "getkeys-api", is called in a special way to get the keys positions
  689  * and not to get executed. Otherwise zero is returned. */
  690 int RM_IsKeysPositionRequest(RedisModuleCtx *ctx) {
  691     return (ctx->flags & REDISMODULE_CTX_KEYS_POS_REQUEST) != 0;
  692 }
  693 
  694 /* When a module command is called in order to obtain the position of
  695  * keys, since it was flagged as "getkeys-api" during the registration,
  696  * the command implementation checks for this special call using the
  697  * RedisModule_IsKeysPositionRequest() API and uses this function in
  698  * order to report keys, like in the following example:
  699  *
  700  *     if (RedisModule_IsKeysPositionRequest(ctx)) {
  701  *         RedisModule_KeyAtPos(ctx,1);
  702  *         RedisModule_KeyAtPos(ctx,2);
  703  *     }
  704  *
  705  *  Note: in the example below the get keys API would not be needed since
  706  *  keys are at fixed positions. This interface is only used for commands
  707  *  with a more complex structure. */
  708 void RM_KeyAtPos(RedisModuleCtx *ctx, int pos) {
  709     if (!(ctx->flags & REDISMODULE_CTX_KEYS_POS_REQUEST)) return;
  710     if (pos <= 0) return;
  711     ctx->keys_pos = zrealloc(ctx->keys_pos,sizeof(int)*(ctx->keys_count+1));
  712     ctx->keys_pos[ctx->keys_count++] = pos;
  713 }
  714 
  715 /* Helper for RM_CreateCommand(). Turns a string representing command
  716  * flags into the command flags used by the Redis core.
  717  *
  718  * It returns the set of flags, or -1 if unknown flags are found. */
  719 int64_t commandFlagsFromString(char *s) {
  720     int count, j;
  721     int64_t flags = 0;
  722     sds *tokens = sdssplitlen(s,strlen(s)," ",1,&count);
  723     for (j = 0; j < count; j++) {
  724         char *t = tokens[j];
  725         if (!strcasecmp(t,"write")) flags |= CMD_WRITE;
  726         else if (!strcasecmp(t,"readonly")) flags |= CMD_READONLY;
  727         else if (!strcasecmp(t,"admin")) flags |= CMD_ADMIN;
  728         else if (!strcasecmp(t,"deny-oom")) flags |= CMD_DENYOOM;
  729         else if (!strcasecmp(t,"deny-script")) flags |= CMD_NOSCRIPT;
  730         else if (!strcasecmp(t,"allow-loading")) flags |= CMD_LOADING;
  731         else if (!strcasecmp(t,"pubsub")) flags |= CMD_PUBSUB;
  732         else if (!strcasecmp(t,"random")) flags |= CMD_RANDOM;
  733         else if (!strcasecmp(t,"allow-stale")) flags |= CMD_STALE;
  734         else if (!strcasecmp(t,"no-monitor")) flags |= CMD_SKIP_MONITOR;
  735         else if (!strcasecmp(t,"no-slowlog")) flags |= CMD_SKIP_SLOWLOG;
  736         else if (!strcasecmp(t,"fast")) flags |= CMD_FAST;
  737         else if (!strcasecmp(t,"no-auth")) flags |= CMD_NO_AUTH;
  738         else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS;
  739         else if (!strcasecmp(t,"no-cluster")) flags |= CMD_MODULE_NO_CLUSTER;
  740         else break;
  741     }
  742     sdsfreesplitres(tokens,count);
  743     if (j != count) return -1; /* Some token not processed correctly. */
  744     return flags;
  745 }
  746 
  747 /* Register a new command in the Redis server, that will be handled by
  748  * calling the function pointer 'func' using the RedisModule calling
  749  * convention. The function returns REDISMODULE_ERR if the specified command
  750  * name is already busy or a set of invalid flags were passed, otherwise
  751  * REDISMODULE_OK is returned and the new command is registered.
  752  *
  753  * This function must be called during the initialization of the module
  754  * inside the RedisModule_OnLoad() function. Calling this function outside
  755  * of the initialization function is not defined.
  756  *
  757  * The command function type is the following:
  758  *
  759  *      int MyCommand_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
  760  *
  761  * And is supposed to always return REDISMODULE_OK.
  762  *
  763  * The set of flags 'strflags' specify the behavior of the command, and should
  764  * be passed as a C string composed of space separated words, like for
  765  * example "write deny-oom". The set of flags are:
  766  *
  767  * * **"write"**:     The command may modify the data set (it may also read
  768  *                    from it).
  769  * * **"readonly"**:  The command returns data from keys but never writes.
  770  * * **"admin"**:     The command is an administrative command (may change
  771  *                    replication or perform similar tasks).
  772  * * **"deny-oom"**:  The command may use additional memory and should be
  773  *                    denied during out of memory conditions.
  774  * * **"deny-script"**:   Don't allow this command in Lua scripts.
  775  * * **"allow-loading"**: Allow this command while the server is loading data.
  776  *                        Only commands not interacting with the data set
  777  *                        should be allowed to run in this mode. If not sure
  778  *                        don't use this flag.
  779  * * **"pubsub"**:    The command publishes things on Pub/Sub channels.
  780  * * **"random"**:    The command may have different outputs even starting
  781  *                    from the same input arguments and key values.
  782  * * **"allow-stale"**: The command is allowed to run on slaves that don't
  783  *                      serve stale data. Don't use if you don't know what
  784  *                      this means.
  785  * * **"no-monitor"**: Don't propagate the command on monitor. Use this if
  786  *                     the command has sensible data among the arguments.
  787  * * **"no-slowlog"**: Don't log this command in the slowlog. Use this if
  788  *                     the command has sensible data among the arguments.
  789  * * **"fast"**:      The command time complexity is not greater
  790  *                    than O(log(N)) where N is the size of the collection or
  791  *                    anything else representing the normal scalability
  792  *                    issue with the command.
  793  * * **"getkeys-api"**: The command implements the interface to return
  794  *                      the arguments that are keys. Used when start/stop/step
  795  *                      is not enough because of the command syntax.
  796  * * **"no-cluster"**: The command should not register in Redis Cluster
  797  *                     since is not designed to work with it because, for
  798  *                     example, is unable to report the position of the
  799  *                     keys, programmatically creates key names, or any
  800  *                     other reason.
  801  * * **"no-auth"**:    This command can be run by an un-authenticated client.
  802  *                     Normally this is used by a command that is used
  803  *                     to authenticate a client. 
  804  */
  805 int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
  806     int64_t flags = strflags ? commandFlagsFromString((char*)strflags) : 0;
  807     if (flags == -1) return REDISMODULE_ERR;
  808     if ((flags & CMD_MODULE_NO_CLUSTER) && server.cluster_enabled)
  809         return REDISMODULE_ERR;
  810 
  811     struct redisCommand *rediscmd;
  812     RedisModuleCommandProxy *cp;
  813     sds cmdname = sdsnew(name);
  814 
  815     /* Check if the command name is busy. */
  816     if (lookupCommand(cmdname) != NULL) {
  817         sdsfree(cmdname);
  818         return REDISMODULE_ERR;
  819     }
  820 
  821     /* Create a command "proxy", which is a structure that is referenced
  822      * in the command table, so that the generic command that works as
  823      * binding between modules and Redis, can know what function to call
  824      * and what the module is.
  825      *
  826      * Note that we use the Redis command table 'getkeys_proc' in order to
  827      * pass a reference to the command proxy structure. */
  828     cp = zmalloc(sizeof(*cp));
  829     cp->module = ctx->module;
  830     cp->func = cmdfunc;
  831     cp->rediscmd = zmalloc(sizeof(*rediscmd));
  832     cp->rediscmd->name = cmdname;
  833     cp->rediscmd->proc = RedisModuleCommandDispatcher;
  834     cp->rediscmd->arity = -1;
  835     cp->rediscmd->flags = flags | CMD_MODULE;
  836     cp->rediscmd->getkeys_proc = (redisGetKeysProc*)(unsigned long)cp;
  837     cp->rediscmd->firstkey = firstkey;
  838     cp->rediscmd->lastkey = lastkey;
  839     cp->rediscmd->keystep = keystep;
  840     cp->rediscmd->microseconds = 0;
  841     cp->rediscmd->calls = 0;
  842     dictAdd(server.commands,sdsdup(cmdname),cp->rediscmd);
  843     dictAdd(server.orig_commands,sdsdup(cmdname),cp->rediscmd);
  844     cp->rediscmd->id = ACLGetCommandID(cmdname); /* ID used for ACL. */
  845     return REDISMODULE_OK;
  846 }
  847 
  848 /* Called by RM_Init() to setup the `ctx->module` structure.
  849  *
  850  * This is an internal function, Redis modules developers don't need
  851  * to use it. */
  852 void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
  853     RedisModule *module;
  854 
  855     if (ctx->module != NULL) return;
  856     module = zmalloc(sizeof(*module));
  857     module->name = sdsnew((char*)name);
  858     module->ver = ver;
  859     module->apiver = apiver;
  860     module->types = listCreate();
  861     module->usedby = listCreate();
  862     module->using = listCreate();
  863     module->filters = listCreate();
  864     module->in_call = 0;
  865     module->in_hook = 0;
  866     module->options = 0;
  867     module->info_cb = 0;
  868     ctx->module = module;
  869 }
  870 
  871 /* Return non-zero if the module name is busy.
  872  * Otherwise zero is returned. */
  873 int RM_IsModuleNameBusy(const char *name) {
  874     sds modulename = sdsnew(name);
  875     dictEntry *de = dictFind(modules,modulename);
  876     sdsfree(modulename);
  877     return de != NULL;
  878 }
  879 
  880 /* Return the current UNIX time in milliseconds. */
  881 long long RM_Milliseconds(void) {
  882     return mstime();
  883 }
  884 
  885 /* Set flags defining capabilities or behavior bit flags.
  886  *
  887  * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS:
  888  * Generally, modules don't need to bother with this, as the process will just
  889  * terminate if a read error happens, however, setting this flag would allow
  890  * repl-diskless-load to work if enabled.
  891  * The module should use RedisModule_IsIOError after reads, before using the
  892  * data that was read, and in case of error, propagate it upwards, and also be
  893  * able to release the partially populated value and all it's allocations. */
  894 void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
  895     ctx->module->options = options;
  896 }
  897 
  898 /* Signals that the key is modified from user's perspective (i.e. invalidate WATCH
  899  * and client side caching). */
  900 int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) {
  901     signalModifiedKey(ctx->client,ctx->client->db,keyname);
  902     return REDISMODULE_OK;
  903 }
  904 
  905 /* --------------------------------------------------------------------------
  906  * Automatic memory management for modules
  907  * -------------------------------------------------------------------------- */
  908 
  909 /* Enable automatic memory management. See API.md for more information.
  910  *
  911  * The function must be called as the first function of a command implementation
  912  * that wants to use automatic memory. */
  913 void RM_AutoMemory(RedisModuleCtx *ctx) {
  914     ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY;
  915 }
  916 
  917 /* Add a new object to release automatically when the callback returns. */
  918 void autoMemoryAdd(RedisModuleCtx *ctx, int type, void *ptr) {
  919     if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return;
  920     if (ctx->amqueue_used == ctx->amqueue_len) {
  921         ctx->amqueue_len *= 2;
  922         if (ctx->amqueue_len < 16) ctx->amqueue_len = 16;
  923         ctx->amqueue = zrealloc(ctx->amqueue,sizeof(struct AutoMemEntry)*ctx->amqueue_len);
  924     }
  925     ctx->amqueue[ctx->amqueue_used].type = type;
  926     ctx->amqueue[ctx->amqueue_used].ptr = ptr;
  927     ctx->amqueue_used++;
  928 }
  929 
  930 /* Mark an object as freed in the auto release queue, so that users can still
  931  * free things manually if they want.
  932  *
  933  * The function returns 1 if the object was actually found in the auto memory
  934  * pool, otherwise 0 is returned. */
  935 int autoMemoryFreed(RedisModuleCtx *ctx, int type, void *ptr) {
  936     if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return 0;
  937 
  938     int count = (ctx->amqueue_used+1)/2;
  939     for (int j = 0; j < count; j++) {
  940         for (int side = 0; side < 2; side++) {
  941             /* For side = 0 check right side of the array, for
  942              * side = 1 check the left side instead (zig-zag scanning). */
  943             int i = (side == 0) ? (ctx->amqueue_used - 1 - j) : j;
  944             if (ctx->amqueue[i].type == type &&
  945                 ctx->amqueue[i].ptr == ptr)
  946             {
  947                 ctx->amqueue[i].type = REDISMODULE_AM_FREED;
  948 
  949                 /* Switch the freed element and the last element, to avoid growing
  950                  * the queue unnecessarily if we allocate/free in a loop */
  951                 if (i != ctx->amqueue_used-1) {
  952                     ctx->amqueue[i] = ctx->amqueue[ctx->amqueue_used-1];
  953                 }
  954 
  955                 /* Reduce the size of the queue because we either moved the top
  956                  * element elsewhere or freed it */
  957                 ctx->amqueue_used--;
  958                 return 1;
  959             }
  960         }
  961     }
  962     return 0;
  963 }
  964 
  965 /* Release all the objects in queue. */
  966 void autoMemoryCollect(RedisModuleCtx *ctx) {
  967     if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return;
  968     /* Clear the AUTO_MEMORY flag from the context, otherwise the functions
  969      * we call to free the resources, will try to scan the auto release
  970      * queue to mark the entries as freed. */
  971     ctx->flags &= ~REDISMODULE_CTX_AUTO_MEMORY;
  972     int j;
  973     for (j = 0; j < ctx->amqueue_used; j++) {
  974         void *ptr = ctx->amqueue[j].ptr;
  975         switch(ctx->amqueue[j].type) {
  976         case REDISMODULE_AM_STRING: decrRefCount(ptr); break;
  977         case REDISMODULE_AM_REPLY: RM_FreeCallReply(ptr); break;
  978         case REDISMODULE_AM_KEY: RM_CloseKey(ptr); break;
  979         case REDISMODULE_AM_DICT: RM_FreeDict(NULL,ptr); break;
  980         case REDISMODULE_AM_INFO: RM_FreeServerInfo(NULL,ptr); break;
  981         }
  982     }
  983     ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY;
  984     zfree(ctx->amqueue);
  985     ctx->amqueue = NULL;
  986     ctx->amqueue_len = 0;
  987     ctx->amqueue_used = 0;
  988 }
  989 
  990 /* --------------------------------------------------------------------------
  991  * String objects APIs
  992  * -------------------------------------------------------------------------- */
  993 
  994 /* Create a new module string object. The returned string must be freed
  995  * with RedisModule_FreeString(), unless automatic memory is enabled.
  996  *
  997  * The string is created by copying the `len` bytes starting
  998  * at `ptr`. No reference is retained to the passed buffer.
  999  *
 1000  * The module context 'ctx' is optional and may be NULL if you want to create
 1001  * a string out of the context scope. However in that case, the automatic
 1002  * memory management will not be available, and the string memory must be
 1003  * managed manually. */
 1004 RedisModuleString *RM_CreateString(RedisModuleCtx *ctx, const char *ptr, size_t len) {
 1005     RedisModuleString *o = createStringObject(ptr,len);
 1006     if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
 1007     return o;
 1008 }
 1009 
 1010 /* Create a new module string object from a printf format and arguments.
 1011  * The returned string must be freed with RedisModule_FreeString(), unless
 1012  * automatic memory is enabled.
 1013  *
 1014  * The string is created using the sds formatter function sdscatvprintf().
 1015  *
 1016  * The passed context 'ctx' may be NULL if necessary, see the
 1017  * RedisModule_CreateString() documentation for more info. */
 1018 RedisModuleString *RM_CreateStringPrintf(RedisModuleCtx *ctx, const char *fmt, ...) {
 1019     sds s = sdsempty();
 1020 
 1021     va_list ap;
 1022     va_start(ap, fmt);
 1023     s = sdscatvprintf(s, fmt, ap);
 1024     va_end(ap);
 1025 
 1026     RedisModuleString *o = createObject(OBJ_STRING, s);
 1027     if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
 1028 
 1029     return o;
 1030 }
 1031 
 1032 
 1033 /* Like RedisModule_CreatString(), but creates a string starting from a long long
 1034  * integer instead of taking a buffer and its length.
 1035  *
 1036  * The returned string must be released with RedisModule_FreeString() or by
 1037  * enabling automatic memory management.
 1038  *
 1039  * The passed context 'ctx' may be NULL if necessary, see the
 1040  * RedisModule_CreateString() documentation for more info. */
 1041 RedisModuleString *RM_CreateStringFromLongLong(RedisModuleCtx *ctx, long long ll) {
 1042     char buf[LONG_STR_SIZE];
 1043     size_t len = ll2string(buf,sizeof(buf),ll);
 1044     return RM_CreateString(ctx,buf,len);
 1045 }
 1046 
 1047 /* Like RedisModule_CreatString(), but creates a string starting from a double
 1048  * integer instead of taking a buffer and its length.
 1049  *
 1050  * The returned string must be released with RedisModule_FreeString() or by
 1051  * enabling automatic memory management. */
 1052 RedisModuleString *RM_CreateStringFromDouble(RedisModuleCtx *ctx, double d) {
 1053     char buf[128];
 1054     size_t len = d2string(buf,sizeof(buf),d);
 1055     return RM_CreateString(ctx,buf,len);
 1056 }
 1057 
 1058 /* Like RedisModule_CreatString(), but creates a string starting from a long
 1059  * double.
 1060  *
 1061  * The returned string must be released with RedisModule_FreeString() or by
 1062  * enabling automatic memory management.
 1063  *
 1064  * The passed context 'ctx' may be NULL if necessary, see the
 1065  * RedisModule_CreateString() documentation for more info. */
 1066 RedisModuleString *RM_CreateStringFromLongDouble(RedisModuleCtx *ctx, long double ld, int humanfriendly) {
 1067     char buf[MAX_LONG_DOUBLE_CHARS];
 1068     size_t len = ld2string(buf,sizeof(buf),ld,
 1069         (humanfriendly ? LD_STR_HUMAN : LD_STR_AUTO));
 1070     return RM_CreateString(ctx,buf,len);
 1071 }
 1072 
 1073 /* Like RedisModule_CreatString(), but creates a string starting from another
 1074  * RedisModuleString.
 1075  *
 1076  * The returned string must be released with RedisModule_FreeString() or by
 1077  * enabling automatic memory management.
 1078  *
 1079  * The passed context 'ctx' may be NULL if necessary, see the
 1080  * RedisModule_CreateString() documentation for more info. */
 1081 RedisModuleString *RM_CreateStringFromString(RedisModuleCtx *ctx, const RedisModuleString *str) {
 1082     RedisModuleString *o = dupStringObject(str);
 1083     if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
 1084     return o;
 1085 }
 1086 
 1087 /* Free a module string object obtained with one of the Redis modules API calls
 1088  * that return new string objects.
 1089  *
 1090  * It is possible to call this function even when automatic memory management
 1091  * is enabled. In that case the string will be released ASAP and removed
 1092  * from the pool of string to release at the end.
 1093  *
 1094  * If the string was created with a NULL context 'ctx', it is also possible to
 1095  * pass ctx as NULL when releasing the string (but passing a context will not
 1096  * create any issue). Strings created with a context should be freed also passing
 1097  * the context, so if you want to free a string out of context later, make sure
 1098  * to create it using a NULL context. */
 1099 void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) {
 1100     decrRefCount(str);
 1101     if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str);
 1102 }
 1103 
 1104 /* Every call to this function, will make the string 'str' requiring
 1105  * an additional call to RedisModule_FreeString() in order to really
 1106  * free the string. Note that the automatic freeing of the string obtained
 1107  * enabling modules automatic memory management counts for one
 1108  * RedisModule_FreeString() call (it is just executed automatically).
 1109  *
 1110  * Normally you want to call this function when, at the same time
 1111  * the following conditions are true:
 1112  *
 1113  * 1) You have automatic memory management enabled.
 1114  * 2) You want to create string objects.
 1115  * 3) Those string objects you create need to live *after* the callback
 1116  *    function(for example a command implementation) creating them returns.
 1117  *
 1118  * Usually you want this in order to store the created string object
 1119  * into your own data structure, for example when implementing a new data
 1120  * type.
 1121  *
 1122  * Note that when memory management is turned off, you don't need
 1123  * any call to RetainString() since creating a string will always result
 1124  * into a string that lives after the callback function returns, if
 1125  * no FreeString() call is performed.
 1126  *
 1127  * It is possible to call this function with a NULL context. */
 1128 void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) {
 1129     if (ctx == NULL || !autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str)) {
 1130         /* Increment the string reference counting only if we can't
 1131          * just remove the object from the list of objects that should
 1132          * be reclaimed. Why we do that, instead of just incrementing
 1133          * the refcount in any case, and let the automatic FreeString()
 1134          * call at the end to bring the refcount back at the desired
 1135          * value? Because this way we ensure that the object refcount
 1136          * value is 1 (instead of going to 2 to be dropped later to 1)
 1137          * after the call to this function. This is needed for functions
 1138          * like RedisModule_StringAppendBuffer() to work. */
 1139         incrRefCount(str);
 1140     }
 1141 }
 1142 
 1143 /**
 1144 * This function can be used instead of RedisModule_RetainString().
 1145 * The main difference between the two is that this function will always
 1146 * succeed, whereas RedisModule_RetainString() may fail because of an
 1147 * assertion.
 1148 * 
 1149 * The function returns a pointer to RedisModuleString, which is owned
 1150 * by the caller. It requires a call to RedisModule_FreeString() to free
 1151 * the string when automatic memory management is disabled for the context.
 1152 * When automatic memory management is enabled, you can either call
 1153 * RedisModule_FreeString() or let the automation free it.
 1154 * 
 1155 * This function is more efficient than RedisModule_CreateStringFromString()
 1156 * because whenever possible, it avoids copying the underlying
 1157 * RedisModuleString. The disadvantage of using this function is that it
 1158 * might not be possible to use RedisModule_StringAppendBuffer() on the
 1159 * returned RedisModuleString.
 1160 * 
 1161 * It is possible to call this function with a NULL context.
 1162  */
 1163 RedisModuleString* RM_HoldString(RedisModuleCtx *ctx, RedisModuleString *str) {
 1164     if (str->refcount == OBJ_STATIC_REFCOUNT) {
 1165         return RM_CreateStringFromString(ctx, str);
 1166     }
 1167 
 1168     incrRefCount(str);
 1169     if (ctx != NULL) {
 1170         /*
 1171          * Put the str in the auto memory management of the ctx.
 1172          * It might already be there, in this case, the ref count will
 1173          * be 2 and we will decrease the ref count twice and free the
 1174          * object in the auto memory free function.
 1175          *
 1176          * Why we can not do the same trick of just remove the object
 1177          * from the auto memory (like in RM_RetainString)?
 1178          * This code shows the issue:
 1179          *
 1180          * RM_AutoMemory(ctx);
 1181          * str1 = RM_CreateString(ctx, "test", 4);
 1182          * str2 = RM_HoldString(ctx, str1);
 1183          * RM_FreeString(str1);
 1184          * RM_FreeString(str2);
 1185          *
 1186          * If after the RM_HoldString we would just remove the string from
 1187          * the auto memory, this example will cause access to a freed memory
 1188          * on 'RM_FreeString(str2);' because the String will be free
 1189          * on 'RM_FreeString(str1);'.
 1190          *
 1191          * So it's safer to just increase the ref count
 1192          * and add the String to auto memory again.
 1193          *
 1194          * The limitation is that it is not possible to use RedisModule_StringAppendBuffer
 1195          * on the String.
 1196          */
 1197         autoMemoryAdd(ctx,REDISMODULE_AM_STRING,str);
 1198     }
 1199     return str;
 1200 }
 1201 
 1202 /* Given a string module object, this function returns the string pointer
 1203  * and length of the string. The returned pointer and length should only
 1204  * be used for read only accesses and never modified. */
 1205 const char *RM_StringPtrLen(const RedisModuleString *str, size_t *len) {
 1206     if (str == NULL) {
 1207         const char *errmsg = "(NULL string reply referenced in module)";
 1208         if (len) *len = strlen(errmsg);
 1209         return errmsg;
 1210     }
 1211     if (len) *len = sdslen(str->ptr);
 1212     return str->ptr;
 1213 }
 1214 
 1215 /* --------------------------------------------------------------------------
 1216  * Higher level string operations
 1217  * ------------------------------------------------------------------------- */
 1218 
 1219 /* Convert the string into a long long integer, storing it at `*ll`.
 1220  * Returns REDISMODULE_OK on success. If the string can't be parsed
 1221  * as a valid, strict long long (no spaces before/after), REDISMODULE_ERR
 1222  * is returned. */
 1223 int RM_StringToLongLong(const RedisModuleString *str, long long *ll) {
 1224     return string2ll(str->ptr,sdslen(str->ptr),ll) ? REDISMODULE_OK :
 1225                                                      REDISMODULE_ERR;
 1226 }
 1227 
 1228 /* Convert the string into a double, storing it at `*d`.
 1229  * Returns REDISMODULE_OK on success or REDISMODULE_ERR if the string is
 1230  * not a valid string representation of a double value. */
 1231 int RM_StringToDouble(const RedisModuleString *str, double *d) {
 1232     int retval = getDoubleFromObject(str,d);
 1233     return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
 1234 }
 1235 
 1236 /* Convert the string into a long double, storing it at `*ld`.
 1237  * Returns REDISMODULE_OK on success or REDISMODULE_ERR if the string is
 1238  * not a valid string representation of a double value. */
 1239 int RM_StringToLongDouble(const RedisModuleString *str, long double *ld) {
 1240     int retval = string2ld(str->ptr,sdslen(str->ptr),ld);
 1241     return retval ? REDISMODULE_OK : REDISMODULE_ERR;
 1242 }
 1243 
 1244 /* Compare two string objects, returning -1, 0 or 1 respectively if
 1245  * a < b, a == b, a > b. Strings are compared byte by byte as two
 1246  * binary blobs without any encoding care / collation attempt. */
 1247 int RM_StringCompare(RedisModuleString *a, RedisModuleString *b) {
 1248     return compareStringObjects(a,b);
 1249 }
 1250 
 1251 /* Return the (possibly modified in encoding) input 'str' object if
 1252  * the string is unshared, otherwise NULL is returned. */
 1253 RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) {
 1254     if (str->refcount != 1) {
 1255         serverLog(LL_WARNING,
 1256             "Module attempted to use an in-place string modify operation "
 1257             "with a string referenced multiple times. Please check the code "
 1258             "for API usage correctness.");
 1259         return NULL;
 1260     }
 1261     if (str->encoding == OBJ_ENCODING_EMBSTR) {
 1262         /* Note: here we "leak" the additional allocation that was
 1263          * used in order to store the embedded string in the object. */
 1264         str->ptr = sdsnewlen(str->ptr,sdslen(str->ptr));
 1265         str->encoding = OBJ_ENCODING_RAW;
 1266     } else if (str->encoding == OBJ_ENCODING_INT) {
 1267         /* Convert the string from integer to raw encoding. */
 1268         str->ptr = sdsfromlonglong((long)str->ptr);
 1269         str->encoding = OBJ_ENCODING_RAW;
 1270     }
 1271     return str;
 1272 }
 1273 
 1274 /* Append the specified buffer to the string 'str'. The string must be a
 1275  * string created by the user that is referenced only a single time, otherwise
 1276  * REDISMODULE_ERR is returned and the operation is not performed. */
 1277 int RM_StringAppendBuffer(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len) {
 1278     UNUSED(ctx);
 1279     str = moduleAssertUnsharedString(str);
 1280     if (str == NULL) return REDISMODULE_ERR;
 1281     str->ptr = sdscatlen(str->ptr,buf,len);
 1282     return REDISMODULE_OK;
 1283 }
 1284 
 1285 /* --------------------------------------------------------------------------
 1286  * Reply APIs
 1287  *
 1288  * Most functions always return REDISMODULE_OK so you can use it with
 1289  * 'return' in order to return from the command implementation with:
 1290  *
 1291  *     if (... some condition ...)
 1292  *         return RM_ReplyWithLongLong(ctx,mycount);
 1293  * -------------------------------------------------------------------------- */
 1294 
 1295 /* Send an error about the number of arguments given to the command,
 1296  * citing the command name in the error message.
 1297  *
 1298  * Example:
 1299  *
 1300  *     if (argc != 3) return RedisModule_WrongArity(ctx);
 1301  */
 1302 int RM_WrongArity(RedisModuleCtx *ctx) {
 1303     addReplyErrorFormat(ctx->client,
 1304         "wrong number of arguments for '%s' command",
 1305         (char*)ctx->client->argv[0]->ptr);
 1306     return REDISMODULE_OK;
 1307 }
 1308 
 1309 /* Return the client object the `RM_Reply*` functions should target.
 1310  * Normally this is just `ctx->client`, that is the client that called
 1311  * the module command, however in the case of thread safe contexts there
 1312  * is no directly associated client (since it would not be safe to access
 1313  * the client from a thread), so instead the blocked client object referenced
 1314  * in the thread safe context, has a fake client that we just use to accumulate
 1315  * the replies. Later, when the client is unblocked, the accumulated replies
 1316  * are appended to the actual client.
 1317  *
 1318  * The function returns the client pointer depending on the context, or
 1319  * NULL if there is no potential client. This happens when we are in the
 1320  * context of a thread safe context that was not initialized with a blocked
 1321  * client object. Other contexts without associated clients are the ones
 1322  * initialized to run the timers callbacks. */
 1323 client *moduleGetReplyClient(RedisModuleCtx *ctx) {
 1324     if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) {
 1325         if (ctx->blocked_client)
 1326             return ctx->blocked_client->reply_client;
 1327         else
 1328             return NULL;
 1329     } else {
 1330         /* If this is a non thread safe context, just return the client
 1331          * that is running the command if any. This may be NULL as well
 1332          * in the case of contexts that are not executed with associated
 1333          * clients, like timer contexts. */
 1334         return ctx->client;
 1335     }
 1336 }
 1337 
 1338 /* Send an integer reply to the client, with the specified long long value.
 1339  * The function always returns REDISMODULE_OK. */
 1340 int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
 1341     client *c = moduleGetReplyClient(ctx);
 1342     if (c == NULL) return REDISMODULE_OK;
 1343     addReplyLongLong(c,ll);
 1344     return REDISMODULE_OK;
 1345 }
 1346 
 1347 /* Reply with an error or simple string (status message). Used to implement
 1348  * ReplyWithSimpleString() and ReplyWithError().
 1349  * The function always returns REDISMODULE_OK. */
 1350 int replyWithStatus(RedisModuleCtx *ctx, const char *msg, char *prefix) {
 1351     client *c = moduleGetReplyClient(ctx);
 1352     if (c == NULL) return REDISMODULE_OK;
 1353     addReplyProto(c,prefix,strlen(prefix));
 1354     addReplyProto(c,msg,strlen(msg));
 1355     addReplyProto(c,"\r\n",2);
 1356     return REDISMODULE_OK;
 1357 }
 1358 
 1359 /* Reply with the error 'err'.
 1360  *
 1361  * Note that 'err' must contain all the error, including
 1362  * the initial error code. The function only provides the initial "-", so
 1363  * the usage is, for example:
 1364  *
 1365  *     RedisModule_ReplyWithError(ctx,"ERR Wrong Type");
 1366  *
 1367  * and not just:
 1368  *
 1369  *     RedisModule_ReplyWithError(ctx,"Wrong Type");
 1370  *
 1371  * The function always returns REDISMODULE_OK.
 1372  */
 1373 int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) {
 1374     return replyWithStatus(ctx,err,"-");
 1375 }
 1376 
 1377 /* Reply with a simple string (+... \r\n in RESP protocol). This replies
 1378  * are suitable only when sending a small non-binary string with small
 1379  * overhead, like "OK" or similar replies.
 1380  *
 1381  * The function always returns REDISMODULE_OK. */
 1382 int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) {
 1383     return replyWithStatus(ctx,msg,"+");
 1384 }
 1385 
 1386 /* Reply with an array type of 'len' elements. However 'len' other calls
 1387  * to `ReplyWith*` style functions must follow in order to emit the elements
 1388  * of the array.
 1389  *
 1390  * When producing arrays with a number of element that is not known beforehand
 1391  * the function can be called with the special count
 1392  * REDISMODULE_POSTPONED_ARRAY_LEN, and the actual number of elements can be
 1393  * later set with RedisModule_ReplySetArrayLength() (which will set the
 1394  * latest "open" count if there are multiple ones).
 1395  *
 1396  * The function always returns REDISMODULE_OK. */
 1397 int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
 1398     client *c = moduleGetReplyClient(ctx);
 1399     if (c == NULL) return REDISMODULE_OK;
 1400     if (len == REDISMODULE_POSTPONED_ARRAY_LEN) {
 1401         ctx->postponed_arrays = zrealloc(ctx->postponed_arrays,sizeof(void*)*
 1402                 (ctx->postponed_arrays_count+1));
 1403         ctx->postponed_arrays[ctx->postponed_arrays_count] =
 1404             addReplyDeferredLen(c);
 1405         ctx->postponed_arrays_count++;
 1406     } else {
 1407         addReplyArrayLen(c,len);
 1408     }
 1409     return REDISMODULE_OK;
 1410 }
 1411 
 1412 /* Reply to the client with a null array, simply null in RESP3 
 1413  * null array in RESP2.
 1414  *
 1415  * The function always returns REDISMODULE_OK. */
 1416 int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
 1417     client *c = moduleGetReplyClient(ctx);
 1418     if (c == NULL) return REDISMODULE_OK;
 1419     addReplyNullArray(c);
 1420     return REDISMODULE_OK;
 1421 }
 1422 
 1423 /* Reply to the client with an empty array. 
 1424  *
 1425  * The function always returns REDISMODULE_OK. */
 1426 int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
 1427     client *c = moduleGetReplyClient(ctx);
 1428     if (c == NULL) return REDISMODULE_OK;
 1429     addReply(c,shared.emptyarray);
 1430     return REDISMODULE_OK;
 1431 }
 1432 
 1433 /* When RedisModule_ReplyWithArray() is used with the argument
 1434  * REDISMODULE_POSTPONED_ARRAY_LEN, because we don't know beforehand the number
 1435  * of items we are going to output as elements of the array, this function
 1436  * will take care to set the array length.
 1437  *
 1438  * Since it is possible to have multiple array replies pending with unknown
 1439  * length, this function guarantees to always set the latest array length
 1440  * that was created in a postponed way.
 1441  *
 1442  * For example in order to output an array like [1,[10,20,30]] we
 1443  * could write:
 1444  *
 1445  *      RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
 1446  *      RedisModule_ReplyWithLongLong(ctx,1);
 1447  *      RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
 1448  *      RedisModule_ReplyWithLongLong(ctx,10);
 1449  *      RedisModule_ReplyWithLongLong(ctx,20);
 1450  *      RedisModule_ReplyWithLongLong(ctx,30);
 1451  *      RedisModule_ReplySetArrayLength(ctx,3); // Set len of 10,20,30 array.
 1452  *      RedisModule_ReplySetArrayLength(ctx,2); // Set len of top array
 1453  *
 1454  * Note that in the above example there is no reason to postpone the array
 1455  * length, since we produce a fixed number of elements, but in the practice
 1456  * the code may use an iterator or other ways of creating the output so
 1457  * that is not easy to calculate in advance the number of elements.
 1458  */
 1459 void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
 1460     client *c = moduleGetReplyClient(ctx);
 1461     if (c == NULL) return;
 1462     if (ctx->postponed_arrays_count == 0) {
 1463         serverLog(LL_WARNING,
 1464             "API misuse detected in module %s: "
 1465             "RedisModule_ReplySetArrayLength() called without previous "
 1466             "RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN) "
 1467             "call.", ctx->module->name);
 1468             return;
 1469     }
 1470     ctx->postponed_arrays_count--;
 1471     setDeferredArrayLen(c,
 1472             ctx->postponed_arrays[ctx->postponed_arrays_count],
 1473             len);
 1474     if (ctx->postponed_arrays_count == 0) {
 1475         zfree(ctx->postponed_arrays);
 1476         ctx->postponed_arrays = NULL;
 1477     }
 1478 }
 1479 
 1480 /* Reply with a bulk string, taking in input a C buffer pointer and length.
 1481  *
 1482  * The function always returns REDISMODULE_OK. */
 1483 int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
 1484     client *c = moduleGetReplyClient(ctx);
 1485     if (c == NULL) return REDISMODULE_OK;
 1486     addReplyBulkCBuffer(c,(char*)buf,len);
 1487     return REDISMODULE_OK;
 1488 }
 1489 
 1490 /* Reply with a bulk string, taking in input a C buffer pointer that is
 1491  * assumed to be null-terminated.
 1492  *
 1493  * The function always returns REDISMODULE_OK. */
 1494 int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
 1495     client *c = moduleGetReplyClient(ctx);
 1496     if (c == NULL) return REDISMODULE_OK;
 1497     addReplyBulkCString(c,(char*)buf);
 1498     return REDISMODULE_OK;
 1499 }
 1500 
 1501 /* Reply with a bulk string, taking in input a RedisModuleString object.
 1502  *
 1503  * The function always returns REDISMODULE_OK. */
 1504 int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
 1505     client *c = moduleGetReplyClient(ctx);
 1506     if (c == NULL) return REDISMODULE_OK;
 1507     addReplyBulk(c,str);
 1508     return REDISMODULE_OK;
 1509 }
 1510 
 1511 /* Reply with an empty string.
 1512  *
 1513  * The function always returns REDISMODULE_OK. */
 1514 int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
 1515     client *c = moduleGetReplyClient(ctx);
 1516     if (c == NULL) return REDISMODULE_OK;
 1517     addReply(c,shared.emptybulk);
 1518     return REDISMODULE_OK;
 1519 }
 1520 
 1521 /* Reply with a binary safe string, which should not be escaped or filtered 
 1522  * taking in input a C buffer pointer and length.
 1523  *
 1524  * The function always returns REDISMODULE_OK. */
 1525 int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) {
 1526     client *c = moduleGetReplyClient(ctx);
 1527     if (c == NULL) return REDISMODULE_OK;
 1528     addReplyVerbatim(c, buf, len, "txt");
 1529     return REDISMODULE_OK;
 1530 }
 1531 
 1532 /* Reply to the client with a NULL.
 1533  *
 1534  * The function always returns REDISMODULE_OK. */
 1535 int RM_ReplyWithNull(RedisModuleCtx *ctx) {
 1536     client *c = moduleGetReplyClient(ctx);
 1537     if (c == NULL) return REDISMODULE_OK;
 1538     addReplyNull(c);
 1539     return REDISMODULE_OK;
 1540 }
 1541 
 1542 /* Reply exactly what a Redis command returned us with RedisModule_Call().
 1543  * This function is useful when we use RedisModule_Call() in order to
 1544  * execute some command, as we want to reply to the client exactly the
 1545  * same reply we obtained by the command.
 1546  *
 1547  * The function always returns REDISMODULE_OK. */
 1548 int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
 1549     client *c = moduleGetReplyClient(ctx);
 1550     if (c == NULL) return REDISMODULE_OK;
 1551     sds proto = sdsnewlen(reply->proto, reply->protolen);
 1552     addReplySds(c,proto);
 1553     return REDISMODULE_OK;
 1554 }
 1555 
 1556 /* Send a string reply obtained converting the double 'd' into a bulk string.
 1557  * This function is basically equivalent to converting a double into
 1558  * a string into a C buffer, and then calling the function
 1559  * RedisModule_ReplyWithStringBuffer() with the buffer and length.
 1560  *
 1561  * The function always returns REDISMODULE_OK. */
 1562 int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
 1563     client *c = moduleGetReplyClient(ctx);
 1564     if (c == NULL) return REDISMODULE_OK;
 1565     addReplyDouble(c,d);
 1566     return REDISMODULE_OK;
 1567 }
 1568 
 1569 /* Send a string reply obtained converting the long double 'ld' into a bulk
 1570  * string. This function is basically equivalent to converting a long double
 1571  * into a string into a C buffer, and then calling the function
 1572  * RedisModule_ReplyWithStringBuffer() with the buffer and length.
 1573  * The double string uses human readable formatting (see
 1574  * `addReplyHumanLongDouble` in networking.c).
 1575  *
 1576  * The function always returns REDISMODULE_OK. */
 1577 int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) {
 1578     client *c = moduleGetReplyClient(ctx);
 1579     if (c == NULL) return REDISMODULE_OK;
 1580     addReplyHumanLongDouble(c, ld);
 1581     return REDISMODULE_OK;
 1582 }
 1583 
 1584 /* --------------------------------------------------------------------------
 1585  * Commands replication API
 1586  * -------------------------------------------------------------------------- */
 1587 
 1588 /* Helper function to replicate MULTI the first time we replicate something
 1589  * in the context of a command execution. EXEC will be handled by the
 1590  * RedisModuleCommandDispatcher() function. */
 1591 void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
 1592     /* Skip this if client explicitly wrap the command with MULTI, or if
 1593      * the module command was called by a script. */
 1594     if (ctx->client->flags & (CLIENT_MULTI|CLIENT_LUA)) return;
 1595     /* If we already emitted MULTI return ASAP. */
 1596     if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) return;
 1597     /* If this is a thread safe context, we do not want to wrap commands
 1598      * executed into MULTI/EXEC, they are executed as single commands
 1599      * from an external client in essence. */
 1600     if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) return;
 1601     /* If this is a callback context, and not a module command execution
 1602      * context, we have to setup the op array for the "also propagate" API
 1603      * so that RM_Replicate() will work. */
 1604     if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) {
 1605         ctx->saved_oparray = server.also_propagate;
 1606         redisOpArrayInit(&server.also_propagate);
 1607     }
 1608     execCommandPropagateMulti(ctx->client);
 1609     ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
 1610 }
 1611 
 1612 /* Replicate the specified command and arguments to slaves and AOF, as effect
 1613  * of execution of the calling command implementation.
 1614  *
 1615  * The replicated commands are always wrapped into the MULTI/EXEC that
 1616  * contains all the commands replicated in a given module command
 1617  * execution. However the commands replicated with RedisModule_Call()
 1618  * are the first items, the ones replicated with RedisModule_Replicate()
 1619  * will all follow before the EXEC.
 1620  *
 1621  * Modules should try to use one interface or the other.
 1622  *
 1623  * This command follows exactly the same interface of RedisModule_Call(),
 1624  * so a set of format specifiers must be passed, followed by arguments
 1625  * matching the provided format specifiers.
 1626  *
 1627  * Please refer to RedisModule_Call() for more information.
 1628  *
 1629  * Using the special "A" and "R" modifiers, the caller can exclude either
 1630  * the AOF or the replicas from the propagation of the specified command.
 1631  * Otherwise, by default, the command will be propagated in both channels.
 1632  *
 1633  * ## Note about calling this function from a thread safe context:
 1634  *
 1635  * Normally when you call this function from the callback implementing a
 1636  * module command, or any other callback provided by the Redis Module API,
 1637  * Redis will accumulate all the calls to this function in the context of
 1638  * the callback, and will propagate all the commands wrapped in a MULTI/EXEC
 1639  * transaction. However when calling this function from a threaded safe context
 1640  * that can live an undefined amount of time, and can be locked/unlocked in
 1641  * at will, the behavior is different: MULTI/EXEC wrapper is not emitted
 1642  * and the command specified is inserted in the AOF and replication stream
 1643  * immediately.
 1644  *
 1645  * ## Return value
 1646  *
 1647  * The command returns REDISMODULE_ERR if the format specifiers are invalid
 1648  * or the command name does not belong to a known command. */
 1649 int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
 1650     struct redisCommand *cmd;
 1651     robj **argv = NULL;
 1652     int argc = 0, flags = 0, j;
 1653     va_list ap;
 1654 
 1655     cmd = lookupCommandByCString((char*)cmdname);
 1656     if (!cmd) return REDISMODULE_ERR;
 1657 
 1658     /* Create the client and dispatch the command. */
 1659     va_start(ap, fmt);
 1660     argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
 1661     va_end(ap);
 1662     if (argv == NULL) return REDISMODULE_ERR;
 1663 
 1664     /* Select the propagation target. Usually is AOF + replicas, however
 1665      * the caller can exclude one or the other using the "A" or "R"
 1666      * modifiers. */
 1667     int target = 0;
 1668     if (!(flags & REDISMODULE_ARGV_NO_AOF)) target |= PROPAGATE_AOF;
 1669     if (!(flags & REDISMODULE_ARGV_NO_REPLICAS)) target |= PROPAGATE_REPL;
 1670 
 1671     /* Replicate! When we are in a threaded context, we want to just insert
 1672      * the replicated command ASAP, since it is not clear when the context
 1673      * will stop being used, so accumulating stuff does not make much sense,
 1674      * nor we could easily use the alsoPropagate() API from threads. */
 1675     if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) {
 1676         propagate(cmd,ctx->client->db->id,argv,argc,target);
 1677     } else {
 1678         moduleReplicateMultiIfNeeded(ctx);
 1679         alsoPropagate(cmd,ctx->client->db->id,argv,argc,target);
 1680     }
 1681 
 1682     /* Release the argv. */
 1683     for (j = 0; j < argc; j++) decrRefCount(argv[j]);
 1684     zfree(argv);
 1685     server.dirty++;
 1686     return REDISMODULE_OK;
 1687 }
 1688 
 1689 /* This function will replicate the command exactly as it was invoked
 1690  * by the client. Note that this function will not wrap the command into
 1691  * a MULTI/EXEC stanza, so it should not be mixed with other replication
 1692  * commands.
 1693  *
 1694  * Basically this form of replication is useful when you want to propagate
 1695  * the command to the slaves and AOF file exactly as it was called, since
 1696  * the command can just be re-executed to deterministically re-create the
 1697  * new state starting from the old one.
 1698  *
 1699  * The function always returns REDISMODULE_OK. */
 1700 int RM_ReplicateVerbatim(RedisModuleCtx *ctx) {
 1701     alsoPropagate(ctx->client->cmd,ctx->client->db->id,
 1702         ctx->client->argv,ctx->client->argc,
 1703         PROPAGATE_AOF|PROPAGATE_REPL);
 1704     server.dirty++;
 1705     return REDISMODULE_OK;
 1706 }
 1707 
 1708 /* --------------------------------------------------------------------------
 1709  * DB and Key APIs -- Generic API
 1710  * -------------------------------------------------------------------------- */
 1711 
 1712 /* Return the ID of the current client calling the currently active module
 1713  * command. The returned ID has a few guarantees:
 1714  *
 1715  * 1. The ID is different for each different client, so if the same client
 1716  *    executes a module command multiple times, it can be recognized as
 1717  *    having the same ID, otherwise the ID will be different.
 1718  * 2. The ID increases monotonically. Clients connecting to the server later
 1719  *    are guaranteed to get IDs greater than any past ID previously seen.
 1720  *
 1721  * Valid IDs are from 1 to 2^64-1. If 0 is returned it means there is no way
 1722  * to fetch the ID in the context the function was currently called.
 1723  *
 1724  * After obtaining the ID, it is possible to check if the command execution
 1725  * is actually happening in the context of AOF loading, using this macro:
 1726  *
 1727  *      if (RedisModule_IsAOFClient(RedisModule_GetClientId(ctx)) {
 1728  *          // Handle it differently.
 1729  *      }
 1730  */
 1731 unsigned long long RM_GetClientId(RedisModuleCtx *ctx) {
 1732     if (ctx->client == NULL) return 0;
 1733     return ctx->client->id;
 1734 }
 1735 
 1736 /* This is an helper for RM_GetClientInfoById() and other functions: given
 1737  * a client, it populates the client info structure with the appropriate
 1738  * fields depending on the version provided. If the version is not valid
 1739  * then REDISMODULE_ERR is returned. Otherwise the function returns
 1740  * REDISMODULE_OK and the structure pointed by 'ci' gets populated. */
 1741 
 1742 int modulePopulateClientInfoStructure(void *ci, client *client, int structver) {
 1743     if (structver != 1) return REDISMODULE_ERR;
 1744 
 1745     RedisModuleClientInfoV1 *ci1 = ci;
 1746     memset(ci1,0,sizeof(*ci1));
 1747     ci1->version = structver;
 1748     if (client->flags & CLIENT_MULTI)
 1749         ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_MULTI;
 1750     if (client->flags & CLIENT_PUBSUB)
 1751         ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_PUBSUB;
 1752     if (client->flags & CLIENT_UNIX_SOCKET)
 1753         ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_UNIXSOCKET;
 1754     if (client->flags & CLIENT_TRACKING)
 1755         ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_TRACKING;
 1756     if (client->flags & CLIENT_BLOCKED)
 1757         ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_BLOCKED;
 1758     if (connGetType(client->conn) == CONN_TYPE_TLS)
 1759         ci1->flags |= REDISMODULE_CLIENTINFO_FLAG_SSL;
 1760 
 1761     int port;
 1762     connPeerToString(client->conn,ci1->addr,sizeof(ci1->addr),&port);
 1763     ci1->port = port;
 1764     ci1->db = client->db->id;
 1765     ci1->id = client->id;
 1766     return REDISMODULE_OK;
 1767 }
 1768 
 1769 /* This is an helper for moduleFireServerEvent() and other functions:
 1770  * It populates the replication info structure with the appropriate
 1771  * fields depending on the version provided. If the version is not valid
 1772  * then REDISMODULE_ERR is returned. Otherwise the function returns
 1773  * REDISMODULE_OK and the structure pointed by 'ri' gets populated. */
 1774 int modulePopulateReplicationInfoStructure(void *ri, int structver) {
 1775     if (structver != 1) return REDISMODULE_ERR;
 1776 
 1777     RedisModuleReplicationInfoV1 *ri1 = ri;
 1778     memset(ri1,0,sizeof(*ri1));
 1779     ri1->version = structver;
 1780     ri1->master = server.masterhost==NULL;
 1781     ri1->masterhost = server.masterhost? server.masterhost: "";
 1782     ri1->masterport = server.masterport;
 1783     ri1->replid1 = server.replid;
 1784     ri1->replid2 = server.replid2;
 1785     ri1->repl1_offset = server.master_repl_offset;
 1786     ri1->repl2_offset = server.second_replid_offset;
 1787     return REDISMODULE_OK;
 1788 }
 1789 
 1790 /* Return information about the client with the specified ID (that was
 1791  * previously obtained via the RedisModule_GetClientId() API). If the
 1792  * client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR
 1793  * is returned.
 1794  *
 1795  * When the client exist and the `ci` pointer is not NULL, but points to
 1796  * a structure of type RedisModuleClientInfo, previously initialized with
 1797  * the correct REDISMODULE_CLIENTINFO_INITIALIZER, the structure is populated
 1798  * with the following fields:
 1799  *
 1800  *      uint64_t flags;         // REDISMODULE_CLIENTINFO_FLAG_*
 1801  *      uint64_t id;            // Client ID
 1802  *      char addr[46];          // IPv4 or IPv6 address.
 1803  *      uint16_t port;          // TCP port.
 1804  *      uint16_t db;            // Selected DB.
 1805  *
 1806  * Note: the client ID is useless in the context of this call, since we
 1807  *       already know, however the same structure could be used in other
 1808  *       contexts where we don't know the client ID, yet the same structure
 1809  *       is returned.
 1810  *
 1811  * With flags having the following meaning:
 1812  *
 1813  *     REDISMODULE_CLIENTINFO_FLAG_SSL          Client using SSL connection.
 1814  *     REDISMODULE_CLIENTINFO_FLAG_PUBSUB       Client in Pub/Sub mode.
 1815  *     REDISMODULE_CLIENTINFO_FLAG_BLOCKED      Client blocked in command.
 1816  *     REDISMODULE_CLIENTINFO_FLAG_TRACKING     Client with keys tracking on.
 1817  *     REDISMODULE_CLIENTINFO_FLAG_UNIXSOCKET   Client using unix domain socket.
 1818  *     REDISMODULE_CLIENTINFO_FLAG_MULTI        Client in MULTI state.
 1819  *
 1820  * However passing NULL is a way to just check if the client exists in case
 1821  * we are not interested in any additional information.
 1822  *
 1823  * This is the correct usage when we want the client info structure
 1824  * returned:
 1825  *
 1826  *      RedisModuleClientInfo ci = REDISMODULE_CLIENTINFO_INITIALIZER;
 1827  *      int retval = RedisModule_GetClientInfoById(&ci,client_id);
 1828  *      if (retval == REDISMODULE_OK) {
 1829  *          printf("Address: %s\n", ci.addr);
 1830  *      }
 1831  */
 1832 int RM_GetClientInfoById(void *ci, uint64_t id) {
 1833     client *client = lookupClientByID(id);
 1834     if (client == NULL) return REDISMODULE_ERR;
 1835     if (ci == NULL) return REDISMODULE_OK;
 1836 
 1837     /* Fill the info structure if passed. */
 1838     uint64_t structver = ((uint64_t*)ci)[0];
 1839     return modulePopulateClientInfoStructure(ci,client,structver);
 1840 }
 1841 
 1842 /* Publish a message to subscribers (see PUBLISH command). */
 1843 int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) {
 1844     UNUSED(ctx);
 1845     int receivers = pubsubPublishMessage(channel, message);
 1846     if (server.cluster_enabled)
 1847         clusterPropagatePublish(channel, message);
 1848     return receivers;
 1849 }
 1850 
 1851 /* Return the currently selected DB. */
 1852 int RM_GetSelectedDb(RedisModuleCtx *ctx) {
 1853     return ctx->client->db->id;
 1854 }
 1855 
 1856 
 1857 /* Return the current context's flags. The flags provide information on the
 1858  * current request context (whether the client is a Lua script or in a MULTI),
 1859  * and about the Redis instance in general, i.e replication and persistence.
 1860  *
 1861  * It is possible to call this function even with a NULL context, however
 1862  * in this case the following flags will not be reported:
 1863  *
 1864  *  * LUA, MULTI, REPLICATED, DIRTY (see below for more info).
 1865  *
 1866  * Available flags and their meaning:
 1867  *
 1868  *  * REDISMODULE_CTX_FLAGS_LUA: The command is running in a Lua script
 1869  *
 1870  *  * REDISMODULE_CTX_FLAGS_MULTI: The command is running inside a transaction
 1871  *
 1872  *  * REDISMODULE_CTX_FLAGS_REPLICATED: The command was sent over the replication
 1873  *    link by the MASTER
 1874  *
 1875  *  * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master
 1876  *
 1877  *  * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a slave
 1878  *
 1879  *  * REDISMODULE_CTX_FLAGS_READONLY: The Redis instance is read-only
 1880  *
 1881  *  * REDISMODULE_CTX_FLAGS_CLUSTER: The Redis instance is in cluster mode
 1882  *
 1883  *  * REDISMODULE_CTX_FLAGS_AOF: The Redis instance has AOF enabled
 1884  *
 1885  *  * REDISMODULE_CTX_FLAGS_RDB: The instance has RDB enabled
 1886  *
 1887  *  * REDISMODULE_CTX_FLAGS_MAXMEMORY:  The instance has Maxmemory set
 1888  *
 1889  *  * REDISMODULE_CTX_FLAGS_EVICT:  Maxmemory is set and has an eviction
 1890  *    policy that may delete keys
 1891  *
 1892  *  * REDISMODULE_CTX_FLAGS_OOM: Redis is out of memory according to the
 1893  *    maxmemory setting.
 1894  *
 1895  *  * REDISMODULE_CTX_FLAGS_OOM_WARNING: Less than 25% of memory remains before
 1896  *                                       reaching the maxmemory level.
 1897  *
 1898  *  * REDISMODULE_CTX_FLAGS_LOADING: Server is loading RDB/AOF
 1899  *
 1900  *  * REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE: No active link with the master.
 1901  *
 1902  *  * REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING: The replica is trying to
 1903  *                                                 connect with the master.
 1904  *
 1905  *  * REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING: Master -> Replica RDB
 1906  *                                                   transfer is in progress.
 1907  *
 1908  *  * REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE: The replica has an active link
 1909  *                                             with its master. This is the
 1910  *                                             contrary of STALE state.
 1911  *
 1912  *  * REDISMODULE_CTX_FLAGS_ACTIVE_CHILD: There is currently some background
 1913  *                                        process active (RDB, AUX or module).
 1914  */
 1915 int RM_GetContextFlags(RedisModuleCtx *ctx) {
 1916 
 1917     int flags = 0;
 1918     /* Client specific flags */
 1919     if (ctx) {
 1920         if (ctx->client) {
 1921             if (ctx->client->flags & CLIENT_LUA)
 1922              flags |= REDISMODULE_CTX_FLAGS_LUA;
 1923             if (ctx->client->flags & CLIENT_MULTI)
 1924              flags |= REDISMODULE_CTX_FLAGS_MULTI;
 1925             /* Module command recieved from MASTER, is replicated. */
 1926             if (ctx->client->flags & CLIENT_MASTER)
 1927              flags |= REDISMODULE_CTX_FLAGS_REPLICATED;
 1928         }
 1929 
 1930         /* For DIRTY flags, we need the blocked client if used */
 1931         client *c = ctx->blocked_client ? ctx->blocked_client->client : ctx->client;
 1932         if (c && (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))) {
 1933             flags |= REDISMODULE_CTX_FLAGS_MULTI_DIRTY;
 1934         }
 1935     }
 1936 
 1937     if (server.cluster_enabled)
 1938         flags |= REDISMODULE_CTX_FLAGS_CLUSTER;
 1939 
 1940     if (server.loading)
 1941         flags |= REDISMODULE_CTX_FLAGS_LOADING;
 1942 
 1943     /* Maxmemory and eviction policy */
 1944     if (server.maxmemory > 0) {
 1945         flags |= REDISMODULE_CTX_FLAGS_MAXMEMORY;
 1946 
 1947         if (server.maxmemory_policy != MAXMEMORY_NO_EVICTION)
 1948             flags |= REDISMODULE_CTX_FLAGS_EVICT;
 1949     }
 1950 
 1951     /* Persistence flags */
 1952     if (server.aof_state != AOF_OFF)
 1953         flags |= REDISMODULE_CTX_FLAGS_AOF;
 1954     if (server.saveparamslen > 0)
 1955         flags |= REDISMODULE_CTX_FLAGS_RDB;
 1956 
 1957     /* Replication flags */
 1958     if (server.masterhost == NULL) {
 1959         flags |= REDISMODULE_CTX_FLAGS_MASTER;
 1960     } else {
 1961         flags |= REDISMODULE_CTX_FLAGS_SLAVE;
 1962         if (server.repl_slave_ro)
 1963             flags |= REDISMODULE_CTX_FLAGS_READONLY;
 1964 
 1965         /* Replica state flags. */
 1966         if (server.repl_state == REPL_STATE_CONNECT ||
 1967             server.repl_state == REPL_STATE_CONNECTING)
 1968         {
 1969             flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING;
 1970         } else if (server.repl_state == REPL_STATE_TRANSFER) {
 1971             flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING;
 1972         } else if (server.repl_state == REPL_STATE_CONNECTED) {
 1973             flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE;
 1974         }
 1975 
 1976         if (server.repl_state != REPL_STATE_CONNECTED)
 1977             flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE;
 1978     }
 1979 
 1980     /* OOM flag. */
 1981     float level;
 1982     int retval = getMaxmemoryState(NULL,NULL,NULL,&level);
 1983     if (retval == C_ERR) flags |= REDISMODULE_CTX_FLAGS_OOM;
 1984     if (level > 0.75) flags |= REDISMODULE_CTX_FLAGS_OOM_WARNING;
 1985 
 1986     /* Presence of children processes. */
 1987     if (hasActiveChildProcess()) flags |= REDISMODULE_CTX_FLAGS_ACTIVE_CHILD;
 1988 
 1989     return flags;
 1990 }
 1991 
 1992 /* Returns true if some client sent the CLIENT PAUSE command to the server or
 1993  * if Redis Cluster is doing a manual failover, and paused tue clients.
 1994  * This is needed when we have a master with replicas, and want to write,
 1995  * without adding further data to the replication channel, that the replicas
 1996  * replication offset, match the one of the master. When this happens, it is
 1997  * safe to failover the master without data loss.
 1998  *
 1999  * However modules may generate traffic by calling RedisModule_Call() with
 2000  * the "!" flag, or by calling RedisModule_Replicate(), in a context outside
 2001  * commands execution, for instance in timeout callbacks, threads safe
 2002  * contexts, and so forth. When modules will generate too much traffic, it
 2003  * will be hard for the master and replicas offset to match, because there
 2004  * is more data to send in the replication channel.
 2005  *
 2006  * So modules may want to try to avoid very heavy background work that has
 2007  * the effect of creating data to the replication channel, when this function
 2008  * returns true. This is mostly useful for modules that have background
 2009  * garbage collection tasks, or that do writes and replicate such writes
 2010  * periodically in timer callbacks or other periodic callbacks.
 2011  */
 2012 int RM_AvoidReplicaTraffic() {
 2013     return clientsArePaused();
 2014 }
 2015 
 2016 /* Change the currently selected DB. Returns an error if the id
 2017  * is out of range.
 2018  *
 2019  * Note that the client will retain the currently selected DB even after
 2020  * the Redis command implemented by the module calling this function
 2021  * returns.
 2022  *
 2023  * If the module command wishes to change something in a different DB and
 2024  * returns back to the original one, it should call RedisModule_GetSelectedDb()
 2025  * before in order to restore the old DB number before returning. */
 2026 int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
 2027     int retval = selectDb(ctx->client,newid);
 2028     return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
 2029 }
 2030 
 2031 /* Initialize a RedisModuleKey struct */
 2032 static void moduleInitKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){
 2033     kp->ctx = ctx;
 2034     kp->db = ctx->client->db;
 2035     kp->key = keyname;
 2036     incrRefCount(keyname);
 2037     kp->value = value;
 2038     kp->iter = NULL;
 2039     kp->mode = mode;
 2040     zsetKeyReset(kp);
 2041 }
 2042 
 2043 /* Return an handle representing a Redis key, so that it is possible
 2044  * to call other APIs with the key handle as argument to perform
 2045  * operations on the key.
 2046  *
 2047  * The return value is the handle representing the key, that must be
 2048  * closed with RM_CloseKey().
 2049  *
 2050  * If the key does not exist and WRITE mode is requested, the handle
 2051  * is still returned, since it is possible to perform operations on
 2052  * a yet not existing key (that will be created, for example, after
 2053  * a list push operation). If the mode is just READ instead, and the
 2054  * key does not exist, NULL is returned. However it is still safe to
 2055  * call RedisModule_CloseKey() and RedisModule_KeyType() on a NULL
 2056  * value. */
 2057 void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
 2058     RedisModuleKey *kp;
 2059     robj *value;
 2060     int flags = mode & REDISMODULE_OPEN_KEY_NOTOUCH? LOOKUP_NOTOUCH: 0;
 2061 
 2062     if (mode & REDISMODULE_WRITE) {
 2063         value = lookupKeyWriteWithFlags(ctx->client->db,keyname, flags);
 2064     } else {
 2065         value = lookupKeyReadWithFlags(ctx->client->db,keyname, flags);
 2066         if (value == NULL) {
 2067             return NULL;
 2068         }
 2069     }
 2070 
 2071     /* Setup the key handle. */
 2072     kp = zmalloc(sizeof(*kp));
 2073     moduleInitKey(kp, ctx, keyname, value, mode);
 2074     autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
 2075     return (void*)kp;
 2076 }
 2077 
 2078 /* Destroy a RedisModuleKey struct (freeing is the responsibility of the caller). */
 2079 static void moduleCloseKey(RedisModuleKey *key) {
 2080     int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
 2081     if ((key->mode & REDISMODULE_WRITE) && signal)
 2082         signalModifiedKey(key->ctx->client,key->db,key->key);
 2083     /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
 2084     RM_ZsetRangeStop(key);
 2085     decrRefCount(key->key);
 2086 }
 2087 
 2088 /* Close a key handle. */
 2089 void RM_CloseKey(RedisModuleKey *key) {
 2090     if (key == NULL) return;
 2091     moduleCloseKey(key);
 2092     autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
 2093     zfree(key);
 2094 }
 2095 
 2096 /* Return the type of the key. If the key pointer is NULL then
 2097  * REDISMODULE_KEYTYPE_EMPTY is returned. */
 2098 int RM_KeyType(RedisModuleKey *key) {
 2099     if (key == NULL || key->value ==  NULL) return REDISMODULE_KEYTYPE_EMPTY;
 2100     /* We map between defines so that we are free to change the internal
 2101      * defines as desired. */
 2102     switch(key->value->type) {
 2103     case OBJ_STRING: return REDISMODULE_KEYTYPE_STRING;
 2104     case OBJ_LIST: return REDISMODULE_KEYTYPE_LIST;
 2105     case OBJ_SET: return REDISMODULE_KEYTYPE_SET;
 2106     case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET;
 2107     case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH;
 2108     case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE;
 2109     case OBJ_STREAM: return REDISMODULE_KEYTYPE_STREAM;
 2110     default: return REDISMODULE_KEYTYPE_EMPTY;
 2111     }
 2112 }
 2113 
 2114 /* Return the length of the value associated with the key.
 2115  * For strings this is the length of the string. For all the other types
 2116  * is the number of elements (just counting keys for hashes).
 2117  *
 2118  * If the key pointer is NULL or the key is empty, zero is returned. */
 2119 size_t RM_ValueLength(RedisModuleKey *key) {
 2120     if (key == NULL || key->value == NULL) return 0;
 2121     switch(key->value->type) {
 2122     case OBJ_STRING: return stringObjectLen(key->value);
 2123     case OBJ_LIST: return listTypeLength(key->value);
 2124     case OBJ_SET: return setTypeSize(key->value);
 2125     case OBJ_ZSET: return zsetLength(key->value);
 2126     case OBJ_HASH: return hashTypeLength(key->value);
 2127     case OBJ_STREAM: return streamLength(key->value);
 2128     default: return 0;
 2129     }
 2130 }
 2131 
 2132 /* If the key is open for writing, remove it, and setup the key to
 2133  * accept new writes as an empty key (that will be created on demand).
 2134  * On success REDISMODULE_OK is returned. If the key is not open for
 2135  * writing REDISMODULE_ERR is returned. */
 2136 int RM_DeleteKey(RedisModuleKey *key) {
 2137     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
 2138     if (key->value) {
 2139         dbDelete(key->db,key->key);
 2140         key->value = NULL;
 2141     }
 2142     return REDISMODULE_OK;
 2143 }
 2144 
 2145 /* If the key is open for writing, unlink it (that is delete it in a
 2146  * non-blocking way, not reclaiming memory immediately) and setup the key to
 2147  * accept new writes as an empty key (that will be created on demand).
 2148  * On success REDISMODULE_OK is returned. If the key is not open for
 2149  * writing REDISMODULE_ERR is returned. */
 2150 int RM_UnlinkKey(RedisModuleKey *key) {
 2151     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
 2152     if (key->value) {
 2153         dbAsyncDelete(key->db,key->key);
 2154         key->value = NULL;
 2155     }
 2156     return REDISMODULE_OK;
 2157 }
 2158 
 2159 /* Return the key expire value, as milliseconds of remaining TTL.
 2160  * If no TTL is associated with the key or if the key is empty,
 2161  * REDISMODULE_NO_EXPIRE is returned. */
 2162 mstime_t RM_GetExpire(RedisModuleKey *key) {
 2163     mstime_t expire = getExpire(key->db,key->key);
 2164     if (expire == -1 || key->value == NULL) 
 2165         return REDISMODULE_NO_EXPIRE;
 2166     expire -= mstime();
 2167     return expire >= 0 ? expire : 0;
 2168 }
 2169 
 2170 /* Set a new expire for the key. If the special expire
 2171  * REDISMODULE_NO_EXPIRE is set, the expire is cancelled if there was
 2172  * one (the same as the PERSIST command).
 2173  *
 2174  * Note that the expire must be provided as a positive integer representing
 2175  * the number of milliseconds of TTL the key should have.
 2176  *
 2177  * The function returns REDISMODULE_OK on success or REDISMODULE_ERR if
 2178  * the key was not open for writing or is an empty key. */
 2179 int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
 2180     if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL)
 2181         return REDISMODULE_ERR;
 2182     if (expire != REDISMODULE_NO_EXPIRE) {
 2183         expire += mstime();
 2184         setExpire(key->ctx->client,key->db,key->key,expire);
 2185     } else {
 2186         removeExpire(key->db,key->key);
 2187     }
 2188     return REDISMODULE_OK;
 2189 }
 2190 
 2191 /* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled)
 2192  * If restart_aof is true, you must make sure the command that triggered this call is not
 2193  * propagated to the AOF file.
 2194  * When async is set to true, db contents will be freed by a background thread. */
 2195 void RM_ResetDataset(int restart_aof, int async) {
 2196     if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly();
 2197     flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS);
 2198     if (server.aof_enabled && restart_aof) restartAOFAfterSYNC();
 2199 }
 2200 
 2201 /* Returns the number of keys in the current db. */
 2202 unsigned long long RM_DbSize(RedisModuleCtx *ctx) {
 2203     return dictSize(ctx->client->db->dict);
 2204 }
 2205 
 2206 /* Returns a name of a random key, or NULL if current db is empty. */
 2207 RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
 2208     robj *key = dbRandomKey(ctx->client->db);
 2209     autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key);
 2210     return key;
 2211 }
 2212 
 2213 /* --------------------------------------------------------------------------
 2214  * Key API for String type
 2215  * -------------------------------------------------------------------------- */
 2216 
 2217 /* If the key is open for writing, set the specified string 'str' as the
 2218  * value of the key, deleting the old value if any.
 2219  * On success REDISMODULE_OK is returned. If the key is not open for
 2220  * writing or there is an active iterator, REDISMODULE_ERR is returned. */
 2221 int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) {
 2222     if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
 2223     RM_DeleteKey(key);
 2224     genericSetKey(key->ctx->client,key->db,key->key,str,0,0);
 2225     key->value = str;
 2226     return REDISMODULE_OK;
 2227 }
 2228 
 2229 /* Prepare the key associated string value for DMA access, and returns
 2230  * a pointer and size (by reference), that the user can use to read or
 2231  * modify the string in-place accessing it directly via pointer.
 2232  *
 2233  * The 'mode' is composed by bitwise OR-ing the following flags:
 2234  *
 2235  *     REDISMODULE_READ -- Read access
 2236  *     REDISMODULE_WRITE -- Write access
 2237  *
 2238  * If the DMA is not requested for writing, the pointer returned should
 2239  * only be accessed in a read-only fashion.
 2240  *
 2241  * On error (wrong type) NULL is returned.
 2242  *
 2243  * DMA access rules:
 2244  *
 2245  * 1. No other key writing function should be called since the moment
 2246  * the pointer is obtained, for all the time we want to use DMA access
 2247  * to read or modify the string.
 2248  *
 2249  * 2. Each time RM_StringTruncate() is called, to continue with the DMA
 2250  * access, RM_StringDMA() should be called again to re-obtain
 2251  * a new pointer and length.
 2252  *
 2253  * 3. If the returned pointer is not NULL, but the length is zero, no
 2254  * byte can be touched (the string is empty, or the key itself is empty)
 2255  * so a RM_StringTruncate() call should be used if there is to enlarge
 2256  * the string, and later call StringDMA() again to get the pointer.
 2257  */
 2258 char *RM_StringDMA(RedisModuleKey *key, size_t *len, int mode) {
 2259     /* We need to return *some* pointer for empty keys, we just return
 2260      * a string literal pointer, that is the advantage to be mapped into
 2261      * a read only memory page, so the module will segfault if a write
 2262      * attempt is performed. */
 2263     char *emptystring = "<dma-empty-string>";
 2264     if (key->value == NULL) {
 2265         *len = 0;
 2266         return emptystring;
 2267     }
 2268 
 2269     if (key->value->type != OBJ_STRING) return NULL;
 2270 
 2271     /* For write access, and even for read access if the object is encoded,
 2272      * we unshare the string (that has the side effect of decoding it). */
 2273     if ((mode & REDISMODULE_WRITE) || key->value->encoding != OBJ_ENCODING_RAW)
 2274         key->value = dbUnshareStringValue(key->db, key->key, key->value);
 2275 
 2276     *len = sdslen(key->value->ptr);
 2277     return key->value->ptr;
 2278 }
 2279 
 2280 /* If the string is open for writing and is of string type, resize it, padding
 2281  * with zero bytes if the new length is greater than the old one.
 2282  *
 2283  * After this call, RM_StringDMA() must be called again to continue
 2284  * DMA access with the new pointer.
 2285  *
 2286  * The function returns REDISMODULE_OK on success, and REDISMODULE_ERR on
 2287  * error, that is, the key is not open for writing, is not a string
 2288  * or resizing for more than 512 MB is requested.
 2289  *
 2290  * If the key is empty, a string key is created with the new string value
 2291  * unless the new length value requested is zero. */
 2292 int RM_StringTruncate(RedisModuleKey *key, size_t newlen) {
 2293     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
 2294     if (key->value && key->value->type != OBJ_STRING) return REDISMODULE_ERR;
 2295     if (newlen > 512*1024*1024) return REDISMODULE_ERR;
 2296 
 2297     /* Empty key and new len set to 0. Just return REDISMODULE_OK without
 2298      * doing anything. */
 2299     if (key->value == NULL && newlen == 0) return REDISMODULE_OK;
 2300 
 2301     if (key->value == NULL) {
 2302         /* Empty key: create it with the new size. */
 2303         robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen));
 2304         genericSetKey(key->ctx->client,key->db,key->key,o,0,0);
 2305         key->value = o;
 2306         decrRefCount(o);
 2307     } else {
 2308         /* Unshare and resize. */
 2309         key->value = dbUnshareStringValue(key->db, key->key, key->value);
 2310         size_t curlen = sdslen(key->value->ptr);
 2311         if (newlen > curlen) {
 2312             key->value->ptr = sdsgrowzero(key->value->ptr,newlen);
 2313         } else if (newlen < curlen) {
 2314             sdsrange(key->value->ptr,0,newlen-1);
 2315             /* If the string is too wasteful, reallocate it. */
 2316             if (sdslen(key->value->ptr) < sdsavail(key->value->ptr))
 2317                 key->value->ptr = sdsRemoveFreeSpace(key->value->ptr);
 2318         }
 2319     }
 2320     return REDISMODULE_OK;
 2321 }
 2322 
 2323 /* --------------------------------------------------------------------------
 2324  * Key API for List type
 2325  * -------------------------------------------------------------------------- */
 2326 
 2327 /* Push an element into a list, on head or tail depending on 'where' argument.
 2328  * If the key pointer is about an empty key opened for writing, the key
 2329  * is created. On error (key opened for read-only operations or of the wrong
 2330  * type) REDISMODULE_ERR is returned, otherwise REDISMODULE_OK is returned. */
 2331 int RM_ListPush(RedisModuleKey *key, int where, RedisModuleString *ele) {
 2332     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
 2333     if (key->value && key->value->type != OBJ_LIST) return REDISMODULE_ERR;
 2334     if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_LIST);
 2335     listTypePush(key->value, ele,
 2336         (where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL);
 2337     return REDISMODULE_OK;
 2338 }
 2339 
 2340 /* Pop an element from the list, and returns it as a module string object
 2341  * that the user should be free with RM_FreeString() or by enabling
 2342  * automatic memory. 'where' specifies if the element should be popped from
 2343  * head or tail. The command returns NULL if:
 2344  * 1) The list is empty.
 2345  * 2) The key was not open for writing.
 2346  * 3) The key is not a list. */
 2347 RedisModuleString *RM_ListPop(RedisModuleKey *key, int where) {
 2348     if (!(key->mode & REDISMODULE_WRITE) ||
 2349         key->value == NULL ||
 2350         key->value->type != OBJ_LIST) return NULL;
 2351     robj *ele = listTypePop(key->value,
 2352         (where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL);
 2353     robj *decoded = getDecodedObject(ele);
 2354     decrRefCount(ele);
 2355     moduleDelKeyIfEmpty(key);
 2356     autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,decoded);
 2357     return decoded;
 2358 }
 2359 
 2360 /* --------------------------------------------------------------------------
 2361  * Key API for Sorted Set type
 2362  * -------------------------------------------------------------------------- */
 2363 
 2364 /* Conversion from/to public flags of the Modules API and our private flags,
 2365  * so that we have everything decoupled. */
 2366 int RM_ZsetAddFlagsToCoreFlags(int flags) {
 2367     int retflags = 0;
 2368     if (flags & REDISMODULE_ZADD_XX) retflags |= ZADD_XX;
 2369     if (flags & REDISMODULE_ZADD_NX) retflags |= ZADD_NX;
 2370     return retflags;
 2371 }
 2372 
 2373 /* See previous function comment. */
 2374 int RM_ZsetAddFlagsFromCoreFlags(int flags) {
 2375     int retflags = 0;
 2376     if (flags & ZADD_ADDED) retflags |= REDISMODULE_ZADD_ADDED;
 2377     if (flags & ZADD_UPDATED) retflags |= REDISMODULE_ZADD_UPDATED;
 2378     if (flags & ZADD_NOP) retflags |= REDISMODULE_ZADD_NOP;
 2379     return retflags;
 2380 }
 2381 
 2382 /* Add a new element into a sorted set, with the specified 'score'.
 2383  * If the element already exists, the score is updated.
 2384  *
 2385  * A new sorted set is created at value if the key is an empty open key
 2386  * setup for writing.
 2387  *
 2388  * Additional flags can be passed to the function via a pointer, the flags
 2389  * are both used to receive input and to communicate state when the function
 2390  * returns. 'flagsptr' can be NULL if no special flags are used.
 2391  *
 2392  * The input flags are:
 2393  *
 2394  *     REDISMODULE_ZADD_XX: Element must already exist. Do nothing otherwise.
 2395  *     REDISMODULE_ZADD_NX: Element must not exist. Do nothing otherwise.
 2396  *
 2397  * The output flags are:
 2398  *
 2399  *     REDISMODULE_ZADD_ADDED: The new element was added to the sorted set.
 2400  *     REDISMODULE_ZADD_UPDATED: The score of the element was updated.
 2401  *     REDISMODULE_ZADD_NOP: No operation was performed because XX or NX flags.
 2402  *
 2403  * On success the function returns REDISMODULE_OK. On the following errors
 2404  * REDISMODULE_ERR is returned:
 2405  *
 2406  * * The key was not opened for writing.
 2407  * * The key is of the wrong type.
 2408  * * 'score' double value is not a number (NaN).
 2409  */
 2410 int RM_ZsetAdd(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr) {
 2411     int flags = 0;
 2412     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
 2413     if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
 2414     if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET);
 2415     if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr);
 2416     if (zsetAdd(key->value,score,ele->ptr,&flags,NULL) == 0) {
 2417         if (flagsptr) *flagsptr = 0;
 2418         return REDISMODULE_ERR;
 2419     }
 2420     if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags);
 2421     return REDISMODULE_OK;
 2422 }
 2423 
 2424 /* This function works exactly like RM_ZsetAdd(), but instead of setting
 2425  * a new score, the score of the existing element is incremented, or if the
 2426  * element does not already exist, it is added assuming the old score was
 2427  * zero.
 2428  *
 2429  * The input and output flags, and the return value, have the same exact
 2430  * meaning, with the only difference that this function will return
 2431  * REDISMODULE_ERR even when 'score' is a valid double number, but adding it
 2432  * to the existing score results into a NaN (not a number) condition.
 2433  *
 2434  * This function has an additional field 'newscore', if not NULL is filled
 2435  * with the new score of the element after the increment, if no error
 2436  * is returned. */
 2437 int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore) {
 2438     int flags = 0;
 2439     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
 2440     if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
 2441     if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET);
 2442     if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr);
 2443     flags |= ZADD_INCR;
 2444     if (zsetAdd(key->value,score,ele->ptr,&flags,newscore) == 0) {
 2445         if (flagsptr) *flagsptr = 0;
 2446         return REDISMODULE_ERR;
 2447     }
 2448     /* zsetAdd() may signal back that the resulting score is not a number. */
 2449     if (flagsptr && (*flagsptr & ZADD_NAN)) {
 2450         *flagsptr = 0;
 2451         return REDISMODULE_ERR;
 2452     }
 2453     if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags);
 2454     return REDISMODULE_OK;
 2455 }
 2456 
 2457 /* Remove the specified element from the sorted set.
 2458  * The function returns REDISMODULE_OK on success, and REDISMODULE_ERR
 2459  * on one of the following conditions:
 2460  *
 2461  * * The key was not opened for writing.
 2462  * * The key is of the wrong type.
 2463  *
 2464  * The return value does NOT indicate the fact the element was really
 2465  * removed (since it existed) or not, just if the function was executed
 2466  * with success.
 2467  *
 2468  * In order to know if the element was removed, the additional argument
 2469  * 'deleted' must be passed, that populates the integer by reference
 2470  * setting it to 1 or 0 depending on the outcome of the operation.
 2471  * The 'deleted' argument can be NULL if the caller is not interested
 2472  * to know if the element was really removed.
 2473  *
 2474  * Empty keys will be handled correctly by doing nothing. */
 2475 int RM_ZsetRem(RedisModuleKey *key, RedisModuleString *ele, int *deleted) {
 2476     if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
 2477     if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
 2478     if (key->value != NULL && zsetDel(key->value,ele->ptr)) {
 2479         if (deleted) *deleted = 1;
 2480     } else {
 2481         if (deleted) *deleted = 0;
 2482     }
 2483     return REDISMODULE_OK;
 2484 }
 2485 
 2486 /* On success retrieve the double score associated at the sorted set element
 2487  * 'ele' and returns REDISMODULE_OK. Otherwise REDISMODULE_ERR is returned
 2488  * to signal one of the following conditions:
 2489  *
 2490  * * There is no such element 'ele' in the sorted set.
 2491  * * The key is not a sorted set.
 2492  * * The key is an open empty key.
 2493  */
 2494 int RM_ZsetScore(RedisModuleKey *key, RedisModuleString *ele, double *score) {
 2495     if (key->value == NULL) return REDISMODULE_ERR;
 2496     if (key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
 2497     if (zsetScore(key->value,ele->ptr,score) == C_ERR) return REDISMODULE_ERR;
 2498     return REDISMODULE_OK;
 2499 }
 2500 
 2501 /* --------------------------------------------------------------------------
 2502  * Key API for Sorted Set iterator
 2503  * -------------------------------------------------------------------------- */
 2504 
 2505 void zsetKeyReset(RedisModuleKey *key) {
 2506     key->ztype = REDISMODULE_ZSET_RANGE_NONE;
 2507     key->zcurrent = NULL;
 2508     key->zer = 1;
 2509 }
 2510 
 2511 /* Stop a sorted set iteration. */
 2512 void RM_ZsetRangeStop(RedisModuleKey *key) {
 2513     /* Free resources if needed. */
 2514     if (key->ztype == REDISMODULE_ZSET_RANGE_LEX)
 2515         zslFreeLexRange(&key->zlrs);
 2516     /* Setup sensible values so that misused iteration API calls when an
 2517      * iterator is not active will result into something more sensible
 2518      * than crashing. */
 2519     zsetKeyReset(key);
 2520 }
 2521 
 2522 /* Return the "End of range" flag value to signal the end of the iteration. */
 2523 int RM_ZsetRangeEndReached(RedisModuleKey *key) {
 2524     return key->zer;
 2525 }
 2526 
 2527 /* Helper function for RM_ZsetFirstInScoreRange() and RM_ZsetLastInScoreRange().
 2528  * Setup the sorted set iteration according to the specified score range
 2529  * (see the functions calling it for more info). If 'first' is true the
 2530  * first element in the range is used as a starting point for the iterator
 2531  * otherwise the last. Return REDISMODULE_OK on success otherwise
 2532  * REDISMODULE_ERR. */
 2533 int zsetInitScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex, int first) {
 2534     if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
 2535 
 2536     RM_ZsetRangeStop(key);
 2537     key->ztype = REDISMODULE_ZSET_RANGE_SCORE;
 2538     key->zer = 0;
 2539 
 2540     /* Setup the range structure used by the sorted set core implementation
 2541      * in order to seek at the specified element. */
 2542     zrangespec *zrs = &key->zrs;
 2543     zrs->min = min;
 2544     zrs->max = max;
 2545     zrs->minex = minex;
 2546     zrs->maxex = maxex;
 2547 
 2548     if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
 2549         key->zcurrent = first ? zzlFirstInRange(key->value->ptr,zrs) :
 2550                                 zzlLastInRange(key->value->ptr,zrs);
 2551     } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
 2552         zset *zs = key->value->ptr;
 2553         zskiplist *zsl = zs->zsl;
 2554         key->zcurrent = first ? zslFirstInRange(zsl,zrs) :
 2555                                 zslLastInRange(zsl,zrs);
 2556     } else {
 2557         serverPanic("Unsupported zset encoding");
 2558     }
 2559     if (key->zcurrent == NULL) key->zer = 1;
 2560     return REDISMODULE_OK;
 2561 }
 2562 
 2563 /* Setup a sorted set iterator seeking the first element in the specified
 2564  * range. Returns REDISMODULE_OK if the iterator was correctly initialized
 2565  * otherwise REDISMODULE_ERR is returned in the following conditions:
 2566  *
 2567  * 1. The value stored at key is not a sorted set or the key is empty.
 2568  *
 2569  * The range is specified according to the two double values 'min' and 'max'.
 2570  * Both can be infinite using the following two macros:
 2571  *
 2572  * REDISMODULE_POSITIVE_INFINITE for positive infinite value
 2573  * REDISMODULE_NEGATIVE_INFINITE for negative infinite value
 2574  *
 2575  * 'minex' and 'maxex' parameters, if true, respectively setup a range
 2576  * where the min and max value are exclusive (not included) instead of
 2577  * inclusive. */
 2578 int RM_ZsetFirstInScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex) {
 2579     return zsetInitScoreRange(key,min,max,minex,maxex,1);
 2580 }
 2581 
 2582 /* Exactly like RedisModule_ZsetFirstInScoreRange() but the last element of
 2583  * the range is selected for the start of the iteration instead. */
 2584 int RM_ZsetLastInScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex) {
 2585     return zsetInitScoreRange(key,min,max,minex,maxex,0);
 2586 }
 2587 
 2588 /* Helper function for RM_ZsetFirstInLexRange() and RM_ZsetLastInLexRange().
 2589  * Setup the sorted set iteration according to the specified lexicographical
 2590  * range (see the functions calling it for more info). If 'first' is true the
 2591  * first element in the range is used as a starting point for the iterator
 2592  * otherwise the last. Return REDISMODULE_OK on success otherwise
 2593  * REDISMODULE_ERR.
 2594  *
 2595  * Note that this function takes 'min' and 'max' in the same form of the
 2596  * Redis ZRANGEBYLEX command. */
 2597 int zsetInitLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max, int first) {
 2598     if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
 2599 
 2600     RM_ZsetRangeStop(key);
 2601     key->zer = 0;
 2602 
 2603     /* Setup the range structure used by the sorted set core implementation
 2604      * in order to seek at the specified element. */
 2605     zlexrangespec *zlrs = &key->zlrs;
 2606     if (zslParseLexRange(min, max, zlrs) == C_ERR) return REDISMODULE_ERR;
 2607 
 2608     /* Set the range type to lex only after successfully parsing the range,
 2609      * otherwise we don't want the zlexrangespec to be freed. */
 2610     key->ztype = REDISMODULE_ZSET_RANGE_LEX;
 2611 
 2612     if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
 2613         key->zcurrent = first ? zzlFirstInLexRange(key->value->ptr,zlrs) :
 2614                                 zzlLastInLexRange(key->value->ptr,zlrs);
 2615     } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
 2616         zset *zs = key->value->ptr;
 2617         zskiplist *zsl = zs->zsl;
 2618         key->zcurrent = first ? zslFirstInLexRange(zsl,zlrs) :
 2619                                 zslLastInLexRange(zsl,zlrs);
 2620     } else {
 2621         serverPanic("Unsupported zset encoding");
 2622     }
 2623     if (key->zcurrent == NULL) key->zer = 1;
 2624 
 2625     return REDISMODULE_OK;
 2626 }
 2627 
 2628 /* Setup a sorted set iterator seeking the first element in the specified
 2629  * lexicographical range. Returns REDISMODULE_OK if the iterator was correctly
 2630  * initialized otherwise REDISMODULE_ERR is returned in the
 2631  * following conditions:
 2632  *
 2633  * 1. The value stored at key is not a sorted set or the key is empty.
 2634  * 2. The lexicographical range 'min' and 'max' format is invalid.
 2635  *
 2636  * 'min' and 'max' should be provided as two RedisModuleString objects
 2637  * in the same format as the parameters passed to the ZRANGEBYLEX command.
 2638  * The function does not take ownership of the objects, so they can be released
 2639  * ASAP after the iterator is setup. */
 2640 int RM_ZsetFirstInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max) {
 2641     return zsetInitLexRange(key,min,max,1);
 2642 }
 2643 
 2644 /* Exactly like RedisModule_ZsetFirstInLexRange() but the last element of
 2645  * the range is selected for the start of the iteration instead. */
 2646 int RM_ZsetLastInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max) {
 2647     return zsetInitLexRange(key,min,max,0);
 2648 }
 2649 
 2650 /* Return the current sorted set element of an active sorted set iterator
 2651  * or NULL if the range specified in the iterator does not include any
 2652  * element. */
 2653 RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score) {
 2654     RedisModuleString *str;
 2655 
 2656     if (key->zcurrent == NULL) return NULL;
 2657     if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
 2658         unsigned char *eptr, *sptr;
 2659         eptr = key->zcurrent;
 2660         sds ele = ziplistGetObject(eptr);
 2661         if (score) {
 2662             sptr = ziplistNext(key->value->ptr,eptr);
 2663             *score = zzlGetScore(sptr);
 2664         }
 2665         str = createObject(OBJ_STRING,ele);
 2666     } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
 2667         zskiplistNode *ln = key->zcurrent;
 2668         if (score) *score = ln->score;
 2669         str = createStringObject(ln->ele,sdslen(ln->ele));
 2670     } else {
 2671         serverPanic("Unsupported zset encoding");
 2672     }
 2673     autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,str);
 2674     return str;
 2675 }
 2676 
 2677 /* Go to the next element of the sorted set iterator. Returns 1 if there was
 2678  * a next element, 0 if we are already at the latest element or the range
 2679  * does not include any item at all. */
 2680 int RM_ZsetRangeNext(RedisModuleKey *key) {
 2681     if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */
 2682 
 2683     if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
 2684         unsigned char *zl = key->value->ptr;
 2685         unsigned char *eptr = key->zcurrent;
 2686         unsigned char *next;
 2687         next = ziplistNext(zl,eptr); /* Skip element. */
 2688         if (next) next = ziplistNext(zl,next); /* Skip score. */
 2689         if (next == NULL) {
 2690             key->zer = 1;
 2691             return 0;
 2692         } else {
 2693             /* Are we still within the range? */
 2694             if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) {
 2695                 /* Fetch the next element score for the
 2696                  * range check. */
 2697                 unsigned char *saved_next = next;
 2698                 next = ziplistNext(zl,next); /* Skip next element. */
 2699                 double score = zzlGetScore(next); /* Obtain the next score. */
 2700                 if (!zslValueLteMax(score,&key->zrs)) {
 2701                     key->zer = 1;
 2702                     return 0;
 2703                 }
 2704                 next = saved_next;
 2705             } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
 2706                 if (!zzlLexValueLteMax(next,&key->zlrs)) {
 2707                     key->zer = 1;
 2708                     return 0;
 2709                 }
 2710             }
 2711             key->zcurrent = next;
 2712             return 1;
 2713         }
 2714     } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
 2715         zskiplistNode *ln = key->zcurrent, *next = ln->level[0].forward;
 2716         if (next == NULL) {
 2717             key->zer = 1;
 2718             return 0;
 2719         } else {
 2720             /* Are we still within the range? */
 2721             if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
 2722                 !zslValueLteMax(next->score,&key->zrs))
 2723             {
 2724                 key->zer = 1;
 2725                 return 0;
 2726             } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
 2727                 if (!zslLexValueLteMax(next->ele,&key->zlrs)) {
 2728                     key->zer = 1;
 2729                     return 0;
 2730                 }
 2731             }
 2732             key->zcurrent = next;
 2733             return 1;
 2734         }
 2735     } else {
 2736         serverPanic("Unsupported zset encoding");
 2737     }
 2738 }
 2739 
 2740 /* Go to the previous element of the sorted set iterator. Returns 1 if there was
 2741  * a previous element, 0 if we are already at the first element or the range
 2742  * does not include any item at all. */
 2743 int RM_ZsetRangePrev(RedisModuleKey *key) {
 2744     if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */
 2745 
 2746     if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
 2747         unsigned char *zl = key->value->ptr;
 2748         unsigned char *eptr = key->zcurrent;
 2749         unsigned char *prev;
 2750         prev = ziplistPrev(zl,eptr); /* Go back to previous score. */
 2751         if (prev) prev = ziplistPrev(zl,prev); /* Back to previous ele. */
 2752         if (prev == NULL) {
 2753             key->zer = 1;
 2754             return 0;
 2755         } else {
 2756             /* Are we still within the range? */
 2757             if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) {
 2758                 /* Fetch the previous element score for the
 2759                  * range check. */
 2760                 unsigned char *saved_prev = prev;
 2761                 prev = ziplistNext(zl,prev); /* Skip element to get the score.*/
 2762                 double score = zzlGetScore(prev); /* Obtain the prev score. */
 2763                 if (!zslValueGteMin(score,&key->zrs)) {
 2764                     key->zer = 1;
 2765                     return 0;
 2766                 }
 2767                 prev = saved_prev;
 2768             } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
 2769                 if (!zzlLexValueGteMin(prev,&key->zlrs)) {
 2770                     key->zer = 1;
 2771                     return 0;
 2772                 }
 2773             }
 2774             key->zcurrent = prev;
 2775             return 1;
 2776         }
 2777     } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
 2778         zskiplistNode *ln = key->zcurrent, *prev = ln->backward;
 2779         if (prev == NULL) {
 2780             key->zer = 1;
 2781             return 0;
 2782         } else {
 2783             /* Are we still within the range? */
 2784             if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
 2785                 !zslValueGteMin(prev->score,&key->zrs))
 2786             {
 2787                 key->zer = 1;
 2788                 return 0;
 2789             } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
 2790                 if (!zslLexValueGteMin(prev->ele,&key->zlrs)) {
 2791                     key->zer = 1;
 2792                     return 0;
 2793                 }
 2794             }
 2795             key->zcurrent = prev;
 2796             return 1;
 2797         }
 2798     } else {
 2799         serverPanic("Unsupported zset encoding");
 2800     }
 2801 }
 2802 
 2803 /* --------------------------------------------------------------------------
 2804  * Key API for Hash type
 2805  * -------------------------------------------------------------------------- */
 2806 
 2807 /* Set the field of the specified hash field to the specified value.
 2808  * If the key is an empty key open for writing, it is created with an empty
 2809  * hash value, in order to set the specified field.
 2810  *
 2811  * The function is variadic and the user must specify pairs of field
 2812  * names and values, both as RedisModuleString pointers (unless the
 2813  * CFIELD option is set, see later). At the end of the field/value-ptr pairs, 
 2814  * NULL must be specified as last argument to signal the end of the arguments 
 2815  * in the variadic function.
 2816  *
 2817  * Example to set the hash argv[1] to the value argv[2]:
 2818  *
 2819  *      RedisModule_HashSet(key,REDISMODULE_HASH_NONE,argv[1],argv[2],NULL);
 2820  *
 2821  * The function can also be used in order to delete fields (if they exist)
 2822  * by setting them to the specified value of REDISMODULE_HASH_DELETE:
 2823  *
 2824  *      RedisModule_HashSet(key,REDISMODULE_HASH_NONE,argv[1],
 2825  *                          REDISMODULE_HASH_DELETE,NULL);
 2826  *
 2827  * The behavior of the command changes with the specified flags, that can be
 2828  * set to REDISMODULE_HASH_NONE if no special behavior is needed.
 2829  *
 2830  *     REDISMODULE_HASH_NX: The operation is performed only if the field was not
 2831  *                          already existing in the hash.
 2832  *     REDISMODULE_HASH_XX: The operation is performed only if the field was
 2833  *                          already existing, so that a new value could be
 2834  *                          associated to an existing filed, but no new fields
 2835  *                          are created.
 2836  *     REDISMODULE_HASH_CFIELDS: The field names passed are null terminated C
 2837  *                               strings instead of RedisModuleString objects.
 2838  *
 2839  * Unless NX is specified, the command overwrites the old field value with
 2840  * the new one.
 2841  *
 2842  * When using REDISMODULE_HASH_CFIELDS, field names are reported using
 2843  * normal C strings, so for example to delete the field "foo" the following
 2844  * code can be used:
 2845  *
 2846  *      RedisModule_HashSet(key,REDISMODULE_HASH_CFIELDS,"foo",
 2847  *                          REDISMODULE_HASH_DELETE,NULL);
 2848  *
 2849  * Return value:
 2850  *
 2851  * The number of fields updated (that may be less than the number of fields
 2852  * specified because of the XX or NX options).
 2853  *
 2854  * In the following case the return value is always zero:
 2855  *
 2856  * * The key was not open for writing.
 2857  * * The key was associated with a non Hash value.
 2858  */
 2859 int RM_HashSet(RedisModuleKey *key, int flags, ...) {
 2860     va_list ap;
 2861     if (!(key->mode & REDISMODULE_WRITE)) return 0;
 2862     if (key->value && key->value->type != OBJ_HASH) return 0;
 2863     if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_HASH);
 2864 
 2865     int updated = 0;
 2866     va_start(ap, flags);
 2867     while(1) {
 2868         RedisModuleString *field, *value;
 2869         /* Get the field and value objects. */
 2870         if (flags & REDISMODULE_HASH_CFIELDS) {
 2871             char *cfield = va_arg(ap,char*);
 2872             if (cfield == NULL) break;
 2873             field = createRawStringObject(cfield,strlen(cfield));
 2874         } else {
 2875             field = va_arg(ap,RedisModuleString*);
 2876             if (field == NULL) break;
 2877         }
 2878         value = va_arg(ap,RedisModuleString*);
 2879 
 2880         /* Handle XX and NX */
 2881         if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) {
 2882             int exists = hashTypeExists(key->value, field->ptr);
 2883             if (((flags & REDISMODULE_HASH_XX) && !exists) ||
 2884                 ((flags & REDISMODULE_HASH_NX) && exists))
 2885             {
 2886                 if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
 2887                 continue;
 2888             }
 2889         }
 2890 
 2891         /* Handle deletion if value is REDISMODULE_HASH_DELETE. */
 2892         if (value == REDISMODULE_HASH_DELETE) {
 2893             updated += hashTypeDelete(key->value, field->ptr);
 2894             if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
 2895             continue;
 2896         }
 2897 
 2898         int low_flags = HASH_SET_COPY;
 2899         /* If CFIELDS is active, we can pass the ownership of the
 2900          * SDS object to the low level function that sets the field
 2901          * to avoid a useless copy. */
 2902         if (flags & REDISMODULE_HASH_CFIELDS)
 2903             low_flags |= HASH_SET_TAKE_FIELD;
 2904 
 2905         robj *argv[2] = {field,value};
 2906         hashTypeTryConversion(key->value,argv,0,1);
 2907         updated += hashTypeSet(key->value, field->ptr, value->ptr, low_flags);
 2908 
 2909         /* If CFIELDS is active, SDS string ownership is now of hashTypeSet(),
 2910          * however we still have to release the 'field' object shell. */
 2911         if (flags & REDISMODULE_HASH_CFIELDS) {
 2912            field->ptr = NULL; /* Prevent the SDS string from being freed. */
 2913            decrRefCount(field);
 2914         }
 2915     }
 2916     va_end(ap);
 2917     moduleDelKeyIfEmpty(key);
 2918     return updated;
 2919 }
 2920 
 2921 /* Get fields from an hash value. This function is called using a variable
 2922  * number of arguments, alternating a field name (as a StringRedisModule
 2923  * pointer) with a pointer to a StringRedisModule pointer, that is set to the
 2924  * value of the field if the field exist, or NULL if the field did not exist.
 2925  * At the end of the field/value-ptr pairs, NULL must be specified as last
 2926  * argument to signal the end of the arguments in the variadic function.
 2927  *
 2928  * This is an example usage:
 2929  *
 2930  *      RedisModuleString *first, *second;
 2931  *      RedisModule_HashGet(mykey,REDISMODULE_HASH_NONE,argv[1],&first,
 2932  *                      argv[2],&second,NULL);
 2933  *
 2934  * As with RedisModule_HashSet() the behavior of the command can be specified
 2935  * passing flags different than REDISMODULE_HASH_NONE:
 2936  *
 2937  * REDISMODULE_HASH_CFIELD: field names as null terminated C strings.
 2938  *
 2939  * REDISMODULE_HASH_EXISTS: instead of setting the value of the field
 2940  * expecting a RedisModuleString pointer to pointer, the function just
 2941  * reports if the field exists or not and expects an integer pointer
 2942  * as the second element of each pair.
 2943  *
 2944  * Example of REDISMODULE_HASH_CFIELD:
 2945  *
 2946  *      RedisModuleString *username, *hashedpass;
 2947  *      RedisModule_HashGet(mykey,"username",&username,"hp",&hashedpass, NULL);
 2948  *
 2949  * Example of REDISMODULE_HASH_EXISTS:
 2950  *
 2951  *      int exists;
 2952  *      RedisModule_HashGet(mykey,argv[1],&exists,NULL);
 2953  *
 2954  * The function returns REDISMODULE_OK on success and REDISMODULE_ERR if
 2955  * the key is not an hash value.
 2956  *
 2957  * Memory management:
 2958  *
 2959  * The returned RedisModuleString objects should be released with
 2960  * RedisModule_FreeString(), or by enabling automatic memory management.
 2961  */
 2962 int RM_HashGet(RedisModuleKey *key, int flags, ...) {
 2963     va_list ap;
 2964     if (key->value && key->value->type != OBJ_HASH) return REDISMODULE_ERR;
 2965 
 2966     va_start(ap, flags);
 2967     while(1) {
 2968         RedisModuleString *field, **valueptr;
 2969         int *existsptr;
 2970         /* Get the field object and the value pointer to pointer. */
 2971         if (flags & REDISMODULE_HASH_CFIELDS) {
 2972             char *cfield = va_arg(ap,char*);
 2973             if (cfield == NULL) break;
 2974             field = createRawStringObject(cfield,strlen(cfield));
 2975         } else {
 2976             field = va_arg(ap,RedisModuleString*);
 2977             if (field == NULL) break;
 2978         }
 2979 
 2980         /* Query the hash for existence or value object. */
 2981         if (flags & REDISMODULE_HASH_EXISTS) {
 2982             existsptr = va_arg(ap,int*);
 2983             if (key->value)
 2984                 *existsptr = hashTypeExists(key->value,field->ptr);
 2985             else
 2986                 *existsptr = 0;
 2987         } else {
 2988             valueptr = va_arg(ap,RedisModuleString**);
 2989             if (key->value) {
 2990                 *valueptr = hashTypeGetValueObject(key->value,field->ptr);
 2991                 if (*valueptr) {
 2992                     robj *decoded = getDecodedObject(*valueptr);
 2993                     decrRefCount(*valueptr);
 2994                     *valueptr = decoded;
 2995                 }
 2996                 if (*valueptr)
 2997                     autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,*valueptr);
 2998             } else {
 2999                 *valueptr = NULL;
 3000             }
 3001         }
 3002 
 3003         /* Cleanup */
 3004         if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
 3005     }
 3006     va_end(ap);
 3007     return REDISMODULE_OK;
 3008 }
 3009 
 3010 /* --------------------------------------------------------------------------
 3011  * Redis <-> Modules generic Call() API
 3012  * -------------------------------------------------------------------------- */
 3013 
 3014 /* Create a new RedisModuleCallReply object. The processing of the reply
 3015  * is lazy, the object is just populated with the raw protocol and later
 3016  * is processed as needed. Initially we just make sure to set the right
 3017  * reply type, which is extremely cheap to do. */
 3018 RedisModuleCallReply *moduleCreateCallReplyFromProto(RedisModuleCtx *ctx, sds proto) {
 3019     RedisModuleCallReply *reply = zmalloc(sizeof(*reply));
 3020     reply->ctx = ctx;
 3021     reply->proto = proto;
 3022     reply->protolen = sdslen(proto);
 3023     reply->flags = REDISMODULE_REPLYFLAG_TOPARSE; /* Lazy parsing. */
 3024     switch(proto[0]) {
 3025     case '$':
 3026     case '+': reply->type = REDISMODULE_REPLY_STRING; break;
 3027     case '-': reply->type = REDISMODULE_REPLY_ERROR; break;
 3028     case ':': reply->type = REDISMODULE_REPLY_INTEGER; break;
 3029     case '*': reply->type = REDISMODULE_REPLY_ARRAY; break;
 3030     default: reply->type = REDISMODULE_REPLY_UNKNOWN; break;
 3031     }
 3032     if ((proto[0] == '*' || proto[0] == '$') && proto[1] == '-')
 3033         reply->type = REDISMODULE_REPLY_NULL;
 3034     return reply;
 3035 }
 3036 
 3037 void moduleParseCallReply_Int(RedisModuleCallReply *reply);
 3038 void moduleParseCallReply_BulkString(RedisModuleCallReply *reply);
 3039 void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply);
 3040 void moduleParseCallReply_Array(RedisModuleCallReply *reply);
 3041 
 3042 /* Do nothing if REDISMODULE_REPLYFLAG_TOPARSE is false, otherwise
 3043  * use the protcol of the reply in reply->proto in order to fill the
 3044  * reply with parsed data according to the reply type. */
 3045 void moduleParseCallReply(RedisModuleCallReply *reply) {
 3046     if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) return;
 3047     reply->flags &= ~REDISMODULE_REPLYFLAG_TOPARSE;
 3048 
 3049     switch(reply->proto[0]) {
 3050     case ':': moduleParseCallReply_Int(reply); break;
 3051     case '$': moduleParseCallReply_BulkString(reply); break;
 3052     case '-': /* handled by next item. */
 3053     case '+': moduleParseCallReply_SimpleString(reply); break;
 3054     case '*': moduleParseCallReply_Array(reply); break;
 3055     }
 3056 }
 3057 
 3058 void moduleParseCallReply_Int(RedisModuleCallReply *reply) {
 3059     char *proto = reply->proto;
 3060     char *p = strchr(proto+1,'\r');
 3061 
 3062     string2ll(proto+1,p-proto-1,&reply->val.ll);
 3063     reply->protolen = p-proto+2;
 3064     reply->type = REDISMODULE_REPLY_INTEGER;
 3065 }
 3066 
 3067 void moduleParseCallReply_BulkString(RedisModuleCallReply *reply) {
 3068     char *proto = reply->proto;
 3069     char *p = strchr(proto+1,'\r');
 3070     long long bulklen;
 3071 
 3072     string2ll(proto+1,p-proto-1,&bulklen);
 3073     if (bulklen == -1) {
 3074         reply->protolen = p-proto+2;
 3075         reply->type = REDISMODULE_REPLY_NULL;
 3076     } else {
 3077         reply->val.str = p+2;
 3078         reply->len = bulklen;
 3079         reply->protolen = p-proto+2+bulklen+2;
 3080         reply->type = REDISMODULE_REPLY_STRING;
 3081     }
 3082 }
 3083 
 3084 void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply) {
 3085     char *proto = reply->proto;
 3086     char *p = strchr(proto+1,'\r');
 3087 
 3088     reply->val.str = proto+1;
 3089     reply->len = p-proto-1;
 3090     reply->protolen = p-proto+2;
 3091     reply->type = proto[0] == '+' ? REDISMODULE_REPLY_STRING :
 3092                                     REDISMODULE_REPLY_ERROR;
 3093 }
 3094 
 3095 void moduleParseCallReply_Array(RedisModuleCallReply *reply) {
 3096     char *proto = reply->proto;
 3097     char *p = strchr(proto+1,'\r');
 3098     long long arraylen, j;
 3099 
 3100     string2ll(proto+1,p-proto-1,&arraylen);
 3101     p += 2;
 3102 
 3103     if (arraylen == -1) {
 3104         reply->protolen = p-proto;
 3105         reply->type = REDISMODULE_REPLY_NULL;
 3106         return;
 3107     }
 3108 
 3109     reply->val.array = zmalloc(sizeof(RedisModuleCallReply)*arraylen);
 3110     reply->len = arraylen;
 3111     for (j = 0; j < arraylen; j++) {
 3112         RedisModuleCallReply *ele = reply->val.array+j;
 3113         ele->flags = REDISMODULE_REPLYFLAG_NESTED |
 3114                      REDISMODULE_REPLYFLAG_TOPARSE;
 3115         ele->proto = p;
 3116         ele->ctx = reply->ctx;
 3117         moduleParseCallReply(ele);
 3118         p += ele->protolen;
 3119     }
 3120     reply->protolen = p-proto;
 3121     reply->type = REDISMODULE_REPLY_ARRAY;
 3122 }
 3123 
 3124 /* Free a Call reply and all the nested replies it contains if it's an
 3125  * array. */
 3126 void RM_FreeCallReply_Rec(RedisModuleCallReply *reply, int freenested){
 3127     /* Don't free nested replies by default: the user must always free the
 3128      * toplevel reply. However be gentle and don't crash if the module
 3129      * misuses the API. */
 3130     if (!freenested && reply->flags & REDISMODULE_REPLYFLAG_NESTED) return;
 3131 
 3132     if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) {
 3133         if (reply->type == REDISMODULE_REPLY_ARRAY) {
 3134             size_t j;
 3135             for (j = 0; j < reply->len; j++)
 3136                 RM_FreeCallReply_Rec(reply->val.array+j,1);
 3137             zfree(reply->val.array);
 3138         }
 3139     }
 3140 
 3141     /* For nested replies, we don't free reply->proto (which if not NULL
 3142      * references the parent reply->proto buffer), nor the structure
 3143      * itself which is allocated as an array of structures, and is freed
 3144      * when the array value is released. */
 3145     if (!(reply->flags & REDISMODULE_REPLYFLAG_NESTED)) {
 3146         if (reply->proto) sdsfree(reply->proto);
 3147         zfree(reply);
 3148     }
 3149 }
 3150 
 3151 /* Wrapper for the recursive free reply function. This is needed in order
 3152  * to have the first level function to return on nested replies, but only
 3153  * if called by the module API. */
 3154 void RM_FreeCallReply(RedisModuleCallReply *reply) {
 3155 
 3156     RedisModuleCtx *ctx = reply->ctx;
 3157     RM_FreeCallReply_Rec(reply,0);
 3158     autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply);
 3159 }
 3160 
 3161 /* Return the reply type. */
 3162 int RM_CallReplyType(RedisModuleCallReply *reply) {
 3163     if (!reply) return REDISMODULE_REPLY_UNKNOWN;
 3164     return reply->type;
 3165 }
 3166 
 3167 /* Return the reply type length, where applicable. */
 3168 size_t RM_CallReplyLength(RedisModuleCallReply *reply) {
 3169     moduleParseCallReply(reply);
 3170     switch(reply->type) {
 3171     case REDISMODULE_REPLY_STRING:
 3172     case REDISMODULE_REPLY_ERROR:
 3173     case REDISMODULE_REPLY_ARRAY:
 3174         return reply->len;
 3175     default:
 3176         return 0;
 3177     }
 3178 }
 3179 
 3180 /* Return the 'idx'-th nested call reply element of an array reply, or NULL
 3181  * if the reply type is wrong or the index is out of range. */
 3182 RedisModuleCallReply *RM_CallReplyArrayElement(RedisModuleCallReply *reply, size_t idx) {
 3183     moduleParseCallReply(reply);
 3184     if (reply->type != REDISMODULE_REPLY_ARRAY) return NULL;
 3185     if (idx >= reply->len) return NULL;
 3186     return reply->val.array+idx;
 3187 }
 3188 
 3189 /* Return the long long of an integer reply. */
 3190 long long RM_CallReplyInteger(RedisModuleCallReply *reply) {
 3191     moduleParseCallReply(reply);
 3192     if (reply->type != REDISMODULE_REPLY_INTEGER) return LLONG_MIN;
 3193     return reply->val.ll;
 3194 }
 3195 
 3196 /* Return the pointer and length of a string or error reply. */
 3197 const char *RM_CallReplyStringPtr(RedisModuleCallReply *reply, size_t *len) {
 3198     moduleParseCallReply(reply);
 3199     if (reply->type != REDISMODULE_REPLY_STRING &&
 3200         reply->type != REDISMODULE_REPLY_ERROR) return NULL;
 3201     if (len) *len = reply->len;
 3202     return reply->val.str;
 3203 }
 3204 
 3205 /* Return a new string object from a call reply of type string, error or
 3206  * integer. Otherwise (wrong reply type) return NULL. */
 3207 RedisModuleString *RM_CreateStringFromCallReply(RedisModuleCallReply *reply) {
 3208     moduleParseCallReply(reply);
 3209     switch(reply->type) {
 3210     case REDISMODULE_REPLY_STRING:
 3211     case REDISMODULE_REPLY_ERROR:
 3212         return RM_CreateString(reply->ctx,reply->val.str,reply->len);
 3213     case REDISMODULE_REPLY_INTEGER: {
 3214         char buf[64];
 3215         int len = ll2string(buf,sizeof(buf),reply->val.ll);
 3216         return RM_CreateString(reply->ctx,buf,len);
 3217         }
 3218     default: return NULL;
 3219     }
 3220 }
 3221 
 3222 /* Returns an array of robj pointers, and populates *argc with the number
 3223  * of items, by parsing the format specifier "fmt" as described for
 3224  * the RM_Call(), RM_Replicate() and other module APIs.
 3225  *
 3226  * The integer pointed by 'flags' is populated with flags according
 3227  * to special modifiers in "fmt". For now only one exists:
 3228  *
 3229  *     "!" -> REDISMODULE_ARGV_REPLICATE
 3230  *     "A" -> REDISMODULE_ARGV_NO_AOF
 3231  *     "R" -> REDISMODULE_ARGV_NO_REPLICAS
 3232  *
 3233  * On error (format specifier error) NULL is returned and nothing is
 3234  * allocated. On success the argument vector is returned. */
 3235 robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap) {
 3236     int argc = 0, argv_size, j;
 3237     robj **argv = NULL;
 3238 
 3239     /* As a first guess to avoid useless reallocations, size argv to
 3240      * hold one argument for each char specifier in 'fmt'. */
 3241     argv_size = strlen(fmt)+1; /* +1 because of the command name. */
 3242     argv = zrealloc(argv,sizeof(robj*)*argv_size);
 3243 
 3244     /* Build the arguments vector based on the format specifier. */
 3245     argv[0] = createStringObject(cmdname,strlen(cmdname));
 3246     argc++;
 3247 
 3248     /* Create the client and dispatch the command. */
 3249     const char *p = fmt;
 3250     while(*p) {
 3251         if (*p == 'c') {
 3252             char *cstr = va_arg(ap,char*);
 3253             argv[argc++] = createStringObject(cstr,strlen(cstr));
 3254         } else if (*p == 's') {
 3255             robj *obj = va_arg(ap,void*);
 3256             if (obj->refcount == OBJ_STATIC_REFCOUNT)
 3257                 obj = createStringObject(obj->ptr,sdslen(obj->ptr));
 3258             else
 3259                 incrRefCount(obj);
 3260             argv[argc++] = obj;
 3261         } else if (*p == 'b') {
 3262             char *buf = va_arg(ap,char*);
 3263             size_t len = va_arg(ap,size_t);
 3264             argv[argc++] = createStringObject(buf,len);
 3265         } else if (*p == 'l') {
 3266             long long ll = va_arg(ap,long long);
 3267             argv[argc++] = createObject(OBJ_STRING,sdsfromlonglong(ll));
 3268         } else if (*p == 'v') {
 3269              /* A vector of strings */
 3270              robj **v = va_arg(ap, void*);
 3271              size_t vlen = va_arg(ap, size_t);
 3272 
 3273              /* We need to grow argv to hold the vector's elements.
 3274               * We resize by vector_len-1 elements, because we held
 3275               * one element in argv for the vector already */
 3276              argv_size += vlen-1;
 3277              argv = zrealloc(argv,sizeof(robj*)*argv_size);
 3278 
 3279              size_t i = 0;
 3280              for (i = 0; i < vlen; i++) {
 3281                  incrRefCount(v[i]);
 3282                  argv[argc++] = v[i];
 3283              }
 3284         } else if (*p == '!') {
 3285             if (flags) (*flags) |= REDISMODULE_ARGV_REPLICATE;
 3286         } else if (*p == 'A') {
 3287             if (flags) (*flags) |= REDISMODULE_ARGV_NO_AOF;
 3288         } else if (*p == 'R') {
 3289             if (flags) (*flags) |= REDISMODULE_ARGV_NO_REPLICAS;
 3290         } else {
 3291             goto fmterr;
 3292         }
 3293         p++;
 3294     }
 3295     *argcp = argc;
 3296     return argv;
 3297 
 3298 fmterr:
 3299     for (j = 0; j < argc; j++)
 3300         decrRefCount(argv[j]);
 3301     zfree(argv);
 3302     return NULL;
 3303 }
 3304 
 3305 /* Exported API to call any Redis command from modules.
 3306  *
 3307  * * **cmdname**: The Redis command to call.
 3308  * * **fmt**: A format specifier string for the command's arguments. Each
 3309  *   of the arguments should be specified by a valid type specification:
 3310  *   b    The argument is a buffer and is immediately followed by another
 3311  *        argument that is the buffer's length.
 3312  *   c    The argument is a pointer to a plain C string (null-terminated).
 3313  *   l    The argument is long long integer.
 3314  *   s    The argument is a RedisModuleString.
 3315  *   v    The argument(s) is a vector of RedisModuleString.
 3316  *
 3317  *   The format specifier can also include modifiers:
 3318  *   !    Sends the Redis command and its arguments to replicas and AOF.
 3319  *   A    Suppress AOF propagation, send only to replicas (requires `!`).
 3320  *   R    Suppress replicas propagation, send only to AOF (requires `!`).
 3321  * * **...**: The actual arguments to the Redis command.
 3322  *
 3323  * On success a RedisModuleCallReply object is returned, otherwise
 3324  * NULL is returned and errno is set to the following values:
 3325  *
 3326  * EBADF: wrong format specifier.
 3327  * EINVAL: wrong command arity.
 3328  * ENOENT: command does not exist.
 3329  * EPERM:  operation in Cluster instance with key in non local slot.
 3330  * EROFS:  operation in Cluster instance when a write command is sent
 3331  *         in a readonly state.
 3332  * ENETDOWN: operation in Cluster instance when cluster is down.
 3333  *
 3334  * Example code fragment:
 3335  * 
 3336  *      reply = RedisModule_Call(ctx,"INCRBY","sc",argv[1],"10");
 3337  *      if (RedisModule_CallReplyType(reply) == REDISMODULE_REPLY_INTEGER) {
 3338  *        long long myval = RedisModule_CallReplyInteger(reply);
 3339  *        // Do something with myval.
 3340  *      }
 3341  *
 3342  * This API is documented here: https://redis.io/topics/modules-intro
 3343  */
 3344 RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
 3345     struct redisCommand *cmd;
 3346     client *c = NULL;
 3347     robj **argv = NULL;
 3348     int argc = 0, flags = 0;
 3349     va_list ap;
 3350     RedisModuleCallReply *reply = NULL;
 3351     int replicate = 0; /* Replicate this command? */
 3352 
 3353     /* Create the client and dispatch the command. */
 3354     va_start(ap, fmt);
 3355     c = createClient(NULL);
 3356     c->user = NULL; /* Root user. */
 3357     argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
 3358     replicate = flags & REDISMODULE_ARGV_REPLICATE;
 3359     va_end(ap);
 3360 
 3361     /* Setup our fake client for command execution. */
 3362     c->flags |= CLIENT_MODULE;
 3363     c->db = ctx->client->db;
 3364     c->argv = argv;
 3365     c->argc = argc;
 3366     if (ctx->module) ctx->module->in_call++;
 3367 
 3368     /* We handle the above format error only when the client is setup so that
 3369      * we can free it normally. */
 3370     if (argv == NULL) {
 3371         errno = EBADF;
 3372         goto cleanup;
 3373     }
 3374 
 3375     /* Call command filters */
 3376     moduleCallCommandFilters(c);
 3377 
 3378     /* Lookup command now, after filters had a chance to make modifications
 3379      * if necessary.
 3380      */
 3381     cmd = lookupCommand(c->argv[0]->ptr);
 3382     if (!cmd) {
 3383         errno = ENOENT;
 3384         goto cleanup;
 3385     }
 3386     c->cmd = c->lastcmd = cmd;
 3387 
 3388     /* Basic arity checks. */
 3389     if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) {
 3390         errno = EINVAL;
 3391         goto cleanup;
 3392     }
 3393 
 3394     /* If this is a Redis Cluster node, we need to make sure the module is not
 3395      * trying to access non-local keys, with the exception of commands
 3396      * received from our master. */
 3397     if (server.cluster_enabled && !(ctx->client->flags & CLIENT_MASTER)) {
 3398         int error_code;
 3399         /* Duplicate relevant flags in the module client. */
 3400         c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
 3401         c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING);
 3402         if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,&error_code) !=
 3403                            server.cluster->myself)
 3404         {
 3405             if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { 
 3406                 errno = EROFS;
 3407             } else if (error_code == CLUSTER_REDIR_DOWN_STATE) { 
 3408                 errno = ENETDOWN;
 3409             } else {
 3410                 errno = EPERM;
 3411             }
 3412             goto cleanup;
 3413         }
 3414     }
 3415 
 3416     /* If we are using single commands replication, we need to wrap what
 3417      * we propagate into a MULTI/EXEC block, so that it will be atomic like
 3418      * a Lua script in the context of AOF and slaves. */
 3419     if (replicate) moduleReplicateMultiIfNeeded(ctx);
 3420 
 3421     /* Run the command */
 3422     int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_NOWRAP;
 3423     if (replicate) {
 3424         if (!(flags & REDISMODULE_ARGV_NO_AOF))
 3425             call_flags |= CMD_CALL_PROPAGATE_AOF;
 3426         if (!(flags & REDISMODULE_ARGV_NO_REPLICAS))
 3427             call_flags |= CMD_CALL_PROPAGATE_REPL;
 3428     }
 3429     call(c,call_flags);
 3430 
 3431     /* Convert the result of the Redis command into a module reply. */
 3432     sds proto = sdsnewlen(c->buf,c->bufpos);
 3433     c->bufpos = 0;
 3434     while(listLength(c->reply)) {
 3435         clientReplyBlock *o = listNodeValue(listFirst(c->reply));
 3436 
 3437         proto = sdscatlen(proto,o->buf,o->used);
 3438         listDelNode(c->reply,listFirst(c->reply));
 3439     }
 3440     reply = moduleCreateCallReplyFromProto(ctx,proto);
 3441     autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply);
 3442 
 3443 cleanup:
 3444     if (ctx->module) ctx->module->in_call--;
 3445     freeClient(c);
 3446     return reply;
 3447 }
 3448 
 3449 /* Return a pointer, and a length, to the protocol returned by the command
 3450  * that returned the reply object. */
 3451 const char *RM_CallReplyProto(RedisModuleCallReply *reply, size_t *len) {
 3452     if (reply->proto) *len = sdslen(reply->proto);
 3453     return reply->proto;
 3454 }
 3455 
 3456 /* --------------------------------------------------------------------------
 3457  * Modules data types
 3458  *
 3459  * When String DMA or using existing data structures is not enough, it is
 3460  * possible to create new data types from scratch and export them to
 3461  * Redis. The module must provide a set of callbacks for handling the
 3462  * new values exported (for example in order to provide RDB saving/loading,
 3463  * AOF rewrite, and so forth). In this section we define this API.
 3464  * -------------------------------------------------------------------------- */
 3465 
 3466 /* Turn a 9 chars name in the specified charset and a 10 bit encver into
 3467  * a single 64 bit unsigned integer that represents this exact module name
 3468  * and version. This final number is called a "type ID" and is used when
 3469  * writing module exported values to RDB files, in order to re-associate the
 3470  * value to the right module to load them during RDB loading.
 3471  *
 3472  * If the string is not of the right length or the charset is wrong, or
 3473  * if encver is outside the unsigned 10 bit integer range, 0 is returned,
 3474  * otherwise the function returns the right type ID.
 3475  *
 3476  * The resulting 64 bit integer is composed as follows:
 3477  *
 3478  *     (high order bits) 6|6|6|6|6|6|6|6|6|10 (low order bits)
 3479  *
 3480  * The first 6 bits value is the first character, name[0], while the last
 3481  * 6 bits value, immediately before the 10 bits integer, is name[8].
 3482  * The last 10 bits are the encoding version.
 3483  *
 3484  * Note that a name and encver combo of "AAAAAAAAA" and 0, will produce
 3485  * zero as return value, that is the same we use to signal errors, thus
 3486  * this combination is invalid, and also useless since type names should
 3487  * try to be vary to avoid collisions. */
 3488 
 3489 const char *ModuleTypeNameCharSet =
 3490              "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
 3491              "abcdefghijklmnopqrstuvwxyz"
 3492              "0123456789-_";
 3493 
 3494 uint64_t moduleTypeEncodeId(const char *name, int encver) {
 3495     /* We use 64 symbols so that we can map each character into 6 bits
 3496      * of the final output. */
 3497     const char *cset = ModuleTypeNameCharSet;
 3498     if (strlen(name) != 9) return 0;
 3499     if (encver < 0 || encver > 1023) return 0;
 3500 
 3501     uint64_t id = 0;
 3502     for (int j = 0; j < 9; j++) {
 3503         char *p = strchr(cset,name[j]);
 3504         if (!p) return 0;
 3505         unsigned long pos = p-cset;
 3506         id = (id << 6) | pos;
 3507     }
 3508     id = (id << 10) | encver;
 3509     return id;
 3510 }
 3511 
 3512 /* Search, in the list of exported data types of all the modules registered,
 3513  * a type with the same name as the one given. Returns the moduleType
 3514  * structure pointer if such a module is found, or NULL otherwise. */
 3515 moduleType *moduleTypeLookupModuleByName(const char *name) {
 3516     dictIterator *di = dictGetIterator(modules);
 3517     dictEntry *de;
 3518 
 3519     while ((de = dictNext(di)) != NULL) {
 3520         struct RedisModule *module = dictGetVal(de);
 3521         listIter li;
 3522         listNode *ln;
 3523 
 3524         listRewind(module->types,&li);
 3525         while((ln = listNext(&li))) {
 3526             moduleType *mt = ln->value;
 3527             if (memcmp(name,mt->name,sizeof(mt->name)) == 0) {
 3528                 dictReleaseIterator(di);
 3529                 return mt;
 3530             }
 3531         }
 3532     }
 3533     dictReleaseIterator(di);
 3534     return NULL;
 3535 }
 3536 
 3537 /* Lookup a module by ID, with caching. This function is used during RDB
 3538  * loading. Modules exporting data types should never be able to unload, so
 3539  * our cache does not need to expire. */
 3540 #define MODULE_LOOKUP_CACHE_SIZE 3
 3541 
 3542 moduleType *moduleTypeLookupModuleByID(uint64_t id) {
 3543     static struct {
 3544         uint64_t id;
 3545         moduleType *mt;
 3546     } cache[MODULE_LOOKUP_CACHE_SIZE];
 3547 
 3548     /* Search in cache to start. */
 3549     int j;
 3550     for (j = 0; j < MODULE_LOOKUP_CACHE_SIZE && cache[j].mt != NULL; j++)
 3551         if (cache[j].id == id) return cache[j].mt;
 3552 
 3553     /* Slow module by module lookup. */
 3554     moduleType *mt = NULL;
 3555     dictIterator *di = dictGetIterator(modules);
 3556     dictEntry *de;
 3557 
 3558     while ((de = dictNext(di)) != NULL && mt == NULL) {
 3559         struct RedisModule *module = dictGetVal(de);
 3560         listIter li;
 3561         listNode *ln;
 3562 
 3563         listRewind(module->types,&li);
 3564         while((ln = listNext(&li))) {
 3565             moduleType *this_mt = ln->value;
 3566             /* Compare only the 54 bit module identifier and not the
 3567              * encoding version. */
 3568             if (this_mt->id >> 10 == id >> 10) {
 3569                 mt = this_mt;
 3570                 break;
 3571             }
 3572         }
 3573     }
 3574     dictReleaseIterator(di);
 3575 
 3576     /* Add to cache if possible. */
 3577     if (mt && j < MODULE_LOOKUP_CACHE_SIZE) {
 3578         cache[j].id = id;
 3579         cache[j].mt = mt;
 3580     }
 3581     return mt;
 3582 }
 3583 
 3584 /* Turn an (unresolved) module ID into a type name, to show the user an
 3585  * error when RDB files contain module data we can't load.
 3586  * The buffer pointed by 'name' must be 10 bytes at least. The function will
 3587  * fill it with a null terminated module name. */
 3588 void moduleTypeNameByID(char *name, uint64_t moduleid) {
 3589     const char *cset = ModuleTypeNameCharSet;
 3590 
 3591     name[9] = '\0';
 3592     char *p = name+8;
 3593     moduleid >>= 10;
 3594     for (int j = 0; j < 9; j++) {
 3595         *p-- = cset[moduleid & 63];
 3596         moduleid >>= 6;
 3597     }
 3598 }
 3599 
 3600 /* Register a new data type exported by the module. The parameters are the
 3601  * following. Please for in depth documentation check the modules API
 3602  * documentation, especially the TYPES.md file.
 3603  *
 3604  * * **name**: A 9 characters data type name that MUST be unique in the Redis
 3605  *   Modules ecosystem. Be creative... and there will be no collisions. Use
 3606  *   the charset A-Z a-z 9-0, plus the two "-_" characters. A good
 3607  *   idea is to use, for example `<typename>-<vendor>`. For example
 3608  *   "tree-AntZ" may mean "Tree data structure by @antirez". To use both
 3609  *   lower case and upper case letters helps in order to prevent collisions.
 3610  * * **encver**: Encoding version, which is, the version of the serialization
 3611  *   that a module used in order to persist data. As long as the "name"
 3612  *   matches, the RDB loading will be dispatched to the type callbacks
 3613  *   whatever 'encver' is used, however the module can understand if
 3614  *   the encoding it must load are of an older version of the module.
 3615  *   For example the module "tree-AntZ" initially used encver=0. Later
 3616  *   after an upgrade, it started to serialize data in a different format
 3617  *   and to register the type with encver=1. However this module may
 3618  *   still load old data produced by an older version if the rdb_load
 3619  *   callback is able to check the encver value and act accordingly.
 3620  *   The encver must be a positive value between 0 and 1023.
 3621  * * **typemethods_ptr** is a pointer to a RedisModuleTypeMethods structure
 3622  *   that should be populated with the methods callbacks and structure
 3623  *   version, like in the following example:
 3624  *
 3625  *      RedisModuleTypeMethods tm = {
 3626  *          .version = REDISMODULE_TYPE_METHOD_VERSION,
 3627  *          .rdb_load = myType_RDBLoadCallBack,
 3628  *          .rdb_save = myType_RDBSaveCallBack,
 3629  *          .aof_rewrite = myType_AOFRewriteCallBack,
 3630  *          .free = myType_FreeCallBack,
 3631  *
 3632  *          // Optional fields
 3633  *          .digest = myType_DigestCallBack,
 3634  *          .mem_usage = myType_MemUsageCallBack,
 3635  *          .aux_load = myType_AuxRDBLoadCallBack,
 3636  *          .aux_save = myType_AuxRDBSaveCallBack,
 3637  *      }
 3638  *
 3639  * * **rdb_load**: A callback function pointer that loads data from RDB files.
 3640  * * **rdb_save**: A callback function pointer that saves data to RDB files.
 3641  * * **aof_rewrite**: A callback function pointer that rewrites data as commands.
 3642  * * **digest**: A callback function pointer that is used for `DEBUG DIGEST`.
 3643  * * **free**: A callback function pointer that can free a type value.
 3644  * * **aux_save**: A callback function pointer that saves out of keyspace data to RDB files.
 3645  *   'when' argument is either REDISMODULE_AUX_BEFORE_RDB or REDISMODULE_AUX_AFTER_RDB.
 3646  * * **aux_load**: A callback function pointer that loads out of keyspace data from RDB files.
 3647  *   Similar to aux_save, returns REDISMODULE_OK on success, and ERR otherwise.
 3648  *
 3649  * The **digest* and **mem_usage** methods should currently be omitted since
 3650  * they are not yet implemented inside the Redis modules core.
 3651  *
 3652  * Note: the module name "AAAAAAAAA" is reserved and produces an error, it
 3653  * happens to be pretty lame as well.
 3654  *
 3655  * If there is already a module registering a type with the same name,
 3656  * and if the module name or encver is invalid, NULL is returned.
 3657  * Otherwise the new type is registered into Redis, and a reference of
 3658  * type RedisModuleType is returned: the caller of the function should store
 3659  * this reference into a gobal variable to make future use of it in the
 3660  * modules type API, since a single module may register multiple types.
 3661  * Example code fragment:
 3662  *
 3663  *      static RedisModuleType *BalancedTreeType;
 3664  *
 3665  *      int RedisModule_OnLoad(RedisModuleCtx *ctx) {
 3666  *          // some code here ...
 3667  *          BalancedTreeType = RM_CreateDataType(...);
 3668  *      }
 3669  */
 3670 moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, void *typemethods_ptr) {
 3671     uint64_t id = moduleTypeEncodeId(name,encver);
 3672     if (id == 0) return NULL;
 3673     if (moduleTypeLookupModuleByName(name) != NULL) return NULL;
 3674 
 3675     long typemethods_version = ((long*)typemethods_ptr)[0];
 3676     if (typemethods_version == 0) return NULL;
 3677 
 3678     struct typemethods {
 3679         uint64_t version;
 3680         moduleTypeLoadFunc rdb_load;
 3681         moduleTypeSaveFunc rdb_save;
 3682         moduleTypeRewriteFunc aof_rewrite;
 3683         moduleTypeMemUsageFunc mem_usage;
 3684         moduleTypeDigestFunc digest;
 3685         moduleTypeFreeFunc free;
 3686         struct {
 3687             moduleTypeAuxLoadFunc aux_load;
 3688             moduleTypeAuxSaveFunc aux_save;
 3689             int aux_save_triggers;
 3690         } v2;
 3691     } *tms = (struct typemethods*) typemethods_ptr;
 3692 
 3693     moduleType *mt = zcalloc(sizeof(*mt));
 3694     mt->id = id;
 3695     mt->module = ctx->module;
 3696     mt->rdb_load = tms->rdb_load;
 3697     mt->rdb_save = tms->rdb_save;
 3698     mt->aof_rewrite = tms->aof_rewrite;
 3699     mt->mem_usage = tms->mem_usage;
 3700     mt->digest = tms->digest;
 3701     mt->free = tms->free;
 3702     if (tms->version >= 2) {
 3703         mt->aux_load = tms->v2.aux_load;
 3704         mt->aux_save = tms->v2.aux_save;
 3705         mt->aux_save_triggers = tms->v2.aux_save_triggers;
 3706     }
 3707     memcpy(mt->name,name,sizeof(mt->name));
 3708     listAddNodeTail(ctx->module->types,mt);
 3709     return mt;
 3710 }
 3711 
 3712 /* If the key is open for writing, set the specified module type object
 3713  * as the value of the key, deleting the old value if any.
 3714  * On success REDISMODULE_OK is returned. If the key is not open for
 3715  * writing or there is an active iterator, REDISMODULE_ERR is returned. */
 3716 int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) {
 3717     if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
 3718     RM_DeleteKey(key);
 3719     robj *o = createModuleObject(mt,value);
 3720     genericSetKey(key->ctx->client,key->db,key->key,o,0,0);
 3721     decrRefCount(o);
 3722     key->value = o;
 3723     return REDISMODULE_OK;
 3724 }
 3725 
 3726 /* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on
 3727  * the key, returns the module type pointer of the value stored at key.
 3728  *
 3729  * If the key is NULL, is not associated with a module type, or is empty,
 3730  * then NULL is returned instead. */
 3731 moduleType *RM_ModuleTypeGetType(RedisModuleKey *key) {
 3732     if (key == NULL ||
 3733         key->value == NULL ||
 3734         RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL;
 3735     moduleValue *mv = key->value->ptr;
 3736     return mv->type;
 3737 }
 3738 
 3739 /* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on
 3740  * the key, returns the module type low-level value stored at key, as
 3741  * it was set by the user via RedisModule_ModuleTypeSet().
 3742  *
 3743  * If the key is NULL, is not associated with a module type, or is empty,
 3744  * then NULL is returned instead. */
 3745 void *RM_ModuleTypeGetValue(RedisModuleKey *key) {
 3746     if (key == NULL ||
 3747         key->value == NULL ||
 3748         RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL;
 3749     moduleValue *mv = key->value->ptr;
 3750     return mv->value;
 3751 }
 3752 
 3753 /* --------------------------------------------------------------------------
 3754  * RDB loading and saving functions
 3755  * -------------------------------------------------------------------------- */
 3756 
 3757 /* Called when there is a load error in the context of a module. On some
 3758  * modules this cannot be recovered, but if the module declared capability
 3759  * to handle errors, we'll raise a flag rather than exiting. */
 3760 void moduleRDBLoadError(RedisModuleIO *io) {
 3761     if (io->type->module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS) {
 3762         io->error = 1;
 3763         return;
 3764     }
 3765     serverPanic(
 3766         "Error loading data from RDB (short read or EOF). "
 3767         "Read performed by module '%s' about type '%s' "
 3768         "after reading '%llu' bytes of a value "
 3769         "for key named: '%s'.",
 3770         io->type->module->name,
 3771         io->type->name,
 3772         (unsigned long long)io->bytes,
 3773         io->key? (char*)io->key->ptr: "(null)");
 3774 }
 3775 
 3776 /* Returns 0 if there's at least one registered data type that did not declare
 3777  * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS, in which case diskless loading should
 3778  * be avoided since it could cause data loss. */
 3779 int moduleAllDatatypesHandleErrors() {
 3780     dictIterator *di = dictGetIterator(modules);
 3781     dictEntry *de;
 3782 
 3783     while ((de = dictNext(di)) != NULL) {
 3784         struct RedisModule *module = dictGetVal(de);
 3785         if (listLength(module->types) &&
 3786             !(module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS))
 3787         {
 3788             dictReleaseIterator(di);
 3789             return 0;
 3790         }
 3791     }
 3792     dictReleaseIterator(di);
 3793     return 1;
 3794 }
 3795 
 3796 /* Returns true if any previous IO API failed.
 3797  * for Load* APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with
 3798  * RediModule_SetModuleOptions first. */
 3799 int RM_IsIOError(RedisModuleIO *io) {
 3800     return io->error;
 3801 }
 3802 
 3803 /* Save an unsigned 64 bit value into the RDB file. This function should only
 3804  * be called in the context of the rdb_save method of modules implementing new
 3805  * data types. */
 3806 void RM_SaveUnsigned(RedisModuleIO *io, uint64_t value) {
 3807     if (io->error) return;
 3808     /* Save opcode. */
 3809     int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_UINT);
 3810     if (retval == -1) goto saveerr;
 3811     io->bytes += retval;
 3812     /* Save value. */
 3813     retval = rdbSaveLen(io->rio, value);
 3814     if (retval == -1) goto saveerr;
 3815     io->bytes += retval;
 3816     return;
 3817 
 3818 saveerr:
 3819     io->error = 1;
 3820 }
 3821 
 3822 /* Load an unsigned 64 bit value from the RDB file. This function should only
 3823  * be called in the context of the rdb_load method of modules implementing
 3824  * new data types. */
 3825 uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
 3826     if (io->error) return 0;
 3827     if (io->ver == 2) {
 3828         uint64_t opcode = rdbLoadLen(io->rio,NULL);
 3829         if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr;
 3830     }
 3831     uint64_t value;
 3832     int retval = rdbLoadLenByRef(io->rio, NULL, &value);
 3833     if (retval == -1) goto loaderr;
 3834     return value;
 3835 
 3836 loaderr:
 3837     moduleRDBLoadError(io);
 3838     return 0;
 3839 }
 3840 
 3841 /* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */
 3842 void RM_SaveSigned(RedisModuleIO *io, int64_t value) {
 3843     union {uint64_t u; int64_t i;} conv;
 3844     conv.i = value;
 3845     RM_SaveUnsigned(io,conv.u);
 3846 }
 3847 
 3848 /* Like RedisModule_LoadUnsigned() but for signed 64 bit values. */
 3849 int64_t RM_LoadSigned(RedisModuleIO *io) {
 3850     union {uint64_t u; int64_t i;} conv;
 3851     conv.u = RM_LoadUnsigned(io);
 3852     return conv.i;
 3853 }
 3854 
 3855 /* In the context of the rdb_save method of a module type, saves a
 3856  * string into the RDB file taking as input a RedisModuleString.
 3857  *
 3858  * The string can be later loaded with RedisModule_LoadString() or
 3859  * other Load family functions expecting a serialized string inside
 3860  * the RDB file. */
 3861 void RM_SaveString(RedisModuleIO *io, RedisModuleString *s) {
 3862     if (io->error) return;
 3863     /* Save opcode. */
 3864     ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING);
 3865     if (retval == -1) goto saveerr;
 3866     io->bytes += retval;
 3867     /* Save value. */
 3868     retval = rdbSaveStringObject(io->rio, s);
 3869     if (retval == -1) goto saveerr;
 3870     io->bytes += retval;
 3871     return;
 3872 
 3873 saveerr:
 3874     io->error = 1;
 3875 }
 3876 
 3877 /* Like RedisModule_SaveString() but takes a raw C pointer and length
 3878  * as input. */
 3879 void RM_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len) {
 3880     if (io->error) return;
 3881     /* Save opcode. */
 3882     ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING);
 3883     if (retval == -1) goto saveerr;
 3884     io->bytes += retval;
 3885     /* Save value. */
 3886     retval = rdbSaveRawString(io->rio, (unsigned char*)str,len);
 3887     if (retval == -1) goto saveerr;
 3888     io->bytes += retval;
 3889     return;
 3890 
 3891 saveerr:
 3892     io->error = 1;
 3893 }
 3894 
 3895 /* Implements RM_LoadString() and RM_LoadStringBuffer() */
 3896 void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
 3897     if (io->error) return NULL;
 3898     if (io->ver == 2) {
 3899         uint64_t opcode = rdbLoadLen(io->rio,NULL);
 3900         if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr;
 3901     }
 3902     void *s = rdbGenericLoadStringObject(io->rio,
 3903               plain ? RDB_LOAD_PLAIN : RDB_LOAD_NONE, lenptr);
 3904     if (s == NULL) goto loaderr;
 3905     return s;
 3906 
 3907 loaderr:
 3908     moduleRDBLoadError(io);
 3909     return NULL;
 3910 }
 3911 
 3912 /* In the context of the rdb_load method of a module data type, loads a string
 3913  * from the RDB file, that was previously saved with RedisModule_SaveString()
 3914  * functions family.
 3915  *
 3916  * The returned string is a newly allocated RedisModuleString object, and
 3917  * the user should at some point free it with a call to RedisModule_FreeString().
 3918  *
 3919  * If the data structure does not store strings as RedisModuleString objects,
 3920  * the similar function RedisModule_LoadStringBuffer() could be used instead. */
 3921 RedisModuleString *RM_LoadString(RedisModuleIO *io) {
 3922     return moduleLoadString(io,0,NULL);
 3923 }
 3924 
 3925 /* Like RedisModule_LoadString() but returns an heap allocated string that
 3926  * was allocated with RedisModule_Alloc(), and can be resized or freed with
 3927  * RedisModule_Realloc() or RedisModule_Free().
 3928  *
 3929  * The size of the string is stored at '*lenptr' if not NULL.
 3930  * The returned string is not automatically NULL terminated, it is loaded
 3931  * exactly as it was stored inisde the RDB file. */
 3932 char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) {
 3933     return moduleLoadString(io,1,lenptr);
 3934 }
 3935 
 3936 /* In the context of the rdb_save method of a module data type, saves a double
 3937  * value to the RDB file. The double can be a valid number, a NaN or infinity.
 3938  * It is possible to load back the value with RedisModule_LoadDouble(). */
 3939 void RM_SaveDouble(RedisModuleIO *io, double value) {
 3940     if (io->error) return;
 3941     /* Save opcode. */
 3942     int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_DOUBLE);
 3943     if (retval == -1) goto saveerr;
 3944     io->bytes += retval;
 3945     /* Save value. */
 3946     retval = rdbSaveBinaryDoubleValue(io->rio, value);
 3947     if (retval == -1) goto saveerr;
 3948     io->bytes += retval;
 3949     return;
 3950 
 3951 saveerr:
 3952     io->error = 1;
 3953 }
 3954 
 3955 /* In the context of the rdb_save method of a module data type, loads back the
 3956  * double value saved by RedisModule_SaveDouble(). */
 3957 double RM_LoadDouble(RedisModuleIO *io) {
 3958     if (io->error) return 0;
 3959     if (io->ver == 2) {
 3960         uint64_t opcode = rdbLoadLen(io->rio,NULL);
 3961         if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr;
 3962     }
 3963     double value;
 3964     int retval = rdbLoadBinaryDoubleValue(io->rio, &value);
 3965     if (retval == -1) goto loaderr;
 3966     return value;
 3967 
 3968 loaderr:
 3969     moduleRDBLoadError(io);
 3970     return 0;
 3971 }
 3972 
 3973 /* In the context of the rdb_save method of a module data type, saves a float
 3974  * value to the RDB file. The float can be a valid number, a NaN or infinity.
 3975  * It is possible to load back the value with RedisModule_LoadFloat(). */
 3976 void RM_SaveFloat(RedisModuleIO *io, float value) {
 3977     if (io->error) return;
 3978     /* Save opcode. */
 3979     int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_FLOAT);
 3980     if (retval == -1) goto saveerr;
 3981     io->bytes += retval;
 3982     /* Save value. */
 3983     retval = rdbSaveBinaryFloatValue(io->rio, value);
 3984     if (retval == -1) goto saveerr;
 3985     io->bytes += retval;
 3986     return;
 3987 
 3988 saveerr:
 3989     io->error = 1;
 3990 }
 3991 
 3992 /* In the context of the rdb_save method of a module data type, loads back the
 3993  * float value saved by RedisModule_SaveFloat(). */
 3994 float RM_LoadFloat(RedisModuleIO *io) {
 3995     if (io->error) return 0;
 3996     if (io->ver == 2) {
 3997         uint64_t opcode = rdbLoadLen(io->rio,NULL);
 3998         if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr;
 3999     }
 4000     float value;
 4001     int retval = rdbLoadBinaryFloatValue(io->rio, &value);
 4002     if (retval == -1) goto loaderr;
 4003     return value;
 4004 
 4005 loaderr:
 4006     moduleRDBLoadError(io);
 4007     return 0;
 4008 }
 4009 
 4010 /* In the context of the rdb_save method of a module data type, saves a long double
 4011  * value to the RDB file. The double can be a valid number, a NaN or infinity.
 4012  * It is possible to load back the value with RedisModule_LoadLongDouble(). */
 4013 void RM_SaveLongDouble(RedisModuleIO *io, long double value) {
 4014     if (io->error) return;
 4015     char buf[MAX_LONG_DOUBLE_CHARS];
 4016     /* Long double has different number of bits in different platforms, so we
 4017      * save it as a string type. */
 4018     size_t len = ld2string(buf,sizeof(buf),value,LD_STR_HEX);
 4019     RM_SaveStringBuffer(io,buf,len);
 4020 }
 4021 
 4022 /* In the context of the rdb_save method of a module data type, loads back the
 4023  * long double value saved by RedisModule_SaveLongDouble(). */
 4024 long double RM_LoadLongDouble(RedisModuleIO *io) {
 4025     if (io->error) return 0;
 4026     long double value;
 4027     size_t len;
 4028     char* str = RM_LoadStringBuffer(io,&len);
 4029     if (!str) return 0;
 4030     string2ld(str,len,&value);
 4031     RM_Free(str);
 4032     return value;
 4033 }
 4034 
 4035 /* Iterate over modules, and trigger rdb aux saving for the ones modules types
 4036  * who asked for it. */
 4037 ssize_t rdbSaveModulesAux(rio *rdb, int when) {
 4038     size_t total_written = 0;
 4039     dictIterator *di = dictGetIterator(modules);
 4040     dictEntry *de;
 4041 
 4042     while ((de = dictNext(di)) != NULL) {
 4043         struct RedisModule *module = dictGetVal(de);
 4044         listIter li;
 4045         listNode *ln;
 4046 
 4047         listRewind(module->types,&li);
 4048         while((ln = listNext(&li))) {
 4049             moduleType *mt = ln->value;
 4050             if (!mt->aux_save || !(mt->aux_save_triggers & when))
 4051                 continue;
 4052             ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt);
 4053             if (ret==-1) {
 4054                 dictReleaseIterator(di);
 4055                 return -1;
 4056             }
 4057             total_written += ret;
 4058         }
 4059     }
 4060 
 4061     dictReleaseIterator(di);
 4062     return total_written;
 4063 }
 4064 
 4065 /* --------------------------------------------------------------------------
 4066  * Key digest API (DEBUG DIGEST interface for modules types)
 4067  * -------------------------------------------------------------------------- */
 4068 
 4069 /* Add a new element to the digest. This function can be called multiple times
 4070  * one element after the other, for all the elements that constitute a given
 4071  * data structure. The function call must be followed by the call to
 4072  * `RedisModule_DigestEndSequence` eventually, when all the elements that are
 4073  * always in a given order are added. See the Redis Modules data types
 4074  * documentation for more info. However this is a quick example that uses Redis
 4075  * data types as an example.
 4076  *
 4077  * To add a sequence of unordered elements (for example in the case of a Redis
 4078  * Set), the pattern to use is:
 4079  *
 4080  *     foreach element {
 4081  *         AddElement(element);
 4082  *         EndSequence();
 4083  *     }
 4084  *
 4085  * Because Sets are not ordered, so every element added has a position that
 4086  * does not depend from the other. However if instead our elements are
 4087  * ordered in pairs, like field-value pairs of an Hash, then one should
 4088  * use:
 4089  *
 4090  *     foreach key,value {
 4091  *         AddElement(key);
 4092  *         AddElement(value);
 4093  *         EndSquence();
 4094  *     }
 4095  *
 4096  * Because the key and value will be always in the above order, while instead
 4097  * the single key-value pairs, can appear in any position into a Redis hash.
 4098  *
 4099  * A list of ordered elements would be implemented with:
 4100  *
 4101  *     foreach element {
 4102  *         AddElement(element);
 4103  *     }
 4104  *     EndSequence();
 4105  *
 4106  */
 4107 void RM_DigestAddStringBuffer(RedisModuleDigest *md, unsigned char *ele, size_t len) {
 4108     mixDigest(md->o,ele,len);
 4109 }
 4110 
 4111 /* Like `RedisModule_DigestAddStringBuffer()` but takes a long long as input
 4112  * that gets converted into a string before adding it to the digest. */
 4113 void RM_DigestAddLongLong(RedisModuleDigest *md, long long ll) {
 4114     char buf[LONG_STR_SIZE];
 4115     size_t len = ll2string(buf,sizeof(buf),ll);
 4116     mixDigest(md->o,buf,len);
 4117 }
 4118 
 4119 /* See the documentation for `RedisModule_DigestAddElement()`. */
 4120 void RM_DigestEndSequence(RedisModuleDigest *md) {
 4121     xorDigest(md->x,md->o,sizeof(md->o));
 4122     memset(md->o,0,sizeof(md->o));
 4123 }
 4124 
 4125 /* Decode a serialized representation of a module data type 'mt' from string
 4126  * 'str' and return a newly allocated value, or NULL if decoding failed.
 4127  *
 4128  * This call basically reuses the 'rdb_load' callback which module data types
 4129  * implement in order to allow a module to arbitrarily serialize/de-serialize
 4130  * keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented.
 4131  *
 4132  * Modules should generally use the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag and
 4133  * make sure the de-serialization code properly checks and handles IO errors
 4134  * (freeing allocated buffers and returning a NULL).
 4135  *
 4136  * If this is NOT done, Redis will handle corrupted (or just truncated) serialized
 4137  * data by producing an error message and terminating the process.
 4138  */
 4139 
 4140 void *RM_LoadDataTypeFromString(const RedisModuleString *str, const moduleType *mt) {
 4141     rio payload;
 4142     RedisModuleIO io;
 4143     void *ret;
 4144 
 4145     rioInitWithBuffer(&payload, str->ptr);
 4146     moduleInitIOContext(io,(moduleType *)mt,&payload,NULL);
 4147 
 4148     /* All RM_Save*() calls always write a version 2 compatible format, so we
 4149      * need to make sure we read the same.
 4150      */
 4151     io.ver = 2;
 4152     ret = mt->rdb_load(&io,0);
 4153     if (io.ctx) {
 4154         moduleFreeContext(io.ctx);
 4155         zfree(io.ctx);
 4156     }
 4157     return ret;
 4158 }
 4159 
 4160 /* Encode a module data type 'mt' value 'data' into serialized form, and return it
 4161  * as a newly allocated RedisModuleString.
 4162  *
 4163  * This call basically reuses the 'rdb_save' callback which module data types
 4164  * implement in order to allow a module to arbitrarily serialize/de-serialize
 4165  * keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented.
 4166  */
 4167 
 4168 RedisModuleString *RM_SaveDataTypeToString(RedisModuleCtx *ctx, void *data, const moduleType *mt) {
 4169     rio payload;
 4170     RedisModuleIO io;
 4171 
 4172     rioInitWithBuffer(&payload,sdsempty());
 4173     moduleInitIOContext(io,(moduleType *)mt,&payload,NULL);
 4174     mt->rdb_save(&io,data);
 4175     if (io.ctx) {
 4176         moduleFreeContext(io.ctx);
 4177         zfree(io.ctx);
 4178     }
 4179     if (io.error) {
 4180         return NULL;
 4181     } else {
 4182         robj *str = createObject(OBJ_STRING,payload.io.buffer.ptr);
 4183         if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,str);
 4184         return str;
 4185     }
 4186 }
 4187 
 4188 /* --------------------------------------------------------------------------
 4189  * AOF API for modules data types
 4190  * -------------------------------------------------------------------------- */
 4191 
 4192 /* Emits a command into the AOF during the AOF rewriting process. This function
 4193  * is only called in the context of the aof_rewrite method of data types exported
 4194  * by a module. The command works exactly like RedisModule_Call() in the way
 4195  * the parameters are passed, but it does not return anything as the error
 4196  * handling is performed by Redis itself. */
 4197 void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...) {
 4198     if (io->error) return;
 4199     struct redisCommand *cmd;
 4200     robj **argv = NULL;
 4201     int argc = 0, flags = 0, j;
 4202     va_list ap;
 4203 
 4204     cmd = lookupCommandByCString((char*)cmdname);
 4205     if (!cmd) {
 4206         serverLog(LL_WARNING,
 4207             "Fatal: AOF method for module data type '%s' tried to "
 4208             "emit unknown command '%s'",
 4209             io->type->name, cmdname);
 4210         io->error = 1;
 4211         errno = EINVAL;
 4212         return;
 4213     }
 4214 
 4215     /* Emit the arguments into the AOF in Redis protocol format. */
 4216     va_start(ap, fmt);
 4217     argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
 4218     va_end(ap);
 4219     if (argv == NULL) {
 4220         serverLog(LL_WARNING,
 4221             "Fatal: AOF method for module data type '%s' tried to "
 4222             "call RedisModule_EmitAOF() with wrong format specifiers '%s'",
 4223             io->type->name, fmt);
 4224         io->error = 1;
 4225         errno = EINVAL;
 4226         return;
 4227     }
 4228 
 4229     /* Bulk count. */
 4230     if (!io->error && rioWriteBulkCount(io->rio,'*',argc) == 0)
 4231         io->error = 1;
 4232 
 4233     /* Arguments. */
 4234     for (j = 0; j < argc; j++) {
 4235         if (!io->error && rioWriteBulkObject(io->rio,argv[j]) == 0)
 4236             io->error = 1;
 4237         decrRefCount(argv[j]);
 4238     }
 4239     zfree(argv);
 4240     return;
 4241 }
 4242 
 4243 /* --------------------------------------------------------------------------
 4244  * IO context handling
 4245  * -------------------------------------------------------------------------- */
 4246 
 4247 RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) {
 4248     if (io->ctx) return io->ctx; /* Can't have more than one... */
 4249     RedisModuleCtx ctxtemplate = REDISMODULE_CTX_INIT;
 4250     io->ctx = zmalloc(sizeof(RedisModuleCtx));
 4251     *(io->ctx) = ctxtemplate;
 4252     io->ctx->module = io->type->module;
 4253     io->ctx->client = NULL;
 4254     return io->ctx;
 4255 }
 4256 
 4257 /* Returns a RedisModuleString with the name of the key currently saving or
 4258  * loading, when an IO data type callback is called.  There is no guarantee
 4259  * that the key name is always available, so this may return NULL.
 4260  */
 4261 const RedisModuleString *RM_GetKeyNameFromIO(RedisModuleIO *io) {
 4262     return io->key;
 4263 }
 4264 
 4265 /* Returns a RedisModuleString with the name of the key from RedisModuleKey */
 4266 const RedisModuleString *RM_GetKeyNameFromModuleKey(RedisModuleKey *key) {
 4267     return key ? key->key : NULL;
 4268 }
 4269 
 4270 /* --------------------------------------------------------------------------
 4271  * Logging
 4272  * -------------------------------------------------------------------------- */
 4273 
 4274 /* This is the low level function implementing both:
 4275  *
 4276  *      RM_Log()
 4277  *      RM_LogIOError()
 4278  *
 4279  */
 4280 void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_list ap) {
 4281     char msg[LOG_MAX_LEN];
 4282     size_t name_len;
 4283     int level;
 4284 
 4285     if (!strcasecmp(levelstr,"debug")) level = LL_DEBUG;
 4286     else if (!strcasecmp(levelstr,"verbose")) level = LL_VERBOSE;
 4287     else if (!strcasecmp(levelstr,"notice")) level = LL_NOTICE;
 4288     else if (!strcasecmp(levelstr,"warning")) level = LL_WARNING;
 4289     else level = LL_VERBOSE; /* Default. */
 4290 
 4291     if (level < server.verbosity) return;
 4292 
 4293     name_len = snprintf(msg, sizeof(msg),"<%s> ", module? module->name: "module");
 4294     vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap);
 4295     serverLogRaw(level,msg);
 4296 }
 4297 
 4298 /* Produces a log message to the standard Redis log, the format accepts
 4299  * printf-alike specifiers, while level is a string describing the log
 4300  * level to use when emitting the log, and must be one of the following:
 4301  *
 4302  * * "debug"
 4303  * * "verbose"
 4304  * * "notice"
 4305  * * "warning"
 4306  *
 4307  * If the specified log level is invalid, verbose is used by default.
 4308  * There is a fixed limit to the length of the log line this function is able
 4309  * to emit, this limit is not specified but is guaranteed to be more than
 4310  * a few lines of text.
 4311  *
 4312  * The ctx argument may be NULL if cannot be provided in the context of the
 4313  * caller for instance threads or callbacks, in which case a generic "module"
 4314  * will be used instead of the module name.
 4315  */
 4316 void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) {
 4317     va_list ap;
 4318     va_start(ap, fmt);
 4319     RM_LogRaw(ctx? ctx->module: NULL,levelstr,fmt,ap);
 4320     va_end(ap);
 4321 }
 4322 
 4323 /* Log errors from RDB / AOF serialization callbacks.
 4324  *
 4325  * This function should be used when a callback is returning a critical
 4326  * error to the caller since cannot load or save the data for some
 4327  * critical reason. */
 4328 void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...) {
 4329     va_list ap;
 4330     va_start(ap, fmt);
 4331     RM_LogRaw(io->type->module,levelstr,fmt,ap);
 4332     va_end(ap);
 4333 }
 4334 
 4335 /* Redis-like assert function.
 4336  *
 4337  * A failed assertion will shut down the server and produce logging information
 4338  * that looks identical to information generated by Redis itself.
 4339  */
 4340 void RM__Assert(const char *estr, const char *file, int line) {
 4341     _serverAssert(estr, file, line);
 4342 }
 4343 
 4344 /* Allows adding event to the latency monitor to be observed by the LATENCY
 4345  * command. The call is skipped if the latency is smaller than the configured
 4346  * latency-monitor-threshold. */
 4347 void RM_LatencyAddSample(const char *event, mstime_t latency) {
 4348     if (latency >= server.latency_monitor_threshold)
 4349         latencyAddSample(event, latency);
 4350 }
 4351 
 4352 /* --------------------------------------------------------------------------
 4353  * Blocking clients from modules
 4354  * -------------------------------------------------------------------------- */
 4355 
 4356 /* Readable handler for the awake pipe. We do nothing here, the awake bytes
 4357  * will be actually read in a more appropriate place in the
 4358  * moduleHandleBlockedClients() function that is where clients are actually
 4359  * served. */
 4360 void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
 4361     UNUSED(el);
 4362     UNUSED(fd);
 4363     UNUSED(mask);
 4364     UNUSED(privdata);
 4365 }
 4366 
 4367 /* This is called from blocked.c in order to unblock a client: may be called
 4368  * for multiple reasons while the client is in the middle of being blocked
 4369  * because the client is terminated, but is also called for cleanup when a
 4370  * client is unblocked in a clean way after replaying.
 4371  *
 4372  * What we do here is just to set the client to NULL in the redis module
 4373  * blocked client handle. This way if the client is terminated while there
 4374  * is a pending threaded operation involving the blocked client, we'll know
 4375  * that the client no longer exists and no reply callback should be called.
 4376  *
 4377  * The structure RedisModuleBlockedClient will be always deallocated when
 4378  * running the list of clients blocked by a module that need to be unblocked. */
 4379 void unblockClientFromModule(client *c) {
 4380     RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
 4381 
 4382     /* Call the disconnection callback if any. Note that
 4383      * bc->disconnect_callback is set to NULL if the client gets disconnected
 4384      * by the module itself or because of a timeout, so the callback will NOT
 4385      * get called if this is not an actual disconnection event. */
 4386     if (bc->disconnect_callback) {
 4387         RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
 4388         ctx.blocked_privdata = bc->privdata;
 4389         ctx.module = bc->module;
 4390         ctx.client = bc->client;
 4391         bc->disconnect_callback(&ctx,bc);
 4392         moduleFreeContext(&ctx);
 4393     }
 4394 
 4395     /* If we made it here and client is still blocked it means that the command
 4396      * timed-out, client was killed or disconnected and disconnect_callback was
 4397      * not implemented (or it was, but RM_UnblockClient was not called from
 4398      * within it, as it should).
 4399      * We must call moduleUnblockClient in order to free privdata and
 4400      * RedisModuleBlockedClient.
 4401      *
 4402      * Note that we only do that for clients that are blocked on keys, for which
 4403      * the contract is that the module should not call RM_UnblockClient under
 4404      * normal circumstances.
 4405      * Clients implementing threads and working with private data should be
 4406      * aware that calling RM_UnblockClient for every blocked client is their
 4407      * responsibility, and if they fail to do so memory may leak. Ideally they
 4408      * should implement the disconnect and timeout callbacks and call
 4409      * RM_UnblockClient, but any other way is also acceptable. */
 4410     if (bc->blocked_on_keys && !bc->unblocked)
 4411         moduleUnblockClient(c);
 4412 
 4413     bc->client = NULL;
 4414     /* Reset the client for a new query since, for blocking commands implemented
 4415      * into modules, we do not it immediately after the command returns (and
 4416      * the client blocks) in order to be still able to access the argument
 4417      * vector from callbacks. */
 4418     resetClient(c);
 4419 }
 4420 
 4421 /* Block a client in the context of a module: this function implements both
 4422  * RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the
 4423  * keys are passed or not.
 4424  *
 4425  * When not blocking for keys, the keys, numkeys, and privdata parameters are
 4426  * not needed. The privdata in that case must be NULL, since later is
 4427  * RM_UnblockClient() that will provide some private data that the reply
 4428  * callback will receive.
 4429  *
 4430  * Instead when blocking for keys, normally RM_UnblockClient() will not be
 4431  * called (because the client will unblock when the key is modified), so
 4432  * 'privdata' should be provided in that case, so that once the client is
 4433  * unlocked and the reply callback is called, it will receive its associated
 4434  * private data.
 4435  *
 4436  * Even when blocking on keys, RM_UnblockClient() can be called however, but
 4437  * in that case the privdata argument is disregarded, because we pass the
 4438  * reply callback the privdata that is set here while blocking.
 4439  *
 4440  */
 4441 RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
 4442     client *c = ctx->client;
 4443     int islua = c->flags & CLIENT_LUA;
 4444     int ismulti = c->flags & CLIENT_MULTI;
 4445 
 4446     c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
 4447     RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
 4448     ctx->module->blocked_clients++;
 4449 
 4450     /* We need to handle the invalid operation of calling modules blocking
 4451      * commands from Lua or MULTI. We actually create an already aborted
 4452      * (client set to NULL) blocked client handle, and actually reply with
 4453      * an error. */
 4454     mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
 4455     bc->client = (islua || ismulti) ? NULL : c;
 4456     bc->module = ctx->module;
 4457     bc->reply_callback = reply_callback;
 4458     bc->timeout_callback = timeout_callback;
 4459     bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
 4460     bc->free_privdata = free_privdata;
 4461     bc->privdata = privdata;
 4462     bc->reply_client = createClient(NULL);
 4463     bc->reply_client->flags |= CLIENT_MODULE;
 4464     bc->dbid = c->db->id;
 4465     bc->blocked_on_keys = keys != NULL;
 4466     bc->unblocked = 0;
 4467     c->bpop.timeout = timeout;
 4468 
 4469     if (islua || ismulti) {
 4470         c->bpop.module_blocked_handle = NULL;
 4471         addReplyError(c, islua ?
 4472             "Blocking module command called from Lua script" :
 4473             "Blocking module command called from transaction");
 4474     } else {
 4475         if (keys) {
 4476             blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL);
 4477         } else {
 4478             blockClient(c,BLOCKED_MODULE);
 4479         }
 4480     }
 4481     return bc;
 4482 }
 4483 
 4484 /* This function is called from module.c in order to check if a module
 4485  * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true)
 4486  * can really be unblocked, since the module was able to serve the client.
 4487  * If the callback returns REDISMODULE_OK, then the client can be unblocked,
 4488  * otherwise the client remains blocked and we'll retry again when one of
 4489  * the keys it blocked for becomes "ready" again.
 4490  * This function returns 1 if client was served (and should be unblocked) */
 4491 int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
 4492     int served = 0;
 4493     RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
 4494 
 4495     /* Protect against re-processing: don't serve clients that are already
 4496      * in the unblocking list for any reason (including RM_UnblockClient()
 4497      * explicit call). See #6798. */
 4498     if (bc->unblocked) return 0;
 4499 
 4500     RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
 4501     ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
 4502     ctx.blocked_ready_key = key;
 4503     ctx.blocked_privdata = bc->privdata;
 4504     ctx.module = bc->module;
 4505     ctx.client = bc->client;
 4506     ctx.blocked_client = bc;
 4507     if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK)
 4508         served = 1;
 4509     moduleFreeContext(&ctx);
 4510     return served;
 4511 }
 4512 
 4513 /* Block a client in the context of a blocking command, returning an handle
 4514  * which will be used, later, in order to unblock the client with a call to
 4515  * RedisModule_UnblockClient(). The arguments specify callback functions
 4516  * and a timeout after which the client is unblocked.
 4517  *
 4518  * The callbacks are called in the following contexts:
 4519  *
 4520  *     reply_callback:  called after a successful RedisModule_UnblockClient()
 4521  *                      call in order to reply to the client and unblock it.
 4522  *
 4523  *     reply_timeout:   called when the timeout is reached in order to send an
 4524  *                      error to the client.
 4525  *
 4526  *     free_privdata:   called in order to free the private data that is passed
 4527  *                      by RedisModule_UnblockClient() call.
 4528  *
 4529  * Note: RedisModule_UnblockClient should be called for every blocked client,
 4530  *       even if client was killed, timed-out or disconnected. Failing to do so
 4531  *       will result in memory leaks.
 4532  *
 4533  * There are some cases where RedisModule_BlockClient() cannot be used:
 4534  *
 4535  * 1. If the client is a Lua script.
 4536  * 2. If the client is executing a MULTI block.
 4537  *
 4538  * In these cases, a call to RedisModule_BlockClient() will **not** block the
 4539  * client, but instead produce a specific error reply.
 4540  */
 4541 RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
 4542     return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
 4543 }
 4544 
 4545 /* This call is similar to RedisModule_BlockClient(), however in this case we
 4546  * don't just block the client, but also ask Redis to unblock it automatically
 4547  * once certain keys become "ready", that is, contain more data.
 4548  *
 4549  * Basically this is similar to what a typical Redis command usually does,
 4550  * like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP,
 4551  * and later when the key receives new data (a list push for instance), the
 4552  * client is unblocked and served.
 4553  *
 4554  * However in the case of this module API, when the client is unblocked?
 4555  *
 4556  * 1. If you block ok a key of a type that has blocking operations associated,
 4557  *    like a list, a sorted set, a stream, and so forth, the client may be
 4558  *    unblocked once the relevant key is targeted by an operation that normally
 4559  *    unblocks the native blocking operations for that type. So if we block
 4560  *    on a list key, an RPUSH command may unblock our client and so forth.
 4561  * 2. If you are implementing your native data type, or if you want to add new
 4562  *    unblocking conditions in addition to "1", you can call the modules API
 4563  *    RedisModule_SignalKeyAsReady().
 4564  *
 4565  * Anyway we can't be sure if the client should be unblocked just because the
 4566  * key is signaled as ready: for instance a successive operation may change the
 4567  * key, or a client in queue before this one can be served, modifying the key
 4568  * as well and making it empty again. So when a client is blocked with
 4569  * RedisModule_BlockClientOnKeys() the reply callback is not called after
 4570  * RM_UnblockCLient() is called, but every time a key is signaled as ready:
 4571  * if the reply callback can serve the client, it returns REDISMODULE_OK
 4572  * and the client is unblocked, otherwise it will return REDISMODULE_ERR
 4573  * and we'll try again later.
 4574  *
 4575  * The reply callback can access the key that was signaled as ready by
 4576  * calling the API RedisModule_GetBlockedClientReadyKey(), that returns
 4577  * just the string name of the key as a RedisModuleString object.
 4578  *
 4579  * Thanks to this system we can setup complex blocking scenarios, like
 4580  * unblocking a client only if a list contains at least 5 items or other
 4581  * more fancy logics.
 4582  *
 4583  * Note that another difference with RedisModule_BlockClient(), is that here
 4584  * we pass the private data directly when blocking the client: it will
 4585  * be accessible later in the reply callback. Normally when blocking with
 4586  * RedisModule_BlockClient() the private data to reply to the client is
 4587  * passed when calling RedisModule_UnblockClient() but here the unblocking
 4588  * is performed by Redis itself, so we need to have some private data before
 4589  * hand. The private data is used to store any information about the specific
 4590  * unblocking operation that you are implementing. Such information will be
 4591  * freed using the free_privdata callback provided by the user.
 4592  *
 4593  * However the reply callback will be able to access the argument vector of
 4594  * the command, so the private data is often not needed.
 4595  *
 4596  * Note: Under normal circumstances RedisModule_UnblockClient should not be
 4597  *       called for clients that are blocked on keys (Either the key will
 4598  *       become ready or a timeout will occur). If for some reason you do want
 4599  *       to call RedisModule_UnblockClient it is possible: Client will be
 4600  *       handled as if it were timed-out (You must implement the timeout
 4601  *       callback in that case).
 4602  */
 4603 RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
 4604     return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
 4605 }
 4606 
 4607 /* This function is used in order to potentially unblock a client blocked
 4608  * on keys with RedisModule_BlockClientOnKeys(). When this function is called,
 4609  * all the clients blocked for this key will get their reply callback called,
 4610  * and if the callback returns REDISMODULE_OK the client will be unblocked. */
 4611 void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
 4612     signalKeyAsReady(ctx->client->db, key);
 4613 }
 4614 
 4615 /* Implements RM_UnblockClient() and moduleUnblockClient(). */
 4616 int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
 4617     pthread_mutex_lock(&moduleUnblockedClientsMutex);
 4618     if (!bc->blocked_on_keys) bc->privdata = privdata;
 4619     bc->unblocked = 1;
 4620     listAddNodeTail(moduleUnblockedClients,bc);
 4621     if (write(server.module_blocked_pipe[1],"A",1) != 1) {
 4622         /* Ignore the error, this is best-effort. */
 4623     }
 4624     pthread_mutex_unlock(&moduleUnblockedClientsMutex);
 4625     return REDISMODULE_OK;
 4626 }
 4627 
 4628 /* This API is used by the Redis core to unblock a client that was blocked
 4629  * by a module. */
 4630 void moduleUnblockClient(client *c) {
 4631     RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
 4632     moduleUnblockClientByHandle(bc,NULL);
 4633 }
 4634 
 4635 /* Return true if the client 'c' was blocked by a module using
 4636  * RM_BlockClientOnKeys(). */
 4637 int moduleClientIsBlockedOnKeys(client *c) {
 4638     RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
 4639     return bc->blocked_on_keys;
 4640 }
 4641 
 4642 /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
 4643  * the reply callbacks to be called in order to reply to the client.
 4644  * The 'privdata' argument will be accessible by the reply callback, so
 4645  * the caller of this function can pass any value that is needed in order to
 4646  * actually reply to the client.
 4647  *
 4648  * A common usage for 'privdata' is a thread that computes something that
 4649  * needs to be passed to the client, included but not limited some slow
 4650  * to compute reply or some reply obtained via networking.
 4651  *
 4652  * Note 1: this function can be called from threads spawned by the module.
 4653  *
 4654  * Note 2: when we unblock a client that is blocked for keys using
 4655  * the API RedisModule_BlockClientOnKeys(), the privdata argument here is
 4656  * not used, and the reply callback is called with the privdata pointer that
 4657  * was passed when blocking the client.
 4658  *
 4659  * Unblocking a client that was blocked for keys using this API will still
 4660  * require the client to get some reply, so the function will use the
 4661  * "timeout" handler in order to do so. */
 4662 int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
 4663     if (bc->blocked_on_keys) {
 4664         /* In theory the user should always pass the timeout handler as an
 4665          * argument, but better to be safe than sorry. */
 4666         if (bc->timeout_callback == NULL) return REDISMODULE_ERR;
 4667         if (bc->unblocked) return REDISMODULE_OK;
 4668         if (bc->client) moduleBlockedClientTimedOut(bc->client);
 4669     }
 4670     moduleUnblockClientByHandle(bc,privdata);
 4671     return REDISMODULE_OK;
 4672 }
 4673 
 4674 /* Abort a blocked client blocking operation: the client will be unblocked
 4675  * without firing any callback. */
 4676 int RM_AbortBlock(RedisModuleBlockedClient *bc) {
 4677     bc->reply_callback = NULL;
 4678     bc->disconnect_callback = NULL;
 4679     return RM_UnblockClient(bc,NULL);
 4680 }
 4681 
 4682 /* Set a callback that will be called if a blocked client disconnects
 4683  * before the module has a chance to call RedisModule_UnblockClient()
 4684  *
 4685  * Usually what you want to do there, is to cleanup your module state
 4686  * so that you can call RedisModule_UnblockClient() safely, otherwise
 4687  * the client will remain blocked forever if the timeout is large.
 4688  *
 4689  * Notes:
 4690  *
 4691  * 1. It is not safe to call Reply* family functions here, it is also
 4692  *    useless since the client is gone.
 4693  *
 4694  * 2. This callback is not called if the client disconnects because of
 4695  *    a timeout. In such a case, the client is unblocked automatically
 4696  *    and the timeout callback is called.
 4697  */
 4698 void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback) {
 4699     bc->disconnect_callback = callback;
 4700 }
 4701 
 4702 /* This function will check the moduleUnblockedClients queue in order to
 4703  * call the reply callback and really unblock the client.
 4704  *
 4705  * Clients end into this list because of calls to RM_UnblockClient(),
 4706  * however it is possible that while the module was doing work for the
 4707  * blocked client, it was terminated by Redis (for timeout or other reasons).
 4708  * When this happens the RedisModuleBlockedClient structure in the queue
 4709  * will have the 'client' field set to NULL. */
 4710 void moduleHandleBlockedClients(void) {
 4711     listNode *ln;
 4712     RedisModuleBlockedClient *bc;
 4713 
 4714     pthread_mutex_lock(&moduleUnblockedClientsMutex);
 4715     /* Here we unblock all the pending clients blocked in modules operations
 4716      * so we can read every pending "awake byte" in the pipe. */
 4717     char buf[1];
 4718     while (read(server.module_blocked_pipe[0],buf,1) == 1);
 4719     while (listLength(moduleUnblockedClients)) {
 4720         ln = listFirst(moduleUnblockedClients);
 4721         bc = ln->value;
 4722         client *c = bc->client;
 4723         listDelNode(moduleUnblockedClients,ln);
 4724         pthread_mutex_unlock(&moduleUnblockedClientsMutex);
 4725 
 4726         /* Release the lock during the loop, as long as we don't
 4727          * touch the shared list. */
 4728 
 4729         /* Call the reply callback if the client is valid and we have
 4730          * any callback. However the callback is not called if the client
 4731          * was blocked on keys (RM_BlockClientOnKeys()), because we already
 4732          * called such callback in moduleTryServeClientBlockedOnKey() when
 4733          * the key was signaled as ready. */
 4734         if (c && !bc->blocked_on_keys && bc->reply_callback) {
 4735             RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
 4736             ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
 4737             ctx.blocked_privdata = bc->privdata;
 4738             ctx.blocked_ready_key = NULL;
 4739             ctx.module = bc->module;
 4740             ctx.client = bc->client;
 4741             ctx.blocked_client = bc;
 4742             bc->reply_callback(&ctx,(void**)c->argv,c->argc);
 4743             moduleFreeContext(&ctx);
 4744         }
 4745 
 4746         /* Free privdata if any. */
 4747         if (bc->privdata && bc->free_privdata) {
 4748             RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
 4749             if (c == NULL)
 4750                 ctx.flags |= REDISMODULE_CTX_BLOCKED_DISCONNECTED;
 4751             ctx.blocked_privdata = bc->privdata;
 4752             ctx.module = bc->module;
 4753             ctx.client = bc->client;
 4754             bc->free_privdata(&ctx,bc->privdata);
 4755             moduleFreeContext(&ctx);
 4756         }
 4757 
 4758         /* It is possible that this blocked client object accumulated
 4759          * replies to send to the client in a thread safe context.
 4760          * We need to glue such replies to the client output buffer and
 4761          * free the temporary client we just used for the replies. */
 4762         if (c) AddReplyFromClient(c, bc->reply_client);
 4763         freeClient(bc->reply_client);
 4764 
 4765         if (c != NULL) {
 4766             /* Before unblocking the client, set the disconnect callback
 4767              * to NULL, because if we reached this point, the client was
 4768              * properly unblocked by the module. */
 4769             bc->disconnect_callback = NULL;
 4770             unblockClient(c);
 4771             /* Put the client in the list of clients that need to write
 4772              * if there are pending replies here. This is needed since
 4773              * during a non blocking command the client may receive output. */
 4774             if (clientHasPendingReplies(c) &&
 4775                 !(c->flags & CLIENT_PENDING_WRITE))
 4776             {
 4777                 c->flags |= CLIENT_PENDING_WRITE;
 4778                 listAddNodeHead(server.clients_pending_write,c);
 4779             }
 4780         }
 4781 
 4782         /* Free 'bc' only after unblocking the client, since it is
 4783          * referenced in the client blocking context, and must be valid
 4784          * when calling unblockClient(). */
 4785         bc->module->blocked_clients--;
 4786         zfree(bc);
 4787 
 4788         /* Lock again before to iterate the loop. */
 4789         pthread_mutex_lock(&moduleUnblockedClientsMutex);
 4790     }
 4791     pthread_mutex_unlock(&moduleUnblockedClientsMutex);
 4792 }
 4793 
 4794 /* Called when our client timed out. After this function unblockClient()
 4795  * is called, and it will invalidate the blocked client. So this function
 4796  * does not need to do any cleanup. Eventually the module will call the
 4797  * API to unblock the client and the memory will be released. */
 4798 void moduleBlockedClientTimedOut(client *c) {
 4799     RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
 4800     RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
 4801     ctx.flags |= REDISMODULE_CTX_BLOCKED_TIMEOUT;
 4802     ctx.module = bc->module;
 4803     ctx.client = bc->client;
 4804     ctx.blocked_client = bc;
 4805     bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
 4806     moduleFreeContext(&ctx);
 4807     /* For timeout events, we do not want to call the disconnect callback,
 4808      * because the blocked client will be automatically disconnected in
 4809      * this case, and the user can still hook using the timeout callback. */
 4810     bc->disconnect_callback = NULL;
 4811 }
 4812 
 4813 /* Return non-zero if a module command was called in order to fill the
 4814  * reply for a blocked client. */
 4815 int RM_IsBlockedReplyRequest(RedisModuleCtx *ctx) {
 4816     return (ctx->flags & REDISMODULE_CTX_BLOCKED_REPLY) != 0;
 4817 }
 4818 
 4819 /* Return non-zero if a module command was called in order to fill the
 4820  * reply for a blocked client that timed out. */
 4821 int RM_IsBlockedTimeoutRequest(RedisModuleCtx *ctx) {
 4822     return (ctx->flags & REDISMODULE_CTX_BLOCKED_TIMEOUT) != 0;
 4823 }
 4824 
 4825 /* Get the private data set by RedisModule_UnblockClient() */
 4826 void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
 4827     return ctx->blocked_privdata;
 4828 }
 4829 
 4830 /* Get the key that is ready when the reply callback is called in the context
 4831  * of a client blocked by RedisModule_BlockClientOnKeys(). */
 4832 RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) {
 4833     return ctx->blocked_ready_key;
 4834 }
 4835 
 4836 /* Get the blocked client associated with a given context.
 4837  * This is useful in the reply and timeout callbacks of blocked clients,
 4838  * before sometimes the module has the blocked client handle references
 4839  * around, and wants to cleanup it. */
 4840 RedisModuleBlockedClient *RM_GetBlockedClientHandle(RedisModuleCtx *ctx) {
 4841     return ctx->blocked_client;
 4842 }
 4843 
 4844 /* Return true if when the free callback of a blocked client is called,
 4845  * the reason for the client to be unblocked is that it disconnected
 4846  * while it was blocked. */
 4847 int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) {
 4848     return (ctx->flags & REDISMODULE_CTX_BLOCKED_DISCONNECTED) != 0;
 4849 }
 4850 
 4851 /* --------------------------------------------------------------------------
 4852  * Thread Safe Contexts
 4853  * -------------------------------------------------------------------------- */
 4854 
 4855 /* Return a context which can be used inside threads to make Redis context
 4856  * calls with certain modules APIs. If 'bc' is not NULL then the module will
 4857  * be bound to a blocked client, and it will be possible to use the
 4858  * `RedisModule_Reply*` family of functions to accumulate a reply for when the
 4859  * client will be unblocked. Otherwise the thread safe context will be
 4860  * detached by a specific client.
 4861  *
 4862  * To call non-reply APIs, the thread safe context must be prepared with:
 4863  *
 4864  *     RedisModule_ThreadSafeContextLock(ctx);
 4865  *     ... make your call here ...
 4866  *     RedisModule_ThreadSafeContextUnlock(ctx);
 4867  *
 4868  * This is not needed when using `RedisModule_Reply*` functions, assuming
 4869  * that a blocked client was used when the context was created, otherwise
 4870  * no RedisModule_Reply* call should be made at all.
 4871  *
 4872  * TODO: thread safe contexts do not inherit the blocked client
 4873  * selected database. */
 4874 RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
 4875     RedisModuleCtx *ctx = zmalloc(sizeof(*ctx));
 4876     RedisModuleCtx empty = REDISMODULE_CTX_INIT;
 4877     memcpy(ctx,&empty,sizeof(empty));
 4878     if (bc) {
 4879         ctx->blocked_client = bc;
 4880         ctx->module = bc->module;
 4881     }
 4882     ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
 4883     /* Even when the context is associated with a blocked client, we can't
 4884      * access it safely from another thread, so we create a fake client here
 4885      * in order to keep things like the currently selected database and similar
 4886      * things. */
 4887     ctx->client = createClient(NULL);
 4888     if (bc) {
 4889         selectDb(ctx->client,bc->dbid);
 4890         if (bc->client) ctx->client->id = bc->client->id;
 4891     }
 4892     return ctx;
 4893 }
 4894 
 4895 /* Release a thread safe context. */
 4896 void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
 4897     moduleFreeContext(ctx);
 4898     zfree(ctx);
 4899 }
 4900 
 4901 /* Acquire the server lock before executing a thread safe API call.
 4902  * This is not needed for `RedisModule_Reply*` calls when there is
 4903  * a blocked client connected to the thread safe context. */
 4904 void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
 4905     UNUSED(ctx);
 4906     moduleAcquireGIL();
 4907 }
 4908 
 4909 /* Similar to RM_ThreadSafeContextLock but this function
 4910  * would not block if the server lock is already acquired.
 4911  *
 4912  * If successful (lock acquired) REDISMODULE_OK is returned,
 4913  * otherwise REDISMODULE_ERR is returned and errno is set
 4914  * accordingly. */
 4915 int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) {
 4916     UNUSED(ctx);
 4917 
 4918     int res = moduleTryAcquireGIL();
 4919     if(res != 0) {
 4920         errno = res;
 4921         return REDISMODULE_ERR;
 4922     }
 4923     return REDISMODULE_OK;
 4924 }
 4925 
 4926 /* Release the server lock after a thread safe API call was executed. */
 4927 void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
 4928     UNUSED(ctx);
 4929     moduleReleaseGIL();
 4930 }
 4931 
 4932 void moduleAcquireGIL(void) {
 4933     pthread_mutex_lock(&moduleGIL);
 4934 }
 4935 
 4936 int moduleTryAcquireGIL(void) {
 4937     return pthread_mutex_trylock(&moduleGIL);
 4938 }
 4939 
 4940 void moduleReleaseGIL(void) {
 4941     pthread_mutex_unlock(&moduleGIL);
 4942 }
 4943 
 4944 
 4945 /* --------------------------------------------------------------------------
 4946  * Module Keyspace Notifications API
 4947  * -------------------------------------------------------------------------- */
 4948 
 4949 /* Subscribe to keyspace notifications. This is a low-level version of the
 4950  * keyspace-notifications API. A module can register callbacks to be notified
 4951  * when keyspce events occur.
 4952  *
 4953  * Notification events are filtered by their type (string events, set events,
 4954  * etc), and the subscriber callback receives only events that match a specific
 4955  * mask of event types.
 4956  *
 4957  * When subscribing to notifications with RedisModule_SubscribeToKeyspaceEvents 
 4958  * the module must provide an event type-mask, denoting the events the subscriber
 4959  * is interested in. This can be an ORed mask of any of the following flags:
 4960  *
 4961  *  - REDISMODULE_NOTIFY_GENERIC: Generic commands like DEL, EXPIRE, RENAME
 4962  *  - REDISMODULE_NOTIFY_STRING: String events
 4963  *  - REDISMODULE_NOTIFY_LIST: List events
 4964  *  - REDISMODULE_NOTIFY_SET: Set events
 4965  *  - REDISMODULE_NOTIFY_HASH: Hash events
 4966  *  - REDISMODULE_NOTIFY_ZSET: Sorted Set events
 4967  *  - REDISMODULE_NOTIFY_EXPIRED: Expiration events
 4968  *  - REDISMODULE_NOTIFY_EVICTED: Eviction events
 4969  *  - REDISMODULE_NOTIFY_STREAM: Stream events
 4970  *  - REDISMODULE_NOTIFY_KEYMISS: Key-miss events
 4971  *  - REDISMODULE_NOTIFY_ALL: All events (Excluding REDISMODULE_NOTIFY_KEYMISS)
 4972  *  - REDISMODULE_NOTIFY_LOADED: A special notification available only for modules,
 4973  *                               indicates that the key was loaded from persistence.
 4974  *                               Notice, when this event fires, the given key
 4975  *                               can not be retained, use RM_CreateStringFromString
 4976  *                               instead.
 4977  *
 4978  * We do not distinguish between key events and keyspace events, and it is up
 4979  * to the module to filter the actions taken based on the key.
 4980  *
 4981  * The subscriber signature is:
 4982  *
 4983  *   int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type,
 4984  *                                       const char *event,
 4985  *                                       RedisModuleString *key);
 4986  *
 4987  * `type` is the event type bit, that must match the mask given at registration
 4988  * time. The event string is the actual command being executed, and key is the
 4989  * relevant Redis key.
 4990  *
 4991  * Notification callback gets executed with a redis context that can not be
 4992  * used to send anything to the client, and has the db number where the event
 4993  * occurred as its selected db number.
 4994  *
 4995  * Notice that it is not necessary to enable notifications in redis.conf for
 4996  * module notifications to work.
 4997  *
 4998  * Warning: the notification callbacks are performed in a synchronous manner,
 4999  * so notification callbacks must to be fast, or they would slow Redis down.
 5000  * If you need to take long actions, use threads to offload them.
 5001  *
 5002  * See https://redis.io/topics/notifications for more information.
 5003  */
 5004 int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) {
 5005     RedisModuleKeyspaceSubscriber *sub = zmalloc(sizeof(*sub));
 5006     sub->module = ctx->module;
 5007     sub->event_mask = types;
 5008     sub->notify_callback = callback;
 5009     sub->active = 0;
 5010 
 5011     listAddNodeTail(moduleKeyspaceSubscribers, sub);
 5012     return REDISMODULE_OK;
 5013 }
 5014 
 5015 /* Get the configured bitmap of notify-keyspace-events (Could be used
 5016  * for additional filtering in RedisModuleNotificationFunc) */
 5017 int RM_GetNotifyKeyspaceEvents() {
 5018     return server.notify_keyspace_events;
 5019 }
 5020 
 5021 /* Expose notifyKeyspaceEvent to modules */
 5022 int RM_NotifyKeyspaceEvent(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
 5023     if (!ctx || !ctx->client)
 5024         return REDISMODULE_ERR;
 5025     notifyKeyspaceEvent(type, (char *)event, key, ctx->client->db->id);
 5026     return REDISMODULE_OK;
 5027 }
 5028 
 5029 /* Dispatcher for keyspace notifications to module subscriber functions.
 5030  * This gets called  only if at least one module requested to be notified on
 5031  * keyspace notifications */
 5032 void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {
 5033     /* Don't do anything if there aren't any subscribers */
 5034     if (listLength(moduleKeyspaceSubscribers) == 0) return;
 5035 
 5036     listIter li;
 5037     listNode *ln;
 5038     listRewind(moduleKeyspaceSubscribers,&li);
 5039 
 5040     /* Remove irrelevant flags from the type mask */
 5041     type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE);
 5042 
 5043     while((ln = listNext(&li))) {
 5044         RedisModuleKeyspaceSubscriber *sub = ln->value;
 5045         /* Only notify subscribers on events matching they registration,
 5046          * and avoid subscribers triggering themselves */
 5047         if ((sub->event_mask & type) && sub->active == 0) {
 5048             RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
 5049             ctx.module = sub->module;
 5050             ctx.client = moduleFreeContextReusedClient;
 5051             selectDb(ctx.client, dbid);
 5052 
 5053             /* mark the handler as active to avoid reentrant loops.
 5054              * If the subscriber performs an action triggering itself,
 5055              * it will not be notified about it. */
 5056             sub->active = 1;
 5057             sub->notify_callback(&ctx, type, event, key);
 5058             sub->active = 0;
 5059             moduleFreeContext(&ctx);
 5060         }
 5061     }
 5062 }
 5063 
 5064 /* Unsubscribe any notification subscribers this module has upon unloading */
 5065 void moduleUnsubscribeNotifications(RedisModule *module) {
 5066     listIter li;
 5067     listNode *ln;
 5068     listRewind(moduleKeyspaceSubscribers,&li);
 5069     while((ln = listNext(&li))) {
 5070         RedisModuleKeyspaceSubscriber *sub = ln->value;
 5071         if (sub->module == module) {
 5072             listDelNode(moduleKeyspaceSubscribers, ln);
 5073             zfree(sub);
 5074         }
 5075     }
 5076 }
 5077 
 5078 /* --------------------------------------------------------------------------
 5079  * Modules Cluster API
 5080  * -------------------------------------------------------------------------- */
 5081 
 5082 /* The Cluster message callback function pointer type. */
 5083 typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
 5084 
 5085 /* This structure identifies a registered caller: it must match a given module
 5086  * ID, for a given message type. The callback function is just the function
 5087  * that was registered as receiver. */
 5088 typedef struct moduleClusterReceiver {
 5089     uint64_t module_id;
 5090     RedisModuleClusterMessageReceiver callback;
 5091     struct RedisModule *module;
 5092     struct moduleClusterReceiver *next;
 5093 } moduleClusterReceiver;
 5094 
 5095 typedef struct moduleClusterNodeInfo {
 5096     int flags;
 5097     char ip[NET_IP_STR_LEN];
 5098     int port;
 5099     char master_id[40]; /* Only if flags & REDISMODULE_NODE_MASTER is true. */
 5100 } mdouleClusterNodeInfo;
 5101 
 5102 /* We have an array of message types: each bucket is a linked list of
 5103  * configured receivers. */
 5104 static moduleClusterReceiver *clusterReceivers[UINT8_MAX];
 5105 
 5106 /* Dispatch the message to the right module receiver. */
 5107 void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len) {
 5108     moduleClusterReceiver *r = clusterReceivers[type];
 5109     while(r) {
 5110         if (r->module_id == module_id) {
 5111             RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
 5112             ctx.module = r->module;
 5113             ctx.client = moduleFreeContextReusedClient;
 5114             selectDb(ctx.client, 0);
 5115             r->callback(&ctx,sender_id,type,payload,len);
 5116             moduleFreeContext(&ctx);
 5117             return;
 5118         }
 5119         r = r->next;
 5120     }
 5121 }
 5122 
 5123 /* Register a callback receiver for cluster messages of type 'type'. If there
 5124  * was already a registered callback, this will replace the callback function
 5125  * with the one provided, otherwise if the callback is set to NULL and there
 5126  * is already a callback for this function, the callback is unregistered
 5127  * (so this API call is also used in order to delete the receiver). */
 5128 void RM_RegisterClusterMessageReceiver(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback) {
 5129     if (!server.cluster_enabled) return;
 5130 
 5131     uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0);
 5132     moduleClusterReceiver *r = clusterReceivers[type], *prev = NULL;
 5133     while(r) {
 5134         if (r->module_id == module_id) {
 5135             /* Found! Set or delete. */
 5136             if (callback) {
 5137                 r->callback = callback;
 5138             } else {
 5139                 /* Delete the receiver entry if the user is setting
 5140                  * it to NULL. Just unlink the receiver node from the
 5141                  * linked list. */
 5142                 if (prev)
 5143                     prev->next = r->next;
 5144                 else
 5145                     clusterReceivers[type]->next = r->next;
 5146                 zfree(r);
 5147             }
 5148             return;
 5149         }
 5150         prev = r;
 5151         r = r->next;
 5152     }
 5153 
 5154     /* Not found, let's add it. */
 5155     if (callback) {
 5156         r = zmalloc(sizeof(*r));
 5157         r->module_id = module_id;
 5158         r->module = ctx->module;
 5159         r->callback = callback;
 5160         r->next = clusterReceivers[type];
 5161         clusterReceivers[type] = r;
 5162     }
 5163 }
 5164 
 5165 /* Send a message to all the nodes in the cluster if `target` is NULL, otherwise
 5166  * at the specified target, which is a REDISMODULE_NODE_ID_LEN bytes node ID, as
 5167  * returned by the receiver callback or by the nodes iteration functions.
 5168  *
 5169  * The function returns REDISMODULE_OK if the message was successfully sent,
 5170  * otherwise if the node is not connected or such node ID does not map to any
 5171  * known cluster node, REDISMODULE_ERR is returned. */
 5172 int RM_SendClusterMessage(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len) {
 5173     if (!server.cluster_enabled) return REDISMODULE_ERR;
 5174     uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0);
 5175     if (clusterSendModuleMessageToTarget(target_id,module_id,type,msg,len) == C_OK)
 5176         return REDISMODULE_OK;
 5177     else
 5178         return REDISMODULE_ERR;
 5179 }
 5180 
 5181 /* Return an array of string pointers, each string pointer points to a cluster
 5182  * node ID of exactly REDISMODULE_NODE_ID_SIZE bytes (without any null term).
 5183  * The number of returned node IDs is stored into `*numnodes`.
 5184  * However if this function is called by a module not running an a Redis
 5185  * instance with Redis Cluster enabled, NULL is returned instead.
 5186  *
 5187  * The IDs returned can be used with RedisModule_GetClusterNodeInfo() in order
 5188  * to get more information about single nodes.
 5189  *
 5190  * The array returned by this function must be freed using the function
 5191  * RedisModule_FreeClusterNodesList().
 5192  *
 5193  * Example:
 5194  *
 5195  *     size_t count, j;
 5196  *     char **ids = RedisModule_GetClusterNodesList(ctx,&count);
 5197  *     for (j = 0; j < count; j++) {
 5198  *         RedisModule_Log("notice","Node %.*s",
 5199  *             REDISMODULE_NODE_ID_LEN,ids[j]);
 5200  *     }
 5201  *     RedisModule_FreeClusterNodesList(ids);
 5202  */
 5203 char **RM_GetClusterNodesList(RedisModuleCtx *ctx, size_t *numnodes) {
 5204     UNUSED(ctx);
 5205 
 5206     if (!server.cluster_enabled) return NULL;
 5207     size_t count = dictSize(server.cluster->nodes);
 5208     char **ids = zmalloc((count+1)*REDISMODULE_NODE_ID_LEN);
 5209     dictIterator *di = dictGetIterator(server.cluster->nodes);
 5210     dictEntry *de;
 5211     int j = 0;
 5212     while((de = dictNext(di)) != NULL) {
 5213         clusterNode *node = dictGetVal(de);
 5214         if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) continue;
 5215         ids[j] = zmalloc(REDISMODULE_NODE_ID_LEN);
 5216         memcpy(ids[j],node->name,REDISMODULE_NODE_ID_LEN);
 5217         j++;
 5218     }
 5219     *numnodes = j;
 5220     ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need
 5221                     * to also get the count argument. */
 5222     dictReleaseIterator(di);
 5223     return ids;
 5224 }
 5225 
 5226 /* Free the node list obtained with RedisModule_GetClusterNodesList. */
 5227 void RM_FreeClusterNodesList(char **ids) {
 5228     if (ids == NULL) return;
 5229     for (int j = 0; ids[j]; j++) zfree(ids[j]);
 5230     zfree(ids);
 5231 }
 5232 
 5233 /* Return this node ID (REDISMODULE_CLUSTER_ID_LEN bytes) or NULL if the cluster
 5234  * is disabled. */
 5235 const char *RM_GetMyClusterID(void) {
 5236     if (!server.cluster_enabled) return NULL;
 5237     return server.cluster->myself->name;
 5238 }
 5239 
 5240 /* Return the number of nodes in the cluster, regardless of their state
 5241  * (handshake, noaddress, ...) so that the number of active nodes may actually
 5242  * be smaller, but not greater than this number. If the instance is not in
 5243  * cluster mode, zero is returned. */
 5244 size_t RM_GetClusterSize(void) {
 5245     if (!server.cluster_enabled) return 0;
 5246     return dictSize(server.cluster->nodes);
 5247 }
 5248 
 5249 /* Populate the specified info for the node having as ID the specified 'id',
 5250  * then returns REDISMODULE_OK. Otherwise if the node ID does not exist from
 5251  * the POV of this local node, REDISMODULE_ERR is returned.
 5252  *
 5253  * The arguments ip, master_id, port and flags can be NULL in case we don't
 5254  * need to populate back certain info. If an ip and master_id (only populated
 5255  * if the instance is a slave) are specified, they point to buffers holding
 5256  * at least REDISMODULE_NODE_ID_LEN bytes. The strings written back as ip
 5257  * and master_id are not null terminated.
 5258  *
 5259  * The list of flags reported is the following:
 5260  *
 5261  * * REDISMODULE_NODE_MYSELF        This node
 5262  * * REDISMODULE_NODE_MASTER        The node is a master
 5263  * * REDISMODULE_NODE_SLAVE         The node is a replica
 5264  * * REDISMODULE_NODE_PFAIL         We see the node as failing
 5265  * * REDISMODULE_NODE_FAIL          The cluster agrees the node is failing
 5266  * * REDISMODULE_NODE_NOFAILOVER    The slave is configured to never failover
 5267  */
 5268 
 5269 clusterNode *clusterLookupNode(const char *name); /* We need access to internals */
 5270 
 5271 int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *master_id, int *port, int *flags) {
 5272     UNUSED(ctx);
 5273 
 5274     clusterNode *node = clusterLookupNode(id);
 5275     if (node == NULL ||
 5276         node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
 5277     {
 5278         return REDISMODULE_ERR;
 5279     }
 5280 
 5281     if (ip) strncpy(ip,node->ip,NET_IP_STR_LEN);
 5282 
 5283     if (master_id) {
 5284         /* If the information is not available, the function will set the
 5285          * field to zero bytes, so that when the field can't be populated the
 5286          * function kinda remains predictable. */
 5287         if (node->flags & CLUSTER_NODE_MASTER && node->slaveof)
 5288             memcpy(master_id,node->slaveof->name,REDISMODULE_NODE_ID_LEN);
 5289         else
 5290             memset(master_id,0,REDISMODULE_NODE_ID_LEN);
 5291     }
 5292     if (port) *port = node->port;
 5293 
 5294     /* As usually we have to remap flags for modules, in order to ensure
 5295      * we can provide binary compatibility. */
 5296     if (flags) {
 5297         *flags = 0;
 5298         if (node->flags & CLUSTER_NODE_MYSELF) *flags |= REDISMODULE_NODE_MYSELF;
 5299         if (node->flags & CLUSTER_NODE_MASTER) *flags |= REDISMODULE_NODE_MASTER;
 5300         if (node->flags & CLUSTER_NODE_SLAVE) *flags |= REDISMODULE_NODE_SLAVE;
 5301         if (node->flags & CLUSTER_NODE_PFAIL) *flags |= REDISMODULE_NODE_PFAIL;
 5302         if (node->flags & CLUSTER_NODE_FAIL) *flags |= REDISMODULE_NODE_FAIL;
 5303         if (node->flags & CLUSTER_NODE_NOFAILOVER) *flags |= REDISMODULE_NODE_NOFAILOVER;
 5304     }
 5305     return REDISMODULE_OK;
 5306 }
 5307 
 5308 /* Set Redis Cluster flags in order to change the normal behavior of
 5309  * Redis Cluster, especially with the goal of disabling certain functions.
 5310  * This is useful for modules that use the Cluster API in order to create
 5311  * a different distributed system, but still want to use the Redis Cluster
 5312  * message bus. Flags that can be set:
 5313  *
 5314  *  CLUSTER_MODULE_FLAG_NO_FAILOVER
 5315  *  CLUSTER_MODULE_FLAG_NO_REDIRECTION
 5316  *
 5317  * With the following effects:
 5318  *
 5319  *  NO_FAILOVER: prevent Redis Cluster slaves to failover a failing master.
 5320  *               Also disables the replica migration feature.
 5321  *
 5322  *  NO_REDIRECTION: Every node will accept any key, without trying to perform
 5323  *                  partitioning according to the user Redis Cluster algorithm.
 5324  *                  Slots informations will still be propagated across the
 5325  *                  cluster, but without effects. */
 5326 void RM_SetClusterFlags(RedisModuleCtx *ctx, uint64_t flags) {
 5327     UNUSED(ctx);
 5328     if (flags & REDISMODULE_CLUSTER_FLAG_NO_FAILOVER)
 5329         server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_FAILOVER;
 5330     if (flags & REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION)
 5331         server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_REDIRECTION;
 5332 }
 5333 
 5334 /* --------------------------------------------------------------------------
 5335  * Modules Timers API
 5336  *
 5337  * Module timers are an high precision "green timers" abstraction where
 5338  * every module can register even millions of timers without problems, even if
 5339  * the actual event loop will just have a single timer that is used to awake the
 5340  * module timers subsystem in order to process the next event.
 5341  *
 5342  * All the timers are stored into a radix tree, ordered by expire time, when
 5343  * the main Redis event loop timer callback is called, we try to process all
 5344  * the timers already expired one after the other. Then we re-enter the event
 5345  * loop registering a timer that will expire when the next to process module
 5346  * timer will expire.
 5347  *
 5348  * Every time the list of active timers drops to zero, we unregister the
 5349  * main event loop timer, so that there is no overhead when such feature is
 5350  * not used.
 5351  * -------------------------------------------------------------------------- */
 5352 
 5353 static rax *Timers;     /* The radix tree of all the timers sorted by expire. */
 5354 long long aeTimer = -1; /* Main event loop (ae.c) timer identifier. */
 5355 
 5356 typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
 5357 
 5358 /* The timer descriptor, stored as value in the radix tree. */
 5359 typedef struct RedisModuleTimer {
 5360     RedisModule *module;                /* Module reference. */
 5361     RedisModuleTimerProc callback;      /* The callback to invoke on expire. */
 5362     void *data;                         /* Private data for the callback. */
 5363     int dbid;                           /* Database number selected by the original client. */
 5364 } RedisModuleTimer;
 5365 
 5366 /* This is the timer handler that is called by the main event loop. We schedule
 5367  * this timer to be called when the nearest of our module timers will expire. */
 5368 int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *clientData) {
 5369     UNUSED(eventLoop);
 5370     UNUSED(id);
 5371     UNUSED(clientData);
 5372 
 5373     /* To start let's try to fire all the timers already expired. */
 5374     raxIterator ri;
 5375     raxStart(&ri,Timers);
 5376     uint64_t now = ustime();
 5377     long long next_period = 0;
 5378     while(1) {
 5379         raxSeek(&ri,"^",NULL,0);
 5380         if (!raxNext(&ri)) break;
 5381         uint64_t expiretime;
 5382         memcpy(&expiretime,ri.key,sizeof(expiretime));
 5383         expiretime = ntohu64(expiretime);
 5384         if (now >= expiretime) {
 5385             RedisModuleTimer *timer = ri.data;
 5386             RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
 5387 
 5388             ctx.module = timer->module;
 5389             ctx.client = moduleFreeContextReusedClient;
 5390             selectDb(ctx.client, timer->dbid);
 5391             timer->callback(&ctx,timer->data);
 5392             moduleFreeContext(&ctx);
 5393             raxRemove(Timers,(unsigned char*)ri.key,ri.key_len,NULL);
 5394             zfree(timer);
 5395         } else {
 5396             next_period = (expiretime-now)/1000; /* Scale to milliseconds. */
 5397             break;
 5398         }
 5399     }
 5400     raxStop(&ri);
 5401 
 5402     /* Reschedule the next timer or cancel it. */
 5403     if (next_period <= 0) next_period = 1;
 5404     return (raxSize(Timers) > 0) ? next_period : AE_NOMORE;
 5405 }
 5406 
 5407 /* Create a new timer that will fire after `period` milliseconds, and will call
 5408  * the specified function using `data` as argument. The returned timer ID can be
 5409  * used to get information from the timer or to stop it before it fires. */