"Fossies" - the Fresh Open Source Software Archive

Member "redis-6.2.5/src/server.c" (21 Jul 2021, 238206 Bytes) of package /linux/misc/redis-6.2.5.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "server.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 6.2.4_vs_6.2.5.

    1 /*
    2  * Copyright (c) 2009-2016, Salvatore Sanfilippo <antirez at gmail dot com>
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions are met:
    7  *
    8  *   * Redistributions of source code must retain the above copyright notice,
    9  *     this list of conditions and the following disclaimer.
   10  *   * Redistributions in binary form must reproduce the above copyright
   11  *     notice, this list of conditions and the following disclaimer in the
   12  *     documentation and/or other materials provided with the distribution.
   13  *   * Neither the name of Redis nor the names of its contributors may be used
   14  *     to endorse or promote products derived from this software without
   15  *     specific prior written permission.
   16  *
   17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
   18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
   21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   27  * POSSIBILITY OF SUCH DAMAGE.
   28  */
   29 
   30 #include "server.h"
   31 #include "monotonic.h"
   32 #include "cluster.h"
   33 #include "slowlog.h"
   34 #include "bio.h"
   35 #include "latency.h"
   36 #include "atomicvar.h"
   37 #include "mt19937-64.h"
   38 
   39 #include <time.h>
   40 #include <signal.h>
   41 #include <sys/wait.h>
   42 #include <errno.h>
   43 #include <assert.h>
   44 #include <ctype.h>
   45 #include <stdarg.h>
   46 #include <arpa/inet.h>
   47 #include <sys/stat.h>
   48 #include <fcntl.h>
   49 #include <sys/time.h>
   50 #include <sys/resource.h>
   51 #include <sys/uio.h>
   52 #include <sys/un.h>
   53 #include <limits.h>
   54 #include <float.h>
   55 #include <math.h>
   56 #include <sys/resource.h>
   57 #include <sys/utsname.h>
   58 #include <locale.h>
   59 #include <sys/socket.h>
   60 #include <sys/resource.h>
   61 
   62 #ifdef __linux__
   63 #include <sys/mman.h>
   64 #endif
   65 
   66 /* Our shared "common" objects, defined once here at file scope. */
   67 
   68 struct sharedObjectsStruct shared;
   69 
   70 /* Global variables that are actually used as constants. The following
   71  * double values are used for double on-disk serialization, and are
   72  * initialized at runtime to avoid strange compiler optimizations. */
   73 
   74 double R_Zero, R_PosInf, R_NegInf, R_Nan;
   75 
   76 /*================================= Globals ================================= */
   77 
   78 /* Global variables */
   79 struct redisServer server; /* Server global state */
   80 
   81 /* Our command table.
   82  *
   83  * Every entry is composed of the following fields:
   84  *
   85  * name:        A string representing the command name.
   86  *
   87  * function:    Pointer to the C function implementing the command.
   88  *
   89  * arity:       Number of arguments; it is possible to use -N to say >= N.
   90  *
   91  * sflags:      Command flags as string. See below for a table of flags.
   92  *
   93  * flags:       Flags as bitmask. Computed by Redis using the 'sflags' field.
   94  *
   95  * get_keys_proc: An optional function to get key arguments from a command.
   96  *                This is only used when the following three fields are not
   97  *                enough to specify what arguments are keys.
   98  *
   99  * first_key_index: First argument that is a key
  100  *
  101  * last_key_index: Last argument that is a key
  102  *
  103  * key_step:    Step to get all the keys from first to last argument.
  104  *              For instance in MSET the step is two since arguments
  105  *              are key,val,key,val,...
  106  *
  107  * microseconds: Microseconds of total execution time for this command.
  108  *
  109  * calls:       Total number of calls of this command.
  110  *
  111  * id:          Command bit identifier for ACLs or other goals.
  112  *
  113  * The flags, microseconds and calls fields are computed by Redis and should
  114  * always be set to zero.
  115  *
  116  * Command flags are expressed using space separated strings, that are turned
  117  * into actual flags by the populateCommandTable() function.
  118  *
  119  * This is the meaning of the flags:
  120  *
  121  * write:       Write command (may modify the key space).
  122  *
  123  * read-only:   Commands just reading from keys without changing the content.
  124  *              Note that commands that don't read from the keyspace such as
  125  *              TIME, SELECT, INFO, administrative commands, and connection
  126  *              or transaction related commands (multi, exec, discard, ...)
  127  *              are not flagged as read-only commands, since they affect the
  128  *              server or the connection in other ways.
  129  *
  130  * use-memory:  May increase memory usage once called. Don't allow if out
  131  *              of memory.
  132  *
  133  * admin:       Administrative command, like SAVE or SHUTDOWN.
  134  *
  135  * pub-sub:     Pub/Sub related command.
  136  *
  137  * no-script:   Command not allowed in scripts.
  138  *
  139  * random:      Random command. Command is not deterministic, that is, the same
  140  *              command with the same arguments, with the same key space, may
  141  *              have different results. For instance SPOP and RANDOMKEY are
  142  *              two random commands.
  143  *
  144  * to-sort:     Sort command output array if called from script, so that the
  145  *              output is deterministic. When this flag is used (not always
  146  *              possible), then the "random" flag is not needed.
  147  *
  148  * ok-loading:  Allow the command while loading the database.
  149  *
  150  * ok-stale:    Allow the command while a slave has stale data but is not
  151  *              allowed to serve this data. Normally no command is accepted
  152  *              in this condition but just a few.
  153  *
  154  * no-monitor:  Do not automatically propagate the command on MONITOR.
  155  *
  156  * no-slowlog:  Do not automatically record the command in the slowlog.
  157  *
  158  * cluster-asking: Perform an implicit ASKING for this command, so the
  159  *              command will be accepted in cluster mode if the slot is marked
  160  *              as 'importing'.
  161  *
  162  * fast:        Fast command: O(1) or O(log(N)) command that should never
  163  *              delay its execution as long as the kernel scheduler is giving
  164  *              us time. Note that commands that may trigger a DEL as a side
  165  *              effect (like SET) are not fast commands.
  166  * 
  167  * may-replicate: Command may produce replication traffic, but should be
  168  *                allowed under circumstances where write commands are disallowed.
  169  *                Examples include PUBLISH, which replicates pubsub messages, and
  170  *                EVAL, which may execute write commands, which are replicated,
  171  *                or may just execute read commands. A command can not be marked
  172  *                both "write" and "may-replicate".
  173  *
  174  * The following additional flags are only used in order to put commands
  175  * in a specific ACL category. Commands can have multiple ACL categories.
  176  *
  177  * @keyspace, @read, @write, @set, @sortedset, @list, @hash, @string, @bitmap,
  178  * @hyperloglog, @stream, @admin, @fast, @slow, @pubsub, @blocking, @dangerous,
  179  * @connection, @transaction, @scripting, @geo.
  180  *
  181  * Note that:
  182  *
  183  * 1) The read-only flag implies the @read ACL category.
  184  * 2) The write flag implies the @write ACL category.
  185  * 3) The fast flag implies the @fast ACL category.
  186  * 4) The admin flag implies the @admin and @dangerous ACL categories.
  187  * 5) The pub-sub flag implies the @pubsub ACL category.
  188  * 6) The lack of fast flag implies the @slow ACL category.
  189  * 7) The non obvious "keyspace" category includes the commands
  190  *    that interact with keys without having anything to do with
  191  *    specific data structures, such as: DEL, RENAME, MOVE, SELECT,
  192  *    TYPE, EXPIRE*, PEXPIRE*, TTL, PTTL, ...
  193  */
  194 
  195 struct redisCommand redisCommandTable[] = {
  196     {"module",moduleCommand,-2,
  197      "admin no-script",
  198      0,NULL,0,0,0,0,0,0},
  199 
  200     {"get",getCommand,2,
  201      "read-only fast @string",
  202      0,NULL,1,1,1,0,0,0},
  203 
  204     {"getex",getexCommand,-2,
  205      "write fast @string",
  206      0,NULL,1,1,1,0,0,0},
  207 
  208     {"getdel",getdelCommand,2,
  209      "write fast @string",
  210      0,NULL,1,1,1,0,0,0},
  211 
  212     /* Note that we can't flag set as fast, since it may perform an
  213      * implicit DEL of a large key. */
  214     {"set",setCommand,-3,
  215      "write use-memory @string",
  216      0,NULL,1,1,1,0,0,0},
  217 
  218     {"setnx",setnxCommand,3,
  219      "write use-memory fast @string",
  220      0,NULL,1,1,1,0,0,0},
  221 
  222     {"setex",setexCommand,4,
  223      "write use-memory @string",
  224      0,NULL,1,1,1,0,0,0},
  225 
  226     {"psetex",psetexCommand,4,
  227      "write use-memory @string",
  228      0,NULL,1,1,1,0,0,0},
  229 
  230     {"append",appendCommand,3,
  231      "write use-memory fast @string",
  232      0,NULL,1,1,1,0,0,0},
  233 
  234     {"strlen",strlenCommand,2,
  235      "read-only fast @string",
  236      0,NULL,1,1,1,0,0,0},
  237 
  238     {"del",delCommand,-2,
  239      "write @keyspace",
  240      0,NULL,1,-1,1,0,0,0},
  241 
  242     {"unlink",unlinkCommand,-2,
  243      "write fast @keyspace",
  244      0,NULL,1,-1,1,0,0,0},
  245 
  246     {"exists",existsCommand,-2,
  247      "read-only fast @keyspace",
  248      0,NULL,1,-1,1,0,0,0},
  249 
  250     {"setbit",setbitCommand,4,
  251      "write use-memory @bitmap",
  252      0,NULL,1,1,1,0,0,0},
  253 
  254     {"getbit",getbitCommand,3,
  255      "read-only fast @bitmap",
  256      0,NULL,1,1,1,0,0,0},
  257 
  258     {"bitfield",bitfieldCommand,-2,
  259      "write use-memory @bitmap",
  260      0,NULL,1,1,1,0,0,0},
  261 
  262     {"bitfield_ro",bitfieldroCommand,-2,
  263      "read-only fast @bitmap",
  264      0,NULL,1,1,1,0,0,0},
  265 
  266     {"setrange",setrangeCommand,4,
  267      "write use-memory @string",
  268      0,NULL,1,1,1,0,0,0},
  269 
  270     {"getrange",getrangeCommand,4,
  271      "read-only @string",
  272      0,NULL,1,1,1,0,0,0},
  273 
  274     {"substr",getrangeCommand,4,
  275      "read-only @string",
  276      0,NULL,1,1,1,0,0,0},
  277 
  278     {"incr",incrCommand,2,
  279      "write use-memory fast @string",
  280      0,NULL,1,1,1,0,0,0},
  281 
  282     {"decr",decrCommand,2,
  283      "write use-memory fast @string",
  284      0,NULL,1,1,1,0,0,0},
  285 
  286     {"mget",mgetCommand,-2,
  287      "read-only fast @string",
  288      0,NULL,1,-1,1,0,0,0},
  289 
  290     {"rpush",rpushCommand,-3,
  291      "write use-memory fast @list",
  292      0,NULL,1,1,1,0,0,0},
  293 
  294     {"lpush",lpushCommand,-3,
  295      "write use-memory fast @list",
  296      0,NULL,1,1,1,0,0,0},
  297 
  298     {"rpushx",rpushxCommand,-3,
  299      "write use-memory fast @list",
  300      0,NULL,1,1,1,0,0,0},
  301 
  302     {"lpushx",lpushxCommand,-3,
  303      "write use-memory fast @list",
  304      0,NULL,1,1,1,0,0,0},
  305 
  306     {"linsert",linsertCommand,5,
  307      "write use-memory @list",
  308      0,NULL,1,1,1,0,0,0},
  309 
  310     {"rpop",rpopCommand,-2,
  311      "write fast @list",
  312      0,NULL,1,1,1,0,0,0},
  313 
  314     {"lpop",lpopCommand,-2,
  315      "write fast @list",
  316      0,NULL,1,1,1,0,0,0},
  317 
  318     {"brpop",brpopCommand,-3,
  319      "write no-script @list @blocking",
  320      0,NULL,1,-2,1,0,0,0},
  321 
  322     {"brpoplpush",brpoplpushCommand,4,
  323      "write use-memory no-script @list @blocking",
  324      0,NULL,1,2,1,0,0,0},
  325 
  326     {"blmove",blmoveCommand,6,
  327      "write use-memory no-script @list @blocking",
  328      0,NULL,1,2,1,0,0,0},
  329 
  330     {"blpop",blpopCommand,-3,
  331      "write no-script @list @blocking",
  332      0,NULL,1,-2,1,0,0,0},
  333 
  334     {"llen",llenCommand,2,
  335      "read-only fast @list",
  336      0,NULL,1,1,1,0,0,0},
  337 
  338     {"lindex",lindexCommand,3,
  339      "read-only @list",
  340      0,NULL,1,1,1,0,0,0},
  341 
  342     {"lset",lsetCommand,4,
  343      "write use-memory @list",
  344      0,NULL,1,1,1,0,0,0},
  345 
  346     {"lrange",lrangeCommand,4,
  347      "read-only @list",
  348      0,NULL,1,1,1,0,0,0},
  349 
  350     {"ltrim",ltrimCommand,4,
  351      "write @list",
  352      0,NULL,1,1,1,0,0,0},
  353 
  354     {"lpos",lposCommand,-3,
  355      "read-only @list",
  356      0,NULL,1,1,1,0,0,0},
  357 
  358     {"lrem",lremCommand,4,
  359      "write @list",
  360      0,NULL,1,1,1,0,0,0},
  361 
  362     {"rpoplpush",rpoplpushCommand,3,
  363      "write use-memory @list",
  364      0,NULL,1,2,1,0,0,0},
  365 
  366     {"lmove",lmoveCommand,5,
  367      "write use-memory @list",
  368      0,NULL,1,2,1,0,0,0},
  369 
  370     {"sadd",saddCommand,-3,
  371      "write use-memory fast @set",
  372      0,NULL,1,1,1,0,0,0},
  373 
  374     {"srem",sremCommand,-3,
  375      "write fast @set",
  376      0,NULL,1,1,1,0,0,0},
  377 
  378     {"smove",smoveCommand,4,
  379      "write fast @set",
  380      0,NULL,1,2,1,0,0,0},
  381 
  382     {"sismember",sismemberCommand,3,
  383      "read-only fast @set",
  384      0,NULL,1,1,1,0,0,0},
  385 
  386     {"smismember",smismemberCommand,-3,
  387      "read-only fast @set",
  388      0,NULL,1,1,1,0,0,0},
  389 
  390     {"scard",scardCommand,2,
  391      "read-only fast @set",
  392      0,NULL,1,1,1,0,0,0},
  393 
  394     {"spop",spopCommand,-2,
  395      "write random fast @set",
  396      0,NULL,1,1,1,0,0,0},
  397 
  398     {"srandmember",srandmemberCommand,-2,
  399      "read-only random @set",
  400      0,NULL,1,1,1,0,0,0},
  401 
  402     {"sinter",sinterCommand,-2,
  403      "read-only to-sort @set",
  404      0,NULL,1,-1,1,0,0,0},
  405 
  406     {"sinterstore",sinterstoreCommand,-3,
  407      "write use-memory @set",
  408      0,NULL,1,-1,1,0,0,0},
  409 
  410     {"sunion",sunionCommand,-2,
  411      "read-only to-sort @set",
  412      0,NULL,1,-1,1,0,0,0},
  413 
  414     {"sunionstore",sunionstoreCommand,-3,
  415      "write use-memory @set",
  416      0,NULL,1,-1,1,0,0,0},
  417 
  418     {"sdiff",sdiffCommand,-2,
  419      "read-only to-sort @set",
  420      0,NULL,1,-1,1,0,0,0},
  421 
  422     {"sdiffstore",sdiffstoreCommand,-3,
  423      "write use-memory @set",
  424      0,NULL,1,-1,1,0,0,0},
  425 
  426     {"smembers",sinterCommand,2,
  427      "read-only to-sort @set",
  428      0,NULL,1,1,1,0,0,0},
  429 
  430     {"sscan",sscanCommand,-3,
  431      "read-only random @set",
  432      0,NULL,1,1,1,0,0,0},
  433 
  434     {"zadd",zaddCommand,-4,
  435      "write use-memory fast @sortedset",
  436      0,NULL,1,1,1,0,0,0},
  437 
  438     {"zincrby",zincrbyCommand,4,
  439      "write use-memory fast @sortedset",
  440      0,NULL,1,1,1,0,0,0},
  441 
  442     {"zrem",zremCommand,-3,
  443      "write fast @sortedset",
  444      0,NULL,1,1,1,0,0,0},
  445 
  446     {"zremrangebyscore",zremrangebyscoreCommand,4,
  447      "write @sortedset",
  448      0,NULL,1,1,1,0,0,0},
  449 
  450     {"zremrangebyrank",zremrangebyrankCommand,4,
  451      "write @sortedset",
  452      0,NULL,1,1,1,0,0,0},
  453 
  454     {"zremrangebylex",zremrangebylexCommand,4,
  455      "write @sortedset",
  456      0,NULL,1,1,1,0,0,0},
  457 
  458     {"zunionstore",zunionstoreCommand,-4,
  459      "write use-memory @sortedset",
  460      0,zunionInterDiffStoreGetKeys,1,1,1,0,0,0},
  461 
  462     {"zinterstore",zinterstoreCommand,-4,
  463      "write use-memory @sortedset",
  464      0,zunionInterDiffStoreGetKeys,1,1,1,0,0,0},
  465 
  466     {"zdiffstore",zdiffstoreCommand,-4,
  467      "write use-memory @sortedset",
  468      0,zunionInterDiffStoreGetKeys,1,1,1,0,0,0},
  469 
  470     {"zunion",zunionCommand,-3,
  471      "read-only @sortedset",
  472      0,zunionInterDiffGetKeys,0,0,0,0,0,0},
  473 
  474     {"zinter",zinterCommand,-3,
  475      "read-only @sortedset",
  476      0,zunionInterDiffGetKeys,0,0,0,0,0,0},
  477 
  478     {"zdiff",zdiffCommand,-3,
  479      "read-only @sortedset",
  480      0,zunionInterDiffGetKeys,0,0,0,0,0,0},
  481 
  482     {"zrange",zrangeCommand,-4,
  483      "read-only @sortedset",
  484      0,NULL,1,1,1,0,0,0},
  485 
  486     {"zrangestore",zrangestoreCommand,-5,
  487      "write use-memory @sortedset",
  488      0,NULL,1,2,1,0,0,0},
  489 
  490     {"zrangebyscore",zrangebyscoreCommand,-4,
  491      "read-only @sortedset",
  492      0,NULL,1,1,1,0,0,0},
  493 
  494     {"zrevrangebyscore",zrevrangebyscoreCommand,-4,
  495      "read-only @sortedset",
  496      0,NULL,1,1,1,0,0,0},
  497 
  498     {"zrangebylex",zrangebylexCommand,-4,
  499      "read-only @sortedset",
  500      0,NULL,1,1,1,0,0,0},
  501 
  502     {"zrevrangebylex",zrevrangebylexCommand,-4,
  503      "read-only @sortedset",
  504      0,NULL,1,1,1,0,0,0},
  505 
  506     {"zcount",zcountCommand,4,
  507      "read-only fast @sortedset",
  508      0,NULL,1,1,1,0,0,0},
  509 
  510     {"zlexcount",zlexcountCommand,4,
  511      "read-only fast @sortedset",
  512      0,NULL,1,1,1,0,0,0},
  513 
  514     {"zrevrange",zrevrangeCommand,-4,
  515      "read-only @sortedset",
  516      0,NULL,1,1,1,0,0,0},
  517 
  518     {"zcard",zcardCommand,2,
  519      "read-only fast @sortedset",
  520      0,NULL,1,1,1,0,0,0},
  521 
  522     {"zscore",zscoreCommand,3,
  523      "read-only fast @sortedset",
  524      0,NULL,1,1,1,0,0,0},
  525 
  526     {"zmscore",zmscoreCommand,-3,
  527      "read-only fast @sortedset",
  528      0,NULL,1,1,1,0,0,0},
  529 
  530     {"zrank",zrankCommand,3,
  531      "read-only fast @sortedset",
  532      0,NULL,1,1,1,0,0,0},
  533 
  534     {"zrevrank",zrevrankCommand,3,
  535      "read-only fast @sortedset",
  536      0,NULL,1,1,1,0,0,0},
  537 
  538     {"zscan",zscanCommand,-3,
  539      "read-only random @sortedset",
  540      0,NULL,1,1,1,0,0,0},
  541 
  542     {"zpopmin",zpopminCommand,-2,
  543      "write fast @sortedset",
  544      0,NULL,1,1,1,0,0,0},
  545 
  546     {"zpopmax",zpopmaxCommand,-2,
  547      "write fast @sortedset",
  548      0,NULL,1,1,1,0,0,0},
  549 
  550     {"bzpopmin",bzpopminCommand,-3,
  551      "write no-script fast @sortedset @blocking",
  552      0,NULL,1,-2,1,0,0,0},
  553 
  554     {"bzpopmax",bzpopmaxCommand,-3,
  555      "write no-script fast @sortedset @blocking",
  556      0,NULL,1,-2,1,0,0,0},
  557 
  558     {"zrandmember",zrandmemberCommand,-2,
  559      "read-only random @sortedset",
  560      0,NULL,1,1,1,0,0,0},
  561 
  562     {"hset",hsetCommand,-4,
  563      "write use-memory fast @hash",
  564      0,NULL,1,1,1,0,0,0},
  565 
  566     {"hsetnx",hsetnxCommand,4,
  567      "write use-memory fast @hash",
  568      0,NULL,1,1,1,0,0,0},
  569 
  570     {"hget",hgetCommand,3,
  571      "read-only fast @hash",
  572      0,NULL,1,1,1,0,0,0},
  573 
  574     {"hmset",hsetCommand,-4,
  575      "write use-memory fast @hash",
  576      0,NULL,1,1,1,0,0,0},
  577 
  578     {"hmget",hmgetCommand,-3,
  579      "read-only fast @hash",
  580      0,NULL,1,1,1,0,0,0},
  581 
  582     {"hincrby",hincrbyCommand,4,
  583      "write use-memory fast @hash",
  584      0,NULL,1,1,1,0,0,0},
  585 
  586     {"hincrbyfloat",hincrbyfloatCommand,4,
  587      "write use-memory fast @hash",
  588      0,NULL,1,1,1,0,0,0},
  589 
  590     {"hdel",hdelCommand,-3,
  591      "write fast @hash",
  592      0,NULL,1,1,1,0,0,0},
  593 
  594     {"hlen",hlenCommand,2,
  595      "read-only fast @hash",
  596      0,NULL,1,1,1,0,0,0},
  597 
  598     {"hstrlen",hstrlenCommand,3,
  599      "read-only fast @hash",
  600      0,NULL,1,1,1,0,0,0},
  601 
  602     {"hkeys",hkeysCommand,2,
  603      "read-only to-sort @hash",
  604      0,NULL,1,1,1,0,0,0},
  605 
  606     {"hvals",hvalsCommand,2,
  607      "read-only to-sort @hash",
  608      0,NULL,1,1,1,0,0,0},
  609 
  610     {"hgetall",hgetallCommand,2,
  611      "read-only random @hash",
  612      0,NULL,1,1,1,0,0,0},
  613 
  614     {"hexists",hexistsCommand,3,
  615      "read-only fast @hash",
  616      0,NULL,1,1,1,0,0,0},
  617 
  618     {"hrandfield",hrandfieldCommand,-2,
  619      "read-only random @hash",
  620      0,NULL,1,1,1,0,0,0},
  621 
  622     {"hscan",hscanCommand,-3,
  623      "read-only random @hash",
  624      0,NULL,1,1,1,0,0,0},
  625 
  626     {"incrby",incrbyCommand,3,
  627      "write use-memory fast @string",
  628      0,NULL,1,1,1,0,0,0},
  629 
  630     {"decrby",decrbyCommand,3,
  631      "write use-memory fast @string",
  632      0,NULL,1,1,1,0,0,0},
  633 
  634     {"incrbyfloat",incrbyfloatCommand,3,
  635      "write use-memory fast @string",
  636      0,NULL,1,1,1,0,0,0},
  637 
  638     {"getset",getsetCommand,3,
  639      "write use-memory fast @string",
  640      0,NULL,1,1,1,0,0,0},
  641 
  642     {"mset",msetCommand,-3,
  643      "write use-memory @string",
  644      0,NULL,1,-1,2,0,0,0},
  645 
  646     {"msetnx",msetnxCommand,-3,
  647      "write use-memory @string",
  648      0,NULL,1,-1,2,0,0,0},
  649 
  650     {"randomkey",randomkeyCommand,1,
  651      "read-only random @keyspace",
  652      0,NULL,0,0,0,0,0,0},
  653 
  654     {"select",selectCommand,2,
  655      "ok-loading fast ok-stale @keyspace",
  656      0,NULL,0,0,0,0,0,0},
  657 
  658     {"swapdb",swapdbCommand,3,
  659      "write fast @keyspace @dangerous",
  660      0,NULL,0,0,0,0,0,0},
  661 
  662     {"move",moveCommand,3,
  663      "write fast @keyspace",
  664      0,NULL,1,1,1,0,0,0},
  665 
  666     {"copy",copyCommand,-3,
  667      "write use-memory @keyspace",
  668      0,NULL,1,2,1,0,0,0},
  669 
  670     /* Like for SET, we can't mark rename as a fast command because
  671      * overwriting the target key may result in an implicit slow DEL. */
  672     {"rename",renameCommand,3,
  673      "write @keyspace",
  674      0,NULL,1,2,1,0,0,0},
  675 
  676     {"renamenx",renamenxCommand,3,
  677      "write fast @keyspace",
  678      0,NULL,1,2,1,0,0,0},
  679 
  680     {"expire",expireCommand,3,
  681      "write fast @keyspace",
  682      0,NULL,1,1,1,0,0,0},
  683 
  684     {"expireat",expireatCommand,3,
  685      "write fast @keyspace",
  686      0,NULL,1,1,1,0,0,0},
  687 
  688     {"pexpire",pexpireCommand,3,
  689      "write fast @keyspace",
  690      0,NULL,1,1,1,0,0,0},
  691 
  692     {"pexpireat",pexpireatCommand,3,
  693      "write fast @keyspace",
  694      0,NULL,1,1,1,0,0,0},
  695 
  696     {"keys",keysCommand,2,
  697      "read-only to-sort @keyspace @dangerous",
  698      0,NULL,0,0,0,0,0,0},
  699 
  700     {"scan",scanCommand,-2,
  701      "read-only random @keyspace",
  702      0,NULL,0,0,0,0,0,0},
  703 
  704     {"dbsize",dbsizeCommand,1,
  705      "read-only fast @keyspace",
  706      0,NULL,0,0,0,0,0,0},
  707 
  708     {"auth",authCommand,-2,
  709      "no-auth no-script ok-loading ok-stale fast @connection",
  710      0,NULL,0,0,0,0,0,0},
  711 
  712     /* We don't allow PING during loading since in Redis PING is used as
  713      * failure detection, and a loading server is considered to be
  714      * not available. */
  715     {"ping",pingCommand,-1,
  716      "ok-stale fast @connection",
  717      0,NULL,0,0,0,0,0,0},
  718 
  719     {"echo",echoCommand,2,
  720      "fast @connection",
  721      0,NULL,0,0,0,0,0,0},
  722 
  723     {"save",saveCommand,1,
  724      "admin no-script",
  725      0,NULL,0,0,0,0,0,0},
  726 
  727     {"bgsave",bgsaveCommand,-1,
  728      "admin no-script",
  729      0,NULL,0,0,0,0,0,0},
  730 
  731     {"bgrewriteaof",bgrewriteaofCommand,1,
  732      "admin no-script",
  733      0,NULL,0,0,0,0,0,0},
  734 
  735     {"shutdown",shutdownCommand,-1,
  736      "admin no-script ok-loading ok-stale",
  737      0,NULL,0,0,0,0,0,0},
  738 
  739     {"lastsave",lastsaveCommand,1,
  740      "random fast ok-loading ok-stale @admin @dangerous",
  741      0,NULL,0,0,0,0,0,0},
  742 
  743     {"type",typeCommand,2,
  744      "read-only fast @keyspace",
  745      0,NULL,1,1,1,0,0,0},
  746 
  747     {"multi",multiCommand,1,
  748      "no-script fast ok-loading ok-stale @transaction",
  749      0,NULL,0,0,0,0,0,0},
  750 
  751     {"exec",execCommand,1,
  752      "no-script no-slowlog ok-loading ok-stale @transaction",
  753      0,NULL,0,0,0,0,0,0},
  754 
  755     {"discard",discardCommand,1,
  756      "no-script fast ok-loading ok-stale @transaction",
  757      0,NULL,0,0,0,0,0,0},
  758 
  759     {"sync",syncCommand,1,
  760      "admin no-script",
  761      0,NULL,0,0,0,0,0,0},
  762 
  763     {"psync",syncCommand,-3,
  764      "admin no-script",
  765      0,NULL,0,0,0,0,0,0},
  766 
  767     {"replconf",replconfCommand,-1,
  768      "admin no-script ok-loading ok-stale",
  769      0,NULL,0,0,0,0,0,0},
  770 
  771     {"flushdb",flushdbCommand,-1,
  772      "write @keyspace @dangerous",
  773      0,NULL,0,0,0,0,0,0},
  774 
  775     {"flushall",flushallCommand,-1,
  776      "write @keyspace @dangerous",
  777      0,NULL,0,0,0,0,0,0},
  778 
  779     {"sort",sortCommand,-2,
  780      "write use-memory @list @set @sortedset @dangerous",
  781      0,sortGetKeys,1,1,1,0,0,0},
  782 
  783     {"info",infoCommand,-1,
  784      "ok-loading ok-stale random @dangerous",
  785      0,NULL,0,0,0,0,0,0},
  786 
  787     {"monitor",monitorCommand,1,
  788      "admin no-script ok-loading ok-stale",
  789      0,NULL,0,0,0,0,0,0},
  790 
  791     {"ttl",ttlCommand,2,
  792      "read-only fast random @keyspace",
  793      0,NULL,1,1,1,0,0,0},
  794 
  795     {"touch",touchCommand,-2,
  796      "read-only fast @keyspace",
  797      0,NULL,1,-1,1,0,0,0},
  798 
  799     {"pttl",pttlCommand,2,
  800      "read-only fast random @keyspace",
  801      0,NULL,1,1,1,0,0,0},
  802 
  803     {"persist",persistCommand,2,
  804      "write fast @keyspace",
  805      0,NULL,1,1,1,0,0,0},
  806 
  807     {"slaveof",replicaofCommand,3,
  808      "admin no-script ok-stale",
  809      0,NULL,0,0,0,0,0,0},
  810 
  811     {"replicaof",replicaofCommand,3,
  812      "admin no-script ok-stale",
  813      0,NULL,0,0,0,0,0,0},
  814 
  815     {"role",roleCommand,1,
  816      "ok-loading ok-stale no-script fast @dangerous",
  817      0,NULL,0,0,0,0,0,0},
  818 
  819     {"debug",debugCommand,-2,
  820      "admin no-script ok-loading ok-stale",
  821      0,NULL,0,0,0,0,0,0},
  822 
  823     {"config",configCommand,-2,
  824      "admin ok-loading ok-stale no-script",
  825      0,NULL,0,0,0,0,0,0},
  826 
  827     {"subscribe",subscribeCommand,-2,
  828      "pub-sub no-script ok-loading ok-stale",
  829      0,NULL,0,0,0,0,0,0},
  830 
  831     {"unsubscribe",unsubscribeCommand,-1,
  832      "pub-sub no-script ok-loading ok-stale",
  833      0,NULL,0,0,0,0,0,0},
  834 
  835     {"psubscribe",psubscribeCommand,-2,
  836      "pub-sub no-script ok-loading ok-stale",
  837      0,NULL,0,0,0,0,0,0},
  838 
  839     {"punsubscribe",punsubscribeCommand,-1,
  840      "pub-sub no-script ok-loading ok-stale",
  841      0,NULL,0,0,0,0,0,0},
  842 
  843     {"publish",publishCommand,3,
  844      "pub-sub ok-loading ok-stale fast may-replicate",
  845      0,NULL,0,0,0,0,0,0},
  846 
  847     {"pubsub",pubsubCommand,-2,
  848      "pub-sub ok-loading ok-stale random",
  849      0,NULL,0,0,0,0,0,0},
  850 
  851     {"watch",watchCommand,-2,
  852      "no-script fast ok-loading ok-stale @transaction",
  853      0,NULL,1,-1,1,0,0,0},
  854 
  855     {"unwatch",unwatchCommand,1,
  856      "no-script fast ok-loading ok-stale @transaction",
  857      0,NULL,0,0,0,0,0,0},
  858 
  859     {"cluster",clusterCommand,-2,
  860      "admin ok-stale random",
  861      0,NULL,0,0,0,0,0,0},
  862 
  863     {"restore",restoreCommand,-4,
  864      "write use-memory @keyspace @dangerous",
  865      0,NULL,1,1,1,0,0,0},
  866 
  867     {"restore-asking",restoreCommand,-4,
  868     "write use-memory cluster-asking @keyspace @dangerous",
  869     0,NULL,1,1,1,0,0,0},
  870 
  871     {"migrate",migrateCommand,-6,
  872      "write random @keyspace @dangerous",
  873      0,migrateGetKeys,0,0,0,0,0,0},
  874 
  875     {"asking",askingCommand,1,
  876      "fast @keyspace",
  877      0,NULL,0,0,0,0,0,0},
  878 
  879     {"readonly",readonlyCommand,1,
  880      "fast @keyspace",
  881      0,NULL,0,0,0,0,0,0},
  882 
  883     {"readwrite",readwriteCommand,1,
  884      "fast @keyspace",
  885      0,NULL,0,0,0,0,0,0},
  886 
  887     {"dump",dumpCommand,2,
  888      "read-only random @keyspace",
  889      0,NULL,1,1,1,0,0,0},
  890 
  891     {"object",objectCommand,-2,
  892      "read-only random @keyspace",
  893      0,NULL,2,2,1,0,0,0},
  894 
  895     {"memory",memoryCommand,-2,
  896      "random read-only",
  897      0,memoryGetKeys,0,0,0,0,0,0},
  898 
  899     {"client",clientCommand,-2,
  900      "admin no-script random ok-loading ok-stale @connection",
  901      0,NULL,0,0,0,0,0,0},
  902 
  903     {"hello",helloCommand,-1,
  904      "no-auth no-script fast ok-loading ok-stale @connection",
  905      0,NULL,0,0,0,0,0,0},
  906 
  907     /* EVAL can modify the dataset, however it is not flagged as a write
  908      * command since we do the check while running commands from Lua.
  909      * 
  910      * EVAL and EVALSHA also feed monitors before the commands are executed,
  911      * as opposed to after.
  912       */
  913     {"eval",evalCommand,-3,
  914      "no-script no-monitor may-replicate @scripting",
  915      0,evalGetKeys,0,0,0,0,0,0},
  916 
  917     {"evalsha",evalShaCommand,-3,
  918      "no-script no-monitor may-replicate @scripting",
  919      0,evalGetKeys,0,0,0,0,0,0},
  920 
  921     {"slowlog",slowlogCommand,-2,
  922      "admin random ok-loading ok-stale",
  923      0,NULL,0,0,0,0,0,0},
  924 
  925     {"script",scriptCommand,-2,
  926      "no-script may-replicate @scripting",
  927      0,NULL,0,0,0,0,0,0},
  928 
  929     {"time",timeCommand,1,
  930      "random fast ok-loading ok-stale",
  931      0,NULL,0,0,0,0,0,0},
  932 
  933     {"bitop",bitopCommand,-4,
  934      "write use-memory @bitmap",
  935      0,NULL,2,-1,1,0,0,0},
  936 
  937     {"bitcount",bitcountCommand,-2,
  938      "read-only @bitmap",
  939      0,NULL,1,1,1,0,0,0},
  940 
  941     {"bitpos",bitposCommand,-3,
  942      "read-only @bitmap",
  943      0,NULL,1,1,1,0,0,0},
  944 
  945     {"wait",waitCommand,3,
  946      "no-script @keyspace",
  947      0,NULL,0,0,0,0,0,0},
  948 
  949     {"command",commandCommand,-1,
  950      "ok-loading ok-stale random @connection",
  951      0,NULL,0,0,0,0,0,0},
  952 
  953     {"geoadd",geoaddCommand,-5,
  954      "write use-memory @geo",
  955      0,NULL,1,1,1,0,0,0},
  956 
  957     /* GEORADIUS has store options that may write. */
  958     {"georadius",georadiusCommand,-6,
  959      "write use-memory @geo",
  960      0,georadiusGetKeys,1,1,1,0,0,0},
  961 
  962     {"georadius_ro",georadiusroCommand,-6,
  963      "read-only @geo",
  964      0,NULL,1,1,1,0,0,0},
  965 
  966     {"georadiusbymember",georadiusbymemberCommand,-5,
  967      "write use-memory @geo",
  968      0,georadiusGetKeys,1,1,1,0,0,0},
  969 
  970     {"georadiusbymember_ro",georadiusbymemberroCommand,-5,
  971      "read-only @geo",
  972      0,NULL,1,1,1,0,0,0},
  973 
  974     {"geohash",geohashCommand,-2,
  975      "read-only @geo",
  976      0,NULL,1,1,1,0,0,0},
  977 
  978     {"geopos",geoposCommand,-2,
  979      "read-only @geo",
  980      0,NULL,1,1,1,0,0,0},
  981 
  982     {"geodist",geodistCommand,-4,
  983      "read-only @geo",
  984      0,NULL,1,1,1,0,0,0},
  985 
  986     {"geosearch",geosearchCommand,-7,
  987      "read-only @geo",
  988       0,NULL,1,1,1,0,0,0},
  989 
  990     {"geosearchstore",geosearchstoreCommand,-8,
  991      "write use-memory @geo",
  992       0,NULL,1,2,1,0,0,0},
  993 
  994     {"pfselftest",pfselftestCommand,1,
  995      "admin @hyperloglog",
  996       0,NULL,0,0,0,0,0,0},
  997 
  998     {"pfadd",pfaddCommand,-2,
  999      "write use-memory fast @hyperloglog",
 1000      0,NULL,1,1,1,0,0,0},
 1001 
 1002     /* Technically speaking PFCOUNT may change the key since it changes the
 1003      * final bytes in the HyperLogLog representation. However in this case
 1004      * we claim that the representation, even if accessible, is an internal
 1005      * affair, and the command is semantically read only. */
 1006     {"pfcount",pfcountCommand,-2,
 1007      "read-only may-replicate @hyperloglog",
 1008      0,NULL,1,-1,1,0,0,0},
 1009 
 1010     {"pfmerge",pfmergeCommand,-2,
 1011      "write use-memory @hyperloglog",
 1012      0,NULL,1,-1,1,0,0,0},
 1013 
 1014     /* Unlike PFCOUNT that is considered as a read-only command (although
 1015      * it changes a bit), PFDEBUG may change the entire key when converting
 1016      * from sparse to dense representation */
 1017     {"pfdebug",pfdebugCommand,-3,
 1018      "admin write use-memory @hyperloglog",
 1019      0,NULL,2,2,1,0,0,0},
 1020 
 1021     {"xadd",xaddCommand,-5,
 1022      "write use-memory fast random @stream",
 1023      0,NULL,1,1,1,0,0,0},
 1024 
 1025     {"xrange",xrangeCommand,-4,
 1026      "read-only @stream",
 1027      0,NULL,1,1,1,0,0,0},
 1028 
 1029     {"xrevrange",xrevrangeCommand,-4,
 1030      "read-only @stream",
 1031      0,NULL,1,1,1,0,0,0},
 1032 
 1033     {"xlen",xlenCommand,2,
 1034      "read-only fast @stream",
 1035      0,NULL,1,1,1,0,0,0},
 1036 
 1037     {"xread",xreadCommand,-4,
 1038      "read-only @stream @blocking",
 1039      0,xreadGetKeys,0,0,0,0,0,0},
 1040 
 1041     {"xreadgroup",xreadCommand,-7,
 1042      "write @stream @blocking",
 1043      0,xreadGetKeys,0,0,0,0,0,0},
 1044 
 1045     {"xgroup",xgroupCommand,-2,
 1046      "write use-memory @stream",
 1047      0,NULL,2,2,1,0,0,0},
 1048 
 1049     {"xsetid",xsetidCommand,3,
 1050      "write use-memory fast @stream",
 1051      0,NULL,1,1,1,0,0,0},
 1052 
 1053     {"xack",xackCommand,-4,
 1054      "write fast random @stream",
 1055      0,NULL,1,1,1,0,0,0},
 1056 
 1057     {"xpending",xpendingCommand,-3,
 1058      "read-only random @stream",
 1059      0,NULL,1,1,1,0,0,0},
 1060 
 1061     {"xclaim",xclaimCommand,-6,
 1062      "write random fast @stream",
 1063      0,NULL,1,1,1,0,0,0},
 1064 
 1065     {"xautoclaim",xautoclaimCommand,-6,
 1066      "write random fast @stream",
 1067      0,NULL,1,1,1,0,0,0},
 1068 
 1069     {"xinfo",xinfoCommand,-2,
 1070      "read-only random @stream",
 1071      0,NULL,2,2,1,0,0,0},
 1072 
 1073     {"xdel",xdelCommand,-3,
 1074      "write fast @stream",
 1075      0,NULL,1,1,1,0,0,0},
 1076 
 1077     {"xtrim",xtrimCommand,-4,
 1078      "write random @stream",
 1079      0,NULL,1,1,1,0,0,0},
 1080 
 1081     {"post",securityWarningCommand,-1,
 1082      "ok-loading ok-stale read-only",
 1083      0,NULL,0,0,0,0,0,0},
 1084 
 1085     {"host:",securityWarningCommand,-1,
 1086      "ok-loading ok-stale read-only",
 1087      0,NULL,0,0,0,0,0,0},
 1088 
 1089     {"latency",latencyCommand,-2,
 1090      "admin no-script ok-loading ok-stale",
 1091      0,NULL,0,0,0,0,0,0},
 1092 
 1093     {"lolwut",lolwutCommand,-1,
 1094      "read-only fast",
 1095      0,NULL,0,0,0,0,0,0},
 1096 
 1097     {"acl",aclCommand,-2,
 1098      "admin no-script ok-loading ok-stale",
 1099      0,NULL,0,0,0,0,0,0},
 1100 
 1101     {"stralgo",stralgoCommand,-2,
 1102      "read-only @string",
 1103      0,lcsGetKeys,0,0,0,0,0,0},
 1104 
 1105     {"reset",resetCommand,1,
 1106      "no-script ok-stale ok-loading fast @connection",
 1107      0,NULL,0,0,0,0,0,0},
 1108 
 1109     {"failover",failoverCommand,-1,
 1110      "admin no-script ok-stale",
 1111      0,NULL,0,0,0,0,0,0}
 1112 };
 1113 
 1114 /*============================ Utility functions ============================ */
 1115 
 1116 /* We use a private localtime implementation which is fork-safe. The logging
 1117  * function of Redis may be called from other threads. */
 1118 void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst);
 1119 
 1120 /* Low level logging. To use only for very big messages, otherwise
 1121  * serverLog() is to prefer. */
 1122 void serverLogRaw(int level, const char *msg) {
 1123     const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };
 1124     const char *c = ".-*#";
 1125     FILE *fp;
 1126     char buf[64];
 1127     int rawmode = (level & LL_RAW);
 1128     int log_to_stdout = server.logfile[0] == '\0';
 1129 
 1130     level &= 0xff; /* clear flags */
 1131     if (level < server.verbosity) return;
 1132 
 1133     fp = log_to_stdout ? stdout : fopen(server.logfile,"a");
 1134     if (!fp) return;
 1135 
 1136     if (rawmode) {
 1137         fprintf(fp,"%s",msg);
 1138     } else {
 1139         int off;
 1140         struct timeval tv;
 1141         int role_char;
 1142         pid_t pid = getpid();
 1143 
 1144         gettimeofday(&tv,NULL);
 1145         struct tm tm;
 1146         nolocks_localtime(&tm,tv.tv_sec,server.timezone,server.daylight_active);
 1147         off = strftime(buf,sizeof(buf),"%d %b %Y %H:%M:%S.",&tm);
 1148         snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000);
 1149         if (server.sentinel_mode) {
 1150             role_char = 'X'; /* Sentinel. */
 1151         } else if (pid != server.pid) {
 1152             role_char = 'C'; /* RDB / AOF writing child. */
 1153         } else {
 1154             role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */
 1155         }
 1156         fprintf(fp,"%d:%c %s %c %s\n",
 1157             (int)getpid(),role_char, buf,c[level],msg);
 1158     }
 1159     fflush(fp);
 1160 
 1161     if (!log_to_stdout) fclose(fp);
 1162     if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg);
 1163 }
 1164 
 1165 /* Like serverLogRaw() but with printf-alike support. This is the function that
 1166  * is used across the code. The raw version is only used in order to dump
 1167  * the INFO output on crash. */
 1168 void _serverLog(int level, const char *fmt, ...) {
 1169     va_list ap;
 1170     char msg[LOG_MAX_LEN];
 1171 
 1172     va_start(ap, fmt);
 1173     vsnprintf(msg, sizeof(msg), fmt, ap);
 1174     va_end(ap);
 1175 
 1176     serverLogRaw(level,msg);
 1177 }
 1178 
 1179 /* Log a fixed message without printf-alike capabilities, in a way that is
 1180  * safe to call from a signal handler.
 1181  *
 1182  * We actually use this only for signals that are not fatal from the point
 1183  * of view of Redis. Signals that are going to kill the server anyway and
 1184  * where we need printf-alike features are served by serverLog(). */
 1185 void serverLogFromHandler(int level, const char *msg) {
 1186     int fd;
 1187     int log_to_stdout = server.logfile[0] == '\0';
 1188     char buf[64];
 1189 
 1190     if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize))
 1191         return;
 1192     fd = log_to_stdout ? STDOUT_FILENO :
 1193                          open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644);
 1194     if (fd == -1) return;
 1195     ll2string(buf,sizeof(buf),getpid());
 1196     if (write(fd,buf,strlen(buf)) == -1) goto err;
 1197     if (write(fd,":signal-handler (",17) == -1) goto err;
 1198     ll2string(buf,sizeof(buf),time(NULL));
 1199     if (write(fd,buf,strlen(buf)) == -1) goto err;
 1200     if (write(fd,") ",2) == -1) goto err;
 1201     if (write(fd,msg,strlen(msg)) == -1) goto err;
 1202     if (write(fd,"\n",1) == -1) goto err;
 1203 err:
 1204     if (!log_to_stdout) close(fd);
 1205 }
 1206 
 1207 /* Return the UNIX time in microseconds */
 1208 long long ustime(void) {
 1209     struct timeval tv;
 1210     long long ust;
 1211 
 1212     gettimeofday(&tv, NULL);
 1213     ust = ((long long)tv.tv_sec)*1000000;
 1214     ust += tv.tv_usec;
 1215     return ust;
 1216 }
 1217 
 1218 /* Return the UNIX time in milliseconds */
 1219 mstime_t mstime(void) {
 1220     return ustime()/1000;
 1221 }
 1222 
 1223 /* After an RDB dump or AOF rewrite we exit from children using _exit() instead of
 1224  * exit(), because the latter may interact with the same file objects used by
 1225  * the parent process. However if we are testing the coverage normal exit() is
 1226  * used in order to obtain the right coverage information. */
 1227 void exitFromChild(int retcode) {
 1228 #ifdef COVERAGE_TEST
 1229     exit(retcode);
 1230 #else
 1231     _exit(retcode);
 1232 #endif
 1233 }
 1234 
 1235 /*====================== Hash table type implementation  ==================== */
 1236 
 1237 /* This is a hash table type that uses the SDS dynamic strings library as
 1238  * keys and redis objects as values (objects can hold SDS strings,
 1239  * lists, sets). */
 1240 
 1241 void dictVanillaFree(void *privdata, void *val)
 1242 {
 1243     DICT_NOTUSED(privdata);
 1244     zfree(val);
 1245 }
 1246 
 1247 void dictListDestructor(void *privdata, void *val)
 1248 {
 1249     DICT_NOTUSED(privdata);
 1250     listRelease((list*)val);
 1251 }
 1252 
 1253 int dictSdsKeyCompare(void *privdata, const void *key1,
 1254         const void *key2)
 1255 {
 1256     int l1,l2;
 1257     DICT_NOTUSED(privdata);
 1258 
 1259     l1 = sdslen((sds)key1);
 1260     l2 = sdslen((sds)key2);
 1261     if (l1 != l2) return 0;
 1262     return memcmp(key1, key2, l1) == 0;
 1263 }
 1264 
 1265 /* A case insensitive version used for the command lookup table and other
 1266  * places where case insensitive non binary-safe comparison is needed. */
 1267 int dictSdsKeyCaseCompare(void *privdata, const void *key1,
 1268         const void *key2)
 1269 {
 1270     DICT_NOTUSED(privdata);
 1271 
 1272     return strcasecmp(key1, key2) == 0;
 1273 }
 1274 
 1275 void dictObjectDestructor(void *privdata, void *val)
 1276 {
 1277     DICT_NOTUSED(privdata);
 1278 
 1279     if (val == NULL) return; /* Lazy freeing will set value to NULL. */
 1280     decrRefCount(val);
 1281 }
 1282 
 1283 void dictSdsDestructor(void *privdata, void *val)
 1284 {
 1285     DICT_NOTUSED(privdata);
 1286 
 1287     sdsfree(val);
 1288 }
 1289 
 1290 int dictObjKeyCompare(void *privdata, const void *key1,
 1291         const void *key2)
 1292 {
 1293     const robj *o1 = key1, *o2 = key2;
 1294     return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
 1295 }
 1296 
 1297 uint64_t dictObjHash(const void *key) {
 1298     const robj *o = key;
 1299     return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
 1300 }
 1301 
 1302 uint64_t dictSdsHash(const void *key) {
 1303     return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
 1304 }
 1305 
 1306 uint64_t dictSdsCaseHash(const void *key) {
 1307     return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key));
 1308 }
 1309 
 1310 int dictEncObjKeyCompare(void *privdata, const void *key1,
 1311         const void *key2)
 1312 {
 1313     robj *o1 = (robj*) key1, *o2 = (robj*) key2;
 1314     int cmp;
 1315 
 1316     if (o1->encoding == OBJ_ENCODING_INT &&
 1317         o2->encoding == OBJ_ENCODING_INT)
 1318             return o1->ptr == o2->ptr;
 1319 
 1320     /* Due to OBJ_STATIC_REFCOUNT, we avoid calling getDecodedObject() without
 1321      * good reasons, because it would incrRefCount() the object, which
 1322      * is invalid. So we check to make sure dictFind() works with static
 1323      * objects as well. */
 1324     if (o1->refcount != OBJ_STATIC_REFCOUNT) o1 = getDecodedObject(o1);
 1325     if (o2->refcount != OBJ_STATIC_REFCOUNT) o2 = getDecodedObject(o2);
 1326     cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
 1327     if (o1->refcount != OBJ_STATIC_REFCOUNT) decrRefCount(o1);
 1328     if (o2->refcount != OBJ_STATIC_REFCOUNT) decrRefCount(o2);
 1329     return cmp;
 1330 }
 1331 
 1332 uint64_t dictEncObjHash(const void *key) {
 1333     robj *o = (robj*) key;
 1334 
 1335     if (sdsEncodedObject(o)) {
 1336         return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
 1337     } else if (o->encoding == OBJ_ENCODING_INT) {
 1338         char buf[32];
 1339         int len;
 1340 
 1341         len = ll2string(buf,32,(long)o->ptr);
 1342         return dictGenHashFunction((unsigned char*)buf, len);
 1343     } else {
 1344         serverPanic("Unknown string encoding");
 1345     }
 1346 }
 1347 
 1348 /* Return 1 if currently we allow dict to expand. Dict may allocate huge
 1349  * memory to contain hash buckets when dict expands, that may lead redis
 1350  * rejects user's requests or evicts some keys, we can stop dict to expand
 1351  * provisionally if used memory will be over maxmemory after dict expands,
 1352  * but to guarantee the performance of redis, we still allow dict to expand
 1353  * if dict load factor exceeds HASHTABLE_MAX_LOAD_FACTOR. */
 1354 int dictExpandAllowed(size_t moreMem, double usedRatio) {
 1355     if (usedRatio <= HASHTABLE_MAX_LOAD_FACTOR) {
 1356         return !overMaxmemoryAfterAlloc(moreMem);
 1357     } else {
 1358         return 1;
 1359     }
 1360 }
 1361 
 1362 /* Generic hash table type where keys are Redis Objects, Values
 1363  * dummy pointers. */
 1364 dictType objectKeyPointerValueDictType = {
 1365     dictEncObjHash,            /* hash function */
 1366     NULL,                      /* key dup */
 1367     NULL,                      /* val dup */
 1368     dictEncObjKeyCompare,      /* key compare */
 1369     dictObjectDestructor,      /* key destructor */
 1370     NULL,                      /* val destructor */
 1371     NULL                       /* allow to expand */
 1372 };
 1373 
 1374 /* Like objectKeyPointerValueDictType(), but values can be destroyed, if
 1375  * not NULL, calling zfree(). */
 1376 dictType objectKeyHeapPointerValueDictType = {
 1377     dictEncObjHash,            /* hash function */
 1378     NULL,                      /* key dup */
 1379     NULL,                      /* val dup */
 1380     dictEncObjKeyCompare,      /* key compare */
 1381     dictObjectDestructor,      /* key destructor */
 1382     dictVanillaFree,           /* val destructor */
 1383     NULL                       /* allow to expand */
 1384 };
 1385 
 1386 /* Set dictionary type. Keys are SDS strings, values are not used. */
 1387 dictType setDictType = {
 1388     dictSdsHash,               /* hash function */
 1389     NULL,                      /* key dup */
 1390     NULL,                      /* val dup */
 1391     dictSdsKeyCompare,         /* key compare */
 1392     dictSdsDestructor,         /* key destructor */
 1393     NULL                       /* val destructor */
 1394 };
 1395 
 1396 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
 1397 dictType zsetDictType = {
 1398     dictSdsHash,               /* hash function */
 1399     NULL,                      /* key dup */
 1400     NULL,                      /* val dup */
 1401     dictSdsKeyCompare,         /* key compare */
 1402     NULL,                      /* Note: SDS string shared & freed by skiplist */
 1403     NULL,                      /* val destructor */
 1404     NULL                       /* allow to expand */
 1405 };
 1406 
 1407 /* Db->dict, keys are sds strings, vals are Redis objects. */
 1408 dictType dbDictType = {
 1409     dictSdsHash,                /* hash function */
 1410     NULL,                       /* key dup */
 1411     NULL,                       /* val dup */
 1412     dictSdsKeyCompare,          /* key compare */
 1413     dictSdsDestructor,          /* key destructor */
 1414     dictObjectDestructor,       /* val destructor */
 1415     dictExpandAllowed           /* allow to expand */
 1416 };
 1417 
 1418 /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */
 1419 dictType shaScriptObjectDictType = {
 1420     dictSdsCaseHash,            /* hash function */
 1421     NULL,                       /* key dup */
 1422     NULL,                       /* val dup */
 1423     dictSdsKeyCaseCompare,      /* key compare */
 1424     dictSdsDestructor,          /* key destructor */
 1425     dictObjectDestructor,       /* val destructor */
 1426     NULL                        /* allow to expand */
 1427 };
 1428 
 1429 /* Db->expires */
 1430 dictType dbExpiresDictType = {
 1431     dictSdsHash,                /* hash function */
 1432     NULL,                       /* key dup */
 1433     NULL,                       /* val dup */
 1434     dictSdsKeyCompare,          /* key compare */
 1435     NULL,                       /* key destructor */
 1436     NULL,                       /* val destructor */
 1437     dictExpandAllowed           /* allow to expand */
 1438 };
 1439 
 1440 /* Command table. sds string -> command struct pointer. */
 1441 dictType commandTableDictType = {
 1442     dictSdsCaseHash,            /* hash function */
 1443     NULL,                       /* key dup */
 1444     NULL,                       /* val dup */
 1445     dictSdsKeyCaseCompare,      /* key compare */
 1446     dictSdsDestructor,          /* key destructor */
 1447     NULL,                       /* val destructor */
 1448     NULL                        /* allow to expand */
 1449 };
 1450 
 1451 /* Hash type hash table (note that small hashes are represented with ziplists) */
 1452 dictType hashDictType = {
 1453     dictSdsHash,                /* hash function */
 1454     NULL,                       /* key dup */
 1455     NULL,                       /* val dup */
 1456     dictSdsKeyCompare,          /* key compare */
 1457     dictSdsDestructor,          /* key destructor */
 1458     dictSdsDestructor,          /* val destructor */
 1459     NULL                        /* allow to expand */
 1460 };
 1461 
 1462 /* Dict type without destructor */
 1463 dictType sdsReplyDictType = {
 1464     dictSdsHash,                /* hash function */
 1465     NULL,                       /* key dup */
 1466     NULL,                       /* val dup */
 1467     dictSdsKeyCompare,          /* key compare */
 1468     NULL,                       /* key destructor */
 1469     NULL,                       /* val destructor */
 1470     NULL                        /* allow to expand */
 1471 };
 1472 
 1473 /* Keylist hash table type has unencoded redis objects as keys and
 1474  * lists as values. It's used for blocking operations (BLPOP) and to
 1475  * map swapped keys to a list of clients waiting for this keys to be loaded. */
 1476 dictType keylistDictType = {
 1477     dictObjHash,                /* hash function */
 1478     NULL,                       /* key dup */
 1479     NULL,                       /* val dup */
 1480     dictObjKeyCompare,          /* key compare */
 1481     dictObjectDestructor,       /* key destructor */
 1482     dictListDestructor,         /* val destructor */
 1483     NULL                        /* allow to expand */
 1484 };
 1485 
 1486 /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
 1487  * clusterNode structures. */
 1488 dictType clusterNodesDictType = {
 1489     dictSdsHash,                /* hash function */
 1490     NULL,                       /* key dup */
 1491     NULL,                       /* val dup */
 1492     dictSdsKeyCompare,          /* key compare */
 1493     dictSdsDestructor,          /* key destructor */
 1494     NULL,                       /* val destructor */
 1495     NULL                        /* allow to expand */
 1496 };
 1497 
 1498 /* Cluster re-addition blacklist. This maps node IDs to the time
 1499  * we can re-add this node. The goal is to avoid readding a removed
 1500  * node for some time. */
 1501 dictType clusterNodesBlackListDictType = {
 1502     dictSdsCaseHash,            /* hash function */
 1503     NULL,                       /* key dup */
 1504     NULL,                       /* val dup */
 1505     dictSdsKeyCaseCompare,      /* key compare */
 1506     dictSdsDestructor,          /* key destructor */
 1507     NULL,                       /* val destructor */
 1508     NULL                        /* allow to expand */
 1509 };
 1510 
 1511 /* Modules system dictionary type. Keys are module name,
 1512  * values are pointer to RedisModule struct. */
 1513 dictType modulesDictType = {
 1514     dictSdsCaseHash,            /* hash function */
 1515     NULL,                       /* key dup */
 1516     NULL,                       /* val dup */
 1517     dictSdsKeyCaseCompare,      /* key compare */
 1518     dictSdsDestructor,          /* key destructor */
 1519     NULL,                       /* val destructor */
 1520     NULL                        /* allow to expand */
 1521 };
 1522 
 1523 /* Migrate cache dict type. */
 1524 dictType migrateCacheDictType = {
 1525     dictSdsHash,                /* hash function */
 1526     NULL,                       /* key dup */
 1527     NULL,                       /* val dup */
 1528     dictSdsKeyCompare,          /* key compare */
 1529     dictSdsDestructor,          /* key destructor */
 1530     NULL,                       /* val destructor */
 1531     NULL                        /* allow to expand */
 1532 };
 1533 
 1534 /* Replication cached script dict (server.repl_scriptcache_dict).
 1535  * Keys are sds SHA1 strings, while values are not used at all in the current
 1536  * implementation. */
 1537 dictType replScriptCacheDictType = {
 1538     dictSdsCaseHash,            /* hash function */
 1539     NULL,                       /* key dup */
 1540     NULL,                       /* val dup */
 1541     dictSdsKeyCaseCompare,      /* key compare */
 1542     dictSdsDestructor,          /* key destructor */
 1543     NULL,                       /* val destructor */
 1544     NULL                        /* allow to expand */
 1545 };
 1546 
 1547 int htNeedsResize(dict *dict) {
 1548     long long size, used;
 1549 
 1550     size = dictSlots(dict);
 1551     used = dictSize(dict);
 1552     return (size > DICT_HT_INITIAL_SIZE &&
 1553             (used*100/size < HASHTABLE_MIN_FILL));
 1554 }
 1555 
 1556 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
 1557  * we resize the hash table to save memory */
 1558 void tryResizeHashTables(int dbid) {
 1559     if (htNeedsResize(server.db[dbid].dict))
 1560         dictResize(server.db[dbid].dict);
 1561     if (htNeedsResize(server.db[dbid].expires))
 1562         dictResize(server.db[dbid].expires);
 1563 }
 1564 
 1565 /* Our hash table implementation performs rehashing incrementally while
 1566  * we write/read from the hash table. Still if the server is idle, the hash
 1567  * table will use two tables for a long time. So we try to use 1 millisecond
 1568  * of CPU time at every call of this function to perform some rehashing.
 1569  *
 1570  * The function returns 1 if some rehashing was performed, otherwise 0
 1571  * is returned. */
 1572 int incrementallyRehash(int dbid) {
 1573     /* Keys dictionary */
 1574     if (dictIsRehashing(server.db[dbid].dict)) {
 1575         dictRehashMilliseconds(server.db[dbid].dict,1);
 1576         return 1; /* already used our millisecond for this loop... */
 1577     }
 1578     /* Expires */
 1579     if (dictIsRehashing(server.db[dbid].expires)) {
 1580         dictRehashMilliseconds(server.db[dbid].expires,1);
 1581         return 1; /* already used our millisecond for this loop... */
 1582     }
 1583     return 0;
 1584 }
 1585 
 1586 /* This function is called once a background process of some kind terminates,
 1587  * as we want to avoid resizing the hash tables when there is a child in order
 1588  * to play well with copy-on-write (otherwise when a resize happens lots of
 1589  * memory pages are copied). The goal of this function is to update the ability
 1590  * for dict.c to resize the hash tables accordingly to the fact we have an
 1591  * active fork child running. */
 1592 void updateDictResizePolicy(void) {
 1593     if (!hasActiveChildProcess())
 1594         dictEnableResize();
 1595     else
 1596         dictDisableResize();
 1597 }
 1598 
 1599 const char *strChildType(int type) {
 1600     switch(type) {
 1601         case CHILD_TYPE_RDB: return "RDB";
 1602         case CHILD_TYPE_AOF: return "AOF";
 1603         case CHILD_TYPE_LDB: return "LDB";
 1604         case CHILD_TYPE_MODULE: return "MODULE";
 1605         default: return "Unknown";
 1606     }
 1607 }
 1608 
 1609 /* Return true if there are active children processes doing RDB saving,
 1610  * AOF rewriting, or some side process spawned by a loaded module. */
 1611 int hasActiveChildProcess() {
 1612     return server.child_pid != -1;
 1613 }
 1614 
 1615 void resetChildState() {
 1616     server.child_type = CHILD_TYPE_NONE;
 1617     server.child_pid = -1;
 1618     server.stat_current_cow_bytes = 0;
 1619     server.stat_current_cow_updated = 0;
 1620     server.stat_current_save_keys_processed = 0;
 1621     server.stat_module_progress = 0;
 1622     server.stat_current_save_keys_total = 0;
 1623     updateDictResizePolicy();
 1624     closeChildInfoPipe();
 1625     moduleFireServerEvent(REDISMODULE_EVENT_FORK_CHILD,
 1626                           REDISMODULE_SUBEVENT_FORK_CHILD_DIED,
 1627                           NULL);
 1628 }
 1629 
 1630 /* Return if child type is mutual exclusive with other fork children */
 1631 int isMutuallyExclusiveChildType(int type) {
 1632     return type == CHILD_TYPE_RDB || type == CHILD_TYPE_AOF || type == CHILD_TYPE_MODULE;
 1633 }
 1634 
 1635 /* Return true if this instance has persistence completely turned off:
 1636  * both RDB and AOF are disabled. */
 1637 int allPersistenceDisabled(void) {
 1638     return server.saveparamslen == 0 && server.aof_state == AOF_OFF;
 1639 }
 1640 
 1641 /* ======================= Cron: called every 100 ms ======================== */
 1642 
 1643 /* Add a sample to the operations per second array of samples. */
 1644 void trackInstantaneousMetric(int metric, long long current_reading) {
 1645     long long now = mstime();
 1646     long long t = now - server.inst_metric[metric].last_sample_time;
 1647     long long ops = current_reading -
 1648                     server.inst_metric[metric].last_sample_count;
 1649     long long ops_sec;
 1650 
 1651     ops_sec = t > 0 ? (ops*1000/t) : 0;
 1652 
 1653     server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
 1654         ops_sec;
 1655     server.inst_metric[metric].idx++;
 1656     server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
 1657     server.inst_metric[metric].last_sample_time = now;
 1658     server.inst_metric[metric].last_sample_count = current_reading;
 1659 }
 1660 
 1661 /* Return the mean of all the samples. */
 1662 long long getInstantaneousMetric(int metric) {
 1663     int j;
 1664     long long sum = 0;
 1665 
 1666     for (j = 0; j < STATS_METRIC_SAMPLES; j++)
 1667         sum += server.inst_metric[metric].samples[j];
 1668     return sum / STATS_METRIC_SAMPLES;
 1669 }
 1670 
 1671 /* The client query buffer is an sds.c string that can end with a lot of
 1672  * free space not used, this function reclaims space if needed.
 1673  *
 1674  * The function always returns 0 as it never terminates the client. */
 1675 int clientsCronResizeQueryBuffer(client *c) {
 1676     size_t querybuf_size = sdsAllocSize(c->querybuf);
 1677     time_t idletime = server.unixtime - c->lastinteraction;
 1678 
 1679     /* There are two conditions to resize the query buffer:
 1680      * 1) Query buffer is > BIG_ARG and too big for latest peak.
 1681      * 2) Query buffer is > BIG_ARG and client is idle. */
 1682     if (querybuf_size > PROTO_MBULK_BIG_ARG &&
 1683          ((querybuf_size/(c->querybuf_peak+1)) > 2 ||
 1684           idletime > 2))
 1685     {
 1686         /* Only resize the query buffer if it is actually wasting
 1687          * at least a few kbytes. */
 1688         if (sdsavail(c->querybuf) > 1024*4) {
 1689             c->querybuf = sdsRemoveFreeSpace(c->querybuf);
 1690         }
 1691     }
 1692     /* Reset the peak again to capture the peak memory usage in the next
 1693      * cycle. */
 1694     c->querybuf_peak = 0;
 1695 
 1696     /* Clients representing masters also use a "pending query buffer" that
 1697      * is the yet not applied part of the stream we are reading. Such buffer
 1698      * also needs resizing from time to time, otherwise after a very large
 1699      * transfer (a huge value or a big MIGRATE operation) it will keep using
 1700      * a lot of memory. */
 1701     if (c->flags & CLIENT_MASTER) {
 1702         /* There are two conditions to resize the pending query buffer:
 1703          * 1) Pending Query buffer is > LIMIT_PENDING_QUERYBUF.
 1704          * 2) Used length is smaller than pending_querybuf_size/2 */
 1705         size_t pending_querybuf_size = sdsAllocSize(c->pending_querybuf);
 1706         if(pending_querybuf_size > LIMIT_PENDING_QUERYBUF &&
 1707            sdslen(c->pending_querybuf) < (pending_querybuf_size/2))
 1708         {
 1709             c->pending_querybuf = sdsRemoveFreeSpace(c->pending_querybuf);
 1710         }
 1711     }
 1712     return 0;
 1713 }
 1714 
 1715 /* This function is used in order to track clients using the biggest amount
 1716  * of memory in the latest few seconds. This way we can provide such information
 1717  * in the INFO output (clients section), without having to do an O(N) scan for
 1718  * all the clients.
 1719  *
 1720  * This is how it works. We have an array of CLIENTS_PEAK_MEM_USAGE_SLOTS slots
 1721  * where we track, for each, the biggest client output and input buffers we
 1722  * saw in that slot. Every slot correspond to one of the latest seconds, since
 1723  * the array is indexed by doing UNIXTIME % CLIENTS_PEAK_MEM_USAGE_SLOTS.
 1724  *
 1725  * When we want to know what was recently the peak memory usage, we just scan
 1726  * such few slots searching for the maximum value. */
 1727 #define CLIENTS_PEAK_MEM_USAGE_SLOTS 8
 1728 size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
 1729 size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
 1730 
 1731 int clientsCronTrackExpansiveClients(client *c, int time_idx) {
 1732     size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum +
 1733                   (c->argv ? zmalloc_size(c->argv) : 0);
 1734     size_t out_usage = getClientOutputBufferMemoryUsage(c);
 1735 
 1736     /* Track the biggest values observed so far in this slot. */
 1737     if (in_usage > ClientsPeakMemInput[time_idx]) ClientsPeakMemInput[time_idx] = in_usage;
 1738     if (out_usage > ClientsPeakMemOutput[time_idx]) ClientsPeakMemOutput[time_idx] = out_usage;
 1739 
 1740     return 0; /* This function never terminates the client. */
 1741 }
 1742 
 1743 /* Iterating all the clients in getMemoryOverheadData() is too slow and
 1744  * in turn would make the INFO command too slow. So we perform this
 1745  * computation incrementally and track the (not instantaneous but updated
 1746  * to the second) total memory used by clients using clinetsCron() in
 1747  * a more incremental way (depending on server.hz). */
 1748 int clientsCronTrackClientsMemUsage(client *c) {
 1749     size_t mem = 0;
 1750     int type = getClientType(c);
 1751     mem += getClientOutputBufferMemoryUsage(c);
 1752     mem += sdsZmallocSize(c->querybuf);
 1753     mem += zmalloc_size(c);
 1754     mem += c->argv_len_sum;
 1755     if (c->argv) mem += zmalloc_size(c->argv);
 1756     /* Now that we have the memory used by the client, remove the old
 1757      * value from the old category, and add it back. */
 1758     server.stat_clients_type_memory[c->client_cron_last_memory_type] -=
 1759         c->client_cron_last_memory_usage;
 1760     server.stat_clients_type_memory[type] += mem;
 1761     /* Remember what we added and where, to remove it next time. */
 1762     c->client_cron_last_memory_usage = mem;
 1763     c->client_cron_last_memory_type = type;
 1764     return 0;
 1765 }
 1766 
 1767 /* Return the max samples in the memory usage of clients tracked by
 1768  * the function clientsCronTrackExpansiveClients(). */
 1769 void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
 1770     size_t i = 0, o = 0;
 1771     for (int j = 0; j < CLIENTS_PEAK_MEM_USAGE_SLOTS; j++) {
 1772         if (ClientsPeakMemInput[j] > i) i = ClientsPeakMemInput[j];
 1773         if (ClientsPeakMemOutput[j] > o) o = ClientsPeakMemOutput[j];
 1774     }
 1775     *in_usage = i;
 1776     *out_usage = o;
 1777 }
 1778 
 1779 /* This function is called by serverCron() and is used in order to perform
 1780  * operations on clients that are important to perform constantly. For instance
 1781  * we use this function in order to disconnect clients after a timeout, including
 1782  * clients blocked in some blocking command with a non-zero timeout.
 1783  *
 1784  * The function makes some effort to process all the clients every second, even
 1785  * if this cannot be strictly guaranteed, since serverCron() may be called with
 1786  * an actual frequency lower than server.hz in case of latency events like slow
 1787  * commands.
 1788  *
 1789  * It is very important for this function, and the functions it calls, to be
 1790  * very fast: sometimes Redis has tens of hundreds of connected clients, and the
 1791  * default server.hz value is 10, so sometimes here we need to process thousands
 1792  * of clients per second, turning this function into a source of latency.
 1793  */
 1794 #define CLIENTS_CRON_MIN_ITERATIONS 5
 1795 void clientsCron(void) {
 1796     /* Try to process at least numclients/server.hz of clients
 1797      * per call. Since normally (if there are no big latency events) this
 1798      * function is called server.hz times per second, in the average case we
 1799      * process all the clients in 1 second. */
 1800     int numclients = listLength(server.clients);
 1801     int iterations = numclients/server.hz;
 1802     mstime_t now = mstime();
 1803 
 1804     /* Process at least a few clients while we are at it, even if we need
 1805      * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
 1806      * of processing each client once per second. */
 1807     if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
 1808         iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
 1809                      numclients : CLIENTS_CRON_MIN_ITERATIONS;
 1810 
 1811 
 1812     int curr_peak_mem_usage_slot = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
 1813     /* Always zero the next sample, so that when we switch to that second, we'll
 1814      * only register samples that are greater in that second without considering
 1815      * the history of such slot.
 1816      *
 1817      * Note: our index may jump to any random position if serverCron() is not
 1818      * called for some reason with the normal frequency, for instance because
 1819      * some slow command is called taking multiple seconds to execute. In that
 1820      * case our array may end containing data which is potentially older
 1821      * than CLIENTS_PEAK_MEM_USAGE_SLOTS seconds: however this is not a problem
 1822      * since here we want just to track if "recently" there were very expansive
 1823      * clients from the POV of memory usage. */
 1824     int zeroidx = (curr_peak_mem_usage_slot+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
 1825     ClientsPeakMemInput[zeroidx] = 0;
 1826     ClientsPeakMemOutput[zeroidx] = 0;
 1827 
 1828 
 1829     while(listLength(server.clients) && iterations--) {
 1830         client *c;
 1831         listNode *head;
 1832 
 1833         /* Rotate the list, take the current head, process.
 1834          * This way if the client must be removed from the list it's the
 1835          * first element and we don't incur into O(N) computation. */
 1836         listRotateTailToHead(server.clients);
 1837         head = listFirst(server.clients);
 1838         c = listNodeValue(head);
 1839         /* The following functions do different service checks on the client.
 1840          * The protocol is that they return non-zero if the client was
 1841          * terminated. */
 1842         if (clientsCronHandleTimeout(c,now)) continue;
 1843         if (clientsCronResizeQueryBuffer(c)) continue;
 1844         if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue;
 1845         if (clientsCronTrackClientsMemUsage(c)) continue;
 1846         if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
 1847     }
 1848 }
 1849 
 1850 /* This function handles 'background' operations we are required to do
 1851  * incrementally in Redis databases, such as active key expiring, resizing,
 1852  * rehashing. */
 1853 void databasesCron(void) {
 1854     /* Expire keys by random sampling. Not required for slaves
 1855      * as master will synthesize DELs for us. */
 1856     if (server.active_expire_enabled) {
 1857         if (iAmMaster()) {
 1858             activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
 1859         } else {
 1860             expireSlaveKeys();
 1861         }
 1862     }
 1863 
 1864     /* Defrag keys gradually. */
 1865     activeDefragCycle();
 1866 
 1867     /* Perform hash tables rehashing if needed, but only if there are no
 1868      * other processes saving the DB on disk. Otherwise rehashing is bad
 1869      * as will cause a lot of copy-on-write of memory pages. */
 1870     if (!hasActiveChildProcess()) {
 1871         /* We use global counters so if we stop the computation at a given
 1872          * DB we'll be able to start from the successive in the next
 1873          * cron loop iteration. */
 1874         static unsigned int resize_db = 0;
 1875         static unsigned int rehash_db = 0;
 1876         int dbs_per_call = CRON_DBS_PER_CALL;
 1877         int j;
 1878 
 1879         /* Don't test more DBs than we have. */
 1880         if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
 1881 
 1882         /* Resize */
 1883         for (j = 0; j < dbs_per_call; j++) {
 1884             tryResizeHashTables(resize_db % server.dbnum);
 1885             resize_db++;
 1886         }
 1887 
 1888         /* Rehash */
 1889         if (server.activerehashing) {
 1890             for (j = 0; j < dbs_per_call; j++) {
 1891                 int work_done = incrementallyRehash(rehash_db);
 1892                 if (work_done) {
 1893                     /* If the function did some work, stop here, we'll do
 1894                      * more at the next cron loop. */
 1895                     break;
 1896                 } else {
 1897                     /* If this db didn't need rehash, we'll try the next one. */
 1898                     rehash_db++;
 1899                     rehash_db %= server.dbnum;
 1900                 }
 1901             }
 1902         }
 1903     }
 1904 }
 1905 
 1906 /* We take a cached value of the unix time in the global state because with
 1907  * virtual memory and aging there is to store the current time in objects at
 1908  * every object access, and accuracy is not needed. To access a global var is
 1909  * a lot faster than calling time(NULL).
 1910  *
 1911  * This function should be fast because it is called at every command execution
 1912  * in call(), so it is possible to decide if to update the daylight saving
 1913  * info or not using the 'update_daylight_info' argument. Normally we update
 1914  * such info only when calling this function from serverCron() but not when
 1915  * calling it from call(). */
 1916 void updateCachedTime(int update_daylight_info) {
 1917     server.ustime = ustime();
 1918     server.mstime = server.ustime / 1000;
 1919     time_t unixtime = server.mstime / 1000;
 1920     atomicSet(server.unixtime, unixtime);
 1921 
 1922     /* To get information about daylight saving time, we need to call
 1923      * localtime_r and cache the result. However calling localtime_r in this
 1924      * context is safe since we will never fork() while here, in the main
 1925      * thread. The logging function will call a thread safe version of
 1926      * localtime that has no locks. */
 1927     if (update_daylight_info) {
 1928         struct tm tm;
 1929         time_t ut = server.unixtime;
 1930         localtime_r(&ut,&tm);
 1931         server.daylight_active = tm.tm_isdst;
 1932     }
 1933 }
 1934 
 1935 void checkChildrenDone(void) {
 1936     int statloc = 0;
 1937     pid_t pid;
 1938 
 1939     if ((pid = waitpid(-1, &statloc, WNOHANG)) != 0) {
 1940         int exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : -1;
 1941         int bysignal = 0;
 1942 
 1943         if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
 1944 
 1945         /* sigKillChildHandler catches the signal and calls exit(), but we
 1946          * must make sure not to flag lastbgsave_status, etc incorrectly.
 1947          * We could directly terminate the child process via SIGUSR1
 1948          * without handling it */
 1949         if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {
 1950             bysignal = SIGUSR1;
 1951             exitcode = 1;
 1952         }
 1953 
 1954         if (pid == -1) {
 1955             serverLog(LL_WARNING,"waitpid() returned an error: %s. "
 1956                 "child_type: %s, child_pid = %d",
 1957                 strerror(errno),
 1958                 strChildType(server.child_type),
 1959                 (int) server.child_pid);
 1960         } else if (pid == server.child_pid) {
 1961             if (server.child_type == CHILD_TYPE_RDB) {
 1962                 backgroundSaveDoneHandler(exitcode, bysignal);
 1963             } else if (server.child_type == CHILD_TYPE_AOF) {
 1964                 backgroundRewriteDoneHandler(exitcode, bysignal);
 1965             } else if (server.child_type == CHILD_TYPE_MODULE) {
 1966                 ModuleForkDoneHandler(exitcode, bysignal);
 1967             } else {
 1968                 serverPanic("Unknown child type %d for child pid %d", server.child_type, server.child_pid);
 1969                 exit(1);
 1970             }
 1971             if (!bysignal && exitcode == 0) receiveChildInfo();
 1972             resetChildState();
 1973         } else {
 1974             if (!ldbRemoveChild(pid)) {
 1975                 serverLog(LL_WARNING,
 1976                           "Warning, detected child with unmatched pid: %ld",
 1977                           (long) pid);
 1978             }
 1979         }
 1980 
 1981         /* start any pending forks immediately. */
 1982         replicationStartPendingFork();
 1983     }
 1984 }
 1985 
 1986 /* Called from serverCron and loadingCron to update cached memory metrics. */
 1987 void cronUpdateMemoryStats() {
 1988     /* Record the max memory used since the server was started. */
 1989     if (zmalloc_used_memory() > server.stat_peak_memory)
 1990         server.stat_peak_memory = zmalloc_used_memory();
 1991 
 1992     run_with_period(100) {
 1993         /* Sample the RSS and other metrics here since this is a relatively slow call.
 1994          * We must sample the zmalloc_used at the same time we take the rss, otherwise
 1995          * the frag ratio calculate may be off (ratio of two samples at different times) */
 1996         server.cron_malloc_stats.process_rss = zmalloc_get_rss();
 1997         server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory();
 1998         /* Sampling the allocator info can be slow too.
 1999          * The fragmentation ratio it'll show is potentially more accurate
 2000          * it excludes other RSS pages such as: shared libraries, LUA and other non-zmalloc
 2001          * allocations, and allocator reserved pages that can be pursed (all not actual frag) */
 2002         zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated,
 2003                                    &server.cron_malloc_stats.allocator_active,
 2004                                    &server.cron_malloc_stats.allocator_resident);
 2005         /* in case the allocator isn't providing these stats, fake them so that
 2006          * fragmentation info still shows some (inaccurate metrics) */
 2007         if (!server.cron_malloc_stats.allocator_resident) {
 2008             /* LUA memory isn't part of zmalloc_used, but it is part of the process RSS,
 2009              * so we must deduct it in order to be able to calculate correct
 2010              * "allocator fragmentation" ratio */
 2011             size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL;
 2012             server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory;
 2013         }
 2014         if (!server.cron_malloc_stats.allocator_active)
 2015             server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident;
 2016         if (!server.cron_malloc_stats.allocator_allocated)
 2017             server.cron_malloc_stats.allocator_allocated = server.cron_malloc_stats.zmalloc_used;
 2018     }
 2019 }
 2020 
