"Fossies" - the Fresh Open Source Software Archive

Member "redis-6.2.5/src/t_set.c" (21 Jul 2021, 41630 Bytes) of package /linux/misc/redis-6.2.5.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "t_set.c", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 6.2.4_vs_6.2.5.

    1 /*
    2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
    3  * All rights reserved.
    4  *
    5  * Redistribution and use in source and binary forms, with or without
    6  * modification, are permitted provided that the following conditions are met:
    7  *
    8  *   * Redistributions of source code must retain the above copyright notice,
    9  *     this list of conditions and the following disclaimer.
   10  *   * Redistributions in binary form must reproduce the above copyright
   11  *     notice, this list of conditions and the following disclaimer in the
   12  *     documentation and/or other materials provided with the distribution.
   13  *   * Neither the name of Redis nor the names of its contributors may be used
   14  *     to endorse or promote products derived from this software without
   15  *     specific prior written permission.
   16  *
   17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
   18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
   21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   27  * POSSIBILITY OF SUCH DAMAGE.
   28  */
   29 
   30 #include "server.h"
   31 
   32 /*-----------------------------------------------------------------------------
   33  * Set Commands
   34  *----------------------------------------------------------------------------*/
   35 
   36 void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
   37                               robj *dstkey, int op);
   38 
   39 /* Factory method to return a set that *can* hold "value". When the object has
   40  * an integer-encodable value, an intset will be returned. Otherwise a regular
   41  * hash table. */
   42 robj *setTypeCreate(sds value) {
   43     if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
   44         return createIntsetObject();
   45     return createSetObject();
   46 }
   47 
   48 /* Add the specified value into a set.
   49  *
   50  * If the value was already member of the set, nothing is done and 0 is
   51  * returned, otherwise the new element is added and 1 is returned. */
   52 int setTypeAdd(robj *subject, sds value) {
   53     long long llval;
   54     if (subject->encoding == OBJ_ENCODING_HT) {
   55         dict *ht = subject->ptr;
   56         dictEntry *de = dictAddRaw(ht,value,NULL);
   57         if (de) {
   58             dictSetKey(ht,de,sdsdup(value));
   59             dictSetVal(ht,de,NULL);
   60             return 1;
   61         }
   62     } else if (subject->encoding == OBJ_ENCODING_INTSET) {
   63         if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
   64             uint8_t success = 0;
   65             subject->ptr = intsetAdd(subject->ptr,llval,&success);
   66             if (success) {
   67                 /* Convert to regular set when the intset contains
   68                  * too many entries. */
   69                 if (intsetLen(subject->ptr) > server.set_max_intset_entries)
   70                     setTypeConvert(subject,OBJ_ENCODING_HT);
   71                 return 1;
   72             }
   73         } else {
   74             /* Failed to get integer from object, convert to regular set. */
   75             setTypeConvert(subject,OBJ_ENCODING_HT);
   76 
   77             /* The set *was* an intset and this value is not integer
   78              * encodable, so dictAdd should always work. */
   79             serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
   80             return 1;
   81         }
   82     } else {
   83         serverPanic("Unknown set encoding");
   84     }
   85     return 0;
   86 }
   87 
   88 int setTypeRemove(robj *setobj, sds value) {
   89     long long llval;
   90     if (setobj->encoding == OBJ_ENCODING_HT) {
   91         if (dictDelete(setobj->ptr,value) == DICT_OK) {
   92             if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr);
   93             return 1;
   94         }
   95     } else if (setobj->encoding == OBJ_ENCODING_INTSET) {
   96         if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
   97             int success;
   98             setobj->ptr = intsetRemove(setobj->ptr,llval,&success);
   99             if (success) return 1;
  100         }
  101     } else {
  102         serverPanic("Unknown set encoding");
  103     }
  104     return 0;
  105 }
  106 
  107 int setTypeIsMember(robj *subject, sds value) {
  108     long long llval;
  109     if (subject->encoding == OBJ_ENCODING_HT) {
  110         return dictFind((dict*)subject->ptr,value) != NULL;
  111     } else if (subject->encoding == OBJ_ENCODING_INTSET) {
  112         if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
  113             return intsetFind((intset*)subject->ptr,llval);
  114         }
  115     } else {
  116         serverPanic("Unknown set encoding");
  117     }
  118     return 0;
  119 }
  120 
  121 setTypeIterator *setTypeInitIterator(robj *subject) {
  122     setTypeIterator *si = zmalloc(sizeof(setTypeIterator));
  123     si->subject = subject;
  124     si->encoding = subject->encoding;
  125     if (si->encoding == OBJ_ENCODING_HT) {
  126         si->di = dictGetIterator(subject->ptr);
  127     } else if (si->encoding == OBJ_ENCODING_INTSET) {
  128         si->ii = 0;
  129     } else {
  130         serverPanic("Unknown set encoding");
  131     }
  132     return si;
  133 }
  134 
  135 void setTypeReleaseIterator(setTypeIterator *si) {
  136     if (si->encoding == OBJ_ENCODING_HT)
  137         dictReleaseIterator(si->di);
  138     zfree(si);
  139 }
  140 
  141 /* Move to the next entry in the set. Returns the object at the current
  142  * position.
  143  *
  144  * Since set elements can be internally be stored as SDS strings or
  145  * simple arrays of integers, setTypeNext returns the encoding of the
  146  * set object you are iterating, and will populate the appropriate pointer
  147  * (sdsele) or (llele) accordingly.
  148  *
  149  * Note that both the sdsele and llele pointers should be passed and cannot
  150  * be NULL since the function will try to defensively populate the non
  151  * used field with values which are easy to trap if misused.
  152  *
  153  * When there are no longer elements -1 is returned. */
  154 int setTypeNext(setTypeIterator *si, sds *sdsele, int64_t *llele) {
  155     if (si->encoding == OBJ_ENCODING_HT) {
  156         dictEntry *de = dictNext(si->di);
  157         if (de == NULL) return -1;
  158         *sdsele = dictGetKey(de);
  159         *llele = -123456789; /* Not needed. Defensive. */
  160     } else if (si->encoding == OBJ_ENCODING_INTSET) {
  161         if (!intsetGet(si->subject->ptr,si->ii++,llele))
  162             return -1;
  163         *sdsele = NULL; /* Not needed. Defensive. */
  164     } else {
  165         serverPanic("Wrong set encoding in setTypeNext");
  166     }
  167     return si->encoding;
  168 }
  169 
  170 /* The not copy on write friendly version but easy to use version
  171  * of setTypeNext() is setTypeNextObject(), returning new SDS
  172  * strings. So if you don't retain a pointer to this object you should call
  173  * sdsfree() against it.
  174  *
  175  * This function is the way to go for write operations where COW is not
  176  * an issue. */
  177 sds setTypeNextObject(setTypeIterator *si) {
  178     int64_t intele;
  179     sds sdsele;
  180     int encoding;
  181 
  182     encoding = setTypeNext(si,&sdsele,&intele);
  183     switch(encoding) {
  184         case -1:    return NULL;
  185         case OBJ_ENCODING_INTSET:
  186             return sdsfromlonglong(intele);
  187         case OBJ_ENCODING_HT:
  188             return sdsdup(sdsele);
  189         default:
  190             serverPanic("Unsupported encoding");
  191     }
  192     return NULL; /* just to suppress warnings */
  193 }
  194 
  195 /* Return random element from a non empty set.
  196  * The returned element can be an int64_t value if the set is encoded
  197  * as an "intset" blob of integers, or an SDS string if the set
  198  * is a regular set.
  199  *
  200  * The caller provides both pointers to be populated with the right
  201  * object. The return value of the function is the object->encoding
  202  * field of the object and is used by the caller to check if the
  203  * int64_t pointer or the redis object pointer was populated.
  204  *
  205  * Note that both the sdsele and llele pointers should be passed and cannot
  206  * be NULL since the function will try to defensively populate the non
  207  * used field with values which are easy to trap if misused. */
  208 int setTypeRandomElement(robj *setobj, sds *sdsele, int64_t *llele) {
  209     if (setobj->encoding == OBJ_ENCODING_HT) {
  210         dictEntry *de = dictGetFairRandomKey(setobj->ptr);
  211         *sdsele = dictGetKey(de);
  212         *llele = -123456789; /* Not needed. Defensive. */
  213     } else if (setobj->encoding == OBJ_ENCODING_INTSET) {
  214         *llele = intsetRandom(setobj->ptr);
  215         *sdsele = NULL; /* Not needed. Defensive. */
  216     } else {
  217         serverPanic("Unknown set encoding");
  218     }
  219     return setobj->encoding;
  220 }
  221 
  222 unsigned long setTypeSize(const robj *subject) {
  223     if (subject->encoding == OBJ_ENCODING_HT) {
  224         return dictSize((const dict*)subject->ptr);
  225     } else if (subject->encoding == OBJ_ENCODING_INTSET) {
  226         return intsetLen((const intset*)subject->ptr);
  227     } else {
  228         serverPanic("Unknown set encoding");
  229     }
  230 }
  231 
  232 /* Convert the set to specified encoding. The resulting dict (when converting
  233  * to a hash table) is presized to hold the number of elements in the original
  234  * set. */
  235 void setTypeConvert(robj *setobj, int enc) {
  236     setTypeIterator *si;
  237     serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET &&
  238                              setobj->encoding == OBJ_ENCODING_INTSET);
  239 
  240     if (enc == OBJ_ENCODING_HT) {
  241         int64_t intele;
  242         dict *d = dictCreate(&setDictType,NULL);
  243         sds element;
  244 
  245         /* Presize the dict to avoid rehashing */
  246         dictExpand(d,intsetLen(setobj->ptr));
  247 
  248         /* To add the elements we extract integers and create redis objects */
  249         si = setTypeInitIterator(setobj);
  250         while (setTypeNext(si,&element,&intele) != -1) {
  251             element = sdsfromlonglong(intele);
  252             serverAssert(dictAdd(d,element,NULL) == DICT_OK);
  253         }
  254         setTypeReleaseIterator(si);
  255 
  256         setobj->encoding = OBJ_ENCODING_HT;
  257         zfree(setobj->ptr);
  258         setobj->ptr = d;
  259     } else {
  260         serverPanic("Unsupported set conversion");
  261     }
  262 }
  263 
  264 /* This is a helper function for the COPY command.
  265  * Duplicate a set object, with the guarantee that the returned object
  266  * has the same encoding as the original one.
  267  *
  268  * The resulting object always has refcount set to 1 */
  269 robj *setTypeDup(robj *o) {
  270     robj *set;
  271     setTypeIterator *si;
  272     sds elesds;
  273     int64_t intobj;
  274 
  275     serverAssert(o->type == OBJ_SET);
  276 
  277     /* Create a new set object that have the same encoding as the original object's encoding */
  278     if (o->encoding == OBJ_ENCODING_INTSET) {
  279         intset *is = o->ptr;
  280         size_t size = intsetBlobLen(is);
  281         intset *newis = zmalloc(size);
  282         memcpy(newis,is,size);
  283         set = createObject(OBJ_SET, newis);
  284         set->encoding = OBJ_ENCODING_INTSET;
  285     } else if (o->encoding == OBJ_ENCODING_HT) {
  286         set = createSetObject();
  287         dict *d = o->ptr;
  288         dictExpand(set->ptr, dictSize(d));
  289         si = setTypeInitIterator(o);
  290         while (setTypeNext(si, &elesds, &intobj) != -1) {
  291             setTypeAdd(set, elesds);
  292         }
  293         setTypeReleaseIterator(si);
  294     } else {
  295         serverPanic("Unknown set encoding");
  296     }
  297     return set;
  298 }
  299 
  300 void saddCommand(client *c) {
  301     robj *set;
  302     int j, added = 0;
  303 
  304     set = lookupKeyWrite(c->db,c->argv[1]);
  305     if (checkType(c,set,OBJ_SET)) return;
  306     
  307     if (set == NULL) {
  308         set = setTypeCreate(c->argv[2]->ptr);
  309         dbAdd(c->db,c->argv[1],set);
  310     }
  311 
  312     for (j = 2; j < c->argc; j++) {
  313         if (setTypeAdd(set,c->argv[j]->ptr)) added++;
  314     }
  315     if (added) {
  316         signalModifiedKey(c,c->db,c->argv[1]);
  317         notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
  318     }
  319     server.dirty += added;
  320     addReplyLongLong(c,added);
  321 }
  322 
  323 void sremCommand(client *c) {
  324     robj *set;
  325     int j, deleted = 0, keyremoved = 0;
  326 
  327     if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
  328         checkType(c,set,OBJ_SET)) return;
  329 
  330     for (j = 2; j < c->argc; j++) {
  331         if (setTypeRemove(set,c->argv[j]->ptr)) {
  332             deleted++;
  333             if (setTypeSize(set) == 0) {
  334                 dbDelete(c->db,c->argv[1]);
  335                 keyremoved = 1;
  336                 break;
  337             }
  338         }
  339     }
  340     if (deleted) {
  341         signalModifiedKey(c,c->db,c->argv[1]);
  342         notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);
  343         if (keyremoved)
  344             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],
  345                                 c->db->id);
  346         server.dirty += deleted;
  347     }
  348     addReplyLongLong(c,deleted);
  349 }
  350 
  351 void smoveCommand(client *c) {
  352     robj *srcset, *dstset, *ele;
  353     srcset = lookupKeyWrite(c->db,c->argv[1]);
  354     dstset = lookupKeyWrite(c->db,c->argv[2]);
  355     ele = c->argv[3];
  356 
  357     /* If the source key does not exist return 0 */
  358     if (srcset == NULL) {
  359         addReply(c,shared.czero);
  360         return;
  361     }
  362 
  363     /* If the source key has the wrong type, or the destination key
  364      * is set and has the wrong type, return with an error. */
  365     if (checkType(c,srcset,OBJ_SET) ||
  366         checkType(c,dstset,OBJ_SET)) return;
  367 
  368     /* If srcset and dstset are equal, SMOVE is a no-op */
  369     if (srcset == dstset) {
  370         addReply(c,setTypeIsMember(srcset,ele->ptr) ?
  371             shared.cone : shared.czero);
  372         return;
  373     }
  374 
  375     /* If the element cannot be removed from the src set, return 0. */
  376     if (!setTypeRemove(srcset,ele->ptr)) {
  377         addReply(c,shared.czero);
  378         return;
  379     }
  380     notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);
  381 
  382     /* Remove the src set from the database when empty */
  383     if (setTypeSize(srcset) == 0) {
  384         dbDelete(c->db,c->argv[1]);
  385         notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
  386     }
  387 
  388     /* Create the destination set when it doesn't exist */
  389     if (!dstset) {
  390         dstset = setTypeCreate(ele->ptr);
  391         dbAdd(c->db,c->argv[2],dstset);
  392     }
  393 
  394     signalModifiedKey(c,c->db,c->argv[1]);
  395     server.dirty++;
  396 
  397     /* An extra key has changed when ele was successfully added to dstset */
  398     if (setTypeAdd(dstset,ele->ptr)) {
  399         server.dirty++;
  400         signalModifiedKey(c,c->db,c->argv[2]);
  401         notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[2],c->db->id);
  402     }
  403     addReply(c,shared.cone);
  404 }
  405 
  406 void sismemberCommand(client *c) {
  407     robj *set;
  408 
  409     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
  410         checkType(c,set,OBJ_SET)) return;
  411 
  412     if (setTypeIsMember(set,c->argv[2]->ptr))
  413         addReply(c,shared.cone);
  414     else
  415         addReply(c,shared.czero);
  416 }
  417 
  418 void smismemberCommand(client *c) {
  419     robj *set;
  420     int j;
  421 
  422     /* Don't abort when the key cannot be found. Non-existing keys are empty
  423      * sets, where SMISMEMBER should respond with a series of zeros. */
  424     set = lookupKeyRead(c->db,c->argv[1]);
  425     if (set && checkType(c,set,OBJ_SET)) return;
  426 
  427     addReplyArrayLen(c,c->argc - 2);
  428 
  429     for (j = 2; j < c->argc; j++) {
  430         if (set && setTypeIsMember(set,c->argv[j]->ptr))
  431             addReply(c,shared.cone);
  432         else
  433             addReply(c,shared.czero);
  434     }
  435 }
  436 
  437 void scardCommand(client *c) {
  438     robj *o;
  439 
  440     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
  441         checkType(c,o,OBJ_SET)) return;
  442 
  443     addReplyLongLong(c,setTypeSize(o));
  444 }
  445 
  446 /* Handle the "SPOP key <count>" variant. The normal version of the
  447  * command is handled by the spopCommand() function itself. */
  448 
  449 /* How many times bigger should be the set compared to the remaining size
  450  * for us to use the "create new set" strategy? Read later in the
  451  * implementation for more info. */
  452 #define SPOP_MOVE_STRATEGY_MUL 5
  453 
  454 void spopWithCountCommand(client *c) {
  455     long l;
  456     unsigned long count, size;
  457     robj *set;
  458 
  459     /* Get the count argument */
  460     if (getPositiveLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return;
  461     count = (unsigned long) l;
  462 
  463     /* Make sure a key with the name inputted exists, and that it's type is
  464      * indeed a set. Otherwise, return nil */
  465     if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.emptyset[c->resp]))
  466         == NULL || checkType(c,set,OBJ_SET)) return;
  467 
  468     /* If count is zero, serve an empty set ASAP to avoid special
  469      * cases later. */
  470     if (count == 0) {
  471         addReply(c,shared.emptyset[c->resp]);
  472         return;
  473     }
  474 
  475     size = setTypeSize(set);
  476 
  477     /* Generate an SPOP keyspace notification */
  478     notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id);
  479     server.dirty += (count >= size) ? size : count;
  480 
  481     /* CASE 1:
  482      * The number of requested elements is greater than or equal to
  483      * the number of elements inside the set: simply return the whole set. */
  484     if (count >= size) {
  485         /* We just return the entire set */
  486         sunionDiffGenericCommand(c,c->argv+1,1,NULL,SET_OP_UNION);
  487 
  488         /* Delete the set as it is now empty */
  489         dbDelete(c->db,c->argv[1]);
  490         notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
  491 
  492         /* Propagate this command as a DEL operation */
  493         rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
  494         signalModifiedKey(c,c->db,c->argv[1]);
  495         return;
  496     }
  497 
  498     /* Case 2 and 3 require to replicate SPOP as a set of SREM commands.
  499      * Prepare our replication argument vector. Also send the array length
  500      * which is common to both the code paths. */
  501     robj *propargv[3];
  502     propargv[0] = shared.srem;
  503     propargv[1] = c->argv[1];
  504     addReplySetLen(c,count);
  505 
  506     /* Common iteration vars. */
  507     sds sdsele;
  508     robj *objele;
  509     int encoding;
  510     int64_t llele;
  511     unsigned long remaining = size-count; /* Elements left after SPOP. */
  512 
  513     /* If we are here, the number of requested elements is less than the
  514      * number of elements inside the set. Also we are sure that count < size.
  515      * Use two different strategies.
  516      *
  517      * CASE 2: The number of elements to return is small compared to the
  518      * set size. We can just extract random elements and return them to
  519      * the set. */
  520     if (remaining*SPOP_MOVE_STRATEGY_MUL > count) {
  521         while(count--) {
  522             /* Emit and remove. */
  523             encoding = setTypeRandomElement(set,&sdsele,&llele);
  524             if (encoding == OBJ_ENCODING_INTSET) {
  525                 addReplyBulkLongLong(c,llele);
  526                 objele = createStringObjectFromLongLong(llele);
  527                 set->ptr = intsetRemove(set->ptr,llele,NULL);
  528             } else {
  529                 addReplyBulkCBuffer(c,sdsele,sdslen(sdsele));
  530                 objele = createStringObject(sdsele,sdslen(sdsele));
  531                 setTypeRemove(set,sdsele);
  532             }
  533 
  534             /* Replicate/AOF this command as an SREM operation */
  535             propargv[2] = objele;
  536             alsoPropagate(server.sremCommand,c->db->id,propargv,3,
  537                 PROPAGATE_AOF|PROPAGATE_REPL);
  538             decrRefCount(objele);
  539         }
  540     } else {
  541     /* CASE 3: The number of elements to return is very big, approaching
  542      * the size of the set itself. After some time extracting random elements
  543      * from such a set becomes computationally expensive, so we use
  544      * a different strategy, we extract random elements that we don't
  545      * want to return (the elements that will remain part of the set),
  546      * creating a new set as we do this (that will be stored as the original
  547      * set). Then we return the elements left in the original set and
  548      * release it. */
  549         robj *newset = NULL;
  550 
  551         /* Create a new set with just the remaining elements. */
  552         while(remaining--) {
  553             encoding = setTypeRandomElement(set,&sdsele,&llele);
  554             if (encoding == OBJ_ENCODING_INTSET) {
  555                 sdsele = sdsfromlonglong(llele);
  556             } else {
  557                 sdsele = sdsdup(sdsele);
  558             }
  559             if (!newset) newset = setTypeCreate(sdsele);
  560             setTypeAdd(newset,sdsele);
  561             setTypeRemove(set,sdsele);
  562             sdsfree(sdsele);
  563         }
  564 
  565         /* Transfer the old set to the client. */
  566         setTypeIterator *si;
  567         si = setTypeInitIterator(set);
  568         while((encoding = setTypeNext(si,&sdsele,&llele)) != -1) {
  569             if (encoding == OBJ_ENCODING_INTSET) {
  570                 addReplyBulkLongLong(c,llele);
  571                 objele = createStringObjectFromLongLong(llele);
  572             } else {
  573                 addReplyBulkCBuffer(c,sdsele,sdslen(sdsele));
  574                 objele = createStringObject(sdsele,sdslen(sdsele));
  575             }
  576 
  577             /* Replicate/AOF this command as an SREM operation */
  578             propargv[2] = objele;
  579             alsoPropagate(server.sremCommand,c->db->id,propargv,3,
  580                 PROPAGATE_AOF|PROPAGATE_REPL);
  581             decrRefCount(objele);
  582         }
  583         setTypeReleaseIterator(si);
  584 
  585         /* Assign the new set as the key value. */
  586         dbOverwrite(c->db,c->argv[1],newset);
  587     }
  588 
  589     /* Don't propagate the command itself even if we incremented the
  590      * dirty counter. We don't want to propagate an SPOP command since
  591      * we propagated the command as a set of SREMs operations using
  592      * the alsoPropagate() API. */
  593     preventCommandPropagation(c);
  594     signalModifiedKey(c,c->db,c->argv[1]);
  595 }
  596 
  597 void spopCommand(client *c) {
  598     robj *set, *ele;
  599     sds sdsele;
  600     int64_t llele;
  601     int encoding;
  602 
  603     if (c->argc == 3) {
  604         spopWithCountCommand(c);
  605         return;
  606     } else if (c->argc > 3) {
  607         addReplyErrorObject(c,shared.syntaxerr);
  608         return;
  609     }
  610 
  611     /* Make sure a key with the name inputted exists, and that it's type is
  612      * indeed a set */
  613     if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
  614          == NULL || checkType(c,set,OBJ_SET)) return;
  615 
  616     /* Get a random element from the set */
  617     encoding = setTypeRandomElement(set,&sdsele,&llele);
  618 
  619     /* Remove the element from the set */
  620     if (encoding == OBJ_ENCODING_INTSET) {
  621         ele = createStringObjectFromLongLong(llele);
  622         set->ptr = intsetRemove(set->ptr,llele,NULL);
  623     } else {
  624         ele = createStringObject(sdsele,sdslen(sdsele));
  625         setTypeRemove(set,ele->ptr);
  626     }
  627 
  628     notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id);
  629 
  630     /* Replicate/AOF this command as an SREM operation */
  631     rewriteClientCommandVector(c,3,shared.srem,c->argv[1],ele);
  632 
  633     /* Add the element to the reply */
  634     addReplyBulk(c,ele);
  635     decrRefCount(ele);
  636 
  637     /* Delete the set if it's empty */
  638     if (setTypeSize(set) == 0) {
  639         dbDelete(c->db,c->argv[1]);
  640         notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
  641     }
  642 
  643     /* Set has been modified */
  644     signalModifiedKey(c,c->db,c->argv[1]);
  645     server.dirty++;
  646 }
  647 
  648 /* handle the "SRANDMEMBER key <count>" variant. The normal version of the
  649  * command is handled by the srandmemberCommand() function itself. */
  650 
  651 /* How many times bigger should be the set compared to the requested size
  652  * for us to don't use the "remove elements" strategy? Read later in the
  653  * implementation for more info. */
  654 #define SRANDMEMBER_SUB_STRATEGY_MUL 3
  655 
  656 void srandmemberWithCountCommand(client *c) {
  657     long l;
  658     unsigned long count, size;
  659     int uniq = 1;
  660     robj *set;
  661     sds ele;
  662     int64_t llele;
  663     int encoding;
  664 
  665     dict *d;
  666 
  667     if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return;
  668     if (l >= 0) {
  669         count = (unsigned long) l;
  670     } else {
  671         /* A negative count means: return the same elements multiple times
  672          * (i.e. don't remove the extracted element after every extraction). */
  673         count = -l;
  674         uniq = 0;
  675     }
  676 
  677     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray))
  678         == NULL || checkType(c,set,OBJ_SET)) return;
  679     size = setTypeSize(set);
  680 
  681     /* If count is zero, serve it ASAP to avoid special cases later. */
  682     if (count == 0) {
  683         addReply(c,shared.emptyarray);
  684         return;
  685     }
  686 
  687     /* CASE 1: The count was negative, so the extraction method is just:
  688      * "return N random elements" sampling the whole set every time.
  689      * This case is trivial and can be served without auxiliary data
  690      * structures. This case is the only one that also needs to return the
  691      * elements in random order. */
  692     if (!uniq || count == 1) {
  693         addReplyArrayLen(c,count);
  694         while(count--) {
  695             encoding = setTypeRandomElement(set,&ele,&llele);
  696             if (encoding == OBJ_ENCODING_INTSET) {
  697                 addReplyBulkLongLong(c,llele);
  698             } else {
  699                 addReplyBulkCBuffer(c,ele,sdslen(ele));
  700             }
  701         }
  702         return;
  703     }
  704 
  705     /* CASE 2:
  706      * The number of requested elements is greater than the number of
  707      * elements inside the set: simply return the whole set. */
  708     if (count >= size) {
  709         setTypeIterator *si;
  710         addReplyArrayLen(c,size);
  711         si = setTypeInitIterator(set);
  712         while ((encoding = setTypeNext(si,&ele,&llele)) != -1) {
  713             if (encoding == OBJ_ENCODING_INTSET) {
  714                 addReplyBulkLongLong(c,llele);
  715             } else {
  716                 addReplyBulkCBuffer(c,ele,sdslen(ele));
  717             }
  718             size--;
  719         }
  720         setTypeReleaseIterator(si);
  721         serverAssert(size==0);
  722         return;
  723     }
  724 
  725     /* For CASE 3 and CASE 4 we need an auxiliary dictionary. */
  726     d = dictCreate(&sdsReplyDictType,NULL);
  727 
  728     /* CASE 3:
  729      * The number of elements inside the set is not greater than
  730      * SRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements.
  731      * In this case we create a set from scratch with all the elements, and
  732      * subtract random elements to reach the requested number of elements.
  733      *
  734      * This is done because if the number of requested elements is just
  735      * a bit less than the number of elements in the set, the natural approach
  736      * used into CASE 4 is highly inefficient. */
  737     if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) {
  738         setTypeIterator *si;
  739 
  740         /* Add all the elements into the temporary dictionary. */
  741         si = setTypeInitIterator(set);
  742         dictExpand(d, size);
  743         while ((encoding = setTypeNext(si,&ele,&llele)) != -1) {
  744             int retval = DICT_ERR;
  745 
  746             if (encoding == OBJ_ENCODING_INTSET) {
  747                 retval = dictAdd(d,sdsfromlonglong(llele),NULL);
  748             } else {
  749                 retval = dictAdd(d,sdsdup(ele),NULL);
  750             }
  751             serverAssert(retval == DICT_OK);
  752         }
  753         setTypeReleaseIterator(si);
  754         serverAssert(dictSize(d) == size);
  755 
  756         /* Remove random elements to reach the right count. */
  757         while (size > count) {
  758             dictEntry *de;
  759             de = dictGetRandomKey(d);
  760             dictUnlink(d,dictGetKey(de));
  761             sdsfree(dictGetKey(de));
  762             dictFreeUnlinkedEntry(d,de);
  763             size--;
  764         }
  765     }
  766 
  767     /* CASE 4: We have a big set compared to the requested number of elements.
  768      * In this case we can simply get random elements from the set and add
  769      * to the temporary set, trying to eventually get enough unique elements
  770      * to reach the specified count. */
  771     else {
  772         unsigned long added = 0;
  773         sds sdsele;
  774 
  775         dictExpand(d, count);
  776         while (added < count) {
  777             encoding = setTypeRandomElement(set,&ele,&llele);
  778             if (encoding == OBJ_ENCODING_INTSET) {
  779                 sdsele = sdsfromlonglong(llele);
  780             } else {
  781                 sdsele = sdsdup(ele);
  782             }
  783             /* Try to add the object to the dictionary. If it already exists
  784              * free it, otherwise increment the number of objects we have
  785              * in the result dictionary. */
  786             if (dictAdd(d,sdsele,NULL) == DICT_OK)
  787                 added++;
  788             else
  789                 sdsfree(sdsele);
  790         }
  791     }
  792 
  793     /* CASE 3 & 4: send the result to the user. */
  794     {
  795         dictIterator *di;
  796         dictEntry *de;
  797 
  798         addReplyArrayLen(c,count);
  799         di = dictGetIterator(d);
  800         while((de = dictNext(di)) != NULL)
  801             addReplyBulkSds(c,dictGetKey(de));
  802         dictReleaseIterator(di);
  803         dictRelease(d);
  804     }
  805 }
  806 
  807 /* SRANDMEMBER [<count>] */
  808 void srandmemberCommand(client *c) {
  809     robj *set;
  810     sds ele;
  811     int64_t llele;
  812     int encoding;
  813 
  814     if (c->argc == 3) {
  815         srandmemberWithCountCommand(c);
  816         return;
  817     } else if (c->argc > 3) {
  818         addReplyErrorObject(c,shared.syntaxerr);
  819         return;
  820     }
  821 
  822     /* Handle variant without <count> argument. Reply with simple bulk string */
  823     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))
  824         == NULL || checkType(c,set,OBJ_SET)) return;
  825 
  826     encoding = setTypeRandomElement(set,&ele,&llele);
  827     if (encoding == OBJ_ENCODING_INTSET) {
  828         addReplyBulkLongLong(c,llele);
  829     } else {
  830         addReplyBulkCBuffer(c,ele,sdslen(ele));
  831     }
  832 }
  833 
  834 int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
  835     if (setTypeSize(*(robj**)s1) > setTypeSize(*(robj**)s2)) return 1;
  836     if (setTypeSize(*(robj**)s1) < setTypeSize(*(robj**)s2)) return -1;
  837     return 0;
  838 }
  839 
  840 /* This is used by SDIFF and in this case we can receive NULL that should
  841  * be handled as empty sets. */
  842 int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) {
  843     robj *o1 = *(robj**)s1, *o2 = *(robj**)s2;
  844     unsigned long first = o1 ? setTypeSize(o1) : 0;
  845     unsigned long second = o2 ? setTypeSize(o2) : 0;
  846 
  847     if (first < second) return 1;
  848     if (first > second) return -1;
  849     return 0;
  850 }
  851 
  852 void sinterGenericCommand(client *c, robj **setkeys,
  853                           unsigned long setnum, robj *dstkey) {
  854     robj **sets = zmalloc(sizeof(robj*)*setnum);
  855     setTypeIterator *si;
  856     robj *dstset = NULL;
  857     sds elesds;
  858     int64_t intobj;
  859     void *replylen = NULL;
  860     unsigned long j, cardinality = 0;
  861     int encoding, empty = 0;
  862 
  863     for (j = 0; j < setnum; j++) {
  864         robj *setobj = dstkey ?
  865             lookupKeyWrite(c->db,setkeys[j]) :
  866             lookupKeyRead(c->db,setkeys[j]);
  867         if (!setobj) {
  868             /* A NULL is considered an empty set */
  869             empty += 1;
  870             sets[j] = NULL;
  871             continue;
  872         }
  873         if (checkType(c,setobj,OBJ_SET)) {
  874             zfree(sets);
  875             return;
  876         }
  877         sets[j] = setobj;
  878     }
  879 
  880     /* Set intersection with an empty set always results in an empty set.
  881      * Return ASAP if there is an empty set. */
  882     if (empty > 0) {
  883         zfree(sets);
  884         if (dstkey) {
  885             if (dbDelete(c->db,dstkey)) {
  886                 signalModifiedKey(c,c->db,dstkey);
  887                 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
  888                 server.dirty++;
  889             }
  890             addReply(c,shared.czero);
  891         } else {
  892             addReply(c,shared.emptyset[c->resp]);
  893         }
  894         return;
  895     }
  896 
  897     /* Sort sets from the smallest to largest, this will improve our
  898      * algorithm's performance */
  899     qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);
  900 
  901     /* The first thing we should output is the total number of elements...
  902      * since this is a multi-bulk write, but at this stage we don't know
  903      * the intersection set size, so we use a trick, append an empty object
  904      * to the output list and save the pointer to later modify it with the
  905      * right length */
  906     if (!dstkey) {
  907         replylen = addReplyDeferredLen(c);
  908     } else {
  909         /* If we have a target key where to store the resulting set
  910          * create this key with an empty set inside */
  911         dstset = createIntsetObject();
  912     }
  913 
  914     /* Iterate all the elements of the first (smallest) set, and test
  915      * the element against all the other sets, if at least one set does
  916      * not include the element it is discarded */
  917     si = setTypeInitIterator(sets[0]);
  918     while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) {
  919         for (j = 1; j < setnum; j++) {
  920             if (sets[j] == sets[0]) continue;
  921             if (encoding == OBJ_ENCODING_INTSET) {
  922                 /* intset with intset is simple... and fast */
  923                 if (sets[j]->encoding == OBJ_ENCODING_INTSET &&
  924                     !intsetFind((intset*)sets[j]->ptr,intobj))
  925                 {
  926                     break;
  927                 /* in order to compare an integer with an object we
  928                  * have to use the generic function, creating an object
  929                  * for this */
  930                 } else if (sets[j]->encoding == OBJ_ENCODING_HT) {
  931                     elesds = sdsfromlonglong(intobj);
  932                     if (!setTypeIsMember(sets[j],elesds)) {
  933                         sdsfree(elesds);
  934                         break;
  935                     }
  936                     sdsfree(elesds);
  937                 }
  938             } else if (encoding == OBJ_ENCODING_HT) {
  939                 if (!setTypeIsMember(sets[j],elesds)) {
  940                     break;
  941                 }
  942             }
  943         }
  944 
  945         /* Only take action when all sets contain the member */
  946         if (j == setnum) {
  947             if (!dstkey) {
  948                 if (encoding == OBJ_ENCODING_HT)
  949                     addReplyBulkCBuffer(c,elesds,sdslen(elesds));
  950                 else
  951                     addReplyBulkLongLong(c,intobj);
  952                 cardinality++;
  953             } else {
  954                 if (encoding == OBJ_ENCODING_INTSET) {
  955                     elesds = sdsfromlonglong(intobj);
  956                     setTypeAdd(dstset,elesds);
  957                     sdsfree(elesds);
  958                 } else {
  959                     setTypeAdd(dstset,elesds);
  960                 }
  961             }
  962         }
  963     }
  964     setTypeReleaseIterator(si);
  965 
  966     if (dstkey) {
  967         /* Store the resulting set into the target, if the intersection
  968          * is not an empty set. */
  969         if (setTypeSize(dstset) > 0) {
  970             setKey(c,c->db,dstkey,dstset);
  971             addReplyLongLong(c,setTypeSize(dstset));
  972             notifyKeyspaceEvent(NOTIFY_SET,"sinterstore",
  973                 dstkey,c->db->id);
  974             server.dirty++;
  975         } else {
  976             addReply(c,shared.czero);
  977             if (dbDelete(c->db,dstkey)) {
  978                 server.dirty++;
  979                 signalModifiedKey(c,c->db,dstkey);
  980                 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
  981             }
  982         }
  983         decrRefCount(dstset);
  984     } else {
  985         setDeferredSetLen(c,replylen,cardinality);
  986     }
  987     zfree(sets);
  988 }
  989 
  990 /* SINTER key [key ...] */
  991 void sinterCommand(client *c) {
  992     sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
  993 }
  994 
  995 /* SINTERSTORE destination key [key ...] */
  996 void sinterstoreCommand(client *c) {
  997     sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
  998 }
  999 
 1000 #define SET_OP_UNION 0
 1001 #define SET_OP_DIFF 1
 1002 #define SET_OP_INTER 2
 1003 
 1004 void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
 1005                               robj *dstkey, int op) {
 1006     robj **sets = zmalloc(sizeof(robj*)*setnum);
 1007     setTypeIterator *si;
 1008     robj *dstset = NULL;
 1009     sds ele;
 1010     int j, cardinality = 0;
 1011     int diff_algo = 1;
 1012 
 1013     for (j = 0; j < setnum; j++) {
 1014         robj *setobj = dstkey ?
 1015             lookupKeyWrite(c->db,setkeys[j]) :
 1016             lookupKeyRead(c->db,setkeys[j]);
 1017         if (!setobj) {
 1018             sets[j] = NULL;
 1019             continue;
 1020         }
 1021         if (checkType(c,setobj,OBJ_SET)) {
 1022             zfree(sets);
 1023             return;
 1024         }
 1025         sets[j] = setobj;
 1026     }
 1027 
 1028     /* Select what DIFF algorithm to use.
 1029      *
 1030      * Algorithm 1 is O(N*M) where N is the size of the element first set
 1031      * and M the total number of sets.
 1032      *
 1033      * Algorithm 2 is O(N) where N is the total number of elements in all
 1034      * the sets.
 1035      *
 1036      * We compute what is the best bet with the current input here. */
 1037     if (op == SET_OP_DIFF && sets[0]) {
 1038         long long algo_one_work = 0, algo_two_work = 0;
 1039 
 1040         for (j = 0; j < setnum; j++) {
 1041             if (sets[j] == NULL) continue;
 1042 
 1043             algo_one_work += setTypeSize(sets[0]);
 1044             algo_two_work += setTypeSize(sets[j]);
 1045         }
 1046 
 1047         /* Algorithm 1 has better constant times and performs less operations
 1048          * if there are elements in common. Give it some advantage. */
 1049         algo_one_work /= 2;
 1050         diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;
 1051 
 1052         if (diff_algo == 1 && setnum > 1) {
 1053             /* With algorithm 1 it is better to order the sets to subtract
 1054              * by decreasing size, so that we are more likely to find
 1055              * duplicated elements ASAP. */
 1056             qsort(sets+1,setnum-1,sizeof(robj*),
 1057                 qsortCompareSetsByRevCardinality);
 1058         }
 1059     }
 1060 
 1061     /* We need a temp set object to store our union. If the dstkey
 1062      * is not NULL (that is, we are inside an SUNIONSTORE operation) then
 1063      * this set object will be the resulting object to set into the target key*/
 1064     dstset = createIntsetObject();
 1065 
 1066     if (op == SET_OP_UNION) {
 1067         /* Union is trivial, just add every element of every set to the
 1068          * temporary set. */
 1069         for (j = 0; j < setnum; j++) {
 1070             if (!sets[j]) continue; /* non existing keys are like empty sets */
 1071 
 1072             si = setTypeInitIterator(sets[j]);
 1073             while((ele = setTypeNextObject(si)) != NULL) {
 1074                 if (setTypeAdd(dstset,ele)) cardinality++;
 1075                 sdsfree(ele);
 1076             }
 1077             setTypeReleaseIterator(si);
 1078         }
 1079     } else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) {
 1080         /* DIFF Algorithm 1:
 1081          *
 1082          * We perform the diff by iterating all the elements of the first set,
 1083          * and only adding it to the target set if the element does not exist
 1084          * into all the other sets.
 1085          *
 1086          * This way we perform at max N*M operations, where N is the size of
 1087          * the first set, and M the number of sets. */
 1088         si = setTypeInitIterator(sets[0]);
 1089         while((ele = setTypeNextObject(si)) != NULL) {
 1090             for (j = 1; j < setnum; j++) {
 1091                 if (!sets[j]) continue; /* no key is an empty set. */
 1092                 if (sets[j] == sets[0]) break; /* same set! */
 1093                 if (setTypeIsMember(sets[j],ele)) break;
 1094             }
 1095             if (j == setnum) {
 1096                 /* There is no other set with this element. Add it. */
 1097                 setTypeAdd(dstset,ele);
 1098                 cardinality++;
 1099             }
 1100             sdsfree(ele);
 1101         }
 1102         setTypeReleaseIterator(si);
 1103     } else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) {
 1104         /* DIFF Algorithm 2:
 1105          *
 1106          * Add all the elements of the first set to the auxiliary set.
 1107          * Then remove all the elements of all the next sets from it.
 1108          *
 1109          * This is O(N) where N is the sum of all the elements in every
 1110          * set. */
 1111         for (j = 0; j < setnum; j++) {
 1112             if (!sets[j]) continue; /* non existing keys are like empty sets */
 1113 
 1114             si = setTypeInitIterator(sets[j]);
 1115             while((ele = setTypeNextObject(si)) != NULL) {
 1116                 if (j == 0) {
 1117                     if (setTypeAdd(dstset,ele)) cardinality++;
 1118                 } else {
 1119                     if (setTypeRemove(dstset,ele)) cardinality--;
 1120                 }
 1121                 sdsfree(ele);
 1122             }
 1123             setTypeReleaseIterator(si);
 1124 
 1125             /* Exit if result set is empty as any additional removal
 1126              * of elements will have no effect. */
 1127             if (cardinality == 0) break;
 1128         }
 1129     }
 1130 
 1131     /* Output the content of the resulting set, if not in STORE mode */
 1132     if (!dstkey) {
 1133         addReplySetLen(c,cardinality);
 1134         si = setTypeInitIterator(dstset);
 1135         while((ele = setTypeNextObject(si)) != NULL) {
 1136             addReplyBulkCBuffer(c,ele,sdslen(ele));
 1137             sdsfree(ele);
 1138         }
 1139         setTypeReleaseIterator(si);
 1140         server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstset) :
 1141                                           decrRefCount(dstset);
 1142     } else {
 1143         /* If we have a target key where to store the resulting set
 1144          * create this key with the result set inside */
 1145         if (setTypeSize(dstset) > 0) {
 1146             setKey(c,c->db,dstkey,dstset);
 1147             addReplyLongLong(c,setTypeSize(dstset));
 1148             notifyKeyspaceEvent(NOTIFY_SET,
 1149                 op == SET_OP_UNION ? "sunionstore" : "sdiffstore",
 1150                 dstkey,c->db->id);
 1151             server.dirty++;
 1152         } else {
 1153             addReply(c,shared.czero);
 1154             if (dbDelete(c->db,dstkey)) {
 1155                 server.dirty++;
 1156                 signalModifiedKey(c,c->db,dstkey);
 1157                 notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
 1158             }
 1159         }
 1160         decrRefCount(dstset);
 1161     }
 1162     zfree(sets);
 1163 }
 1164 
 1165 /* SUNION key [key ...] */
 1166 void sunionCommand(client *c) {
 1167     sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_UNION);
 1168 }
 1169 
 1170 /* SUNIONSTORE destination key [key ...] */
 1171 void sunionstoreCommand(client *c) {
 1172     sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_UNION);
 1173 }
 1174 
 1175 /* SDIFF key [key ...] */
 1176 void sdiffCommand(client *c) {
 1177     sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_DIFF);
 1178 }
 1179 
 1180 /* SDIFFSTORE destination key [key ...] */
 1181 void sdiffstoreCommand(client *c) {
 1182     sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_DIFF);
 1183 }
 1184 
 1185 void sscanCommand(client *c) {
 1186     robj *set;
 1187     unsigned long cursor;
 1188 
 1189     if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
 1190     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
 1191         checkType(c,set,OBJ_SET)) return;
 1192     scanGenericCommand(c,set,cursor);
 1193 }