"Fossies" - the Fresh Open Source Software Archive

Member "redis-5.0.7/src/t_zset.c" (19 Nov 2019, 108437 Bytes) of package /linux/misc/redis-5.0.7.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "t_zset.c" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 5.0.5_vs_5.0.6.

    1 /*
    2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
    3  * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
    4  * All rights reserved.
    5  *
    6  * Redistribution and use in source and binary forms, with or without
    7  * modification, are permitted provided that the following conditions are met:
    8  *
    9  *   * Redistributions of source code must retain the above copyright notice,
   10  *     this list of conditions and the following disclaimer.
   11  *   * Redistributions in binary form must reproduce the above copyright
   12  *     notice, this list of conditions and the following disclaimer in the
   13  *     documentation and/or other materials provided with the distribution.
   14  *   * Neither the name of Redis nor the names of its contributors may be used
   15  *     to endorse or promote products derived from this software without
   16  *     specific prior written permission.
   17  *
   18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
   19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
   22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   28  * POSSIBILITY OF SUCH DAMAGE.
   29  */
   30 
   31 /*-----------------------------------------------------------------------------
   32  * Sorted set API
   33  *----------------------------------------------------------------------------*/
   34 
   35 /* ZSETs are ordered sets using two data structures to hold the same elements
   36  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
   37  * data structure.
   38  *
   39  * The elements are added to a hash table mapping Redis objects to scores.
   40  * At the same time the elements are added to a skip list mapping scores
   41  * to Redis objects (so objects are sorted by scores in this "view").
   42  *
   43  * Note that the SDS string representing the element is the same in both
   44  * the hash table and skiplist in order to save memory. What we do in order
   45  * to manage the shared SDS string more easily is to free the SDS string
   46  * only in zslFreeNode(). The dictionary has no value free method set.
   47  * So we should always remove an element from the dictionary, and later from
   48  * the skiplist.
   49  *
   50  * This skiplist implementation is almost a C translation of the original
   51  * algorithm described by William Pugh in "Skip Lists: A Probabilistic
   52  * Alternative to Balanced Trees", modified in three ways:
   53  * a) this implementation allows for repeated scores.
   54  * b) the comparison is not just by key (our 'score') but by satellite data.
   55  * c) there is a back pointer, so it's a doubly linked list with the back
   56  * pointers being only at "level 1". This allows to traverse the list
   57  * from tail to head, useful for ZREVRANGE. */
   58 
   59 #include "server.h"
   60 #include <math.h>
   61 
   62 /*-----------------------------------------------------------------------------
   63  * Skiplist implementation of the low level API
   64  *----------------------------------------------------------------------------*/
   65 
   66 int zslLexValueGteMin(sds value, zlexrangespec *spec);
   67 int zslLexValueLteMax(sds value, zlexrangespec *spec);
   68 
   69 /* Create a skiplist node with the specified number of levels.
   70  * The SDS string 'ele' is referenced by the node after the call. */
   71 zskiplistNode *zslCreateNode(int level, double score, sds ele) {
   72     zskiplistNode *zn =
   73         zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
   74     zn->score = score;
   75     zn->ele = ele;
   76     return zn;
   77 }
   78 
   79 /* Create a new skiplist. */
   80 zskiplist *zslCreate(void) {
   81     int j;
   82     zskiplist *zsl;
   83 
   84     zsl = zmalloc(sizeof(*zsl));
   85     zsl->level = 1;
   86     zsl->length = 0;
   87     zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
   88     for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
   89         zsl->header->level[j].forward = NULL;
   90         zsl->header->level[j].span = 0;
   91     }
   92     zsl->header->backward = NULL;
   93     zsl->tail = NULL;
   94     return zsl;
   95 }
   96 
   97 /* Free the specified skiplist node. The referenced SDS string representation
   98  * of the element is freed too, unless node->ele is set to NULL before calling
   99  * this function. */
  100 void zslFreeNode(zskiplistNode *node) {
  101     sdsfree(node->ele);
  102     zfree(node);
  103 }
  104 
  105 /* Free a whole skiplist. */
  106 void zslFree(zskiplist *zsl) {
  107     zskiplistNode *node = zsl->header->level[0].forward, *next;
  108 
  109     zfree(zsl->header);
  110     while(node) {
  111         next = node->level[0].forward;
  112         zslFreeNode(node);
  113         node = next;
  114     }
  115     zfree(zsl);
  116 }
  117 
  118 /* Returns a random level for the new skiplist node we are going to create.
  119  * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
  120  * (both inclusive), with a powerlaw-alike distribution where higher
  121  * levels are less likely to be returned. */
  122 int zslRandomLevel(void) {
  123     int level = 1;
  124     while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
  125         level += 1;
  126     return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
  127 }
  128 
  129 /* Insert a new node in the skiplist. Assumes the element does not already
  130  * exist (up to the caller to enforce that). The skiplist takes ownership
  131  * of the passed SDS string 'ele'. */
  132 zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
  133     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  134     unsigned int rank[ZSKIPLIST_MAXLEVEL];
  135     int i, level;
  136 
  137     serverAssert(!isnan(score));
  138     x = zsl->header;
  139     for (i = zsl->level-1; i >= 0; i--) {
  140         /* store rank that is crossed to reach the insert position */
  141         rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
  142         while (x->level[i].forward &&
  143                 (x->level[i].forward->score < score ||
  144                     (x->level[i].forward->score == score &&
  145                     sdscmp(x->level[i].forward->ele,ele) < 0)))
  146         {
  147             rank[i] += x->level[i].span;
  148             x = x->level[i].forward;
  149         }
  150         update[i] = x;
  151     }
  152     /* we assume the element is not already inside, since we allow duplicated
  153      * scores, reinserting the same element should never happen since the
  154      * caller of zslInsert() should test in the hash table if the element is
  155      * already inside or not. */
  156     level = zslRandomLevel();
  157     if (level > zsl->level) {
  158         for (i = zsl->level; i < level; i++) {
  159             rank[i] = 0;
  160             update[i] = zsl->header;
  161             update[i]->level[i].span = zsl->length;
  162         }
  163         zsl->level = level;
  164     }
  165     x = zslCreateNode(level,score,ele);
  166     for (i = 0; i < level; i++) {
  167         x->level[i].forward = update[i]->level[i].forward;
  168         update[i]->level[i].forward = x;
  169 
  170         /* update span covered by update[i] as x is inserted here */
  171         x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
  172         update[i]->level[i].span = (rank[0] - rank[i]) + 1;
  173     }
  174 
  175     /* increment span for untouched levels */
  176     for (i = level; i < zsl->level; i++) {
  177         update[i]->level[i].span++;
  178     }
  179 
  180     x->backward = (update[0] == zsl->header) ? NULL : update[0];
  181     if (x->level[0].forward)
  182         x->level[0].forward->backward = x;
  183     else
  184         zsl->tail = x;
  185     zsl->length++;
  186     return x;
  187 }
  188 
  189 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
  190 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
  191     int i;
  192     for (i = 0; i < zsl->level; i++) {
  193         if (update[i]->level[i].forward == x) {
  194             update[i]->level[i].span += x->level[i].span - 1;
  195             update[i]->level[i].forward = x->level[i].forward;
  196         } else {
  197             update[i]->level[i].span -= 1;
  198         }
  199     }
  200     if (x->level[0].forward) {
  201         x->level[0].forward->backward = x->backward;
  202     } else {
  203         zsl->tail = x->backward;
  204     }
  205     while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
  206         zsl->level--;
  207     zsl->length--;
  208 }
  209 
  210 /* Delete an element with matching score/element from the skiplist.
  211  * The function returns 1 if the node was found and deleted, otherwise
  212  * 0 is returned.
  213  *
  214  * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
  215  * it is not freed (but just unlinked) and *node is set to the node pointer,
  216  * so that it is possible for the caller to reuse the node (including the
  217  * referenced SDS string at node->ele). */
  218 int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
  219     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  220     int i;
  221 
  222     x = zsl->header;
  223     for (i = zsl->level-1; i >= 0; i--) {
  224         while (x->level[i].forward &&
  225                 (x->level[i].forward->score < score ||
  226                     (x->level[i].forward->score == score &&
  227                      sdscmp(x->level[i].forward->ele,ele) < 0)))
  228         {
  229             x = x->level[i].forward;
  230         }
  231         update[i] = x;
  232     }
  233     /* We may have multiple elements with the same score, what we need
  234      * is to find the element with both the right score and object. */
  235     x = x->level[0].forward;
  236     if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
  237         zslDeleteNode(zsl, x, update);
  238         if (!node)
  239             zslFreeNode(x);
  240         else
  241             *node = x;
  242         return 1;
  243     }
  244     return 0; /* not found */
  245 }
  246 
  247 /* Update the score of an elmenent inside the sorted set skiplist.
  248  * Note that the element must exist and must match 'score'.
  249  * This function does not update the score in the hash table side, the
  250  * caller should take care of it.
  251  *
  252  * Note that this function attempts to just update the node, in case after
  253  * the score update, the node would be exactly at the same position.
  254  * Otherwise the skiplist is modified by removing and re-adding a new
  255  * element, which is more costly.
  256  *
  257  * The function returns the updated element skiplist node pointer. */
  258 zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
  259     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  260     int i;
  261 
  262     /* We need to seek to element to update to start: this is useful anyway,
  263      * we'll have to update or remove it. */
  264     x = zsl->header;
  265     for (i = zsl->level-1; i >= 0; i--) {
  266         while (x->level[i].forward &&
  267                 (x->level[i].forward->score < curscore ||
  268                     (x->level[i].forward->score == curscore &&
  269                      sdscmp(x->level[i].forward->ele,ele) < 0)))
  270         {
  271             x = x->level[i].forward;
  272         }
  273         update[i] = x;
  274     }
  275 
  276     /* Jump to our element: note that this function assumes that the
  277      * element with the matching score exists. */
  278     x = x->level[0].forward;
  279     serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);
  280 
  281     /* If the node, after the score update, would be still exactly
  282      * at the same position, we can just update the score without
  283      * actually removing and re-inserting the element in the skiplist. */
  284     if ((x->backward == NULL || x->backward->score < newscore) &&
  285         (x->level[0].forward == NULL || x->level[0].forward->score > newscore))
  286     {
  287         x->score = newscore;
  288         return x;
  289     }
  290 
  291     /* No way to reuse the old node: we need to remove and insert a new
  292      * one at a different place. */
  293     zslDeleteNode(zsl, x, update);
  294     zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
  295     /* We reused the old node x->ele SDS string, free the node now
  296      * since zslInsert created a new one. */
  297     x->ele = NULL;
  298     zslFreeNode(x);
  299     return newnode;
  300 }
  301 
  302 int zslValueGteMin(double value, zrangespec *spec) {
  303     return spec->minex ? (value > spec->min) : (value >= spec->min);
  304 }
  305 
  306 int zslValueLteMax(double value, zrangespec *spec) {
  307     return spec->maxex ? (value < spec->max) : (value <= spec->max);
  308 }
  309 
  310 /* Returns if there is a part of the zset is in range. */
  311 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
  312     zskiplistNode *x;
  313 
  314     /* Test for ranges that will always be empty. */
  315     if (range->min > range->max ||
  316             (range->min == range->max && (range->minex || range->maxex)))
  317         return 0;
  318     x = zsl->tail;
  319     if (x == NULL || !zslValueGteMin(x->score,range))
  320         return 0;
  321     x = zsl->header->level[0].forward;
  322     if (x == NULL || !zslValueLteMax(x->score,range))
  323         return 0;
  324     return 1;
  325 }
  326 
  327 /* Find the first node that is contained in the specified range.
  328  * Returns NULL when no element is contained in the range. */
  329 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
  330     zskiplistNode *x;
  331     int i;
  332 
  333     /* If everything is out of range, return early. */
  334     if (!zslIsInRange(zsl,range)) return NULL;
  335 
  336     x = zsl->header;
  337     for (i = zsl->level-1; i >= 0; i--) {
  338         /* Go forward while *OUT* of range. */
  339         while (x->level[i].forward &&
  340             !zslValueGteMin(x->level[i].forward->score,range))
  341                 x = x->level[i].forward;
  342     }
  343 
  344     /* This is an inner range, so the next node cannot be NULL. */
  345     x = x->level[0].forward;
  346     serverAssert(x != NULL);
  347 
  348     /* Check if score <= max. */
  349     if (!zslValueLteMax(x->score,range)) return NULL;
  350     return x;
  351 }
  352 
  353 /* Find the last node that is contained in the specified range.
  354  * Returns NULL when no element is contained in the range. */
  355 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
  356     zskiplistNode *x;
  357     int i;
  358 
  359     /* If everything is out of range, return early. */
  360     if (!zslIsInRange(zsl,range)) return NULL;
  361 
  362     x = zsl->header;
  363     for (i = zsl->level-1; i >= 0; i--) {
  364         /* Go forward while *IN* range. */
  365         while (x->level[i].forward &&
  366             zslValueLteMax(x->level[i].forward->score,range))
  367                 x = x->level[i].forward;
  368     }
  369 
  370     /* This is an inner range, so this node cannot be NULL. */
  371     serverAssert(x != NULL);
  372 
  373     /* Check if score >= min. */
  374     if (!zslValueGteMin(x->score,range)) return NULL;
  375     return x;
  376 }
  377 
  378 /* Delete all the elements with score between min and max from the skiplist.
  379  * Min and max are inclusive, so a score >= min || score <= max is deleted.
  380  * Note that this function takes the reference to the hash table view of the
  381  * sorted set, in order to remove the elements from the hash table too. */
  382 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
  383     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  384     unsigned long removed = 0;
  385     int i;
  386 
  387     x = zsl->header;
  388     for (i = zsl->level-1; i >= 0; i--) {
  389         while (x->level[i].forward && (range->minex ?
  390             x->level[i].forward->score <= range->min :
  391             x->level[i].forward->score < range->min))
  392                 x = x->level[i].forward;
  393         update[i] = x;
  394     }
  395 
  396     /* Current node is the last with score < or <= min. */
  397     x = x->level[0].forward;
  398 
  399     /* Delete nodes while in range. */
  400     while (x &&
  401            (range->maxex ? x->score < range->max : x->score <= range->max))
  402     {
  403         zskiplistNode *next = x->level[0].forward;
  404         zslDeleteNode(zsl,x,update);
  405         dictDelete(dict,x->ele);
  406         zslFreeNode(x); /* Here is where x->ele is actually released. */
  407         removed++;
  408         x = next;
  409     }
  410     return removed;
  411 }
  412 
  413 unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
  414     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  415     unsigned long removed = 0;
  416     int i;
  417 
  418 
  419     x = zsl->header;
  420     for (i = zsl->level-1; i >= 0; i--) {
  421         while (x->level[i].forward &&
  422             !zslLexValueGteMin(x->level[i].forward->ele,range))
  423                 x = x->level[i].forward;
  424         update[i] = x;
  425     }
  426 
  427     /* Current node is the last with score < or <= min. */
  428     x = x->level[0].forward;
  429 
  430     /* Delete nodes while in range. */
  431     while (x && zslLexValueLteMax(x->ele,range)) {
  432         zskiplistNode *next = x->level[0].forward;
  433         zslDeleteNode(zsl,x,update);
  434         dictDelete(dict,x->ele);
  435         zslFreeNode(x); /* Here is where x->ele is actually released. */
  436         removed++;
  437         x = next;
  438     }
  439     return removed;
  440 }
  441 
  442 /* Delete all the elements with rank between start and end from the skiplist.
  443  * Start and end are inclusive. Note that start and end need to be 1-based */
  444 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
  445     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  446     unsigned long traversed = 0, removed = 0;
  447     int i;
  448 
  449     x = zsl->header;
  450     for (i = zsl->level-1; i >= 0; i--) {
  451         while (x->level[i].forward && (traversed + x->level[i].span) < start) {
  452             traversed += x->level[i].span;
  453             x = x->level[i].forward;
  454         }
  455         update[i] = x;
  456     }
  457 
  458     traversed++;
  459     x = x->level[0].forward;
  460     while (x && traversed <= end) {
  461         zskiplistNode *next = x->level[0].forward;
  462         zslDeleteNode(zsl,x,update);
  463         dictDelete(dict,x->ele);
  464         zslFreeNode(x);
  465         removed++;
  466         traversed++;
  467         x = next;
  468     }
  469     return removed;
  470 }
  471 
  472 /* Find the rank for an element by both score and key.
  473  * Returns 0 when the element cannot be found, rank otherwise.
  474  * Note that the rank is 1-based due to the span of zsl->header to the
  475  * first element. */
  476 unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
  477     zskiplistNode *x;
  478     unsigned long rank = 0;
  479     int i;
  480 
  481     x = zsl->header;
  482     for (i = zsl->level-1; i >= 0; i--) {
  483         while (x->level[i].forward &&
  484             (x->level[i].forward->score < score ||
  485                 (x->level[i].forward->score == score &&
  486                 sdscmp(x->level[i].forward->ele,ele) <= 0))) {
  487             rank += x->level[i].span;
  488             x = x->level[i].forward;
  489         }
  490 
  491         /* x might be equal to zsl->header, so test if obj is non-NULL */
  492         if (x->ele && sdscmp(x->ele,ele) == 0) {
  493             return rank;
  494         }
  495     }
  496     return 0;
  497 }
  498 
  499 /* Finds an element by its rank. The rank argument needs to be 1-based. */
  500 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
  501     zskiplistNode *x;
  502     unsigned long traversed = 0;
  503     int i;
  504 
  505     x = zsl->header;
  506     for (i = zsl->level-1; i >= 0; i--) {
  507         while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
  508         {
  509             traversed += x->level[i].span;
  510             x = x->level[i].forward;
  511         }
  512         if (traversed == rank) {
  513             return x;
  514         }
  515     }
  516     return NULL;
  517 }
  518 
  519 /* Populate the rangespec according to the objects min and max. */
  520 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
  521     char *eptr;
  522     spec->minex = spec->maxex = 0;
  523 
  524     /* Parse the min-max interval. If one of the values is prefixed
  525      * by the "(" character, it's considered "open". For instance
  526      * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
  527      * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
  528     if (min->encoding == OBJ_ENCODING_INT) {
  529         spec->min = (long)min->ptr;
  530     } else {
  531         if (((char*)min->ptr)[0] == '(') {
  532             spec->min = strtod((char*)min->ptr+1,&eptr);
  533             if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
  534             spec->minex = 1;
  535         } else {
  536             spec->min = strtod((char*)min->ptr,&eptr);
  537             if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR;
  538         }
  539     }
  540     if (max->encoding == OBJ_ENCODING_INT) {
  541         spec->max = (long)max->ptr;
  542     } else {
  543         if (((char*)max->ptr)[0] == '(') {
  544             spec->max = strtod((char*)max->ptr+1,&eptr);
  545             if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
  546             spec->maxex = 1;
  547         } else {
  548             spec->max = strtod((char*)max->ptr,&eptr);
  549             if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR;
  550         }
  551     }
  552 
  553     return C_OK;
  554 }
  555 
  556 /* ------------------------ Lexicographic ranges ---------------------------- */
  557 
  558 /* Parse max or min argument of ZRANGEBYLEX.
  559   * (foo means foo (open interval)
  560   * [foo means foo (closed interval)
  561   * - means the min string possible
  562   * + means the max string possible
  563   *
  564   * If the string is valid the *dest pointer is set to the redis object
  565   * that will be used for the comparison, and ex will be set to 0 or 1
  566   * respectively if the item is exclusive or inclusive. C_OK will be
  567   * returned.
  568   *
  569   * If the string is not a valid range C_ERR is returned, and the value
  570   * of *dest and *ex is undefined. */
  571 int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
  572     char *c = item->ptr;
  573 
  574     switch(c[0]) {
  575     case '+':
  576         if (c[1] != '\0') return C_ERR;
  577         *ex = 1;
  578         *dest = shared.maxstring;
  579         return C_OK;
  580     case '-':
  581         if (c[1] != '\0') return C_ERR;
  582         *ex = 1;
  583         *dest = shared.minstring;
  584         return C_OK;
  585     case '(':
  586         *ex = 1;
  587         *dest = sdsnewlen(c+1,sdslen(c)-1);
  588         return C_OK;
  589     case '[':
  590         *ex = 0;
  591         *dest = sdsnewlen(c+1,sdslen(c)-1);
  592         return C_OK;
  593     default:
  594         return C_ERR;
  595     }
  596 }
  597 
  598 /* Free a lex range structure, must be called only after zelParseLexRange()
  599  * populated the structure with success (C_OK returned). */
  600 void zslFreeLexRange(zlexrangespec *spec) {
  601     if (spec->min != shared.minstring &&
  602         spec->min != shared.maxstring) sdsfree(spec->min);
  603     if (spec->max != shared.minstring &&
  604         spec->max != shared.maxstring) sdsfree(spec->max);
  605 }
  606 
  607 /* Populate the lex rangespec according to the objects min and max.
  608  *
  609  * Return C_OK on success. On error C_ERR is returned.
  610  * When OK is returned the structure must be freed with zslFreeLexRange(),
  611  * otherwise no release is needed. */
  612 int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
  613     /* The range can't be valid if objects are integer encoded.
  614      * Every item must start with ( or [. */
  615     if (min->encoding == OBJ_ENCODING_INT ||
  616         max->encoding == OBJ_ENCODING_INT) return C_ERR;
  617 
  618     spec->min = spec->max = NULL;
  619     if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR ||
  620         zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) {
  621         zslFreeLexRange(spec);
  622         return C_ERR;
  623     } else {
  624         return C_OK;
  625     }
  626 }
  627 
  628 /* This is just a wrapper to sdscmp() that is able to
  629  * handle shared.minstring and shared.maxstring as the equivalent of
  630  * -inf and +inf for strings */
  631 int sdscmplex(sds a, sds b) {
  632     if (a == b) return 0;
  633     if (a == shared.minstring || b == shared.maxstring) return -1;
  634     if (a == shared.maxstring || b == shared.minstring) return 1;
  635     return sdscmp(a,b);
  636 }
  637 
  638 int zslLexValueGteMin(sds value, zlexrangespec *spec) {
  639     return spec->minex ?
  640         (sdscmplex(value,spec->min) > 0) :
  641         (sdscmplex(value,spec->min) >= 0);
  642 }
  643 
  644 int zslLexValueLteMax(sds value, zlexrangespec *spec) {
  645     return spec->maxex ?
  646         (sdscmplex(value,spec->max) < 0) :
  647         (sdscmplex(value,spec->max) <= 0);
  648 }
  649 
  650 /* Returns if there is a part of the zset is in the lex range. */
  651 int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
  652     zskiplistNode *x;
  653 
  654     /* Test for ranges that will always be empty. */
  655     int cmp = sdscmplex(range->min,range->max);
  656     if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
  657         return 0;
  658     x = zsl->tail;
  659     if (x == NULL || !zslLexValueGteMin(x->ele,range))
  660         return 0;
  661     x = zsl->header->level[0].forward;
  662     if (x == NULL || !zslLexValueLteMax(x->ele,range))
  663         return 0;
  664     return 1;
  665 }
  666 
  667 /* Find the first node that is contained in the specified lex range.
  668  * Returns NULL when no element is contained in the range. */
  669 zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
  670     zskiplistNode *x;
  671     int i;
  672 
  673     /* If everything is out of range, return early. */
  674     if (!zslIsInLexRange(zsl,range)) return NULL;
  675 
  676     x = zsl->header;
  677     for (i = zsl->level-1; i >= 0; i--) {
  678         /* Go forward while *OUT* of range. */
  679         while (x->level[i].forward &&
  680             !zslLexValueGteMin(x->level[i].forward->ele,range))
  681                 x = x->level[i].forward;
  682     }
  683 
  684     /* This is an inner range, so the next node cannot be NULL. */
  685     x = x->level[0].forward;
  686     serverAssert(x != NULL);
  687 
  688     /* Check if score <= max. */
  689     if (!zslLexValueLteMax(x->ele,range)) return NULL;
  690     return x;
  691 }
  692 
  693 /* Find the last node that is contained in the specified range.
  694  * Returns NULL when no element is contained in the range. */
  695 zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
  696     zskiplistNode *x;
  697     int i;
  698 
  699     /* If everything is out of range, return early. */
  700     if (!zslIsInLexRange(zsl,range)) return NULL;
  701 
  702     x = zsl->header;
  703     for (i = zsl->level-1; i >= 0; i--) {
  704         /* Go forward while *IN* range. */
  705         while (x->level[i].forward &&
  706             zslLexValueLteMax(x->level[i].forward->ele,range))
  707                 x = x->level[i].forward;
  708     }
  709 
  710     /* This is an inner range, so this node cannot be NULL. */
  711     serverAssert(x != NULL);
  712 
  713     /* Check if score >= min. */
  714     if (!zslLexValueGteMin(x->ele,range)) return NULL;
  715     return x;
  716 }
  717 
  718 /*-----------------------------------------------------------------------------
  719  * Ziplist-backed sorted set API
  720  *----------------------------------------------------------------------------*/
  721 
  722 double zzlGetScore(unsigned char *sptr) {
  723     unsigned char *vstr;
  724     unsigned int vlen;
  725     long long vlong;
  726     char buf[128];
  727     double score;
  728 
  729     serverAssert(sptr != NULL);
  730     serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
  731 
  732     if (vstr) {
  733         memcpy(buf,vstr,vlen);
  734         buf[vlen] = '\0';
  735         score = strtod(buf,NULL);
  736     } else {
  737         score = vlong;
  738     }
  739 
  740     return score;
  741 }
  742 
  743 /* Return a ziplist element as an SDS string. */
  744 sds ziplistGetObject(unsigned char *sptr) {
  745     unsigned char *vstr;
  746     unsigned int vlen;
  747     long long vlong;
  748 
  749     serverAssert(sptr != NULL);
  750     serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
  751 
  752     if (vstr) {
  753         return sdsnewlen((char*)vstr,vlen);
  754     } else {
  755         return sdsfromlonglong(vlong);
  756     }
  757 }
  758 
  759 /* Compare element in sorted set with given element. */
  760 int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
  761     unsigned char *vstr;
  762     unsigned int vlen;
  763     long long vlong;
  764     unsigned char vbuf[32];
  765     int minlen, cmp;
  766 
  767     serverAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
  768     if (vstr == NULL) {
  769         /* Store string representation of long long in buf. */
  770         vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
  771         vstr = vbuf;
  772     }
  773 
  774     minlen = (vlen < clen) ? vlen : clen;
  775     cmp = memcmp(vstr,cstr,minlen);
  776     if (cmp == 0) return vlen-clen;
  777     return cmp;
  778 }
  779 
  780 unsigned int zzlLength(unsigned char *zl) {
  781     return ziplistLen(zl)/2;
  782 }
  783 
  784 /* Move to next entry based on the values in eptr and sptr. Both are set to
  785  * NULL when there is no next entry. */
  786 void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
  787     unsigned char *_eptr, *_sptr;
  788     serverAssert(*eptr != NULL && *sptr != NULL);
  789 
  790     _eptr = ziplistNext(zl,*sptr);
  791     if (_eptr != NULL) {
  792         _sptr = ziplistNext(zl,_eptr);
  793         serverAssert(_sptr != NULL);
  794     } else {
  795         /* No next entry. */
  796         _sptr = NULL;
  797     }
  798 
  799     *eptr = _eptr;
  800     *sptr = _sptr;
  801 }
  802 
  803 /* Move to the previous entry based on the values in eptr and sptr. Both are
  804  * set to NULL when there is no next entry. */
  805 void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
  806     unsigned char *_eptr, *_sptr;
  807     serverAssert(*eptr != NULL && *sptr != NULL);
  808 
  809     _sptr = ziplistPrev(zl,*eptr);
  810     if (_sptr != NULL) {
  811         _eptr = ziplistPrev(zl,_sptr);
  812         serverAssert(_eptr != NULL);
  813     } else {
  814         /* No previous entry. */
  815         _eptr = NULL;
  816     }
  817 
  818     *eptr = _eptr;
  819     *sptr = _sptr;
  820 }
  821 
  822 /* Returns if there is a part of the zset is in range. Should only be used
  823  * internally by zzlFirstInRange and zzlLastInRange. */
  824 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
  825     unsigned char *p;
  826     double score;
  827 
  828     /* Test for ranges that will always be empty. */
  829     if (range->min > range->max ||
  830             (range->min == range->max && (range->minex || range->maxex)))
  831         return 0;
  832 
  833     p = ziplistIndex(zl,-1); /* Last score. */
  834     if (p == NULL) return 0; /* Empty sorted set */
  835     score = zzlGetScore(p);
  836     if (!zslValueGteMin(score,range))
  837         return 0;
  838 
  839     p = ziplistIndex(zl,1); /* First score. */
  840     serverAssert(p != NULL);
  841     score = zzlGetScore(p);
  842     if (!zslValueLteMax(score,range))
  843         return 0;
  844 
  845     return 1;
  846 }
  847 
  848 /* Find pointer to the first element contained in the specified range.
  849  * Returns NULL when no element is contained in the range. */
  850 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
  851     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
  852     double score;
  853 
  854     /* If everything is out of range, return early. */
  855     if (!zzlIsInRange(zl,range)) return NULL;
  856 
  857     while (eptr != NULL) {
  858         sptr = ziplistNext(zl,eptr);
  859         serverAssert(sptr != NULL);
  860 
  861         score = zzlGetScore(sptr);
  862         if (zslValueGteMin(score,range)) {
  863             /* Check if score <= max. */
  864             if (zslValueLteMax(score,range))
  865                 return eptr;
  866             return NULL;
  867         }
  868 
  869         /* Move to next element. */
  870         eptr = ziplistNext(zl,sptr);
  871     }
  872 
  873     return NULL;
  874 }
  875 
  876 /* Find pointer to the last element contained in the specified range.
  877  * Returns NULL when no element is contained in the range. */
  878 unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
  879     unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
  880     double score;
  881 
  882     /* If everything is out of range, return early. */
  883     if (!zzlIsInRange(zl,range)) return NULL;
  884 
  885     while (eptr != NULL) {
  886         sptr = ziplistNext(zl,eptr);
  887         serverAssert(sptr != NULL);
  888 
  889         score = zzlGetScore(sptr);
  890         if (zslValueLteMax(score,range)) {
  891             /* Check if score >= min. */
  892             if (zslValueGteMin(score,range))
  893                 return eptr;
  894             return NULL;
  895         }
  896 
  897         /* Move to previous element by moving to the score of previous element.
  898          * When this returns NULL, we know there also is no element. */
  899         sptr = ziplistPrev(zl,eptr);
  900         if (sptr != NULL)
  901             serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
  902         else
  903             eptr = NULL;
  904     }
  905 
  906     return NULL;
  907 }
  908 
  909 int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
  910     sds value = ziplistGetObject(p);
  911     int res = zslLexValueGteMin(value,spec);
  912     sdsfree(value);
  913     return res;
  914 }
  915 
  916 int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
  917     sds value = ziplistGetObject(p);
  918     int res = zslLexValueLteMax(value,spec);
  919     sdsfree(value);
  920     return res;
  921 }
  922 
  923 /* Returns if there is a part of the zset is in range. Should only be used
  924  * internally by zzlFirstInRange and zzlLastInRange. */
  925 int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
  926     unsigned char *p;
  927 
  928     /* Test for ranges that will always be empty. */
  929     int cmp = sdscmplex(range->min,range->max);
  930     if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
  931         return 0;
  932 
  933     p = ziplistIndex(zl,-2); /* Last element. */
  934     if (p == NULL) return 0;
  935     if (!zzlLexValueGteMin(p,range))
  936         return 0;
  937 
  938     p = ziplistIndex(zl,0); /* First element. */
  939     serverAssert(p != NULL);
  940     if (!zzlLexValueLteMax(p,range))
  941         return 0;
  942 
  943     return 1;
  944 }
  945 
  946 /* Find pointer to the first element contained in the specified lex range.
  947  * Returns NULL when no element is contained in the range. */
  948 unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
  949     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
  950 
  951     /* If everything is out of range, return early. */
  952     if (!zzlIsInLexRange(zl,range)) return NULL;
  953 
  954     while (eptr != NULL) {
  955         if (zzlLexValueGteMin(eptr,range)) {
  956             /* Check if score <= max. */
  957             if (zzlLexValueLteMax(eptr,range))
  958                 return eptr;
  959             return NULL;
  960         }
  961 
  962         /* Move to next element. */
  963         sptr = ziplistNext(zl,eptr); /* This element score. Skip it. */
  964         serverAssert(sptr != NULL);
  965         eptr = ziplistNext(zl,sptr); /* Next element. */
  966     }
  967 
  968     return NULL;
  969 }
  970 
  971 /* Find pointer to the last element contained in the specified lex range.
  972  * Returns NULL when no element is contained in the range. */
  973 unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
  974     unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
  975 
  976     /* If everything is out of range, return early. */
  977     if (!zzlIsInLexRange(zl,range)) return NULL;
  978 
  979     while (eptr != NULL) {
  980         if (zzlLexValueLteMax(eptr,range)) {
  981             /* Check if score >= min. */
  982             if (zzlLexValueGteMin(eptr,range))
  983                 return eptr;
  984             return NULL;
  985         }
  986 
  987         /* Move to previous element by moving to the score of previous element.
  988          * When this returns NULL, we know there also is no element. */
  989         sptr = ziplistPrev(zl,eptr);
  990         if (sptr != NULL)
  991             serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
  992         else
  993             eptr = NULL;
  994     }
  995 
  996     return NULL;
  997 }
  998 
  999 unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
 1000     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
 1001 
 1002     while (eptr != NULL) {
 1003         sptr = ziplistNext(zl,eptr);
 1004         serverAssert(sptr != NULL);
 1005 
 1006         if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {
 1007             /* Matching element, pull out score. */
 1008             if (score != NULL) *score = zzlGetScore(sptr);
 1009             return eptr;
 1010         }
 1011 
 1012         /* Move to next element. */
 1013         eptr = ziplistNext(zl,sptr);
 1014     }
 1015     return NULL;
 1016 }
 1017 
 1018 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
 1019  * don't want to modify the one given as argument. */
 1020 unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
 1021     unsigned char *p = eptr;
 1022 
 1023     /* TODO: add function to ziplist API to delete N elements from offset. */
 1024     zl = ziplistDelete(zl,&p);
 1025     zl = ziplistDelete(zl,&p);
 1026     return zl;
 1027 }
 1028 
 1029 unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
 1030     unsigned char *sptr;
 1031     char scorebuf[128];
 1032     int scorelen;
 1033     size_t offset;
 1034 
 1035     scorelen = d2string(scorebuf,sizeof(scorebuf),score);
 1036     if (eptr == NULL) {
 1037         zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL);
 1038         zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
 1039     } else {
 1040         /* Keep offset relative to zl, as it might be re-allocated. */
 1041         offset = eptr-zl;
 1042         zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele));
 1043         eptr = zl+offset;
 1044 
 1045         /* Insert score after the element. */
 1046         serverAssert((sptr = ziplistNext(zl,eptr)) != NULL);
 1047         zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
 1048     }
 1049     return zl;
 1050 }
 1051 
 1052 /* Insert (element,score) pair in ziplist. This function assumes the element is
 1053  * not yet present in the list. */
 1054 unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
 1055     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
 1056     double s;
 1057 
 1058     while (eptr != NULL) {
 1059         sptr = ziplistNext(zl,eptr);
 1060         serverAssert(sptr != NULL);
 1061         s = zzlGetScore(sptr);
 1062 
 1063         if (s > score) {
 1064             /* First element with score larger than score for element to be
 1065              * inserted. This means we should take its spot in the list to
 1066              * maintain ordering. */
 1067             zl = zzlInsertAt(zl,eptr,ele,score);
 1068             break;
 1069         } else if (s == score) {
 1070             /* Ensure lexicographical ordering for elements. */
 1071             if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
 1072                 zl = zzlInsertAt(zl,eptr,ele,score);
 1073                 break;
 1074             }
 1075         }
 1076 
 1077         /* Move to next element. */
 1078         eptr = ziplistNext(zl,sptr);
 1079     }
 1080 
 1081     /* Push on tail of list when it was not yet inserted. */
 1082     if (eptr == NULL)
 1083         zl = zzlInsertAt(zl,NULL,ele,score);
 1084     return zl;
 1085 }
 1086 
 1087 unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
 1088     unsigned char *eptr, *sptr;
 1089     double score;
 1090     unsigned long num = 0;
 1091 
 1092     if (deleted != NULL) *deleted = 0;
 1093 
 1094     eptr = zzlFirstInRange(zl,range);
 1095     if (eptr == NULL) return zl;
 1096 
 1097     /* When the tail of the ziplist is deleted, eptr will point to the sentinel
 1098      * byte and ziplistNext will return NULL. */
 1099     while ((sptr = ziplistNext(zl,eptr)) != NULL) {
 1100         score = zzlGetScore(sptr);
 1101         if (zslValueLteMax(score,range)) {
 1102             /* Delete both the element and the score. */
 1103             zl = ziplistDelete(zl,&eptr);
 1104             zl = ziplistDelete(zl,&eptr);
 1105             num++;
 1106         } else {
 1107             /* No longer in range. */
 1108             break;
 1109         }
 1110     }
 1111 
 1112     if (deleted != NULL) *deleted = num;
 1113     return zl;
 1114 }
 1115 
 1116 unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
 1117     unsigned char *eptr, *sptr;
 1118     unsigned long num = 0;
 1119 
 1120     if (deleted != NULL) *deleted = 0;
 1121 
 1122     eptr = zzlFirstInLexRange(zl,range);
 1123     if (eptr == NULL) return zl;
 1124 
 1125     /* When the tail of the ziplist is deleted, eptr will point to the sentinel
 1126      * byte and ziplistNext will return NULL. */
 1127     while ((sptr = ziplistNext(zl,eptr)) != NULL) {
 1128         if (zzlLexValueLteMax(eptr,range)) {
 1129             /* Delete both the element and the score. */
 1130             zl = ziplistDelete(zl,&eptr);
 1131             zl = ziplistDelete(zl,&eptr);
 1132             num++;
 1133         } else {
 1134             /* No longer in range. */
 1135             break;
 1136         }
 1137     }
 1138 
 1139     if (deleted != NULL) *deleted = num;
 1140     return zl;
 1141 }
 1142 
 1143 /* Delete all the elements with rank between start and end from the skiplist.
 1144  * Start and end are inclusive. Note that start and end need to be 1-based */
 1145 unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
 1146     unsigned int num = (end-start)+1;
 1147     if (deleted) *deleted = num;
 1148     zl = ziplistDeleteRange(zl,2*(start-1),2*num);
 1149     return zl;
 1150 }
 1151 
 1152 /*-----------------------------------------------------------------------------
 1153  * Common sorted set API
 1154  *----------------------------------------------------------------------------*/
 1155 
 1156 unsigned long zsetLength(const robj *zobj) {
 1157     unsigned long length = 0;
 1158     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 1159         length = zzlLength(zobj->ptr);
 1160     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 1161         length = ((const zset*)zobj->ptr)->zsl->length;
 1162     } else {
 1163         serverPanic("Unknown sorted set encoding");
 1164     }
 1165     return length;
 1166 }
 1167 
 1168 void zsetConvert(robj *zobj, int encoding) {
 1169     zset *zs;
 1170     zskiplistNode *node, *next;
 1171     sds ele;
 1172     double score;
 1173 
 1174     if (zobj->encoding == encoding) return;
 1175     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 1176         unsigned char *zl = zobj->ptr;
 1177         unsigned char *eptr, *sptr;
 1178         unsigned char *vstr;
 1179         unsigned int vlen;
 1180         long long vlong;
 1181 
 1182         if (encoding != OBJ_ENCODING_SKIPLIST)
 1183             serverPanic("Unknown target encoding");
 1184 
 1185         zs = zmalloc(sizeof(*zs));
 1186         zs->dict = dictCreate(&zsetDictType,NULL);
 1187         zs->zsl = zslCreate();
 1188 
 1189         eptr = ziplistIndex(zl,0);
 1190         serverAssertWithInfo(NULL,zobj,eptr != NULL);
 1191         sptr = ziplistNext(zl,eptr);
 1192         serverAssertWithInfo(NULL,zobj,sptr != NULL);
 1193 
 1194         while (eptr != NULL) {
 1195             score = zzlGetScore(sptr);
 1196             serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
 1197             if (vstr == NULL)
 1198                 ele = sdsfromlonglong(vlong);
 1199             else
 1200                 ele = sdsnewlen((char*)vstr,vlen);
 1201 
 1202             node = zslInsert(zs->zsl,score,ele);
 1203             serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
 1204             zzlNext(zl,&eptr,&sptr);
 1205         }
 1206 
 1207         zfree(zobj->ptr);
 1208         zobj->ptr = zs;
 1209         zobj->encoding = OBJ_ENCODING_SKIPLIST;
 1210     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 1211         unsigned char *zl = ziplistNew();
 1212 
 1213         if (encoding != OBJ_ENCODING_ZIPLIST)
 1214             serverPanic("Unknown target encoding");
 1215 
 1216         /* Approach similar to zslFree(), since we want to free the skiplist at
 1217          * the same time as creating the ziplist. */
 1218         zs = zobj->ptr;
 1219         dictRelease(zs->dict);
 1220         node = zs->zsl->header->level[0].forward;
 1221         zfree(zs->zsl->header);
 1222         zfree(zs->zsl);
 1223 
 1224         while (node) {
 1225             zl = zzlInsertAt(zl,NULL,node->ele,node->score);
 1226             next = node->level[0].forward;
 1227             zslFreeNode(node);
 1228             node = next;
 1229         }
 1230 
 1231         zfree(zs);
 1232         zobj->ptr = zl;
 1233         zobj->encoding = OBJ_ENCODING_ZIPLIST;
 1234     } else {
 1235         serverPanic("Unknown sorted set encoding");
 1236     }
 1237 }
 1238 
 1239 /* Convert the sorted set object into a ziplist if it is not already a ziplist
 1240  * and if the number of elements and the maximum element size is within the
 1241  * expected ranges. */
 1242 void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen) {
 1243     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) return;
 1244     zset *zset = zobj->ptr;
 1245 
 1246     if (zset->zsl->length <= server.zset_max_ziplist_entries &&
 1247         maxelelen <= server.zset_max_ziplist_value)
 1248             zsetConvert(zobj,OBJ_ENCODING_ZIPLIST);
 1249 }
 1250 
 1251 /* Return (by reference) the score of the specified member of the sorted set
 1252  * storing it into *score. If the element does not exist C_ERR is returned
 1253  * otherwise C_OK is returned and *score is correctly populated.
 1254  * If 'zobj' or 'member' is NULL, C_ERR is returned. */
 1255 int zsetScore(robj *zobj, sds member, double *score) {
 1256     if (!zobj || !member) return C_ERR;
 1257 
 1258     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 1259         if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;
 1260     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 1261         zset *zs = zobj->ptr;
 1262         dictEntry *de = dictFind(zs->dict, member);
 1263         if (de == NULL) return C_ERR;
 1264         *score = *(double*)dictGetVal(de);
 1265     } else {
 1266         serverPanic("Unknown sorted set encoding");
 1267     }
 1268     return C_OK;
 1269 }
 1270 
 1271 /* Add a new element or update the score of an existing element in a sorted
 1272  * set, regardless of its encoding.
 1273  *
 1274  * The set of flags change the command behavior. They are passed with an integer
 1275  * pointer since the function will clear the flags and populate them with
 1276  * other flags to indicate different conditions.
 1277  *
 1278  * The input flags are the following:
 1279  *
 1280  * ZADD_INCR: Increment the current element score by 'score' instead of updating
 1281  *            the current element score. If the element does not exist, we
 1282  *            assume 0 as previous score.
 1283  * ZADD_NX:   Perform the operation only if the element does not exist.
 1284  * ZADD_XX:   Perform the operation only if the element already exist.
 1285  *
 1286  * When ZADD_INCR is used, the new score of the element is stored in
 1287  * '*newscore' if 'newscore' is not NULL.
 1288  *
 1289  * The returned flags are the following:
 1290  *
 1291  * ZADD_NAN:     The resulting score is not a number.
 1292  * ZADD_ADDED:   The element was added (not present before the call).
 1293  * ZADD_UPDATED: The element score was updated.
 1294  * ZADD_NOP:     No operation was performed because of NX or XX.
 1295  *
 1296  * Return value:
 1297  *
 1298  * The function returns 1 on success, and sets the appropriate flags
 1299  * ADDED or UPDATED to signal what happened during the operation (note that
 1300  * none could be set if we re-added an element using the same score it used
 1301  * to have, or in the case a zero increment is used).
 1302  *
 1303  * The function returns 0 on erorr, currently only when the increment
 1304  * produces a NAN condition, or when the 'score' value is NAN since the
 1305  * start.
 1306  *
 1307  * The commad as a side effect of adding a new element may convert the sorted
 1308  * set internal encoding from ziplist to hashtable+skiplist.
 1309  *
 1310  * Memory managemnet of 'ele':
 1311  *
 1312  * The function does not take ownership of the 'ele' SDS string, but copies
 1313  * it if needed. */
 1314 int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
 1315     /* Turn options into simple to check vars. */
 1316     int incr = (*flags & ZADD_INCR) != 0;
 1317     int nx = (*flags & ZADD_NX) != 0;
 1318     int xx = (*flags & ZADD_XX) != 0;
 1319     *flags = 0; /* We'll return our response flags. */
 1320     double curscore;
 1321 
 1322     /* NaN as input is an error regardless of all the other parameters. */
 1323     if (isnan(score)) {
 1324         *flags = ZADD_NAN;
 1325         return 0;
 1326     }
 1327 
 1328     /* Update the sorted set according to its encoding. */
 1329     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 1330         unsigned char *eptr;
 1331 
 1332         if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
 1333             /* NX? Return, same element already exists. */
 1334             if (nx) {
 1335                 *flags |= ZADD_NOP;
 1336                 return 1;
 1337             }
 1338 
 1339             /* Prepare the score for the increment if needed. */
 1340             if (incr) {
 1341                 score += curscore;
 1342                 if (isnan(score)) {
 1343                     *flags |= ZADD_NAN;
 1344                     return 0;
 1345                 }
 1346                 if (newscore) *newscore = score;
 1347             }
 1348 
 1349             /* Remove and re-insert when score changed. */
 1350             if (score != curscore) {
 1351                 zobj->ptr = zzlDelete(zobj->ptr,eptr);
 1352                 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
 1353                 *flags |= ZADD_UPDATED;
 1354             }
 1355             return 1;
 1356         } else if (!xx) {
 1357             /* Optimize: check if the element is too large or the list
 1358              * becomes too long *before* executing zzlInsert. */
 1359             zobj->ptr = zzlInsert(zobj->ptr,ele,score);
 1360             if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
 1361                 sdslen(ele) > server.zset_max_ziplist_value)
 1362                 zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
 1363             if (newscore) *newscore = score;
 1364             *flags |= ZADD_ADDED;
 1365             return 1;
 1366         } else {
 1367             *flags |= ZADD_NOP;
 1368             return 1;
 1369         }
 1370     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 1371         zset *zs = zobj->ptr;
 1372         zskiplistNode *znode;
 1373         dictEntry *de;
 1374 
 1375         de = dictFind(zs->dict,ele);
 1376         if (de != NULL) {
 1377             /* NX? Return, same element already exists. */
 1378             if (nx) {
 1379                 *flags |= ZADD_NOP;
 1380                 return 1;
 1381             }
 1382             curscore = *(double*)dictGetVal(de);
 1383 
 1384             /* Prepare the score for the increment if needed. */
 1385             if (incr) {
 1386                 score += curscore;
 1387                 if (isnan(score)) {
 1388                     *flags |= ZADD_NAN;
 1389                     return 0;
 1390                 }
 1391                 if (newscore) *newscore = score;
 1392             }
 1393 
 1394             /* Remove and re-insert when score changes. */
 1395             if (score != curscore) {
 1396                 znode = zslUpdateScore(zs->zsl,curscore,ele,score);
 1397                 /* Note that we did not removed the original element from
 1398                  * the hash table representing the sorted set, so we just
 1399                  * update the score. */
 1400                 dictGetVal(de) = &znode->score; /* Update score ptr. */
 1401                 *flags |= ZADD_UPDATED;
 1402             }
 1403             return 1;
 1404         } else if (!xx) {
 1405             ele = sdsdup(ele);
 1406             znode = zslInsert(zs->zsl,score,ele);
 1407             serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
 1408             *flags |= ZADD_ADDED;
 1409             if (newscore) *newscore = score;
 1410             return 1;
 1411         } else {
 1412             *flags |= ZADD_NOP;
 1413             return 1;
 1414         }
 1415     } else {
 1416         serverPanic("Unknown sorted set encoding");
 1417     }
 1418     return 0; /* Never reached. */
 1419 }
 1420 
 1421 /* Delete the element 'ele' from the sorted set, returning 1 if the element
 1422  * existed and was deleted, 0 otherwise (the element was not there). */
 1423 int zsetDel(robj *zobj, sds ele) {
 1424     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 1425         unsigned char *eptr;
 1426 
 1427         if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
 1428             zobj->ptr = zzlDelete(zobj->ptr,eptr);
 1429             return 1;
 1430         }
 1431     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 1432         zset *zs = zobj->ptr;
 1433         dictEntry *de;
 1434         double score;
 1435 
 1436         de = dictUnlink(zs->dict,ele);
 1437         if (de != NULL) {
 1438             /* Get the score in order to delete from the skiplist later. */
 1439             score = *(double*)dictGetVal(de);
 1440 
 1441             /* Delete from the hash table and later from the skiplist.
 1442              * Note that the order is important: deleting from the skiplist
 1443              * actually releases the SDS string representing the element,
 1444              * which is shared between the skiplist and the hash table, so
 1445              * we need to delete from the skiplist as the final step. */
 1446             dictFreeUnlinkedEntry(zs->dict,de);
 1447 
 1448             /* Delete from skiplist. */
 1449             int retval = zslDelete(zs->zsl,score,ele,NULL);
 1450             serverAssert(retval);
 1451 
 1452             if (htNeedsResize(zs->dict)) dictResize(zs->dict);
 1453             return 1;
 1454         }
 1455     } else {
 1456         serverPanic("Unknown sorted set encoding");
 1457     }
 1458     return 0; /* No such element found. */
 1459 }
 1460 
 1461 /* Given a sorted set object returns the 0-based rank of the object or
 1462  * -1 if the object does not exist.
 1463  *
 1464  * For rank we mean the position of the element in the sorted collection
 1465  * of elements. So the first element has rank 0, the second rank 1, and so
 1466  * forth up to length-1 elements.
 1467  *
 1468  * If 'reverse' is false, the rank is returned considering as first element
 1469  * the one with the lowest score. Otherwise if 'reverse' is non-zero
 1470  * the rank is computed considering as element with rank 0 the one with
 1471  * the highest score. */
 1472 long zsetRank(robj *zobj, sds ele, int reverse) {
 1473     unsigned long llen;
 1474     unsigned long rank;
 1475 
 1476     llen = zsetLength(zobj);
 1477 
 1478     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 1479         unsigned char *zl = zobj->ptr;
 1480         unsigned char *eptr, *sptr;
 1481 
 1482         eptr = ziplistIndex(zl,0);
 1483         serverAssert(eptr != NULL);
 1484         sptr = ziplistNext(zl,eptr);
 1485         serverAssert(sptr != NULL);
 1486 
 1487         rank = 1;
 1488         while(eptr != NULL) {
 1489             if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele)))
 1490                 break;
 1491             rank++;
 1492             zzlNext(zl,&eptr,&sptr);
 1493         }
 1494 
 1495         if (eptr != NULL) {
 1496             if (reverse)
 1497                 return llen-rank;
 1498             else
 1499                 return rank-1;
 1500         } else {
 1501             return -1;
 1502         }
 1503     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 1504         zset *zs = zobj->ptr;
 1505         zskiplist *zsl = zs->zsl;
 1506         dictEntry *de;
 1507         double score;
 1508 
 1509         de = dictFind(zs->dict,ele);
 1510         if (de != NULL) {
 1511             score = *(double*)dictGetVal(de);
 1512             rank = zslGetRank(zsl,score,ele);
 1513             /* Existing elements always have a rank. */
 1514             serverAssert(rank != 0);
 1515             if (reverse)
 1516                 return llen-rank;
 1517             else
 1518                 return rank-1;
 1519         } else {
 1520             return -1;
 1521         }
 1522     } else {
 1523         serverPanic("Unknown sorted set encoding");
 1524     }
 1525 }
 1526 
 1527 /*-----------------------------------------------------------------------------
 1528  * Sorted set commands
 1529  *----------------------------------------------------------------------------*/
 1530 
 1531 /* This generic command implements both ZADD and ZINCRBY. */
 1532 void zaddGenericCommand(client *c, int flags) {
 1533     static char *nanerr = "resulting score is not a number (NaN)";
 1534     robj *key = c->argv[1];
 1535     robj *zobj;
 1536     sds ele;
 1537     double score = 0, *scores = NULL;
 1538     int j, elements;
 1539     int scoreidx = 0;
 1540     /* The following vars are used in order to track what the command actually
 1541      * did during the execution, to reply to the client and to trigger the
 1542      * notification of keyspace change. */
 1543     int added = 0;      /* Number of new elements added. */
 1544     int updated = 0;    /* Number of elements with updated score. */
 1545     int processed = 0;  /* Number of elements processed, may remain zero with
 1546                            options like XX. */
 1547 
 1548     /* Parse options. At the end 'scoreidx' is set to the argument position
 1549      * of the score of the first score-element pair. */
 1550     scoreidx = 2;
 1551     while(scoreidx < c->argc) {
 1552         char *opt = c->argv[scoreidx]->ptr;
 1553         if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
 1554         else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
 1555         else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
 1556         else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
 1557         else break;
 1558         scoreidx++;
 1559     }
 1560 
 1561     /* Turn options into simple to check vars. */
 1562     int incr = (flags & ZADD_INCR) != 0;
 1563     int nx = (flags & ZADD_NX) != 0;
 1564     int xx = (flags & ZADD_XX) != 0;
 1565     int ch = (flags & ZADD_CH) != 0;
 1566 
 1567     /* After the options, we expect to have an even number of args, since
 1568      * we expect any number of score-element pairs. */
 1569     elements = c->argc-scoreidx;
 1570     if (elements % 2 || !elements) {
 1571         addReply(c,shared.syntaxerr);
 1572         return;
 1573     }
 1574     elements /= 2; /* Now this holds the number of score-element pairs. */
 1575 
 1576     /* Check for incompatible options. */
 1577     if (nx && xx) {
 1578         addReplyError(c,
 1579             "XX and NX options at the same time are not compatible");
 1580         return;
 1581     }
 1582 
 1583     if (incr && elements > 1) {
 1584         addReplyError(c,
 1585             "INCR option supports a single increment-element pair");
 1586         return;
 1587     }
 1588 
 1589     /* Start parsing all the scores, we need to emit any syntax error
 1590      * before executing additions to the sorted set, as the command should
 1591      * either execute fully or nothing at all. */
 1592     scores = zmalloc(sizeof(double)*elements);
 1593     for (j = 0; j < elements; j++) {
 1594         if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
 1595             != C_OK) goto cleanup;
 1596     }
 1597 
 1598     /* Lookup the key and create the sorted set if does not exist. */
 1599     zobj = lookupKeyWrite(c->db,key);
 1600     if (zobj == NULL) {
 1601         if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
 1602         if (server.zset_max_ziplist_entries == 0 ||
 1603             server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
 1604         {
 1605             zobj = createZsetObject();
 1606         } else {
 1607             zobj = createZsetZiplistObject();
 1608         }
 1609         dbAdd(c->db,key,zobj);
 1610     } else {
 1611         if (zobj->type != OBJ_ZSET) {
 1612             addReply(c,shared.wrongtypeerr);
 1613             goto cleanup;
 1614         }
 1615     }
 1616 
 1617     for (j = 0; j < elements; j++) {
 1618         double newscore;
 1619         score = scores[j];
 1620         int retflags = flags;
 1621 
 1622         ele = c->argv[scoreidx+1+j*2]->ptr;
 1623         int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
 1624         if (retval == 0) {
 1625             addReplyError(c,nanerr);
 1626             goto cleanup;
 1627         }
 1628         if (retflags & ZADD_ADDED) added++;
 1629         if (retflags & ZADD_UPDATED) updated++;
 1630         if (!(retflags & ZADD_NOP)) processed++;
 1631         score = newscore;
 1632     }
 1633     server.dirty += (added+updated);
 1634 
 1635 reply_to_client:
 1636     if (incr) { /* ZINCRBY or INCR option. */
 1637         if (processed)
 1638             addReplyDouble(c,score);
 1639         else
 1640             addReply(c,shared.nullbulk);
 1641     } else { /* ZADD. */
 1642         addReplyLongLong(c,ch ? added+updated : added);
 1643     }
 1644 
 1645 cleanup:
 1646     zfree(scores);
 1647     if (added || updated) {
 1648         signalModifiedKey(c->db,key);
 1649         notifyKeyspaceEvent(NOTIFY_ZSET,
 1650             incr ? "zincr" : "zadd", key, c->db->id);
 1651     }
 1652 }
 1653 
 1654 void zaddCommand(client *c) {
 1655     zaddGenericCommand(c,ZADD_NONE);
 1656 }
 1657 
 1658 void zincrbyCommand(client *c) {
 1659     zaddGenericCommand(c,ZADD_INCR);
 1660 }
 1661 
 1662 void zremCommand(client *c) {
 1663     robj *key = c->argv[1];
 1664     robj *zobj;
 1665     int deleted = 0, keyremoved = 0, j;
 1666 
 1667     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
 1668         checkType(c,zobj,OBJ_ZSET)) return;
 1669 
 1670     for (j = 2; j < c->argc; j++) {
 1671         if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;
 1672         if (zsetLength(zobj) == 0) {
 1673             dbDelete(c->db,key);
 1674             keyremoved = 1;
 1675             break;
 1676         }
 1677     }
 1678 
 1679     if (deleted) {
 1680         notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
 1681         if (keyremoved)
 1682             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
 1683         signalModifiedKey(c->db,key);
 1684         server.dirty += deleted;
 1685     }
 1686     addReplyLongLong(c,deleted);
 1687 }
 1688 
 1689 /* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */
 1690 #define ZRANGE_RANK 0
 1691 #define ZRANGE_SCORE 1
 1692 #define ZRANGE_LEX 2
 1693 void zremrangeGenericCommand(client *c, int rangetype) {
 1694     robj *key = c->argv[1];
 1695     robj *zobj;
 1696     int keyremoved = 0;
 1697     unsigned long deleted = 0;
 1698     zrangespec range;
 1699     zlexrangespec lexrange;
 1700     long start, end, llen;
 1701 
 1702     /* Step 1: Parse the range. */
 1703     if (rangetype == ZRANGE_RANK) {
 1704         if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) ||
 1705             (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK))
 1706             return;
 1707     } else if (rangetype == ZRANGE_SCORE) {
 1708         if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
 1709             addReplyError(c,"min or max is not a float");
 1710             return;
 1711         }
 1712     } else if (rangetype == ZRANGE_LEX) {
 1713         if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) {
 1714             addReplyError(c,"min or max not valid string range item");
 1715             return;
 1716         }
 1717     }
 1718 
 1719     /* Step 2: Lookup & range sanity checks if needed. */
 1720     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
 1721         checkType(c,zobj,OBJ_ZSET)) goto cleanup;
 1722 
 1723     if (rangetype == ZRANGE_RANK) {
 1724         /* Sanitize indexes. */
 1725         llen = zsetLength(zobj);
 1726         if (start < 0) start = llen+start;
 1727         if (end < 0) end = llen+end;
 1728         if (start < 0) start = 0;
 1729 
 1730         /* Invariant: start >= 0, so this test will be true when end < 0.
 1731          * The range is empty when start > end or start >= length. */
 1732         if (start > end || start >= llen) {
 1733             addReply(c,shared.czero);
 1734             goto cleanup;
 1735         }
 1736         if (end >= llen) end = llen-1;
 1737     }
 1738 
 1739     /* Step 3: Perform the range deletion operation. */
 1740     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 1741         switch(rangetype) {
 1742         case ZRANGE_RANK:
 1743             zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
 1744             break;
 1745         case ZRANGE_SCORE:
 1746             zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);
 1747             break;
 1748         case ZRANGE_LEX:
 1749             zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);
 1750             break;
 1751         }
 1752         if (zzlLength(zobj->ptr) == 0) {
 1753             dbDelete(c->db,key);
 1754             keyremoved = 1;
 1755         }
 1756     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 1757         zset *zs = zobj->ptr;
 1758         switch(rangetype) {
 1759         case ZRANGE_RANK:
 1760             deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
 1761             break;
 1762         case ZRANGE_SCORE:
 1763             deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict);
 1764             break;
 1765         case ZRANGE_LEX:
 1766             deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict);
 1767             break;
 1768         }
 1769         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
 1770         if (dictSize(zs->dict) == 0) {
 1771             dbDelete(c->db,key);
 1772             keyremoved = 1;
 1773         }
 1774     } else {
 1775         serverPanic("Unknown sorted set encoding");
 1776     }
 1777 
 1778     /* Step 4: Notifications and reply. */
 1779     if (deleted) {
 1780         char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"};
 1781         signalModifiedKey(c->db,key);
 1782         notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id);
 1783         if (keyremoved)
 1784             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
 1785     }
 1786     server.dirty += deleted;
 1787     addReplyLongLong(c,deleted);
 1788 
 1789 cleanup:
 1790     if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
 1791 }
 1792 
 1793 void zremrangebyrankCommand(client *c) {
 1794     zremrangeGenericCommand(c,ZRANGE_RANK);
 1795 }
 1796 
 1797 void zremrangebyscoreCommand(client *c) {
 1798     zremrangeGenericCommand(c,ZRANGE_SCORE);
 1799 }
 1800 
 1801 void zremrangebylexCommand(client *c) {
 1802     zremrangeGenericCommand(c,ZRANGE_LEX);
 1803 }
 1804 
 1805 typedef struct {
 1806     robj *subject;
 1807     int type; /* Set, sorted set */
 1808     int encoding;
 1809     double weight;
 1810 
 1811     union {
 1812         /* Set iterators. */
 1813         union _iterset {
 1814             struct {
 1815                 intset *is;
 1816                 int ii;
 1817             } is;
 1818             struct {
 1819                 dict *dict;
 1820                 dictIterator *di;
 1821                 dictEntry *de;
 1822             } ht;
 1823         } set;
 1824 
 1825         /* Sorted set iterators. */
 1826         union _iterzset {
 1827             struct {
 1828                 unsigned char *zl;
 1829                 unsigned char *eptr, *sptr;
 1830             } zl;
 1831             struct {
 1832                 zset *zs;
 1833                 zskiplistNode *node;
 1834             } sl;
 1835         } zset;
 1836     } iter;
 1837 } zsetopsrc;
 1838 
 1839 
 1840 /* Use dirty flags for pointers that need to be cleaned up in the next
 1841  * iteration over the zsetopval. The dirty flag for the long long value is
 1842  * special, since long long values don't need cleanup. Instead, it means that
 1843  * we already checked that "ell" holds a long long, or tried to convert another
 1844  * representation into a long long value. When this was successful,
 1845  * OPVAL_VALID_LL is set as well. */
 1846 #define OPVAL_DIRTY_SDS 1
 1847 #define OPVAL_DIRTY_LL 2
 1848 #define OPVAL_VALID_LL 4
 1849 
 1850 /* Store value retrieved from the iterator. */
 1851 typedef struct {
 1852     int flags;
 1853     unsigned char _buf[32]; /* Private buffer. */
 1854     sds ele;
 1855     unsigned char *estr;
 1856     unsigned int elen;
 1857     long long ell;
 1858     double score;
 1859 } zsetopval;
 1860 
 1861 typedef union _iterset iterset;
 1862 typedef union _iterzset iterzset;
 1863 
 1864 void zuiInitIterator(zsetopsrc *op) {
 1865     if (op->subject == NULL)
 1866         return;
 1867 
 1868     if (op->type == OBJ_SET) {
 1869         iterset *it = &op->iter.set;
 1870         if (op->encoding == OBJ_ENCODING_INTSET) {
 1871             it->is.is = op->subject->ptr;
 1872             it->is.ii = 0;
 1873         } else if (op->encoding == OBJ_ENCODING_HT) {
 1874             it->ht.dict = op->subject->ptr;
 1875             it->ht.di = dictGetIterator(op->subject->ptr);
 1876             it->ht.de = dictNext(it->ht.di);
 1877         } else {
 1878             serverPanic("Unknown set encoding");
 1879         }
 1880     } else if (op->type == OBJ_ZSET) {
 1881         iterzset *it = &op->iter.zset;
 1882         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
 1883             it->zl.zl = op->subject->ptr;
 1884             it->zl.eptr = ziplistIndex(it->zl.zl,0);
 1885             if (it->zl.eptr != NULL) {
 1886                 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
 1887                 serverAssert(it->zl.sptr != NULL);
 1888             }
 1889         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
 1890             it->sl.zs = op->subject->ptr;
 1891             it->sl.node = it->sl.zs->zsl->header->level[0].forward;
 1892         } else {
 1893             serverPanic("Unknown sorted set encoding");
 1894         }
 1895     } else {
 1896         serverPanic("Unsupported type");
 1897     }
 1898 }
 1899 
 1900 void zuiClearIterator(zsetopsrc *op) {
 1901     if (op->subject == NULL)
 1902         return;
 1903 
 1904     if (op->type == OBJ_SET) {
 1905         iterset *it = &op->iter.set;
 1906         if (op->encoding == OBJ_ENCODING_INTSET) {
 1907             UNUSED(it); /* skip */
 1908         } else if (op->encoding == OBJ_ENCODING_HT) {
 1909             dictReleaseIterator(it->ht.di);
 1910         } else {
 1911             serverPanic("Unknown set encoding");
 1912         }
 1913     } else if (op->type == OBJ_ZSET) {
 1914         iterzset *it = &op->iter.zset;
 1915         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
 1916             UNUSED(it); /* skip */
 1917         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
 1918             UNUSED(it); /* skip */
 1919         } else {
 1920             serverPanic("Unknown sorted set encoding");
 1921         }
 1922     } else {
 1923         serverPanic("Unsupported type");
 1924     }
 1925 }
 1926 
 1927 unsigned long zuiLength(zsetopsrc *op) {
 1928     if (op->subject == NULL)
 1929         return 0;
 1930 
 1931     if (op->type == OBJ_SET) {
 1932         if (op->encoding == OBJ_ENCODING_INTSET) {
 1933             return intsetLen(op->subject->ptr);
 1934         } else if (op->encoding == OBJ_ENCODING_HT) {
 1935             dict *ht = op->subject->ptr;
 1936             return dictSize(ht);
 1937         } else {
 1938             serverPanic("Unknown set encoding");
 1939         }
 1940     } else if (op->type == OBJ_ZSET) {
 1941         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
 1942             return zzlLength(op->subject->ptr);
 1943         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
 1944             zset *zs = op->subject->ptr;
 1945             return zs->zsl->length;
 1946         } else {
 1947             serverPanic("Unknown sorted set encoding");
 1948         }
 1949     } else {
 1950         serverPanic("Unsupported type");
 1951     }
 1952 }
 1953 
 1954 /* Check if the current value is valid. If so, store it in the passed structure
 1955  * and move to the next element. If not valid, this means we have reached the
 1956  * end of the structure and can abort. */
 1957 int zuiNext(zsetopsrc *op, zsetopval *val) {
 1958     if (op->subject == NULL)
 1959         return 0;
 1960 
 1961     if (val->flags & OPVAL_DIRTY_SDS)
 1962         sdsfree(val->ele);
 1963 
 1964     memset(val,0,sizeof(zsetopval));
 1965 
 1966     if (op->type == OBJ_SET) {
 1967         iterset *it = &op->iter.set;
 1968         if (op->encoding == OBJ_ENCODING_INTSET) {
 1969             int64_t ell;
 1970 
 1971             if (!intsetGet(it->is.is,it->is.ii,&ell))
 1972                 return 0;
 1973             val->ell = ell;
 1974             val->score = 1.0;
 1975 
 1976             /* Move to next element. */
 1977             it->is.ii++;
 1978         } else if (op->encoding == OBJ_ENCODING_HT) {
 1979             if (it->ht.de == NULL)
 1980                 return 0;
 1981             val->ele = dictGetKey(it->ht.de);
 1982             val->score = 1.0;
 1983 
 1984             /* Move to next element. */
 1985             it->ht.de = dictNext(it->ht.di);
 1986         } else {
 1987             serverPanic("Unknown set encoding");
 1988         }
 1989     } else if (op->type == OBJ_ZSET) {
 1990         iterzset *it = &op->iter.zset;
 1991         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
 1992             /* No need to check both, but better be explicit. */
 1993             if (it->zl.eptr == NULL || it->zl.sptr == NULL)
 1994                 return 0;
 1995             serverAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
 1996             val->score = zzlGetScore(it->zl.sptr);
 1997 
 1998             /* Move to next element. */
 1999             zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
 2000         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
 2001             if (it->sl.node == NULL)
 2002                 return 0;
 2003             val->ele = it->sl.node->ele;
 2004             val->score = it->sl.node->score;
 2005 
 2006             /* Move to next element. */
 2007             it->sl.node = it->sl.node->level[0].forward;
 2008         } else {
 2009             serverPanic("Unknown sorted set encoding");
 2010         }
 2011     } else {
 2012         serverPanic("Unsupported type");
 2013     }
 2014     return 1;
 2015 }
 2016 
 2017 int zuiLongLongFromValue(zsetopval *val) {
 2018     if (!(val->flags & OPVAL_DIRTY_LL)) {
 2019         val->flags |= OPVAL_DIRTY_LL;
 2020 
 2021         if (val->ele != NULL) {
 2022             if (string2ll(val->ele,sdslen(val->ele),&val->ell))
 2023                 val->flags |= OPVAL_VALID_LL;
 2024         } else if (val->estr != NULL) {
 2025             if (string2ll((char*)val->estr,val->elen,&val->ell))
 2026                 val->flags |= OPVAL_VALID_LL;
 2027         } else {
 2028             /* The long long was already set, flag as valid. */
 2029             val->flags |= OPVAL_VALID_LL;
 2030         }
 2031     }
 2032     return val->flags & OPVAL_VALID_LL;
 2033 }
 2034 
 2035 sds zuiSdsFromValue(zsetopval *val) {
 2036     if (val->ele == NULL) {
 2037         if (val->estr != NULL) {
 2038             val->ele = sdsnewlen((char*)val->estr,val->elen);
 2039         } else {
 2040             val->ele = sdsfromlonglong(val->ell);
 2041         }
 2042         val->flags |= OPVAL_DIRTY_SDS;
 2043     }
 2044     return val->ele;
 2045 }
 2046 
 2047 /* This is different from zuiSdsFromValue since returns a new SDS string
 2048  * which is up to the caller to free. */
 2049 sds zuiNewSdsFromValue(zsetopval *val) {
 2050     if (val->flags & OPVAL_DIRTY_SDS) {
 2051         /* We have already one to return! */
 2052         sds ele = val->ele;
 2053         val->flags &= ~OPVAL_DIRTY_SDS;
 2054         val->ele = NULL;
 2055         return ele;
 2056     } else if (val->ele) {
 2057         return sdsdup(val->ele);
 2058     } else if (val->estr) {
 2059         return sdsnewlen((char*)val->estr,val->elen);
 2060     } else {
 2061         return sdsfromlonglong(val->ell);
 2062     }
 2063 }
 2064 
 2065 int zuiBufferFromValue(zsetopval *val) {
 2066     if (val->estr == NULL) {
 2067         if (val->ele != NULL) {
 2068             val->elen = sdslen(val->ele);
 2069             val->estr = (unsigned char*)val->ele;
 2070         } else {
 2071             val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
 2072             val->estr = val->_buf;
 2073         }
 2074     }
 2075     return 1;
 2076 }
 2077 
 2078 /* Find value pointed to by val in the source pointer to by op. When found,
 2079  * return 1 and store its score in target. Return 0 otherwise. */
 2080 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
 2081     if (op->subject == NULL)
 2082         return 0;
 2083 
 2084     if (op->type == OBJ_SET) {
 2085         if (op->encoding == OBJ_ENCODING_INTSET) {
 2086             if (zuiLongLongFromValue(val) &&
 2087                 intsetFind(op->subject->ptr,val->ell))
 2088             {
 2089                 *score = 1.0;
 2090                 return 1;
 2091             } else {
 2092                 return 0;
 2093             }
 2094         } else if (op->encoding == OBJ_ENCODING_HT) {
 2095             dict *ht = op->subject->ptr;
 2096             zuiSdsFromValue(val);
 2097             if (dictFind(ht,val->ele) != NULL) {
 2098                 *score = 1.0;
 2099                 return 1;
 2100             } else {
 2101                 return 0;
 2102             }
 2103         } else {
 2104             serverPanic("Unknown set encoding");
 2105         }
 2106     } else if (op->type == OBJ_ZSET) {
 2107         zuiSdsFromValue(val);
 2108 
 2109         if (op->encoding == OBJ_ENCODING_ZIPLIST) {
 2110             if (zzlFind(op->subject->ptr,val->ele,score) != NULL) {
 2111                 /* Score is already set by zzlFind. */
 2112                 return 1;
 2113             } else {
 2114                 return 0;
 2115             }
 2116         } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
 2117             zset *zs = op->subject->ptr;
 2118             dictEntry *de;
 2119             if ((de = dictFind(zs->dict,val->ele)) != NULL) {
 2120                 *score = *(double*)dictGetVal(de);
 2121                 return 1;
 2122             } else {
 2123                 return 0;
 2124             }
 2125         } else {
 2126             serverPanic("Unknown sorted set encoding");
 2127         }
 2128     } else {
 2129         serverPanic("Unsupported type");
 2130     }
 2131 }
 2132 
 2133 int zuiCompareByCardinality(const void *s1, const void *s2) {
 2134     unsigned long first = zuiLength((zsetopsrc*)s1);
 2135     unsigned long second = zuiLength((zsetopsrc*)s2);
 2136     if (first > second) return 1;
 2137     if (first < second) return -1;
 2138     return 0;
 2139 }
 2140 
 2141 #define REDIS_AGGR_SUM 1
 2142 #define REDIS_AGGR_MIN 2
 2143 #define REDIS_AGGR_MAX 3
 2144 #define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
 2145 
 2146 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
 2147     if (aggregate == REDIS_AGGR_SUM) {
 2148         *target = *target + val;
 2149         /* The result of adding two doubles is NaN when one variable
 2150          * is +inf and the other is -inf. When these numbers are added,
 2151          * we maintain the convention of the result being 0.0. */
 2152         if (isnan(*target)) *target = 0.0;
 2153     } else if (aggregate == REDIS_AGGR_MIN) {
 2154         *target = val < *target ? val : *target;
 2155     } else if (aggregate == REDIS_AGGR_MAX) {
 2156         *target = val > *target ? val : *target;
 2157     } else {
 2158         /* safety net */
 2159         serverPanic("Unknown ZUNION/INTER aggregate type");
 2160     }
 2161 }
 2162 
 2163 uint64_t dictSdsHash(const void *key);
 2164 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
 2165 
 2166 dictType setAccumulatorDictType = {
 2167     dictSdsHash,               /* hash function */
 2168     NULL,                      /* key dup */
 2169     NULL,                      /* val dup */
 2170     dictSdsKeyCompare,         /* key compare */
 2171     NULL,                      /* key destructor */
 2172     NULL                       /* val destructor */
 2173 };
 2174 
 2175 void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
 2176     int i, j;
 2177     long setnum;
 2178     int aggregate = REDIS_AGGR_SUM;
 2179     zsetopsrc *src;
 2180     zsetopval zval;
 2181     sds tmp;
 2182     size_t maxelelen = 0;
 2183     robj *dstobj;
 2184     zset *dstzset;
 2185     zskiplistNode *znode;
 2186     int touched = 0;
 2187 
 2188     /* expect setnum input keys to be given */
 2189     if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != C_OK))
 2190         return;
 2191 
 2192     if (setnum < 1) {
 2193         addReplyError(c,
 2194             "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
 2195         return;
 2196     }
 2197 
 2198     /* test if the expected number of keys would overflow */
 2199     if (setnum > c->argc-3) {
 2200         addReply(c,shared.syntaxerr);
 2201         return;
 2202     }
 2203 
 2204     /* read keys to be used for input */
 2205     src = zcalloc(sizeof(zsetopsrc) * setnum);
 2206     for (i = 0, j = 3; i < setnum; i++, j++) {
 2207         robj *obj = lookupKeyWrite(c->db,c->argv[j]);
 2208         if (obj != NULL) {
 2209             if (obj->type != OBJ_ZSET && obj->type != OBJ_SET) {
 2210                 zfree(src);
 2211                 addReply(c,shared.wrongtypeerr);
 2212                 return;
 2213             }
 2214 
 2215             src[i].subject = obj;
 2216             src[i].type = obj->type;
 2217             src[i].encoding = obj->encoding;
 2218         } else {
 2219             src[i].subject = NULL;
 2220         }
 2221 
 2222         /* Default all weights to 1. */
 2223         src[i].weight = 1.0;
 2224     }
 2225 
 2226     /* parse optional extra arguments */
 2227     if (j < c->argc) {
 2228         int remaining = c->argc - j;
 2229 
 2230         while (remaining) {
 2231             if (remaining >= (setnum + 1) &&
 2232                 !strcasecmp(c->argv[j]->ptr,"weights"))
 2233             {
 2234                 j++; remaining--;
 2235                 for (i = 0; i < setnum; i++, j++, remaining--) {
 2236                     if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
 2237                             "weight value is not a float") != C_OK)
 2238                     {
 2239                         zfree(src);
 2240                         return;
 2241                     }
 2242                 }
 2243             } else if (remaining >= 2 &&
 2244                        !strcasecmp(c->argv[j]->ptr,"aggregate"))
 2245             {
 2246                 j++; remaining--;
 2247                 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
 2248                     aggregate = REDIS_AGGR_SUM;
 2249                 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
 2250                     aggregate = REDIS_AGGR_MIN;
 2251                 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
 2252                     aggregate = REDIS_AGGR_MAX;
 2253                 } else {
 2254                     zfree(src);
 2255                     addReply(c,shared.syntaxerr);
 2256                     return;
 2257                 }
 2258                 j++; remaining--;
 2259             } else {
 2260                 zfree(src);
 2261                 addReply(c,shared.syntaxerr);
 2262                 return;
 2263             }
 2264         }
 2265     }
 2266 
 2267     /* sort sets from the smallest to largest, this will improve our
 2268      * algorithm's performance */
 2269     qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
 2270 
 2271     dstobj = createZsetObject();
 2272     dstzset = dstobj->ptr;
 2273     memset(&zval, 0, sizeof(zval));
 2274 
 2275     if (op == SET_OP_INTER) {
 2276         /* Skip everything if the smallest input is empty. */
 2277         if (zuiLength(&src[0]) > 0) {
 2278             /* Precondition: as src[0] is non-empty and the inputs are ordered
 2279              * by size, all src[i > 0] are non-empty too. */
 2280             zuiInitIterator(&src[0]);
 2281             while (zuiNext(&src[0],&zval)) {
 2282                 double score, value;
 2283 
 2284                 score = src[0].weight * zval.score;
 2285                 if (isnan(score)) score = 0;
 2286 
 2287                 for (j = 1; j < setnum; j++) {
 2288                     /* It is not safe to access the zset we are
 2289                      * iterating, so explicitly check for equal object. */
 2290                     if (src[j].subject == src[0].subject) {
 2291                         value = zval.score*src[j].weight;
 2292                         zunionInterAggregate(&score,value,aggregate);
 2293                     } else if (zuiFind(&src[j],&zval,&value)) {
 2294                         value *= src[j].weight;
 2295                         zunionInterAggregate(&score,value,aggregate);
 2296                     } else {
 2297                         break;
 2298                     }
 2299                 }
 2300 
 2301                 /* Only continue when present in every input. */
 2302                 if (j == setnum) {
 2303                     tmp = zuiNewSdsFromValue(&zval);
 2304                     znode = zslInsert(dstzset->zsl,score,tmp);
 2305                     dictAdd(dstzset->dict,tmp,&znode->score);
 2306                     if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
 2307                 }
 2308             }
 2309             zuiClearIterator(&src[0]);
 2310         }
 2311     } else if (op == SET_OP_UNION) {
 2312         dict *accumulator = dictCreate(&setAccumulatorDictType,NULL);
 2313         dictIterator *di;
 2314         dictEntry *de, *existing;
 2315         double score;
 2316 
 2317         if (setnum) {
 2318             /* Our union is at least as large as the largest set.
 2319              * Resize the dictionary ASAP to avoid useless rehashing. */
 2320             dictExpand(accumulator,zuiLength(&src[setnum-1]));
 2321         }
 2322 
 2323         /* Step 1: Create a dictionary of elements -> aggregated-scores
 2324          * by iterating one sorted set after the other. */
 2325         for (i = 0; i < setnum; i++) {
 2326             if (zuiLength(&src[i]) == 0) continue;
 2327 
 2328             zuiInitIterator(&src[i]);
 2329             while (zuiNext(&src[i],&zval)) {
 2330                 /* Initialize value */
 2331                 score = src[i].weight * zval.score;
 2332                 if (isnan(score)) score = 0;
 2333 
 2334                 /* Search for this element in the accumulating dictionary. */
 2335                 de = dictAddRaw(accumulator,zuiSdsFromValue(&zval),&existing);
 2336                 /* If we don't have it, we need to create a new entry. */
 2337                 if (!existing) {
 2338                     tmp = zuiNewSdsFromValue(&zval);
 2339                     /* Remember the longest single element encountered,
 2340                      * to understand if it's possible to convert to ziplist
 2341                      * at the end. */
 2342                      if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
 2343                     /* Update the element with its initial score. */
 2344                     dictSetKey(accumulator, de, tmp);
 2345                     dictSetDoubleVal(de,score);
 2346                 } else {
 2347                     /* Update the score with the score of the new instance
 2348                      * of the element found in the current sorted set.
 2349                      *
 2350                      * Here we access directly the dictEntry double
 2351                      * value inside the union as it is a big speedup
 2352                      * compared to using the getDouble/setDouble API. */
 2353                     zunionInterAggregate(&existing->v.d,score,aggregate);
 2354                 }
 2355             }
 2356             zuiClearIterator(&src[i]);
 2357         }
 2358 
 2359         /* Step 2: convert the dictionary into the final sorted set. */
 2360         di = dictGetIterator(accumulator);
 2361 
 2362         /* We now are aware of the final size of the resulting sorted set,
 2363          * let's resize the dictionary embedded inside the sorted set to the
 2364          * right size, in order to save rehashing time. */
 2365         dictExpand(dstzset->dict,dictSize(accumulator));
 2366 
 2367         while((de = dictNext(di)) != NULL) {
 2368             sds ele = dictGetKey(de);
 2369             score = dictGetDoubleVal(de);
 2370             znode = zslInsert(dstzset->zsl,score,ele);
 2371             dictAdd(dstzset->dict,ele,&znode->score);
 2372         }
 2373         dictReleaseIterator(di);
 2374         dictRelease(accumulator);
 2375     } else {
 2376         serverPanic("Unknown operator");
 2377     }
 2378 
 2379     if (dbDelete(c->db,dstkey))
 2380         touched = 1;
 2381     if (dstzset->zsl->length) {
 2382         zsetConvertToZiplistIfNeeded(dstobj,maxelelen);
 2383         dbAdd(c->db,dstkey,dstobj);
 2384         addReplyLongLong(c,zsetLength(dstobj));
 2385         signalModifiedKey(c->db,dstkey);
 2386         notifyKeyspaceEvent(NOTIFY_ZSET,
 2387             (op == SET_OP_UNION) ? "zunionstore" : "zinterstore",
 2388             dstkey,c->db->id);
 2389         server.dirty++;
 2390     } else {
 2391         decrRefCount(dstobj);
 2392         addReply(c,shared.czero);
 2393         if (touched) {
 2394             signalModifiedKey(c->db,dstkey);
 2395             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
 2396             server.dirty++;
 2397         }
 2398     }
 2399     zfree(src);
 2400 }
 2401 
 2402 void zunionstoreCommand(client *c) {
 2403     zunionInterGenericCommand(c,c->argv[1], SET_OP_UNION);
 2404 }
 2405 
 2406 void zinterstoreCommand(client *c) {
 2407     zunionInterGenericCommand(c,c->argv[1], SET_OP_INTER);
 2408 }
 2409 
 2410 void zrangeGenericCommand(client *c, int reverse) {
 2411     robj *key = c->argv[1];
 2412     robj *zobj;
 2413     int withscores = 0;
 2414     long start;
 2415     long end;
 2416     long llen;
 2417     long rangelen;
 2418 
 2419     if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
 2420         (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
 2421 
 2422     if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
 2423         withscores = 1;
 2424     } else if (c->argc >= 5) {
 2425         addReply(c,shared.syntaxerr);
 2426         return;
 2427     }
 2428 
 2429     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
 2430          || checkType(c,zobj,OBJ_ZSET)) return;
 2431 
 2432     /* Sanitize indexes. */
 2433     llen = zsetLength(zobj);
 2434     if (start < 0) start = llen+start;
 2435     if (end < 0) end = llen+end;
 2436     if (start < 0) start = 0;
 2437 
 2438     /* Invariant: start >= 0, so this test will be true when end < 0.
 2439      * The range is empty when start > end or start >= length. */
 2440     if (start > end || start >= llen) {
 2441         addReply(c,shared.emptymultibulk);
 2442         return;
 2443     }
 2444     if (end >= llen) end = llen-1;
 2445     rangelen = (end-start)+1;
 2446 
 2447     /* Return the result in form of a multi-bulk reply */
 2448     addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
 2449 
 2450     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 2451         unsigned char *zl = zobj->ptr;
 2452         unsigned char *eptr, *sptr;
 2453         unsigned char *vstr;
 2454         unsigned int vlen;
 2455         long long vlong;
 2456 
 2457         if (reverse)
 2458             eptr = ziplistIndex(zl,-2-(2*start));
 2459         else
 2460             eptr = ziplistIndex(zl,2*start);
 2461 
 2462         serverAssertWithInfo(c,zobj,eptr != NULL);
 2463         sptr = ziplistNext(zl,eptr);
 2464 
 2465         while (rangelen--) {
 2466             serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
 2467             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
 2468             if (vstr == NULL)
 2469                 addReplyBulkLongLong(c,vlong);
 2470             else
 2471                 addReplyBulkCBuffer(c,vstr,vlen);
 2472 
 2473             if (withscores)
 2474                 addReplyDouble(c,zzlGetScore(sptr));
 2475 
 2476             if (reverse)
 2477                 zzlPrev(zl,&eptr,&sptr);
 2478             else
 2479                 zzlNext(zl,&eptr,&sptr);
 2480         }
 2481 
 2482     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 2483         zset *zs = zobj->ptr;
 2484         zskiplist *zsl = zs->zsl;
 2485         zskiplistNode *ln;
 2486         sds ele;
 2487 
 2488         /* Check if starting point is trivial, before doing log(N) lookup. */
 2489         if (reverse) {
 2490             ln = zsl->tail;
 2491             if (start > 0)
 2492                 ln = zslGetElementByRank(zsl,llen-start);
 2493         } else {
 2494             ln = zsl->header->level[0].forward;
 2495             if (start > 0)
 2496                 ln = zslGetElementByRank(zsl,start+1);
 2497         }
 2498 
 2499         while(rangelen--) {
 2500             serverAssertWithInfo(c,zobj,ln != NULL);
 2501             ele = ln->ele;
 2502             addReplyBulkCBuffer(c,ele,sdslen(ele));
 2503             if (withscores)
 2504                 addReplyDouble(c,ln->score);
 2505             ln = reverse ? ln->backward : ln->level[0].forward;
 2506         }
 2507     } else {
 2508         serverPanic("Unknown sorted set encoding");
 2509     }
 2510 }
 2511 
 2512 void zrangeCommand(client *c) {
 2513     zrangeGenericCommand(c,0);
 2514 }
 2515 
 2516 void zrevrangeCommand(client *c) {
 2517     zrangeGenericCommand(c,1);
 2518 }
 2519 
 2520 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
 2521 void genericZrangebyscoreCommand(client *c, int reverse) {
 2522     zrangespec range;
 2523     robj *key = c->argv[1];
 2524     robj *zobj;
 2525     long offset = 0, limit = -1;
 2526     int withscores = 0;
 2527     unsigned long rangelen = 0;
 2528     void *replylen = NULL;
 2529     int minidx, maxidx;
 2530 
 2531     /* Parse the range arguments. */
 2532     if (reverse) {
 2533         /* Range is given as [max,min] */
 2534         maxidx = 2; minidx = 3;
 2535     } else {
 2536         /* Range is given as [min,max] */
 2537         minidx = 2; maxidx = 3;
 2538     }
 2539 
 2540     if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
 2541         addReplyError(c,"min or max is not a float");
 2542         return;
 2543     }
 2544 
 2545     /* Parse optional extra arguments. Note that ZCOUNT will exactly have
 2546      * 4 arguments, so we'll never enter the following code path. */
 2547     if (c->argc > 4) {
 2548         int remaining = c->argc - 4;
 2549         int pos = 4;
 2550 
 2551         while (remaining) {
 2552             if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
 2553                 pos++; remaining--;
 2554                 withscores = 1;
 2555             } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
 2556                 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL)
 2557                         != C_OK) ||
 2558                     (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL)
 2559                         != C_OK))
 2560                 {
 2561                     return;
 2562                 }
 2563                 pos += 3; remaining -= 3;
 2564             } else {
 2565                 addReply(c,shared.syntaxerr);
 2566                 return;
 2567             }
 2568         }
 2569     }
 2570 
 2571     /* Ok, lookup the key and get the range */
 2572     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
 2573         checkType(c,zobj,OBJ_ZSET)) return;
 2574 
 2575     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 2576         unsigned char *zl = zobj->ptr;
 2577         unsigned char *eptr, *sptr;
 2578         unsigned char *vstr;
 2579         unsigned int vlen;
 2580         long long vlong;
 2581         double score;
 2582 
 2583         /* If reversed, get the last node in range as starting point. */
 2584         if (reverse) {
 2585             eptr = zzlLastInRange(zl,&range);
 2586         } else {
 2587             eptr = zzlFirstInRange(zl,&range);
 2588         }
 2589 
 2590         /* No "first" element in the specified interval. */
 2591         if (eptr == NULL) {
 2592             addReply(c, shared.emptymultibulk);
 2593             return;
 2594         }
 2595 
 2596         /* Get score pointer for the first element. */
 2597         serverAssertWithInfo(c,zobj,eptr != NULL);
 2598         sptr = ziplistNext(zl,eptr);
 2599 
 2600         /* We don't know in advance how many matching elements there are in the
 2601          * list, so we push this object that will represent the multi-bulk
 2602          * length in the output buffer, and will "fix" it later */
 2603         replylen = addDeferredMultiBulkLength(c);
 2604 
 2605         /* If there is an offset, just traverse the number of elements without
 2606          * checking the score because that is done in the next loop. */
 2607         while (eptr && offset--) {
 2608             if (reverse) {
 2609                 zzlPrev(zl,&eptr,&sptr);
 2610             } else {
 2611                 zzlNext(zl,&eptr,&sptr);
 2612             }
 2613         }
 2614 
 2615         while (eptr && limit--) {
 2616             score = zzlGetScore(sptr);
 2617 
 2618             /* Abort when the node is no longer in range. */
 2619             if (reverse) {
 2620                 if (!zslValueGteMin(score,&range)) break;
 2621             } else {
 2622                 if (!zslValueLteMax(score,&range)) break;
 2623             }
 2624 
 2625             /* We know the element exists, so ziplistGet should always succeed */
 2626             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
 2627 
 2628             rangelen++;
 2629             if (vstr == NULL) {
 2630                 addReplyBulkLongLong(c,vlong);
 2631             } else {
 2632                 addReplyBulkCBuffer(c,vstr,vlen);
 2633             }
 2634 
 2635             if (withscores) {
 2636                 addReplyDouble(c,score);
 2637             }
 2638 
 2639             /* Move to next node */
 2640             if (reverse) {
 2641                 zzlPrev(zl,&eptr,&sptr);
 2642             } else {
 2643                 zzlNext(zl,&eptr,&sptr);
 2644             }
 2645         }
 2646     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 2647         zset *zs = zobj->ptr;
 2648         zskiplist *zsl = zs->zsl;
 2649         zskiplistNode *ln;
 2650 
 2651         /* If reversed, get the last node in range as starting point. */
 2652         if (reverse) {
 2653             ln = zslLastInRange(zsl,&range);
 2654         } else {
 2655             ln = zslFirstInRange(zsl,&range);
 2656         }
 2657 
 2658         /* No "first" element in the specified interval. */
 2659         if (ln == NULL) {
 2660             addReply(c, shared.emptymultibulk);
 2661             return;
 2662         }
 2663 
 2664         /* We don't know in advance how many matching elements there are in the
 2665          * list, so we push this object that will represent the multi-bulk
 2666          * length in the output buffer, and will "fix" it later */
 2667         replylen = addDeferredMultiBulkLength(c);
 2668 
 2669         /* If there is an offset, just traverse the number of elements without
 2670          * checking the score because that is done in the next loop. */
 2671         while (ln && offset--) {
 2672             if (reverse) {
 2673                 ln = ln->backward;
 2674             } else {
 2675                 ln = ln->level[0].forward;
 2676             }
 2677         }
 2678 
 2679         while (ln && limit--) {
 2680             /* Abort when the node is no longer in range. */
 2681             if (reverse) {
 2682                 if (!zslValueGteMin(ln->score,&range)) break;
 2683             } else {
 2684                 if (!zslValueLteMax(ln->score,&range)) break;
 2685             }
 2686 
 2687             rangelen++;
 2688             addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
 2689 
 2690             if (withscores) {
 2691                 addReplyDouble(c,ln->score);
 2692             }
 2693 
 2694             /* Move to next node */
 2695             if (reverse) {
 2696                 ln = ln->backward;
 2697             } else {
 2698                 ln = ln->level[0].forward;
 2699             }
 2700         }
 2701     } else {
 2702         serverPanic("Unknown sorted set encoding");
 2703     }
 2704 
 2705     if (withscores) {
 2706         rangelen *= 2;
 2707     }
 2708 
 2709     setDeferredMultiBulkLength(c, replylen, rangelen);
 2710 }
 2711 
 2712 void zrangebyscoreCommand(client *c) {
 2713     genericZrangebyscoreCommand(c,0);
 2714 }
 2715 
 2716 void zrevrangebyscoreCommand(client *c) {
 2717     genericZrangebyscoreCommand(c,1);
 2718 }
 2719 
 2720 void zcountCommand(client *c) {
 2721     robj *key = c->argv[1];
 2722     robj *zobj;
 2723     zrangespec range;
 2724     unsigned long count = 0;
 2725 
 2726     /* Parse the range arguments */
 2727     if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
 2728         addReplyError(c,"min or max is not a float");
 2729         return;
 2730     }
 2731 
 2732     /* Lookup the sorted set */
 2733     if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
 2734         checkType(c, zobj, OBJ_ZSET)) return;
 2735 
 2736     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 2737         unsigned char *zl = zobj->ptr;
 2738         unsigned char *eptr, *sptr;
 2739         double score;
 2740 
 2741         /* Use the first element in range as the starting point */
 2742         eptr = zzlFirstInRange(zl,&range);
 2743 
 2744         /* No "first" element */
 2745         if (eptr == NULL) {
 2746             addReply(c, shared.czero);
 2747             return;
 2748         }
 2749 
 2750         /* First element is in range */
 2751         sptr = ziplistNext(zl,eptr);
 2752         score = zzlGetScore(sptr);
 2753         serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
 2754 
 2755         /* Iterate over elements in range */
 2756         while (eptr) {
 2757             score = zzlGetScore(sptr);
 2758 
 2759             /* Abort when the node is no longer in range. */
 2760             if (!zslValueLteMax(score,&range)) {
 2761                 break;
 2762             } else {
 2763                 count++;
 2764                 zzlNext(zl,&eptr,&sptr);
 2765             }
 2766         }
 2767     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 2768         zset *zs = zobj->ptr;
 2769         zskiplist *zsl = zs->zsl;
 2770         zskiplistNode *zn;
 2771         unsigned long rank;
 2772 
 2773         /* Find first element in range */
 2774         zn = zslFirstInRange(zsl, &range);
 2775 
 2776         /* Use rank of first element, if any, to determine preliminary count */
 2777         if (zn != NULL) {
 2778             rank = zslGetRank(zsl, zn->score, zn->ele);
 2779             count = (zsl->length - (rank - 1));
 2780 
 2781             /* Find last element in range */
 2782             zn = zslLastInRange(zsl, &range);
 2783 
 2784             /* Use rank of last element, if any, to determine the actual count */
 2785             if (zn != NULL) {
 2786                 rank = zslGetRank(zsl, zn->score, zn->ele);
 2787                 count -= (zsl->length - rank);
 2788             }
 2789         }
 2790     } else {
 2791         serverPanic("Unknown sorted set encoding");
 2792     }
 2793 
 2794     addReplyLongLong(c, count);
 2795 }
 2796 
 2797 void zlexcountCommand(client *c) {
 2798     robj *key = c->argv[1];
 2799     robj *zobj;
 2800     zlexrangespec range;
 2801     unsigned long count = 0;
 2802 
 2803     /* Parse the range arguments */
 2804     if (zslParseLexRange(c->argv[2],c->argv[3],&range) != C_OK) {
 2805         addReplyError(c,"min or max not valid string range item");
 2806         return;
 2807     }
 2808 
 2809     /* Lookup the sorted set */
 2810     if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
 2811         checkType(c, zobj, OBJ_ZSET))
 2812     {
 2813         zslFreeLexRange(&range);
 2814         return;
 2815     }
 2816 
 2817     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 2818         unsigned char *zl = zobj->ptr;
 2819         unsigned char *eptr, *sptr;
 2820 
 2821         /* Use the first element in range as the starting point */
 2822         eptr = zzlFirstInLexRange(zl,&range);
 2823 
 2824         /* No "first" element */
 2825         if (eptr == NULL) {
 2826             zslFreeLexRange(&range);
 2827             addReply(c, shared.czero);
 2828             return;
 2829         }
 2830 
 2831         /* First element is in range */
 2832         sptr = ziplistNext(zl,eptr);
 2833         serverAssertWithInfo(c,zobj,zzlLexValueLteMax(eptr,&range));
 2834 
 2835         /* Iterate over elements in range */
 2836         while (eptr) {
 2837             /* Abort when the node is no longer in range. */
 2838             if (!zzlLexValueLteMax(eptr,&range)) {
 2839                 break;
 2840             } else {
 2841                 count++;
 2842                 zzlNext(zl,&eptr,&sptr);
 2843             }
 2844         }
 2845     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 2846         zset *zs = zobj->ptr;
 2847         zskiplist *zsl = zs->zsl;
 2848         zskiplistNode *zn;
 2849         unsigned long rank;
 2850 
 2851         /* Find first element in range */
 2852         zn = zslFirstInLexRange(zsl, &range);
 2853 
 2854         /* Use rank of first element, if any, to determine preliminary count */
 2855         if (zn != NULL) {
 2856             rank = zslGetRank(zsl, zn->score, zn->ele);
 2857             count = (zsl->length - (rank - 1));
 2858 
 2859             /* Find last element in range */
 2860             zn = zslLastInLexRange(zsl, &range);
 2861 
 2862             /* Use rank of last element, if any, to determine the actual count */
 2863             if (zn != NULL) {
 2864                 rank = zslGetRank(zsl, zn->score, zn->ele);
 2865                 count -= (zsl->length - rank);
 2866             }
 2867         }
 2868     } else {
 2869         serverPanic("Unknown sorted set encoding");
 2870     }
 2871 
 2872     zslFreeLexRange(&range);
 2873     addReplyLongLong(c, count);
 2874 }
 2875 
 2876 /* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
 2877 void genericZrangebylexCommand(client *c, int reverse) {
 2878     zlexrangespec range;
 2879     robj *key = c->argv[1];
 2880     robj *zobj;
 2881     long offset = 0, limit = -1;
 2882     unsigned long rangelen = 0;
 2883     void *replylen = NULL;
 2884     int minidx, maxidx;
 2885 
 2886     /* Parse the range arguments. */
 2887     if (reverse) {
 2888         /* Range is given as [max,min] */
 2889         maxidx = 2; minidx = 3;
 2890     } else {
 2891         /* Range is given as [min,max] */
 2892         minidx = 2; maxidx = 3;
 2893     }
 2894 
 2895     if (zslParseLexRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
 2896         addReplyError(c,"min or max not valid string range item");
 2897         return;
 2898     }
 2899 
 2900     /* Parse optional extra arguments. Note that ZCOUNT will exactly have
 2901      * 4 arguments, so we'll never enter the following code path. */
 2902     if (c->argc > 4) {
 2903         int remaining = c->argc - 4;
 2904         int pos = 4;
 2905 
 2906         while (remaining) {
 2907             if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
 2908                 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
 2909                     (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) {
 2910                     zslFreeLexRange(&range);
 2911                     return;
 2912                 }
 2913                 pos += 3; remaining -= 3;
 2914             } else {
 2915                 zslFreeLexRange(&range);
 2916                 addReply(c,shared.syntaxerr);
 2917                 return;
 2918             }
 2919         }
 2920     }
 2921 
 2922     /* Ok, lookup the key and get the range */
 2923     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
 2924         checkType(c,zobj,OBJ_ZSET))
 2925     {
 2926         zslFreeLexRange(&range);
 2927         return;
 2928     }
 2929 
 2930     if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 2931         unsigned char *zl = zobj->ptr;
 2932         unsigned char *eptr, *sptr;
 2933         unsigned char *vstr;
 2934         unsigned int vlen;
 2935         long long vlong;
 2936 
 2937         /* If reversed, get the last node in range as starting point. */
 2938         if (reverse) {
 2939             eptr = zzlLastInLexRange(zl,&range);
 2940         } else {
 2941             eptr = zzlFirstInLexRange(zl,&range);
 2942         }
 2943 
 2944         /* No "first" element in the specified interval. */
 2945         if (eptr == NULL) {
 2946             addReply(c, shared.emptymultibulk);
 2947             zslFreeLexRange(&range);
 2948             return;
 2949         }
 2950 
 2951         /* Get score pointer for the first element. */
 2952         serverAssertWithInfo(c,zobj,eptr != NULL);
 2953         sptr = ziplistNext(zl,eptr);
 2954 
 2955         /* We don't know in advance how many matching elements there are in the
 2956          * list, so we push this object that will represent the multi-bulk
 2957          * length in the output buffer, and will "fix" it later */
 2958         replylen = addDeferredMultiBulkLength(c);
 2959 
 2960         /* If there is an offset, just traverse the number of elements without
 2961          * checking the score because that is done in the next loop. */
 2962         while (eptr && offset--) {
 2963             if (reverse) {
 2964                 zzlPrev(zl,&eptr,&sptr);
 2965             } else {
 2966                 zzlNext(zl,&eptr,&sptr);
 2967             }
 2968         }
 2969 
 2970         while (eptr && limit--) {
 2971             /* Abort when the node is no longer in range. */
 2972             if (reverse) {
 2973                 if (!zzlLexValueGteMin(eptr,&range)) break;
 2974             } else {
 2975                 if (!zzlLexValueLteMax(eptr,&range)) break;
 2976             }
 2977 
 2978             /* We know the element exists, so ziplistGet should always
 2979              * succeed. */
 2980             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
 2981 
 2982             rangelen++;
 2983             if (vstr == NULL) {
 2984                 addReplyBulkLongLong(c,vlong);
 2985             } else {
 2986                 addReplyBulkCBuffer(c,vstr,vlen);
 2987             }
 2988 
 2989             /* Move to next node */
 2990             if (reverse) {
 2991                 zzlPrev(zl,&eptr,&sptr);
 2992             } else {
 2993                 zzlNext(zl,&eptr,&sptr);
 2994             }
 2995         }
 2996     } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 2997         zset *zs = zobj->ptr;
 2998         zskiplist *zsl = zs->zsl;
 2999         zskiplistNode *ln;
 3000 
 3001         /* If reversed, get the last node in range as starting point. */
 3002         if (reverse) {
 3003             ln = zslLastInLexRange(zsl,&range);
 3004         } else {
 3005             ln = zslFirstInLexRange(zsl,&range);
 3006         }
 3007 
 3008         /* No "first" element in the specified interval. */
 3009         if (ln == NULL) {
 3010             addReply(c, shared.emptymultibulk);
 3011             zslFreeLexRange(&range);
 3012             return;
 3013         }
 3014 
 3015         /* We don't know in advance how many matching elements there are in the
 3016          * list, so we push this object that will represent the multi-bulk
 3017          * length in the output buffer, and will "fix" it later */
 3018         replylen = addDeferredMultiBulkLength(c);
 3019 
 3020         /* If there is an offset, just traverse the number of elements without
 3021          * checking the score because that is done in the next loop. */
 3022         while (ln && offset--) {
 3023             if (reverse) {
 3024                 ln = ln->backward;
 3025             } else {
 3026                 ln = ln->level[0].forward;
 3027             }
 3028         }
 3029 
 3030         while (ln && limit--) {
 3031             /* Abort when the node is no longer in range. */
 3032             if (reverse) {
 3033                 if (!zslLexValueGteMin(ln->ele,&range)) break;
 3034             } else {
 3035                 if (!zslLexValueLteMax(ln->ele,&range)) break;
 3036             }
 3037 
 3038             rangelen++;
 3039             addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
 3040 
 3041             /* Move to next node */
 3042             if (reverse) {
 3043                 ln = ln->backward;
 3044             } else {
 3045                 ln = ln->level[0].forward;
 3046             }
 3047         }
 3048     } else {
 3049         serverPanic("Unknown sorted set encoding");
 3050     }
 3051 
 3052     zslFreeLexRange(&range);
 3053     setDeferredMultiBulkLength(c, replylen, rangelen);
 3054 }
 3055 
 3056 void zrangebylexCommand(client *c) {
 3057     genericZrangebylexCommand(c,0);
 3058 }
 3059 
 3060 void zrevrangebylexCommand(client *c) {
 3061     genericZrangebylexCommand(c,1);
 3062 }
 3063 
 3064 void zcardCommand(client *c) {
 3065     robj *key = c->argv[1];
 3066     robj *zobj;
 3067 
 3068     if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
 3069         checkType(c,zobj,OBJ_ZSET)) return;
 3070 
 3071     addReplyLongLong(c,zsetLength(zobj));
 3072 }
 3073 
 3074 void zscoreCommand(client *c) {
 3075     robj *key = c->argv[1];
 3076     robj *zobj;
 3077     double score;
 3078 
 3079     if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
 3080         checkType(c,zobj,OBJ_ZSET)) return;
 3081 
 3082     if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR) {
 3083         addReply(c,shared.nullbulk);
 3084     } else {
 3085         addReplyDouble(c,score);
 3086     }
 3087 }
 3088 
 3089 void zrankGenericCommand(client *c, int reverse) {
 3090     robj *key = c->argv[1];
 3091     robj *ele = c->argv[2];
 3092     robj *zobj;
 3093     long rank;
 3094 
 3095     if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
 3096         checkType(c,zobj,OBJ_ZSET)) return;
 3097 
 3098     serverAssertWithInfo(c,ele,sdsEncodedObject(ele));
 3099     rank = zsetRank(zobj,ele->ptr,reverse);
 3100     if (rank >= 0) {
 3101         addReplyLongLong(c,rank);
 3102     } else {
 3103         addReply(c,shared.nullbulk);
 3104     }
 3105 }
 3106 
 3107 void zrankCommand(client *c) {
 3108     zrankGenericCommand(c, 0);
 3109 }
 3110 
 3111 void zrevrankCommand(client *c) {
 3112     zrankGenericCommand(c, 1);
 3113 }
 3114 
 3115 void zscanCommand(client *c) {
 3116     robj *o;
 3117     unsigned long cursor;
 3118 
 3119     if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
 3120     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
 3121         checkType(c,o,OBJ_ZSET)) return;
 3122     scanGenericCommand(c,o,cursor);
 3123 }
 3124 
 3125 /* This command implements the generic zpop operation, used by:
 3126  * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used
 3127  * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX.
 3128  *
 3129  * If 'emitkey' is true also the key name is emitted, useful for the blocking
 3130  * behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
 3131  *
 3132  * The synchronous version instead does not need to emit the key, but may
 3133  * use the 'count' argument to return multiple items if available. */
 3134 void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) {
 3135     int idx;
 3136     robj *key = NULL;
 3137     robj *zobj = NULL;
 3138     sds ele;
 3139     double score;
 3140     long count = 1;
 3141 
 3142     /* If a count argument as passed, parse it or return an error. */
 3143     if (countarg) {
 3144         if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK)
 3145             return;
 3146         if (count <= 0) {
 3147             addReply(c,shared.emptymultibulk);
 3148             return;
 3149         }
 3150     }
 3151 
 3152     /* Check type and break on the first error, otherwise identify candidate. */
 3153     idx = 0;
 3154     while (idx < keyc) {
 3155         key = keyv[idx++];
 3156         zobj = lookupKeyWrite(c->db,key);
 3157         if (!zobj) continue;
 3158         if (checkType(c,zobj,OBJ_ZSET)) return;
 3159         break;
 3160     }
 3161 
 3162     /* No candidate for zpopping, return empty. */
 3163     if (!zobj) {
 3164         addReply(c,shared.emptymultibulk);
 3165         return;
 3166     }
 3167 
 3168     void *arraylen_ptr = addDeferredMultiBulkLength(c);
 3169     long arraylen = 0;
 3170 
 3171     /* We emit the key only for the blocking variant. */
 3172     if (emitkey) addReplyBulk(c,key);
 3173 
 3174     /* Remove the element. */
 3175     do {
 3176         if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
 3177             unsigned char *zl = zobj->ptr;
 3178             unsigned char *eptr, *sptr;
 3179             unsigned char *vstr;
 3180             unsigned int vlen;
 3181             long long vlong;
 3182 
 3183             /* Get the first or last element in the sorted set. */
 3184             eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0);
 3185             serverAssertWithInfo(c,zobj,eptr != NULL);
 3186             serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
 3187             if (vstr == NULL)
 3188                 ele = sdsfromlonglong(vlong);
 3189             else
 3190                 ele = sdsnewlen(vstr,vlen);
 3191 
 3192             /* Get the score. */
 3193             sptr = ziplistNext(zl,eptr);
 3194             serverAssertWithInfo(c,zobj,sptr != NULL);
 3195             score = zzlGetScore(sptr);
 3196         } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
 3197             zset *zs = zobj->ptr;
 3198             zskiplist *zsl = zs->zsl;
 3199             zskiplistNode *zln;
 3200 
 3201             /* Get the first or last element in the sorted set. */
 3202             zln = (where == ZSET_MAX ? zsl->tail :
 3203                                        zsl->header->level[0].forward);
 3204 
 3205             /* There must be an element in the sorted set. */
 3206             serverAssertWithInfo(c,zobj,zln != NULL);
 3207             ele = sdsdup(zln->ele);
 3208             score = zln->score;
 3209         } else {
 3210             serverPanic("Unknown sorted set encoding");
 3211         }
 3212 
 3213         serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
 3214         server.dirty++;
 3215 
 3216         if (arraylen == 0) { /* Do this only for the first iteration. */
 3217             char *events[2] = {"zpopmin","zpopmax"};
 3218             notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
 3219             signalModifiedKey(c->db,key);
 3220         }
 3221 
 3222         addReplyBulkCBuffer(c,ele,sdslen(ele));
 3223         addReplyDouble(c,score);
 3224         sdsfree(ele);
 3225         arraylen += 2;
 3226 
 3227         /* Remove the key, if indeed needed. */
 3228         if (zsetLength(zobj) == 0) {
 3229             dbDelete(c->db,key);
 3230             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
 3231             break;
 3232         }
 3233     } while(--count);
 3234 
 3235     setDeferredMultiBulkLength(c,arraylen_ptr,arraylen + (emitkey != 0));
 3236 }
 3237 
 3238 /* ZPOPMIN key [<count>] */
 3239 void zpopminCommand(client *c) {
 3240     if (c->argc > 3) {
 3241         addReply(c,shared.syntaxerr);
 3242         return;
 3243     }
 3244     genericZpopCommand(c,&c->argv[1],1,ZSET_MIN,0,
 3245         c->argc == 3 ? c->argv[2] : NULL);
 3246 }
 3247 
 3248 /* ZMAXPOP key [<count>] */
 3249 void zpopmaxCommand(client *c) {
 3250     if (c->argc > 3) {
 3251         addReply(c,shared.syntaxerr);
 3252         return;
 3253     }
 3254     genericZpopCommand(c,&c->argv[1],1,ZSET_MAX,0,
 3255         c->argc == 3 ? c->argv[2] : NULL);
 3256 }
 3257 
 3258 /* BZPOPMIN / BZPOPMAX actual implementation. */
 3259 void blockingGenericZpopCommand(client *c, int where) {
 3260     robj *o;
 3261     mstime_t timeout;
 3262     int j;
 3263 
 3264     if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
 3265         != C_OK) return;
 3266 
 3267     for (j = 1; j < c->argc-1; j++) {
 3268         o = lookupKeyWrite(c->db,c->argv[j]);
 3269         if (o != NULL) {
 3270             if (o->type != OBJ_ZSET) {
 3271                 addReply(c,shared.wrongtypeerr);
 3272                 return;
 3273             } else {
 3274                 if (zsetLength(o) != 0) {
 3275                     /* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */
 3276                     genericZpopCommand(c,&c->argv[j],1,where,1,NULL);
 3277                     /* Replicate it as an ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */
 3278                     rewriteClientCommandVector(c,2,
 3279                         where == ZSET_MAX ? shared.zpopmax : shared.zpopmin,
 3280                         c->argv[j]);
 3281                     return;
 3282                 }
 3283             }
 3284         }
 3285     }
 3286 
 3287     /* If we are inside a MULTI/EXEC and the zset is empty the only thing
 3288      * we can do is treating it as a timeout (even with timeout 0). */
 3289     if (c->flags & CLIENT_MULTI) {
 3290         addReply(c,shared.nullmultibulk);
 3291         return;
 3292     }
 3293 
 3294     /* If the keys do not exist we must block */
 3295     blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
 3296 }
 3297 
 3298 // BZPOPMIN key [key ...] timeout
 3299 void bzpopminCommand(client *c) {
 3300     blockingGenericZpopCommand(c,ZSET_MIN);
 3301 }
 3302 
 3303 // BZPOPMAX key [key ...] timeout
 3304 void bzpopmaxCommand(client *c) {
 3305     blockingGenericZpopCommand(c,ZSET_MAX);
 3306 }