/* This is our timer interrupt, called server.hz times per second.
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 *   lookup).
 * - Software watchdog.
 * - Update some statistic.
 * - Incremental rehashing of the DBs hash tables.
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 * - Clients timeout of different kinds.
 * - Replication reconnection.
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { .... }
 *
 * The three arguments are unused. The return value is 1000/server.hz, that
 * is, the period in milliseconds matching the current server.hz (presumably
 * used by the event loop to schedule the next call -- confirm in the timer
 * event code).
 */

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    UNUSED(eventLoop);
    UNUSED(id);
    UNUSED(clientData);

    /* Software watchdog: deliver the SIGALRM that will reach the signal
     * handler if we don't return here fast enough. */
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    /* Update the time cache (including the daylight saving info). */
    updateCachedTime(1);

    /* Start from the configured frequency at every call: the dynamic-hz
     * logic below may only raise it for the current cycle. */
    server.hz = server.config_hz;
    /* Adapt the server.hz value to the number of connected clients. If we have
     * many clients, we want to call serverCron() with a higher frequency.
     * The value is doubled until the clients per tick are no more than
     * MAX_CLIENTS_PER_CLOCK_TICK, and is capped at CONFIG_MAX_HZ. */
    if (server.dynamic_hz) {
        while (listLength(server.clients) / server.hz >
               MAX_CLIENTS_PER_CLOCK_TICK)
        {
            server.hz *= 2;
            if (server.hz > CONFIG_MAX_HZ) {
                server.hz = CONFIG_MAX_HZ;
                break;
            }
        }
    }

    /* Feed the instantaneous metrics (commands, network input and output)
     * every 100 milliseconds. The network counters are read atomically. */
    run_with_period(100) {
        long long stat_net_input_bytes, stat_net_output_bytes;
        atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
        atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);

        trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
                stat_net_input_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
                stat_net_output_bytes);
    }

    /* We have just LRU_BITS bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock.
     *
     * Note that even if the counter wraps it's not a big problem,
     * everything will still work but some object will appear younger
     * to Redis. However for this to happen a given object should never be
     * touched for all the time needed to the counter to wrap, which is
     * not likely.
     *
     * Note that you can change the resolution altering the
     * LRU_CLOCK_RESOLUTION define. */
    unsigned int lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);

    cronUpdateMemoryStats();

    /* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. If the shutdown fails we
     * log the problem, clear the flag and keep running. */
    if (server.shutdown_asap) {
        if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
        serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        server.shutdown_asap = 0;
    }

    /* Show some info about non-empty databases */
    if (server.verbosity <= LL_VERBOSE) {
        run_with_period(5000) {
            for (j = 0; j < server.dbnum; j++) {
                long long size, used, vkeys;

                size = dictSlots(server.db[j].dict);
                used = dictSize(server.db[j].dict);
                vkeys = dictSize(server.db[j].expires);
                if (used || vkeys) {
                    serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                }
            }
        }
    }

    /* Show information about connected clients */
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            serverLog(LL_DEBUG,
                "%lu clients connected (%lu replicas), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }

    /* We need to do a few operations on clients asynchronously. */
    clientsCron();

    /* Handle background operations on Redis databases. */
    databasesCron();

    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    if (!hasActiveChildProcess() &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (hasActiveChildProcess() || ldbPendingChildren())
    {
        run_with_period(1000) receiveChildInfo();
        checkChildrenDone();
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now. */
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(server.rdb_filename,rsiptr);
                /* The first matching save point is enough. */
                break;
            }
        }

        /* Trigger an AOF rewrite if needed. */
        if (server.aof_state == AOF_ON &&
            !hasActiveChildProcess() &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            /* Use 1 as base when no base size is known, to avoid a
             * division by zero. */
            long long base = server.aof_rewrite_base_size ?
                server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }
    /* Just for the sake of defensive programming, to avoid forgetting to
     * call this function when needed. */
    updateDictResizePolicy();


    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_state == AOF_ON && server.aof_flush_postponed_start)
        flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * a higher frequency. */
    run_with_period(1000) {
        if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }

    /* Clear the paused clients state if needed. */
    checkClientPauseTimeoutAndReturnIfPaused();

    /* Replication cron function -- used to reconnect to master,
     * detect transfer failures, start background RDB transfers and so forth.
     *
     * If Redis is trying to failover then run the replication cron faster so
     * progress on the handshake happens more quickly. */
    if (server.failover_state != NO_FAILOVER) {
        run_with_period(100) replicationCron();
    } else {
        run_with_period(1000) replicationCron();
    }

    /* Run the Redis Cluster cron. */
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    /* Run the Sentinel timer if we are in sentinel mode. */
    if (server.sentinel_mode) sentinelTimer();

    /* Cleanup expired MIGRATE cached sockets. */
    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    /* Stop the I/O threads if we don't have enough pending work. */
    stopThreadedIOIfNeeded();

    /* Resize tracking keys table if needed. This is also done at every
     * command execution, but we want to be sure that if the last command
     * executed changes the value via CONFIG SET, the server will perform
     * the operation even if completely idle. */
    if (server.tracking_clients) trackingLimitUsedSlots();

    /* Start a scheduled BGSAVE if the corresponding flag is set. This is
     * useful when we are forced to postpone a BGSAVE because an AOF
     * rewrite is in progress.
     *
     * Note: this code must be after the replicationCron() call above so
     * make sure when refactoring this file to keep this order. This is useful
     * because we want to give priority to RDB savings for replication. */
    if (!hasActiveChildProcess() &&
        server.rdb_bgsave_scheduled &&
        (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
         server.lastbgsave_status == C_OK))
    {
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        /* The flag is cleared only if the background save was started. */
        if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
            server.rdb_bgsave_scheduled = 0;
    }

    /* Fire the cron loop modules event. */
    RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
    moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
                          0,
                          &ei);

    /* Count this cycle and return the period, in milliseconds, that
     * corresponds to the current server.hz. */
    server.cronloops++;
    return 1000/server.hz;
}
 2272 
 2273 
 2274 void blockingOperationStarts() {
 2275     if(!server.blocking_op_nesting++){
 2276         updateCachedTime(0);
 2277         server.blocked_last_cron = server.mstime;
 2278     }
 2279 }
 2280 
 2281 void blockingOperationEnds() {
 2282     if(!(--server.blocking_op_nesting)){
 2283         server.blocked_last_cron = 0;
 2284     }
 2285 }
 2286 
 2287 /* This function fill in the role of serverCron during RDB or AOF loading, and
 2288  * also during blocked scripts.
 2289  * It attempts to do its duties at a similar rate as the configured server.hz,
 2290  * and updates cronloops variable so that similarly to serverCron, the
 2291  * run_with_period can be used. */
 2292 void whileBlockedCron() {
 2293     /* Here we may want to perform some cron jobs (normally done server.hz times
 2294      * per second). */
 2295 
 2296     /* Since this function depends on a call to blockingOperationStarts, let's
 2297      * make sure it was done. */
 2298     serverAssert(server.blocked_last_cron);
 2299 
 2300     /* In case we where called too soon, leave right away. This way one time
 2301      * jobs after the loop below don't need an if. and we don't bother to start
 2302      * latency monitor if this function is called too often. */
 2303     if (server.blocked_last_cron >= server.mstime)
 2304         return;
 2305 
 2306     mstime_t latency;
 2307     latencyStartMonitor(latency);
 2308 
 2309     /* In some cases we may be called with big intervals, so we may need to do
 2310      * extra work here. This is because some of the functions in serverCron rely
 2311      * on the fact that it is performed every 10 ms or so. For instance, if
 2312      * activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we
 2313      * need to call it multiple times. */
 2314     long hz_ms = 1000/server.hz;
 2315     while (server.blocked_last_cron < server.mstime) {
 2316 
 2317         /* Defrag keys gradually. */
 2318         activeDefragCycle();
 2319 
 2320         server.blocked_last_cron += hz_ms;
 2321 
 2322         /* Increment cronloop so that run_with_period works. */
 2323         server.cronloops++;
 2324     }
 2325 
 2326     /* Other cron jobs do not need to be done in a loop. No need to check
 2327      * server.blocked_last_cron since we have an early exit at the top. */
 2328 
 2329     /* Update memory stats during loading (excluding blocked scripts) */
 2330     if (server.loading) cronUpdateMemoryStats();
 2331 
 2332     latencyEndMonitor(latency);
 2333     latencyAddSampleIfNeeded("while-blocked-cron",latency);
 2334 }
 2335 
