"Fossies" - the Fresh Open Source Software Archive

Member "redis-6.0.8/src/server.c" (10 Sep 2020, 197120 Bytes) of package /linux/misc/redis-6.0.8.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "server.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 6.0.7_vs_6.0.8.

    1 /*
    2  * Copyright (c) 2009-2016, Salvatore Sanfilippo <antirez at gmail dot com>
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions are met:
    7  *
    8  *   * Redistributions of source code must retain the above copyright notice,
    9  *     this list of conditions and the following disclaimer.
   10  *   * Redistributions in binary form must reproduce the above copyright
   11  *     notice, this list of conditions and the following disclaimer in the
   12  *     documentation and/or other materials provided with the distribution.
   13  *   * Neither the name of Redis nor the names of its contributors may be used
   14  *     to endorse or promote products derived from this software without
   15  *     specific prior written permission.
   16  *
   17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
   18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
   21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   27  * POSSIBILITY OF SUCH DAMAGE.
   28  */
   29 
   30 #include "server.h"
   31 #include "cluster.h"
   32 #include "slowlog.h"
   33 #include "bio.h"
   34 #include "latency.h"
   35 #include "atomicvar.h"
   36 
   37 #include <time.h>
   38 #include <signal.h>
   39 #include <sys/wait.h>
   40 #include <errno.h>
   41 #include <assert.h>
   42 #include <ctype.h>
   43 #include <stdarg.h>
   44 #include <arpa/inet.h>
   45 #include <sys/stat.h>
   46 #include <fcntl.h>
   47 #include <sys/time.h>
   48 #include <sys/resource.h>
   49 #include <sys/uio.h>
   50 #include <sys/un.h>
   51 #include <limits.h>
   52 #include <float.h>
   53 #include <math.h>
   54 #include <sys/resource.h>
   55 #include <sys/utsname.h>
   56 #include <locale.h>
   57 #include <sys/socket.h>
   58 
   59 /* Our shared "common" objects, held in a single global instance. */
   60 
   61 struct sharedObjectsStruct shared;
   62 
   63 /* Global vars that are actually used as constants. The following double
   64  * values are used for the on-disk serialization of doubles, and are
   65  * initialized at runtime to avoid strange compiler optimizations. */
   66 
   67 double R_Zero, R_PosInf, R_NegInf, R_Nan;
   68 
   69 /*================================= Globals ================================= */
   70 
   71 /* Global vars (defined without 'static', so they have external linkage). */
   72 struct redisServer server; /* Server global state */
   73 volatile unsigned long lru_clock; /* Server global current LRU time. */
   74 
   75 /* Our command table.
   76  *
   77  * Every entry is composed of the following fields:
   78  *
   79  * name:        A string representing the command name.
   80  *
   81  * function:    Pointer to the C function implementing the command.
   82  *
   83  * arity:       Number of arguments; it is possible to use -N to say >= N.
   84  *
   85  * sflags:      Command flags as string. See below for a table of flags.
   86  *
   87  * flags:       Flags as bitmask. Computed by Redis using the 'sflags' field.
   88  *
   89  * get_keys_proc: An optional function to get key arguments from a command.
   90  *                This is only used when the following three fields are not
   91  *                enough to specify what arguments are keys.
   92  *
   93  * first_key_index: First argument that is a key
   94  *
   95  * last_key_index: Last argument that is a key
   96  *
   97  * key_step:    Step to get all the keys from first to last argument.
   98  *              For instance in MSET the step is two since arguments
   99  *              are key,val,key,val,...
  100  *
  101  * microseconds: Microseconds of total execution time for this command.
  102  *
  103  * calls:       Total number of calls of this command.
  104  *
  105  * id:          Command bit identifier for ACLs or other goals.
  106  *
  107  * The flags, microseconds and calls fields are computed by Redis and should
  108  * always be set to zero.
  109  *
  110  * Command flags are expressed using space separated strings, that are turned
  111  * into actual flags by the populateCommandTable() function.
  112  *
  113  * This is the meaning of the flags:
  114  *
  115  * write:       Write command (may modify the key space).
  116  *
  117  * read-only:   All the non-special commands that just read from keys without
  118  *              changing the content, or that return other information, like
  119  *              the TIME command. Special commands such as administrative
  120  *              commands or transaction related commands (multi, exec, discard,
  121  *              ...) are not flagged as read-only commands, since they affect
  122  *              the server or the connection in other ways.
  123  *
  124  * use-memory:  May increase memory usage once called. Don't allow if out
  125  *              of memory.
  126  *
  127  * admin:       Administrative command, like SAVE or SHUTDOWN.
  128  *
  129  * pub-sub:     Pub/Sub related command.
  130  *
  131  * no-script:   Command not allowed in scripts.
  132  *
  133  * random:      Random command. Command is not deterministic, that is, the same
  134  *              command with the same arguments, with the same key space, may
  135  *              have different results. For instance SPOP and RANDOMKEY are
  136  *              two random commands.
  137  *
  138  * to-sort:     Sort command output array if called from script, so that the
  139  *              output is deterministic. When this flag is used (not always
  140  *              possible), then the "random" flag is not needed.
  141  *
  142  * ok-loading:  Allow the command while loading the database.
  143  *
  144  * ok-stale:    Allow the command while a slave has stale data but is not
  145  *              allowed to serve this data. Normally only a few commands are
  146  *              accepted in this condition.
  147  *
  148  * no-monitor:  Do not automatically propagate the command on MONITOR.
  149  *
  150  * no-slowlog:  Do not automatically log the command to the slowlog.
  151  *
  152  * cluster-asking: Perform an implicit ASKING for this command, so the
  153  *              command will be accepted in cluster mode if the slot is marked
  154  *              as 'importing'.
  155  *
  156  * fast:        Fast command: O(1) or O(log(N)) command that should never
  157  *              delay its execution as long as the kernel scheduler is giving
  158  *              us time. Note that commands that may trigger a DEL as a side
  159  *              effect (like SET) are not fast commands.
  160  *
  161  * The following additional flags are only used in order to put commands
  162  * in a specific ACL category. Commands can have multiple ACL categories.
  163  *
  164  * @keyspace, @read, @write, @set, @sortedset, @list, @hash, @string, @bitmap,
  165  * @hyperloglog, @stream, @admin, @fast, @slow, @pubsub, @blocking, @dangerous,
  166  * @connection, @transaction, @scripting, @geo.
  167  *
  168  * Note that:
  169  *
  170  * 1) The read-only flag implies the @read ACL category.
  171  * 2) The write flag implies the @write ACL category.
  172  * 3) The fast flag implies the @fast ACL category.
  173  * 4) The admin flag implies the @admin and @dangerous ACL categories.
  174  * 5) The pub-sub flag implies the @pubsub ACL category.
  175  * 6) The lack of fast flag implies the @slow ACL category.
  176  * 7) The non obvious "keyspace" category includes the commands
  177  *    that interact with keys without having anything to do with
  178  *    specific data structures, such as: DEL, RENAME, MOVE, SELECT,
  179  *    TYPE, EXPIRE*, PEXPIRE*, TTL, PTTL, ...
  180  */
  181 
  182 struct redisCommand redisCommandTable[] = {
  183     {"module",moduleCommand,-2,
  184      "admin no-script",
  185      0,NULL,0,0,0,0,0,0},
  186 
  187     {"get",getCommand,2,
  188      "read-only fast @string",
  189      0,NULL,1,1,1,0,0,0},
  190 
  191     /* Note that we can't flag set as fast, since it may perform an
  192      * implicit DEL of a large key. */
  193     {"set",setCommand,-3,
  194      "write use-memory @string",
  195      0,NULL,1,1,1,0,0,0},
  196 
  197     {"setnx",setnxCommand,3,
  198      "write use-memory fast @string",
  199      0,NULL,1,1,1,0,0,0},
  200 
  201     {"setex",setexCommand,4,
  202      "write use-memory @string",
  203      0,NULL,1,1,1,0,0,0},
  204 
  205     {"psetex",psetexCommand,4,
  206      "write use-memory @string",
  207      0,NULL,1,1,1,0,0,0},
  208 
  209     {"append",appendCommand,3,
  210      "write use-memory fast @string",
  211      0,NULL,1,1,1,0,0,0},
  212 
  213     {"strlen",strlenCommand,2,
  214      "read-only fast @string",
  215      0,NULL,1,1,1,0,0,0},
  216 
  217     {"del",delCommand,-2,
  218      "write @keyspace",
  219      0,NULL,1,-1,1,0,0,0},
  220 
  221     {"unlink",unlinkCommand,-2,
  222      "write fast @keyspace",
  223      0,NULL,1,-1,1,0,0,0},
  224 
  225     {"exists",existsCommand,-2,
  226      "read-only fast @keyspace",
  227      0,NULL,1,-1,1,0,0,0},
  228 
  229     {"setbit",setbitCommand,4,
  230      "write use-memory @bitmap",
  231      0,NULL,1,1,1,0,0,0},
  232 
  233     {"getbit",getbitCommand,3,
  234      "read-only fast @bitmap",
  235      0,NULL,1,1,1,0,0,0},
  236 
  237     {"bitfield",bitfieldCommand,-2,
  238      "write use-memory @bitmap",
  239      0,NULL,1,1,1,0,0,0},
  240 
  241     {"bitfield_ro",bitfieldroCommand,-2,
  242      "read-only fast @bitmap",
  243      0,NULL,1,1,1,0,0,0},
  244 
  245     {"setrange",setrangeCommand,4,
  246      "write use-memory @string",
  247      0,NULL,1,1,1,0,0,0},
  248 
  249     {"getrange",getrangeCommand,4,
  250      "read-only @string",
  251      0,NULL,1,1,1,0,0,0},
  252 
  253     {"substr",getrangeCommand,4,
  254      "read-only @string",
  255      0,NULL,1,1,1,0,0,0},
  256 
  257     {"incr",incrCommand,2,
  258      "write use-memory fast @string",
  259      0,NULL,1,1,1,0,0,0},
  260 
  261     {"decr",decrCommand,2,
  262      "write use-memory fast @string",
  263      0,NULL,1,1,1,0,0,0},
  264 
  265     {"mget",mgetCommand,-2,
  266      "read-only fast @string",
  267      0,NULL,1,-1,1,0,0,0},
  268 
  269     {"rpush",rpushCommand,-3,
  270      "write use-memory fast @list",
  271      0,NULL,1,1,1,0,0,0},
  272 
  273     {"lpush",lpushCommand,-3,
  274      "write use-memory fast @list",
  275      0,NULL,1,1,1,0,0,0},
  276 
  277     {"rpushx",rpushxCommand,-3,
  278      "write use-memory fast @list",
  279      0,NULL,1,1,1,0,0,0},
  280 
  281     {"lpushx",lpushxCommand,-3,
  282      "write use-memory fast @list",
  283      0,NULL,1,1,1,0,0,0},
  284 
  285     {"linsert",linsertCommand,5,
  286      "write use-memory @list",
  287      0,NULL,1,1,1,0,0,0},
  288 
  289     {"rpop",rpopCommand,2,
  290      "write fast @list",
  291      0,NULL,1,1,1,0,0,0},
  292 
  293     {"lpop",lpopCommand,2,
  294      "write fast @list",
  295      0,NULL,1,1,1,0,0,0},
  296 
  297     {"brpop",brpopCommand,-3,
  298      "write no-script @list @blocking",
  299      0,NULL,1,-2,1,0,0,0},
  300 
  301     {"brpoplpush",brpoplpushCommand,4,
  302      "write use-memory no-script @list @blocking",
  303      0,NULL,1,2,1,0,0,0},
  304 
  305     {"blpop",blpopCommand,-3,
  306      "write no-script @list @blocking",
  307      0,NULL,1,-2,1,0,0,0},
  308 
  309     {"llen",llenCommand,2,
  310      "read-only fast @list",
  311      0,NULL,1,1,1,0,0,0},
  312 
  313     {"lindex",lindexCommand,3,
  314      "read-only @list",
  315      0,NULL,1,1,1,0,0,0},
  316 
  317     {"lset",lsetCommand,4,
  318      "write use-memory @list",
  319      0,NULL,1,1,1,0,0,0},
  320 
  321     {"lrange",lrangeCommand,4,
  322      "read-only @list",
  323      0,NULL,1,1,1,0,0,0},
  324 
  325     {"ltrim",ltrimCommand,4,
  326      "write @list",
  327      0,NULL,1,1,1,0,0,0},
  328 
  329     {"lpos",lposCommand,-3,
  330      "read-only @list",
  331      0,NULL,1,1,1,0,0,0},
  332 
  333     {"lrem",lremCommand,4,
  334      "write @list",
  335      0,NULL,1,1,1,0,0,0},
  336 
  337     {"rpoplpush",rpoplpushCommand,3,
  338      "write use-memory @list",
  339      0,NULL,1,2,1,0,0,0},
  340 
  341     {"sadd",saddCommand,-3,
  342      "write use-memory fast @set",
  343      0,NULL,1,1,1,0,0,0},
  344 
  345     {"srem",sremCommand,-3,
  346      "write fast @set",
  347      0,NULL,1,1,1,0,0,0},
  348 
  349     {"smove",smoveCommand,4,
  350      "write fast @set",
  351      0,NULL,1,2,1,0,0,0},
  352 
  353     {"sismember",sismemberCommand,3,
  354      "read-only fast @set",
  355      0,NULL,1,1,1,0,0,0},
  356 
  357     {"scard",scardCommand,2,
  358      "read-only fast @set",
  359      0,NULL,1,1,1,0,0,0},
  360 
  361     {"spop",spopCommand,-2,
  362      "write random fast @set",
  363      0,NULL,1,1,1,0,0,0},
  364 
  365     {"srandmember",srandmemberCommand,-2,
  366      "read-only random @set",
  367      0,NULL,1,1,1,0,0,0},
  368 
  369     {"sinter",sinterCommand,-2,
  370      "read-only to-sort @set",
  371      0,NULL,1,-1,1,0,0,0},
  372 
  373     {"sinterstore",sinterstoreCommand,-3,
  374      "write use-memory @set",
  375      0,NULL,1,-1,1,0,0,0},
  376 
  377     {"sunion",sunionCommand,-2,
  378      "read-only to-sort @set",
  379      0,NULL,1,-1,1,0,0,0},
  380 
  381     {"sunionstore",sunionstoreCommand,-3,
  382      "write use-memory @set",
  383      0,NULL,1,-1,1,0,0,0},
  384 
  385     {"sdiff",sdiffCommand,-2,
  386      "read-only to-sort @set",
  387      0,NULL,1,-1,1,0,0,0},
  388 
  389     {"sdiffstore",sdiffstoreCommand,-3,
  390      "write use-memory @set",
  391      0,NULL,1,-1,1,0,0,0},
  392 
  393     {"smembers",sinterCommand,2,
  394      "read-only to-sort @set",
  395      0,NULL,1,1,1,0,0,0},
  396 
  397     {"sscan",sscanCommand,-3,
  398      "read-only random @set",
  399      0,NULL,1,1,1,0,0,0},
  400 
  401     {"zadd",zaddCommand,-4,
  402      "write use-memory fast @sortedset",
  403      0,NULL,1,1,1,0,0,0},
  404 
  405     {"zincrby",zincrbyCommand,4,
  406      "write use-memory fast @sortedset",
  407      0,NULL,1,1,1,0,0,0},
  408 
  409     {"zrem",zremCommand,-3,
  410      "write fast @sortedset",
  411      0,NULL,1,1,1,0,0,0},
  412 
  413     {"zremrangebyscore",zremrangebyscoreCommand,4,
  414      "write @sortedset",
  415      0,NULL,1,1,1,0,0,0},
  416 
  417     {"zremrangebyrank",zremrangebyrankCommand,4,
  418      "write @sortedset",
  419      0,NULL,1,1,1,0,0,0},
  420 
  421     {"zremrangebylex",zremrangebylexCommand,4,
  422      "write @sortedset",
  423      0,NULL,1,1,1,0,0,0},
  424 
  425     {"zunionstore",zunionstoreCommand,-4,
  426      "write use-memory @sortedset",
  427      0,zunionInterGetKeys,0,0,0,0,0,0},
  428 
  429     {"zinterstore",zinterstoreCommand,-4,
  430      "write use-memory @sortedset",
  431      0,zunionInterGetKeys,0,0,0,0,0,0},
  432 
  433     {"zrange",zrangeCommand,-4,
  434      "read-only @sortedset",
  435      0,NULL,1,1,1,0,0,0},
  436 
  437     {"zrangebyscore",zrangebyscoreCommand,-4,
  438      "read-only @sortedset",
  439      0,NULL,1,1,1,0,0,0},
  440 
  441     {"zrevrangebyscore",zrevrangebyscoreCommand,-4,
  442      "read-only @sortedset",
  443      0,NULL,1,1,1,0,0,0},
  444 
  445     {"zrangebylex",zrangebylexCommand,-4,
  446      "read-only @sortedset",
  447      0,NULL,1,1,1,0,0,0},
  448 
  449     {"zrevrangebylex",zrevrangebylexCommand,-4,
  450      "read-only @sortedset",
  451      0,NULL,1,1,1,0,0,0},
  452 
  453     {"zcount",zcountCommand,4,
  454      "read-only fast @sortedset",
  455      0,NULL,1,1,1,0,0,0},
  456 
  457     {"zlexcount",zlexcountCommand,4,
  458      "read-only fast @sortedset",
  459      0,NULL,1,1,1,0,0,0},
  460 
  461     {"zrevrange",zrevrangeCommand,-4,
  462      "read-only @sortedset",
  463      0,NULL,1,1,1,0,0,0},
  464 
  465     {"zcard",zcardCommand,2,
  466      "read-only fast @sortedset",
  467      0,NULL,1,1,1,0,0,0},
  468 
  469     {"zscore",zscoreCommand,3,
  470      "read-only fast @sortedset",
  471      0,NULL,1,1,1,0,0,0},
  472 
  473     {"zrank",zrankCommand,3,
  474      "read-only fast @sortedset",
  475      0,NULL,1,1,1,0,0,0},
  476 
  477     {"zrevrank",zrevrankCommand,3,
  478      "read-only fast @sortedset",
  479      0,NULL,1,1,1,0,0,0},
  480 
  481     {"zscan",zscanCommand,-3,
  482      "read-only random @sortedset",
  483      0,NULL,1,1,1,0,0,0},
  484 
  485     {"zpopmin",zpopminCommand,-2,
  486      "write fast @sortedset",
  487      0,NULL,1,1,1,0,0,0},
  488 
  489     {"zpopmax",zpopmaxCommand,-2,
  490      "write fast @sortedset",
  491      0,NULL,1,1,1,0,0,0},
  492 
  493     {"bzpopmin",bzpopminCommand,-3,
  494      "write no-script fast @sortedset @blocking",
  495      0,NULL,1,-2,1,0,0,0},
  496 
  497     {"bzpopmax",bzpopmaxCommand,-3,
  498      "write no-script fast @sortedset @blocking",
  499      0,NULL,1,-2,1,0,0,0},
  500 
  501     {"hset",hsetCommand,-4,
  502      "write use-memory fast @hash",
  503      0,NULL,1,1,1,0,0,0},
  504 
  505     {"hsetnx",hsetnxCommand,4,
  506      "write use-memory fast @hash",
  507      0,NULL,1,1,1,0,0,0},
  508 
  509     {"hget",hgetCommand,3,
  510      "read-only fast @hash",
  511      0,NULL,1,1,1,0,0,0},
  512 
  513     {"hmset",hsetCommand,-4,
  514      "write use-memory fast @hash",
  515      0,NULL,1,1,1,0,0,0},
  516 
  517     {"hmget",hmgetCommand,-3,
  518      "read-only fast @hash",
  519      0,NULL,1,1,1,0,0,0},
  520 
  521     {"hincrby",hincrbyCommand,4,
  522      "write use-memory fast @hash",
  523      0,NULL,1,1,1,0,0,0},
  524 
  525     {"hincrbyfloat",hincrbyfloatCommand,4,
  526      "write use-memory fast @hash",
  527      0,NULL,1,1,1,0,0,0},
  528 
  529     {"hdel",hdelCommand,-3,
  530      "write fast @hash",
  531      0,NULL,1,1,1,0,0,0},
  532 
  533     {"hlen",hlenCommand,2,
  534      "read-only fast @hash",
  535      0,NULL,1,1,1,0,0,0},
  536 
  537     {"hstrlen",hstrlenCommand,3,
  538      "read-only fast @hash",
  539      0,NULL,1,1,1,0,0,0},
  540 
  541     {"hkeys",hkeysCommand,2,
  542      "read-only to-sort @hash",
  543      0,NULL,1,1,1,0,0,0},
  544 
  545     {"hvals",hvalsCommand,2,
  546      "read-only to-sort @hash",
  547      0,NULL,1,1,1,0,0,0},
  548 
  549     {"hgetall",hgetallCommand,2,
  550      "read-only random @hash",
  551      0,NULL,1,1,1,0,0,0},
  552 
  553     {"hexists",hexistsCommand,3,
  554      "read-only fast @hash",
  555      0,NULL,1,1,1,0,0,0},
  556 
  557     {"hscan",hscanCommand,-3,
  558      "read-only random @hash",
  559      0,NULL,1,1,1,0,0,0},
  560 
  561     {"incrby",incrbyCommand,3,
  562      "write use-memory fast @string",
  563      0,NULL,1,1,1,0,0,0},
  564 
  565     {"decrby",decrbyCommand,3,
  566      "write use-memory fast @string",
  567      0,NULL,1,1,1,0,0,0},
  568 
  569     {"incrbyfloat",incrbyfloatCommand,3,
  570      "write use-memory fast @string",
  571      0,NULL,1,1,1,0,0,0},
  572 
  573     {"getset",getsetCommand,3,
  574      "write use-memory fast @string",
  575      0,NULL,1,1,1,0,0,0},
  576 
  577     {"mset",msetCommand,-3,
  578      "write use-memory @string",
  579      0,NULL,1,-1,2,0,0,0},
  580 
  581     {"msetnx",msetnxCommand,-3,
  582      "write use-memory @string",
  583      0,NULL,1,-1,2,0,0,0},
  584 
  585     {"randomkey",randomkeyCommand,1,
  586      "read-only random @keyspace",
  587      0,NULL,0,0,0,0,0,0},
  588 
  589     {"select",selectCommand,2,
  590      "ok-loading fast ok-stale @keyspace",
  591      0,NULL,0,0,0,0,0,0},
  592 
  593     {"swapdb",swapdbCommand,3,
  594      "write fast @keyspace @dangerous",
  595      0,NULL,0,0,0,0,0,0},
  596 
  597     {"move",moveCommand,3,
  598      "write fast @keyspace",
  599      0,NULL,1,1,1,0,0,0},
  600 
  601     /* Like for SET, we can't mark rename as a fast command because
  602      * overwriting the target key may result in an implicit slow DEL. */
  603     {"rename",renameCommand,3,
  604      "write @keyspace",
  605      0,NULL,1,2,1,0,0,0},
  606 
  607     {"renamenx",renamenxCommand,3,
  608      "write fast @keyspace",
  609      0,NULL,1,2,1,0,0,0},
  610 
  611     {"expire",expireCommand,3,
  612      "write fast @keyspace",
  613      0,NULL,1,1,1,0,0,0},
  614 
  615     {"expireat",expireatCommand,3,
  616      "write fast @keyspace",
  617      0,NULL,1,1,1,0,0,0},
  618 
  619     {"pexpire",pexpireCommand,3,
  620      "write fast @keyspace",
  621      0,NULL,1,1,1,0,0,0},
  622 
  623     {"pexpireat",pexpireatCommand,3,
  624      "write fast @keyspace",
  625      0,NULL,1,1,1,0,0,0},
  626 
  627     {"keys",keysCommand,2,
  628      "read-only to-sort @keyspace @dangerous",
  629      0,NULL,0,0,0,0,0,0},
  630 
  631     {"scan",scanCommand,-2,
  632      "read-only random @keyspace",
  633      0,NULL,0,0,0,0,0,0},
  634 
  635     {"dbsize",dbsizeCommand,1,
  636      "read-only fast @keyspace",
  637      0,NULL,0,0,0,0,0,0},
  638 
  639     {"auth",authCommand,-2,
  640      "no-auth no-script ok-loading ok-stale fast no-monitor no-slowlog @connection",
  641      0,NULL,0,0,0,0,0,0},
  642 
  643     /* We don't allow PING during loading since in Redis PING is used as
  644      * failure detection, and a loading server is considered to be
  645      * not available. */
  646     {"ping",pingCommand,-1,
  647      "ok-stale fast @connection",
  648      0,NULL,0,0,0,0,0,0},
  649 
  650     {"echo",echoCommand,2,
  651      "read-only fast @connection",
  652      0,NULL,0,0,0,0,0,0},
  653 
  654     {"save",saveCommand,1,
  655      "admin no-script",
  656      0,NULL,0,0,0,0,0,0},
  657 
  658     {"bgsave",bgsaveCommand,-1,
  659      "admin no-script",
  660      0,NULL,0,0,0,0,0,0},
  661 
  662     {"bgrewriteaof",bgrewriteaofCommand,1,
  663      "admin no-script",
  664      0,NULL,0,0,0,0,0,0},
  665 
  666     {"shutdown",shutdownCommand,-1,
  667      "admin no-script ok-loading ok-stale",
  668      0,NULL,0,0,0,0,0,0},
  669 
  670     {"lastsave",lastsaveCommand,1,
  671      "read-only random fast ok-loading ok-stale @admin @dangerous",
  672      0,NULL,0,0,0,0,0,0},
  673 
  674     {"type",typeCommand,2,
  675      "read-only fast @keyspace",
  676      0,NULL,1,1,1,0,0,0},
  677 
  678     {"multi",multiCommand,1,
  679      "no-script fast ok-loading ok-stale @transaction",
  680      0,NULL,0,0,0,0,0,0},
  681 
  682     {"exec",execCommand,1,
  683      "no-script no-monitor no-slowlog ok-loading ok-stale @transaction",
  684      0,NULL,0,0,0,0,0,0},
  685 
  686     {"discard",discardCommand,1,
  687      "no-script fast ok-loading ok-stale @transaction",
  688      0,NULL,0,0,0,0,0,0},
  689 
  690     {"sync",syncCommand,1,
  691      "admin no-script",
  692      0,NULL,0,0,0,0,0,0},
  693 
  694     {"psync",syncCommand,3,
  695      "admin no-script",
  696      0,NULL,0,0,0,0,0,0},
  697 
  698     {"replconf",replconfCommand,-1,
  699      "admin no-script ok-loading ok-stale",
  700      0,NULL,0,0,0,0,0,0},
  701 
  702     {"flushdb",flushdbCommand,-1,
  703      "write @keyspace @dangerous",
  704      0,NULL,0,0,0,0,0,0},
  705 
  706     {"flushall",flushallCommand,-1,
  707      "write @keyspace @dangerous",
  708      0,NULL,0,0,0,0,0,0},
  709 
  710     {"sort",sortCommand,-2,
  711      "write use-memory @list @set @sortedset @dangerous",
  712      0,sortGetKeys,1,1,1,0,0,0},
  713 
  714     {"info",infoCommand,-1,
  715      "ok-loading ok-stale random @dangerous",
  716      0,NULL,0,0,0,0,0,0},
  717 
  718     {"monitor",monitorCommand,1,
  719      "admin no-script ok-loading ok-stale",
  720      0,NULL,0,0,0,0,0,0},
  721 
  722     {"ttl",ttlCommand,2,
  723      "read-only fast random @keyspace",
  724      0,NULL,1,1,1,0,0,0},
  725 
  726     {"touch",touchCommand,-2,
  727      "read-only fast @keyspace",
  728      0,NULL,1,-1,1,0,0,0},
  729 
  730     {"pttl",pttlCommand,2,
  731      "read-only fast random @keyspace",
  732      0,NULL,1,1,1,0,0,0},
  733 
  734     {"persist",persistCommand,2,
  735      "write fast @keyspace",
  736      0,NULL,1,1,1,0,0,0},
  737 
  738     {"slaveof",replicaofCommand,3,
  739      "admin no-script ok-stale",
  740      0,NULL,0,0,0,0,0,0},
  741 
  742     {"replicaof",replicaofCommand,3,
  743      "admin no-script ok-stale",
  744      0,NULL,0,0,0,0,0,0},
  745 
  746     {"role",roleCommand,1,
  747      "ok-loading ok-stale no-script fast read-only @dangerous",
  748      0,NULL,0,0,0,0,0,0},
  749 
  750     {"debug",debugCommand,-2,
  751      "admin no-script ok-loading ok-stale",
  752      0,NULL,0,0,0,0,0,0},
  753 
  754     {"config",configCommand,-2,
  755      "admin ok-loading ok-stale no-script",
  756      0,NULL,0,0,0,0,0,0},
  757 
  758     {"subscribe",subscribeCommand,-2,
  759      "pub-sub no-script ok-loading ok-stale",
  760      0,NULL,0,0,0,0,0,0},
  761 
  762     {"unsubscribe",unsubscribeCommand,-1,
  763      "pub-sub no-script ok-loading ok-stale",
  764      0,NULL,0,0,0,0,0,0},
  765 
  766     {"psubscribe",psubscribeCommand,-2,
  767      "pub-sub no-script ok-loading ok-stale",
  768      0,NULL,0,0,0,0,0,0},
  769 
  770     {"punsubscribe",punsubscribeCommand,-1,
  771      "pub-sub no-script ok-loading ok-stale",
  772      0,NULL,0,0,0,0,0,0},
  773 
  774     {"publish",publishCommand,3,
  775      "pub-sub ok-loading ok-stale fast",
  776      0,NULL,0,0,0,0,0,0},
  777 
  778     {"pubsub",pubsubCommand,-2,
  779      "pub-sub ok-loading ok-stale random",
  780      0,NULL,0,0,0,0,0,0},
  781 
  782     {"watch",watchCommand,-2,
  783      "no-script fast ok-loading ok-stale @transaction",
  784      0,NULL,1,-1,1,0,0,0},
  785 
  786     {"unwatch",unwatchCommand,1,
  787      "no-script fast ok-loading ok-stale @transaction",
  788      0,NULL,0,0,0,0,0,0},
  789 
  790     {"cluster",clusterCommand,-2,
  791      "admin ok-stale random",
  792      0,NULL,0,0,0,0,0,0},
  793 
  794     {"restore",restoreCommand,-4,
  795      "write use-memory @keyspace @dangerous",
  796      0,NULL,1,1,1,0,0,0},
  797 
  798     {"restore-asking",restoreCommand,-4,
  799     "write use-memory cluster-asking @keyspace @dangerous",
  800     0,NULL,1,1,1,0,0,0},
  801 
  802     {"migrate",migrateCommand,-6,
  803      "write random @keyspace @dangerous",
  804      0,migrateGetKeys,0,0,0,0,0,0},
  805 
  806     {"asking",askingCommand,1,
  807      "fast @keyspace",
  808      0,NULL,0,0,0,0,0,0},
  809 
  810     {"readonly",readonlyCommand,1,
  811      "fast @keyspace",
  812      0,NULL,0,0,0,0,0,0},
  813 
  814     {"readwrite",readwriteCommand,1,
  815      "fast @keyspace",
  816      0,NULL,0,0,0,0,0,0},
  817 
  818     {"dump",dumpCommand,2,
  819      "read-only random @keyspace",
  820      0,NULL,1,1,1,0,0,0},
  821 
  822     {"object",objectCommand,-2,
  823      "read-only random @keyspace",
  824      0,NULL,2,2,1,0,0,0},
  825 
  826     {"memory",memoryCommand,-2,
  827      "random read-only",
  828      0,memoryGetKeys,0,0,0,0,0,0},
  829 
  830     {"client",clientCommand,-2,
  831      "admin no-script random ok-loading ok-stale @connection",
  832      0,NULL,0,0,0,0,0,0},
  833 
  834     {"hello",helloCommand,-2,
  835      "no-auth no-script fast no-monitor ok-loading ok-stale no-slowlog @connection",
  836      0,NULL,0,0,0,0,0,0},
  837 
  838     /* EVAL can modify the dataset, however it is not flagged as a write
  839      * command since we do the check while running commands from Lua. */
  840     {"eval",evalCommand,-3,
  841      "no-script @scripting",
  842      0,evalGetKeys,0,0,0,0,0,0},
  843 
  844     {"evalsha",evalShaCommand,-3,
  845      "no-script @scripting",
  846      0,evalGetKeys,0,0,0,0,0,0},
  847 
  848     {"slowlog",slowlogCommand,-2,
  849      "admin random ok-loading ok-stale",
  850      0,NULL,0,0,0,0,0,0},
  851 
  852     {"script",scriptCommand,-2,
  853      "no-script @scripting",
  854      0,NULL,0,0,0,0,0,0},
  855 
  856     {"time",timeCommand,1,
  857      "read-only random fast ok-loading ok-stale",
  858      0,NULL,0,0,0,0,0,0},
  859 
  860     {"bitop",bitopCommand,-4,
  861      "write use-memory @bitmap",
  862      0,NULL,2,-1,1,0,0,0},
  863 
  864     {"bitcount",bitcountCommand,-2,
  865      "read-only @bitmap",
  866      0,NULL,1,1,1,0,0,0},
  867 
  868     {"bitpos",bitposCommand,-3,
  869      "read-only @bitmap",
  870      0,NULL,1,1,1,0,0,0},
  871 
  872     {"wait",waitCommand,3,
  873      "no-script @keyspace",
  874      0,NULL,0,0,0,0,0,0},
  875 
  876     {"command",commandCommand,-1,
  877      "ok-loading ok-stale random @connection",
  878      0,NULL,0,0,0,0,0,0},
  879 
  880     {"geoadd",geoaddCommand,-5,
  881      "write use-memory @geo",
  882      0,NULL,1,1,1,0,0,0},
  883 
  884     /* GEORADIUS has store options that may write. */
  885     {"georadius",georadiusCommand,-6,
  886      "write @geo",
  887      0,georadiusGetKeys,1,1,1,0,0,0},
  888 
  889     {"georadius_ro",georadiusroCommand,-6,
  890      "read-only @geo",
  891      0,georadiusGetKeys,1,1,1,0,0,0},
  892 
  893     {"georadiusbymember",georadiusbymemberCommand,-5,
  894      "write @geo",
  895      0,georadiusGetKeys,1,1,1,0,0,0},
  896 
  897     {"georadiusbymember_ro",georadiusbymemberroCommand,-5,
  898      "read-only @geo",
  899      0,georadiusGetKeys,1,1,1,0,0,0},
  900 
  901     {"geohash",geohashCommand,-2,
  902      "read-only @geo",
  903      0,NULL,1,1,1,0,0,0},
  904 
  905     {"geopos",geoposCommand,-2,
  906      "read-only @geo",
  907      0,NULL,1,1,1,0,0,0},
  908 
  909     {"geodist",geodistCommand,-4,
  910      "read-only @geo",
  911      0,NULL,1,1,1,0,0,0},
  912 
  913     {"pfselftest",pfselftestCommand,1,
  914      "admin @hyperloglog",
  915       0,NULL,0,0,0,0,0,0},
  916 
  917     {"pfadd",pfaddCommand,-2,
  918      "write use-memory fast @hyperloglog",
  919      0,NULL,1,1,1,0,0,0},
  920 
  921     /* Technically speaking PFCOUNT may change the key since it changes the
  922      * final bytes in the HyperLogLog representation. However in this case
  923      * we claim that the representation, even if accessible, is an internal
  924      * affair, and the command is semantically read only. */
  925     {"pfcount",pfcountCommand,-2,
  926      "read-only @hyperloglog",
  927      0,NULL,1,-1,1,0,0,0},
  928 
  929     {"pfmerge",pfmergeCommand,-2,
  930      "write use-memory @hyperloglog",
  931      0,NULL,1,-1,1,0,0,0},
  932 
  933     {"pfdebug",pfdebugCommand,-3,
  934      "admin write",
  935      0,NULL,0,0,0,0,0,0},
  936 
  937     {"xadd",xaddCommand,-5,
  938      "write use-memory fast random @stream",
  939      0,NULL,1,1,1,0,0,0},
  940 
  941     {"xrange",xrangeCommand,-4,
  942      "read-only @stream",
  943      0,NULL,1,1,1,0,0,0},
  944 
  945     {"xrevrange",xrevrangeCommand,-4,
  946      "read-only @stream",
  947      0,NULL,1,1,1,0,0,0},
  948 
  949     {"xlen",xlenCommand,2,
  950      "read-only fast @stream",
  951      0,NULL,1,1,1,0,0,0},
  952 
  953     {"xread",xreadCommand,-4,
  954      "read-only @stream @blocking",
  955      0,xreadGetKeys,1,1,1,0,0,0},
  956 
  957     {"xreadgroup",xreadCommand,-7,
  958      "write @stream @blocking",
  959      0,xreadGetKeys,1,1,1,0,0,0},
  960 
  961     {"xgroup",xgroupCommand,-2,
  962      "write use-memory @stream",
  963      0,NULL,2,2,1,0,0,0},
  964 
  965     {"xsetid",xsetidCommand,3,
  966      "write use-memory fast @stream",
  967      0,NULL,1,1,1,0,0,0},
  968 
  969     {"xack",xackCommand,-4,
  970      "write fast random @stream",
  971      0,NULL,1,1,1,0,0,0},
  972 
  973     {"xpending",xpendingCommand,-3,
  974      "read-only random @stream",
  975      0,NULL,1,1,1,0,0,0},
  976 
  977     {"xclaim",xclaimCommand,-6,
  978      "write random fast @stream",
  979      0,NULL,1,1,1,0,0,0},
  980 
  981     {"xinfo",xinfoCommand,-2,
  982      "read-only random @stream",
  983      0,NULL,2,2,1,0,0,0},
  984 
  985     {"xdel",xdelCommand,-3,
  986      "write fast @stream",
  987      0,NULL,1,1,1,0,0,0},
  988 
  989     {"xtrim",xtrimCommand,-2,
  990      "write random @stream",
  991      0,NULL,1,1,1,0,0,0},
  992 
  993     {"post",securityWarningCommand,-1,
  994      "ok-loading ok-stale read-only",
  995      0,NULL,0,0,0,0,0,0},
  996 
  997     {"host:",securityWarningCommand,-1,
  998      "ok-loading ok-stale read-only",
  999      0,NULL,0,0,0,0,0,0},
 1000 
 1001     {"latency",latencyCommand,-2,
 1002      "admin no-script ok-loading ok-stale",
 1003      0,NULL,0,0,0,0,0,0},
 1004 
 1005     {"lolwut",lolwutCommand,-1,
 1006      "read-only fast",
 1007      0,NULL,0,0,0,0,0,0},
 1008 
 1009     {"acl",aclCommand,-2,
 1010      "admin no-script no-slowlog ok-loading ok-stale",
 1011      0,NULL,0,0,0,0,0,0},
 1012 
 1013     {"stralgo",stralgoCommand,-2,
 1014      "read-only @string",
 1015      0,lcsGetKeys,0,0,0,0,0,0}
 1016 };
 1017 
 1018 /*============================ Utility functions ============================ */
 1019 
 1020 /* We use a private localtime implementation which is fork-safe. The logging
 1021  * function of Redis may be called from other threads. */
 1022 void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst);
 1023 
 1024 /* Low level logging. To use only for very big messages, otherwise
 1025  * serverLog() is to prefer. */
 1026 void serverLogRaw(int level, const char *msg) {
 1027     const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };
 1028     const char *c = ".-*#";
 1029     FILE *fp;
 1030     char buf[64];
 1031     int rawmode = (level & LL_RAW);
 1032     int log_to_stdout = server.logfile[0] == '\0';
 1033 
 1034     level &= 0xff; /* clear flags */
 1035     if (level < server.verbosity) return;
 1036 
 1037     fp = log_to_stdout ? stdout : fopen(server.logfile,"a");
 1038     if (!fp) return;
 1039 
 1040     if (rawmode) {
 1041         fprintf(fp,"%s",msg);
 1042     } else {
 1043         int off;
 1044         struct timeval tv;
 1045         int role_char;
 1046         pid_t pid = getpid();
 1047 
 1048         gettimeofday(&tv,NULL);
 1049         struct tm tm;
 1050         nolocks_localtime(&tm,tv.tv_sec,server.timezone,server.daylight_active);
 1051         off = strftime(buf,sizeof(buf),"%d %b %Y %H:%M:%S.",&tm);
 1052         snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000);
 1053         if (server.sentinel_mode) {
 1054             role_char = 'X'; /* Sentinel. */
 1055         } else if (pid != server.pid) {
 1056             role_char = 'C'; /* RDB / AOF writing child. */
 1057         } else {
 1058             role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */
 1059         }
 1060         fprintf(fp,"%d:%c %s %c %s\n",
 1061             (int)getpid(),role_char, buf,c[level],msg);
 1062     }
 1063     fflush(fp);
 1064 
 1065     if (!log_to_stdout) fclose(fp);
 1066     if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg);
 1067 }
 1068 
 1069 /* Like serverLogRaw() but with printf-alike support. This is the function that
 1070  * is used across the code. The raw version is only used in order to dump
 1071  * the INFO output on crash. */
 1072 void serverLog(int level, const char *fmt, ...) {
 1073     va_list ap;
 1074     char msg[LOG_MAX_LEN];
 1075 
 1076     if ((level&0xff) < server.verbosity) return;
 1077 
 1078     va_start(ap, fmt);
 1079     vsnprintf(msg, sizeof(msg), fmt, ap);
 1080     va_end(ap);
 1081 
 1082     serverLogRaw(level,msg);
 1083 }
 1084 
 1085 /* Log a fixed message without printf-alike capabilities, in a way that is
 1086  * safe to call from a signal handler.
 1087  *
 1088  * We actually use this only for signals that are not fatal from the point
 1089  * of view of Redis. Signals that are going to kill the server anyway and
 1090  * where we need printf-alike features are served by serverLog(). */
 1091 void serverLogFromHandler(int level, const char *msg) {
 1092     int fd;
 1093     int log_to_stdout = server.logfile[0] == '\0';
 1094     char buf[64];
 1095 
 1096     if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize))
 1097         return;
 1098     fd = log_to_stdout ? STDOUT_FILENO :
 1099                          open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644);
 1100     if (fd == -1) return;
 1101     ll2string(buf,sizeof(buf),getpid());
 1102     if (write(fd,buf,strlen(buf)) == -1) goto err;
 1103     if (write(fd,":signal-handler (",17) == -1) goto err;
 1104     ll2string(buf,sizeof(buf),time(NULL));
 1105     if (write(fd,buf,strlen(buf)) == -1) goto err;
 1106     if (write(fd,") ",2) == -1) goto err;
 1107     if (write(fd,msg,strlen(msg)) == -1) goto err;
 1108     if (write(fd,"\n",1) == -1) goto err;
 1109 err:
 1110     if (!log_to_stdout) close(fd);
 1111 }
 1112 
 1113 /* Return the UNIX time in microseconds */
 1114 long long ustime(void) {
 1115     struct timeval tv;
 1116     long long ust;
 1117 
 1118     gettimeofday(&tv, NULL);
 1119     ust = ((long long)tv.tv_sec)*1000000;
 1120     ust += tv.tv_usec;
 1121     return ust;
 1122 }
 1123 
 1124 /* Return the UNIX time in milliseconds */
 1125 mstime_t mstime(void) {
 1126     return ustime()/1000;
 1127 }
 1128 
 1129 /* After an RDB dump or AOF rewrite we exit from children using _exit() instead of
 1130  * exit(), because the latter may interact with the same file objects used by
 1131  * the parent process. However if we are testing the coverage normal exit() is
 1132  * used in order to obtain the right coverage information. */
 1133 void exitFromChild(int retcode) {
 1134 #ifdef COVERAGE_TEST
 1135     exit(retcode);
 1136 #else
 1137     _exit(retcode);
 1138 #endif
 1139 }
 1140 
 1141 /*====================== Hash table type implementation  ==================== */
 1142 
 1143 /* This is a hash table type that uses the SDS dynamic strings library as
 1144  * keys and redis objects as values (objects can hold SDS strings,
 1145  * lists, sets). */
 1146 
 1147 void dictVanillaFree(void *privdata, void *val)
 1148 {
 1149     DICT_NOTUSED(privdata);
 1150     zfree(val);
 1151 }
 1152 
 1153 void dictListDestructor(void *privdata, void *val)
 1154 {
 1155     DICT_NOTUSED(privdata);
 1156     listRelease((list*)val);
 1157 }
 1158 
 1159 int dictSdsKeyCompare(void *privdata, const void *key1,
 1160         const void *key2)
 1161 {
 1162     int l1,l2;
 1163     DICT_NOTUSED(privdata);
 1164 
 1165     l1 = sdslen((sds)key1);
 1166     l2 = sdslen((sds)key2);
 1167     if (l1 != l2) return 0;
 1168     return memcmp(key1, key2, l1) == 0;
 1169 }
 1170 
 1171 /* A case insensitive version used for the command lookup table and other
 1172  * places where case insensitive non binary-safe comparison is needed. */
 1173 int dictSdsKeyCaseCompare(void *privdata, const void *key1,
 1174         const void *key2)
 1175 {
 1176     DICT_NOTUSED(privdata);
 1177 
 1178     return strcasecmp(key1, key2) == 0;
 1179 }
 1180 
 1181 void dictObjectDestructor(void *privdata, void *val)
 1182 {
 1183     DICT_NOTUSED(privdata);
 1184 
 1185     if (val == NULL) return; /* Lazy freeing will set value to NULL. */
 1186     decrRefCount(val);
 1187 }
 1188 
 1189 void dictSdsDestructor(void *privdata, void *val)
 1190 {
 1191     DICT_NOTUSED(privdata);
 1192 
 1193     sdsfree(val);
 1194 }
 1195 
 1196 int dictObjKeyCompare(void *privdata, const void *key1,
 1197         const void *key2)
 1198 {
 1199     const robj *o1 = key1, *o2 = key2;
 1200     return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
 1201 }
 1202 
 1203 uint64_t dictObjHash(const void *key) {
 1204     const robj *o = key;
 1205     return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
 1206 }
 1207 
 1208 uint64_t dictSdsHash(const void *key) {
 1209     return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
 1210 }
 1211 
 1212 uint64_t dictSdsCaseHash(const void *key) {
 1213     return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key));
 1214 }
 1215 
 1216 int dictEncObjKeyCompare(void *privdata, const void *key1,
 1217         const void *key2)
 1218 {
 1219     robj *o1 = (robj*) key1, *o2 = (robj*) key2;
 1220     int cmp;
 1221 
 1222     if (o1->encoding == OBJ_ENCODING_INT &&
 1223         o2->encoding == OBJ_ENCODING_INT)
 1224             return o1->ptr == o2->ptr;
 1225 
 1226     /* Due to OBJ_STATIC_REFCOUNT, we avoid calling getDecodedObject() without
 1227      * good reasons, because it would incrRefCount() the object, which
 1228      * is invalid. So we check to make sure dictFind() works with static
 1229      * objects as well. */
 1230     if (o1->refcount != OBJ_STATIC_REFCOUNT) o1 = getDecodedObject(o1);
 1231     if (o2->refcount != OBJ_STATIC_REFCOUNT) o2 = getDecodedObject(o2);
 1232     cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
 1233     if (o1->refcount != OBJ_STATIC_REFCOUNT) decrRefCount(o1);
 1234     if (o2->refcount != OBJ_STATIC_REFCOUNT) decrRefCount(o2);
 1235     return cmp;
 1236 }
 1237 
 1238 uint64_t dictEncObjHash(const void *key) {
 1239     robj *o = (robj*) key;
 1240 
 1241     if (sdsEncodedObject(o)) {
 1242         return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
 1243     } else {
 1244         if (o->encoding == OBJ_ENCODING_INT) {
 1245             char buf[32];
 1246             int len;
 1247 
 1248             len = ll2string(buf,32,(long)o->ptr);
 1249             return dictGenHashFunction((unsigned char*)buf, len);
 1250         } else {
 1251             uint64_t hash;
 1252 
 1253             o = getDecodedObject(o);
 1254             hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
 1255             decrRefCount(o);
 1256             return hash;
 1257         }
 1258     }
 1259 }
 1260 
 1261 /* Generic hash table type where keys are Redis Objects, Values
 1262  * dummy pointers. */
 1263 dictType objectKeyPointerValueDictType = {
 1264     dictEncObjHash,            /* hash function */
 1265     NULL,                      /* key dup */
 1266     NULL,                      /* val dup */
 1267     dictEncObjKeyCompare,      /* key compare */
 1268     dictObjectDestructor,      /* key destructor */
 1269     NULL                       /* val destructor */
 1270 };
 1271 
 1272 /* Like objectKeyPointerValueDictType(), but values can be destroyed, if
 1273  * not NULL, calling zfree(). */
 1274 dictType objectKeyHeapPointerValueDictType = {
 1275     dictEncObjHash,            /* hash function */
 1276     NULL,                      /* key dup */
 1277     NULL,                      /* val dup */
 1278     dictEncObjKeyCompare,      /* key compare */
 1279     dictObjectDestructor,      /* key destructor */
 1280     dictVanillaFree            /* val destructor */
 1281 };
 1282 
 1283 /* Set dictionary type. Keys are SDS strings, values are ot used. */
 1284 dictType setDictType = {
 1285     dictSdsHash,               /* hash function */
 1286     NULL,                      /* key dup */
 1287     NULL,                      /* val dup */
 1288     dictSdsKeyCompare,         /* key compare */
 1289     dictSdsDestructor,         /* key destructor */
 1290     NULL                       /* val destructor */
 1291 };
 1292 
 1293 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
 1294 dictType zsetDictType = {
 1295     dictSdsHash,               /* hash function */
 1296     NULL,                      /* key dup */
 1297     NULL,                      /* val dup */
 1298     dictSdsKeyCompare,         /* key compare */
 1299     NULL,                      /* Note: SDS string shared & freed by skiplist */
 1300     NULL                       /* val destructor */
 1301 };
 1302 
 1303 /* Db->dict, keys are sds strings, vals are Redis objects. */
 1304 dictType dbDictType = {
 1305     dictSdsHash,                /* hash function */
 1306     NULL,                       /* key dup */
 1307     NULL,                       /* val dup */
 1308     dictSdsKeyCompare,          /* key compare */
 1309     dictSdsDestructor,          /* key destructor */
 1310     dictObjectDestructor   /* val destructor */
 1311 };
 1312 
 1313 /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */
 1314 dictType shaScriptObjectDictType = {
 1315     dictSdsCaseHash,            /* hash function */
 1316     NULL,                       /* key dup */
 1317     NULL,                       /* val dup */
 1318     dictSdsKeyCaseCompare,      /* key compare */
 1319     dictSdsDestructor,          /* key destructor */
 1320     dictObjectDestructor        /* val destructor */
 1321 };
 1322 
 1323 /* Db->expires */
 1324 dictType keyptrDictType = {
 1325     dictSdsHash,                /* hash function */
 1326     NULL,                       /* key dup */
 1327     NULL,                       /* val dup */
 1328     dictSdsKeyCompare,          /* key compare */
 1329     NULL,                       /* key destructor */
 1330     NULL                        /* val destructor */
 1331 };
 1332 
 1333 /* Command table. sds string -> command struct pointer. */
 1334 dictType commandTableDictType = {
 1335     dictSdsCaseHash,            /* hash function */
 1336     NULL,                       /* key dup */
 1337     NULL,                       /* val dup */
 1338     dictSdsKeyCaseCompare,      /* key compare */
 1339     dictSdsDestructor,          /* key destructor */
 1340     NULL                        /* val destructor */
 1341 };
 1342 
 1343 /* Hash type hash table (note that small hashes are represented with ziplists) */
 1344 dictType hashDictType = {
 1345     dictSdsHash,                /* hash function */
 1346     NULL,                       /* key dup */
 1347     NULL,                       /* val dup */
 1348     dictSdsKeyCompare,          /* key compare */
 1349     dictSdsDestructor,          /* key destructor */
 1350     dictSdsDestructor           /* val destructor */
 1351 };
 1352 
 1353 /* Keylist hash table type has unencoded redis objects as keys and
 1354  * lists as values. It's used for blocking operations (BLPOP) and to
 1355  * map swapped keys to a list of clients waiting for this keys to be loaded. */
 1356 dictType keylistDictType = {
 1357     dictObjHash,                /* hash function */
 1358     NULL,                       /* key dup */
 1359     NULL,                       /* val dup */
 1360     dictObjKeyCompare,          /* key compare */
 1361     dictObjectDestructor,       /* key destructor */
 1362     dictListDestructor          /* val destructor */
 1363 };
 1364 
 1365 /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
 1366  * clusterNode structures. */
 1367 dictType clusterNodesDictType = {
 1368     dictSdsHash,                /* hash function */
 1369     NULL,                       /* key dup */
 1370     NULL,                       /* val dup */
 1371     dictSdsKeyCompare,          /* key compare */
 1372     dictSdsDestructor,          /* key destructor */
 1373     NULL                        /* val destructor */
 1374 };
 1375 
 1376 /* Cluster re-addition blacklist. This maps node IDs to the time
 1377  * we can re-add this node. The goal is to avoid readding a removed
 1378  * node for some time. */
 1379 dictType clusterNodesBlackListDictType = {
 1380     dictSdsCaseHash,            /* hash function */
 1381     NULL,                       /* key dup */
 1382     NULL,                       /* val dup */
 1383     dictSdsKeyCaseCompare,      /* key compare */
 1384     dictSdsDestructor,          /* key destructor */
 1385     NULL                        /* val destructor */
 1386 };
 1387 
 1388 /* Cluster re-addition blacklist. This maps node IDs to the time
 1389  * we can re-add this node. The goal is to avoid readding a removed
 1390  * node for some time. */
 1391 dictType modulesDictType = {
 1392     dictSdsCaseHash,            /* hash function */
 1393     NULL,                       /* key dup */
 1394     NULL,                       /* val dup */
 1395     dictSdsKeyCaseCompare,      /* key compare */
 1396     dictSdsDestructor,          /* key destructor */
 1397     NULL                        /* val destructor */
 1398 };
 1399 
 1400 /* Migrate cache dict type. */
 1401 dictType migrateCacheDictType = {
 1402     dictSdsHash,                /* hash function */
 1403     NULL,                       /* key dup */
 1404     NULL,                       /* val dup */
 1405     dictSdsKeyCompare,          /* key compare */
 1406     dictSdsDestructor,          /* key destructor */
 1407     NULL                        /* val destructor */
 1408 };
 1409 
 1410 /* Replication cached script dict (server.repl_scriptcache_dict).
 1411  * Keys are sds SHA1 strings, while values are not used at all in the current
 1412  * implementation. */
 1413 dictType replScriptCacheDictType = {
 1414     dictSdsCaseHash,            /* hash function */
 1415     NULL,                       /* key dup */
 1416     NULL,                       /* val dup */
 1417     dictSdsKeyCaseCompare,      /* key compare */
 1418     dictSdsDestructor,          /* key destructor */
 1419     NULL                        /* val destructor */
 1420 };
 1421 
 1422 int htNeedsResize(dict *dict) {
 1423     long long size, used;
 1424 
 1425     size = dictSlots(dict);
 1426     used = dictSize(dict);
 1427     return (size > DICT_HT_INITIAL_SIZE &&
 1428             (used*100/size < HASHTABLE_MIN_FILL));
 1429 }
 1430 
 1431 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
 1432  * we resize the hash table to save memory */
 1433 void tryResizeHashTables(int dbid) {
 1434     if (htNeedsResize(server.db[dbid].dict))
 1435         dictResize(server.db[dbid].dict);
 1436     if (htNeedsResize(server.db[dbid].expires))
 1437         dictResize(server.db[dbid].expires);
 1438 }
 1439 
 1440 /* Our hash table implementation performs rehashing incrementally while
 1441  * we write/read from the hash table. Still if the server is idle, the hash
 1442  * table will use two tables for a long time. So we try to use 1 millisecond
 1443  * of CPU time at every call of this function to perform some rehahsing.
 1444  *
 1445  * The function returns 1 if some rehashing was performed, otherwise 0
 1446  * is returned. */
 1447 int incrementallyRehash(int dbid) {
 1448     /* Keys dictionary */
 1449     if (dictIsRehashing(server.db[dbid].dict)) {
 1450         dictRehashMilliseconds(server.db[dbid].dict,1);
 1451         return 1; /* already used our millisecond for this loop... */
 1452     }
 1453     /* Expires */
 1454     if (dictIsRehashing(server.db[dbid].expires)) {
 1455         dictRehashMilliseconds(server.db[dbid].expires,1);
 1456         return 1; /* already used our millisecond for this loop... */
 1457     }
 1458     return 0;
 1459 }
 1460 
 1461 /* This function is called once a background process of some kind terminates,
 1462  * as we want to avoid resizing the hash tables when there is a child in order
 1463  * to play well with copy-on-write (otherwise when a resize happens lots of
 1464  * memory pages are copied). The goal of this function is to update the ability
 1465  * for dict.c to resize the hash tables accordingly to the fact we have o not
 1466  * running childs. */
 1467 void updateDictResizePolicy(void) {
 1468     if (!hasActiveChildProcess())
 1469         dictEnableResize();
 1470     else
 1471         dictDisableResize();
 1472 }
 1473 
 1474 /* Return true if there are no active children processes doing RDB saving,
 1475  * AOF rewriting, or some side process spawned by a loaded module. */
 1476 int hasActiveChildProcess() {
 1477     return server.rdb_child_pid != -1 ||
 1478            server.aof_child_pid != -1 ||
 1479            server.module_child_pid != -1;
 1480 }
 1481 
 1482 /* Return true if this instance has persistence completely turned off:
 1483  * both RDB and AOF are disabled. */
 1484 int allPersistenceDisabled(void) {
 1485     return server.saveparamslen == 0 && server.aof_state == AOF_OFF;
 1486 }
 1487 
 1488 /* ======================= Cron: called every 100 ms ======================== */
 1489 
 1490 /* Add a sample to the operations per second array of samples. */
 1491 void trackInstantaneousMetric(int metric, long long current_reading) {
 1492     long long t = mstime() - server.inst_metric[metric].last_sample_time;
 1493     long long ops = current_reading -
 1494                     server.inst_metric[metric].last_sample_count;
 1495     long long ops_sec;
 1496 
 1497     ops_sec = t > 0 ? (ops*1000/t) : 0;
 1498 
 1499     server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
 1500         ops_sec;
 1501     server.inst_metric[metric].idx++;
 1502     server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
 1503     server.inst_metric[metric].last_sample_time = mstime();
 1504     server.inst_metric[metric].last_sample_count = current_reading;
 1505 }
 1506 
 1507 /* Return the mean of all the samples. */
 1508 long long getInstantaneousMetric(int metric) {
 1509     int j;
 1510     long long sum = 0;
 1511 
 1512     for (j = 0; j < STATS_METRIC_SAMPLES; j++)
 1513         sum += server.inst_metric[metric].samples[j];
 1514     return sum / STATS_METRIC_SAMPLES;
 1515 }
 1516 
 1517 /* The client query buffer is an sds.c string that can end with a lot of
 1518  * free space not used, this function reclaims space if needed.
 1519  *
 1520  * The function always returns 0 as it never terminates the client. */
 1521 int clientsCronResizeQueryBuffer(client *c) {
 1522     size_t querybuf_size = sdsAllocSize(c->querybuf);
 1523     time_t idletime = server.unixtime - c->lastinteraction;
 1524 
 1525     /* There are two conditions to resize the query buffer:
 1526      * 1) Query buffer is > BIG_ARG and too big for latest peak.
 1527      * 2) Query buffer is > BIG_ARG and client is idle. */
 1528     if (querybuf_size > PROTO_MBULK_BIG_ARG &&
 1529          ((querybuf_size/(c->querybuf_peak+1)) > 2 ||
 1530           idletime > 2))
 1531     {
 1532         /* Only resize the query buffer if it is actually wasting
 1533          * at least a few kbytes. */
 1534         if (sdsavail(c->querybuf) > 1024*4) {
 1535             c->querybuf = sdsRemoveFreeSpace(c->querybuf);
 1536         }
 1537     }
 1538     /* Reset the peak again to capture the peak memory usage in the next
 1539      * cycle. */
 1540     c->querybuf_peak = 0;
 1541 
 1542     /* Clients representing masters also use a "pending query buffer" that
 1543      * is the yet not applied part of the stream we are reading. Such buffer
 1544      * also needs resizing from time to time, otherwise after a very large
 1545      * transfer (a huge value or a big MIGRATE operation) it will keep using
 1546      * a lot of memory. */
 1547     if (c->flags & CLIENT_MASTER) {
 1548         /* There are two conditions to resize the pending query buffer:
 1549          * 1) Pending Query buffer is > LIMIT_PENDING_QUERYBUF.
 1550          * 2) Used length is smaller than pending_querybuf_size/2 */
 1551         size_t pending_querybuf_size = sdsAllocSize(c->pending_querybuf);
 1552         if(pending_querybuf_size > LIMIT_PENDING_QUERYBUF &&
 1553            sdslen(c->pending_querybuf) < (pending_querybuf_size/2))
 1554         {
 1555             c->pending_querybuf = sdsRemoveFreeSpace(c->pending_querybuf);
 1556         }
 1557     }
 1558     return 0;
 1559 }
 1560 
 1561 /* This function is used in order to track clients using the biggest amount
 1562  * of memory in the latest few seconds. This way we can provide such information
 1563  * in the INFO output (clients section), without having to do an O(N) scan for
 1564  * all the clients.
 1565  *
 1566  * This is how it works. We have an array of CLIENTS_PEAK_MEM_USAGE_SLOTS slots
 1567  * where we track, for each, the biggest client output and input buffers we
 1568  * saw in that slot. Every slot correspond to one of the latest seconds, since
 1569  * the array is indexed by doing UNIXTIME % CLIENTS_PEAK_MEM_USAGE_SLOTS.
 1570  *
 1571  * When we want to know what was recently the peak memory usage, we just scan
 1572  * such few slots searching for the maximum value. */
 1573 #define CLIENTS_PEAK_MEM_USAGE_SLOTS 8
 1574 size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS];
 1575 size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS];
 1576 
 1577 int clientsCronTrackExpansiveClients(client *c) {
 1578     size_t in_usage = sdsAllocSize(c->querybuf);
 1579     size_t out_usage = getClientOutputBufferMemoryUsage(c);
 1580     int i = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
 1581     int zeroidx = (i+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
 1582 
 1583     /* Always zero the next sample, so that when we switch to that second, we'll
 1584      * only register samples that are greater in that second without considering
 1585      * the history of such slot.
 1586      *
 1587      * Note: our index may jump to any random position if serverCron() is not
 1588      * called for some reason with the normal frequency, for instance because
 1589      * some slow command is called taking multiple seconds to execute. In that
 1590      * case our array may end containing data which is potentially older
 1591      * than CLIENTS_PEAK_MEM_USAGE_SLOTS seconds: however this is not a problem
 1592      * since here we want just to track if "recently" there were very expansive
 1593      * clients from the POV of memory usage. */
 1594     ClientsPeakMemInput[zeroidx] = 0;
 1595     ClientsPeakMemOutput[zeroidx] = 0;
 1596 
 1597     /* Track the biggest values observed so far in this slot. */
 1598     if (in_usage > ClientsPeakMemInput[i]) ClientsPeakMemInput[i] = in_usage;
 1599     if (out_usage > ClientsPeakMemOutput[i]) ClientsPeakMemOutput[i] = out_usage;
 1600 
 1601     return 0; /* This function never terminates the client. */
 1602 }
 1603 
 1604 /* Iterating all the clients in getMemoryOverheadData() is too slow and
 1605  * in turn would make the INFO command too slow. So we perform this
 1606  * computation incrementally and track the (not instantaneous but updated
 1607  * to the second) total memory used by clients using clinetsCron() in
 1608  * a more incremental way (depending on server.hz). */
 1609 int clientsCronTrackClientsMemUsage(client *c) {
 1610     size_t mem = 0;
 1611     int type = getClientType(c);
 1612     mem += getClientOutputBufferMemoryUsage(c);
 1613     mem += sdsAllocSize(c->querybuf);
 1614     mem += sizeof(client);
 1615     /* Now that we have the memory used by the client, remove the old
 1616      * value from the old categoty, and add it back. */
 1617     server.stat_clients_type_memory[c->client_cron_last_memory_type] -=
 1618         c->client_cron_last_memory_usage;
 1619     server.stat_clients_type_memory[type] += mem;
 1620     /* Remember what we added and where, to remove it next time. */
 1621     c->client_cron_last_memory_usage = mem;
 1622     c->client_cron_last_memory_type = type;
 1623     return 0;
 1624 }
 1625 
 1626 /* Return the max samples in the memory usage of clients tracked by
 1627  * the function clientsCronTrackExpansiveClients(). */
 1628 void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
 1629     size_t i = 0, o = 0;
 1630     for (int j = 0; j < CLIENTS_PEAK_MEM_USAGE_SLOTS; j++) {
 1631         if (ClientsPeakMemInput[j] > i) i = ClientsPeakMemInput[j];
 1632         if (ClientsPeakMemOutput[j] > o) o = ClientsPeakMemOutput[j];
 1633     }
 1634     *in_usage = i;
 1635     *out_usage = o;
 1636 }
 1637 
 1638 /* This function is called by serverCron() and is used in order to perform
 1639  * operations on clients that are important to perform constantly. For instance
 1640  * we use this function in order to disconnect clients after a timeout, including
 1641  * clients blocked in some blocking command with a non-zero timeout.
 1642  *
 1643  * The function makes some effort to process all the clients every second, even
 1644  * if this cannot be strictly guaranteed, since serverCron() may be called with
 1645  * an actual frequency lower than server.hz in case of latency events like slow
 1646  * commands.
 1647  *
 1648  * It is very important for this function, and the functions it calls, to be
 1649  * very fast: sometimes Redis has tens of hundreds of connected clients, and the
 1650  * default server.hz value is 10, so sometimes here we need to process thousands
 1651  * of clients per second, turning this function into a source of latency.
 1652  */
 1653 #define CLIENTS_CRON_MIN_ITERATIONS 5
 1654 void clientsCron(void) {
 1655     /* Try to process at least numclients/server.hz of clients
 1656      * per call. Since normally (if there are no big latency events) this
 1657      * function is called server.hz times per second, in the average case we
 1658      * process all the clients in 1 second. */
 1659     int numclients = listLength(server.clients);
 1660     int iterations = numclients/server.hz;
 1661     mstime_t now = mstime();
 1662 
 1663     /* Process at least a few clients while we are at it, even if we need
 1664      * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
 1665      * of processing each client once per second. */
 1666     if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
 1667         iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
 1668                      numclients : CLIENTS_CRON_MIN_ITERATIONS;
 1669 
 1670     while(listLength(server.clients) && iterations--) {
 1671         client *c;
 1672         listNode *head;
 1673 
 1674         /* Rotate the list, take the current head, process.
 1675          * This way if the client must be removed from the list it's the
 1676          * first element and we don't incur into O(N) computation. */
 1677         listRotateTailToHead(server.clients);
 1678         head = listFirst(server.clients);
 1679         c = listNodeValue(head);
 1680         /* The following functions do different service checks on the client.
 1681          * The protocol is that they return non-zero if the client was
 1682          * terminated. */
 1683         if (clientsCronHandleTimeout(c,now)) continue;
 1684         if (clientsCronResizeQueryBuffer(c)) continue;
 1685         if (clientsCronTrackExpansiveClients(c)) continue;
 1686         if (clientsCronTrackClientsMemUsage(c)) continue;
 1687     }
 1688 }
 1689 
 1690 /* This function handles 'background' operations we are required to do
 1691  * incrementally in Redis databases, such as active key expiring, resizing,
 1692  * rehashing. */
 1693 void databasesCron(void) {
 1694     /* Expire keys by random sampling. Not required for slaves
 1695      * as master will synthesize DELs for us. */
 1696     if (server.active_expire_enabled) {
 1697         if (iAmMaster()) {
 1698             activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
 1699         } else {
 1700             expireSlaveKeys();
 1701         }
 1702     }
 1703 
 1704     /* Defrag keys gradually. */
 1705     activeDefragCycle();
 1706 
 1707     /* Perform hash tables rehashing if needed, but only if there are no
 1708      * other processes saving the DB on disk. Otherwise rehashing is bad
 1709      * as will cause a lot of copy-on-write of memory pages. */
 1710     if (!hasActiveChildProcess()) {
 1711         /* We use global counters so if we stop the computation at a given
 1712          * DB we'll be able to start from the successive in the next
 1713          * cron loop iteration. */
 1714         static unsigned int resize_db = 0;
 1715         static unsigned int rehash_db = 0;
 1716         int dbs_per_call = CRON_DBS_PER_CALL;
 1717         int j;
 1718 
 1719         /* Don't test more DBs than we have. */
 1720         if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
 1721 
 1722         /* Resize */
 1723         for (j = 0; j < dbs_per_call; j++) {
 1724             tryResizeHashTables(resize_db % server.dbnum);
 1725             resize_db++;
 1726         }
 1727 
 1728         /* Rehash */
 1729         if (server.activerehashing) {
 1730             for (j = 0; j < dbs_per_call; j++) {
 1731                 int work_done = incrementallyRehash(rehash_db);
 1732                 if (work_done) {
 1733                     /* If the function did some work, stop here, we'll do
 1734                      * more at the next cron loop. */
 1735                     break;
 1736                 } else {
 1737                     /* If this db didn't need rehash, we'll try the next one. */
 1738                     rehash_db++;
 1739                     rehash_db %= server.dbnum;
 1740                 }
 1741             }
 1742         }
 1743     }
 1744 }
 1745 
 1746 /* We take a cached value of the unix time in the global state because with
 1747  * virtual memory and aging there is to store the current time in objects at
 1748  * every object access, and accuracy is not needed. To access a global var is
 1749  * a lot faster than calling time(NULL).
 1750  *
 1751  * This function should be fast because it is called at every command execution
 1752  * in call(), so it is possible to decide if to update the daylight saving
 1753  * info or not using the 'update_daylight_info' argument. Normally we update
 1754  * such info only when calling this function from serverCron() but not when
 1755  * calling it from call(). */
 1756 void updateCachedTime(int update_daylight_info) {
 1757     server.ustime = ustime();
 1758     server.mstime = server.ustime / 1000;
 1759     server.unixtime = server.mstime / 1000;
 1760 
 1761     /* To get information about daylight saving time, we need to call
 1762      * localtime_r and cache the result. However calling localtime_r in this
 1763      * context is safe since we will never fork() while here, in the main
 1764      * thread. The logging function will call a thread safe version of
 1765      * localtime that has no locks. */
 1766     if (update_daylight_info) {
 1767         struct tm tm;
 1768         time_t ut = server.unixtime;
 1769         localtime_r(&ut,&tm);
 1770         server.daylight_active = tm.tm_isdst;
 1771     }
 1772 }
 1773 
 1774 void checkChildrenDone(void) {
 1775     int statloc;
 1776     pid_t pid;
 1777 
 1778     /* If we have a diskless rdb child (note that we support only one concurrent
 1779      * child), we want to avoid collecting it's exit status and acting on it
 1780      * as long as we didn't finish to drain the pipe, since then we're at risk
 1781      * of starting a new fork and a new pipe before we're done with the previous
 1782      * one. */
 1783     if (server.rdb_child_pid != -1 && server.rdb_pipe_conns)
 1784         return;
 1785 
 1786     if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
 1787         int exitcode = WEXITSTATUS(statloc);
 1788         int bysignal = 0;
 1789 
 1790         if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
 1791 
 1792         /* sigKillChildHandler catches the signal and calls exit(), but we
 1793          * must make sure not to flag lastbgsave_status, etc incorrectly.
 1794          * We could directly terminate the child process via SIGUSR1
 1795          * without handling it, but in this case Valgrind will log an
 1796          * annoying error. */
 1797         if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {
 1798             bysignal = SIGUSR1;
 1799             exitcode = 1;
 1800         }
 1801 
 1802         if (pid == -1) {
 1803             serverLog(LL_WARNING,"wait3() returned an error: %s. "
 1804                 "rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d",
 1805                 strerror(errno),
 1806                 (int) server.rdb_child_pid,
 1807                 (int) server.aof_child_pid,
 1808                 (int) server.module_child_pid);
 1809         } else if (pid == server.rdb_child_pid) {
 1810             backgroundSaveDoneHandler(exitcode,bysignal);
 1811             if (!bysignal && exitcode == 0) receiveChildInfo();
 1812         } else if (pid == server.aof_child_pid) {
 1813             backgroundRewriteDoneHandler(exitcode,bysignal);
 1814             if (!bysignal && exitcode == 0) receiveChildInfo();
 1815         } else if (pid == server.module_child_pid) {
 1816             ModuleForkDoneHandler(exitcode,bysignal);
 1817             if (!bysignal && exitcode == 0) receiveChildInfo();
 1818         } else {
 1819             if (!ldbRemoveChild(pid)) {
 1820                 serverLog(LL_WARNING,
 1821                     "Warning, detected child with unmatched pid: %ld",
 1822                     (long)pid);
 1823             }
 1824         }
 1825         updateDictResizePolicy();
 1826         closeChildInfoPipe();
 1827     }
 1828 }
 1829 
 1830 /* This is our timer interrupt, called server.hz times per second.
 1831  * Here is where we do a number of things that need to be done asynchronously.
 1832  * For instance:
 1833  *
 1834  * - Active expired keys collection (it is also performed in a lazy way on
 1835  *   lookup).
 1836  * - Software watchdog.
 1837  * - Update some statistic.
 1838  * - Incremental rehashing of the DBs hash tables.
 1839  * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 1840  * - Clients timeout of different kinds.
 1841  * - Replication reconnection.
 1842  * - Many more...
 1843  *
 1844  * Everything directly called here will be called server.hz times per second,
 1845  * so in order to throttle execution of things we want to do less frequently
 1846  * a macro is used: run_with_period(milliseconds) { .... }
 1847  */
 1848 
 1849 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
 1850     int j;
 1851     UNUSED(eventLoop);
 1852     UNUSED(id);
 1853     UNUSED(clientData);
 1854 
 1855     /* Software watchdog: deliver the SIGALRM that will reach the signal
 1856      * handler if we don't return here fast enough. */
 1857     if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
 1858 
 1859     /* Update the time cache. */
 1860     updateCachedTime(1);
 1861 
 1862     server.hz = server.config_hz;
 1863     /* Adapt the server.hz value to the number of configured clients. If we have
 1864      * many clients, we want to call serverCron() with an higher frequency. */
 1865     if (server.dynamic_hz) {
 1866         while (listLength(server.clients) / server.hz >
 1867                MAX_CLIENTS_PER_CLOCK_TICK)
 1868         {
 1869             server.hz *= 2;
 1870             if (server.hz > CONFIG_MAX_HZ) {
 1871                 server.hz = CONFIG_MAX_HZ;
 1872                 break;
 1873             }
 1874         }
 1875     }
 1876 
 1877     run_with_period(100) {
 1878         trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
 1879         trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
 1880                 server.stat_net_input_bytes);
 1881         trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
 1882                 server.stat_net_output_bytes);
 1883     }
 1884 
 1885     /* We have just LRU_BITS bits per object for LRU information.
 1886      * So we use an (eventually wrapping) LRU clock.
 1887      *
 1888      * Note that even if the counter wraps it's not a big problem,
 1889      * everything will still work but some object will appear younger
 1890      * to Redis. However for this to happen a given object should never be
 1891      * touched for all the time needed to the counter to wrap, which is
 1892      * not likely.
 1893      *
 1894      * Note that you can change the resolution altering the
 1895      * LRU_CLOCK_RESOLUTION define. */
 1896     server.lruclock = getLRUClock();
 1897 
 1898     /* Record the max memory used since the server was started. */
 1899     if (zmalloc_used_memory() > server.stat_peak_memory)
 1900         server.stat_peak_memory = zmalloc_used_memory();
 1901 
 1902     run_with_period(100) {
 1903         /* Sample the RSS and other metrics here since this is a relatively slow call.
 1904          * We must sample the zmalloc_used at the same time we take the rss, otherwise
 1905          * the frag ratio calculate may be off (ratio of two samples at different times) */
 1906         server.cron_malloc_stats.process_rss = zmalloc_get_rss();
 1907         server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory();
 1908         /* Sampling the allcator info can be slow too.
 1909          * The fragmentation ratio it'll show is potentically more accurate
 1910          * it excludes other RSS pages such as: shared libraries, LUA and other non-zmalloc
 1911          * allocations, and allocator reserved pages that can be pursed (all not actual frag) */
 1912         zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated,
 1913                                    &server.cron_malloc_stats.allocator_active,
 1914                                    &server.cron_malloc_stats.allocator_resident);
 1915         /* in case the allocator isn't providing these stats, fake them so that
 1916          * fragmention info still shows some (inaccurate metrics) */
 1917         if (!server.cron_malloc_stats.allocator_resident) {
 1918             /* LUA memory isn't part of zmalloc_used, but it is part of the process RSS,
 1919              * so we must desuct it in order to be able to calculate correct
 1920              * "allocator fragmentation" ratio */
 1921             size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL;
 1922             server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory;
 1923         }
 1924         if (!server.cron_malloc_stats.allocator_active)
 1925             server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident;
 1926         if (!server.cron_malloc_stats.allocator_allocated)
 1927             server.cron_malloc_stats.allocator_allocated = server.cron_malloc_stats.zmalloc_used;
 1928     }
 1929 
 1930     /* We received a SIGTERM, shutting down here in a safe way, as it is
 1931      * not ok doing so inside the signal handler. */
 1932     if (server.shutdown_asap) {
 1933         if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
 1934         serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
 1935         server.shutdown_asap = 0;
 1936     }
 1937 
 1938     /* Show some info about non-empty databases */
 1939     run_with_period(5000) {
 1940         for (j = 0; j < server.dbnum; j++) {
 1941             long long size, used, vkeys;
 1942 
 1943             size = dictSlots(server.db[j].dict);
 1944             used = dictSize(server.db[j].dict);
 1945             vkeys = dictSize(server.db[j].expires);
 1946             if (used || vkeys) {
 1947                 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
 1948                 /* dictPrintStats(server.dict); */
 1949             }
 1950         }
 1951     }
 1952 
 1953     /* Show information about connected clients */
 1954     if (!server.sentinel_mode) {
 1955         run_with_period(5000) {
 1956             serverLog(LL_DEBUG,
 1957                 "%lu clients connected (%lu replicas), %zu bytes in use",
 1958                 listLength(server.clients)-listLength(server.slaves),
 1959                 listLength(server.slaves),
 1960                 zmalloc_used_memory());
 1961         }
 1962     }
 1963 
 1964     /* We need to do a few operations on clients asynchronously. */
 1965     clientsCron();
 1966 
 1967     /* Handle background operations on Redis databases. */
 1968     databasesCron();
 1969 
 1970     /* Start a scheduled AOF rewrite if this was requested by the user while
 1971      * a BGSAVE was in progress. */
 1972     if (!hasActiveChildProcess() &&
 1973         server.aof_rewrite_scheduled)
 1974     {
 1975         rewriteAppendOnlyFileBackground();
 1976     }
 1977 
 1978     /* Check if a background saving or AOF rewrite in progress terminated. */
 1979     if (hasActiveChildProcess() || ldbPendingChildren())
 1980     {
 1981         checkChildrenDone();
 1982     } else {
 1983         /* If there is not a background saving/rewrite in progress check if
 1984          * we have to save/rewrite now. */
 1985         for (j = 0; j < server.saveparamslen; j++) {
 1986             struct saveparam *sp = server.saveparams+j;
 1987 
 1988             /* Save if we reached the given amount of changes,
 1989              * the given amount of seconds, and if the latest bgsave was
 1990              * successful or if, in case of an error, at least
 1991              * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
 1992             if (server.dirty >= sp->changes &&
 1993                 server.unixtime-server.lastsave > sp->seconds &&
 1994                 (server.unixtime-server.lastbgsave_try >
 1995                  CONFIG_BGSAVE_RETRY_DELAY ||
 1996                  server.lastbgsave_status == C_OK))
 1997             {
 1998                 serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
 1999                     sp->changes, (int)sp->seconds);
 2000                 rdbSaveInfo rsi, *rsiptr;
 2001                 rsiptr = rdbPopulateSaveInfo(&rsi);
 2002                 rdbSaveBackground(server.rdb_filename,rsiptr);
 2003                 break;
 2004             }
 2005         }
 2006 
 2007         /* Trigger an AOF rewrite if needed. */
 2008         if (server.aof_state == AOF_ON &&
 2009             !hasActiveChildProcess() &&
 2010             server.aof_rewrite_perc &&
 2011             server.aof_current_size > server.aof_rewrite_min_size)
 2012         {
 2013             long long base = server.aof_rewrite_base_size ?
 2014                 server.aof_rewrite_base_size : 1;
 2015             long long growth = (server.aof_current_size*100/base) - 100;
 2016             if (growth >= server.aof_rewrite_perc) {
 2017                 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
 2018                 rewriteAppendOnlyFileBackground();
 2019             }
 2020         }
 2021     }
 2022 
 2023 
 2024     /* AOF postponed flush: Try at every cron cycle if the slow fsync
 2025      * completed. */
 2026     if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
 2027 
 2028     /* AOF write errors: in this case we have a buffer to flush as well and
 2029      * clear the AOF error in case of success to make the DB writable again,
 2030      * however to try every second is enough in case of 'hz' is set to
 2031      * an higher frequency. */
 2032     run_with_period(1000) {
 2033         if (server.aof_last_write_status == C_ERR)
 2034             flushAppendOnlyFile(0);
 2035     }
 2036 
 2037     /* Clear the paused clients flag if needed. */
 2038     clientsArePaused(); /* Don't check return value, just use the side effect.*/
 2039 
 2040     /* Replication cron function -- used to reconnect to master,
 2041      * detect transfer failures, start background RDB transfers and so forth. */
 2042     run_with_period(1000) replicationCron();
 2043 
 2044     /* Run the Redis Cluster cron. */
 2045     run_with_period(100) {
 2046         if (server.cluster_enabled) clusterCron();
 2047     }
 2048 
 2049     /* Run the Sentinel timer if we are in sentinel mode. */
 2050     if (server.sentinel_mode) sentinelTimer();
 2051 
 2052     /* Cleanup expired MIGRATE cached sockets. */
 2053     run_with_period(1000) {
 2054         migrateCloseTimedoutSockets();
 2055     }
 2056 
 2057     /* Stop the I/O threads if we don't have enough pending work. */
 2058     stopThreadedIOIfNeeded();
 2059 
 2060     /* Resize tracking keys table if needed. This is also done at every
 2061      * command execution, but we want to be sure that if the last command
 2062      * executed changes the value via CONFIG SET, the server will perform
 2063      * the operation even if completely idle. */
 2064     if (server.tracking_clients) trackingLimitUsedSlots();
 2065 
 2066     /* Start a scheduled BGSAVE if the corresponding flag is set. This is
 2067      * useful when we are forced to postpone a BGSAVE because an AOF
 2068      * rewrite is in progress.
 2069      *
 2070      * Note: this code must be after the replicationCron() call above so
 2071      * make sure when refactoring this file to keep this order. This is useful
 2072      * because we want to give priority to RDB savings for replication. */
 2073     if (!hasActiveChildProcess() &&
 2074         server.rdb_bgsave_scheduled &&
 2075         (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
 2076          server.lastbgsave_status == C_OK))
 2077     {
 2078         rdbSaveInfo rsi, *rsiptr;
 2079         rsiptr = rdbPopulateSaveInfo(&rsi);
 2080         if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
 2081             server.rdb_bgsave_scheduled = 0;
 2082     }
 2083 
 2084     /* Fire the cron loop modules event. */
 2085     RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
 2086     moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
 2087                           0,
 2088                           &ei);
 2089 
 2090     server.cronloops++;
 2091     return 1000/server.hz;
 2092 }
 2093 
 2094 extern int ProcessingEventsWhileBlocked;
 2095 
 2096 /* This function gets called every time Redis is entering the
 2097  * main loop of the event driven library, that is, before to sleep
 2098  * for ready file descriptors.
 2099  *
 2100  * Note: This function is (currently) called from two functions:
 2101  * 1. aeMain - The main server loop
 2102  * 2. processEventsWhileBlocked - Process clients during RDB/AOF load
 2103  *
 2104  * If it was called from processEventsWhileBlocked we don't want
 2105  * to perform all actions (For example, we don't want to expire
 2106  * keys), but we do need to perform some actions.
 2107  *
 2108  * The most important is freeClientsInAsyncFreeQueue but we also
 2109  * call some other low-risk functions. */
 2110 void beforeSleep(struct aeEventLoop *eventLoop) {
 2111     UNUSED(eventLoop);
 2112 
 2113     /* Just call a subset of vital functions in case we are re-entering
 2114      * the event loop from processEventsWhileBlocked(). Note that in this
 2115      * case we keep track of the number of events we are processing, since
 2116      * processEventsWhileBlocked() wants to stop ASAP if there are no longer
 2117      * events to handle. */
 2118     if (ProcessingEventsWhileBlocked) {
 2119         uint64_t processed = 0;
 2120         processed += handleClientsWithPendingReadsUsingThreads();
 2121         processed += tlsProcessPendingData();
 2122         processed += handleClientsWithPendingWrites();
 2123         processed += freeClientsInAsyncFreeQueue();
 2124         server.events_processed_while_blocked += processed;
 2125         return;
 2126     }
 2127 
 2128     /* Handle precise timeouts of blocked clients. */
 2129     handleBlockedClientsTimeout();
 2130 
 2131     /* We should handle pending reads clients ASAP after event loop. */
 2132     handleClientsWithPendingReadsUsingThreads();
 2133 
 2134     /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
 2135     tlsProcessPendingData();
 2136 
 2137     /* If tls still has pending unread data don't sleep at all. */
 2138     aeSetDontWait(server.el, tlsHasPendingData());
 2139 
 2140     /* Call the Redis Cluster before sleep function. Note that this function
 2141      * may change the state of Redis Cluster (from ok to fail or vice versa),
 2142      * so it's a good idea to call it before serving the unblocked clients
 2143      * later in this function. */
 2144     if (server.cluster_enabled) clusterBeforeSleep();
 2145 
 2146     /* Run a fast expire cycle (the called function will return
 2147      * ASAP if a fast cycle is not needed). */
 2148     if (server.active_expire_enabled && server.masterhost == NULL)
 2149         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
 2150 
 2151     /* Unblock all the clients blocked for synchronous replication
 2152      * in WAIT. */
 2153     if (listLength(server.clients_waiting_acks))
 2154         processClientsWaitingReplicas();
 2155 
 2156     /* Check if there are clients unblocked by modules that implement
 2157      * blocking commands. */
 2158     if (moduleCount()) moduleHandleBlockedClients();
 2159 
 2160     /* Try to process pending commands for clients that were just unblocked. */
 2161     if (listLength(server.unblocked_clients))
 2162         processUnblockedClients();
 2163 
 2164     /* Send all the slaves an ACK request if at least one client blocked
 2165      * during the previous event loop iteration. Note that we do this after
 2166      * processUnblockedClients(), so if there are multiple pipelined WAITs
 2167      * and the just unblocked WAIT gets blocked again, we don't have to wait
 2168      * a server cron cycle in absence of other event loop events. See #6623. */
 2169     if (server.get_ack_from_slaves) {
 2170         robj *argv[3];
 2171 
 2172         argv[0] = createStringObject("REPLCONF",8);
 2173         argv[1] = createStringObject("GETACK",6);
 2174         argv[2] = createStringObject("*",1); /* Not used argument. */
 2175         replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
 2176         decrRefCount(argv[0]);
 2177         decrRefCount(argv[1]);
 2178         decrRefCount(argv[2]);
 2179         server.get_ack_from_slaves = 0;
 2180     }
 2181 
 2182     /* Send the invalidation messages to clients participating to the
 2183      * client side caching protocol in broadcasting (BCAST) mode. */
 2184     trackingBroadcastInvalidationMessages();
 2185 
 2186     /* Write the AOF buffer on disk */
 2187     flushAppendOnlyFile(0);
 2188 
 2189     /* Handle writes with pending output buffers. */
 2190     handleClientsWithPendingWritesUsingThreads();
 2191 
 2192     /* Close clients that need to be closed asynchronous */
 2193     freeClientsInAsyncFreeQueue();
 2194 
 2195     /* Before we are going to sleep, let the threads access the dataset by
 2196      * releasing the GIL. Redis main thread will not touch anything at this
 2197      * time. */
 2198     if (moduleCount()) moduleReleaseGIL();
 2199 }
 2200 
 2201 /* This function is called immadiately after the event loop multiplexing
 2202  * API returned, and the control is going to soon return to Redis by invoking
 2203  * the different events callbacks. */
 2204 void afterSleep(struct aeEventLoop *eventLoop) {
 2205     UNUSED(eventLoop);
 2206 
 2207     if (!ProcessingEventsWhileBlocked) {
 2208         if (moduleCount()) moduleAcquireGIL();
 2209     }
 2210 }
 2211 
 2212 /* =========================== Server initialization ======================== */
 2213 
 2214 void createSharedObjects(void) {
 2215     int j;
 2216 
 2217     shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
 2218     shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
 2219     shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
 2220     shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
 2221     shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
 2222     shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
 2223     shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0\r\n"));
 2224     shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
 2225     shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
 2226     shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
 2227     shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
 2228         "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
 2229     shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
 2230         "-ERR no such key\r\n"));
 2231     shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
 2232         "-ERR syntax error\r\n"));
 2233     shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
 2234         "-ERR source and destination objects are the same\r\n"));
 2235     shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
 2236         "-ERR index out of range\r\n"));
 2237     shared.noscripterr = createObject(OBJ_STRING,sdsnew(
 2238         "-NOSCRIPT No matching script. Please use EVAL.\r\n"));
 2239     shared.loadingerr = createObject(OBJ_STRING,sdsnew(
 2240         "-LOADING Redis is loading the dataset in memory\r\n"));
 2241     shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
 2242         "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
 2243     shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
 2244         "-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n"));
 2245     shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
 2246         "-MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.\r\n"));
 2247     shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
 2248         "-READONLY You can't write against a read only replica.\r\n"));
 2249     shared.noautherr = createObject(OBJ_STRING,sdsnew(
 2250         "-NOAUTH Authentication required.\r\n"));
 2251     shared.oomerr = createObject(OBJ_STRING,sdsnew(
 2252         "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
 2253     shared.execaborterr = createObject(OBJ_STRING,sdsnew(
 2254         "-EXECABORT Transaction discarded because of previous errors.\r\n"));
 2255     shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
 2256         "-NOREPLICAS Not enough good replicas to write.\r\n"));
 2257     shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
 2258         "-BUSYKEY Target key name already exists.\r\n"));
 2259     shared.space = createObject(OBJ_STRING,sdsnew(" "));
 2260     shared.colon = createObject(OBJ_STRING,sdsnew(":"));
 2261     shared.plus = createObject(OBJ_STRING,sdsnew("+"));
 2262 
 2263     /* The shared NULL depends on the protocol version. */
 2264     shared.null[0] = NULL;
 2265     shared.null[1] = NULL;
 2266     shared.null[2] = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
 2267     shared.null[3] = createObject(OBJ_STRING,sdsnew("_\r\n"));
 2268 
 2269     shared.nullarray[0] = NULL;
 2270     shared.nullarray[1] = NULL;
 2271     shared.nullarray[2] = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
 2272     shared.nullarray[3] = createObject(OBJ_STRING,sdsnew("_\r\n"));
 2273 
 2274     shared.emptymap[0] = NULL;
 2275     shared.emptymap[1] = NULL;
 2276     shared.emptymap[2] = createObject(OBJ_STRING,sdsnew("*0\r\n"));
 2277     shared.emptymap[3] = createObject(OBJ_STRING,sdsnew("%0\r\n"));
 2278 
 2279     shared.emptyset[0] = NULL;
 2280     shared.emptyset[1] = NULL;
 2281     shared.emptyset[2] = createObject(OBJ_STRING,sdsnew("*0\r\n"));
 2282     shared.emptyset[3] = createObject(OBJ_STRING,sdsnew("~0\r\n"));
 2283 
 2284     for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
 2285         char dictid_str[64];
 2286         int dictid_len;
 2287 
 2288         dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
 2289         shared.select[j] = createObject(OBJ_STRING,
 2290             sdscatprintf(sdsempty(),
 2291                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
 2292                 dictid_len, dictid_str));
 2293     }
 2294     shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
 2295     shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
 2296     shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
 2297     shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
 2298     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
 2299     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
 2300     shared.del = createStringObject("DEL",3);
 2301     shared.unlink = createStringObject("UNLINK",6);
 2302     shared.rpop = createStringObject("RPOP",4);
 2303     shared.lpop = createStringObject("LPOP",4);
 2304     shared.lpush = createStringObject("LPUSH",5);
 2305     shared.rpoplpush = createStringObject("RPOPLPUSH",9);
 2306     shared.zpopmin = createStringObject("ZPOPMIN",7);
 2307     shared.zpopmax = createStringObject("ZPOPMAX",7);
 2308     shared.multi = createStringObject("MULTI",5);
 2309     shared.exec = createStringObject("EXEC",4);
 2310     for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
 2311         shared.integers[j] =
 2312             makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
 2313         shared.integers[j]->encoding = OBJ_ENCODING_INT;
 2314     }
 2315     for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
 2316         shared.mbulkhdr[j] = createObject(OBJ_STRING,
 2317             sdscatprintf(sdsempty(),"*%d\r\n",j));
 2318         shared.bulkhdr[j] = createObject(OBJ_STRING,
 2319             sdscatprintf(sdsempty(),"$%d\r\n",j));
 2320     }
 2321     /* The following two shared objects, minstring and maxstrings, are not
 2322      * actually used for their value but as a special object meaning
 2323      * respectively the minimum possible string and the maximum possible
 2324      * string in string comparisons for the ZRANGEBYLEX command. */
 2325     shared.minstring = sdsnew("minstring");
 2326     shared.maxstring = sdsnew("maxstring");
 2327 }
 2328 
 2329 void initServerConfig(void) {
 2330     int j;
 2331 
 2332     updateCachedTime(1);
 2333     getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
 2334     server.runid[CONFIG_RUN_ID_SIZE] = '\0';
 2335     changeReplicationId();
 2336     clearReplicationId2();
 2337     server.hz = CONFIG_DEFAULT_HZ; /* Initialize it ASAP, even if it may get
 2338                                       updated later after loading the config.
 2339                                       This value may be used before the server
 2340                                       is initialized. */
 2341     server.timezone = getTimeZone(); /* Initialized by tzset(). */
 2342     server.configfile = NULL;
 2343     server.executable = NULL;
 2344     server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
 2345     server.bindaddr_count = 0;
 2346     server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
 2347     server.ipfd_count = 0;
 2348     server.tlsfd_count = 0;
 2349     server.sofd = -1;
 2350     server.active_expire_enabled = 1;
 2351     server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
 2352     server.saveparams = NULL;
 2353     server.loading = 0;
 2354     server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
 2355     server.aof_state = AOF_OFF;
 2356     server.aof_rewrite_base_size = 0;
 2357     server.aof_rewrite_scheduled = 0;
 2358     server.aof_flush_sleep = 0;
 2359     server.aof_last_fsync = time(NULL);
 2360     server.aof_rewrite_time_last = -1;
 2361     server.aof_rewrite_time_start = -1;
 2362     server.aof_lastbgrewrite_status = C_OK;
 2363     server.aof_delayed_fsync = 0;
 2364     server.aof_fd = -1;
 2365     server.aof_selected_db = -1; /* Make sure the first time will not match */
 2366     server.aof_flush_postponed_start = 0;
 2367     server.pidfile = NULL;
 2368     server.active_defrag_running = 0;
 2369     server.notify_keyspace_events = 0;
 2370     server.blocked_clients = 0;
 2371     memset(server.blocked_clients_by_type,0,
 2372            sizeof(server.blocked_clients_by_type));
 2373     server.shutdown_asap = 0;
 2374     server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
 2375     server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
 2376     server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
 2377     server.next_client_id = 1; /* Client IDs, start from 1 .*/
 2378     server.loading_process_events_interval_bytes = (1024*1024*2);
 2379 
 2380     server.lruclock = getLRUClock();
 2381     resetServerSaveParams();
 2382 
 2383     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
 2384     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
 2385     appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
 2386 
 2387     /* Replication related */
 2388     server.masterauth = NULL;
 2389     server.masterhost = NULL;
 2390     server.masterport = 6379;
 2391     server.master = NULL;
 2392     server.cached_master = NULL;
 2393     server.master_initial_offset = -1;
 2394     server.repl_state = REPL_STATE_NONE;
 2395     server.repl_transfer_tmpfile = NULL;
 2396     server.repl_transfer_fd = -1;
 2397     server.repl_transfer_s = NULL;
 2398     server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
 2399     server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
 2400     server.master_repl_offset = 0;
 2401 
 2402     /* Replication partial resync backlog */
 2403     server.repl_backlog = NULL;
 2404     server.repl_backlog_histlen = 0;
 2405     server.repl_backlog_idx = 0;
 2406     server.repl_backlog_off = 0;
 2407     server.repl_no_slaves_since = time(NULL);
 2408 
 2409     /* Client output buffer limits */
 2410     for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
 2411         server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
 2412 
 2413     /* Linux OOM Score config */
 2414     for (j = 0; j < CONFIG_OOM_COUNT; j++)
 2415         server.oom_score_adj_values[j] = configOOMScoreAdjValuesDefaults[j];
 2416 
 2417     /* Double constants initialization */
 2418     R_Zero = 0.0;
 2419     R_PosInf = 1.0/R_Zero;
 2420     R_NegInf = -1.0/R_Zero;
 2421     R_Nan = R_Zero/R_Zero;
 2422 
 2423     /* Command table -- we initiialize it here as it is part of the
 2424      * initial configuration, since command names may be changed via
 2425      * redis.conf using the rename-command directive. */
 2426     server.commands = dictCreate(&commandTableDictType,NULL);
 2427     server.orig_commands = dictCreate(&commandTableDictType,NULL);
 2428     populateCommandTable();
 2429     server.delCommand = lookupCommandByCString("del");
 2430     server.multiCommand = lookupCommandByCString("multi");
 2431     server.lpushCommand = lookupCommandByCString("lpush");
 2432     server.lpopCommand = lookupCommandByCString("lpop");
 2433     server.rpopCommand = lookupCommandByCString("rpop");
 2434     server.zpopminCommand = lookupCommandByCString("zpopmin");
 2435     server.zpopmaxCommand = lookupCommandByCString("zpopmax");
 2436     server.sremCommand = lookupCommandByCString("srem");
 2437     server.execCommand = lookupCommandByCString("exec");
 2438     server.expireCommand = lookupCommandByCString("expire");
 2439     server.pexpireCommand = lookupCommandByCString("pexpire");
 2440     server.xclaimCommand = lookupCommandByCString("xclaim");
 2441     server.xgroupCommand = lookupCommandByCString("xgroup");
 2442     server.rpoplpushCommand = lookupCommandByCString("rpoplpush");
 2443 
 2444     /* Debugging */
 2445     server.assert_failed = "<no assertion failed>";
 2446     server.assert_file = "<no file>";
 2447     server.assert_line = 0;
 2448     server.bug_report_start = 0;
 2449     server.watchdog_period = 0;
 2450 
 2451     /* By default we want scripts to be always replicated by effects
 2452      * (single commands executed by the script), and not by sending the
 2453      * script to the slave / AOF. This is the new way starting from
 2454      * Redis 5. However it is possible to revert it via redis.conf. */
 2455     server.lua_always_replicate_commands = 1;
 2456 
 2457     initConfigValues();
 2458 }
 2459 
 2460 extern char **environ;
 2461 
 2462 /* Restart the server, executing the same executable that started this
 2463  * instance, with the same arguments and configuration file.
 2464  *
 2465  * The function is designed to directly call execve() so that the new
 2466  * server instance will retain the PID of the previous one.
 2467  *
 2468  * The list of flags, that may be bitwise ORed together, alter the
 2469  * behavior of this function:
 2470  *
 2471  * RESTART_SERVER_NONE              No flags.
 2472  * RESTART_SERVER_GRACEFULLY        Do a proper shutdown before restarting.
 2473  * RESTART_SERVER_CONFIG_REWRITE    Rewrite the config file before restarting.
 2474  *
 2475  * On success the function does not return, because the process turns into
 2476  * a different process. On error C_ERR is returned. */
 2477 int restartServer(int flags, mstime_t delay) {
 2478     int j;
 2479 
 2480     /* Check if we still have accesses to the executable that started this
 2481      * server instance. */
 2482     if (access(server.executable,X_OK) == -1) {
 2483         serverLog(LL_WARNING,"Can't restart: this process has no "
 2484                              "permissions to execute %s", server.executable);
 2485         return C_ERR;
 2486     }
 2487 
 2488     /* Config rewriting. */
 2489     if (flags & RESTART_SERVER_CONFIG_REWRITE &&
 2490         server.configfile &&
 2491         rewriteConfig(server.configfile, 0) == -1)
 2492     {
 2493         serverLog(LL_WARNING,"Can't restart: configuration rewrite process "
 2494                              "failed");
 2495         return C_ERR;
 2496     }
 2497 
 2498     /* Perform a proper shutdown. */
 2499     if (flags & RESTART_SERVER_GRACEFULLY &&
 2500         prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK)
 2501     {
 2502         serverLog(LL_WARNING,"Can't restart: error preparing for shutdown");
 2503         return C_ERR;
 2504     }
 2505 
 2506     /* Close all file descriptors, with the exception of stdin, stdout, strerr
 2507      * which are useful if we restart a Redis server which is not daemonized. */
 2508     for (j = 3; j < (int)server.maxclients + 1024; j++) {
 2509         /* Test the descriptor validity before closing it, otherwise
 2510          * Valgrind issues a warning on close(). */
 2511         if (fcntl(j,F_GETFD) != -1) close(j);
 2512     }
 2513 
 2514     /* Execute the server with the original command line. */
 2515     if (delay) usleep(delay*1000);
 2516     zfree(server.exec_argv[0]);
 2517     server.exec_argv[0] = zstrdup(server.executable);
 2518     execve(server.executable,server.exec_argv,environ);
 2519 
 2520     /* If an error occurred here, there is nothing we can do, but exit. */
 2521     _exit(1);
 2522 
 2523     return C_ERR; /* Never reached. */
 2524 }
 2525 
 2526 static void readOOMScoreAdj(void) {
 2527 #ifdef HAVE_PROC_OOM_SCORE_ADJ
 2528     char buf[64];
 2529     int fd = open("/proc/self/oom_score_adj", O_RDONLY);
 2530 
 2531     if (fd < 0) return;
 2532     if (read(fd, buf, sizeof(buf)) > 0)
 2533         server.oom_score_adj_base = atoi(buf);
 2534     close(fd);
 2535 #endif
 2536 }
 2537 
 2538 /* This function will configure the current process's oom_score_adj according
 2539  * to user specified configuration. This is currently implemented on Linux
 2540  * only.
 2541  *
 2542  * A process_class value of -1 implies OOM_CONFIG_MASTER or OOM_CONFIG_REPLICA,
 2543  * depending on current role.
 2544  */
 2545 int setOOMScoreAdj(int process_class) {
 2546 
 2547     if (!server.oom_score_adj) return C_OK;
 2548     if (process_class == -1)
 2549         process_class = (server.masterhost ? CONFIG_OOM_REPLICA : CONFIG_OOM_MASTER);
 2550 
 2551     serverAssert(process_class >= 0 && process_class < CONFIG_OOM_COUNT);
 2552 
 2553 #ifdef HAVE_PROC_OOM_SCORE_ADJ
 2554     int fd;
 2555     int val;
 2556     char buf[64];
 2557 
 2558     val = server.oom_score_adj_base + server.oom_score_adj_values[process_class];
 2559     if (val > 1000) val = 1000;
 2560     if (val < -1000) val = -1000;
 2561 
 2562     snprintf(buf, sizeof(buf) - 1, "%d\n", val);
 2563 
 2564     fd = open("/proc/self/oom_score_adj", O_WRONLY);
 2565     if (fd < 0 || write(fd, buf, strlen(buf)) < 0) {
 2566         serverLog(LOG_WARNING, "Unable to write oom_score_adj: %s", strerror(errno));
 2567         if (fd != -1) close(fd);
 2568         return C_ERR;
 2569     }
 2570 
 2571     close(fd);
 2572     return C_OK;
 2573 #else
 2574     /* Unsupported */
 2575     return C_ERR;
 2576 #endif
 2577 }
 2578 
 2579 /* This function will try to raise the max number of open files accordingly to
 2580  * the configured max number of clients. It also reserves a number of file
 2581  * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
 2582  * persistence, listening sockets, log files and so forth.
 2583  *
 2584  * If it will not be possible to set the limit accordingly to the configured
 2585  * max number of clients, the function will do the reverse setting
 2586  * server.maxclients to the value that we can actually handle. */
 2587 void adjustOpenFilesLimit(void) {
 2588     rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;
 2589     struct rlimit limit;
 2590 
 2591     if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
 2592         serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
 2593             strerror(errno));
 2594         server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
 2595     } else {
 2596         rlim_t oldlimit = limit.rlim_cur;
 2597 
 2598         /* Set the max number of files if the current limit is not enough
 2599          * for our needs. */
 2600         if (oldlimit < maxfiles) {
 2601             rlim_t bestlimit;
 2602             int setrlimit_error = 0;
 2603 
 2604             /* Try to set the file limit to match 'maxfiles' or at least
 2605              * to the higher value supported less than maxfiles. */
 2606             bestlimit = maxfiles;
 2607             while(bestlimit > oldlimit) {
 2608                 rlim_t decr_step = 16;
 2609 
 2610                 limit.rlim_cur = bestlimit;
 2611                 limit.rlim_max = bestlimit;
 2612                 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
 2613                 setrlimit_error = errno;
 2614 
 2615                 /* We failed to set file limit to 'bestlimit'. Try with a
 2616                  * smaller limit decrementing by a few FDs per iteration. */
 2617                 if (bestlimit < decr_step) break;
 2618                 bestlimit -= decr_step;
 2619             }
 2620 
 2621             /* Assume that the limit we get initially is still valid if
 2622              * our last try was even lower. */
 2623             if (bestlimit < oldlimit) bestlimit = oldlimit;
 2624 
 2625             if (bestlimit < maxfiles) {
 2626                 unsigned int old_maxclients = server.maxclients;
 2627                 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
 2628                 /* maxclients is unsigned so may overflow: in order
 2629                  * to check if maxclients is now logically less than 1
 2630                  * we test indirectly via bestlimit. */
 2631                 if (bestlimit <= CONFIG_MIN_RESERVED_FDS) {
 2632                     serverLog(LL_WARNING,"Your current 'ulimit -n' "
 2633                         "of %llu is not enough for the server to start. "
 2634                         "Please increase your open file limit to at least "
 2635                         "%llu. Exiting.",
 2636                         (unsigned long long) oldlimit,
 2637                         (unsigned long long) maxfiles);
 2638                     exit(1);
 2639                 }
 2640                 serverLog(LL_WARNING,"You requested maxclients of %d "
 2641                     "requiring at least %llu max file descriptors.",
 2642                     old_maxclients,
 2643                     (unsigned long long) maxfiles);
 2644                 serverLog(LL_WARNING,"Server can't set maximum open files "
 2645                     "to %llu because of OS error: %s.",
 2646                     (unsigned long long) maxfiles, strerror(setrlimit_error));
 2647                 serverLog(LL_WARNING,"Current maximum open files is %llu. "
 2648                     "maxclients has been reduced to %d to compensate for "
 2649                     "low ulimit. "
 2650                     "If you need higher maxclients increase 'ulimit -n'.",
 2651                     (unsigned long long) bestlimit, server.maxclients);
 2652             } else {
 2653                 serverLog(LL_NOTICE,"Increased maximum number of open files "
 2654                     "to %llu (it was originally set to %llu).",
 2655                     (unsigned long long) maxfiles,
 2656                     (unsigned long long) oldlimit);
 2657             }
 2658         }
 2659     }
 2660 }
 2661 
 2662 /* Check that server.tcp_backlog can be actually enforced in Linux according
 2663  * to the value of /proc/sys/net/core/somaxconn, or warn about it. */
 2664 void checkTcpBacklogSettings(void) {
 2665 #ifdef HAVE_PROC_SOMAXCONN
 2666     FILE *fp = fopen("/proc/sys/net/core/somaxconn","r");
 2667     char buf[1024];
 2668     if (!fp) return;
 2669     if (fgets(buf,sizeof(buf),fp) != NULL) {
 2670         int somaxconn = atoi(buf);
 2671         if (somaxconn > 0 && somaxconn < server.tcp_backlog) {
 2672             serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn);
 2673         }
 2674     }
 2675     fclose(fp);
 2676 #endif
 2677 }
 2678 
 2679 /* Initialize a set of file descriptors to listen to the specified 'port'
 2680  * binding the addresses specified in the Redis server configuration.
 2681  *
 2682  * The listening file descriptors are stored in the integer array 'fds'
 2683  * and their number is set in '*count'.
 2684  *
 2685  * The addresses to bind are specified in the global server.bindaddr array
 2686  * and their number is server.bindaddr_count. If the server configuration
 2687  * contains no specific addresses to bind, this function will try to
 2688  * bind * (all addresses) for both the IPv4 and IPv6 protocols.
 2689  *
 2690  * On success the function returns C_OK.
 2691  *
 2692  * On error the function returns C_ERR. For the function to be on
 2693  * error, at least one of the server.bindaddr addresses was
 2694  * impossible to bind, or no bind addresses were specified in the server
 2695  * configuration but the function is not able to bind * for at least
 2696  * one of the IPv4 or IPv6 protocols. */
 2697 int listenToPort(int port, int *fds, int *count) {
 2698     int j;
 2699 
 2700     /* Force binding of 0.0.0.0 if no bind address is specified, always
 2701      * entering the loop if j == 0. */
 2702     if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
 2703     for (j = 0; j < server.bindaddr_count || j == 0; j++) {
 2704         if (server.bindaddr[j] == NULL) {
 2705             int unsupported = 0;
 2706             /* Bind * for both IPv6 and IPv4, we enter here only if
 2707              * server.bindaddr_count == 0. */
 2708             fds[*count] = anetTcp6Server(server.neterr,port,NULL,
 2709                 server.tcp_backlog);
 2710             if (fds[*count] != ANET_ERR) {
 2711                 anetNonBlock(NULL,fds[*count]);
 2712                 (*count)++;
 2713             } else if (errno == EAFNOSUPPORT) {
 2714                 unsupported++;
 2715                 serverLog(LL_WARNING,"Not listening to IPv6: unsupported");
 2716             }
 2717 
 2718             if (*count == 1 || unsupported) {
 2719                 /* Bind the IPv4 address as well. */
 2720                 fds[*count] = anetTcpServer(server.neterr,port,NULL,
 2721                     server.tcp_backlog);
 2722                 if (fds[*count] != ANET_ERR) {
 2723                     anetNonBlock(NULL,fds[*count]);
 2724                     (*count)++;
 2725                 } else if (errno == EAFNOSUPPORT) {
 2726                     unsupported++;
 2727                     serverLog(LL_WARNING,"Not listening to IPv4: unsupported");
 2728                 }
 2729             }
 2730             /* Exit the loop if we were able to bind * on IPv4 and IPv6,
 2731              * otherwise fds[*count] will be ANET_ERR and we'll print an
 2732              * error and return to the caller with an error. */
 2733             if (*count + unsupported == 2) break;
 2734         } else if (strchr(server.bindaddr[j],':')) {
 2735             /* Bind IPv6 address. */
 2736             fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
 2737                 server.tcp_backlog);
 2738         } else {
 2739             /* Bind IPv4 address. */
 2740             fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
 2741                 server.tcp_backlog);
 2742         }
 2743         if (fds[*count] == ANET_ERR) {
 2744             serverLog(LL_WARNING,
 2745                 "Could not create server TCP listening socket %s:%d: %s",
 2746                 server.bindaddr[j] ? server.bindaddr[j] : "*",
 2747                 port, server.neterr);
 2748                 if (errno == ENOPROTOOPT     || errno == EPROTONOSUPPORT ||
 2749                     errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
 2750                     errno == EAFNOSUPPORT    || errno == EADDRNOTAVAIL)
 2751                     continue;
 2752             return C_ERR;
 2753         }
 2754         anetNonBlock(NULL,fds[*count]);
 2755         (*count)++;
 2756     }
 2757     return C_OK;
 2758 }
 2759 
 2760 /* Resets the stats that we expose via INFO or other means that we want
 2761  * to reset via CONFIG RESETSTAT. The function is also used in order to
 2762  * initialize these fields in initServer() at server startup. */
 2763 void resetServerStats(void) {
 2764     int j;
 2765 
 2766     server.stat_numcommands = 0;
 2767     server.stat_numconnections = 0;
 2768     server.stat_expiredkeys = 0;
 2769     server.stat_expired_stale_perc = 0;
 2770     server.stat_expired_time_cap_reached_count = 0;
 2771     server.stat_expire_cycle_time_used = 0;
 2772     server.stat_evictedkeys = 0;
 2773     server.stat_keyspace_misses = 0;
 2774     server.stat_keyspace_hits = 0;
 2775     server.stat_active_defrag_hits = 0;
 2776     server.stat_active_defrag_misses = 0;
 2777     server.stat_active_defrag_key_hits = 0;
 2778     server.stat_active_defrag_key_misses = 0;
 2779     server.stat_active_defrag_scanned = 0;
 2780     server.stat_fork_time = 0;
 2781     server.stat_fork_rate = 0;
 2782     server.stat_rejected_conn = 0;
 2783     server.stat_sync_full = 0;
 2784     server.stat_sync_partial_ok = 0;
 2785     server.stat_sync_partial_err = 0;
 2786     server.stat_io_reads_processed = 0;
 2787     server.stat_total_reads_processed = 0;
 2788     server.stat_io_writes_processed = 0;
 2789     server.stat_total_writes_processed = 0;
 2790     for (j = 0; j < STATS_METRIC_COUNT; j++) {
 2791         server.inst_metric[j].idx = 0;
 2792         server.inst_metric[j].last_sample_time = mstime();
 2793         server.inst_metric[j].last_sample_count = 0;
 2794         memset(server.inst_metric[j].samples,0,
 2795             sizeof(server.inst_metric[j].samples));
 2796     }
 2797     server.stat_net_input_bytes = 0;
 2798     server.stat_net_output_bytes = 0;
 2799     server.stat_unexpected_error_replies = 0;
 2800     server.aof_delayed_fsync = 0;
 2801 }
 2802 
 2803 void initServer(void) {
 2804     int j;
 2805 
 2806     signal(SIGHUP, SIG_IGN);
 2807     signal(SIGPIPE, SIG_IGN);
 2808     setupSignalHandlers();
 2809 
 2810     if (server.syslog_enabled) {
 2811         openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
 2812             server.syslog_facility);
 2813     }
 2814 
 2815     /* Initialization after setting defaults from the config system. */
 2816     server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
 2817     server.hz = server.config_hz;
 2818     server.pid = getpid();
 2819     server.current_client = NULL;
 2820     server.fixed_time_expire = 0;
 2821     server.clients = listCreate();
 2822     server.clients_index = raxNew();
 2823     server.clients_to_close = listCreate();
 2824     server.slaves = listCreate();
 2825     server.monitors = listCreate();
 2826     server.clients_pending_write = listCreate();
 2827     server.clients_pending_read = listCreate();
 2828     server.clients_timeout_table = raxNew();
 2829     server.slaveseldb = -1; /* Force to emit the first SELECT command. */
 2830     server.unblocked_clients = listCreate();
 2831     server.ready_keys = listCreate();
 2832     server.clients_waiting_acks = listCreate();
 2833     server.get_ack_from_slaves = 0;
 2834     server.clients_paused = 0;
 2835     server.events_processed_while_blocked = 0;
 2836     server.system_memory_size = zmalloc_get_memory_size();
 2837 
 2838     if ((server.tls_port || server.tls_replication || server.tls_cluster)
 2839                 && tlsConfigure(&server.tls_ctx_config) == C_ERR) {
 2840         serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info.");
 2841         exit(1);
 2842     }
 2843 
 2844     createSharedObjects();
 2845     adjustOpenFilesLimit();
 2846     server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
 2847     if (server.el == NULL) {
 2848         serverLog(LL_WARNING,
 2849             "Failed creating the event loop. Error message: '%s'",
 2850             strerror(errno));
 2851         exit(1);
 2852     }
 2853     server.db = zmalloc(sizeof(redisDb)*server.dbnum);
 2854 
 2855     /* Open the TCP listening socket for the user commands. */
 2856     if (server.port != 0 &&
 2857         listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
 2858         exit(1);
 2859     if (server.tls_port != 0 &&
 2860         listenToPort(server.tls_port,server.tlsfd,&server.tlsfd_count) == C_ERR)
 2861         exit(1);
 2862 
 2863     /* Open the listening Unix domain socket. */
 2864     if (server.unixsocket != NULL) {
 2865         unlink(server.unixsocket); /* don't care if this fails */
 2866         server.sofd = anetUnixServer(server.neterr,server.unixsocket,
 2867             server.unixsocketperm, server.tcp_backlog);
 2868         if (server.sofd == ANET_ERR) {
 2869             serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
 2870             exit(1);
 2871         }
 2872         anetNonBlock(NULL,server.sofd);
 2873     }
 2874 
 2875     /* Abort if there are no listening sockets at all. */
 2876     if (server.ipfd_count == 0 && server.tlsfd_count == 0 && server.sofd < 0) {
 2877         serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
 2878         exit(1);
 2879     }
 2880 
 2881     /* Create the Redis databases, and initialize other internal state. */
 2882     for (j = 0; j < server.dbnum; j++) {
 2883         server.db[j].dict = dictCreate(&dbDictType,NULL);
 2884         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
 2885         server.db[j].expires_cursor = 0;
 2886         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
 2887         server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
 2888         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
 2889         server.db[j].id = j;
 2890         server.db[j].avg_ttl = 0;
 2891         server.db[j].defrag_later = listCreate();
 2892         listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
 2893     }
 2894     evictionPoolAlloc(); /* Initialize the LRU keys pool. */
 2895     server.pubsub_channels = dictCreate(&keylistDictType,NULL);
 2896     server.pubsub_patterns = listCreate();
 2897     server.pubsub_patterns_dict = dictCreate(&keylistDictType,NULL);
 2898     listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
 2899     listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
 2900     server.cronloops = 0;
 2901     server.rdb_child_pid = -1;
 2902     server.aof_child_pid = -1;
 2903     server.module_child_pid = -1;
 2904     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
 2905     server.rdb_pipe_conns = NULL;
 2906     server.rdb_pipe_numconns = 0;
 2907     server.rdb_pipe_numconns_writing = 0;
 2908     server.rdb_pipe_buff = NULL;
 2909     server.rdb_pipe_bufflen = 0;
 2910     server.rdb_bgsave_scheduled = 0;
 2911     server.child_info_pipe[0] = -1;
 2912     server.child_info_pipe[1] = -1;
 2913     server.child_info_data.magic = 0;
 2914     aofRewriteBufferReset();
 2915     server.aof_buf = sdsempty();
 2916     server.lastsave = time(NULL); /* At startup we consider the DB saved. */
 2917     server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
 2918     server.rdb_save_time_last = -1;
 2919     server.rdb_save_time_start = -1;
 2920     server.dirty = 0;
 2921     resetServerStats();
 2922     /* A few stats we don't want to reset: server startup time, and peak mem. */
 2923     server.stat_starttime = time(NULL);
 2924     server.stat_peak_memory = 0;
 2925     server.stat_rdb_cow_bytes = 0;
 2926     server.stat_aof_cow_bytes = 0;
 2927     server.stat_module_cow_bytes = 0;
 2928     for (int j = 0; j < CLIENT_TYPE_COUNT; j++)
 2929         server.stat_clients_type_memory[j] = 0;
 2930     server.cron_malloc_stats.zmalloc_used = 0;
 2931     server.cron_malloc_stats.process_rss = 0;
 2932     server.cron_malloc_stats.allocator_allocated = 0;
 2933     server.cron_malloc_stats.allocator_active = 0;
 2934     server.cron_malloc_stats.allocator_resident = 0;
 2935     server.lastbgsave_status = C_OK;
 2936     server.aof_last_write_status = C_OK;
 2937     server.aof_last_write_errno = 0;
 2938     server.repl_good_slaves_count = 0;
 2939 
 2940     /* Create the timer callback, this is our way to process many background
 2941      * operations incrementally, like clients timeout, eviction of unaccessed
 2942      * expired keys and so forth. */
 2943     if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
 2944         serverPanic("Can't create event loop timers.");
 2945         exit(1);
 2946     }
 2947 
 2948     /* Create an event handler for accepting new connections in TCP and Unix
 2949      * domain sockets. */
 2950     for (j = 0; j < server.ipfd_count; j++) {
 2951         if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
 2952             acceptTcpHandler,NULL) == AE_ERR)
 2953             {
 2954                 serverPanic(
 2955                     "Unrecoverable error creating server.ipfd file event.");
 2956             }
 2957     }
 2958     for (j = 0; j < server.tlsfd_count; j++) {
 2959         if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE,
 2960             acceptTLSHandler,NULL) == AE_ERR)
 2961             {
 2962                 serverPanic(
 2963                     "Unrecoverable error creating server.tlsfd file event.");
 2964             }
 2965     }
 2966     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
 2967         acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
 2968 
 2969 
 2970     /* Register a readable event for the pipe used to awake the event loop
 2971      * when a blocked client in a module needs attention. */
 2972     if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
 2973         moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
 2974             serverPanic(
 2975                 "Error registering the readable event for the module "
 2976                 "blocked clients subsystem.");
 2977     }
 2978 
 2979     /* Register before and after sleep handlers (note this needs to be done
 2980      * before loading persistence since it is used by processEventsWhileBlocked. */
 2981     aeSetBeforeSleepProc(server.el,beforeSleep);
 2982     aeSetAfterSleepProc(server.el,afterSleep);
 2983 
 2984     /* Open the AOF file if needed. */
 2985     if (server.aof_state == AOF_ON) {
 2986         server.aof_fd = open(server.aof_filename,
 2987                                O_WRONLY|O_APPEND|O_CREAT,0644);
 2988         if (server.aof_fd == -1) {
 2989             serverLog(LL_WARNING, "Can't open the append-only file: %s",
 2990                 strerror(errno));
 2991             exit(1);
 2992         }
 2993     }
 2994 
 2995     /* 32 bit instances are limited to 4GB of address space, so if there is
 2996      * no explicit limit in the user provided configuration we set a limit
 2997      * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
 2998      * useless crashes of the Redis instance for out of memory. */
 2999     if (server.arch_bits == 32 && server.maxmemory == 0) {
 3000         serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
 3001         server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
 3002         server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
 3003     }
 3004 
 3005     if (server.cluster_enabled) clusterInit();
 3006     replicationScriptCacheInit();
 3007     scriptingInit(1);
 3008     slowlogInit();
 3009     latencyMonitorInit();
 3010 }
 3011 
 3012 /* Some steps in server initialization need to be done last (after modules
 3013  * are loaded).
 3014  * Specifically, creation of threads due to a race bug in ld.so, in which
 3015  * Thread Local Storage initialization collides with dlopen call.
 3016  * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
 3017 void InitServerLast() {
 3018     bioInit();
 3019     initThreadedIO();
 3020     set_jemalloc_bg_thread(server.jemalloc_bg_thread);
 3021     server.initial_memory_usage = zmalloc_used_memory();
 3022 }
 3023 
 3024 /* Parse the flags string description 'strflags' and set them to the
 3025  * command 'c'. If the flags are all valid C_OK is returned, otherwise
 3026  * C_ERR is returned (yet the recognized flags are set in the command). */
 3027 int populateCommandTableParseFlags(struct redisCommand *c, char *strflags) {
 3028     int argc;
 3029     sds *argv;
 3030 
 3031     /* Split the line into arguments for processing. */
 3032     argv = sdssplitargs(strflags,&argc);
 3033     if (argv == NULL) return C_ERR;
 3034 
 3035     for (int j = 0; j < argc; j++) {
 3036         char *flag = argv[j];
 3037         if (!strcasecmp(flag,"write")) {
 3038             c->flags |= CMD_WRITE|CMD_CATEGORY_WRITE;
 3039         } else if (!strcasecmp(flag,"read-only")) {
 3040             c->flags |= CMD_READONLY|CMD_CATEGORY_READ;
 3041         } else if (!strcasecmp(flag,"use-memory")) {
 3042             c->flags |= CMD_DENYOOM;
 3043         } else if (!strcasecmp(flag,"admin")) {
 3044             c->flags |= CMD_ADMIN|CMD_CATEGORY_ADMIN|CMD_CATEGORY_DANGEROUS;
 3045         } else if (!strcasecmp(flag,"pub-sub")) {
 3046             c->flags |= CMD_PUBSUB|CMD_CATEGORY_PUBSUB;
 3047         } else if (!strcasecmp(flag,"no-script")) {
 3048             c->flags |= CMD_NOSCRIPT;
 3049         } else if (!strcasecmp(flag,"random")) {
 3050             c->flags |= CMD_RANDOM;
 3051         } else if (!strcasecmp(flag,"to-sort")) {
 3052             c->flags |= CMD_SORT_FOR_SCRIPT;
 3053         } else if (!strcasecmp(flag,"ok-loading")) {
 3054             c->flags |= CMD_LOADING;
 3055         } else if (!strcasecmp(flag,"ok-stale")) {
 3056             c->flags |= CMD_STALE;
 3057         } else if (!strcasecmp(flag,"no-monitor")) {
 3058             c->flags |= CMD_SKIP_MONITOR;
 3059         } else if (!strcasecmp(flag,"no-slowlog")) {
 3060             c->flags |= CMD_SKIP_SLOWLOG;
 3061         } else if (!strcasecmp(flag,"cluster-asking")) {
 3062             c->flags |= CMD_ASKING;
 3063         } else if (!strcasecmp(flag,"fast")) {
 3064             c->flags |= CMD_FAST | CMD_CATEGORY_FAST;
 3065         } else if (!strcasecmp(flag,"no-auth")) {
 3066             c->flags |= CMD_NO_AUTH;
 3067         } else {
 3068             /* Parse ACL categories here if the flag name starts with @. */
 3069             uint64_t catflag;
 3070             if (flag[0] == '@' &&
 3071                 (catflag = ACLGetCommandCategoryFlagByName(flag+1)) != 0)
 3072             {
 3073                 c->flags |= catflag;
 3074             } else {
 3075                 sdsfreesplitres(argv,argc);
 3076                 return C_ERR;
 3077             }
 3078         }
 3079     }
 3080     /* If it's not @fast is @slow in this binary world. */
 3081     if (!(c->flags & CMD_CATEGORY_FAST)) c->flags |= CMD_CATEGORY_SLOW;
 3082 
 3083     sdsfreesplitres(argv,argc);
 3084     return C_OK;
 3085 }
 3086 
 3087 /* Populates the Redis Command Table starting from the hard coded list
 3088  * we have on top of redis.c file. */
 3089 void populateCommandTable(void) {
 3090     int j;
 3091     int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
 3092 
 3093     for (j = 0; j < numcommands; j++) {
 3094         struct redisCommand *c = redisCommandTable+j;
 3095         int retval1, retval2;
 3096 
 3097         /* Translate the command string flags description into an actual
 3098          * set of flags. */
 3099         if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
 3100             serverPanic("Unsupported command flag");
 3101 
 3102         c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
 3103         retval1 = dictAdd(server.commands, sdsnew(c->name), c);
 3104         /* Populate an additional dictionary that will be unaffected
 3105          * by rename-command statements in redis.conf. */
 3106         retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
 3107         serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
 3108     }
 3109 }
 3110 
 3111 void resetCommandTableStats(void) {
 3112     struct redisCommand *c;
 3113     dictEntry *de;
 3114     dictIterator *di;
 3115 
 3116     di = dictGetSafeIterator(server.commands);
 3117     while((de = dictNext(di)) != NULL) {
 3118         c = (struct redisCommand *) dictGetVal(de);
 3119         c->microseconds = 0;
 3120         c->calls = 0;
 3121     }
 3122     dictReleaseIterator(di);
 3123 
 3124 }
 3125 
 3126 /* ========================== Redis OP Array API ============================ */
 3127 
 3128 void redisOpArrayInit(redisOpArray *oa) {
 3129     oa->ops = NULL;
 3130     oa->numops = 0;
 3131 }
 3132 
 3133 int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
 3134                        robj **argv, int argc, int target)
 3135 {
 3136     redisOp *op;
 3137 
 3138     oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
 3139     op = oa->ops+oa->numops;
 3140     op->cmd = cmd;
 3141     op->dbid = dbid;
 3142     op->argv = argv;
 3143     op->argc = argc;
 3144     op->target = target;
 3145     oa->numops++;
 3146     return oa->numops;
 3147 }
 3148 
 3149 void redisOpArrayFree(redisOpArray *oa) {
 3150     while(oa->numops) {
 3151         int j;
 3152         redisOp *op;
 3153 
 3154         oa->numops--;
 3155         op = oa->ops+oa->numops;
 3156         for (j = 0; j < op->argc; j++)
 3157             decrRefCount(op->argv[j]);
 3158         zfree(op->argv);
 3159     }
 3160     zfree(oa->ops);
 3161 }
 3162 
 3163 /* ====================== Commands lookup and execution ===================== */
 3164 
 3165 struct redisCommand *lookupCommand(sds name) {
 3166     return dictFetchValue(server.commands, name);
 3167 }
 3168 
 3169 struct redisCommand *lookupCommandByCString(char *s) {
 3170     struct redisCommand *cmd;
 3171     sds name = sdsnew(s);
 3172 
 3173     cmd = dictFetchValue(server.commands, name);
 3174     sdsfree(name);
 3175     return cmd;
 3176 }
 3177 
 3178 /* Lookup the command in the current table, if not found also check in
 3179  * the original table containing the original command names unaffected by
 3180  * redis.conf rename-command statement.
 3181  *
 3182  * This is used by functions rewriting the argument vector such as
 3183  * rewriteClientCommandVector() in order to set client->cmd pointer
 3184  * correctly even if the command was renamed. */
 3185 struct redisCommand *lookupCommandOrOriginal(sds name) {
 3186     struct redisCommand *cmd = dictFetchValue(server.commands, name);
 3187 
 3188     if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
 3189     return cmd;
 3190 }
 3191 
 3192 /* Propagate the specified command (in the context of the specified database id)
 3193  * to AOF and Slaves.
 3194  *
 3195  * flags are an xor between:
 3196  * + PROPAGATE_NONE (no propagation of command at all)
 3197  * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
 3198  * + PROPAGATE_REPL (propagate into the replication link)
 3199  *
 3200  * This should not be used inside commands implementation since it will not
 3201  * wrap the resulting commands in MULTI/EXEC. Use instead alsoPropagate(),
 3202  * preventCommandPropagation(), forceCommandPropagation().
 3203  *
 3204  * However for functions that need to (also) propagate out of the context of a
 3205  * command execution, for example when serving a blocked client, you
 3206  * want to use propagate().
 3207  */
 3208 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
 3209                int flags)
 3210 {
 3211     if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
 3212         feedAppendOnlyFile(cmd,dbid,argv,argc);
 3213     if (flags & PROPAGATE_REPL)
 3214         replicationFeedSlaves(server.slaves,dbid,argv,argc);
 3215 }
 3216 
 3217 /* Used inside commands to schedule the propagation of additional commands
 3218  * after the current command is propagated to AOF / Replication.
 3219  *
 3220  * 'cmd' must be a pointer to the Redis command to replicate, dbid is the
 3221  * database ID the command should be propagated into.
 3222  * Arguments of the command to propagte are passed as an array of redis
 3223  * objects pointers of len 'argc', using the 'argv' vector.
 3224  *
 3225  * The function does not take a reference to the passed 'argv' vector,
 3226  * so it is up to the caller to release the passed argv (but it is usually
 3227  * stack allocated).  The function autoamtically increments ref count of
 3228  * passed objects, so the caller does not need to. */
 3229 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
 3230                    int target)
 3231 {
 3232     robj **argvcopy;
 3233     int j;
 3234 
 3235     if (server.loading) return; /* No propagation during loading. */
 3236 
 3237     argvcopy = zmalloc(sizeof(robj*)*argc);
 3238     for (j = 0; j < argc; j++) {
 3239         argvcopy[j] = argv[j];
 3240         incrRefCount(argv[j]);
 3241     }
 3242     redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target);
 3243 }
 3244 
 3245 /* It is possible to call the function forceCommandPropagation() inside a
 3246  * Redis command implementation in order to to force the propagation of a
 3247  * specific command execution into AOF / Replication. */
 3248 void forceCommandPropagation(client *c, int flags) {
 3249     if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL;
 3250     if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
 3251 }
 3252 
 3253 /* Avoid that the executed command is propagated at all. This way we
 3254  * are free to just propagate what we want using the alsoPropagate()
 3255  * API. */
 3256 void preventCommandPropagation(client *c) {
 3257     c->flags |= CLIENT_PREVENT_PROP;
 3258 }
 3259 
 3260 /* AOF specific version of preventCommandPropagation(). */
 3261 void preventCommandAOF(client *c) {
 3262     c->flags |= CLIENT_PREVENT_AOF_PROP;
 3263 }
 3264 
 3265 /* Replication specific version of preventCommandPropagation(). */
 3266 void preventCommandReplication(client *c) {
 3267     c->flags |= CLIENT_PREVENT_REPL_PROP;
 3268 }
 3269 
 3270 /* Call() is the core of Redis execution of a command.
 3271  *
 3272  * The following flags can be passed:
 3273  * CMD_CALL_NONE        No flags.
 3274  * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
 3275  * CMD_CALL_STATS       Populate command stats.
 3276  * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
 3277  *                          or if the client flags are forcing propagation.
 3278  * CMD_CALL_PROPAGATE_REPL  Send command to slaves if it modified the dataset
 3279  *                          or if the client flags are forcing propagation.
 3280  * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
 3281  * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
 3282  *
 3283  * The exact propagation behavior depends on the client flags.
 3284  * Specifically:
 3285  *
 3286  * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
 3287  *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
 3288  *    in the call flags, then the command is propagated even if the
 3289  *    dataset was not affected by the command.
 3290  * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
 3291  *    are set, the propagation into AOF or to slaves is not performed even
 3292  *    if the command modified the dataset.
 3293  *
 3294  * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
 3295  * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
 3296  * slaves propagation will never occur.
 3297  *
 3298  * Client flags are modified by the implementation of a given command
 3299  * using the following API:
 3300  *
 3301  * forceCommandPropagation(client *c, int flags);
 3302  * preventCommandPropagation(client *c);
 3303  * preventCommandAOF(client *c);
 3304  * preventCommandReplication(client *c);
 3305  *
 3306  */
 3307 void call(client *c, int flags) {
 3308     long long dirty;
 3309     ustime_t start, duration;
 3310     int client_old_flags = c->flags;
 3311     struct redisCommand *real_cmd = c->cmd;
 3312 
 3313     server.fixed_time_expire++;
 3314 
 3315     /* Send the command to clients in MONITOR mode if applicable.
 3316      * Administrative commands are considered too dangerous to be shown. */
 3317     if (listLength(server.monitors) &&
 3318         !server.loading &&
 3319         !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
 3320     {
 3321         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
 3322     }
 3323 
 3324     /* Initialization: clear the flags that must be set by the command on
 3325      * demand, and initialize the array for additional commands propagation. */
 3326     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
 3327     redisOpArray prev_also_propagate = server.also_propagate;
 3328     redisOpArrayInit(&server.also_propagate);
 3329 
 3330     /* Call the command. */
 3331     dirty = server.dirty;
 3332     updateCachedTime(0);
 3333     start = server.ustime;
 3334     c->cmd->proc(c);
 3335     duration = ustime()-start;
 3336     dirty = server.dirty-dirty;
 3337     if (dirty < 0) dirty = 0;
 3338 
 3339     /* When EVAL is called loading the AOF we don't want commands called
 3340      * from Lua to go into the slowlog or to populate statistics. */
 3341     if (server.loading && c->flags & CLIENT_LUA)
 3342         flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
 3343 
 3344     /* If the caller is Lua, we want to force the EVAL caller to propagate
 3345      * the script if the command flag or client flag are forcing the
 3346      * propagation. */
 3347     if (c->flags & CLIENT_LUA && server.lua_caller) {
 3348         if (c->flags & CLIENT_FORCE_REPL)
 3349             server.lua_caller->flags |= CLIENT_FORCE_REPL;
 3350         if (c->flags & CLIENT_FORCE_AOF)
 3351             server.lua_caller->flags |= CLIENT_FORCE_AOF;
 3352     }
 3353 
 3354     /* Log the command into the Slow log if needed, and populate the
 3355      * per-command statistics that we show in INFO commandstats. */
 3356     if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) {
 3357         char *latency_event = (c->cmd->flags & CMD_FAST) ?
 3358                               "fast-command" : "command";
 3359         latencyAddSampleIfNeeded(latency_event,duration/1000);
 3360         slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
 3361     }
 3362 
 3363     if (flags & CMD_CALL_STATS) {
 3364         /* use the real command that was executed (cmd and lastamc) may be
 3365          * different, in case of MULTI-EXEC or re-written commands such as
 3366          * EXPIRE, GEOADD, etc. */
 3367         real_cmd->microseconds += duration;
 3368         real_cmd->calls++;
 3369     }
 3370 
 3371     /* Propagate the command into the AOF and replication link */
 3372     if (flags & CMD_CALL_PROPAGATE &&
 3373         (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
 3374     {
 3375         int propagate_flags = PROPAGATE_NONE;
 3376 
 3377         /* Check if the command operated changes in the data set. If so
 3378          * set for replication / AOF propagation. */
 3379         if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
 3380 
 3381         /* If the client forced AOF / replication of the command, set
 3382          * the flags regardless of the command effects on the data set. */
 3383         if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
 3384         if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
 3385 
 3386         /* However prevent AOF / replication propagation if the command
 3387          * implementations called preventCommandPropagation() or similar,
 3388          * or if we don't have the call() flags to do so. */
 3389         if (c->flags & CLIENT_PREVENT_REPL_PROP ||
 3390             !(flags & CMD_CALL_PROPAGATE_REPL))
 3391                 propagate_flags &= ~PROPAGATE_REPL;
 3392         if (c->flags & CLIENT_PREVENT_AOF_PROP ||
 3393             !(flags & CMD_CALL_PROPAGATE_AOF))
 3394                 propagate_flags &= ~PROPAGATE_AOF;
 3395 
 3396         /* Call propagate() only if at least one of AOF / replication
 3397          * propagation is needed. Note that modules commands handle replication
 3398          * in an explicit way, so we never replicate them automatically. */
 3399         if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
 3400             propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
 3401     }
 3402 
 3403     /* Restore the old replication flags, since call() can be executed
 3404      * recursively. */
 3405     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
 3406     c->flags |= client_old_flags &
 3407         (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
 3408 
 3409     /* Handle the alsoPropagate() API to handle commands that want to propagate
 3410      * multiple separated commands. Note that alsoPropagate() is not affected
 3411      * by CLIENT_PREVENT_PROP flag. */
 3412     if (server.also_propagate.numops) {
 3413         int j;
 3414         redisOp *rop;
 3415 
 3416         if (flags & CMD_CALL_PROPAGATE) {
 3417             int multi_emitted = 0;
 3418             /* Wrap the commands in server.also_propagate array,
 3419              * but don't wrap it if we are already in MULTI context,
 3420              * in case the nested MULTI/EXEC.
 3421              *
 3422              * And if the array contains only one command, no need to
 3423              * wrap it, since the single command is atomic. */
 3424             if (server.also_propagate.numops > 1 &&
 3425                 !(c->cmd->flags & CMD_MODULE) &&
 3426                 !(c->flags & CLIENT_MULTI) &&
 3427                 !(flags & CMD_CALL_NOWRAP))
 3428             {
 3429                 execCommandPropagateMulti(c);
 3430                 multi_emitted = 1;
 3431             }
 3432 
 3433             for (j = 0; j < server.also_propagate.numops; j++) {
 3434                 rop = &server.also_propagate.ops[j];
 3435                 int target = rop->target;
 3436                 /* Whatever the command wish is, we honor the call() flags. */
 3437                 if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
 3438                 if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
 3439                 if (target)
 3440                     propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
 3441             }
 3442 
 3443             if (multi_emitted) {
 3444                 execCommandPropagateExec(c);
 3445             }
 3446         }
 3447         redisOpArrayFree(&server.also_propagate);
 3448     }
 3449     server.also_propagate = prev_also_propagate;
 3450 
 3451     /* If the client has keys tracking enabled for client side caching,
 3452      * make sure to remember the keys it fetched via this command. */
 3453     if (c->cmd->flags & CMD_READONLY) {
 3454         client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ?
 3455                             server.lua_caller : c;
 3456         if (caller->flags & CLIENT_TRACKING &&
 3457             !(caller->flags & CLIENT_TRACKING_BCAST))
 3458         {
 3459             trackingRememberKeys(caller);
 3460         }
 3461     }
 3462 
 3463     server.fixed_time_expire--;
 3464     server.stat_numcommands++;
 3465 }
 3466 
 3467 /* Used when a command that is ready for execution needs to be rejected, due to
 3468  * varios pre-execution checks. it returns the appropriate error to the client.
 3469  * If there's a transaction is flags it as dirty, and if the command is EXEC,
 3470  * it aborts the transaction.
 3471  * Note: 'reply' is expected to end with \r\n */
 3472 void rejectCommand(client *c, robj *reply) {
 3473     flagTransaction(c);
 3474     if (c->cmd && c->cmd->proc == execCommand) {
 3475         execCommandAbort(c, reply->ptr);
 3476     } else {
 3477         /* using addReplyError* rather than addReply so that the error can be logged. */
 3478         addReplyErrorObject(c, reply);
 3479     }
 3480 }
 3481 
 3482 void rejectCommandFormat(client *c, const char *fmt, ...) {
 3483     flagTransaction(c);
 3484     va_list ap;
 3485     va_start(ap,fmt);
 3486     sds s = sdscatvprintf(sdsempty(),fmt,ap);
 3487     va_end(ap);
 3488     /* Make sure there are no newlines in the string, otherwise invalid protocol
 3489      * is emitted (The args come from the user, they may contain any character). */
 3490     sdsmapchars(s, "\r\n", "  ",  2);
 3491     if (c->cmd && c->cmd->proc == execCommand) {
 3492         execCommandAbort(c, s);
 3493     } else {
 3494         addReplyErrorSds(c, s);
 3495     }
 3496     sdsfree(s);
 3497 }
 3498 
 3499 /* If this function gets called we already read a whole
 3500  * command, arguments are in the client argv/argc fields.
 3501  * processCommand() execute the command or prepare the
 3502  * server for a bulk read from the client.
 3503  *
 3504  * If C_OK is returned the client is still alive and valid and
 3505  * other operations can be performed by the caller. Otherwise
 3506  * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
 3507 int processCommand(client *c) {
 3508     moduleCallCommandFilters(c);
 3509 
 3510     /* The QUIT command is handled separately. Normal command procs will
 3511      * go through checking for replication and QUIT will cause trouble
 3512      * when FORCE_REPLICATION is enabled and would be implemented in
 3513      * a regular command proc. */
 3514     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
 3515         addReply(c,shared.ok);
 3516         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
 3517         return C_ERR;
 3518     }
 3519 
 3520     /* Now lookup the command and check ASAP about trivial error conditions
 3521      * such as wrong arity, bad command name and so forth. */
 3522     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
 3523     if (!c->cmd) {
 3524         sds args = sdsempty();
 3525         int i;
 3526         for (i=1; i < c->argc && sdslen(args) < 128; i++)
 3527             args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
 3528         rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
 3529             (char*)c->argv[0]->ptr, args);
 3530         sdsfree(args);
 3531         return C_OK;
 3532     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
 3533                (c->argc < -c->cmd->arity)) {
 3534         rejectCommandFormat(c,"wrong number of arguments for '%s' command",
 3535             c->cmd->name);
 3536         return C_OK;
 3537     }
 3538 
 3539     int is_write_command = (c->cmd->flags & CMD_WRITE) ||
 3540                            (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
 3541     int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
 3542                              (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
 3543     int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
 3544                                (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
 3545     int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
 3546                                  (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
 3547 
 3548     /* Check if the user is authenticated. This check is skipped in case
 3549      * the default user is flagged as "nopass" and is active. */
 3550     int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
 3551                           (DefaultUser->flags & USER_FLAG_DISABLED)) &&
 3552                         !c->authenticated;
 3553     if (auth_required) {
 3554         /* AUTH and HELLO and no auth modules are valid even in
 3555          * non-authenticated state. */
 3556         if (!(c->cmd->flags & CMD_NO_AUTH)) {
 3557             rejectCommand(c,shared.noautherr);
 3558             return C_OK;
 3559         }
 3560     }
 3561 
 3562     /* Check if the user can run this command according to the current
 3563      * ACLs. */
 3564     int acl_keypos;
 3565     int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
 3566     if (acl_retval != ACL_OK) {
 3567         addACLLogEntry(c,acl_retval,acl_keypos,NULL);
 3568         if (acl_retval == ACL_DENIED_CMD)
 3569             rejectCommandFormat(c,
 3570                 "-NOPERM this user has no permissions to run "
 3571                 "the '%s' command or its subcommand", c->cmd->name);
 3572         else
 3573             rejectCommandFormat(c,
 3574                 "-NOPERM this user has no permissions to access "
 3575                 "one of the keys used as arguments");
 3576         return C_OK;
 3577     }
 3578 
 3579     /* If cluster is enabled perform the cluster redirection here.
 3580      * However we don't perform the redirection if:
 3581      * 1) The sender of this command is our master.
 3582      * 2) The command has no key arguments. */
 3583     if (server.cluster_enabled &&
 3584         !(c->flags & CLIENT_MASTER) &&
 3585         !(c->flags & CLIENT_LUA &&
 3586           server.lua_caller->flags & CLIENT_MASTER) &&
 3587         !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
 3588           c->cmd->proc != execCommand))
 3589     {
 3590         int hashslot;
 3591         int error_code;
 3592         clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
 3593                                         &hashslot,&error_code);
 3594         if (n == NULL || n != server.cluster->myself) {
 3595             if (c->cmd->proc == execCommand) {
 3596                 discardTransaction(c);
 3597             } else {
 3598                 flagTransaction(c);
 3599             }
 3600             clusterRedirectClient(c,n,hashslot,error_code);
 3601             return C_OK;
 3602         }
 3603     }
 3604 
 3605     /* Handle the maxmemory directive.
 3606      *
 3607      * Note that we do not want to reclaim memory if we are here re-entering
 3608      * the event loop since there is a busy Lua script running in timeout
 3609      * condition, to avoid mixing the propagation of scripts with the
 3610      * propagation of DELs due to eviction. */
 3611     if (server.maxmemory && !server.lua_timedout) {
 3612         int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
 3613         /* freeMemoryIfNeeded may flush slave output buffers. This may result
 3614          * into a slave, that may be the active client, to be freed. */
 3615         if (server.current_client == NULL) return C_ERR;
 3616 
 3617         int reject_cmd_on_oom = is_denyoom_command;
 3618         /* If client is in MULTI/EXEC context, queuing may consume an unlimited
 3619          * amount of memory, so we want to stop that.
 3620          * However, we never want to reject DISCARD, or even EXEC (unless it
 3621          * contains denied commands, in which case is_denyoom_command is already
 3622          * set. */
 3623         if (c->flags & CLIENT_MULTI &&
 3624             c->cmd->proc != execCommand &&
 3625             c->cmd->proc != discardCommand) {
 3626             reject_cmd_on_oom = 1;
 3627         }
 3628 
 3629         if (out_of_memory && reject_cmd_on_oom) {
 3630             rejectCommand(c, shared.oomerr);
 3631             return C_OK;
 3632         }
 3633 
 3634         /* Save out_of_memory result at script start, otherwise if we check OOM
 3635          * untill first write within script, memory used by lua stack and
 3636          * arguments might interfere. */
 3637         if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) {
 3638             server.lua_oom = out_of_memory;
 3639         }
 3640     }
 3641 
 3642     /* Make sure to use a reasonable amount of memory for client side
 3643      * caching metadata. */
 3644     if (server.tracking_clients) trackingLimitUsedSlots();
 3645 
 3646     /* Don't accept write commands if there are problems persisting on disk
 3647      * and if this is a master instance. */
 3648     int deny_write_type = writeCommandsDeniedByDiskError();
 3649     if (deny_write_type != DISK_ERROR_TYPE_NONE &&
 3650         server.masterhost == NULL &&
 3651         (is_write_command ||c->cmd->proc == pingCommand))
 3652     {
 3653         if (deny_write_type == DISK_ERROR_TYPE_RDB)
 3654             rejectCommand(c, shared.bgsaveerr);
 3655         else
 3656             rejectCommandFormat(c,
 3657                 "-MISCONF Errors writing to the AOF file: %s",
 3658                 strerror(server.aof_last_write_errno));
 3659         return C_OK;
 3660     }
 3661 
 3662     /* Don't accept write commands if there are not enough good slaves and
 3663      * user configured the min-slaves-to-write option. */
 3664     if (server.masterhost == NULL &&
 3665         server.repl_min_slaves_to_write &&
 3666         server.repl_min_slaves_max_lag &&
 3667         is_write_command &&
 3668         server.repl_good_slaves_count < server.repl_min_slaves_to_write)
 3669     {
 3670         rejectCommand(c, shared.noreplicaserr);
 3671         return C_OK;
 3672     }
 3673 
 3674     /* Don't accept write commands if this is a read only slave. But
 3675      * accept write commands if this is our master. */
 3676     if (server.masterhost && server.repl_slave_ro &&
 3677         !(c->flags & CLIENT_MASTER) &&
 3678         is_write_command)
 3679     {
 3680         rejectCommand(c, shared.roslaveerr);
 3681         return C_OK;
 3682     }
 3683 
 3684     /* Only allow a subset of commands in the context of Pub/Sub if the
 3685      * connection is in RESP2 mode. With RESP3 there are no limits. */
 3686     if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
 3687         c->cmd->proc != pingCommand &&
 3688         c->cmd->proc != subscribeCommand &&
 3689         c->cmd->proc != unsubscribeCommand &&
 3690         c->cmd->proc != psubscribeCommand &&
 3691         c->cmd->proc != punsubscribeCommand) {
 3692         rejectCommandFormat(c,
 3693             "Can't execute '%s': only (P)SUBSCRIBE / "
 3694             "(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
 3695             c->cmd->name);
 3696         return C_OK;
 3697     }
 3698 
 3699     /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
 3700      * when slave-serve-stale-data is no and we are a slave with a broken
 3701      * link with master. */
 3702     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
 3703         server.repl_serve_stale_data == 0 &&
 3704         is_denystale_command)
 3705     {
 3706         rejectCommand(c, shared.masterdownerr);
 3707         return C_OK;
 3708     }
 3709 
 3710     /* Loading DB? Return an error if the command has not the
 3711      * CMD_LOADING flag. */
 3712     if (server.loading && is_denyloading_command) {
 3713         rejectCommand(c, shared.loadingerr);
 3714         return C_OK;
 3715     }
 3716 
 3717     /* Lua script too slow? Only allow a limited number of commands.
 3718      * Note that we need to allow the transactions commands, otherwise clients
 3719      * sending a transaction with pipelining without error checking, may have
 3720      * the MULTI plus a few initial commands refused, then the timeout
 3721      * condition resolves, and the bottom-half of the transaction gets
 3722      * executed, see Github PR #7022. */
 3723     if (server.lua_timedout &&
 3724           c->cmd->proc != authCommand &&
 3725           c->cmd->proc != helloCommand &&
 3726           c->cmd->proc != replconfCommand &&
 3727           c->cmd->proc != multiCommand &&
 3728           c->cmd->proc != discardCommand &&
 3729           c->cmd->proc != watchCommand &&
 3730           c->cmd->proc != unwatchCommand &&
 3731         !(c->cmd->proc == shutdownCommand &&
 3732           c->argc == 2 &&
 3733           tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
 3734         !(c->cmd->proc == scriptCommand &&
 3735           c->argc == 2 &&
 3736           tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
 3737     {
 3738         rejectCommand(c, shared.slowscripterr);
 3739         return C_OK;
 3740     }
 3741 
 3742     /* Exec the command */
 3743     if (c->flags & CLIENT_MULTI &&
 3744         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
 3745         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
 3746     {
 3747         queueMultiCommand(c);
 3748         addReply(c,shared.queued);
 3749     } else {
 3750         call(c,CMD_CALL_FULL);
 3751         c->woff = server.master_repl_offset;
 3752         if (listLength(server.ready_keys))
 3753             handleClientsBlockedOnKeys();
 3754     }
 3755     return C_OK;
 3756 }
 3757 
 3758 /*================================== Shutdown =============================== */
 3759 
 3760 /* Close listening sockets. Also unlink the unix domain socket if
 3761  * unlink_unix_socket is non-zero. */
 3762 void closeListeningSockets(int unlink_unix_socket) {
 3763     int j;
 3764 
 3765     for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]);
 3766     for (j = 0; j < server.tlsfd_count; j++) close(server.tlsfd[j]);
 3767     if (server.sofd != -1) close(server.sofd);
 3768     if (server.cluster_enabled)
 3769         for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
 3770     if (unlink_unix_socket && server.unixsocket) {
 3771         serverLog(LL_NOTICE,"Removing the unix socket file.");
 3772         unlink(server.unixsocket); /* don't care if this fails */
 3773     }
 3774 }
 3775 
 3776 int prepareForShutdown(int flags) {
 3777     /* When SHUTDOWN is called while the server is loading a dataset in
 3778      * memory we need to make sure no attempt is performed to save
 3779      * the dataset on shutdown (otherwise it could overwrite the current DB
 3780      * with half-read data).
 3781      *
 3782      * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */
 3783     if (server.loading || server.sentinel_mode)
 3784         flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE;
 3785 
 3786     int save = flags & SHUTDOWN_SAVE;
 3787     int nosave = flags & SHUTDOWN_NOSAVE;
 3788 
 3789     serverLog(LL_WARNING,"User requested shutdown...");
 3790     if (server.supervised_mode == SUPERVISED_SYSTEMD)
 3791         redisCommunicateSystemd("STOPPING=1\n");
 3792 
 3793     /* Kill all the Lua debugger forked sessions. */
 3794     ldbKillForkedSessions();
 3795 
 3796     /* Kill the saving child if there is a background saving in progress.
 3797        We want to avoid race conditions, for instance our saving child may
 3798        overwrite the synchronous saving did by SHUTDOWN. */
 3799     if (server.rdb_child_pid != -1) {
 3800         serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
 3801         killRDBChild();
 3802     }
 3803 
 3804     /* Kill module child if there is one. */
 3805     if (server.module_child_pid != -1) {
 3806         serverLog(LL_WARNING,"There is a module fork child. Killing it!");
 3807         TerminateModuleForkChild(server.module_child_pid,0);
 3808     }
 3809 
 3810     if (server.aof_state != AOF_OFF) {
 3811         /* Kill the AOF saving child as the AOF we already have may be longer
 3812          * but contains the full dataset anyway. */
 3813         if (server.aof_child_pid != -1) {
 3814             /* If we have AOF enabled but haven't written the AOF yet, don't
 3815              * shutdown or else the dataset will be lost. */
 3816             if (server.aof_state == AOF_WAIT_REWRITE) {
 3817                 serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
 3818                 return C_ERR;
 3819             }
 3820             serverLog(LL_WARNING,
 3821                 "There is a child rewriting the AOF. Killing it!");
 3822             killAppendOnlyChild();
 3823         }
 3824         /* Append only file: flush buffers and fsync() the AOF at exit */
 3825         serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
 3826         flushAppendOnlyFile(1);
 3827         redis_fsync(server.aof_fd);
 3828     }
 3829 
 3830     /* Create a new RDB file before exiting. */
 3831     if ((server.saveparamslen > 0 && !nosave) || save) {
 3832         serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
 3833         if (server.supervised_mode == SUPERVISED_SYSTEMD)
 3834             redisCommunicateSystemd("STATUS=Saving the final RDB snapshot\n");
 3835         /* Snapshotting. Perform a SYNC SAVE and exit */
 3836         rdbSaveInfo rsi, *rsiptr;
 3837         rsiptr = rdbPopulateSaveInfo(&rsi);
 3838         if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
 3839             /* Ooops.. error saving! The best we can do is to continue
 3840              * operating. Note that if there was a background saving process,
 3841              * in the next cron() Redis will be notified that the background
 3842              * saving aborted, handling special stuff like slaves pending for
 3843              * synchronization... */
 3844             serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
 3845             if (server.supervised_mode == SUPERVISED_SYSTEMD)
 3846                 redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n");
 3847             return C_ERR;
 3848         }
 3849     }
 3850 
 3851     /* Fire the shutdown modules event. */
 3852     moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL);
 3853 
 3854     /* Remove the pid file if possible and needed. */
 3855     if (server.daemonize || server.pidfile) {
 3856         serverLog(LL_NOTICE,"Removing the pid file.");
 3857         unlink(server.pidfile);
 3858     }
 3859 
 3860     /* Best effort flush of slave output buffers, so that we hopefully
 3861      * send them pending writes. */
 3862     flushSlavesOutputBuffers();
 3863 
 3864     /* Close the listening sockets. Apparently this allows faster restarts. */
 3865     closeListeningSockets(1);
 3866     serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
 3867         server.sentinel_mode ? "Sentinel" : "Redis");
 3868     return C_OK;
 3869 }
 3870 
 3871 /*================================== Commands =============================== */
 3872 
 3873 /* Sometimes Redis cannot accept write commands because there is a perstence
 3874  * error with the RDB or AOF file, and Redis is configured in order to stop
 3875  * accepting writes in such situation. This function returns if such a
 3876  * condition is active, and the type of the condition.
 3877  *
 3878  * Function return values:
 3879  *
 3880  * DISK_ERROR_TYPE_NONE:    No problems, we can accept writes.
 3881  * DISK_ERROR_TYPE_AOF:     Don't accept writes: AOF errors.
 3882  * DISK_ERROR_TYPE_RDB:     Don't accept writes: RDB errors.
 3883  */
 3884 int writeCommandsDeniedByDiskError(void) {
 3885     if (server.stop_writes_on_bgsave_err &&
 3886         server.saveparamslen > 0 &&
 3887         server.lastbgsave_status == C_ERR)
 3888     {
 3889         return DISK_ERROR_TYPE_RDB;
 3890     } else if (server.aof_state != AOF_OFF &&
 3891                server.aof_last_write_status == C_ERR)
 3892     {
 3893         return DISK_ERROR_TYPE_AOF;
 3894     } else {
 3895         return DISK_ERROR_TYPE_NONE;
 3896     }
 3897 }
 3898 
 3899 /* The PING command. It works in a different way if the client is in
 3900  * in Pub/Sub mode. */
 3901 void pingCommand(client *c) {
 3902     /* The command takes zero or one arguments. */
 3903     if (c->argc > 2) {
 3904         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
 3905             c->cmd->name);
 3906         return;
 3907     }
 3908 
 3909     if (c->flags & CLIENT_PUBSUB && c->resp == 2) {
 3910         addReply(c,shared.mbulkhdr[2]);
 3911         addReplyBulkCBuffer(c,"pong",4);
 3912         if (c->argc == 1)
 3913             addReplyBulkCBuffer(c,"",0);
 3914         else
 3915             addReplyBulk(c,c->argv[1]);
 3916     } else {
 3917         if (c->argc == 1)
 3918             addReply(c,shared.pong);
 3919         else
 3920             addReplyBulk(c,c->argv[1]);
 3921     }
 3922 }
 3923 
 3924 void echoCommand(client *c) {
 3925     addReplyBulk(c,c->argv[1]);
 3926 }
 3927 
 3928 void timeCommand(client *c) {
 3929     struct timeval tv;
 3930 
 3931     /* gettimeofday() can only fail if &tv is a bad address so we
 3932      * don't check for errors. */
 3933     gettimeofday(&tv,NULL);
 3934     addReplyArrayLen(c,2);
 3935     addReplyBulkLongLong(c,tv.tv_sec);
 3936     addReplyBulkLongLong(c,tv.tv_usec);
 3937 }
 3938 
 3939 /* Helper function for addReplyCommand() to output flags. */
 3940 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) {
 3941     if (cmd->flags & f) {
 3942         addReplyStatus(c, reply);
 3943         return 1;
 3944     }
 3945     return 0;
 3946 }
 3947 
 3948 /* Output the representation of a Redis command. Used by the COMMAND command. */
 3949 void addReplyCommand(client *c, struct redisCommand *cmd) {
 3950     if (!cmd) {
 3951         addReplyNull(c);
 3952     } else {
 3953         /* We are adding: command name, arg count, flags, first, last, offset, categories */
 3954         addReplyArrayLen(c, 7);
 3955         addReplyBulkCString(c, cmd->name);
 3956         addReplyLongLong(c, cmd->arity);
 3957 
 3958         int flagcount = 0;
 3959         void *flaglen = addReplyDeferredLen(c);
 3960         flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write");
 3961         flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly");
 3962         flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom");
 3963         flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin");
 3964         flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub");
 3965         flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript");
 3966         flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random");
 3967         flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script");
 3968         flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading");
 3969         flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale");
 3970         flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor");
 3971         flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_SLOWLOG, "skip_slowlog");
 3972         flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
 3973         flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
 3974         flagcount += addReplyCommandFlag(c,cmd,CMD_NO_AUTH, "no_auth");
 3975         if ((cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) ||
 3976             cmd->flags & CMD_MODULE_GETKEYS)
 3977         {
 3978             addReplyStatus(c, "movablekeys");
 3979             flagcount += 1;
 3980         }
 3981         setDeferredSetLen(c, flaglen, flagcount);
 3982 
 3983         addReplyLongLong(c, cmd->firstkey);
 3984         addReplyLongLong(c, cmd->lastkey);
 3985         addReplyLongLong(c, cmd->keystep);
 3986 
 3987         addReplyCommandCategories(c,cmd);
 3988     }
 3989 }
 3990 
 3991 /* COMMAND <subcommand> <args> */
 3992 void commandCommand(client *c) {
 3993     dictIterator *di;
 3994     dictEntry *de;
 3995 
 3996     if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
 3997         const char *help[] = {
 3998 "(no subcommand) -- Return details about all Redis commands.",
 3999 "COUNT -- Return the total number of commands in this Redis server.",
 4000 "GETKEYS <full-command> -- Return the keys from a full Redis command.",
 4001 "INFO [command-name ...] -- Return details about multiple Redis commands.",
 4002 NULL
 4003         };
 4004         addReplyHelp(c, help);
 4005     } else if (c->argc == 1) {
 4006         addReplyArrayLen(c, dictSize(server.commands));
 4007         di = dictGetIterator(server.commands);
 4008         while ((de = dictNext(di)) != NULL) {
 4009             addReplyCommand(c, dictGetVal(de));
 4010         }
 4011         dictReleaseIterator(di);
 4012     } else if (!strcasecmp(c->argv[1]->ptr, "info")) {
 4013         int i;
 4014         addReplyArrayLen(c, c->argc-2);
 4015         for (i = 2; i < c->argc; i++) {
 4016             addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr));
 4017         }
 4018     } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) {
 4019         addReplyLongLong(c, dictSize(server.commands));
 4020     } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) {
 4021         struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
 4022         int *keys, numkeys, j;
 4023 
 4024         if (!cmd) {
 4025             addReplyError(c,"Invalid command specified");
 4026             return;
 4027         } else if (cmd->getkeys_proc == NULL && cmd->firstkey == 0) {
 4028             addReplyError(c,"The command has no key arguments");
 4029             return;
 4030         } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) ||
 4031                    ((c->argc-2) < -cmd->arity))
 4032         {
 4033             addReplyError(c,"Invalid number of arguments specified for command");
 4034             return;
 4035         }
 4036 
 4037         keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys);
 4038         if (!keys) {
 4039             addReplyError(c,"Invalid arguments specified for command");
 4040         } else {
 4041             addReplyArrayLen(c,numkeys);
 4042             for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]);
 4043             getKeysFreeResult(keys);
 4044         }
 4045     } else {
 4046         addReplySubcommandSyntaxError(c);
 4047     }
 4048 }
 4049 
 4050 /* Convert an amount of bytes into a human readable string in the form
 4051  * of 100B, 2G, 100M, 4K, and so forth. */
 4052 void bytesToHuman(char *s, unsigned long long n) {
 4053     double d;
 4054 
 4055     if (n < 1024) {
 4056         /* Bytes */
 4057         sprintf(s,"%lluB",n);
 4058     } else if (n < (1024*1024)) {
 4059         d = (double)n/(1024);
 4060         sprintf(s,"%.2fK",d);
 4061     } else if (n < (1024LL*1024*1024)) {
 4062         d = (double)n/(1024*1024);
 4063         sprintf(s,"%.2fM",d);
 4064     } else if (n < (1024LL*1024*1024*1024)) {
 4065         d = (double)n/(1024LL*1024*1024);
 4066         sprintf(s,"%.2fG",d);
 4067     } else if (n < (1024LL*1024*1024*1024*1024)) {
 4068         d = (double)n/(1024LL*1024*1024*1024);
 4069         sprintf(s,"%.2fT",d);
 4070     } else if (n < (1024LL*1024*1024*1024*1024*1024)) {
 4071         d = (double)n/(1024LL*1024*1024*1024*1024);
 4072         sprintf(s,"%.2fP",d);
 4073     } else {
 4074         /* Let's hope we never need this */
 4075         sprintf(s,"%lluB",n);
 4076     }
 4077 }
 4078 
