"Fossies" - the Fresh Open Source Software Archive

Member "redis-6.2.5/src/multi.c" (21 Jul 2021, 14848 Bytes) of package /linux/misc/redis-6.2.5.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "multi.c", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 6.2.4_vs_6.2.5.

    1 /*
    2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions are met:
    7  *
    8  *   * Redistributions of source code must retain the above copyright notice,
    9  *     this list of conditions and the following disclaimer.
   10  *   * Redistributions in binary form must reproduce the above copyright
   11  *     notice, this list of conditions and the following disclaimer in the
   12  *     documentation and/or other materials provided with the distribution.
   13  *   * Neither the name of Redis nor the names of its contributors may be used
   14  *     to endorse or promote products derived from this software without
   15  *     specific prior written permission.
   16  *
   17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
   18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
   21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   27  * POSSIBILITY OF SUCH DAMAGE.
   28  */
   29 
   30 #include "server.h"
   31 
   32 /* ================================ MULTI/EXEC ============================== */
   33 
   34 /* Client state initialization for MULTI/EXEC */
   35 void initClientMultiState(client *c) {
   36     c->mstate.commands = NULL;
   37     c->mstate.count = 0;
   38     c->mstate.cmd_flags = 0;
   39     c->mstate.cmd_inv_flags = 0;
   40 }
   41 
   42 /* Release all the resources associated with MULTI/EXEC state */
   43 void freeClientMultiState(client *c) {
   44     int j;
   45 
   46     for (j = 0; j < c->mstate.count; j++) {
   47         int i;
   48         multiCmd *mc = c->mstate.commands+j;
   49 
   50         for (i = 0; i < mc->argc; i++)
   51             decrRefCount(mc->argv[i]);
   52         zfree(mc->argv);
   53     }
   54     zfree(c->mstate.commands);
   55 }
   56 
   57 /* Add a new command into the MULTI commands queue */
   58 void queueMultiCommand(client *c) {
   59     multiCmd *mc;
   60     int j;
   61 
   62     /* No sense to waste memory if the transaction is already aborted.
   63      * this is useful in case client sends these in a pipeline, or doesn't
   64      * bother to read previous responses and didn't notice the multi was already
   65      * aborted. */
   66     if (c->flags & CLIENT_DIRTY_EXEC)
   67         return;
   68 
   69     c->mstate.commands = zrealloc(c->mstate.commands,
   70             sizeof(multiCmd)*(c->mstate.count+1));
   71     mc = c->mstate.commands+c->mstate.count;
   72     mc->cmd = c->cmd;
   73     mc->argc = c->argc;
   74     mc->argv = zmalloc(sizeof(robj*)*c->argc);
   75     memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
   76     for (j = 0; j < c->argc; j++)
   77         incrRefCount(mc->argv[j]);
   78     c->mstate.count++;
   79     c->mstate.cmd_flags |= c->cmd->flags;
   80     c->mstate.cmd_inv_flags |= ~c->cmd->flags;
   81 }
   82 
   83 void discardTransaction(client *c) {
   84     freeClientMultiState(c);
   85     initClientMultiState(c);
   86     c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
   87     unwatchAllKeys(c);
   88 }
   89 
   90 /* Flag the transaction as DIRTY_EXEC so that EXEC will fail.
   91  * Should be called every time there is an error while queueing a command. */
   92 void flagTransaction(client *c) {
   93     if (c->flags & CLIENT_MULTI)
   94         c->flags |= CLIENT_DIRTY_EXEC;
   95 }
   96 
   97 void multiCommand(client *c) {
   98     if (c->flags & CLIENT_MULTI) {
   99         addReplyError(c,"MULTI calls can not be nested");
  100         return;
  101     }
  102     c->flags |= CLIENT_MULTI;
  103 
  104     addReply(c,shared.ok);
  105 }
  106 
  107 void discardCommand(client *c) {
  108     if (!(c->flags & CLIENT_MULTI)) {
  109         addReplyError(c,"DISCARD without MULTI");
  110         return;
  111     }
  112     discardTransaction(c);
  113     addReply(c,shared.ok);
  114 }
  115 
  116 void beforePropagateMulti() {
  117     /* Propagating MULTI */
  118     serverAssert(!server.propagate_in_transaction);
  119     server.propagate_in_transaction = 1;
  120 }
  121 
  122 void afterPropagateExec() {
  123     /* Propagating EXEC */
  124     serverAssert(server.propagate_in_transaction == 1);
  125     server.propagate_in_transaction = 0;
  126 }
  127 
  128 /* Send a MULTI command to all the slaves and AOF file. Check the execCommand
  129  * implementation for more information. */
  130 void execCommandPropagateMulti(int dbid) {
  131     beforePropagateMulti();
  132     propagate(server.multiCommand,dbid,&shared.multi,1,
  133               PROPAGATE_AOF|PROPAGATE_REPL);
  134 }
  135 
  136 void execCommandPropagateExec(int dbid) {
  137     propagate(server.execCommand,dbid,&shared.exec,1,
  138               PROPAGATE_AOF|PROPAGATE_REPL);
  139     afterPropagateExec();
  140 }
  141 
  142 /* Aborts a transaction, with a specific error message.
  143  * The transaction is always aborted with -EXECABORT so that the client knows
  144  * the server exited the multi state, but the actual reason for the abort is
  145  * included too.
  146  * Note: 'error' may or may not end with \r\n. see addReplyErrorFormat. */
  147 void execCommandAbort(client *c, sds error) {
  148     discardTransaction(c);
  149 
  150     if (error[0] == '-') error++;
  151     addReplyErrorFormat(c, "-EXECABORT Transaction discarded because of: %s", error);
  152 
  153     /* Send EXEC to clients waiting data from MONITOR. We did send a MULTI
  154      * already, and didn't send any of the queued commands, now we'll just send
  155      * EXEC so it is clear that the transaction is over. */
  156     replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
  157 }
  158 
  159 void execCommand(client *c) {
  160     int j;
  161     robj **orig_argv;
  162     int orig_argc;
  163     struct redisCommand *orig_cmd;
  164     int was_master = server.masterhost == NULL;
  165 
  166     if (!(c->flags & CLIENT_MULTI)) {
  167         addReplyError(c,"EXEC without MULTI");
  168         return;
  169     }
  170 
  171     /* EXEC with expired watched key is disallowed*/
  172     if (isWatchedKeyExpired(c)) {
  173         c->flags |= (CLIENT_DIRTY_CAS);
  174     }
  175 
  176     /* Check if we need to abort the EXEC because:
  177      * 1) Some WATCHed key was touched.
  178      * 2) There was a previous error while queueing commands.
  179      * A failed EXEC in the first case returns a multi bulk nil object
  180      * (technically it is not an error but a special behavior), while
  181      * in the second an EXECABORT error is returned. */
  182     if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
  183         addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
  184                                                    shared.nullarray[c->resp]);
  185         discardTransaction(c);
  186         return;
  187     }
  188 
  189     uint64_t old_flags = c->flags;
  190 
  191     /* we do not want to allow blocking commands inside multi */
  192     c->flags |= CLIENT_DENY_BLOCKING;
  193 
  194     /* Exec all the queued commands */
  195     unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
  196 
  197     server.in_exec = 1;
  198 
  199     orig_argv = c->argv;
  200     orig_argc = c->argc;
  201     orig_cmd = c->cmd;
  202     addReplyArrayLen(c,c->mstate.count);
  203     for (j = 0; j < c->mstate.count; j++) {
  204         c->argc = c->mstate.commands[j].argc;
  205         c->argv = c->mstate.commands[j].argv;
  206         c->cmd = c->mstate.commands[j].cmd;
  207 
  208         /* ACL permissions are also checked at the time of execution in case
  209          * they were changed after the commands were queued. */
  210         int acl_errpos;
  211         int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
  212         if (acl_retval != ACL_OK) {
  213             char *reason;
  214             switch (acl_retval) {
  215             case ACL_DENIED_CMD:
  216                 reason = "no permission to execute the command or subcommand";
  217                 break;
  218             case ACL_DENIED_KEY:
  219                 reason = "no permission to touch the specified keys";
  220                 break;
  221             case ACL_DENIED_CHANNEL:
  222                 reason = "no permission to access one of the channels used "
  223                          "as arguments";
  224                 break;
  225             default:
  226                 reason = "no permission";
  227                 break;
  228             }
  229             addACLLogEntry(c,acl_retval,acl_errpos,NULL);
  230             addReplyErrorFormat(c,
  231                 "-NOPERM ACLs rules changed between the moment the "
  232                 "transaction was accumulated and the EXEC call. "
  233                 "This command is no longer allowed for the "
  234                 "following reason: %s", reason);
  235         } else {
  236             call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
  237             serverAssert((c->flags & CLIENT_BLOCKED) == 0);
  238         }
  239 
  240         /* Commands may alter argc/argv, restore mstate. */
  241         c->mstate.commands[j].argc = c->argc;
  242         c->mstate.commands[j].argv = c->argv;
  243         c->mstate.commands[j].cmd = c->cmd;
  244     }
  245 
  246     // restore old DENY_BLOCKING value
  247     if (!(old_flags & CLIENT_DENY_BLOCKING))
  248         c->flags &= ~CLIENT_DENY_BLOCKING;
  249 
  250     c->argv = orig_argv;
  251     c->argc = orig_argc;
  252     c->cmd = orig_cmd;
  253     discardTransaction(c);
  254 
  255     /* Make sure the EXEC command will be propagated as well if MULTI
  256      * was already propagated. */
  257     if (server.propagate_in_transaction) {
  258         int is_master = server.masterhost == NULL;
  259         server.dirty++;
  260         /* If inside the MULTI/EXEC block this instance was suddenly
  261          * switched from master to slave (using the SLAVEOF command), the
  262          * initial MULTI was propagated into the replication backlog, but the
  263          * rest was not. We need to make sure to at least terminate the
  264          * backlog with the final EXEC. */
  265         if (server.repl_backlog && was_master && !is_master) {
  266             char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
  267             feedReplicationBacklog(execcmd,strlen(execcmd));
  268         }
  269         afterPropagateExec();
  270     }
  271 
  272     server.in_exec = 0;
  273 }
  274 
/* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
 *
 * The implementation uses a per-DB hash table mapping keys to the list of
 * clients WATCHing those keys, so that given a key that is going to be
 * modified we can mark all the associated clients as dirty.
 *
 * Also, every client contains a list of WATCHed keys, so that it is possible
 * to un-watch such keys when the client is freed or when UNWATCH is called. */
  283 
/* Entries of the client->watched_keys list are watchedKey structures,
 * since in order to identify a key in Redis we need both the key name and
 * the DB it lives in. */
typedef struct watchedKey {
    robj *key;      /* Name of the watched key. */
    redisDb *db;    /* Database in which the key is watched. */
} watchedKey;
  291 
  292 /* Watch for the specified key */
  293 void watchForKey(client *c, robj *key) {
  294     list *clients = NULL;
  295     listIter li;
  296     listNode *ln;
  297     watchedKey *wk;
  298 
  299     /* Check if we are already watching for this key */
  300     listRewind(c->watched_keys,&li);
  301     while((ln = listNext(&li))) {
  302         wk = listNodeValue(ln);
  303         if (wk->db == c->db && equalStringObjects(key,wk->key))
  304             return; /* Key already watched */
  305     }
  306     /* This key is not already watched in this DB. Let's add it */
  307     clients = dictFetchValue(c->db->watched_keys,key);
  308     if (!clients) {
  309         clients = listCreate();
  310         dictAdd(c->db->watched_keys,key,clients);
  311         incrRefCount(key);
  312     }
  313     listAddNodeTail(clients,c);
  314     /* Add the new key to the list of keys watched by this client */
  315     wk = zmalloc(sizeof(*wk));
  316     wk->key = key;
  317     wk->db = c->db;
  318     incrRefCount(key);
  319     listAddNodeTail(c->watched_keys,wk);
  320 }
  321 
  322 /* Unwatch all the keys watched by this client. To clean the EXEC dirty
  323  * flag is up to the caller. */
  324 void unwatchAllKeys(client *c) {
  325     listIter li;
  326     listNode *ln;
  327 
  328     if (listLength(c->watched_keys) == 0) return;
  329     listRewind(c->watched_keys,&li);
  330     while((ln = listNext(&li))) {
  331         list *clients;
  332         watchedKey *wk;
  333 
  334         /* Lookup the watched key -> clients list and remove the client
  335          * from the list */
  336         wk = listNodeValue(ln);
  337         clients = dictFetchValue(wk->db->watched_keys, wk->key);
  338         serverAssertWithInfo(c,NULL,clients != NULL);
  339         listDelNode(clients,listSearchKey(clients,c));
  340         /* Kill the entry at all if this was the only client */
  341         if (listLength(clients) == 0)
  342             dictDelete(wk->db->watched_keys, wk->key);
  343         /* Remove this watched key from the client->watched list */
  344         listDelNode(c->watched_keys,ln);
  345         decrRefCount(wk->key);
  346         zfree(wk);
  347     }
  348 }
  349 
  350 /* iterates over the watched_keys list and
  351  * look for an expired key . */
  352 int isWatchedKeyExpired(client *c) {
  353     listIter li;
  354     listNode *ln;
  355     watchedKey *wk;
  356     if (listLength(c->watched_keys) == 0) return 0;
  357     listRewind(c->watched_keys,&li);
  358     while ((ln = listNext(&li))) {
  359         wk = listNodeValue(ln);
  360         if (keyIsExpired(wk->db, wk->key)) return 1;
  361     }
  362 
  363     return 0;
  364 }
  365 
  366 /* "Touch" a key, so that if this key is being WATCHed by some client the
  367  * next EXEC will fail. */
  368 void touchWatchedKey(redisDb *db, robj *key) {
  369     list *clients;
  370     listIter li;
  371     listNode *ln;
  372 
  373     if (dictSize(db->watched_keys) == 0) return;
  374     clients = dictFetchValue(db->watched_keys, key);
  375     if (!clients) return;
  376 
  377     /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
  378     /* Check if we are already watching for this key */
  379     listRewind(clients,&li);
  380     while((ln = listNext(&li))) {
  381         client *c = listNodeValue(ln);
  382 
  383         c->flags |= CLIENT_DIRTY_CAS;
  384     }
  385 }
  386 
  387 /* Set CLIENT_DIRTY_CAS to all clients of DB when DB is dirty.
  388  * It may happen in the following situations:
  389  * FLUSHDB, FLUSHALL, SWAPDB
  390  *
  391  * replaced_with: for SWAPDB, the WATCH should be invalidated if
  392  * the key exists in either of them, and skipped only if it
  393  * doesn't exist in both. */
  394 void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) {
  395     listIter li;
  396     listNode *ln;
  397     dictEntry *de;
  398 
  399     if (dictSize(emptied->watched_keys) == 0) return;
  400 
  401     dictIterator *di = dictGetSafeIterator(emptied->watched_keys);
  402     while((de = dictNext(di)) != NULL) {
  403         robj *key = dictGetKey(de);
  404         list *clients = dictGetVal(de);
  405         if (!clients) continue;
  406         listRewind(clients,&li);
  407         while((ln = listNext(&li))) {
  408             client *c = listNodeValue(ln);
  409             if (dictFind(emptied->dict, key->ptr)) {
  410                 c->flags |= CLIENT_DIRTY_CAS;
  411             } else if (replaced_with && dictFind(replaced_with->dict, key->ptr)) {
  412                 c->flags |= CLIENT_DIRTY_CAS;
  413             }
  414         }
  415     }
  416     dictReleaseIterator(di);
  417 }
  418 
  419 void watchCommand(client *c) {
  420     int j;
  421 
  422     if (c->flags & CLIENT_MULTI) {
  423         addReplyError(c,"WATCH inside MULTI is not allowed");
  424         return;
  425     }
  426     for (j = 1; j < c->argc; j++)
  427         watchForKey(c,c->argv[j]);
  428     addReply(c,shared.ok);
  429 }
  430 
  431 void unwatchCommand(client *c) {
  432     unwatchAllKeys(c);
  433     c->flags &= (~CLIENT_DIRTY_CAS);
  434     addReply(c,shared.ok);
  435 }