/* Declared extern here, so presumably defined in another source file.
 * beforeSleep() tests it: when it is non-zero only a subset of vital
 * functions is executed, because the event loop is being re-entered from
 * processEventsWhileBlocked(). */
extern int ProcessingEventsWhileBlocked;
 2337 
 2338 /* This function gets called every time Redis is entering the
 2339  * main loop of the event driven library, that is, before to sleep
 2340  * for ready file descriptors.
 2341  *
 2342  * Note: This function is (currently) called from two functions:
 2343  * 1. aeMain - The main server loop
 2344  * 2. processEventsWhileBlocked - Process clients during RDB/AOF load
 2345  *
 2346  * If it was called from processEventsWhileBlocked we don't want
 2347  * to perform all actions (For example, we don't want to expire
 2348  * keys), but we do need to perform some actions.
 2349  *
 2350  * The most important is freeClientsInAsyncFreeQueue but we also
 2351  * call some other low-risk functions. */
 2352 void beforeSleep(struct aeEventLoop *eventLoop) {
 2353     UNUSED(eventLoop);
 2354 
 2355     size_t zmalloc_used = zmalloc_used_memory();
 2356     if (zmalloc_used > server.stat_peak_memory)
 2357         server.stat_peak_memory = zmalloc_used;
 2358 
 2359     /* Just call a subset of vital functions in case we are re-entering
 2360      * the event loop from processEventsWhileBlocked(). Note that in this
 2361      * case we keep track of the number of events we are processing, since
 2362      * processEventsWhileBlocked() wants to stop ASAP if there are no longer
 2363      * events to handle. */
 2364     if (ProcessingEventsWhileBlocked) {
 2365         uint64_t processed = 0;
 2366         processed += handleClientsWithPendingReadsUsingThreads();
 2367         processed += tlsProcessPendingData();
 2368         processed += handleClientsWithPendingWrites();
 2369         processed += freeClientsInAsyncFreeQueue();
 2370         server.events_processed_while_blocked += processed;
 2371         return;
 2372     }
 2373 
 2374     /* Handle precise timeouts of blocked clients. */
 2375     handleBlockedClientsTimeout();
 2376 
 2377     /* We should handle pending reads clients ASAP after event loop. */
 2378     handleClientsWithPendingReadsUsingThreads();
 2379 
 2380     /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
 2381     tlsProcessPendingData();
 2382 
 2383     /* If tls still has pending unread data don't sleep at all. */
 2384     aeSetDontWait(server.el, tlsHasPendingData());
 2385 
 2386     /* Call the Redis Cluster before sleep function. Note that this function
 2387      * may change the state of Redis Cluster (from ok to fail or vice versa),
 2388      * so it's a good idea to call it before serving the unblocked clients
 2389      * later in this function. */
 2390     if (server.cluster_enabled) clusterBeforeSleep();
 2391 
 2392     /* Run a fast expire cycle (the called function will return
 2393      * ASAP if a fast cycle is not needed). */
 2394     if (server.active_expire_enabled && server.masterhost == NULL)
 2395         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
 2396 
 2397     /* Unblock all the clients blocked for synchronous replication
 2398      * in WAIT. */
 2399     if (listLength(server.clients_waiting_acks))
 2400         processClientsWaitingReplicas();
 2401 
 2402     /* Check if there are clients unblocked by modules that implement
 2403      * blocking commands. */
 2404     if (moduleCount()) moduleHandleBlockedClients();
 2405 
 2406     /* Try to process pending commands for clients that were just unblocked. */
 2407     if (listLength(server.unblocked_clients))
 2408         processUnblockedClients();
 2409 
 2410     /* Send all the slaves an ACK request if at least one client blocked
 2411      * during the previous event loop iteration. Note that we do this after
 2412      * processUnblockedClients(), so if there are multiple pipelined WAITs
 2413      * and the just unblocked WAIT gets blocked again, we don't have to wait
 2414      * a server cron cycle in absence of other event loop events. See #6623.
 2415      * 
 2416      * We also don't send the ACKs while clients are paused, since it can
 2417      * increment the replication backlog, they'll be sent after the pause
 2418      * if we are still the master. */
 2419     if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
 2420         robj *argv[3];
 2421 
 2422         argv[0] = shared.replconf;
 2423         argv[1] = shared.getack;
 2424         argv[2] = shared.special_asterick; /* Not used argument. */
 2425         replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
 2426         server.get_ack_from_slaves = 0;
 2427     }
 2428 
 2429     /* We may have recieved updates from clients about their current offset. NOTE:
 2430      * this can't be done where the ACK is recieved since failover will disconnect 
 2431      * our clients. */
 2432     updateFailoverStatus();
 2433 
 2434     /* Send the invalidation messages to clients participating to the
 2435      * client side caching protocol in broadcasting (BCAST) mode. */
 2436     trackingBroadcastInvalidationMessages();
 2437 
 2438     /* Write the AOF buffer on disk */
 2439     if (server.aof_state == AOF_ON)
 2440         flushAppendOnlyFile(0);
 2441 
 2442     /* Handle writes with pending output buffers. */
 2443     handleClientsWithPendingWritesUsingThreads();
 2444 
 2445     /* Close clients that need to be closed asynchronous */
 2446     freeClientsInAsyncFreeQueue();
 2447 
 2448     /* Try to process blocked clients every once in while. Example: A module
 2449      * calls RM_SignalKeyAsReady from within a timer callback (So we don't
 2450      * visit processCommand() at all). */
 2451     handleClientsBlockedOnKeys();
 2452 
 2453     /* Before we are going to sleep, let the threads access the dataset by
 2454      * releasing the GIL. Redis main thread will not touch anything at this
 2455      * time. */
 2456     if (moduleCount()) moduleReleaseGIL();
 2457 
 2458     /* Do NOT add anything below moduleReleaseGIL !!! */
 2459 }
 2460 
 2461 /* This function is called immediately after the event loop multiplexing
 2462  * API returned, and the control is going to soon return to Redis by invoking
 2463  * the different events callbacks. */
 2464 void afterSleep(struct aeEventLoop *eventLoop) {
 2465     UNUSED(eventLoop);
 2466 
 2467     /* Do NOT add anything above moduleAcquireGIL !!! */
 2468 
 2469     /* Aquire the modules GIL so that their threads won't touch anything. */
 2470     if (!ProcessingEventsWhileBlocked) {
 2471         if (moduleCount()) moduleAcquireGIL();
 2472     }
 2473 }
 2474 
 2475 /* =========================== Server initialization ======================== */
 2476 
 2477 void createSharedObjects(void) {
 2478     int j;
 2479 
 2480     /* Shared command responses */
 2481     shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
 2482     shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
 2483     shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
 2484     shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
 2485     shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
 2486     shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0\r\n"));
 2487     shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
 2488     shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
 2489     shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
 2490     shared.space = createObject(OBJ_STRING,sdsnew(" "));
 2491     shared.colon = createObject(OBJ_STRING,sdsnew(":"));
 2492     shared.plus = createObject(OBJ_STRING,sdsnew("+"));
 2493 
 2494     /* Shared command error responses */
 2495     shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
 2496         "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
 2497     shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
 2498     shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
 2499         "-ERR no such key\r\n"));
 2500     shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
 2501         "-ERR syntax error\r\n"));
 2502     shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
 2503         "-ERR source and destination objects are the same\r\n"));
 2504     shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
 2505         "-ERR index out of range\r\n"));
 2506     shared.noscripterr = createObject(OBJ_STRING,sdsnew(
 2507         "-NOSCRIPT No matching script. Please use EVAL.\r\n"));
 2508     shared.loadingerr = createObject(OBJ_STRING,sdsnew(
 2509         "-LOADING Redis is loading the dataset in memory\r\n"));
 2510     shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
 2511         "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
 2512     shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
 2513         "-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n"));
 2514     shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
 2515         "-MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.\r\n"));
 2516     shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
 2517         "-READONLY You can't write against a read only replica.\r\n"));
 2518     shared.noautherr = createObject(OBJ_STRING,sdsnew(
 2519         "-NOAUTH Authentication required.\r\n"));
 2520     shared.oomerr = createObject(OBJ_STRING,sdsnew(
 2521         "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
 2522     shared.execaborterr = createObject(OBJ_STRING,sdsnew(
 2523         "-EXECABORT Transaction discarded because of previous errors.\r\n"));
 2524     shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
 2525         "-NOREPLICAS Not enough good replicas to write.\r\n"));
 2526     shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
 2527         "-BUSYKEY Target key name already exists.\r\n"));
 2528 
 2529     /* The shared NULL depends on the protocol version. */
 2530     shared.null[0] = NULL;
 2531     shared.null[1] = NULL;
 2532     shared.null[2] = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
 2533     shared.null[3] = createObject(OBJ_STRING,sdsnew("_\r\n"));
 2534 
 2535     shared.nullarray[0] = NULL;
 2536     shared.nullarray[1] = NULL;
 2537     shared.nullarray[2] = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
 2538     shared.nullarray[3] = createObject(OBJ_STRING,sdsnew("_\r\n"));
 2539 
 2540     shared.emptymap[0] = NULL;
 2541     shared.emptymap[1] = NULL;
 2542     shared.emptymap[2] = createObject(OBJ_STRING,sdsnew("*0\r\n"));
 2543     shared.emptymap[3] = createObject(OBJ_STRING,sdsnew("%0\r\n"));
 2544 
 2545     shared.emptyset[0] = NULL;
 2546     shared.emptyset[1] = NULL;
 2547     shared.emptyset[2] = createObject(OBJ_STRING,sdsnew("*0\r\n"));
 2548     shared.emptyset[3] = createObject(OBJ_STRING,sdsnew("~0\r\n"));
 2549 
 2550     for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
 2551         char dictid_str[64];
 2552         int dictid_len;
 2553 
 2554         dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
 2555         shared.select[j] = createObject(OBJ_STRING,
 2556             sdscatprintf(sdsempty(),
 2557                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
 2558                 dictid_len, dictid_str));
 2559     }
 2560     shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
 2561     shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
 2562     shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
 2563     shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
 2564     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
 2565     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
 2566 
 2567     /* Shared command names */
 2568     shared.del = createStringObject("DEL",3);
 2569     shared.unlink = createStringObject("UNLINK",6);
 2570     shared.rpop = createStringObject("RPOP",4);
 2571     shared.lpop = createStringObject("LPOP",4);
 2572     shared.lpush = createStringObject("LPUSH",5);
 2573     shared.rpoplpush = createStringObject("RPOPLPUSH",9);
 2574     shared.lmove = createStringObject("LMOVE",5);
 2575     shared.blmove = createStringObject("BLMOVE",6);
 2576     shared.zpopmin = createStringObject("ZPOPMIN",7);
 2577     shared.zpopmax = createStringObject("ZPOPMAX",7);
 2578     shared.multi = createStringObject("MULTI",5);
 2579     shared.exec = createStringObject("EXEC",4);
 2580     shared.hset = createStringObject("HSET",4);
 2581     shared.srem = createStringObject("SREM",4);
 2582     shared.xgroup = createStringObject("XGROUP",6);
 2583     shared.xclaim = createStringObject("XCLAIM",6);
 2584     shared.script = createStringObject("SCRIPT",6);
 2585     shared.replconf = createStringObject("REPLCONF",8);
 2586     shared.pexpireat = createStringObject("PEXPIREAT",9);
 2587     shared.pexpire = createStringObject("PEXPIRE",7);
 2588     shared.persist = createStringObject("PERSIST",7);
 2589     shared.set = createStringObject("SET",3);
 2590     shared.eval = createStringObject("EVAL",4);
 2591 
 2592     /* Shared command argument */
 2593     shared.left = createStringObject("left",4);
 2594     shared.right = createStringObject("right",5);
 2595     shared.pxat = createStringObject("PXAT", 4);
 2596     shared.px = createStringObject("PX",2);
 2597     shared.time = createStringObject("TIME",4);
 2598     shared.retrycount = createStringObject("RETRYCOUNT",10);
 2599     shared.force = createStringObject("FORCE",5);
 2600     shared.justid = createStringObject("JUSTID",6);
 2601     shared.lastid = createStringObject("LASTID",6);
 2602     shared.default_username = createStringObject("default",7);
 2603     shared.ping = createStringObject("ping",4);
 2604     shared.setid = createStringObject("SETID",5);
 2605     shared.keepttl = createStringObject("KEEPTTL",7);
 2606     shared.load = createStringObject("LOAD",4);
 2607     shared.createconsumer = createStringObject("CREATECONSUMER",14);
 2608     shared.getack = createStringObject("GETACK",6);
 2609     shared.special_asterick = createStringObject("*",1);
 2610     shared.special_equals = createStringObject("=",1);
 2611     shared.redacted = makeObjectShared(createStringObject("(redacted)",10));
 2612 
 2613     for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
 2614         shared.integers[j] =
 2615             makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
 2616         shared.integers[j]->encoding = OBJ_ENCODING_INT;
 2617     }
 2618     for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
 2619         shared.mbulkhdr[j] = createObject(OBJ_STRING,
 2620             sdscatprintf(sdsempty(),"*%d\r\n",j));
 2621         shared.bulkhdr[j] = createObject(OBJ_STRING,
 2622             sdscatprintf(sdsempty(),"$%d\r\n",j));
 2623     }
 2624     /* The following two shared objects, minstring and maxstrings, are not
 2625      * actually used for their value but as a special object meaning
 2626      * respectively the minimum possible string and the maximum possible
 2627      * string in string comparisons for the ZRANGEBYLEX command. */
 2628     shared.minstring = sdsnew("minstring");
 2629     shared.maxstring = sdsnew("maxstring");
 2630 }
 2631 
 2632 void initServerConfig(void) {
 2633     int j;
 2634 
 2635     updateCachedTime(1);
 2636     getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
 2637     server.runid[CONFIG_RUN_ID_SIZE] = '\0';
 2638     changeReplicationId();
 2639     clearReplicationId2();
 2640     server.hz = CONFIG_DEFAULT_HZ; /* Initialize it ASAP, even if it may get
 2641                                       updated later after loading the config.
 2642                                       This value may be used before the server
 2643                                       is initialized. */
 2644     server.timezone = getTimeZone(); /* Initialized by tzset(). */
 2645     server.configfile = NULL;
 2646     server.executable = NULL;
 2647     server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
 2648     server.bindaddr_count = 0;
 2649     server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
 2650     server.ipfd.count = 0;
 2651     server.tlsfd.count = 0;
 2652     server.sofd = -1;
 2653     server.active_expire_enabled = 1;
 2654     server.skip_checksum_validation = 0;
 2655     server.saveparams = NULL;
 2656     server.loading = 0;
 2657     server.loading_rdb_used_mem = 0;
 2658     server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
 2659     server.aof_state = AOF_OFF;
 2660     server.aof_rewrite_base_size = 0;
 2661     server.aof_rewrite_scheduled = 0;
 2662     server.aof_flush_sleep = 0;
 2663     server.aof_last_fsync = time(NULL);
 2664     atomicSet(server.aof_bio_fsync_status,C_OK);
 2665     server.aof_rewrite_time_last = -1;
 2666     server.aof_rewrite_time_start = -1;
 2667     server.aof_lastbgrewrite_status = C_OK;
 2668     server.aof_delayed_fsync = 0;
 2669     server.aof_fd = -1;
 2670     server.aof_selected_db = -1; /* Make sure the first time will not match */
 2671     server.aof_flush_postponed_start = 0;
 2672     server.pidfile = NULL;
 2673     server.active_defrag_running = 0;
 2674     server.notify_keyspace_events = 0;
 2675     server.blocked_clients = 0;
 2676     memset(server.blocked_clients_by_type,0,
 2677            sizeof(server.blocked_clients_by_type));
 2678     server.shutdown_asap = 0;
 2679     server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
 2680     server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
 2681     server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
 2682     server.next_client_id = 1; /* Client IDs, start from 1 .*/
 2683     server.loading_process_events_interval_bytes = (1024*1024*2);
 2684 
 2685     unsigned int lruclock = getLRUClock();
 2686     atomicSet(server.lruclock,lruclock);
 2687     resetServerSaveParams();
 2688 
 2689     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
 2690     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
 2691     appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
 2692 
 2693     /* Replication related */
 2694     server.masterauth = NULL;
 2695     server.masterhost = NULL;
 2696     server.masterport = 6379;
 2697     server.master = NULL;
 2698     server.cached_master = NULL;
 2699     server.master_initial_offset = -1;
 2700     server.repl_state = REPL_STATE_NONE;
 2701     server.repl_transfer_tmpfile = NULL;
 2702     server.repl_transfer_fd = -1;
 2703     server.repl_transfer_s = NULL;
 2704     server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
 2705     server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
 2706     server.master_repl_offset = 0;
 2707 
 2708     /* Replication partial resync backlog */
 2709     server.repl_backlog = NULL;
 2710     server.repl_backlog_histlen = 0;
 2711     server.repl_backlog_idx = 0;
 2712     server.repl_backlog_off = 0;
 2713     server.repl_no_slaves_since = time(NULL);
 2714 
 2715     /* Failover related */
 2716     server.failover_end_time = 0;
 2717     server.force_failover = 0;
 2718     server.target_replica_host = NULL;
 2719     server.target_replica_port = 0;
 2720     server.failover_state = NO_FAILOVER;
 2721 
 2722     /* Client output buffer limits */
 2723     for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
 2724         server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
 2725 
 2726     /* Linux OOM Score config */
 2727     for (j = 0; j < CONFIG_OOM_COUNT; j++)
 2728         server.oom_score_adj_values[j] = configOOMScoreAdjValuesDefaults[j];
 2729 
 2730     /* Double constants initialization */
 2731     R_Zero = 0.0;
 2732     R_PosInf = 1.0/R_Zero;
 2733     R_NegInf = -1.0/R_Zero;
 2734     R_Nan = R_Zero/R_Zero;
 2735 
 2736     /* Command table -- we initialize it here as it is part of the
 2737      * initial configuration, since command names may be changed via
 2738      * redis.conf using the rename-command directive. */
 2739     server.commands = dictCreate(&commandTableDictType,NULL);
 2740     server.orig_commands = dictCreate(&commandTableDictType,NULL);
 2741     populateCommandTable();
 2742     server.delCommand = lookupCommandByCString("del");
 2743     server.multiCommand = lookupCommandByCString("multi");
 2744     server.lpushCommand = lookupCommandByCString("lpush");
 2745     server.lpopCommand = lookupCommandByCString("lpop");
 2746     server.rpopCommand = lookupCommandByCString("rpop");
 2747     server.zpopminCommand = lookupCommandByCString("zpopmin");
 2748     server.zpopmaxCommand = lookupCommandByCString("zpopmax");
 2749     server.sremCommand = lookupCommandByCString("srem");
 2750     server.execCommand = lookupCommandByCString("exec");
 2751     server.expireCommand = lookupCommandByCString("expire");
 2752     server.pexpireCommand = lookupCommandByCString("pexpire");
 2753     server.xclaimCommand = lookupCommandByCString("xclaim");
 2754     server.xgroupCommand = lookupCommandByCString("xgroup");
 2755     server.rpoplpushCommand = lookupCommandByCString("rpoplpush");
 2756     server.lmoveCommand = lookupCommandByCString("lmove");
 2757 
 2758     /* Debugging */
 2759     server.watchdog_period = 0;
 2760 
 2761     /* By default we want scripts to be always replicated by effects
 2762      * (single commands executed by the script), and not by sending the
 2763      * script to the slave / AOF. This is the new way starting from
 2764      * Redis 5. However it is possible to revert it via redis.conf. */
 2765     server.lua_always_replicate_commands = 1;
 2766 
 2767     initConfigValues();
 2768 }
 2769 
 2770 extern char **environ;
 2771 
 2772 /* Restart the server, executing the same executable that started this
 2773  * instance, with the same arguments and configuration file.
 2774  *
 2775  * The function is designed to directly call execve() so that the new
 2776  * server instance will retain the PID of the previous one.
 2777  *
 2778  * The list of flags, that may be bitwise ORed together, alter the
 2779  * behavior of this function:
 2780  *
 2781  * RESTART_SERVER_NONE              No flags.
 2782  * RESTART_SERVER_GRACEFULLY        Do a proper shutdown before restarting.
 2783  * RESTART_SERVER_CONFIG_REWRITE    Rewrite the config file before restarting.
 2784  *
 2785  * On success the function does not return, because the process turns into
 2786  * a different process. On error C_ERR is returned. */
 2787 int restartServer(int flags, mstime_t delay) {
 2788     int j;
 2789 
 2790     /* Check if we still have accesses to the executable that started this
 2791      * server instance. */
 2792     if (access(server.executable,X_OK) == -1) {
 2793         serverLog(LL_WARNING,"Can't restart: this process has no "
 2794                              "permissions to execute %s", server.executable);
 2795         return C_ERR;
 2796     }
 2797 
 2798     /* Config rewriting. */
 2799     if (flags & RESTART_SERVER_CONFIG_REWRITE &&
 2800         server.configfile &&
 2801         rewriteConfig(server.configfile, 0) == -1)
 2802     {
 2803         serverLog(LL_WARNING,"Can't restart: configuration rewrite process "
 2804                              "failed");
 2805         return C_ERR;
 2806     }
 2807 
 2808     /* Perform a proper shutdown. */
 2809     if (flags & RESTART_SERVER_GRACEFULLY &&
 2810         prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK)
 2811     {
 2812         serverLog(LL_WARNING,"Can't restart: error preparing for shutdown");
 2813         return C_ERR;
 2814     }
 2815 
 2816     /* Close all file descriptors, with the exception of stdin, stdout, strerr
 2817      * which are useful if we restart a Redis server which is not daemonized. */
 2818     for (j = 3; j < (int)server.maxclients + 1024; j++) {
 2819         /* Test the descriptor validity before closing it, otherwise
 2820          * Valgrind issues a warning on close(). */
 2821         if (fcntl(j,F_GETFD) != -1) close(j);
 2822     }
 2823 
 2824     /* Execute the server with the original command line. */
 2825     if (delay) usleep(delay*1000);
 2826     zfree(server.exec_argv[0]);
 2827     server.exec_argv[0] = zstrdup(server.executable);
 2828     execve(server.executable,server.exec_argv,environ);
 2829 
 2830     /* If an error occurred here, there is nothing we can do, but exit. */
 2831     _exit(1);
 2832 
 2833     return C_ERR; /* Never reached. */
 2834 }
 2835 
 2836 static void readOOMScoreAdj(void) {
 2837 #ifdef HAVE_PROC_OOM_SCORE_ADJ
 2838     char buf[64];
 2839     int fd = open("/proc/self/oom_score_adj", O_RDONLY);
 2840 
 2841     if (fd < 0) return;
 2842     if (read(fd, buf, sizeof(buf)) > 0)
 2843         server.oom_score_adj_base = atoi(buf);
 2844     close(fd);
 2845 #endif
 2846 }
 2847 
 2848 /* This function will configure the current process's oom_score_adj according
 2849  * to user specified configuration. This is currently implemented on Linux
 2850  * only.
 2851  *
 2852  * A process_class value of -1 implies OOM_CONFIG_MASTER or OOM_CONFIG_REPLICA,
 2853  * depending on current role.
 2854  */
 2855 int setOOMScoreAdj(int process_class) {
 2856 
 2857     if (server.oom_score_adj == OOM_SCORE_ADJ_NO) return C_OK;
 2858     if (process_class == -1)
 2859         process_class = (server.masterhost ? CONFIG_OOM_REPLICA : CONFIG_OOM_MASTER);
 2860 
 2861     serverAssert(process_class >= 0 && process_class < CONFIG_OOM_COUNT);
 2862 
 2863 #ifdef HAVE_PROC_OOM_SCORE_ADJ
 2864     int fd;
 2865     int val;
 2866     char buf[64];
 2867 
 2868     val = server.oom_score_adj_values[process_class];
 2869     if (server.oom_score_adj == OOM_SCORE_RELATIVE)
 2870         val += server.oom_score_adj_base;
 2871     if (val > 1000) val = 1000;
 2872     if (val < -1000) val = -1000;
 2873 
 2874     snprintf(buf, sizeof(buf) - 1, "%d\n", val);
 2875 
 2876     fd = open("/proc/self/oom_score_adj", O_WRONLY);
 2877     if (fd < 0 || write(fd, buf, strlen(buf)) < 0) {
 2878         serverLog(LOG_WARNING, "Unable to write oom_score_adj: %s", strerror(errno));
 2879         if (fd != -1) close(fd);
 2880         return C_ERR;
 2881     }
 2882 
 2883     close(fd);
 2884     return C_OK;
 2885 #else
 2886     /* Unsupported */
 2887     return C_ERR;
 2888 #endif
 2889 }
 2890 
 2891 /* This function will try to raise the max number of open files accordingly to
 2892  * the configured max number of clients. It also reserves a number of file
 2893  * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
 2894  * persistence, listening sockets, log files and so forth.
 2895  *
 2896  * If it will not be possible to set the limit accordingly to the configured
 2897  * max number of clients, the function will do the reverse setting
 2898  * server.maxclients to the value that we can actually handle. */
 2899 void adjustOpenFilesLimit(void) {
 2900     rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;
 2901     struct rlimit limit;
 2902 
 2903     if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
 2904         serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
 2905             strerror(errno));
 2906         server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
 2907     } else {
 2908         rlim_t oldlimit = limit.rlim_cur;
 2909 
 2910         /* Set the max number of files if the current limit is not enough
 2911          * for our needs. */
 2912         if (oldlimit < maxfiles) {
 2913             rlim_t bestlimit;
 2914             int setrlimit_error = 0;
 2915 
 2916             /* Try to set the file limit to match 'maxfiles' or at least
 2917              * to the higher value supported less than maxfiles. */
 2918             bestlimit = maxfiles;
 2919             while(bestlimit > oldlimit) {
 2920                 rlim_t decr_step = 16;
 2921 
 2922                 limit.rlim_cur = bestlimit;
 2923                 limit.rlim_max = bestlimit;
 2924                 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
 2925                 setrlimit_error = errno;
 2926 
 2927                 /* We failed to set file limit to 'bestlimit'. Try with a
 2928                  * smaller limit decrementing by a few FDs per iteration. */
 2929                 if (bestlimit < decr_step) break;
 2930                 bestlimit -= decr_step;
 2931             }
 2932 
 2933             /* Assume that the limit we get initially is still valid if
 2934              * our last try was even lower. */
 2935             if (bestlimit < oldlimit) bestlimit = oldlimit;
 2936 
 2937             if (bestlimit < maxfiles) {
 2938                 unsigned int old_maxclients = server.maxclients;
 2939                 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
 2940                 /* maxclients is unsigned so may overflow: in order
 2941                  * to check if maxclients is now logically less than 1
 2942                  * we test indirectly via bestlimit. */
 2943                 if (bestlimit <= CONFIG_MIN_RESERVED_FDS) {
 2944                     serverLog(LL_WARNING,"Your current 'ulimit -n' "
 2945                         "of %llu is not enough for the server to start. "
 2946                         "Please increase your open file limit to at least "
 2947                         "%llu. Exiting.",
 2948                         (unsigned long long) oldlimit,
 2949                         (unsigned long long) maxfiles);
 2950                     exit(1);
 2951                 }
 2952                 serverLog(LL_WARNING,"You requested maxclients of %d "
 2953                     "requiring at least %llu max file descriptors.",
 2954                     old_maxclients,
 2955                     (unsigned long long) maxfiles);
 2956                 serverLog(LL_WARNING,"Server can't set maximum open files "
 2957                     "to %llu because of OS error: %s.",
 2958                     (unsigned long long) maxfiles, strerror(setrlimit_error));
 2959                 serverLog(LL_WARNING,"Current maximum open files is %llu. "
 2960                     "maxclients has been reduced to %d to compensate for "
 2961                     "low ulimit. "
 2962                     "If you need higher maxclients increase 'ulimit -n'.",
 2963                     (unsigned long long) bestlimit, server.maxclients);
 2964             } else {
 2965                 serverLog(LL_NOTICE,"Increased maximum number of open files "
 2966                     "to %llu (it was originally set to %llu).",
 2967                     (unsigned long long) maxfiles,
 2968                     (unsigned long long) oldlimit);
 2969             }
 2970         }
 2971     }
 2972 }
 2973 
 2974 /* Check that server.tcp_backlog can be actually enforced in Linux according
 2975  * to the value of /proc/sys/net/core/somaxconn, or warn about it. */
 2976 void checkTcpBacklogSettings(void) {
 2977 #ifdef HAVE_PROC_SOMAXCONN
 2978     FILE *fp = fopen("/proc/sys/net/core/somaxconn","r");
 2979     char buf[1024];
 2980     if (!fp) return;
 2981     if (fgets(buf,sizeof(buf),fp) != NULL) {
 2982         int somaxconn = atoi(buf);
 2983         if (somaxconn > 0 && somaxconn < server.tcp_backlog) {
 2984             serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn);
 2985         }
 2986     }
 2987     fclose(fp);
 2988 #endif
 2989 }
 2990 
 2991 void closeSocketListeners(socketFds *sfd) {
 2992     int j;
 2993 
 2994     for (j = 0; j < sfd->count; j++) {
 2995         if (sfd->fd[j] == -1) continue;
 2996 
 2997         aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
 2998         close(sfd->fd[j]);
 2999     }
 3000 
 3001     sfd->count = 0;
 3002 }
 3003 
 3004 /* Create an event handler for accepting new connections in TCP or TLS domain sockets.
 3005  * This works atomically for all socket fds */
 3006 int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
 3007     int j;
 3008 
 3009     for (j = 0; j < sfd->count; j++) {
 3010         if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) {
 3011             /* Rollback */
 3012             for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
 3013             return C_ERR;
 3014         }
 3015     }
 3016     return C_OK;
 3017 }
 3018 
 3019 /* Initialize a set of file descriptors to listen to the specified 'port'
 3020  * binding the addresses specified in the Redis server configuration.
 3021  *
 3022  * The listening file descriptors are stored in the integer array 'fds'
 3023  * and their number is set in '*count'.
 3024  *
 3025  * The addresses to bind are specified in the global server.bindaddr array
 3026  * and their number is server.bindaddr_count. If the server configuration
 3027  * contains no specific addresses to bind, this function will try to
 3028  * bind * (all addresses) for both the IPv4 and IPv6 protocols.
 3029  *
 3030  * On success the function returns C_OK.
 3031  *
 3032  * On error the function returns C_ERR. For the function to be on
 3033  * error, at least one of the server.bindaddr addresses was
 3034  * impossible to bind, or no bind addresses were specified in the server
 3035  * configuration but the function is not able to bind * for at least
 3036  * one of the IPv4 or IPv6 protocols. */
 3037 int listenToPort(int port, socketFds *sfd) {
 3038     int j;
 3039     char **bindaddr = server.bindaddr;
 3040     int bindaddr_count = server.bindaddr_count;
 3041     char *default_bindaddr[2] = {"*", "-::*"};
 3042 
 3043     /* Force binding of 0.0.0.0 if no bind address is specified. */
 3044     if (server.bindaddr_count == 0) {
 3045         bindaddr_count = 2;
 3046         bindaddr = default_bindaddr;
 3047     }
 3048 
 3049     for (j = 0; j < bindaddr_count; j++) {
 3050         char* addr = bindaddr[j];
 3051         int optional = *addr == '-';
 3052         if (optional) addr++;
 3053         if (strchr(addr,':')) {
 3054             /* Bind IPv6 address. */
 3055             sfd->fd[sfd->count] = anetTcp6Server(server.neterr,port,addr,server.tcp_backlog);
 3056         } else {
 3057             /* Bind IPv4 address. */
 3058             sfd->fd[sfd->count] = anetTcpServer(server.neterr,port,addr,server.tcp_backlog);
 3059         }
 3060         if (sfd->fd[sfd->count] == ANET_ERR) {
 3061             int net_errno = errno;
 3062             serverLog(LL_WARNING,
 3063                 "Warning: Could not create server TCP listening socket %s:%d: %s",
 3064                 addr, port, server.neterr);
 3065             if (net_errno == EADDRNOTAVAIL && optional)
 3066                 continue;
 3067             if (net_errno == ENOPROTOOPT     || net_errno == EPROTONOSUPPORT ||
 3068                 net_errno == ESOCKTNOSUPPORT || net_errno == EPFNOSUPPORT ||
 3069                 net_errno == EAFNOSUPPORT)
 3070                 continue;
 3071 
 3072             /* Rollback successful listens before exiting */
 3073             closeSocketListeners(sfd);
 3074             return C_ERR;
 3075         }
 3076         anetNonBlock(NULL,sfd->fd[sfd->count]);
 3077         anetCloexec(sfd->fd[sfd->count]);
 3078         sfd->count++;
 3079     }
 3080     return C_OK;
 3081 }
 3082 
 3083 /* Resets the stats that we expose via INFO or other means that we want
 3084  * to reset via CONFIG RESETSTAT. The function is also used in order to
 3085  * initialize these fields in initServer() at server startup. */
 3086 void resetServerStats(void) {
 3087     int j;
 3088 
 3089     server.stat_numcommands = 0;
 3090     server.stat_numconnections = 0;
 3091     server.stat_expiredkeys = 0;
 3092     server.stat_expired_stale_perc = 0;
 3093     server.stat_expired_time_cap_reached_count = 0;
 3094     server.stat_expire_cycle_time_used = 0;
 3095     server.stat_evictedkeys = 0;
 3096     server.stat_keyspace_misses = 0;
 3097     server.stat_keyspace_hits = 0;
 3098     server.stat_active_defrag_hits = 0;
 3099     server.stat_active_defrag_misses = 0;
 3100     server.stat_active_defrag_key_hits = 0;
 3101     server.stat_active_defrag_key_misses = 0;
 3102     server.stat_active_defrag_scanned = 0;
 3103     server.stat_fork_time = 0;
 3104     server.stat_fork_rate = 0;
 3105     server.stat_total_forks = 0;
 3106     server.stat_rejected_conn = 0;
 3107     server.stat_sync_full = 0;
 3108     server.stat_sync_partial_ok = 0;
 3109     server.stat_sync_partial_err = 0;
 3110     server.stat_io_reads_processed = 0;
 3111     atomicSet(server.stat_total_reads_processed, 0);
 3112     server.stat_io_writes_processed = 0;
 3113     atomicSet(server.stat_total_writes_processed, 0);
 3114     for (j = 0; j < STATS_METRIC_COUNT; j++) {
 3115         server.inst_metric[j].idx = 0;
 3116         server.inst_metric[j].last_sample_time = mstime();
 3117         server.inst_metric[j].last_sample_count = 0;
 3118         memset(server.inst_metric[j].samples,0,
 3119             sizeof(server.inst_metric[j].samples));
 3120     }
 3121     atomicSet(server.stat_net_input_bytes, 0);
 3122     atomicSet(server.stat_net_output_bytes, 0);
 3123     server.stat_unexpected_error_replies = 0;
 3124     server.stat_total_error_replies = 0;
 3125     server.stat_dump_payload_sanitizations = 0;
 3126     server.aof_delayed_fsync = 0;
 3127 }
 3128 
 3129 /* Make the thread killable at any time, so that kill threads functions
 3130  * can work reliably (default cancelability type is PTHREAD_CANCEL_DEFERRED).
 3131  * Needed for pthread_cancel used by the fast memory test used by the crash report. */
 3132 void makeThreadKillable(void) {
 3133     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
 3134     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
 3135 }
 3136 
 3137 void initServer(void) {
 3138     int j;
 3139 
 3140     signal(SIGHUP, SIG_IGN);
 3141     signal(SIGPIPE, SIG_IGN);
 3142     setupSignalHandlers();
 3143     makeThreadKillable();
 3144 
 3145     if (server.syslog_enabled) {
 3146         openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
 3147             server.syslog_facility);
 3148     }
 3149 
 3150     /* Initialization after setting defaults from the config system. */
 3151     server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
 3152     server.hz = server.config_hz;
 3153     server.pid = getpid();
 3154     server.in_fork_child = CHILD_TYPE_NONE;
 3155     server.main_thread_id = pthread_self();
 3156     server.current_client = NULL;
 3157     server.errors = raxNew();
 3158     server.fixed_time_expire = 0;
 3159     server.clients = listCreate();
 3160     server.clients_index = raxNew();
 3161     server.clients_to_close = listCreate();
 3162     server.slaves = listCreate();
 3163     server.monitors = listCreate();
 3164     server.clients_pending_write = listCreate();
 3165     server.clients_pending_read = listCreate();
 3166     server.clients_timeout_table = raxNew();
 3167     server.replication_allowed = 1;
 3168     server.slaveseldb = -1; /* Force to emit the first SELECT command. */
 3169     server.unblocked_clients = listCreate();
 3170     server.ready_keys = listCreate();
 3171     server.clients_waiting_acks = listCreate();
 3172     server.get_ack_from_slaves = 0;
 3173     server.client_pause_type = 0;
 3174     server.paused_clients = listCreate();
 3175     server.events_processed_while_blocked = 0;
 3176     server.system_memory_size = zmalloc_get_memory_size();
 3177     server.blocked_last_cron = 0;
 3178     server.blocking_op_nesting = 0;
 3179 
 3180     if ((server.tls_port || server.tls_replication || server.tls_cluster)
 3181                 && tlsConfigure(&server.tls_ctx_config) == C_ERR) {
 3182         serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info.");
 3183         exit(1);
 3184     }
 3185 
 3186     createSharedObjects();
 3187     adjustOpenFilesLimit();
 3188     const char *clk_msg = monotonicInit();
 3189     serverLog(LL_NOTICE, "monotonic clock: %s", clk_msg);
 3190     server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
 3191     if (server.el == NULL) {
 3192         serverLog(LL_WARNING,
 3193             "Failed creating the event loop. Error message: '%s'",
 3194             strerror(errno));
 3195         exit(1);
 3196     }
 3197     server.db = zmalloc(sizeof(redisDb)*server.dbnum);
 3198 
 3199     /* Open the TCP listening socket for the user commands. */
 3200     if (server.port != 0 &&
 3201         listenToPort(server.port,&server.ipfd) == C_ERR) {
 3202         serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
 3203         exit(1);
 3204     }
 3205     if (server.tls_port != 0 &&
 3206         listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
 3207         serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
 3208         exit(1);
 3209     }
 3210 
 3211     /* Open the listening Unix domain socket. */
 3212     if (server.unixsocket != NULL) {
 3213         unlink(server.unixsocket); /* don't care if this fails */
 3214         server.sofd = anetUnixServer(server.neterr,server.unixsocket,
 3215             server.unixsocketperm, server.tcp_backlog);
 3216         if (server.sofd == ANET_ERR) {
 3217             serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
 3218             exit(1);
 3219         }
 3220         anetNonBlock(NULL,server.sofd);
 3221         anetCloexec(server.sofd);
 3222     }
 3223 
 3224     /* Abort if there are no listening sockets at all. */
 3225     if (server.ipfd.count == 0 && server.tlsfd.count == 0 && server.sofd < 0) {
 3226         serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
 3227         exit(1);
 3228     }
 3229 
 3230     /* Create the Redis databases, and initialize other internal state. */
 3231     for (j = 0; j < server.dbnum; j++) {
 3232         server.db[j].dict = dictCreate(&dbDictType,NULL);
 3233         server.db[j].expires = dictCreate(&dbExpiresDictType,NULL);
 3234         server.db[j].expires_cursor = 0;
 3235         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
 3236         server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
 3237         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
 3238         server.db[j].id = j;
 3239         server.db[j].avg_ttl = 0;
 3240         server.db[j].defrag_later = listCreate();
 3241         listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
 3242     }
 3243     evictionPoolAlloc(); /* Initialize the LRU keys pool. */
 3244     server.pubsub_channels = dictCreate(&keylistDictType,NULL);
 3245     server.pubsub_patterns = dictCreate(&keylistDictType,NULL);
 3246     server.cronloops = 0;
 3247     server.in_eval = 0;
 3248     server.in_exec = 0;
 3249     server.propagate_in_transaction = 0;
 3250     server.client_pause_in_transaction = 0;
 3251     server.child_pid = -1;
 3252     server.child_type = CHILD_TYPE_NONE;
 3253     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
 3254     server.rdb_pipe_conns = NULL;
 3255     server.rdb_pipe_numconns = 0;
 3256     server.rdb_pipe_numconns_writing = 0;
 3257     server.rdb_pipe_buff = NULL;
 3258     server.rdb_pipe_bufflen = 0;
 3259     server.rdb_bgsave_scheduled = 0;
 3260     server.child_info_pipe[0] = -1;
 3261     server.child_info_pipe[1] = -1;
 3262     server.child_info_nread = 0;
 3263     aofRewriteBufferReset();
 3264     server.aof_buf = sdsempty();
 3265     server.lastsave = time(NULL); /* At startup we consider the DB saved. */
 3266     server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
 3267     server.rdb_save_time_last = -1;
 3268     server.rdb_save_time_start = -1;
 3269     server.dirty = 0;
 3270     resetServerStats();
 3271     /* A few stats we don't want to reset: server startup time, and peak mem. */
 3272     server.stat_starttime = time(NULL);
 3273     server.stat_peak_memory = 0;
 3274     server.stat_current_cow_bytes = 0;
 3275     server.stat_current_cow_updated = 0;
 3276     server.stat_current_save_keys_processed = 0;
 3277     server.stat_current_save_keys_total = 0;
 3278     server.stat_rdb_cow_bytes = 0;
 3279     server.stat_aof_cow_bytes = 0;
 3280     server.stat_module_cow_bytes = 0;
 3281     server.stat_module_progress = 0;
 3282     for (int j = 0; j < CLIENT_TYPE_COUNT; j++)
 3283         server.stat_clients_type_memory[j] = 0;
 3284     server.cron_malloc_stats.zmalloc_used = 0;
 3285     server.cron_malloc_stats.process_rss = 0;
 3286     server.cron_malloc_stats.allocator_allocated = 0;
 3287     server.cron_malloc_stats.allocator_active = 0;
 3288     server.cron_malloc_stats.allocator_resident = 0;
 3289     server.lastbgsave_status = C_OK;
 3290     server.aof_last_write_status = C_OK;
 3291     server.aof_last_write_errno = 0;
 3292     server.repl_good_slaves_count = 0;
 3293 
 3294     /* Create the timer callback, this is our way to process many background
 3295      * operations incrementally, like clients timeout, eviction of unaccessed
 3296      * expired keys and so forth. */
 3297     if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
 3298         serverPanic("Can't create event loop timers.");
 3299         exit(1);
 3300     }
 3301 
 3302     /* Create an event handler for accepting new connections in TCP and Unix
 3303      * domain sockets. */
 3304     if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
 3305         serverPanic("Unrecoverable error creating TCP socket accept handler.");
 3306     }
 3307     if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
 3308         serverPanic("Unrecoverable error creating TLS socket accept handler.");
 3309     }
 3310     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
 3311         acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
 3312 
 3313 
 3314     /* Register a readable event for the pipe used to awake the event loop
 3315      * when a blocked client in a module needs attention. */
 3316     if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
 3317         moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
 3318             serverPanic(
 3319                 "Error registering the readable event for the module "
 3320                 "blocked clients subsystem.");
 3321     }
 3322 
 3323     /* Register before and after sleep handlers (note this needs to be done
 3324      * before loading persistence since it is used by processEventsWhileBlocked. */
 3325     aeSetBeforeSleepProc(server.el,beforeSleep);
 3326     aeSetAfterSleepProc(server.el,afterSleep);
 3327 
 3328     /* Open the AOF file if needed. */
 3329     if (server.aof_state == AOF_ON) {
 3330         server.aof_fd = open(server.aof_filename,
 3331                                O_WRONLY|O_APPEND|O_CREAT,0644);
 3332         if (server.aof_fd == -1) {
 3333             serverLog(LL_WARNING, "Can't open the append-only file: %s",
 3334                 strerror(errno));
 3335             exit(1);
 3336         }
 3337     }
 3338 
 3339     /* 32 bit instances are limited to 4GB of address space, so if there is
 3340      * no explicit limit in the user provided configuration we set a limit
 3341      * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
 3342      * useless crashes of the Redis instance for out of memory. */
 3343     if (server.arch_bits == 32 && server.maxmemory == 0) {
 3344         serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
 3345         server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
 3346         server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
 3347     }
 3348 
 3349     if (server.cluster_enabled) clusterInit();
 3350     replicationScriptCacheInit();
 3351     scriptingInit(1);
 3352     slowlogInit();
 3353     latencyMonitorInit();
 3354     
 3355     /* Initialize ACL default password if it exists */
 3356     ACLUpdateDefaultUserPassword(server.requirepass);
 3357 }
 3358 
 3359 /* Some steps in server initialization need to be done last (after modules
 3360  * are loaded).
 3361  * Specifically, creation of threads due to a race bug in ld.so, in which
 3362  * Thread Local Storage initialization collides with dlopen call.
 3363  * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
 3364 void InitServerLast() {
 3365     bioInit();
 3366     initThreadedIO();
 3367     set_jemalloc_bg_thread(server.jemalloc_bg_thread);
 3368     server.initial_memory_usage = zmalloc_used_memory();
 3369 }
 3370 
 3371 /* Parse the flags string description 'strflags' and set them to the
 3372  * command 'c'. If the flags are all valid C_OK is returned, otherwise
 3373  * C_ERR is returned (yet the recognized flags are set in the command). */
 3374 int populateCommandTableParseFlags(struct redisCommand *c, char *strflags) {
 3375     int argc;
 3376     sds *argv;
 3377 
 3378     /* Split the line into arguments for processing. */
 3379     argv = sdssplitargs(strflags,&argc);
 3380     if (argv == NULL) return C_ERR;
 3381 
 3382     for (int j = 0; j < argc; j++) {
 3383         char *flag = argv[j];
 3384         if (!strcasecmp(flag,"write")) {
 3385             c->flags |= CMD_WRITE|CMD_CATEGORY_WRITE;
 3386         } else if (!strcasecmp(flag,"read-only")) {
 3387             c->flags |= CMD_READONLY|CMD_CATEGORY_READ;
 3388         } else if (!strcasecmp(flag,"use-memory")) {
 3389             c->flags |= CMD_DENYOOM;
 3390         } else if (!strcasecmp(flag,"admin")) {
 3391             c->flags |= CMD_ADMIN|CMD_CATEGORY_ADMIN|CMD_CATEGORY_DANGEROUS;
 3392         } else if (!strcasecmp(flag,"pub-sub")) {
 3393             c->flags |= CMD_PUBSUB|CMD_CATEGORY_PUBSUB;
 3394         } else if (!strcasecmp(flag,"no-script")) {
 3395             c->flags |= CMD_NOSCRIPT;
 3396         } else if (!strcasecmp(flag,"random")) {
 3397             c->flags |= CMD_RANDOM;
 3398         } else if (!strcasecmp(flag,"to-sort")) {
 3399             c->flags |= CMD_SORT_FOR_SCRIPT;
 3400         } else if (!strcasecmp(flag,"ok-loading")) {
 3401             c->flags |= CMD_LOADING;
 3402         } else if (!strcasecmp(flag,"ok-stale")) {
 3403             c->flags |= CMD_STALE;
 3404         } else if (!strcasecmp(flag,"no-monitor")) {
 3405             c->flags |= CMD_SKIP_MONITOR;
 3406         } else if (!strcasecmp(flag,"no-slowlog")) {
 3407             c->flags |= CMD_SKIP_SLOWLOG;
 3408         } else if (!strcasecmp(flag,"cluster-asking")) {
 3409             c->flags |= CMD_ASKING;
 3410         } else if (!strcasecmp(flag,"fast")) {
 3411             c->flags |= CMD_FAST | CMD_CATEGORY_FAST;
 3412         } else if (!strcasecmp(flag,"no-auth")) {
 3413             c->flags |= CMD_NO_AUTH;
 3414         } else if (!strcasecmp(flag,"may-replicate")) {
 3415             c->flags |= CMD_MAY_REPLICATE;
 3416         } else {
 3417             /* Parse ACL categories here if the flag name starts with @. */
 3418             uint64_t catflag;
 3419             if (flag[0] == '@' &&
 3420                 (catflag = ACLGetCommandCategoryFlagByName(flag+1)) != 0)
 3421             {
 3422                 c->flags |= catflag;
 3423             } else {
 3424                 sdsfreesplitres(argv,argc);
 3425                 return C_ERR;
 3426             }
 3427         }
 3428     }
 3429     /* If it's not @fast is @slow in this binary world. */
 3430     if (!(c->flags & CMD_CATEGORY_FAST)) c->flags |= CMD_CATEGORY_SLOW;
 3431 
 3432     sdsfreesplitres(argv,argc);
 3433     return C_OK;
 3434 }
 3435 
 3436 /* Populates the Redis Command Table starting from the hard coded list
 3437  * we have on top of server.c file. */
 3438 void populateCommandTable(void) {
 3439     int j;
 3440     int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
 3441 
 3442     for (j = 0; j < numcommands; j++) {
 3443         struct redisCommand *c = redisCommandTable+j;
 3444         int retval1, retval2;
 3445 
 3446         /* Translate the command string flags description into an actual
 3447          * set of flags. */
 3448         if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
 3449             serverPanic("Unsupported command flag");
 3450 
 3451         c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
 3452         retval1 = dictAdd(server.commands, sdsnew(c->name), c);
 3453         /* Populate an additional dictionary that will be unaffected
 3454          * by rename-command statements in redis.conf. */
 3455         retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
 3456         serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
 3457     }
 3458 }
 3459 
 3460 void resetCommandTableStats(void) {
 3461     struct redisCommand *c;
 3462     dictEntry *de;
 3463     dictIterator *di;
 3464 
 3465     di = dictGetSafeIterator(server.commands);
 3466     while((de = dictNext(di)) != NULL) {
 3467         c = (struct redisCommand *) dictGetVal(de);
 3468         c->microseconds = 0;
 3469         c->calls = 0;
 3470         c->rejected_calls = 0;
 3471         c->failed_calls = 0;
 3472     }
 3473     dictReleaseIterator(di);
 3474 
 3475 }
 3476 
 3477 void resetErrorTableStats(void) {
 3478     raxFreeWithCallback(server.errors, zfree);
 3479     server.errors = raxNew();
 3480 }
 3481 
 3482 /* ========================== Redis OP Array API ============================ */
 3483 
 3484 void redisOpArrayInit(redisOpArray *oa) {
 3485     oa->ops = NULL;
 3486     oa->numops = 0;
 3487 }
 3488 
 3489 int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
 3490                        robj **argv, int argc, int target)
 3491 {
 3492     redisOp *op;
 3493 
 3494     oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
 3495     op = oa->ops+oa->numops;
 3496     op->cmd = cmd;
 3497     op->dbid = dbid;
 3498     op->argv = argv;
 3499     op->argc = argc;
 3500     op->target = target;
 3501     oa->numops++;
 3502     return oa->numops;
 3503 }
 3504 
 3505 void redisOpArrayFree(redisOpArray *oa) {
 3506     while(oa->numops) {
 3507         int j;
 3508         redisOp *op;
 3509 
 3510         oa->numops--;
 3511         op = oa->ops+oa->numops;
 3512         for (j = 0; j < op->argc; j++)
 3513             decrRefCount(op->argv[j]);
 3514         zfree(op->argv);
 3515     }
 3516     zfree(oa->ops);
 3517     oa->ops = NULL;
 3518 }
 3519 
 3520 /* ====================== Commands lookup and execution ===================== */
 3521 
 3522 struct redisCommand *lookupCommand(sds name) {
 3523     return dictFetchValue(server.commands, name);
 3524 }
 3525 
 3526 struct redisCommand *lookupCommandByCString(const char *s) {
 3527     struct redisCommand *cmd;
 3528     sds name = sdsnew(s);
 3529 
 3530     cmd = dictFetchValue(server.commands, name);
 3531     sdsfree(name);
 3532     return cmd;
 3533 }
 3534 
 3535 /* Lookup the command in the current table, if not found also check in
 3536  * the original table containing the original command names unaffected by
 3537  * redis.conf rename-command statement.
 3538  *
 3539  * This is used by functions rewriting the argument vector such as
 3540  * rewriteClientCommandVector() in order to set client->cmd pointer
 3541  * correctly even if the command was renamed. */
 3542 struct redisCommand *lookupCommandOrOriginal(sds name) {
 3543     struct redisCommand *cmd = dictFetchValue(server.commands, name);
 3544 
 3545     if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
 3546     return cmd;
 3547 }
 3548 
 3549 /* Propagate the specified command (in the context of the specified database id)
 3550  * to AOF and Slaves.
 3551  *
 3552  * flags are an xor between:
 3553  * + PROPAGATE_NONE (no propagation of command at all)
 3554  * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
 3555  * + PROPAGATE_REPL (propagate into the replication link)
 3556  *
 3557  * This should not be used inside commands implementation since it will not
 3558  * wrap the resulting commands in MULTI/EXEC. Use instead alsoPropagate(),
 3559  * preventCommandPropagation(), forceCommandPropagation().
 3560  *
 3561  * However for functions that need to (also) propagate out of the context of a
 3562  * command execution, for example when serving a blocked client, you
 3563  * want to use propagate().
 3564  */
 3565 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
 3566                int flags)
 3567 {
 3568     if (!server.replication_allowed)
 3569         return;
 3570 
 3571     /* Propagate a MULTI request once we encounter the first command which
 3572      * is a write command.
 3573      * This way we'll deliver the MULTI/..../EXEC block as a whole and
 3574      * both the AOF and the replication link will have the same consistency
 3575      * and atomicity guarantees. */
 3576     if (server.in_exec && !server.propagate_in_transaction)
 3577         execCommandPropagateMulti(dbid);
 3578 
 3579     /* This needs to be unreachable since the dataset should be fixed during 
 3580      * client pause, otherwise data may be lossed during a failover. */
 3581     serverAssert(!(areClientsPaused() && !server.client_pause_in_transaction));
 3582 
 3583     if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
 3584         feedAppendOnlyFile(cmd,dbid,argv,argc);
 3585     if (flags & PROPAGATE_REPL)
 3586         replicationFeedSlaves(server.slaves,dbid,argv,argc);
 3587 }
 3588 
 3589 /* Used inside commands to schedule the propagation of additional commands
 3590  * after the current command is propagated to AOF / Replication.
 3591  *
 3592  * 'cmd' must be a pointer to the Redis command to replicate, dbid is the
 3593  * database ID the command should be propagated into.
 3594  * Arguments of the command to propagate are passed as an array of redis
 3595  * objects pointers of len 'argc', using the 'argv' vector.
 3596  *
 3597  * The function does not take a reference to the passed 'argv' vector,
 3598  * so it is up to the caller to release the passed argv (but it is usually
 3599  * stack allocated).  The function automatically increments ref count of
 3600  * passed objects, so the caller does not need to. */
 3601 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
 3602                    int target)
 3603 {
 3604     robj **argvcopy;
 3605     int j;
 3606 
 3607     if (server.loading) return; /* No propagation during loading. */
 3608 
 3609     argvcopy = zmalloc(sizeof(robj*)*argc);
 3610     for (j = 0; j < argc; j++) {
 3611         argvcopy[j] = argv[j];
 3612         incrRefCount(argv[j]);
 3613     }
 3614     redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target);
 3615 }
 3616 
 3617 /* It is possible to call the function forceCommandPropagation() inside a
 3618  * Redis command implementation in order to to force the propagation of a
 3619  * specific command execution into AOF / Replication. */
 3620 void forceCommandPropagation(client *c, int flags) {
 3621     serverAssert(c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE));
 3622     if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL;
 3623     if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
 3624 }
 3625 
 3626 /* Avoid that the executed command is propagated at all. This way we
 3627  * are free to just propagate what we want using the alsoPropagate()
 3628  * API. */
 3629 void preventCommandPropagation(client *c) {
 3630     c->flags |= CLIENT_PREVENT_PROP;
 3631 }
 3632 
 3633 /* AOF specific version of preventCommandPropagation(). */
 3634 void preventCommandAOF(client *c) {
 3635     c->flags |= CLIENT_PREVENT_AOF_PROP;
 3636 }
 3637 
 3638 /* Replication specific version of preventCommandPropagation(). */
 3639 void preventCommandReplication(client *c) {
 3640     c->flags |= CLIENT_PREVENT_REPL_PROP;
 3641 }
 3642 
 3643 /* Log the last command a client executed into the slowlog. */
 3644 void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration) {
 3645     /* Some commands may contain sensitive data that should not be available in the slowlog. */
 3646     if (cmd->flags & CMD_SKIP_SLOWLOG)
 3647         return;
 3648 
 3649     /* If command argument vector was rewritten, use the original
 3650      * arguments. */
 3651     robj **argv = c->original_argv ? c->original_argv : c->argv;
 3652     int argc = c->original_argv ? c->original_argc : c->argc;
 3653     slowlogPushEntryIfNeeded(c,argv,argc,duration);
 3654 }
 3655 
 3656 /* Call() is the core of Redis execution of a command.
 3657  *
 3658  * The following flags can be passed:
 3659  * CMD_CALL_NONE        No flags.
 3660  * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
 3661  * CMD_CALL_STATS       Populate command stats.
 3662  * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
 3663  *                          or if the client flags are forcing propagation.
 3664  * CMD_CALL_PROPAGATE_REPL  Send command to slaves if it modified the dataset
 3665  *                          or if the client flags are forcing propagation.
 3666  * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
 3667  * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
 3668  *
 3669  * The exact propagation behavior depends on the client flags.
 3670  * Specifically:
 3671  *
 3672  * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
 3673  *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
 3674  *    in the call flags, then the command is propagated even if the
 3675  *    dataset was not affected by the command.
 3676  * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
 3677  *    are set, the propagation into AOF or to slaves is not performed even
 3678  *    if the command modified the dataset.
 3679  *
 3680  * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
 3681  * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
 3682  * slaves propagation will never occur.
 3683  *
 3684  * Client flags are modified by the implementation of a given command
 3685  * using the following API:
 3686  *
 3687  * forceCommandPropagation(client *c, int flags);
 3688  * preventCommandPropagation(client *c);
 3689  * preventCommandAOF(client *c);
 3690  * preventCommandReplication(client *c);
 3691  *
 3692  */
 3693 void call(client *c, int flags) {
 3694     long long dirty;
 3695     monotime call_timer;
 3696     int client_old_flags = c->flags;
 3697     struct redisCommand *real_cmd = c->cmd;
 3698     static long long prev_err_count;
 3699 
 3700     /* Initialization: clear the flags that must be set by the command on
 3701      * demand, and initialize the array for additional commands propagation. */
 3702     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
 3703     redisOpArray prev_also_propagate = server.also_propagate;
 3704     redisOpArrayInit(&server.also_propagate);
 3705 
 3706     /* Call the command. */
 3707     dirty = server.dirty;
 3708     prev_err_count = server.stat_total_error_replies;
 3709 
 3710     /* Update cache time, in case we have nested calls we want to
 3711      * update only on the first call*/
 3712     if (server.fixed_time_expire++ == 0) {
 3713         updateCachedTime(0);
 3714     }
 3715 
 3716     elapsedStart(&call_timer);
 3717     c->cmd->proc(c);
 3718     const long duration = elapsedUs(call_timer);
 3719     c->duration = duration;
 3720     dirty = server.dirty-dirty;
 3721     if (dirty < 0) dirty = 0;
 3722 
 3723     /* Update failed command calls if required.
 3724      * We leverage a static variable (prev_err_count) to retain
 3725      * the counter across nested function calls and avoid logging
 3726      * the same error twice. */
 3727     if ((server.stat_total_error_replies - prev_err_count) > 0) {
 3728         real_cmd->failed_calls++;
 3729     }
 3730 
 3731     /* After executing command, we will close the client after writing entire
 3732      * reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag. */
 3733     if (c->flags & CLIENT_CLOSE_AFTER_COMMAND) {
 3734         c->flags &= ~CLIENT_CLOSE_AFTER_COMMAND;
 3735         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
 3736     }
 3737 
 3738     /* When EVAL is called loading the AOF we don't want commands called
 3739      * from Lua to go into the slowlog or to populate statistics. */
 3740     if (server.loading && c->flags & CLIENT_LUA)
 3741         flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
 3742 
 3743     /* If the caller is Lua, we want to force the EVAL caller to propagate
 3744      * the script if the command flag or client flag are forcing the
 3745      * propagation. */
 3746     if (c->flags & CLIENT_LUA && server.lua_caller) {
 3747         if (c->flags & CLIENT_FORCE_REPL)
 3748             server.lua_caller->flags |= CLIENT_FORCE_REPL;
 3749         if (c->flags & CLIENT_FORCE_AOF)
 3750             server.lua_caller->flags |= CLIENT_FORCE_AOF;
 3751     }
 3752 
 3753     /* Note: the code below uses the real command that was executed
 3754      * c->cmd and c->lastcmd may be different, in case of MULTI-EXEC or
 3755      * re-written commands such as EXPIRE, GEOADD, etc. */
 3756 
 3757     /* Record the latency this command induced on the main thread.
 3758      * unless instructed by the caller not to log. (happens when processing
 3759      * a MULTI-EXEC from inside an AOF). */
 3760     if (flags & CMD_CALL_SLOWLOG) {
 3761         char *latency_event = (real_cmd->flags & CMD_FAST) ?
 3762                                "fast-command" : "command";
 3763         latencyAddSampleIfNeeded(latency_event,duration/1000);
 3764     }
 3765 
 3766     /* Log the command into the Slow log if needed.
 3767      * If the client is blocked we will handle slowlog when it is unblocked. */
 3768     if ((flags & CMD_CALL_SLOWLOG) && !(c->flags & CLIENT_BLOCKED))
 3769         slowlogPushCurrentCommand(c, real_cmd, duration);
 3770 
 3771     /* Send the command to clients in MONITOR mode if applicable.
 3772      * Administrative commands are considered too dangerous to be shown. */
 3773     if (!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) {
 3774         robj **argv = c->original_argv ? c->original_argv : c->argv;
 3775         int argc = c->original_argv ? c->original_argc : c->argc;
 3776         replicationFeedMonitors(c,server.monitors,c->db->id,argv,argc);
 3777     }
 3778 
 3779     /* Clear the original argv.
 3780      * If the client is blocked we will handle slowlog when it is unblocked. */
 3781     if (!(c->flags & CLIENT_BLOCKED))
 3782         freeClientOriginalArgv(c);
 3783 
 3784     /* populate the per-command statistics that we show in INFO commandstats. */
 3785     if (flags & CMD_CALL_STATS) {
 3786         real_cmd->microseconds += duration;
 3787         real_cmd->calls++;
 3788     }
 3789 
 3790     /* Propagate the command into the AOF and replication link */
 3791     if (flags & CMD_CALL_PROPAGATE &&
 3792         (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
 3793     {
 3794         int propagate_flags = PROPAGATE_NONE;
 3795 
 3796         /* Check if the command operated changes in the data set. If so
 3797          * set for replication / AOF propagation. */
 3798         if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
 3799 
 3800         /* If the client forced AOF / replication of the command, set
 3801          * the flags regardless of the command effects on the data set. */
 3802         if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
 3803         if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
 3804 
 3805         /* However prevent AOF / replication propagation if the command
 3806          * implementation called preventCommandPropagation() or similar,
 3807          * or if we don't have the call() flags to do so. */
 3808         if (c->flags & CLIENT_PREVENT_REPL_PROP ||
 3809             !(flags & CMD_CALL_PROPAGATE_REPL))
 3810                 propagate_flags &= ~PROPAGATE_REPL;
 3811         if (c->flags & CLIENT_PREVENT_AOF_PROP ||
 3812             !(flags & CMD_CALL_PROPAGATE_AOF))
 3813                 propagate_flags &= ~PROPAGATE_AOF;
 3814 
 3815         /* Call propagate() only if at least one of AOF / replication
 3816          * propagation is needed. Note that modules commands handle replication
 3817          * in an explicit way, so we never replicate them automatically. */
 3818         if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
 3819             propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
 3820     }
 3821 
 3822     /* Restore the old replication flags, since call() can be executed
 3823      * recursively. */
 3824     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
 3825     c->flags |= client_old_flags &
 3826         (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
 3827 
 3828     /* Handle the alsoPropagate() API to handle commands that want to propagate
 3829      * multiple separated commands. Note that alsoPropagate() is not affected
 3830      * by CLIENT_PREVENT_PROP flag. */
 3831     if (server.also_propagate.numops) {
 3832         int j;
 3833         redisOp *rop;
 3834 
 3835         if (flags & CMD_CALL_PROPAGATE) {
 3836             int multi_emitted = 0;
 3837             /* Wrap the commands in server.also_propagate array,
 3838              * but don't wrap it if we are already in MULTI context,
 3839              * in case the nested MULTI/EXEC.
 3840              *
 3841              * And if the array contains only one command, no need to
 3842              * wrap it, since the single command is atomic. */
 3843             if (server.also_propagate.numops > 1 &&
 3844                 !(c->cmd->flags & CMD_MODULE) &&
 3845                 !(c->flags & CLIENT_MULTI) &&
 3846                 !(flags & CMD_CALL_NOWRAP))
 3847             {
 3848                 execCommandPropagateMulti(c->db->id);
 3849                 multi_emitted = 1;
 3850             }
 3851 
 3852             for (j = 0; j < server.also_propagate.numops; j++) {
 3853                 rop = &server.also_propagate.ops[j];
 3854                 int target = rop->target;
 3855                 /* Whatever the command wish is, we honor the call() flags. */
 3856                 if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
 3857                 if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
 3858                 if (target)
 3859                     propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
 3860             }
 3861 
 3862             if (multi_emitted) {
 3863                 execCommandPropagateExec(c->db->id);
 3864             }
 3865         }
 3866         redisOpArrayFree(&server.also_propagate);
 3867     }
 3868     server.also_propagate = prev_also_propagate;
 3869 
 3870     /* Client pause takes effect after a transaction has finished. This needs
 3871      * to be located after everything is propagated. */
 3872     if (!server.in_exec && server.client_pause_in_transaction) {
 3873         server.client_pause_in_transaction = 0;
 3874     }
 3875 
 3876     /* If the client has keys tracking enabled for client side caching,
 3877      * make sure to remember the keys it fetched via this command. */
 3878     if (c->cmd->flags & CMD_READONLY) {
 3879         client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ?
 3880                             server.lua_caller : c;
 3881         if (caller->flags & CLIENT_TRACKING &&
 3882             !(caller->flags & CLIENT_TRACKING_BCAST))
 3883         {
 3884             trackingRememberKeys(caller);
 3885         }
 3886     }
 3887 
 3888     server.fixed_time_expire--;
 3889     server.stat_numcommands++;
 3890     prev_err_count = server.stat_total_error_replies;
 3891 
 3892     /* Record peak memory after each command and before the eviction that runs
 3893      * before the next command. */
 3894     size_t zmalloc_used = zmalloc_used_memory();
 3895     if (zmalloc_used > server.stat_peak_memory)
 3896         server.stat_peak_memory = zmalloc_used;
 3897 }
 3898 
 3899 /* Used when a command that is ready for execution needs to be rejected, due to
 3900  * varios pre-execution checks. it returns the appropriate error to the client.
 3901  * If there's a transaction is flags it as dirty, and if the command is EXEC,
 3902  * it aborts the transaction.
 3903  * Note: 'reply' is expected to end with \r\n */
 3904 void rejectCommand(client *c, robj *reply) {
 3905     flagTransaction(c);
 3906     if (c->cmd) c->cmd->rejected_calls++;
 3907     if (c->cmd && c->cmd->proc == execCommand) {
 3908         execCommandAbort(c, reply->ptr);
 3909     } else {
 3910         /* using addReplyError* rather than addReply so that the error can be logged. */
 3911         addReplyErrorObject(c, reply);
 3912     }
 3913 }
 3914 
 3915 void rejectCommandFormat(client *c, const char *fmt, ...) {
 3916     if (c->cmd) c->cmd->rejected_calls++;
 3917     flagTransaction(c);
 3918     va_list ap;
 3919     va_start(ap,fmt);
 3920     sds s = sdscatvprintf(sdsempty(),fmt,ap);
 3921     va_end(ap);
 3922     /* Make sure there are no newlines in the string, otherwise invalid protocol
 3923      * is emitted (The args come from the user, they may contain any character). */
 3924     sdsmapchars(s, "\r\n", "  ",  2);
 3925     if (c->cmd && c->cmd->proc == execCommand) {
 3926         execCommandAbort(c, s);
 3927         sdsfree(s);
 3928     } else {
 3929         /* The following frees 's'. */
 3930         addReplyErrorSds(c, s);
 3931     }
 3932 }
 3933 
 3934 /* Returns 1 for commands that may have key names in their arguments, but have
 3935  * no pre-determined key positions. */
 3936 static int cmdHasMovableKeys(struct redisCommand *cmd) {
 3937     return (cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) ||
 3938             cmd->flags & CMD_MODULE_GETKEYS;
 3939 }
 3940 
 3941 /* If this function gets called we already read a whole
 3942  * command, arguments are in the client argv/argc fields.
 3943  * processCommand() execute the command or prepare the
 3944  * server for a bulk read from the client.
 3945  *
 3946  * If C_OK is returned the client is still alive and valid and
 3947  * other operations can be performed by the caller. Otherwise
 3948  * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
 3949 int processCommand(client *c) {
 3950     if (!server.lua_timedout) {
 3951         /* Both EXEC and EVAL call call() directly so there should be
 3952          * no way in_exec or in_eval or propagate_in_transaction is 1.
 3953          * That is unless lua_timedout, in which case client may run
 3954          * some commands. */
 3955         serverAssert(!server.propagate_in_transaction);
 3956         serverAssert(!server.in_exec);
 3957         serverAssert(!server.in_eval);
 3958     }
 3959 
 3960     moduleCallCommandFilters(c);
 3961 
 3962     /* The QUIT command is handled separately. Normal command procs will
 3963      * go through checking for replication and QUIT will cause trouble
 3964      * when FORCE_REPLICATION is enabled and would be implemented in
 3965      * a regular command proc. */
 3966     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
 3967         addReply(c,shared.ok);
 3968         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
 3969         return C_ERR;
 3970     }
 3971 
 3972     /* Now lookup the command and check ASAP about trivial error conditions
 3973      * such as wrong arity, bad command name and so forth. */
 3974     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
 3975     if (!c->cmd) {
 3976         sds args = sdsempty();
 3977         int i;
 3978         for (i=1; i < c->argc && sdslen(args) < 128; i++)
 3979             args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
 3980         rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
 3981             (char*)c->argv[0]->ptr, args);
 3982         sdsfree(args);
 3983         return C_OK;
 3984     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
 3985                (c->argc < -c->cmd->arity)) {
 3986         rejectCommandFormat(c,"wrong number of arguments for '%s' command",
 3987             c->cmd->name);
 3988         return C_OK;
 3989     }
 3990 
 3991     int is_read_command = (c->cmd->flags & CMD_READONLY) ||
 3992                            (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_READONLY));
 3993     int is_write_command = (c->cmd->flags & CMD_WRITE) ||
 3994                            (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
 3995     int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
 3996                              (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
 3997     int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
 3998                                (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
 3999     int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
 4000                                  (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
 4001     int is_may_replicate_command = (c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE)) ||
 4002                                    (c->cmd->proc == execCommand && (c->mstate.cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE)));
 4003 
 4004     /* Check if the user is authenticated. This check is skipped in case
 4005      * the default user is flagged as "nopass" and is active. */
 4006     int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
 4007                           (DefaultUser->flags & USER_FLAG_DISABLED)) &&
 4008                         !c->authenticated;
 4009     if (auth_required) {
 4010         /* AUTH and HELLO and no auth modules are valid even in
 4011          * non-authenticated state. */
 4012         if (!(c->cmd->flags & CMD_NO_AUTH)) {
 4013             rejectCommand(c,shared.noautherr);
 4014             return C_OK;
 4015         }
 4016     }
 4017 
 4018     /* Check if the user can run this command according to the current
 4019      * ACLs. */
 4020     int acl_errpos;
 4021     int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
 4022     if (acl_retval != ACL_OK) {
 4023         addACLLogEntry(c,acl_retval,acl_errpos,NULL);
 4024         switch (acl_retval) {
 4025         case ACL_DENIED_CMD:
 4026             rejectCommandFormat(c,
 4027                 "-NOPERM this user has no permissions to run "
 4028                 "the '%s' command or its subcommand", c->cmd->name);
 4029             break;
 4030         case ACL_DENIED_KEY:
 4031             rejectCommandFormat(c,
 4032                 "-NOPERM this user has no permissions to access "
 4033                 "one of the keys used as arguments");
 4034             break;
 4035         case ACL_DENIED_CHANNEL:
 4036             rejectCommandFormat(c,
 4037                 "-NOPERM this user has no permissions to access "
 4038                 "one of the channels used as arguments");
 4039             break;
 4040         default:
 4041             rejectCommandFormat(c, "no permission");
 4042             break;
 4043         }
 4044         return C_OK;
 4045     }
 4046 
 4047     /* If cluster is enabled perform the cluster redirection here.
 4048      * However we don't perform the redirection if:
 4049      * 1) The sender of this command is our master.
 4050      * 2) The command has no key arguments. */
 4051     if (server.cluster_enabled &&
 4052         !(c->flags & CLIENT_MASTER) &&
 4053         !(c->flags & CLIENT_LUA &&
 4054           server.lua_caller->flags & CLIENT_MASTER) &&
 4055         !(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 &&
 4056           c->cmd->proc != execCommand))
 4057     {
 4058         int hashslot;
 4059         int error_code;
 4060         clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
 4061                                         &hashslot,&error_code);
 4062         if (n == NULL || n != server.cluster->myself) {
 4063             if (c->cmd->proc == execCommand) {
 4064                 discardTransaction(c);
 4065             } else {
 4066                 flagTransaction(c);
 4067             }
 4068             clusterRedirectClient(c,n,hashslot,error_code);
 4069             c->cmd->rejected_calls++;
 4070             return C_OK;
 4071         }
 4072     }
 4073 
 4074     /* Handle the maxmemory directive.
 4075      *
 4076      * Note that we do not want to reclaim memory if we are here re-entering
 4077      * the event loop since there is a busy Lua script running in timeout
 4078      * condition, to avoid mixing the propagation of scripts with the
 4079      * propagation of DELs due to eviction. */
 4080     if (server.maxmemory && !server.lua_timedout) {
 4081         int out_of_memory = (performEvictions() == EVICT_FAIL);
 4082         /* performEvictions may flush slave output buffers. This may result
 4083          * in a slave, that may be the active client, to be freed. */
 4084         if (server.current_client == NULL) return C_ERR;
 4085 
 4086         int reject_cmd_on_oom = is_denyoom_command;
 4087         /* If client is in MULTI/EXEC context, queuing may consume an unlimited
 4088          * amount of memory, so we want to stop that.
 4089          * However, we never want to reject DISCARD, or even EXEC (unless it
 4090          * contains denied commands, in which case is_denyoom_command is already
 4091          * set. */
 4092         if (c->flags & CLIENT_MULTI &&
 4093             c->cmd->proc != execCommand &&
 4094             c->cmd->proc != discardCommand &&
 4095             c->cmd->proc != resetCommand) {
 4096             reject_cmd_on_oom = 1;
 4097         }
 4098 
 4099         if (out_of_memory && reject_cmd_on_oom) {
 4100             rejectCommand(c, shared.oomerr);
 4101             return C_OK;
 4102         }
 4103 
 4104         /* Save out_of_memory result at script start, otherwise if we check OOM
 4105          * until first write within script, memory used by lua stack and
 4106          * arguments might interfere. */
 4107         if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) {
 4108             server.lua_oom = out_of_memory;
 4109         }
 4110     }
 4111 
 4112     /* Make sure to use a reasonable amount of memory for client side
 4113      * caching metadata. */
 4114     if (server.tracking_clients) trackingLimitUsedSlots();
 4115 
 4116     /* Don't accept write commands if there are problems persisting on disk
 4117      * and if this is a master instance. */
 4118     int deny_write_type = writeCommandsDeniedByDiskError();
 4119     if (deny_write_type != DISK_ERROR_TYPE_NONE &&
 4120         server.masterhost == NULL &&
 4121         (is_write_command ||c->cmd->proc == pingCommand))
 4122     {
 4123         if (deny_write_type == DISK_ERROR_TYPE_RDB)
 4124             rejectCommand(c, shared.bgsaveerr);
 4125         else
 4126             rejectCommandFormat(c,
 4127                 "-MISCONF Errors writing to the AOF file: %s",
 4128                 strerror(server.aof_last_write_errno));
 4129         return C_OK;
 4130     }
 4131 
 4132     /* Don't accept write commands if there are not enough good slaves and
 4133      * user configured the min-slaves-to-write option. */
 4134     if (server.masterhost == NULL &&
 4135         server.repl_min_slaves_to_write &&
 4136         server.repl_min_slaves_max_lag &&
 4137         is_write_command &&
 4138         server.repl_good_slaves_count < server.repl_min_slaves_to_write)
 4139     {
 4140         rejectCommand(c, shared.noreplicaserr);
 4141         return C_OK;
 4142     }
 4143 
 4144     /* Don't accept write commands if this is a read only slave. But
 4145      * accept write commands if this is our master. */
 4146     if (server.masterhost && server.repl_slave_ro &&
 4147         !(c->flags & CLIENT_MASTER) &&
 4148         is_write_command)
 4149     {
 4150         rejectCommand(c, shared.roslaveerr);
 4151         return C_OK;
 4152     }
 4153 
 4154     /* Only allow a subset of commands in the context of Pub/Sub if the
 4155      * connection is in RESP2 mode. With RESP3 there are no limits. */
 4156     if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
 4157         c->cmd->proc != pingCommand &&
 4158         c->cmd->proc != subscribeCommand &&
 4159         c->cmd->proc != unsubscribeCommand &&
 4160         c->cmd->proc != psubscribeCommand &&
 4161         c->cmd->proc != punsubscribeCommand &&
 4162         c->cmd->proc != resetCommand) {
 4163         rejectCommandFormat(c,
 4164             "Can't execute '%s': only (P)SUBSCRIBE / "
 4165             "(P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
 4166             c->cmd->name);
 4167         return C_OK;
 4168     }
 4169 
 4170     /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
 4171      * when slave-serve-stale-data is no and we are a slave with a broken
 4172      * link with master. */
 4173     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
 4174         server.repl_serve_stale_data == 0 &&
 4175         is_denystale_command)
 4176     {
 4177         rejectCommand(c, shared.masterdownerr);
 4178         return C_OK;
 4179     }
 4180 
 4181     /* Loading DB? Return an error if the command has not the
 4182      * CMD_LOADING flag. */
 4183     if (server.loading && is_denyloading_command) {
 4184         rejectCommand(c, shared.loadingerr);
 4185         return C_OK;
 4186     }
 4187 
 4188     /* Lua script too slow? Only allow a limited number of commands.
 4189      * Note that we need to allow the transactions commands, otherwise clients
 4190      * sending a transaction with pipelining without error checking, may have
 4191      * the MULTI plus a few initial commands refused, then the timeout
 4192      * condition resolves, and the bottom-half of the transaction gets
 4193      * executed, see Github PR #7022. */
 4194     if (server.lua_timedout &&
 4195           c->cmd->proc != authCommand &&
 4196           c->cmd->proc != helloCommand &&
 4197           c->cmd->proc != replconfCommand &&
 4198           c->cmd->proc != multiCommand &&
 4199           c->cmd->proc != discardCommand &&
 4200           c->cmd->proc != watchCommand &&
 4201           c->cmd->proc != unwatchCommand &&
 4202           c->cmd->proc != resetCommand &&
 4203         !(c->cmd->proc == shutdownCommand &&
 4204           c->argc == 2 &&
 4205           tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
 4206         !(c->cmd->proc == scriptCommand &&
 4207           c->argc == 2 &&
 4208           tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
 4209     {
 4210         rejectCommand(c, shared.slowscripterr);
 4211         return C_OK;
 4212     }
 4213 
 4214     /* Prevent a replica from sending commands that access the keyspace.
 4215      * The main objective here is to prevent abuse of client pause check
 4216      * from which replicas are exempt. */
 4217     if ((c->flags & CLIENT_SLAVE) && (is_may_replicate_command || is_write_command || is_read_command)) {
 4218         rejectCommandFormat(c, "Replica can't interract with the keyspace");
 4219         return C_OK;
 4220     }
 4221 
 4222     /* If the server is paused, block the client until
 4223      * the pause has ended. Replicas are never paused. */
 4224     if (!(c->flags & CLIENT_SLAVE) && 
 4225         ((server.client_pause_type == CLIENT_PAUSE_ALL) ||
 4226         (server.client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command)))
 4227     {
 4228         c->bpop.timeout = 0;
 4229         blockClient(c,BLOCKED_PAUSE);
 4230         return C_OK;       
 4231     }
 4232 
 4233     /* Exec the command */
 4234     if (c->flags & CLIENT_MULTI &&
 4235         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
 4236         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
 4237         c->cmd->proc != resetCommand)
 4238     {
 4239         queueMultiCommand(c);
 4240         addReply(c,shared.queued);
 4241     } else {
 4242         call(c,CMD_CALL_FULL);
 4243         c->woff = server.master_repl_offset;
 4244         if (listLength(server.ready_keys))
 4245             handleClientsBlockedOnKeys();
 4246     }
 4247 
 4248     return C_OK;
 4249 }
 4250 
 4251 /* ====================== Error lookup and execution ===================== */
 4252 
 4253 void incrementErrorCount(const char *fullerr, size_t namelen) {
 4254     struct redisError *error = raxFind(server.errors,(unsigned char*)fullerr,namelen);
 4255     if (error == raxNotFound) {
 4256         error = zmalloc(sizeof(*error));
 4257         error->count = 0;
 4258         raxInsert(server.errors,(unsigned char*)fullerr,namelen,error,NULL);
 4259     }
 4260     error->count++;
 4261 }
 4262 
 4263 /*================================== Shutdown =============================== */
 4264 
 4265 /* Close listening sockets. Also unlink the unix domain socket if
 4266  * unlink_unix_socket is non-zero. */
 4267 void closeListeningSockets(int unlink_unix_socket) {
 4268     int j;
 4269 
 4270     for (j = 0; j < server.ipfd.count; j++) close(server.ipfd.fd[j]);
 4271     for (j = 0; j < server.tlsfd.count; j++) close(server.tlsfd.fd[j]);
 4272     if (server.sofd != -1) close(server.sofd);
 4273     if (server.cluster_enabled)
 4274         for (j = 0; j < server.cfd.count; j++) close(server.cfd.fd[j]);
 4275     if (unlink_unix_socket && server.unixsocket) {
 4276         serverLog(LL_NOTICE,"Removing the unix socket file.");
 4277         unlink(server.unixsocket); /* don't care if this fails */
 4278     }
 4279 }
 4280 
 4281 int prepareForShutdown(int flags) {
 4282     /* When SHUTDOWN is called while the server is loading a dataset in
 4283      * memory we need to make sure no attempt is performed to save
 4284      * the dataset on shutdown (otherwise it could overwrite the current DB
 4285      * with half-read data).
 4286      *
 4287      * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */
 4288     if (server.loading || server.sentinel_mode)
 4289         flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE;
 4290 
 4291     int save = flags & SHUTDOWN_SAVE;
 4292     int nosave = flags & SHUTDOWN_NOSAVE;
 4293 
 4294     serverLog(LL_WARNING,"User requested shutdown...");
 4295     if (server.supervised_mode == SUPERVISED_SYSTEMD)
 4296         redisCommunicateSystemd("STOPPING=1\n");
 4297 
 4298     /* Kill all the Lua debugger forked sessions. */
 4299     ldbKillForkedSessions();
 4300 
 4301     /* Kill the saving child if there is a background saving in progress.
 4302        We want to avoid race conditions, for instance our saving child may
 4303        overwrite the synchronous saving did by SHUTDOWN. */
 4304     if (server.child_type == CHILD_TYPE_RDB) {
 4305         serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
 4306         killRDBChild();
 4307         /* Note that, in killRDBChild normally has backgroundSaveDoneHandler
 4308          * doing it's cleanup, but in this case this code will not be reached,
 4309          * so we need to call rdbRemoveTempFile which will close fd(in order
 4310          * to unlink file actully) in background thread.
 4311          * The temp rdb file fd may won't be closed when redis exits quickly,
 4312          * but OS will close this fd when process exits. */
 4313         rdbRemoveTempFile(server.child_pid, 0);
 4314     }
 4315 
 4316     /* Kill module child if there is one. */
 4317     if (server.child_type == CHILD_TYPE_MODULE) {
 4318         serverLog(LL_WARNING,"There is a module fork child. Killing it!");
 4319         TerminateModuleForkChild(server.child_pid,0);
 4320     }
 4321 
 4322     if (server.aof_state != AOF_OFF) {
 4323         /* Kill the AOF saving child as the AOF we already have may be longer
 4324          * but contains the full dataset anyway. */
 4325         if (server.child_type == CHILD_TYPE_AOF) {
 4326             /* If we have AOF enabled but haven't written the AOF yet, don't
 4327              * shutdown or else the dataset will be lost. */
 4328             if (server.aof_state == AOF_WAIT_REWRITE) {
 4329                 serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
 4330                 return C_ERR;
 4331             }
 4332             serverLog(LL_WARNING,
 4333                 "There is a child rewriting the AOF. Killing it!");
 4334             killAppendOnlyChild();
 4335         }
 4336         /* Append only file: flush buffers and fsync() the AOF at exit */
 4337         serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
 4338         flushAppendOnlyFile(1);
 4339         if (redis_fsync(server.aof_fd) == -1) {
 4340             serverLog(LL_WARNING,"Fail to fsync the AOF file: %s.",
 4341                                  strerror(errno));
 4342         }
 4343     }
 4344 
 4345     /* Create a new RDB file before exiting. */
 4346     if ((server.saveparamslen > 0 && !nosave) || save) {
 4347         serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
 4348         if (server.supervised_mode == SUPERVISED_SYSTEMD)
 4349             redisCommunicateSystemd("STATUS=Saving the final RDB snapshot\n");
 4350         /* Snapshotting. Perform a SYNC SAVE and exit */
 4351         rdbSaveInfo rsi, *rsiptr;
 4352         rsiptr = rdbPopulateSaveInfo(&rsi);
 4353         if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
 4354             /* Ooops.. error saving! The best we can do is to continue
 4355              * operating. Note that if there was a background saving process,
 4356              * in the next cron() Redis will be notified that the background
 4357              * saving aborted, handling special stuff like slaves pending for
 4358              * synchronization... */
 4359             serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
 4360             if (server.supervised_mode == SUPERVISED_SYSTEMD)
 4361                 redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n");
 4362             return C_ERR;
 4363         }
 4364     }
 4365 
 4366     /* Fire the shutdown modules event. */
 4367     moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL);
 4368 
 4369     /* Remove the pid file if possible and needed. */
 4370     if (server.daemonize || server.pidfile) {
 4371         serverLog(LL_NOTICE,"Removing the pid file.");
 4372         unlink(server.pidfile);
 4373     }
 4374 
 4375     /* Best effort flush of slave output buffers, so that we hopefully
 4376      * send them pending writes. */
 4377     flushSlavesOutputBuffers();
 4378 
 4379     /* Close the listening sockets. Apparently this allows faster restarts. */
 4380     closeListeningSockets(1);
 4381     serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
 4382         server.sentinel_mode ? "Sentinel" : "Redis");
 4383     return C_OK;
 4384 }
 4385 
 4386 /*================================== Commands =============================== */
 4387 
 4388 /* Sometimes Redis cannot accept write commands because there is a persistence
 4389  * error with the RDB or AOF file, and Redis is configured in order to stop
 4390  * accepting writes in such situation. This function returns if such a
 4391  * condition is active, and the type of the condition.
 4392  *
 4393  * Function return values:
 4394  *
 4395  * DISK_ERROR_TYPE_NONE:    No problems, we can accept writes.
 4396  * DISK_ERROR_TYPE_AOF:     Don't accept writes: AOF errors.
 4397  * DISK_ERROR_TYPE_RDB:     Don't accept writes: RDB errors.
 4398  */
 4399 int writeCommandsDeniedByDiskError(void) {
 4400     if (server.stop_writes_on_bgsave_err &&
 4401         server.saveparamslen > 0 &&
 4402         server.lastbgsave_status == C_ERR)
 4403     {
 4404         return DISK_ERROR_TYPE_RDB;
 4405     } else if (server.aof_state != AOF_OFF) {
 4406         if (server.aof_last_write_status == C_ERR) {
 4407             return DISK_ERROR_TYPE_AOF;
 4408         }
 4409         /* AOF fsync error. */
 4410         int aof_bio_fsync_status;
 4411         atomicGet(server.aof_bio_fsync_status,aof_bio_fsync_status);
 4412         if (aof_bio_fsync_status == C_ERR) {
 4413             atomicGet(server.aof_bio_fsync_errno,server.aof_last_write_errno);
 4414             return DISK_ERROR_TYPE_AOF;
 4415         }
 4416     }
 4417 
 4418     return DISK_ERROR_TYPE_NONE;
 4419 }
 4420 
 4421 /* The PING command. It works in a different way if the client is in
 4422  * in Pub/Sub mode. */
 4423 void pingCommand(client *c) {
 4424     /* The command takes zero or one arguments. */
 4425     if (c->argc > 2) {
 4426         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
 4427             c->cmd->name);
 4428         return;
 4429     }
 4430 
 4431     if (c->flags & CLIENT_PUBSUB && c->resp == 2) {
 4432         addReply(c,shared.mbulkhdr[2]);
 4433         addReplyBulkCBuffer(c,"pong",4);
 4434         if (c->argc == 1)
 4435             addReplyBulkCBuffer(c,"",0);
 4436         else
 4437             addReplyBulk(c,c->argv[1]);
 4438     } else {
 4439         if (c->argc == 1)
 4440             addReply(c,shared.pong);
 4441         else
 4442             addReplyBulk(c,c->argv[1]);
 4443     }
 4444 }
 4445 
 4446 void echoCommand(client *c) {
 4447     addReplyBulk(c,c->argv[1]);
 4448 }
 4449 
 4450 void timeCommand(client *c) {
 4451     struct timeval tv;
 4452 
 4453     /* gettimeofday() can only fail if &tv is a bad address so we
 4454      * don't check for errors. */
 4455     gettimeofday(&tv,NULL);
 4456     addReplyArrayLen(c,2);
 4457     addReplyBulkLongLong(c,tv.tv_sec);
 4458     addReplyBulkLongLong(c,tv.tv_usec);
 4459 }
 4460 
 4461 /* Helper function for addReplyCommand() to output flags. */
 4462 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) {
 4463     if (cmd->flags & f) {
 4464         addReplyStatus(c, reply);
 4465         return 1;
 4466     }
 4467     return 0;
 4468 }
 4469 
 4470 /* Output the representation of a Redis command. Used by the COMMAND command. */
 4471 void addReplyCommand(client *c, struct redisCommand *cmd) {
 4472     if (!cmd) {
 4473         addReplyNull(c);
 4474     } else {
 4475         /* We are adding: command name, arg count, flags, first, last, offset, categories */
 4476         addReplyArrayLen(c, 7);
 4477         addReplyBulkCString(c, cmd->name);
 4478         addReplyLongLong(c, cmd->arity);
 4479 
 4480         int flagcount = 0;
 4481         void *flaglen = addReplyDeferredLen(c);
 4482         flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write");
 4483         flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly");
 4484         flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom");
 4485         flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin");
 4486         flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub");
 4487         flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript");
 4488         flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random");
 4489         flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script");
 4490         flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading");
 4491         flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale");
 4492         flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor");
 4493         flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_SLOWLOG, "skip_slowlog");
 4494         flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
 4495         flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
 4496         flagcount += addReplyCommandFlag(c,cmd,CMD_NO_AUTH, "no_auth");
 4497         flagcount += addReplyCommandFlag(c,cmd,CMD_MAY_REPLICATE, "may_replicate");
 4498         if (cmdHasMovableKeys(cmd)) {
 4499             addReplyStatus(c, "movablekeys");
 4500             flagcount += 1;
 4501         }
 4502         setDeferredSetLen(c, flaglen, flagcount);
 4503 
 4504         addReplyLongLong(c, cmd->firstkey);
 4505         addReplyLongLong(c, cmd->lastkey);
 4506         addReplyLongLong(c, cmd->keystep);
 4507 
 4508         addReplyCommandCategories(c,cmd);
 4509     }
 4510 }
 4511 
 4512 /* COMMAND <subcommand> <args> */
 4513 void commandCommand(client *c) {
 4514     dictIterator *di;
 4515     dictEntry *de;
 4516 
 4517     if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
 4518         const char *help[] = {
 4519 "(no subcommand)",
 4520 "    Return details about all Redis commands.",
 4521 "COUNT",
 4522 "    Return the total number of commands in this Redis server.",
 4523 "GETKEYS <full-command>",
 4524 "    Return the keys from a full Redis command.",
 4525 "INFO [<command-name> ...]",
 4526 "    Return details about multiple Redis commands.",
 4527 NULL
 4528         };
 4529         addReplyHelp(c, help);
 4530     } else if (c->argc == 1) {
 4531         addReplyArrayLen(c, dictSize(server.commands));
 4532         di = dictGetIterator(server.commands);
 4533         while ((de = dictNext(di)) != NULL) {
 4534             addReplyCommand(c, dictGetVal(de));
 4535         }
 4536         dictReleaseIterator(di);
 4537     } else if (!strcasecmp(c->argv[1]->ptr, "info")) {
 4538         int i;
 4539         addReplyArrayLen(c, c->argc-2);
 4540         for (i = 2; i < c->argc; i++) {
 4541             addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr));
 4542         }
 4543     } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) {
 4544         addReplyLongLong(c, dictSize(server.commands));
 4545     } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) {
 4546         struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
 4547         getKeysResult result = GETKEYS_RESULT_INIT;
 4548         int j;
 4549 
 4550         if (!cmd) {
 4551             addReplyError(c,"Invalid command specified");
 4552             return;
 4553         } else if (cmd->getkeys_proc == NULL && cmd->firstkey == 0) {
 4554             addReplyError(c,"The command has no key arguments");
 4555             return;
 4556         } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) ||
 4557                    ((c->argc-2) < -cmd->arity))
 4558         {
 4559             addReplyError(c,"Invalid number of arguments specified for command");
 4560             return;
 4561         }
 4562 
 4563         if (!getKeysFromCommand(cmd,c->argv+2,c->argc-2,&result)) {
 4564             addReplyError(c,"Invalid arguments specified for command");
 4565         } else {
 4566             addReplyArrayLen(c,result.numkeys);
 4567             for (j = 0; j < result.numkeys; j++) addReplyBulk(c,c->argv[result.keys[j]+2]);
 4568         }
 4569         getKeysFreeResult(&result);
 4570     } else {
 4571         addReplySubcommandSyntaxError(c);
 4572     }
 4573 }
 4574 
 4575 /* Convert an amount of bytes into a human readable string in the form
 4576  * of 100B, 2G, 100M, 4K, and so forth. */
 4577 void bytesToHuman(char *s, unsigned long long n) {
 4578     double d;
 4579 
 4580     if (n < 1024) {
 4581         /* Bytes */
 4582         sprintf(s,"%lluB",n);
 4583     } else if (n < (1024*1024)) {
 4584         d = (double)n/(1024);
 4585         sprintf(s,"%.2fK",d);
 4586     } else if (n < (1024LL*1024*1024)) {
 4587         d = (double)n/(1024*1024);
 4588         sprintf(s,"%.2fM",d);
 4589     } else if (n < (1024LL*1024*1024*1024)) {
 4590         d = (double)n/(1024LL*1024*1024);
 4591         sprintf(s,"%.2fG",d);
 4592     } else if (n < (1024LL*1024*1024*1024*1024)) {
 4593         d = (double)n/(1024LL*1024*1024*1024);
 4594         sprintf(s,"%.2fT",d);
 4595     } else if (n < (1024LL*1024*1024*1024*1024*1024)) {
 4596         d = (double)n/(1024LL*1024*1024*1024*1024);
 4597         sprintf(s,"%.2fP",d);
 4598     } else {
 4599         /* Let's hope we never need this */
 4600         sprintf(s,"%lluB",n);
 4601     }
 4602 }
 4603 