/* Create the string returned by the INFO command. This is decoupled
 * from the INFO command itself as we need to report the same information
 * on memory corruption problems. */
 4082 sds genRedisInfoString(const char *section) {
 4083     sds info = sdsempty();
 4084     time_t uptime = server.unixtime-server.stat_starttime;
 4085     int j;
 4086     struct rusage self_ru, c_ru;
 4087     int allsections = 0, defsections = 0, everything = 0, modules = 0;
 4088     int sections = 0;
 4089 
 4090     if (section == NULL) section = "default";
 4091     allsections = strcasecmp(section,"all") == 0;
 4092     defsections = strcasecmp(section,"default") == 0;
 4093     everything = strcasecmp(section,"everything") == 0;
 4094     modules = strcasecmp(section,"modules") == 0;
 4095     if (everything) allsections = 1;
 4096 
 4097     getrusage(RUSAGE_SELF, &self_ru);
 4098     getrusage(RUSAGE_CHILDREN, &c_ru);
 4099 
 4100     /* Server */
 4101     if (allsections || defsections || !strcasecmp(section,"server")) {
 4102         static int call_uname = 1;
 4103         static struct utsname name;
 4104         char *mode;
 4105 
 4106         if (server.cluster_enabled) mode = "cluster";
 4107         else if (server.sentinel_mode) mode = "sentinel";
 4108         else mode = "standalone";
 4109 
 4110         if (sections++) info = sdscat(info,"\r\n");
 4111 
 4112         if (call_uname) {
 4113             /* Uname can be slow and is always the same output. Cache it. */
 4114             uname(&name);
 4115             call_uname = 0;
 4116         }
 4117 
 4118         info = sdscatfmt(info,
 4119             "# Server\r\n"
 4120             "redis_version:%s\r\n"
 4121             "redis_git_sha1:%s\r\n"
 4122             "redis_git_dirty:%i\r\n"
 4123             "redis_build_id:%s\r\n"
 4124             "redis_mode:%s\r\n"
 4125             "os:%s %s %s\r\n"
 4126             "arch_bits:%i\r\n"
 4127             "multiplexing_api:%s\r\n"
 4128             "atomicvar_api:%s\r\n"
 4129             "gcc_version:%i.%i.%i\r\n"
 4130             "process_id:%I\r\n"
 4131             "run_id:%s\r\n"
 4132             "tcp_port:%i\r\n"
 4133             "uptime_in_seconds:%I\r\n"
 4134             "uptime_in_days:%I\r\n"
 4135             "hz:%i\r\n"
 4136             "configured_hz:%i\r\n"
 4137             "lru_clock:%u\r\n"
 4138             "executable:%s\r\n"
 4139             "config_file:%s\r\n"
 4140             "io_threads_active:%i\r\n",
 4141             REDIS_VERSION,
 4142             redisGitSHA1(),
 4143             strtol(redisGitDirty(),NULL,10) > 0,
 4144             redisBuildIdString(),
 4145             mode,
 4146             name.sysname, name.release, name.machine,
 4147             server.arch_bits,
 4148             aeGetApiName(),
 4149             REDIS_ATOMIC_API,
 4150 #ifdef __GNUC__
 4151             __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
 4152 #else
 4153             0,0,0,
 4154 #endif
 4155             (int64_t) getpid(),
 4156             server.runid,
 4157             server.port ? server.port : server.tls_port,
 4158             (int64_t)uptime,
 4159             (int64_t)(uptime/(3600*24)),
 4160             server.hz,
 4161             server.config_hz,
 4162             server.lruclock,
 4163             server.executable ? server.executable : "",
 4164             server.configfile ? server.configfile : "",
 4165             server.io_threads_active);
 4166     }
 4167 
 4168     /* Clients */
 4169     if (allsections || defsections || !strcasecmp(section,"clients")) {
 4170         size_t maxin, maxout;
 4171         getExpansiveClientsInfo(&maxin,&maxout);
 4172         if (sections++) info = sdscat(info,"\r\n");
 4173         info = sdscatprintf(info,
 4174             "# Clients\r\n"
 4175             "connected_clients:%lu\r\n"
 4176             "client_recent_max_input_buffer:%zu\r\n"
 4177             "client_recent_max_output_buffer:%zu\r\n"
 4178             "blocked_clients:%d\r\n"
 4179             "tracking_clients:%d\r\n"
 4180             "clients_in_timeout_table:%llu\r\n",
 4181             listLength(server.clients)-listLength(server.slaves),
 4182             maxin, maxout,
 4183             server.blocked_clients,
 4184             server.tracking_clients,
 4185             (unsigned long long) raxSize(server.clients_timeout_table));
 4186     }
 4187 
 4188     /* Memory */
 4189     if (allsections || defsections || !strcasecmp(section,"memory")) {
 4190         char hmem[64];
 4191         char peak_hmem[64];
 4192         char total_system_hmem[64];
 4193         char used_memory_lua_hmem[64];
 4194         char used_memory_scripts_hmem[64];
 4195         char used_memory_rss_hmem[64];
 4196         char maxmemory_hmem[64];
 4197         size_t zmalloc_used = zmalloc_used_memory();
 4198         size_t total_system_mem = server.system_memory_size;
 4199         const char *evict_policy = evictPolicyToString();
 4200         long long memory_lua = server.lua ? (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024 : 0;
 4201         struct redisMemOverhead *mh = getMemoryOverheadData();
 4202 
 4203         /* Peak memory is updated from time to time by serverCron() so it
 4204          * may happen that the instantaneous value is slightly bigger than
 4205          * the peak value. This may confuse users, so we update the peak
 4206          * if found smaller than the current memory usage. */
 4207         if (zmalloc_used > server.stat_peak_memory)
 4208             server.stat_peak_memory = zmalloc_used;
 4209 
 4210         bytesToHuman(hmem,zmalloc_used);
 4211         bytesToHuman(peak_hmem,server.stat_peak_memory);
 4212         bytesToHuman(total_system_hmem,total_system_mem);
 4213         bytesToHuman(used_memory_lua_hmem,memory_lua);
 4214         bytesToHuman(used_memory_scripts_hmem,mh->lua_caches);
 4215         bytesToHuman(used_memory_rss_hmem,server.cron_malloc_stats.process_rss);
 4216         bytesToHuman(maxmemory_hmem,server.maxmemory);
 4217 
 4218         if (sections++) info = sdscat(info,"\r\n");
 4219         info = sdscatprintf(info,
 4220             "# Memory\r\n"
 4221             "used_memory:%zu\r\n"
 4222             "used_memory_human:%s\r\n"
 4223             "used_memory_rss:%zu\r\n"
 4224             "used_memory_rss_human:%s\r\n"
 4225             "used_memory_peak:%zu\r\n"
 4226             "used_memory_peak_human:%s\r\n"
 4227             "used_memory_peak_perc:%.2f%%\r\n"
 4228             "used_memory_overhead:%zu\r\n"
 4229             "used_memory_startup:%zu\r\n"
 4230             "used_memory_dataset:%zu\r\n"
 4231             "used_memory_dataset_perc:%.2f%%\r\n"
 4232             "allocator_allocated:%zu\r\n"
 4233             "allocator_active:%zu\r\n"
 4234             "allocator_resident:%zu\r\n"
 4235             "total_system_memory:%lu\r\n"
 4236             "total_system_memory_human:%s\r\n"
 4237             "used_memory_lua:%lld\r\n"
 4238             "used_memory_lua_human:%s\r\n"
 4239             "used_memory_scripts:%lld\r\n"
 4240             "used_memory_scripts_human:%s\r\n"
 4241             "number_of_cached_scripts:%lu\r\n"
 4242             "maxmemory:%lld\r\n"
 4243             "maxmemory_human:%s\r\n"
 4244             "maxmemory_policy:%s\r\n"
 4245             "allocator_frag_ratio:%.2f\r\n"
 4246             "allocator_frag_bytes:%zu\r\n"
 4247             "allocator_rss_ratio:%.2f\r\n"
 4248             "allocator_rss_bytes:%zd\r\n"
 4249             "rss_overhead_ratio:%.2f\r\n"
 4250             "rss_overhead_bytes:%zd\r\n"
 4251             "mem_fragmentation_ratio:%.2f\r\n"
 4252             "mem_fragmentation_bytes:%zd\r\n"
 4253             "mem_not_counted_for_evict:%zu\r\n"
 4254             "mem_replication_backlog:%zu\r\n"
 4255             "mem_clients_slaves:%zu\r\n"
 4256             "mem_clients_normal:%zu\r\n"
 4257             "mem_aof_buffer:%zu\r\n"
 4258             "mem_allocator:%s\r\n"
 4259             "active_defrag_running:%d\r\n"
 4260             "lazyfree_pending_objects:%zu\r\n",
 4261             zmalloc_used,
 4262             hmem,
 4263             server.cron_malloc_stats.process_rss,
 4264             used_memory_rss_hmem,
 4265             server.stat_peak_memory,
 4266             peak_hmem,
 4267             mh->peak_perc,
 4268             mh->overhead_total,
 4269             mh->startup_allocated,
 4270             mh->dataset,
 4271             mh->dataset_perc,
 4272             server.cron_malloc_stats.allocator_allocated,
 4273             server.cron_malloc_stats.allocator_active,
 4274             server.cron_malloc_stats.allocator_resident,
 4275             (unsigned long)total_system_mem,
 4276             total_system_hmem,
 4277             memory_lua,
 4278             used_memory_lua_hmem,
 4279             (long long) mh->lua_caches,
 4280             used_memory_scripts_hmem,
 4281             dictSize(server.lua_scripts),
 4282             server.maxmemory,
 4283             maxmemory_hmem,
 4284             evict_policy,
 4285             mh->allocator_frag,
 4286             mh->allocator_frag_bytes,
 4287             mh->allocator_rss,
 4288             mh->allocator_rss_bytes,
 4289             mh->rss_extra,
 4290             mh->rss_extra_bytes,
 4291             mh->total_frag,       /* This is the total RSS overhead, including
 4292                                      fragmentation, but not just it. This field
 4293                                      (and the next one) is named like that just
 4294                                      for backward compatibility. */
 4295             mh->total_frag_bytes,
 4296             freeMemoryGetNotCountedMemory(),
 4297             mh->repl_backlog,
 4298             mh->clients_slaves,
 4299             mh->clients_normal,
 4300             mh->aof_buffer,
 4301             ZMALLOC_LIB,
 4302             server.active_defrag_running,
 4303             lazyfreeGetPendingObjectsCount()
 4304         );
 4305         freeMemoryOverheadData(mh);
 4306     }
 4307 
 4308     /* Persistence */
 4309     if (allsections || defsections || !strcasecmp(section,"persistence")) {
 4310         if (sections++) info = sdscat(info,"\r\n");
 4311         info = sdscatprintf(info,
 4312             "# Persistence\r\n"
 4313             "loading:%d\r\n"
 4314             "rdb_changes_since_last_save:%lld\r\n"
 4315             "rdb_bgsave_in_progress:%d\r\n"
 4316             "rdb_last_save_time:%jd\r\n"
 4317             "rdb_last_bgsave_status:%s\r\n"
 4318             "rdb_last_bgsave_time_sec:%jd\r\n"
 4319             "rdb_current_bgsave_time_sec:%jd\r\n"
 4320             "rdb_last_cow_size:%zu\r\n"
 4321             "aof_enabled:%d\r\n"
 4322             "aof_rewrite_in_progress:%d\r\n"
 4323             "aof_rewrite_scheduled:%d\r\n"
 4324             "aof_last_rewrite_time_sec:%jd\r\n"
 4325             "aof_current_rewrite_time_sec:%jd\r\n"
 4326             "aof_last_bgrewrite_status:%s\r\n"
 4327             "aof_last_write_status:%s\r\n"
 4328             "aof_last_cow_size:%zu\r\n"
 4329             "module_fork_in_progress:%d\r\n"
 4330             "module_fork_last_cow_size:%zu\r\n",
 4331             server.loading,
 4332             server.dirty,
 4333             server.rdb_child_pid != -1,
 4334             (intmax_t)server.lastsave,
 4335             (server.lastbgsave_status == C_OK) ? "ok" : "err",
 4336             (intmax_t)server.rdb_save_time_last,
 4337             (intmax_t)((server.rdb_child_pid == -1) ?
 4338                 -1 : time(NULL)-server.rdb_save_time_start),
 4339             server.stat_rdb_cow_bytes,
 4340             server.aof_state != AOF_OFF,
 4341             server.aof_child_pid != -1,
 4342             server.aof_rewrite_scheduled,
 4343             (intmax_t)server.aof_rewrite_time_last,
 4344             (intmax_t)((server.aof_child_pid == -1) ?
 4345                 -1 : time(NULL)-server.aof_rewrite_time_start),
 4346             (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
 4347             (server.aof_last_write_status == C_OK) ? "ok" : "err",
 4348             server.stat_aof_cow_bytes,
 4349             server.module_child_pid != -1,
 4350             server.stat_module_cow_bytes);
 4351 
 4352         if (server.aof_enabled) {
 4353             info = sdscatprintf(info,
 4354                 "aof_current_size:%lld\r\n"
 4355                 "aof_base_size:%lld\r\n"
 4356                 "aof_pending_rewrite:%d\r\n"
 4357                 "aof_buffer_length:%zu\r\n"
 4358                 "aof_rewrite_buffer_length:%lu\r\n"
 4359                 "aof_pending_bio_fsync:%llu\r\n"
 4360                 "aof_delayed_fsync:%lu\r\n",
 4361                 (long long) server.aof_current_size,
 4362                 (long long) server.aof_rewrite_base_size,
 4363                 server.aof_rewrite_scheduled,
 4364                 sdslen(server.aof_buf),
 4365                 aofRewriteBufferSize(),
 4366                 bioPendingJobsOfType(BIO_AOF_FSYNC),
 4367                 server.aof_delayed_fsync);
 4368         }
 4369 
 4370         if (server.loading) {
 4371             double perc;
 4372             time_t eta, elapsed;
 4373             off_t remaining_bytes = server.loading_total_bytes-
 4374                                     server.loading_loaded_bytes;
 4375 
 4376             perc = ((double)server.loading_loaded_bytes /
 4377                    (server.loading_total_bytes+1)) * 100;
 4378 
 4379             elapsed = time(NULL)-server.loading_start_time;
 4380             if (elapsed == 0) {
 4381                 eta = 1; /* A fake 1 second figure if we don't have
 4382                             enough info */
 4383             } else {
 4384                 eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1);
 4385             }
 4386 
 4387             info = sdscatprintf(info,
 4388                 "loading_start_time:%jd\r\n"
 4389                 "loading_total_bytes:%llu\r\n"
 4390                 "loading_loaded_bytes:%llu\r\n"
 4391                 "loading_loaded_perc:%.2f\r\n"
 4392                 "loading_eta_seconds:%jd\r\n",
 4393                 (intmax_t) server.loading_start_time,
 4394                 (unsigned long long) server.loading_total_bytes,
 4395                 (unsigned long long) server.loading_loaded_bytes,
 4396                 perc,
 4397                 (intmax_t)eta
 4398             );
 4399         }
 4400     }
 4401 
 4402     /* Stats */
 4403     if (allsections || defsections || !strcasecmp(section,"stats")) {
 4404         if (sections++) info = sdscat(info,"\r\n");
 4405         info = sdscatprintf(info,
 4406             "# Stats\r\n"
 4407             "total_connections_received:%lld\r\n"
 4408             "total_commands_processed:%lld\r\n"
 4409             "instantaneous_ops_per_sec:%lld\r\n"
 4410             "total_net_input_bytes:%lld\r\n"
 4411             "total_net_output_bytes:%lld\r\n"
 4412             "instantaneous_input_kbps:%.2f\r\n"
 4413             "instantaneous_output_kbps:%.2f\r\n"
 4414             "rejected_connections:%lld\r\n"
 4415             "sync_full:%lld\r\n"
 4416             "sync_partial_ok:%lld\r\n"
 4417             "sync_partial_err:%lld\r\n"
 4418             "expired_keys:%lld\r\n"
 4419             "expired_stale_perc:%.2f\r\n"
 4420             "expired_time_cap_reached_count:%lld\r\n"
 4421             "expire_cycle_cpu_milliseconds:%lld\r\n"
 4422             "evicted_keys:%lld\r\n"
 4423             "keyspace_hits:%lld\r\n"
 4424             "keyspace_misses:%lld\r\n"
 4425             "pubsub_channels:%ld\r\n"
 4426             "pubsub_patterns:%lu\r\n"
 4427             "latest_fork_usec:%lld\r\n"
 4428             "migrate_cached_sockets:%ld\r\n"
 4429             "slave_expires_tracked_keys:%zu\r\n"
 4430             "active_defrag_hits:%lld\r\n"
 4431             "active_defrag_misses:%lld\r\n"
 4432             "active_defrag_key_hits:%lld\r\n"
 4433             "active_defrag_key_misses:%lld\r\n"
 4434             "tracking_total_keys:%lld\r\n"
 4435             "tracking_total_items:%lld\r\n"
 4436             "tracking_total_prefixes:%lld\r\n"
 4437             "unexpected_error_replies:%lld\r\n"
 4438             "total_reads_processed:%lld\r\n"
 4439             "total_writes_processed:%lld\r\n"
 4440             "io_threaded_reads_processed:%lld\r\n"
 4441             "io_threaded_writes_processed:%lld\r\n",
 4442             server.stat_numconnections,
 4443             server.stat_numcommands,
 4444             getInstantaneousMetric(STATS_METRIC_COMMAND),
 4445             server.stat_net_input_bytes,
 4446             server.stat_net_output_bytes,
 4447             (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
 4448             (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
 4449             server.stat_rejected_conn,
 4450             server.stat_sync_full,
 4451             server.stat_sync_partial_ok,
 4452             server.stat_sync_partial_err,
 4453             server.stat_expiredkeys,
 4454             server.stat_expired_stale_perc*100,
 4455             server.stat_expired_time_cap_reached_count,
 4456             server.stat_expire_cycle_time_used/1000,
 4457             server.stat_evictedkeys,
 4458             server.stat_keyspace_hits,
 4459             server.stat_keyspace_misses,
 4460             dictSize(server.pubsub_channels),
 4461             listLength(server.pubsub_patterns),
 4462             server.stat_fork_time,
 4463             dictSize(server.migrate_cached_sockets),
 4464             getSlaveKeyWithExpireCount(),
 4465             server.stat_active_defrag_hits,
 4466             server.stat_active_defrag_misses,
 4467             server.stat_active_defrag_key_hits,
 4468             server.stat_active_defrag_key_misses,
 4469             (unsigned long long) trackingGetTotalKeys(),
 4470             (unsigned long long) trackingGetTotalItems(),
 4471             (unsigned long long) trackingGetTotalPrefixes(),
 4472             server.stat_unexpected_error_replies,
 4473             server.stat_total_reads_processed,
 4474             server.stat_total_writes_processed,
 4475             server.stat_io_reads_processed,
 4476             server.stat_io_writes_processed);
 4477     }
 4478 
 4479     /* Replication */
 4480     if (allsections || defsections || !strcasecmp(section,"replication")) {
 4481         if (sections++) info = sdscat(info,"\r\n");
 4482         info = sdscatprintf(info,
 4483             "# Replication\r\n"
 4484             "role:%s\r\n",
 4485             server.masterhost == NULL ? "master" : "slave");
 4486         if (server.masterhost) {
 4487             long long slave_repl_offset = 1;
 4488 
 4489             if (server.master)
 4490                 slave_repl_offset = server.master->reploff;
 4491             else if (server.cached_master)
 4492                 slave_repl_offset = server.cached_master->reploff;
 4493 
 4494             info = sdscatprintf(info,
 4495                 "master_host:%s\r\n"
 4496                 "master_port:%d\r\n"
 4497                 "master_link_status:%s\r\n"
 4498                 "master_last_io_seconds_ago:%d\r\n"
 4499                 "master_sync_in_progress:%d\r\n"
 4500                 "slave_repl_offset:%lld\r\n"
 4501                 ,server.masterhost,
 4502                 server.masterport,
 4503                 (server.repl_state == REPL_STATE_CONNECTED) ?
 4504                     "up" : "down",
 4505                 server.master ?
 4506                 ((int)(server.unixtime-server.master->lastinteraction)) : -1,
 4507                 server.repl_state == REPL_STATE_TRANSFER,
 4508                 slave_repl_offset
 4509             );
 4510 
 4511             if (server.repl_state == REPL_STATE_TRANSFER) {
 4512                 info = sdscatprintf(info,
 4513                     "master_sync_left_bytes:%lld\r\n"
 4514                     "master_sync_last_io_seconds_ago:%d\r\n"
 4515                     , (long long)
 4516                         (server.repl_transfer_size - server.repl_transfer_read),
 4517                     (int)(server.unixtime-server.repl_transfer_lastio)
 4518                 );
 4519             }
 4520 
 4521             if (server.repl_state != REPL_STATE_CONNECTED) {
 4522                 info = sdscatprintf(info,
 4523                     "master_link_down_since_seconds:%jd\r\n",
 4524                     (intmax_t)(server.unixtime-server.repl_down_since));
 4525             }
 4526             info = sdscatprintf(info,
 4527                 "slave_priority:%d\r\n"
 4528                 "slave_read_only:%d\r\n",
 4529                 server.slave_priority,
 4530                 server.repl_slave_ro);
 4531         }
 4532 
 4533         info = sdscatprintf(info,
 4534             "connected_slaves:%lu\r\n",
 4535             listLength(server.slaves));
 4536 
 4537         /* If min-slaves-to-write is active, write the number of slaves
 4538          * currently considered 'good'. */
 4539         if (server.repl_min_slaves_to_write &&
 4540             server.repl_min_slaves_max_lag) {
 4541             info = sdscatprintf(info,
 4542                 "min_slaves_good_slaves:%d\r\n",
 4543                 server.repl_good_slaves_count);
 4544         }
 4545 
 4546         if (listLength(server.slaves)) {
 4547             int slaveid = 0;
 4548             listNode *ln;
 4549             listIter li;
 4550 
 4551             listRewind(server.slaves,&li);
 4552             while((ln = listNext(&li))) {
 4553                 client *slave = listNodeValue(ln);
 4554                 char *state = NULL;
 4555                 char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
 4556                 int port;
 4557                 long lag = 0;
 4558 
 4559                 if (slaveip[0] == '\0') {
 4560                     if (connPeerToString(slave->conn,ip,sizeof(ip),&port) == -1)
 4561                         continue;
 4562                     slaveip = ip;
 4563                 }
 4564                 switch(slave->replstate) {
 4565                 case SLAVE_STATE_WAIT_BGSAVE_START:
 4566                 case SLAVE_STATE_WAIT_BGSAVE_END:
 4567                     state = "wait_bgsave";
 4568                     break;
 4569                 case SLAVE_STATE_SEND_BULK:
 4570                     state = "send_bulk";
 4571                     break;
 4572                 case SLAVE_STATE_ONLINE:
 4573                     state = "online";
 4574                     break;
 4575                 }
 4576                 if (state == NULL) continue;
 4577                 if (slave->replstate == SLAVE_STATE_ONLINE)
 4578                     lag = time(NULL) - slave->repl_ack_time;
 4579 
 4580                 info = sdscatprintf(info,
 4581                     "slave%d:ip=%s,port=%d,state=%s,"
 4582                     "offset=%lld,lag=%ld\r\n",
 4583                     slaveid,slaveip,slave->slave_listening_port,state,
 4584                     slave->repl_ack_off, lag);
 4585                 slaveid++;
 4586             }
 4587         }
 4588         info = sdscatprintf(info,
 4589             "master_replid:%s\r\n"
 4590             "master_replid2:%s\r\n"
 4591             "master_repl_offset:%lld\r\n"
 4592             "second_repl_offset:%lld\r\n"
 4593             "repl_backlog_active:%d\r\n"
 4594             "repl_backlog_size:%lld\r\n"
 4595             "repl_backlog_first_byte_offset:%lld\r\n"
 4596             "repl_backlog_histlen:%lld\r\n",
 4597             server.replid,
 4598             server.replid2,
 4599             server.master_repl_offset,
 4600             server.second_replid_offset,
 4601             server.repl_backlog != NULL,
 4602             server.repl_backlog_size,
 4603             server.repl_backlog_off,
 4604             server.repl_backlog_histlen);
 4605     }
 4606 
 4607     /* CPU */
 4608     if (allsections || defsections || !strcasecmp(section,"cpu")) {
 4609         if (sections++) info = sdscat(info,"\r\n");
 4610         info = sdscatprintf(info,
 4611         "# CPU\r\n"
 4612         "used_cpu_sys:%ld.%06ld\r\n"
 4613         "used_cpu_user:%ld.%06ld\r\n"
 4614         "used_cpu_sys_children:%ld.%06ld\r\n"
 4615         "used_cpu_user_children:%ld.%06ld\r\n",
 4616         (long)self_ru.ru_stime.tv_sec, (long)self_ru.ru_stime.tv_usec,
 4617         (long)self_ru.ru_utime.tv_sec, (long)self_ru.ru_utime.tv_usec,
 4618         (long)c_ru.ru_stime.tv_sec, (long)c_ru.ru_stime.tv_usec,
 4619         (long)c_ru.ru_utime.tv_sec, (long)c_ru.ru_utime.tv_usec);
 4620     }
 4621 
 4622     /* Modules */
 4623     if (allsections || defsections || !strcasecmp(section,"modules")) {
 4624         if (sections++) info = sdscat(info,"\r\n");
 4625         info = sdscatprintf(info,"# Modules\r\n");
 4626         info = genModulesInfoString(info);
 4627     }
 4628 
 4629     /* Command statistics */
 4630     if (allsections || !strcasecmp(section,"commandstats")) {
 4631         if (sections++) info = sdscat(info,"\r\n");
 4632         info = sdscatprintf(info, "# Commandstats\r\n");
 4633 
 4634         struct redisCommand *c;
 4635         dictEntry *de;
 4636         dictIterator *di;
 4637         di = dictGetSafeIterator(server.commands);
 4638         while((de = dictNext(di)) != NULL) {
 4639             c = (struct redisCommand *) dictGetVal(de);
 4640             if (!c->calls) continue;
 4641             info = sdscatprintf(info,
 4642                 "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n",
 4643                 c->name, c->calls, c->microseconds,
 4644                 (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls));
 4645         }
 4646         dictReleaseIterator(di);
 4647     }
 4648 
 4649     /* Cluster */
 4650     if (allsections || defsections || !strcasecmp(section,"cluster")) {
 4651         if (sections++) info = sdscat(info,"\r\n");
 4652         info = sdscatprintf(info,
 4653         "# Cluster\r\n"
 4654         "cluster_enabled:%d\r\n",
 4655         server.cluster_enabled);
 4656     }
 4657 
 4658     /* Key space */
 4659     if (allsections || defsections || !strcasecmp(section,"keyspace")) {
 4660         if (sections++) info = sdscat(info,"\r\n");
 4661         info = sdscatprintf(info, "# Keyspace\r\n");
 4662         for (j = 0; j < server.dbnum; j++) {
 4663             long long keys, vkeys;
 4664 
 4665             keys = dictSize(server.db[j].dict);
 4666             vkeys = dictSize(server.db[j].expires);
 4667             if (keys || vkeys) {
 4668                 info = sdscatprintf(info,
 4669                     "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
 4670                     j, keys, vkeys, server.db[j].avg_ttl);
 4671             }
 4672         }
 4673     }
 4674 
 4675     /* Get info from modules.
 4676      * if user asked for "everything" or "modules", or a specific section
 4677      * that's not found yet. */
 4678     if (everything || modules ||
 4679         (!allsections && !defsections && sections==0)) {
 4680         info = modulesCollectInfo(info,
 4681                                   everything || modules ? NULL: section,
 4682                                   0, /* not a crash report */
 4683                                   sections);
 4684     }
 4685     return info;
 4686 }
 4687 
 4688 void infoCommand(client *c) {
 4689     char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
 4690 
 4691     if (c->argc > 2) {
 4692         addReply(c,shared.syntaxerr);
 4693         return;
 4694     }
 4695     sds info = genRedisInfoString(section);
 4696     addReplyVerbatim(c,info,sdslen(info),"txt");
 4697     sdsfree(info);
 4698 }
 4699 
 4700 void monitorCommand(client *c) {
 4701     /* ignore MONITOR if already slave or in monitor mode */
 4702     if (c->flags & CLIENT_SLAVE) return;
 4703 
 4704     c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
 4705     listAddNodeTail(server.monitors,c);
 4706     addReply(c,shared.ok);
 4707 }
 4708 
 4709 /* =================================== Main! ================================ */
 4710 
 4711 <