/* Characters we sanitize on INFO output to maintain expected format:
 * '#', ':', line feed and carriage return. */
static char unsafe_info_chars[] = "#:\n\r";
/* Replacement characters passed to memmapchars() by getSafeInfoString()
 * together with the set above. NOTE(review): presumably matched to the set
 * by position -- confirm against memmapchars(). */
static char unsafe_info_chars_substs[] = "____";   /* Must be same length as above */
 4607 
 4608 /* Returns a sanitized version of s that contains no unsafe info string chars.
 4609  * If no unsafe characters are found, simply returns s. Caller needs to
 4610  * free tmp if it is non-null on return.
 4611  */
 4612 const char *getSafeInfoString(const char *s, size_t len, char **tmp) {
 4613     *tmp = NULL;
 4614     if (mempbrk(s, len, unsafe_info_chars,sizeof(unsafe_info_chars)-1)
 4615         == NULL) return s;
 4616     char *new = *tmp = zmalloc(len + 1);
 4617     memcpy(new, s, len);
 4618     new[len] = '\0';
 4619     return memmapchars(new, len, unsafe_info_chars, unsafe_info_chars_substs,
 4620                        sizeof(unsafe_info_chars)-1);
 4621 }
 4622 
/* Create the string returned by the INFO command. This is decoupled
 * from the INFO command itself as we need to report the same information
 * on memory corruption problems. */
 4626 sds genRedisInfoString(const char *section) {
 4627     sds info = sdsempty();
 4628     time_t uptime = server.unixtime-server.stat_starttime;
 4629     int j;
 4630     int allsections = 0, defsections = 0, everything = 0, modules = 0;
 4631     int sections = 0;
 4632 
 4633     if (section == NULL) section = "default";
 4634     allsections = strcasecmp(section,"all") == 0;
 4635     defsections = strcasecmp(section,"default") == 0;
 4636     everything = strcasecmp(section,"everything") == 0;
 4637     modules = strcasecmp(section,"modules") == 0;
 4638     if (everything) allsections = 1;
 4639 
 4640     /* Server */
 4641     if (allsections || defsections || !strcasecmp(section,"server")) {
 4642         static int call_uname = 1;
 4643         static struct utsname name;
 4644         char *mode;
 4645         char *supervised;
 4646 
 4647         if (server.cluster_enabled) mode = "cluster";
 4648         else if (server.sentinel_mode) mode = "sentinel";
 4649         else mode = "standalone";
 4650 
 4651         if (server.supervised) {
 4652             if (server.supervised_mode == SUPERVISED_UPSTART) supervised = "upstart";
 4653             else if (server.supervised_mode == SUPERVISED_SYSTEMD) supervised = "systemd";
 4654             else supervised = "unknown";
 4655         } else {
 4656             supervised = "no";
 4657         }
 4658 
 4659         if (sections++) info = sdscat(info,"\r\n");
 4660 
 4661         if (call_uname) {
 4662             /* Uname can be slow and is always the same output. Cache it. */
 4663             uname(&name);
 4664             call_uname = 0;
 4665         }
 4666 
 4667         unsigned int lruclock;
 4668         atomicGet(server.lruclock,lruclock);
 4669         info = sdscatfmt(info,
 4670             "# Server\r\n"
 4671             "redis_version:%s\r\n"
 4672             "redis_git_sha1:%s\r\n"
 4673             "redis_git_dirty:%i\r\n"
 4674             "redis_build_id:%s\r\n"
 4675             "redis_mode:%s\r\n"
 4676             "os:%s %s %s\r\n"
 4677             "arch_bits:%i\r\n"
 4678             "multiplexing_api:%s\r\n"
 4679             "atomicvar_api:%s\r\n"
 4680             "gcc_version:%i.%i.%i\r\n"
 4681             "process_id:%I\r\n"
 4682             "process_supervised:%s\r\n"
 4683             "run_id:%s\r\n"
 4684             "tcp_port:%i\r\n"
 4685             "server_time_usec:%I\r\n"
 4686             "uptime_in_seconds:%I\r\n"
 4687             "uptime_in_days:%I\r\n"
 4688             "hz:%i\r\n"
 4689             "configured_hz:%i\r\n"
 4690             "lru_clock:%u\r\n"
 4691             "executable:%s\r\n"
 4692             "config_file:%s\r\n"
 4693             "io_threads_active:%i\r\n",
 4694             REDIS_VERSION,
 4695             redisGitSHA1(),
 4696             strtol(redisGitDirty(),NULL,10) > 0,
 4697             redisBuildIdString(),
 4698             mode,
 4699             name.sysname, name.release, name.machine,
 4700             server.arch_bits,
 4701             aeGetApiName(),
 4702             REDIS_ATOMIC_API,
 4703 #ifdef __GNUC__
 4704             __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
 4705 #else
 4706             0,0,0,
 4707 #endif
 4708             (int64_t) getpid(),
 4709             supervised,
 4710             server.runid,
 4711             server.port ? server.port : server.tls_port,
 4712             (int64_t)server.ustime,
 4713             (int64_t)uptime,
 4714             (int64_t)(uptime/(3600*24)),
 4715             server.hz,
 4716             server.config_hz,
 4717             lruclock,
 4718             server.executable ? server.executable : "",
 4719             server.configfile ? server.configfile : "",
 4720             server.io_threads_active);
 4721     }
 4722 
 4723     /* Clients */
 4724     if (allsections || defsections || !strcasecmp(section,"clients")) {
 4725         size_t maxin, maxout;
 4726         getExpansiveClientsInfo(&maxin,&maxout);
 4727         if (sections++) info = sdscat(info,"\r\n");
 4728         info = sdscatprintf(info,
 4729             "# Clients\r\n"
 4730             "connected_clients:%lu\r\n"
 4731             "cluster_connections:%lu\r\n"
 4732             "maxclients:%u\r\n"
 4733             "client_recent_max_input_buffer:%zu\r\n"
 4734             "client_recent_max_output_buffer:%zu\r\n"
 4735             "blocked_clients:%d\r\n"
 4736             "tracking_clients:%d\r\n"
 4737             "clients_in_timeout_table:%llu\r\n",
 4738             listLength(server.clients)-listLength(server.slaves),
 4739             getClusterConnectionsCount(),
 4740             server.maxclients,
 4741             maxin, maxout,
 4742             server.blocked_clients