LCOV - code coverage report
Current view: directory - redis/src - t_zset.c (source / functions) Found Hit Coverage
Test: redis.info Lines: 1125 1023 90.9 %
Date: 2012-04-04 Functions: 63 61 96.8 %
Colors: not hit hit

       1                 : #include "redis.h"
       2                 : 
       3                 : #include <math.h>
       4                 : 
       5                 : /*-----------------------------------------------------------------------------
       6                 :  * Sorted set API
       7                 :  *----------------------------------------------------------------------------*/
       8                 : 
       9                 : /* ZSETs are ordered sets using two data structures to hold the same elements
      10                 :  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
      11                 :  * data structure.
      12                 :  *
      13                 :  * The elements are added to an hash table mapping Redis objects to scores.
      14                 :  * At the same time the elements are added to a skip list mapping scores
      15                 :  * to Redis objects (so objects are sorted by scores in this "view"). */
      16                 : 
      17                 : /* This skiplist implementation is almost a C translation of the original
      18                 :  * algorithm described by William Pugh in "Skip Lists: A Probabilistic
      19                 :  * Alternative to Balanced Trees", modified in three ways:
      20                 :  * a) this implementation allows for repeated scores.
      21                 :  * b) the comparison is not just by key (our 'score') but by satellite data.
      22                 :  * c) there is a back pointer, so it's a doubly linked list with the back
      23                 :  * pointers being only at "level 1". This allows to traverse the list
      24                 :  * from tail to head, useful for ZREVRANGE. */
      25                 : 
      26           25204 : zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
      27           25204 :     zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
      28           25204 :     zn->score = score;
      29           25204 :     zn->obj = obj;
      30           25204 :     return zn;
      31                 : }
      32                 : 
      33            7696 : zskiplist *zslCreate(void) {
      34                 :     int j;
      35                 :     zskiplist *zsl;
      36                 : 
      37            7696 :     zsl = zmalloc(sizeof(*zsl));
      38            7696 :     zsl->level = 1;
      39            7696 :     zsl->length = 0;
      40            7696 :     zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
      41          253968 :     for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
      42          246272 :         zsl->header->level[j].forward = NULL;
      43          246272 :         zsl->header->level[j].span = 0;
      44                 :     }
      45            7696 :     zsl->header->backward = NULL;
      46            7696 :     zsl->tail = NULL;
      47            7696 :     return zsl;
      48                 : }
      49                 : 
      50           10976 : void zslFreeNode(zskiplistNode *node) {
      51           10976 :     decrRefCount(node->obj);
      52           10976 :     zfree(node);
      53           10976 : }
      54                 : 
      55            2895 : void zslFree(zskiplist *zsl) {
      56            2895 :     zskiplistNode *node = zsl->header->level[0].forward, *next;
      57                 : 
      58            2895 :     zfree(zsl->header);
      59            9947 :     while(node) {
      60            4157 :         next = node->level[0].forward;
      61            4157 :         zslFreeNode(node);
      62            4157 :         node = next;
      63                 :     }
      64            2895 :     zfree(zsl);
      65            2895 : }
      66                 : 
      67                 : /* Returns a random level for the new skiplist node we are going to create.
      68                 :  * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
      69                 :  * (both inclusive), with a powerlaw-alike distribution where higher
      70                 :  * levels are less likely to be returned. */
      71           17508 : int zslRandomLevel(void) {
      72           17508 :     int level = 1;
      73           40985 :     while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
      74            5969 :         level += 1;
      75           17508 :     return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
      76                 : }
      77                 : 
      78           17508 : zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
      79                 :     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
      80                 :     unsigned int rank[ZSKIPLIST_MAXLEVEL];
      81                 :     int i, level;
      82                 : 
      83           17508 :     redisAssert(!isnan(score));
      84           17508 :     x = zsl->header;
      85           62961 :     for (i = zsl->level-1; i >= 0; i--) {
      86                 :         /* store rank that is crossed to reach the insert position */
      87           45453 :         rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
      88          280329 :         while (x->level[i].forward &&
      89           94119 :             (x->level[i].forward->score < score ||
      90           26268 :                 (x->level[i].forward->score == score &&
      91             674 :                 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
      92           68362 :             rank[i] += x->level[i].span;
      93           68362 :             x = x->level[i].forward;
      94                 :         }
      95           45453 :         update[i] = x;
      96                 :     }
      97                 :     /* we assume the key is not already inside, since we allow duplicated
      98                 :      * scores, and the re-insertion of score and redis object should never
      99                 :      * happpen since the caller of zslInsert() should test in the hash table
     100                 :      * if the element is already inside or not. */
     101           17508 :     level = zslRandomLevel();
     102           17508 :     if (level > zsl->level) {
     103            5794 :         for (i = zsl->level; i < level; i++) {
     104            3292 :             rank[i] = 0;
     105            3292 :             update[i] = zsl->header;
     106            3292 :             update[i]->level[i].span = zsl->length;
     107                 :         }
     108            2502 :         zsl->level = level;
     109                 :     }
     110           17508 :     x = zslCreateNode(level,score,obj);
     111           40985 :     for (i = 0; i < level; i++) {
     112           23477 :         x->level[i].forward = update[i]->level[i].forward;
     113           23477 :         update[i]->level[i].forward = x;
     114                 : 
     115                 :         /* update span covered by update[i] as x is inserted here */
     116           23477 :         x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
     117           23477 :         update[i]->level[i].span = (rank[0] - rank[i]) + 1;
     118                 :     }
     119                 : 
     120                 :     /* increment span for untouched levels */
     121           42776 :     for (i = level; i < zsl->level; i++) {
     122           25268 :         update[i]->level[i].span++;
     123                 :     }
     124                 : 
     125           17508 :     x->backward = (update[0] == zsl->header) ? NULL : update[0];
     126           17508 :     if (x->level[0].forward)
     127            8036 :         x->level[0].forward->backward = x;
     128                 :     else
     129            9472 :         zsl->tail = x;
     130           17508 :     zsl->length++;
     131           17508 :     return x;
     132                 : }
     133                 : 
     134                 : /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
     135            2564 : void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
     136                 :     int i;
     137           10675 :     for (i = 0; i < zsl->level; i++) {
     138            8111 :         if (update[i]->level[i].forward == x) {
     139            3447 :             update[i]->level[i].span += x->level[i].span - 1;
     140            3447 :             update[i]->level[i].forward = x->level[i].forward;
     141                 :         } else {
     142            4664 :             update[i]->level[i].span -= 1;
     143                 :         }
     144                 :     }
     145            2564 :     if (x->level[0].forward) {
     146            1662 :         x->level[0].forward->backward = x->backward;
     147                 :     } else {
     148             902 :         zsl->tail = x->backward;
     149                 :     }
     150            2892 :     while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
     151             328 :         zsl->level--;
     152            2564 :     zsl->length--;
     153            2564 : }
     154                 : 
     155                 : /* Delete an element with matching score/object from the skiplist. */
     156            2512 : int zslDelete(zskiplist *zsl, double score, robj *obj) {
     157                 :     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
     158                 :     int i;
     159                 : 
     160            2512 :     x = zsl->header;
     161           10513 :     for (i = zsl->level-1; i >= 0; i--) {
     162           48678 :         while (x->level[i].forward &&
     163           18650 :             (x->level[i].forward->score < score ||
     164            6798 :                 (x->level[i].forward->score == score &&
     165            3371 :                 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
     166           11858 :             x = x->level[i].forward;
     167            8001 :         update[i] = x;
     168                 :     }
     169                 :     /* We may have multiple elements with the same score, what we need
     170                 :      * is to find the element with both the right score and object. */
     171            2512 :     x = x->level[0].forward;
     172            2512 :     if (x && score == x->score && equalStringObjects(x->obj,obj)) {
     173            2512 :         zslDeleteNode(zsl, x, update);
     174            2512 :         zslFreeNode(x);
     175            2512 :         return 1;
     176                 :     } else {
     177               0 :         return 0; /* not found */
     178                 :     }
     179                 :     return 0; /* not found */
     180                 : }
     181                 : 
     182                 : static int zslValueGteMin(double value, zrangespec *spec) {
     183           71459 :     return spec->minex ? (value > spec->min) : (value >= spec->min);
     184                 : }
     185                 : 
     186                 : static int zslValueLteMax(double value, zrangespec *spec) {
     187           87341 :     return spec->maxex ? (value < spec->max) : (value <= spec->max);
     188                 : }
     189                 : 
     190                 : /* Returns if there is a part of the zset is in range. */
     191            1846 : int zslIsInRange(zskiplist *zsl, zrangespec *range) {
     192                 :     zskiplistNode *x;
     193                 : 
     194                 :     /* Test for ranges that will always be empty. */
     195            3692 :     if (range->min > range->max ||
     196            1846 :             (range->min == range->max && (range->minex || range->maxex)))
     197               4 :         return 0;
     198            1842 :     x = zsl->tail;
     199            3684 :     if (x == NULL || !zslValueGteMin(x->score,range))
     200               4 :         return 0;
     201            1838 :     x = zsl->header->level[0].forward;
     202            3676 :     if (x == NULL || !zslValueLteMax(x->score,range))
     203               8 :         return 0;
     204            1830 :     return 1;
     205                 : }
     206                 : 
     207                 : /* Find the first node that is contained in the specified range.
     208                 :  * Returns NULL when no element is contained in the range. */
     209            1228 : zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
     210                 :     zskiplistNode *x;
     211                 :     int i;
     212                 : 
     213                 :     /* If everything is out of range, return early. */
     214            1228 :     if (!zslIsInRange(zsl,&range)) return NULL;
     215                 : 
     216            1216 :     x = zsl->header;
     217            7223 :     for (i = zsl->level-1; i >= 0; i--) {
     218                 :         /* Go forward while *OUT* of range. */
     219           26929 :         while (x->level[i].forward &&
     220           12770 :             !zslValueGteMin(x->level[i].forward->score,&range))
     221            8152 :                 x = x->level[i].forward;
     222                 :     }
     223                 : 
     224                 :     /* This is an inner range, so the next node cannot be NULL. */
     225            1216 :     x = x->level[0].forward;
     226            1216 :     redisAssert(x != NULL);
     227                 : 
     228                 :     /* Check if score <= max. */
     229            2432 :     if (!zslValueLteMax(x->score,&range)) return NULL;
     230            1212 :     return x;
     231                 : }
     232                 : 
     233                 : /* Find the last node that is contained in the specified range.
     234                 :  * Returns NULL when no element is contained in the range. */
     235             618 : zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
     236                 :     zskiplistNode *x;
     237                 :     int i;
     238                 : 
     239                 :     /* If everything is out of range, return early. */
     240             618 :     if (!zslIsInRange(zsl,&range)) return NULL;
     241                 : 
     242             614 :     x = zsl->header;
     243            3627 :     for (i = zsl->level-1; i >= 0; i--) {
     244                 :         /* Go forward while *IN* range. */
     245           18963 :         while (x->level[i].forward &&
     246            8631 :             zslValueLteMax(x->level[i].forward->score,&range))
     247            7319 :                 x = x->level[i].forward;
     248                 :     }
     249                 : 
     250                 :     /* This is an inner range, so this node cannot be NULL. */
     251             614 :     redisAssert(x != NULL);
     252                 : 
     253                 :     /* Check if score >= min. */
     254            1228 :     if (!zslValueGteMin(x->score,&range)) return NULL;
     255             614 :     return x;
     256                 : }
     257                 : 
     258                 : /* Delete all the elements with score between min and max from the skiplist.
     259                 :  * Min and mx are inclusive, so a score >= min || score <= max is deleted.
     260                 :  * Note that this function takes the reference to the hash table view of the
     261                 :  * sorted set, in order to remove the elements from the hash table too. */
     262              13 : unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
     263                 :     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
     264              13 :     unsigned long removed = 0;
     265                 :     int i;
     266                 : 
     267              13 :     x = zsl->header;
     268              43 :     for (i = zsl->level-1; i >= 0; i--) {
     269              82 :         while (x->level[i].forward && (range.minex ?
     270              10 :             x->level[i].forward->score <= range.min :
     271              30 :             x->level[i].forward->score < range.min))
     272              12 :                 x = x->level[i].forward;
     273              30 :         update[i] = x;
     274                 :     }
     275                 : 
     276                 :     /* Current node is the last with score < or <= min. */
     277              13 :     x = x->level[0].forward;
     278                 : 
     279                 :     /* Delete nodes while in range. */
     280              64 :     while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
     281              38 :         zskiplistNode *next = x->level[0].forward;
     282              38 :         zslDeleteNode(zsl,x,update);
     283              38 :         dictDelete(dict,x->obj);
     284              38 :         zslFreeNode(x);
     285              38 :         removed++;
     286              38 :         x = next;
     287                 :     }
     288              13 :     return removed;
     289                 : }
     290                 : 
     291                 : /* Delete all the elements with rank between start and end from the skiplist.
     292                 :  * Start and end are inclusive. Note that start and end need to be 1-based */
     293               4 : unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
     294                 :     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
     295               4 :     unsigned long traversed = 0, removed = 0;
     296                 :     int i;
     297                 : 
     298               4 :     x = zsl->header;
     299              14 :     for (i = zsl->level-1; i >= 0; i--) {
     300              11 :         while (x->level[i].forward && (traversed + x->level[i].span) < start) {
     301               1 :             traversed += x->level[i].span;
     302               1 :             x = x->level[i].forward;
     303                 :         }
     304              10 :         update[i] = x;
     305                 :     }
     306                 : 
     307               4 :     traversed++;
     308               4 :     x = x->level[0].forward;
     309              22 :     while (x && traversed <= end) {
     310              14 :         zskiplistNode *next = x->level[0].forward;
     311              14 :         zslDeleteNode(zsl,x,update);
     312              14 :         dictDelete(dict,x->obj);
     313              14 :         zslFreeNode(x);
     314              14 :         removed++;
     315              14 :         traversed++;
     316              14 :         x = next;
     317                 :     }
     318               4 :     return removed;
     319                 : }
     320                 : 
     321                 : /* Find the rank for an element by both score and key.
     322                 :  * Returns 0 when the element cannot be found, rank otherwise.
     323                 :  * Note that the rank is 1-based due to the span of zsl->header to the
     324                 :  * first element. */
     325            3208 : unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
     326                 :     zskiplistNode *x;
     327            3208 :     unsigned long rank = 0;
     328                 :     int i;
     329                 : 
     330            3208 :     x = zsl->header;
     331           13116 :     for (i = zsl->level-1; i >= 0; i--) {
     332           90096 :         while (x->level[i].forward &&
     333           35282 :             (x->level[i].forward->score < score ||
     334           12479 :                 (x->level[i].forward->score == score &&
     335            3208 :                 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
     336           26011 :             rank += x->level[i].span;
     337           26011 :             x = x->level[i].forward;
     338                 :         }
     339                 : 
     340                 :         /* x might be equal to zsl->header, so test if obj is non-NULL */
     341           13116 :         if (x->obj && equalStringObjects(x->obj,o)) {
     342            3208 :             return rank;
     343                 :         }
     344                 :     }
     345               0 :     return 0;
     346                 : }
     347                 : 
     348                 : /* Finds an element by its rank. The rank argument needs to be 1-based. */
     349               0 : zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
     350                 :     zskiplistNode *x;
     351            1990 :     unsigned long traversed = 0;
     352                 :     int i;
     353                 : 
     354            1990 :     x = zsl->header;
     355            7271 :     for (i = zsl->level-1; i >= 0; i--) {
     356           21808 :         while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
     357                 :         {
     358           14537 :             traversed += x->level[i].span;
     359           14537 :             x = x->level[i].forward;
     360                 :         }
     361            7271 :         if (traversed == rank) {
     362            1990 :             return x;
     363                 :         }
     364                 :     }
     365               0 :     return NULL;
     366                 : }
     367                 : 
     368                 : /* Populate the rangespec according to the objects min and max. */
     369            2530 : static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
     370                 :     char *eptr;
     371            2530 :     spec->minex = spec->maxex = 0;
     372                 : 
     373                 :     /* Parse the min-max interval. If one of the values is prefixed
     374                 :      * by the "(" character, it's considered "open". For instance
     375                 :      * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
     376                 :      * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
     377            2530 :     if (min->encoding == REDIS_ENCODING_INT) {
     378               0 :         spec->min = (long)min->ptr;
     379                 :     } else {
     380            2530 :         if (((char*)min->ptr)[0] == '(') {
     381             840 :             spec->min = strtod((char*)min->ptr+1,&eptr);
     382             840 :             if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
     383             840 :             spec->minex = 1;
     384                 :         } else {
     385            1690 :             spec->min = strtod((char*)min->ptr,&eptr);
     386            1690 :             if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
     387                 :         }
     388                 :     }
     389            2526 :     if (max->encoding == REDIS_ENCODING_INT) {
     390               0 :         spec->max = (long)max->ptr;
     391                 :     } else {
     392            2526 :         if (((char*)max->ptr)[0] == '(') {
     393             840 :             spec->max = strtod((char*)max->ptr+1,&eptr);
     394             840 :             if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
     395             840 :             spec->maxex = 1;
     396                 :         } else {
     397            1686 :             spec->max = strtod((char*)max->ptr,&eptr);
     398            1686 :             if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
     399                 :         }
     400                 :     }
     401                 : 
     402            2518 :     return REDIS_OK;
     403                 : }
     404                 : 
     405                 : /*-----------------------------------------------------------------------------
     406                 :  * Ziplist-backed sorted set API
     407                 :  *----------------------------------------------------------------------------*/
     408                 : 
     409          309171 : double zzlGetScore(unsigned char *sptr) {
     410                 :     unsigned char *vstr;
     411                 :     unsigned int vlen;
     412                 :     long long vlong;
     413                 :     char buf[128];
     414                 :     double score;
     415                 : 
     416          309171 :     redisAssert(sptr != NULL);
     417          309171 :     redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
     418                 : 
     419          309171 :     if (vstr) {
     420          301543 :         memcpy(buf,vstr,vlen);
     421          301543 :         buf[vlen] = '\0';
     422          301543 :         score = strtod(buf,NULL);
     423                 :     } else {
     424            7628 :         score = vlong;
     425                 :     }
     426                 : 
     427          309171 :     return score;
     428                 : }
     429                 : 
     430                 : /* Compare element in sorted set with given element. */
     431             774 : int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
     432                 :     unsigned char *vstr;
     433                 :     unsigned int vlen;
     434                 :     long long vlong;
     435                 :     unsigned char vbuf[32];
     436                 :     int minlen, cmp;
     437                 : 
     438             774 :     redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
     439             774 :     if (vstr == NULL) {
     440                 :         /* Store string representation of long long in buf. */
     441             772 :         vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
     442             772 :         vstr = vbuf;
     443                 :     }
     444                 : 
     445             774 :     minlen = (vlen < clen) ? vlen : clen;
     446             774 :     cmp = memcmp(vstr,cstr,minlen);
     447             774 :     if (cmp == 0) return vlen-clen;
     448             755 :     return cmp;
     449                 : }
     450                 : 
     451           41104 : unsigned int zzlLength(unsigned char *zl) {
     452           41104 :     return ziplistLen(zl)/2;
     453                 : }
     454                 : 
     455                 : /* Move to next entry based on the values in eptr and sptr. Both are set to
     456                 :  * NULL when there is no next entry. */
     457          196145 : void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
     458                 :     unsigned char *_eptr, *_sptr;
     459          196145 :     redisAssert(*eptr != NULL && *sptr != NULL);
     460                 : 
     461          196145 :     _eptr = ziplistNext(zl,*sptr);
     462          196145 :     if (_eptr != NULL) {
     463          162618 :         _sptr = ziplistNext(zl,_eptr);
     464          162618 :         redisAssert(_sptr != NULL);
     465                 :     } else {
     466                 :         /* No next entry. */
     467           33527 :         _sptr = NULL;
     468                 :     }
     469                 : 
     470          196145 :     *eptr = _eptr;
     471          196145 :     *sptr = _sptr;
     472          196145 : }
     473                 : 
     474                 : /* Move to the previous entry based on the values in eptr and sptr. Both are
     475                 :  * set to NULL when there is no next entry. */
     476             303 : void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
     477                 :     unsigned char *_eptr, *_sptr;
     478             303 :     redisAssert(*eptr != NULL && *sptr != NULL);
     479                 : 
     480             303 :     _sptr = ziplistPrev(zl,*eptr);
     481             303 :     if (_sptr != NULL) {
     482             294 :         _eptr = ziplistPrev(zl,_sptr);
     483             294 :         redisAssert(_eptr != NULL);
     484                 :     } else {
     485                 :         /* No previous entry. */
     486               9 :         _eptr = NULL;
     487                 :     }
     488                 : 
     489             303 :     *eptr = _eptr;
     490             303 :     *sptr = _sptr;
     491             303 : }
     492                 : 
     493                 : /* Returns if there is a part of the zset is in range. Should only be used
     494                 :  * internally by zzlFirstInRange and zzlLastInRange. */
     495            1259 : int zzlIsInRange(unsigned char *zl, zrangespec *range) {
     496                 :     unsigned char *p;
     497                 :     double score;
     498                 : 
     499                 :     /* Test for ranges that will always be empty. */
     500            2517 :     if (range->min > range->max ||
     501            1258 :             (range->min == range->max && (range->minex || range->maxex)))
     502               5 :         return 0;
     503                 : 
     504            1254 :     p = ziplistIndex(zl,-1); /* Last score. */
     505            1254 :     if (p == NULL) return 0; /* Empty sorted set */
     506            1254 :     score = zzlGetScore(p);
     507            1254 :     if (!zslValueGteMin(score,range))
     508               4 :         return 0;
     509                 : 
     510            1250 :     p = ziplistIndex(zl,1); /* First score. */
     511            1250 :     redisAssert(p != NULL);
     512            1250 :     score = zzlGetScore(p);
     513            1250 :     if (!zslValueLteMax(score,range))
     514               4 :         return 0;
     515                 : 
     516            1246 :     return 1;
     517                 : }
     518                 : 
     519                 : /* Find pointer to the first element contained in the specified range.
     520                 :  * Returns NULL when no element is contained in the range. */
     521            1241 : unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
     522            1241 :     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
     523                 :     double score;
     524                 : 
     525                 :     /* If everything is out of range, return early. */
     526            1241 :     if (!zzlIsInRange(zl,&range)) return NULL;
     527                 : 
     528           54883 :     while (eptr != NULL) {
     529           54883 :         sptr = ziplistNext(zl,eptr);
     530           54883 :         redisAssert(sptr != NULL);
     531                 : 
     532           54883 :         score = zzlGetScore(sptr);
     533           54883 :         if (zslValueGteMin(score,&range)) {
     534                 :             /* Check if score <= max. */
     535            1232 :             if (zslValueLteMax(score,&range))
     536            1228 :                 return eptr;
     537               4 :             return NULL;
     538                 :         }
     539                 : 
     540                 :         /* Move to next element. */
     541           53651 :         eptr = ziplistNext(zl,sptr);
     542                 :     }
     543                 : 
     544               0 :     return NULL;
     545                 : }
     546                 : 
     547                 : /* Find pointer to the last element contained in the specified range.
     548                 :  * Returns NULL when no element is contained in the range. */
     549              18 : unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) {
     550              18 :     unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
     551                 :     double score;
     552                 : 
     553                 :     /* If everything is out of range, return early. */
     554              18 :     if (!zzlIsInRange(zl,&range)) return NULL;
     555                 : 
     556              41 :     while (eptr != NULL) {
     557              41 :         sptr = ziplistNext(zl,eptr);
     558              41 :         redisAssert(sptr != NULL);
     559                 : 
     560              41 :         score = zzlGetScore(sptr);
     561              41 :         if (zslValueLteMax(score,&range)) {
     562                 :             /* Check if score >= min. */
     563              14 :             if (zslValueGteMin(score,&range))
     564              14 :                 return eptr;
     565               0 :             return NULL;
     566                 :         }
     567                 : 
     568                 :         /* Move to previous element by moving to the score of previous element.
     569                 :          * When this returns NULL, we know there also is no element. */
     570              27 :         sptr = ziplistPrev(zl,eptr);
     571              27 :         if (sptr != NULL)
     572              27 :             redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
     573                 :         else
     574               0 :             eptr = NULL;
     575                 :     }
     576                 : 
     577               0 :     return NULL;
     578                 : }
     579                 : 
     580           52266 : unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
     581           52266 :     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
     582                 : 
     583           52266 :     ele = getDecodedObject(ele);
     584         1996116 :     while (eptr != NULL) {
     585         1925025 :         sptr = ziplistNext(zl,eptr);
     586         1925025 :         redisAssertWithInfo(NULL,ele,sptr != NULL);
     587                 : 
     588         3850050 :         if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
     589                 :             /* Matching element, pull out score. */
     590           33441 :             if (score != NULL) *score = zzlGetScore(sptr);
     591           33441 :             decrRefCount(ele);
     592           33441 :             return eptr;
     593                 :         }
     594                 : 
     595                 :         /* Move to next element. */
     596         1891584 :         eptr = ziplistNext(zl,sptr);
     597                 :     }
     598                 : 
     599           18825 :     decrRefCount(ele);
     600           18825 :     return NULL;
     601                 : }
     602                 : 
     603                 : /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
     604                 :  * don't want to modify the one given as argument. */
     605            5451 : unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
     606            5451 :     unsigned char *p = eptr;
     607                 : 
     608                 :     /* TODO: add function to ziplist API to delete N elements from offset. */
     609            5451 :     zl = ziplistDelete(zl,&p);
     610            5451 :     zl = ziplistDelete(zl,&p);
     611            5451 :     return zl;
     612                 : }
     613                 : 
     614           20169 : unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
     615                 :     unsigned char *sptr;
     616                 :     char scorebuf[128];
     617                 :     int scorelen;
     618                 :     size_t offset;
     619                 : 
     620           20169 :     redisAssertWithInfo(NULL,ele,ele->encoding == REDIS_ENCODING_RAW);
     621           20169 :     scorelen = d2string(scorebuf,sizeof(scorebuf),score);
     622           20169 :     if (eptr == NULL) {
     623           33618 :         zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
     624           16809 :         zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
     625                 :     } else {
     626                 :         /* Keep offset relative to zl, as it might be re-allocated. */
     627            3360 :         offset = eptr-zl;
     628            6720 :         zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
     629            3360 :         eptr = zl+offset;
     630                 : 
     631                 :         /* Insert score after the element. */
     632            3360 :         redisAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
     633            3360 :         zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
     634                 :     }
     635                 : 
     636           20169 :     return zl;
     637                 : }
     638                 : 
     639                 : /* Insert (element,score) pair in ziplist. This function assumes the element is
     640                 :  * not yet present in the list. */
     641           15914 : unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
     642           15914 :     unsigned char *eptr = ziplistIndex(zl,0), *sptr;
     643                 :     double s;
     644                 : 
     645           15914 :     ele = getDecodedObject(ele);
     646          155382 :     while (eptr != NULL) {
     647          126914 :         sptr = ziplistNext(zl,eptr);
     648          126914 :         redisAssertWithInfo(NULL,ele,sptr != NULL);
     649          126914 :         s = zzlGetScore(sptr);
     650                 : 
     651          126914 :         if (s > score) {
     652                 :             /* First element with score larger than score for element to be
     653                 :              * inserted. This means we should take its spot in the list to
     654                 :              * maintain ordering. */
     655            3259 :             zl = zzlInsertAt(zl,eptr,ele,score);
     656            3259 :             break;
     657          123655 :         } else if (s == score) {
     658                 :             /* Ensure lexicographical ordering for elements. */
     659            1548 :             if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
     660             101 :                 zl = zzlInsertAt(zl,eptr,ele,score);
     661             101 :                 break;
     662                 :             }
     663                 :         }
     664                 : 
     665                 :         /* Move to next element. */
     666          123554 :         eptr = ziplistNext(zl,sptr);
     667                 :     }
     668                 : 
     669                 :     /* Push on tail of list when it was not yet inserted. */
     670           15914 :     if (eptr == NULL)
     671           12554 :         zl = zzlInsertAt(zl,NULL,ele,score);
     672                 : 
     673           15914 :     decrRefCount(ele);
     674           15914 :     return zl;
     675                 : }
     676                 : 
     677              13 : unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
     678                 :     unsigned char *eptr, *sptr;
     679                 :     double score;
     680              13 :     unsigned long num = 0;
     681                 : 
     682              13 :     if (deleted != NULL) *deleted = 0;
     683                 : 
     684              13 :     eptr = zzlFirstInRange(zl,range);
     685              13 :     if (eptr == NULL) return zl;
     686                 : 
     687                 :     /* When the tail of the ziplist is deleted, eptr will point to the sentinel
     688                 :      * byte and ziplistNext will return NULL. */
     689              50 :     while ((sptr = ziplistNext(zl,eptr)) != NULL) {
     690              44 :         score = zzlGetScore(sptr);
     691              44 :         if (zslValueLteMax(score,&range)) {
     692                 :             /* Delete both the element and the score. */
     693              38 :             zl = ziplistDelete(zl,&eptr);
     694              38 :             zl = ziplistDelete(zl,&eptr);
     695              38 :             num++;
     696                 :         } else {
     697                 :             /* No longer in range. */
     698                 :             break;
     699                 :         }
     700                 :     }
     701                 : 
     702              12 :     if (deleted != NULL) *deleted = num;
     703              12 :     return zl;
     704                 : }
     705                 : 
     706                 : /* Delete all the elements with rank between start and end from the skiplist.
     707                 :  * Start and end are inclusive. Note that start and end need to be 1-based */
     708               4 : unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
     709               4 :     unsigned int num = (end-start)+1;
     710               4 :     if (deleted) *deleted = num;
     711               4 :     zl = ziplistDeleteRange(zl,2*(start-1),2*num);
     712               4 :     return zl;
     713                 : }
     714                 : 
     715                 : /*-----------------------------------------------------------------------------
     716                 :  * Common sorted set API
     717                 :  *----------------------------------------------------------------------------*/
     718                 : 
     719           16031 : unsigned int zsetLength(robj *zobj) {
     720           16031 :     int length = -1;
     721           16031 :     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
     722            8612 :         length = zzlLength(zobj->ptr);
     723            7419 :     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
     724            7419 :         length = ((zset*)zobj->ptr)->zsl->length;
     725                 :     } else {
     726               0 :         redisPanic("Unknown sorted set encoding");
     727                 :     }
     728           16031 :     return length;
     729                 : }
     730                 : 
     731            1995 : void zsetConvert(robj *zobj, int encoding) {
     732                 :     zset *zs;
     733                 :     zskiplistNode *node, *next;
     734                 :     robj *ele;
     735                 :     double score;
     736                 : 
     737            1995 :     if (zobj->encoding == encoding) return;
     738            1995 :     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
     739             156 :         unsigned char *zl = zobj->ptr;
     740                 :         unsigned char *eptr, *sptr;
     741                 :         unsigned char *vstr;
     742                 :         unsigned int vlen;
     743                 :         long long vlong;
     744                 : 
     745             156 :         if (encoding != REDIS_ENCODING_SKIPLIST)
     746               0 :             redisPanic("Unknown target encoding");
     747                 : 
     748             156 :         zs = zmalloc(sizeof(*zs));
     749             156 :         zs->dict = dictCreate(&zsetDictType,NULL);
     750             156 :         zs->zsl = zslCreate();
     751                 : 
     752             156 :         eptr = ziplistIndex(zl,0);
     753             156 :         redisAssertWithInfo(NULL,zobj,eptr != NULL);
     754             156 :         sptr = ziplistNext(zl,eptr);
     755             156 :         redisAssertWithInfo(NULL,zobj,sptr != NULL);
     756                 : 
     757            1279 :         while (eptr != NULL) {
     758            1123 :             score = zzlGetScore(sptr);
     759            1123 :             redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
     760            1123 :             if (vstr == NULL)
     761             684 :                 ele = createStringObjectFromLongLong(vlong);
     762                 :             else
     763             439 :                 ele = createStringObject((char*)vstr,vlen);
     764                 : 
     765                 :             /* Has incremented refcount since it was just created. */
     766            1123 :             node = zslInsert(zs->zsl,score,ele);
     767            1123 :             redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
     768            1123 :             incrRefCount(ele); /* Added to dictionary. */
     769            1123 :             zzlNext(zl,&eptr,&sptr);
     770                 :         }
     771                 : 
     772             156 :         zfree(zobj->ptr);
     773             156 :         zobj->ptr = zs;
     774             156 :         zobj->encoding = REDIS_ENCODING_SKIPLIST;
     775            1839 :     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
     776            1839 :         unsigned char *zl = ziplistNew();
     777                 : 
     778            1839 :         if (encoding != REDIS_ENCODING_ZIPLIST)
     779               0 :             redisPanic("Unknown target encoding");
     780                 : 
     781                 :         /* Approach similar to zslFree(), since we want to free the skiplist at
     782                 :          * the same time as creating the ziplist. */
     783            1839 :         zs = zobj->ptr;
     784            1839 :         dictRelease(zs->dict);
     785            1839 :         node = zs->zsl->header->level[0].forward;
     786            1839 :         zfree(zs->zsl->header);
     787            1839 :         zfree(zs->zsl);
     788                 : 
     789            7933 :         while (node) {
     790            4255 :             ele = getDecodedObject(node->obj);
     791            4255 :             zl = zzlInsertAt(zl,NULL,ele,node->score);
     792            4255 :             decrRefCount(ele);
     793                 : 
     794            4255 :             next = node->level[0].forward;
     795            4255 :             zslFreeNode(node);
     796            4255 :             node = next;
     797                 :         }
     798                 : 
     799            1839 :         zfree(zs);
     800            1839 :         zobj->ptr = zl;
     801            1839 :         zobj->encoding = REDIS_ENCODING_ZIPLIST;
     802                 :     } else {
     803               0 :         redisPanic("Unknown sorted set encoding");
     804                 :     }
     805                 : }
     806                 : 
     807                 : /*-----------------------------------------------------------------------------
     808                 :  * Sorted set commands 
     809                 :  *----------------------------------------------------------------------------*/
     810                 : 
     811                 : /* This generic command implements both ZADD and ZINCRBY. */
     812           25226 : void zaddGenericCommand(redisClient *c, int incr) {
     813                 :     static char *nanerr = "resulting score is not a number (NaN)";
     814           25226 :     robj *key = c->argv[1];
     815                 :     robj *ele;
     816                 :     robj *zobj;
     817                 :     robj *curobj;
     818           25226 :     double score = 0, *scores, curscore = 0.0;
     819           25226 :     int j, elements = (c->argc-2)/2;
     820           25226 :     int added = 0;
     821                 : 
     822           25226 :     if (c->argc % 2) {
     823               2 :         addReply(c,shared.syntaxerr);
     824               2 :         return;
     825                 :     }
     826                 : 
     827                 :     /* Start parsing all the scores, we need to emit any syntax error
     828                 :      * before executing additions to the sorted set, as the command should
     829                 :      * either execute fully or nothing at all. */
     830           25224 :     scores = zmalloc(sizeof(double)*elements);
     831           52428 :     for (j = 0; j < elements; j++) {
     832           27210 :         if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
     833                 :             != REDIS_OK)
     834                 :         {
     835               6 :             zfree(scores);
     836               6 :             return;
     837                 :         }
     838                 :     }
     839                 : 
     840                 :     /* Lookup the key and create the sorted set if does not exist. */
     841           25218 :     zobj = lookupKeyWrite(c->db,key);
     842           25218 :     if (zobj == NULL) {
     843           31921 :         if (server.zset_max_ziplist_entries == 0 ||
     844           14609 :             server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
     845                 :         {
     846            2651 :             zobj = createZsetObject();
     847                 :         } else {
     848           12010 :             zobj = createZsetZiplistObject();
     849                 :         }
     850           14661 :         dbAdd(c->db,key,zobj);
     851                 :     } else {
     852           10557 :         if (zobj->type != REDIS_ZSET) {
     853               0 :             addReply(c,shared.wrongtypeerr);
     854               0 :             zfree(scores);
     855               0 :             return;
     856                 :         }
     857                 :     }
     858                 : 
     859           52416 :     for (j = 0; j < elements; j++) {
     860           27200 :         score = scores[j];
     861                 : 
     862           27200 :         if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
     863                 :             unsigned char *eptr;
     864                 : 
     865                 :             /* Prefer non-encoded element when dealing with ziplists. */
     866           18007 :             ele = c->argv[3+j*2];
     867           18007 :             if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
     868            3290 :                 if (incr) {
     869               5 :                     score += curscore;
     870               5 :                     if (isnan(score)) {
     871               1 :                         addReplyError(c,nanerr);
     872                 :                         /* Don't need to check if the sorted set is empty
     873                 :                          * because we know it has at least one element. */
     874               1 :                         zfree(scores);
     875               1 :                         return;
     876                 :                     }
     877                 :                 }
     878                 : 
     879                 :                 /* Remove and re-insert when score changed. */
     880            3289 :                 if (score != curscore) {
     881            1197 :                     zobj->ptr = zzlDelete(zobj->ptr,eptr);
     882            1197 :                     zobj->ptr = zzlInsert(zobj->ptr,ele,score);
     883                 : 
     884            1197 :                     signalModifiedKey(c->db,key);
     885            1197 :                     server.dirty++;
     886                 :                 }
     887                 :             } else {
     888                 :                 /* Optimize: check if the element is too large or the list
     889                 :                  * becomes too long *before* executing zzlInsert. */
     890           14717 :                 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
     891           14717 :                 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
     892               4 :                     zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
     893           29434 :                 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
     894             139 :                     zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
     895                 : 
     896           14717 :                 signalModifiedKey(c->db,key);
     897           14717 :                 server.dirty++;
     898           14717 :                 if (!incr) added++;
     899                 :             }
     900            9193 :         } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
     901            9193 :             zset *zs = zobj->ptr;
     902                 :             zskiplistNode *znode;
     903                 :             dictEntry *de;
     904                 : 
     905            9193 :             ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
     906            9193 :             de = dictFind(zs->dict,ele);
     907            9193 :             if (de != NULL) {
     908            1758 :                 curobj = dictGetKey(de);
     909            1758 :                 curscore = *(double*)dictGetVal(de);
     910                 : 
     911            1758 :                 if (incr) {
     912               5 :                     score += curscore;
     913               5 :                     if (isnan(score)) {
     914               1 :                         addReplyError(c,nanerr);
     915                 :                         /* Don't need to check if the sorted set is empty
     916                 :                          * because we know it has at least one element. */
     917               1 :                         zfree(scores);
     918               1 :                         return;
     919                 :                     }
     920                 :                 }
     921                 : 
     922                 :                 /* Remove and re-insert when score changed. We can safely
     923                 :                  * delete the key object from the skiplist, since the
     924                 :                  * dictionary still has a reference to it. */
     925            1757 :                 if (score != curscore) {
     926            1308 :                     redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
     927            1308 :                     znode = zslInsert(zs->zsl,score,curobj);
     928            1308 :                     incrRefCount(curobj); /* Re-inserted in skiplist. */
     929            1308 :                     dictGetVal(de) = &znode->score; /* Update score ptr. */
     930                 : 
     931            1308 :                     signalModifiedKey(c->db,key);
     932            1308 :                     server.dirty++;
     933                 :                 }
     934                 :             } else {
     935            7435 :                 znode = zslInsert(zs->zsl,score,ele);
     936            7435 :                 incrRefCount(ele); /* Inserted in skiplist. */
     937            7435 :                 redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
     938            7435 :                 incrRefCount(ele); /* Added to dictionary. */
     939                 : 
     940            7435 :                 signalModifiedKey(c->db,key);
     941            7435 :                 server.dirty++;
     942            7435 :                 if (!incr) added++;
     943                 :             }
     944                 :         } else {
     945               0 :             redisPanic("Unknown sorted set encoding");
     946                 :         }
     947                 :     }
     948           25216 :     zfree(scores);
     949           25216 :     if (incr) /* ZINCRBY */
     950              14 :         addReplyDouble(c,score);
     951                 :     else /* ZADD */
     952           25202 :         addReplyLongLong(c,added);
     953                 : }
     954                 : 
     955           25210 : void zaddCommand(redisClient *c) {
     956           25210 :     zaddGenericCommand(c,0);
     957           25210 : }
     958                 : 
     959              16 : void zincrbyCommand(redisClient *c) {
     960              16 :     zaddGenericCommand(c,1);
     961              16 : }
     962                 : 
     963            6291 : void zremCommand(redisClient *c) {
     964            6291 :     robj *key = c->argv[1];
     965                 :     robj *zobj;
     966            6291 :     int deleted = 0, j;
     967                 : 
     968           12582 :     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
     969            6291 :         checkType(c,zobj,REDIS_ZSET)) return;
     970                 : 
     971            6291 :     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
     972                 :         unsigned char *eptr;
     973                 : 
     974            5673 :         for (j = 2; j < c->argc; j++) {
     975            4782 :             if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
     976            4254 :                 deleted++;
     977            4254 :                 zobj->ptr = zzlDelete(zobj->ptr,eptr);
     978            4254 :                 if (zzlLength(zobj->ptr) == 0) {
     979            3884 :                     dbDelete(c->db,key);
     980            3884 :                     break;
     981                 :                 }
     982                 :             }
     983                 :         }
     984            1516 :     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
     985            1516 :         zset *zs = zobj->ptr;
     986                 :         dictEntry *de;
     987                 :         double score;
     988                 : 
     989            2174 :         for (j = 2; j < c->argc; j++) {
     990            1523 :             de = dictFind(zs->dict,c->argv[j]);
     991            1523 :             if (de != NULL) {
     992            1204 :                 deleted++;
     993                 : 
     994                 :                 /* Delete from the skiplist */
     995            1204 :                 score = *(double*)dictGetVal(de);
     996            1204 :                 redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
     997                 : 
     998                 :                 /* Delete from the hash table */
     999            1204 :                 dictDelete(zs->dict,c->argv[j]);
    1000            1204 :                 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
    1001            1204 :                 if (dictSize(zs->dict) == 0) {
    1002             865 :                     dbDelete(c->db,key);
    1003             865 :                     break;
    1004                 :                 }
    1005                 :             }
    1006                 :         }
    1007                 :     } else {
    1008               0 :         redisPanic("Unknown sorted set encoding");
    1009                 :     }
    1010                 : 
    1011            6291 :     if (deleted) {
    1012            5452 :         signalModifiedKey(c->db,key);
    1013            5452 :         server.dirty += deleted;
    1014                 :     }
    1015            6291 :     addReplyLongLong(c,deleted);
    1016                 : }
    1017                 : 
    1018              32 : void zremrangebyscoreCommand(redisClient *c) {
    1019              32 :     robj *key = c->argv[1];
    1020                 :     robj *zobj;
    1021                 :     zrangespec range;
    1022                 :     unsigned long deleted;
    1023                 : 
    1024                 :     /* Parse the range arguments. */
    1025              32 :     if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
    1026               6 :         addReplyError(c,"min or max is not a float");
    1027               6 :         return;
    1028                 :     }
    1029                 : 
    1030              52 :     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
    1031              26 :         checkType(c,zobj,REDIS_ZSET)) return;
    1032                 : 
    1033              26 :     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
    1034              13 :         zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
    1035              13 :         if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
    1036              13 :     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
    1037              13 :         zset *zs = zobj->ptr;
    1038              13 :         deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
    1039              13 :         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
    1040              13 :         if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
    1041                 :     } else {
    1042               0 :         redisPanic("Unknown sorted set encoding");
    1043                 :     }
    1044                 : 
    1045              26 :     if (deleted) signalModifiedKey(c->db,key);
    1046              26 :     server.dirty += deleted;
    1047              26 :     addReplyLongLong(c,deleted);
    1048                 : }
    1049                 : 
    1050              12 : void zremrangebyrankCommand(redisClient *c) {
    1051              12 :     robj *key = c->argv[1];
    1052                 :     robj *zobj;
    1053                 :     long start;
    1054                 :     long end;
    1055                 :     int llen;
    1056                 :     unsigned long deleted;
    1057                 : 
    1058              24 :     if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
    1059              12 :         (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
    1060                 : 
    1061              24 :     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
    1062              12 :         checkType(c,zobj,REDIS_ZSET)) return;
    1063                 : 
    1064                 :     /* Sanitize indexes. */
    1065              12 :     llen = zsetLength(zobj);
    1066              12 :     if (start < 0) start = llen+start;
    1067              12 :     if (end < 0) end = llen+end;
    1068              12 :     if (start < 0) start = 0;
    1069                 : 
    1070                 :     /* Invariant: start >= 0, so this test will be true when end < 0.
    1071                 :      * The range is empty when start > end or start >= length. */
    1072              12 :     if (start > end || start >= llen) {
    1073               4 :         addReply(c,shared.czero);
    1074               4 :         return;
    1075                 :     }
    1076               8 :     if (end >= llen) end = llen-1;
    1077                 : 
    1078               8 :     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
    1079                 :         /* Correct for 1-based rank. */
    1080               4 :         zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
    1081               4 :         if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
    1082               4 :     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
    1083               4 :         zset *zs = zobj->ptr;
    1084                 : 
    1085                 :         /* Correct for 1-based rank. */
    1086               4 :         deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
    1087               4 :         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
    1088               4 :         if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
    1089                 :     } else {
    1090               0 :         redisPanic("Unknown sorted set encoding");
    1091                 :     }
    1092                 : 
    1093               8 :     if (deleted) signalModifiedKey(c->db,key);
    1094               8 :     server.dirty += deleted;
    1095               8 :     addReplyLongLong(c,deleted);
    1096                 : }
    1097                 : 
    1098                 : typedef struct {
    1099                 :     robj *subject;
    1100                 :     int type; /* Set, sorted set */
    1101                 :     int encoding;
    1102                 :     double weight;
    1103                 : 
    1104                 :     union {
    1105                 :         /* Set iterators. */
    1106                 :         union _iterset {
    1107                 :             struct {
    1108                 :                 intset *is;
    1109                 :                 int ii;
    1110                 :             } is;
    1111                 :             struct {
    1112                 :                 dict *dict;
    1113                 :                 dictIterator *di;
    1114                 :                 dictEntry *de;
    1115                 :             } ht;
    1116                 :         } set;
    1117                 : 
    1118                 :         /* Sorted set iterators. */
    1119                 :         union _iterzset {
    1120                 :             struct {
    1121                 :                 unsigned char *zl;
    1122                 :                 unsigned char *eptr, *sptr;
    1123                 :             } zl;
    1124                 :             struct {
    1125                 :                 zset *zs;
    1126                 :                 zskiplistNode *node;
    1127                 :             } sl;
    1128                 :         } zset;
    1129                 :     } iter;
    1130                 : } zsetopsrc;
    1131                 : 
    1132                 : 
    1133                 : /* Use dirty flags for pointers that need to be cleaned up in the next
    1134                 :  * iteration over the zsetopval. The dirty flag for the long long value is
    1135                 :  * special, since long long values don't need cleanup. Instead, it means that
    1136                 :  * we already checked that "ell" holds a long long, or tried to convert another
    1137                 :  * representation into a long long value. When this was successful,
    1138                 :  * OPVAL_VALID_LL is set as well. */
    1139                 : #define OPVAL_DIRTY_ROBJ 1
    1140                 : #define OPVAL_DIRTY_LL 2
    1141                 : #define OPVAL_VALID_LL 4
    1142                 : 
    1143                 : /* Store value retrieved from the iterator. */
    1144                 : typedef struct {
    1145                 :     int flags;
    1146                 :     unsigned char _buf[32]; /* Private buffer. */
    1147                 :     robj *ele;
    1148                 :     unsigned char *estr;
    1149                 :     unsigned int elen;
    1150                 :     long long ell;
    1151                 :     double score;
    1152                 : } zsetopval;
    1153                 : 
    1154                 : typedef union _iterset iterset;
    1155                 : typedef union _iterzset iterzset;
    1156                 : 
    1157            9457 : void zuiInitIterator(zsetopsrc *op) {
    1158            9457 :     if (op->subject == NULL)
    1159                 :         return;
    1160                 : 
    1161            9453 :     if (op->type == REDIS_SET) {
    1162               6 :         iterset *it = &op->iter.set;
    1163               6 :         if (op->encoding == REDIS_ENCODING_INTSET) {
    1164               1 :             it->is.is = op->subject->ptr;
    1165               1 :             it->is.ii = 0;
    1166               5 :         } else if (op->encoding == REDIS_ENCODING_HT) {
    1167               5 :             it->ht.dict = op->subject->ptr;
    1168               5 :             it->ht.di = dictGetIterator(op->subject->ptr);
    1169               5 :             it->ht.de = dictNext(it->ht.di);
    1170                 :         } else {
    1171               0 :             redisPanic("Unknown set encoding");
    1172                 :         }
    1173            9447 :     } else if (op->type == REDIS_ZSET) {
    1174            9447 :         iterzset *it = &op->iter.zset;
    1175            9447 :         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
    1176            7425 :             it->zl.zl = op->subject->ptr;
    1177            7425 :             it->zl.eptr = ziplistIndex(it->zl.zl,0);
    1178            7425 :             if (it->zl.eptr != NULL) {
    1179            7425 :                 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
    1180            7425 :                 redisAssert(it->zl.sptr != NULL);
    1181                 :             }
    1182            2022 :         } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
    1183            2022 :             it->sl.zs = op->subject->ptr;
    1184            2022 :             it->sl.node = it->sl.zs->zsl->header->level[0].forward;
    1185                 :         } else {
    1186               0 :             redisPanic("Unknown sorted set encoding");
    1187                 :         }
    1188                 :     } else {
    1189               0 :         redisPanic("Unsupported type");
    1190                 :     }
    1191                 : }
    1192                 : 
    1193            9457 : void zuiClearIterator(zsetopsrc *op) {
    1194            9457 :     if (op->subject == NULL)
    1195                 :         return;
    1196                 : 
    1197            9453 :     if (op->type == REDIS_SET) {
    1198               6 :         iterset *it = &op->iter.set;
    1199               6 :         if (op->encoding == REDIS_ENCODING_INTSET) {
    1200                 :             REDIS_NOTUSED(it); /* skip */
    1201               5 :         } else if (op->encoding == REDIS_ENCODING_HT) {
    1202               5 :             dictReleaseIterator(it->ht.di);
    1203                 :         } else {
    1204               0 :             redisPanic("Unknown set encoding");
    1205                 :         }
    1206            9447 :     } else if (op->type == REDIS_ZSET) {
    1207            9447 :         iterzset *it = &op->iter.zset;
    1208            9447 :         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
    1209                 :             REDIS_NOTUSED(it); /* skip */
    1210            2022 :         } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
    1211                 :             REDIS_NOTUSED(it); /* skip */
    1212                 :         } else {
    1213               0 :             redisPanic("Unknown sorted set encoding");
    1214                 :         }
    1215                 :     } else {
    1216               0 :         redisPanic("Unsupported type");
    1217                 :     }
    1218                 : }
    1219                 : 
    1220           17123 : int zuiLength(zsetopsrc *op) {
    1221           17123 :     if (op->subject == NULL)
    1222               6 :         return 0;
    1223                 : 
    1224           17117 :     if (op->type == REDIS_SET) {
    1225              11 :         iterset *it = &op->iter.set;
    1226              11 :         if (op->encoding == REDIS_ENCODING_INTSET) {
    1227               1 :             return intsetLen(it->is.is);
    1228              10 :         } else if (op->encoding == REDIS_ENCODING_HT) {
    1229              10 :             return dictSize(it->ht.dict);
    1230                 :         } else {
    1231               0 :             redisPanic("Unknown set encoding");
    1232                 :         }
    1233           17106 :     } else if (op->type == REDIS_ZSET) {
    1234           17106 :         iterzset *it = &op->iter.zset;
    1235           17106 :         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
    1236           13504 :             return zzlLength(it->zl.zl);
    1237            3602 :         } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
    1238            3602 :             return it->sl.zs->zsl->length;
    1239                 :         } else {
    1240               0 :             redisPanic("Unknown sorted set encoding");
    1241                 :         }
    1242                 :     } else {
    1243               0 :         redisPanic("Unsupported type");
    1244                 :     }
    1245                 : }
    1246                 : 
    1247                 : /* Check if the current value is valid. If so, store it in the passed structure
    1248                 :  * and move to the next element. If not valid, this means we have reached the
    1249                 :  * end of the structure and can abort. */
    1250           16809 : int zuiNext(zsetopsrc *op, zsetopval *val) {
    1251           16809 :     if (op->subject == NULL)
    1252               0 :         return 0;
    1253                 : 
    1254           16809 :     if (val->flags & OPVAL_DIRTY_ROBJ)
    1255            6882 :         decrRefCount(val->ele);
    1256                 : 
    1257                 :     memset(val,0,sizeof(zsetopval));
    1258                 : 
    1259           16809 :     if (op->type == REDIS_SET) {
    1260              18 :         iterset *it = &op->iter.set;
    1261              18 :         if (op->encoding == REDIS_ENCODING_INTSET) {
    1262               0 :             if (!intsetGet(it->is.is,it->is.ii,(int64_t*)&val->ell))
    1263               0 :                 return 0;
    1264               0 :             val->score = 1.0;
    1265                 : 
    1266                 :             /* Move to next element. */
    1267               0 :             it->is.ii++;
    1268              18 :         } else if (op->encoding == REDIS_ENCODING_HT) {
    1269              18 :             if (it->ht.de == NULL)
    1270               5 :                 return 0;
    1271              13 :             val->ele = dictGetKey(it->ht.de);
    1272              13 :             val->score = 1.0;
    1273                 : 
    1274                 :             /* Move to next element. */
    1275              13 :             it->ht.de = dictNext(it->ht.di);
    1276                 :         } else {
    1277               0 :             redisPanic("Unknown set encoding");
    1278                 :         }
    1279           16791 :     } else if (op->type == REDIS_ZSET) {
    1280           16791 :         iterzset *it = &op->iter.zset;
    1281           16791 :         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
    1282                 :             /* No need to check both, but better be explicit. */
    1283           12961 :             if (it->zl.eptr == NULL || it->zl.sptr == NULL)
    1284            6079 :                 return 0;
    1285            6882 :             redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
    1286            6882 :             val->score = zzlGetScore(it->zl.sptr);
    1287                 : 
    1288                 :             /* Move to next element. */
    1289            6882 :             zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
    1290            3830 :         } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
    1291            3830 :             if (it->sl.node == NULL)
    1292            1581 :                 return 0;
    1293            2249 :             val->ele = it->sl.node->obj;
    1294            2249 :             val->score = it->sl.node->score;
    1295                 : 
    1296                 :             /* Move to next element. */
    1297            2249 :             it->sl.node = it->sl.node->level[0].forward;
    1298                 :         } else {
    1299               0 :             redisPanic("Unknown sorted set encoding");
    1300                 :         }
    1301                 :     } else {
    1302               0 :         redisPanic("Unsupported type");
    1303                 :     }
    1304            9144 :     return 1;
    1305                 : }
    1306                 : 
    1307               1 : int zuiLongLongFromValue(zsetopval *val) {
    1308               1 :     if (!(val->flags & OPVAL_DIRTY_LL)) {
    1309               1 :         val->flags |= OPVAL_DIRTY_LL;
    1310                 : 
    1311               1 :         if (val->ele != NULL) {
    1312               1 :             if (val->ele->encoding == REDIS_ENCODING_INT) {
    1313               0 :                 val->ell = (long)val->ele->ptr;
    1314               0 :                 val->flags |= OPVAL_VALID_LL;
    1315               1 :             } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
    1316               2 :                 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
    1317               0 :                     val->flags |= OPVAL_VALID_LL;
    1318                 :             } else {
    1319               0 :                 redisPanic("Unsupported element encoding");
    1320                 :             }
    1321               0 :         } else if (val->estr != NULL) {
    1322               0 :             if (string2ll((char*)val->estr,val->elen,&val->ell))
    1323               0 :                 val->flags |= OPVAL_VALID_LL;
    1324                 :         } else {
    1325                 :             /* The long long was already set, flag as valid. */
    1326               0 :             val->flags |= OPVAL_VALID_LL;
    1327                 :         }
    1328                 :     }
    1329               1 :     return val->flags & OPVAL_VALID_LL;
    1330                 : }
    1331                 : 
    1332           19432 : robj *zuiObjectFromValue(zsetopval *val) {
    1333           19432 :     if (val->ele == NULL) {
    1334            6882 :         if (val->estr != NULL) {
    1335             535 :             val->ele = createStringObject((char*)val->estr,val->elen);
    1336                 :         } else {
    1337            6347 :             val->ele = createStringObjectFromLongLong(val->ell);
    1338                 :         }
    1339            6882 :         val->flags |= OPVAL_DIRTY_ROBJ;
    1340                 :     }
    1341           19432 :     return val->ele;
    1342                 : }
    1343                 : 
    1344               0 : int zuiBufferFromValue(zsetopval *val) {
    1345               0 :     if (val->estr == NULL) {
    1346               0 :         if (val->ele != NULL) {
    1347               0 :             if (val->ele->encoding == REDIS_ENCODING_INT) {
    1348               0 :                 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
    1349               0 :                 val->estr = val->_buf;
    1350               0 :             } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
    1351               0 :                 val->elen = sdslen(val->ele->ptr);
    1352               0 :                 val->estr = val->ele->ptr;
    1353                 :             } else {
    1354               0 :                 redisPanic("Unsupported element encoding");
    1355                 :             }
    1356                 :         } else {
    1357               0 :             val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
    1358               0 :             val->estr = val->_buf;
    1359                 :         }
    1360                 :     }
    1361               0 :     return 1;
    1362                 : }
    1363                 : 
    1364                 : /* Find value pointed to by val in the source pointer to by op. When found,
    1365                 :  * return 1 and store its score in target. Return 0 otherwise. */
    1366            4864 : int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
    1367            4864 :     if (op->subject == NULL)
    1368               0 :         return 0;
    1369                 : 
    1370            4864 :     if (op->type == REDIS_SET) {
    1371               1 :         iterset *it = &op->iter.set;
    1372                 : 
    1373               1 :         if (op->encoding == REDIS_ENCODING_INTSET) {
    1374               1 :             if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
    1375               0 :                 *score = 1.0;
    1376               0 :                 return 1;
    1377                 :             } else {
    1378               1 :                 return 0;
    1379                 :             }
    1380               0 :         } else if (op->encoding == REDIS_ENCODING_HT) {
    1381               0 :             zuiObjectFromValue(val);
    1382               0 :             if (dictFind(it->ht.dict,val->ele) != NULL) {
    1383               0 :                 *score = 1.0;
    1384               0 :                 return 1;
    1385                 :             } else {
    1386               0 :                 return 0;
    1387                 :             }
    1388                 :         } else {
    1389               0 :             redisPanic("Unknown set encoding");
    1390                 :         }
    1391            4863 :     } else if (op->type == REDIS_ZSET) {
    1392            4863 :         iterzset *it = &op->iter.zset;
    1393            4863 :         zuiObjectFromValue(val);
    1394                 : 
    1395            4863 :         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
    1396            3610 :             if (zzlFind(it->zl.zl,val->ele,score) != NULL) {
    1397                 :                 /* Score is already set by zzlFind. */
    1398              30 :                 return 1;
    1399                 :             } else {
    1400            3580 :                 return 0;
    1401                 :             }
    1402            1253 :         } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
    1403                 :             dictEntry *de;
    1404            1253 :             if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
    1405              32 :                 *score = *(double*)dictGetVal(de);
    1406              32 :                 return 1;
    1407                 :             } else {
    1408            1221 :                 return 0;
    1409                 :             }
    1410                 :         } else {
    1411               0 :             redisPanic("Unknown sorted set encoding");
    1412                 :         }
    1413                 :     } else {
    1414               0 :         redisPanic("Unsupported type");
    1415                 :     }
    1416                 : }
    1417                 : 
    1418            4727 : int zuiCompareByCardinality(const void *s1, const void *s2) {
    1419            4727 :     return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
    1420                 : }
    1421                 : 
    1422                 : #define REDIS_AGGR_SUM 1
    1423                 : #define REDIS_AGGR_MIN 2
    1424                 : #define REDIS_AGGR_MAX 3
    1425                 : #define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
    1426                 : 
    1427              89 : inline static void zunionInterAggregate(double *target, double val, int aggregate) {
    1428              89 :     if (aggregate == REDIS_AGGR_SUM) {
    1429              73 :         *target = *target + val;
    1430                 :         /* The result of adding two doubles is NaN when one variable
    1431                 :          * is +inf and the other is -inf. When these numbers are added,
    1432                 :          * we maintain the convention of the result being 0.0. */
    1433              73 :         if (isnan(*target)) *target = 0.0;
    1434              16 :     } else if (aggregate == REDIS_AGGR_MIN) {
    1435               8 :         *target = val < *target ? val : *target;
    1436               8 :     } else if (aggregate == REDIS_AGGR_MAX) {
    1437               8 :         *target = val > *target ? val : *target;
    1438                 :     } else {
    1439                 :         /* safety net */
    1440               0 :         redisPanic("Unknown ZUNION/INTER aggregate type");
    1441                 :     }
    1442              89 : }
    1443                 : 
    1444            4734 : void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
    1445                 :     int i, j;
    1446                 :     long setnum;
    1447            4734 :     int aggregate = REDIS_AGGR_SUM;
    1448                 :     zsetopsrc *src;
    1449                 :     zsetopval zval;
    1450                 :     robj *tmp;
    1451            4734 :     unsigned int maxelelen = 0;
    1452                 :     robj *dstobj;
    1453                 :     zset *dstzset;
    1454                 :     zskiplistNode *znode;
    1455            4734 :     int touched = 0;
    1456                 : 
    1457                 :     /* expect setnum input keys to be given */
    1458            4734 :     if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
    1459                 :         return;
    1460                 : 
    1461            4734 :     if (setnum < 1) {
    1462               0 :         addReplyError(c,
    1463                 :             "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
    1464               0 :         return;
    1465                 :     }
    1466                 : 
    1467                 :     /* test if the expected number of keys would overflow */
    1468            4734 :     if (3+setnum > c->argc) {
    1469               0 :         addReply(c,shared.syntaxerr);
    1470               0 :         return;
    1471                 :     }
    1472                 : 
    1473                 :     /* read keys to be used for input */
    1474            4734 :     src = zcalloc(sizeof(zsetopsrc) * setnum);
    1475           14199 :     for (i = 0, j = 3; i < setnum; i++, j++) {
    1476            9465 :         robj *obj = lookupKeyWrite(c->db,c->argv[j]);
    1477            9465 :         if (obj != NULL) {
    1478            9461 :             if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
    1479               0 :                 zfree(src);
    1480               0 :                 addReply(c,shared.wrongtypeerr);
    1481               0 :                 return;
    1482                 :             }
    1483                 : 
    1484            9461 :             src[i].subject = obj;
    1485            9461 :             src[i].type = obj->type;
    1486            9461 :             src[i].encoding = obj->encoding;
    1487                 :         } else {
    1488               4 :             src[i].subject = NULL;
    1489                 :         }
    1490                 : 
    1491                 :         /* Default all weights to 1. */
    1492            9465 :         src[i].weight = 1.0;
    1493                 :     }
    1494                 : 
    1495                 :     /* parse optional extra arguments */
    1496            4734 :     if (j < c->argc) {
    1497              21 :         int remaining = c->argc - j;
    1498                 : 
    1499              59 :         while (remaining) {
    1500              21 :             if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
    1501              13 :                 j++; remaining--;
    1502              30 :                 for (i = 0; i < setnum; i++, j++, remaining--) {
    1503              21 :                     if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
    1504                 :                             "weight value is not a float") != REDIS_OK)
    1505                 :                     {
    1506               4 :                         zfree(src);
    1507               4 :                         return;
    1508                 :                     }
    1509                 :                 }
    1510              16 :             } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
    1511               8 :                 j++; remaining--;
    1512               8 :                 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
    1513               0 :                     aggregate = REDIS_AGGR_SUM;
    1514               8 :                 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
    1515               4 :                     aggregate = REDIS_AGGR_MIN;
    1516               4 :                 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
    1517               4 :                     aggregate = REDIS_AGGR_MAX;
    1518                 :                 } else {
    1519               0 :                     zfree(src);
    1520               0 :                     addReply(c,shared.syntaxerr);
    1521               0 :                     return;
    1522                 :                 }
    1523               8 :                 j++; remaining--;
    1524                 :             } else {
    1525               0 :                 zfree(src);
    1526               0 :                 addReply(c,shared.syntaxerr);
    1527               0 :                 return;
    1528                 :             }
    1529                 :         }
    1530                 :     }
    1531                 : 
    1532           14187 :     for (i = 0; i < setnum; i++)
    1533            9457 :         zuiInitIterator(&src[i]);
    1534                 : 
    1535                 :     /* sort sets from the smallest to largest, this will improve our
    1536                 :      * algorithm's performance */
    1537            4730 :     qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
    1538                 : 
    1539            4730 :     dstobj = createZsetObject();
    1540            4730 :     dstzset = dstobj->ptr;
    1541                 :     memset(&zval, 0, sizeof(zval));
    1542                 : 
    1543            4730 :     if (op == REDIS_OP_INTER) {
    1544                 :         /* Skip everything if the smallest input is empty. */
    1545            1788 :         if (zuiLength(&src[0]) > 0) {
    1546                 :             /* Precondition: as src[0] is non-empty and the inputs are ordered
    1547                 :              * by size, all src[i > 0] are non-empty too. */
    1548            3650 :             while (zuiNext(&src[0],&zval)) {
    1549                 :                 double score, value;
    1550                 : 
    1551            1862 :                 score = src[0].weight * zval.score;
    1552            1862 :                 if (isnan(score)) score = 0;
    1553                 : 
    1554            1909 :                 for (j = 1; j < setnum; j++) {
    1555                 :                     /* It is not safe to access the zset we are
    1556                 :                      * iterating, so explicitly check for equal object. */
    1557            1862 :                     if (src[j].subject == src[0].subject) {
    1558              13 :                         value = zval.score*src[j].weight;
    1559              13 :                         zunionInterAggregate(&score,value,aggregate);
    1560            1849 :                     } else if (zuiFind(&src[j],&zval,&value)) {
    1561              34 :                         value *= src[j].weight;
    1562              34 :                         zunionInterAggregate(&score,value,aggregate);
    1563                 :                     } else {
    1564                 :                         break;
    1565                 :                     }
    1566                 :                 }
    1567                 : 
    1568                 :                 /* Only continue when present in every input. */
    1569            1862 :                 if (j == setnum) {
    1570              47 :                     tmp = zuiObjectFromValue(&zval);
    1571              47 :                     znode = zslInsert(dstzset->zsl,score,tmp);
    1572              47 :                     incrRefCount(tmp); /* added to skiplist */
    1573              47 :                     dictAdd(dstzset->dict,tmp,&znode->score);
    1574              47 :                     incrRefCount(tmp); /* added to dictionary */
    1575                 : 
    1576              47 :                     if (tmp->encoding == REDIS_ENCODING_RAW)
    1577              56 :                         if (sdslen(tmp->ptr) > maxelelen)
    1578              36 :                             maxelelen = sdslen(tmp->ptr);
    1579                 :                 }
    1580                 :             }
    1581                 :         }
    1582            2942 :     } else if (op == REDIS_OP_UNION) {
    1583            8823 :         for (i = 0; i < setnum; i++) {
    1584            5881 :             if (zuiLength(&src[i]) == 0)
    1585               4 :                 continue;
    1586                 : 
    1587           13159 :             while (zuiNext(&src[i],&zval)) {
    1588                 :                 double score, value;
    1589                 : 
    1590                 :                 /* Skip key when already processed */
    1591            7282 :                 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
    1592              42 :                     continue;
    1593                 : 
    1594                 :                 /* Initialize score */
    1595            7240 :                 score = src[i].weight * zval.score;
    1596            7240 :                 if (isnan(score)) score = 0;
    1597                 : 
    1598                 :                 /* Because the inputs are sorted by size, it's only possible
    1599                 :                  * for sets at larger indices to hold this element. */
    1600           10269 :                 for (j = (i+1); j < setnum; j++) {
    1601                 :                     /* It is not safe to access the zset we are
    1602                 :                      * iterating, so explicitly check for equal object. */
    1603            3029 :                     if(src[j].subject == src[i].subject) {
    1604              14 :                         value = zval.score*src[j].weight;
    1605              14 :                         zunionInterAggregate(&score,value,aggregate);
    1606            3015 :                     } else if (zuiFind(&src[j],&zval,&value)) {
    1607              28 :                         value *= src[j].weight;
    1608              28 :                         zunionInterAggregate(&score,value,aggregate);
    1609                 :                     }
    1610                 :                 }
    1611                 : 
    1612            7240 :                 tmp = zuiObjectFromValue(&zval);
    1613            7240 :                 znode = zslInsert(dstzset->zsl,score,tmp);
    1614            7240 :                 incrRefCount(zval.ele); /* added to skiplist */
    1615            7240 :                 dictAdd(dstzset->dict,tmp,&znode->score);
    1616            7240 :                 incrRefCount(zval.ele); /* added to dictionary */
    1617                 : 
    1618            7240 :                 if (tmp->encoding == REDIS_ENCODING_RAW)
    1619            3560 :                     if (sdslen(tmp->ptr) > maxelelen)
    1620            3152 :                         maxelelen = sdslen(tmp->ptr);
    1621                 :             }
    1622                 :         }
    1623                 :     } else {
    1624               0 :         redisPanic("Unknown operator");
    1625                 :     }
    1626                 : 
    1627           14187 :     for (i = 0; i < setnum; i++)
    1628            9457 :         zuiClearIterator(&src[i]);
    1629                 : 
    1630            4730 :     if (dbDelete(c->db,dstkey)) {
    1631             791 :         signalModifiedKey(c->db,dstkey);
    1632             791 :         touched = 1;
    1633             791 :         server.dirty++;
    1634                 :     }
    1635            4730 :     if (dstzset->zsl->length) {
    1636                 :         /* Convert to ziplist when in limits. */
    1637            5926 :         if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
    1638            2953 :             maxelelen <= server.zset_max_ziplist_value)
    1639            1828 :                 zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
    1640                 : 
    1641            2973 :         dbAdd(c->db,dstkey,dstobj);
    1642            2973 :         addReplyLongLong(c,zsetLength(dstobj));
    1643            2973 :         if (!touched) signalModifiedKey(c->db,dstkey);
    1644            2973 :         server.dirty++;
    1645                 :     } else {
    1646            1757 :         decrRefCount(dstobj);
    1647            1757 :         addReply(c,shared.czero);
    1648                 :     }
    1649            4730 :     zfree(src);
    1650                 : }
    1651                 : 
    1652            2944 : void zunionstoreCommand(redisClient *c) {
    1653            2944 :     zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
    1654            2944 : }
    1655                 : 
    1656            1790 : void zinterstoreCommand(redisClient *c) {
    1657            1790 :     zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
    1658            1790 : }
    1659                 : 
    1660            4281 : void zrangeGenericCommand(redisClient *c, int reverse) {
    1661            4281 :     robj *key = c->argv[1];
    1662                 :     robj *zobj;
    1663            4281 :     int withscores = 0;
    1664                 :     long start;
    1665                 :     long end;
    1666                 :     int llen;
    1667                 :     int rangelen;
    1668                 : 
    1669            8562 :     if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
    1670            4281 :         (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
    1671                 : 
    1672            4453 :     if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
    1673             172 :         withscores = 1;
    1674            4109 :     } else if (c->argc >= 5) {
    1675               0 :         addReply(c,shared.syntaxerr);
    1676               0 :         return;
    1677                 :     }
    1678                 : 
    1679            8558 :     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
    1680            4277 :          || checkType(c,zobj,REDIS_ZSET)) return;
    1681                 : 
    1682                 :     /* Sanitize indexes. */
    1683            4277 :     llen = zsetLength(zobj);
    1684            4277 :     if (start < 0) start = llen+start;
    1685            4277 :     if (end < 0) end = llen+end;
    1686            4277 :     if (start < 0) start = 0;
    1687                 : 
    1688                 :     /* Invariant: start >= 0, so this test will be true when end < 0.
    1689                 :      * The range is empty when start > end or start >= length. */
    1690            4277 :     if (start > end || start >= llen) {
    1691              16 :         addReply(c,shared.emptymultibulk);
    1692              16 :         return;
    1693                 :     }
    1694            4261 :     if (end >= llen) end = llen-1;
    1695            4261 :     rangelen = (end-start)+1;
    1696                 : 
    1697                 :     /* Return the result in form of a multi-bulk reply */
    1698            4261 :     addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
    1699                 : 
    1700            4261 :     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
    1701            2171 :         unsigned char *zl = zobj->ptr;
    1702                 :         unsigned char *eptr, *sptr;
    1703                 :         unsigned char *vstr;
    1704                 :         unsigned int vlen;
    1705                 :         long long vlong;
    1706                 : 
    1707            2171 :         if (reverse)
    1708              12 :             eptr = ziplistIndex(zl,-2-(2*start));
    1709                 :         else
    1710            2159 :             eptr = ziplistIndex(zl,2*start);
    1711                 : 
    1712            2171 :         redisAssertWithInfo(c,zobj,eptr != NULL);
    1713            2171 :         sptr = ziplistNext(zl,eptr);
    1714                 : 
    1715            7404 :         while (rangelen--) {
    1716            3062 :             redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
    1717            3062 :             redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
    1718            3062 :             if (vstr == NULL)
    1719            2713 :                 addReplyBulkLongLong(c,vlong);
    1720                 :             else
    1721             349 :                 addReplyBulkCBuffer(c,vstr,vlen);
    1722                 : 
    1723            3062 :             if (withscores)
    1724             187 :                 addReplyDouble(c,zzlGetScore(sptr));
    1725                 : 
    1726            3062 :             if (reverse)
    1727             260 :                 zzlPrev(zl,&eptr,&sptr);
    1728                 :             else
    1729            2802 :                 zzlNext(zl,&eptr,&sptr);
    1730                 :         }
    1731                 : 
    1732            2090 :     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
    1733            2090 :         zset *zs = zobj->ptr;
    1734            2090 :         zskiplist *zsl = zs->zsl;
    1735                 :         zskiplistNode *ln;
    1736                 :         robj *ele;
    1737                 : 
    1738                 :         /* Check if starting point is trivial, before doing log(N) lookup. */
    1739            2090 :         if (reverse) {
    1740              12 :             ln = zsl->tail;
    1741              12 :             if (start > 0)
    1742              10 :                 ln = zslGetElementByRank(zsl,llen-start);
    1743                 :         } else {
    1744            2078 :             ln = zsl->header->level[0].forward;
    1745            2078 :             if (start > 0)
    1746            3970 :                 ln = zslGetElementByRank(zsl,start+1);
    1747                 :         }
    1748                 : 
    1749            4901 :         while(rangelen--) {
    1750            2811 :             redisAssertWithInfo(c,zobj,ln != NULL);
    1751            2811 :             ele = ln->obj;
    1752            2811 :             addReplyBulk(c,ele);
    1753            2811 :             if (withscores)
    1754              97 :                 addReplyDouble(c,ln->score);
    1755            2811 :             ln = reverse ? ln->backward : ln->level[0].forward;
    1756                 :         }
    1757                 :     } else {
    1758               0 :         redisPanic("Unknown sorted set encoding");
    1759                 :     }
    1760                 : }
    1761                 : 
    1762            4249 : void zrangeCommand(redisClient *c) {
    1763            4249 :     zrangeGenericCommand(c,0);
    1764            4249 : }
    1765                 : 
    1766              32 : void zrevrangeCommand(redisClient *c) {
    1767              32 :     zrangeGenericCommand(c,1);
    1768              32 : }
    1769                 : 
    1770                 : /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
    1771            1294 : void genericZrangebyscoreCommand(redisClient *c, int reverse) {
    1772                 :     zrangespec range;
    1773            1294 :     robj *key = c->argv[1];
    1774                 :     robj *zobj;
    1775            1294 :     long offset = 0, limit = -1;
    1776            1294 :     int withscores = 0;
    1777            1294 :     unsigned long rangelen = 0;
    1778            1294 :     void *replylen = NULL;
    1779                 :     int minidx, maxidx;
    1780                 : 
    1781                 :     /* Parse the range arguments. */
    1782            1294 :     if (reverse) {
    1783                 :         /* Range is given as [max,min] */
    1784              36 :         maxidx = 2; minidx = 3;
    1785                 :     } else {
    1786                 :         /* Range is given as [min,max] */
    1787            1258 :         minidx = 2; maxidx = 3;
    1788                 :     }
    1789                 : 
    1790            1294 :     if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
    1791               6 :         addReplyError(c,"min or max is not a float");
    1792               6 :         return;
    1793                 :     }
    1794                 : 
    1795                 :     /* Parse optional extra arguments. Note that ZCOUNT will exactly have
    1796                 :      * 4 arguments, so we'll never enter the following code path. */
    1797            1288 :     if (c->argc > 4) {
    1798              24 :         int remaining = c->argc - 4;
    1799              24 :         int pos = 4;
    1800                 : 
    1801              76 :         while (remaining) {
    1802              36 :             if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
    1803               8 :                 pos++; remaining--;
    1804               8 :                 withscores = 1;
    1805              40 :             } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
    1806              40 :                 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != REDIS_OK) ||
    1807              20 :                     (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != REDIS_OK)) return;
    1808              20 :                 pos += 3; remaining -= 3;
    1809                 :             } else {
    1810               0 :                 addReply(c,shared.syntaxerr);
    1811               0 :                 return;
    1812                 :             }
    1813                 :         }
    1814                 :     }
    1815                 : 
    1816                 :     /* Ok, lookup the key and get the range */
    1817            2576 :     if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
    1818            1288 :         checkType(c,zobj,REDIS_ZSET)) return;
    1819                 : 
    1820            1288 :     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
    1821             644 :         unsigned char *zl = zobj->ptr;
    1822                 :         unsigned char *eptr, *sptr;
    1823                 :         unsigned char *vstr;
    1824                 :         unsigned int vlen;
    1825                 :         long long vlong;
    1826                 :         double score;
    1827                 : 
    1828                 :         /* If reversed, get the last node in range as starting point. */
    1829             644 :         if (reverse) {
    1830              18 :             eptr = zzlLastInRange(zl,range);
    1831                 :         } else {
    1832             626 :             eptr = zzlFirstInRange(zl,range);
    1833                 :         }
    1834                 : 
    1835                 :         /* No "first" element in the specified interval. */
    1836             644 :         if (eptr == NULL) {
    1837              16 :             addReply(c, shared.emptymultibulk);
    1838              16 :             return;
    1839                 :         }
    1840                 : 
    1841                 :         /* Get score pointer for the first element. */
    1842             628 :         redisAssertWithInfo(c,zobj,eptr != NULL);
    1843             628 :         sptr = ziplistNext(zl,eptr);
    1844                 : 
    1845                 :         /* We don't know in advance how many matching elements there are in the
    1846                 :          * list, so we push this object that will represent the multi-bulk
    1847                 :          * length in the output buffer, and will "fix" it later */
    1848             628 :         replylen = addDeferredMultiBulkLength(c);
    1849                 : 
    1850                 :         /* If there is an offset, just traverse the number of elements without
    1851                 :          * checking the score because that is done in the next loop. */
    1852            1280 :         while (eptr && offset--) {
    1853              24 :             if (reverse) {
    1854              12 :                 zzlPrev(zl,&eptr,&sptr);
    1855                 :             } else {
    1856              12 :                 zzlNext(zl,&eptr,&sptr);
    1857                 :             }
    1858                 :         }
    1859                 : 
    1860           26290 :         while (eptr && limit--) {
    1861           26082 :             score = zzlGetScore(sptr);
    1862                 : 
    1863                 :             /* Abort when the node is no longer in range. */
    1864           26082 :             if (reverse) {
    1865              41 :                 if (!zslValueGteMin(score,&range)) break;
    1866                 :             } else {
    1867           26041 :                 if (!zslValueLteMax(score,&range)) break;
    1868                 :             }
    1869                 : 
    1870                 :             /* We know the element exists, so ziplistGet should always succeed */
    1871           25662 :             redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
    1872                 : 
    1873           25662 :             rangelen++;
    1874           25662 :             if (vstr == NULL) {
    1875           25600 :                 addReplyBulkLongLong(c,vlong);
    1876                 :             } else {
    1877              62 :                 addReplyBulkCBuffer(c,vstr,vlen);
    1878                 :             }
    1879                 : 
    1880           25662 :             if (withscores) {
    1881              10 :                 addReplyDouble(c,score);
    1882                 :             }
    1883                 : 
    1884                 :             /* Move to next node */
    1885           25662 :             if (reverse) {
    1886              31 :                 zzlPrev(zl,&eptr,&sptr);
    1887                 :             } else {
    1888           25631 :                 zzlNext(zl,&eptr,&sptr);
    1889                 :             }
    1890                 :         }
    1891             644 :     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
    1892             644 :         zset *zs = zobj->ptr;
    1893             644 :         zskiplist *zsl = zs->zsl;
    1894                 :         zskiplistNode *ln;
    1895                 : 
    1896                 :         /* If reversed, get the last node in range as starting point. */
    1897             644 :         if (reverse) {
    1898              18 :             ln = zslLastInRange(zsl,range);
    1899                 :         } else {
    1900             626 :             ln = zslFirstInRange(zsl,range);
    1901                 :         }
    1902                 : 
    1903                 :         /* No "first" element in the specified interval. */
    1904             644 :         if (ln == NULL) {
    1905              18 :             addReply(c, shared.emptymultibulk);
    1906              18 :             return;
    1907                 :         }
    1908                 : 
    1909                 :         /* We don't know in advance how many matching elements there are in the
    1910                 :          * list, so we push this object that will represent the multi-bulk
    1911                 :          * length in the output buffer, and will "fix" it later */
    1912             626 :         replylen = addDeferredMultiBulkLength(c);
    1913                 : 
    1914                 :         /* If there is an offset, just traverse the number of elements without
    1915                 :          * checking the score because that is done in the next loop. */
    1916            1276 :         while (ln && offset--) {
    1917              24 :             if (reverse) {
    1918              12 :                 ln = ln->backward;
    1919                 :             } else {
    1920              12 :                 ln = ln->level[0].forward;
    1921                 :             }
    1922                 :         }
    1923                 : 
    1924           20688 :         while (ln && limit--) {
    1925                 :             /* Abort when the node is no longer in range. */
    1926           20480 :             if (reverse) {
    1927              82 :                 if (!zslValueGteMin(ln->score,&range)) break;
    1928                 :             } else {
    1929           40878 :                 if (!zslValueLteMax(ln->score,&range)) break;
    1930                 :             }
    1931                 : 
    1932           20062 :             rangelen++;
    1933           20062 :             addReplyBulk(c,ln->obj);
    1934                 : 
    1935           20062 :             if (withscores) {
    1936              10 :                 addReplyDouble(c,ln->score);
    1937                 :             }
    1938                 : 
    1939                 :             /* Move to next node */
    1940           20062 :             if (reverse) {
    1941              31 :                 ln = ln->backward;
    1942                 :             } else {
    1943           20031 :                 ln = ln->level[0].forward;
    1944                 :             }
    1945                 :         }
    1946                 :     } else {
    1947               0 :         redisPanic("Unknown sorted set encoding");
    1948                 :     }
    1949                 : 
    1950            1254 :     if (withscores) {
    1951               8 :         rangelen *= 2;
    1952                 :     }
    1953                 : 
    1954            1254 :     setDeferredMultiBulkLength(c, replylen, rangelen);
    1955                 : }
    1956                 : 
    1957            1258 : void zrangebyscoreCommand(redisClient *c) {
    1958            1258 :     genericZrangebyscoreCommand(c,0);
    1959            1258 : }
    1960                 : 
    1961              36 : void zrevrangebyscoreCommand(redisClient *c) {
    1962              36 :     genericZrangebyscoreCommand(c,1);
    1963              36 : }
    1964                 : 
    1965            1204 : void zcountCommand(redisClient *c) {
    1966            1204 :     robj *key = c->argv[1];
    1967                 :     robj *zobj;
    1968                 :     zrangespec range;
    1969            1204 :     int count = 0;
    1970                 : 
    1971                 :     /* Parse the range arguments */
    1972            1204 :     if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
    1973               0 :         addReplyError(c,"min or max is not a float");
    1974               0 :         return;
    1975                 :     }
    1976                 : 
    1977                 :     /* Lookup the sorted set */
    1978            2408 :     if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
    1979            1204 :         checkType(c, zobj, REDIS_ZSET)) return;
    1980                 : 
    1981            1204 :     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
    1982             602 :         unsigned char *zl = zobj->ptr;
    1983                 :         unsigned char *eptr, *sptr;
    1984                 :         double score;
    1985                 : 
    1986                 :         /* Use the first element in range as the starting point */
    1987             602 :         eptr = zzlFirstInRange(zl,range);
    1988                 : 
    1989                 :         /* No "first" element */
    1990             602 :         if (eptr == NULL) {
    1991               0 :             addReply(c, shared.czero);
    1992               0 :             return;
    1993                 :         }
    1994                 : 
    1995                 :         /* First element is in range */
    1996             602 :         sptr = ziplistNext(zl,eptr);
    1997             602 :         score = zzlGetScore(sptr);
    1998             602 :         redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
    1999                 : 
    2000                 :         /* Iterate over elements in range */
    2001           26207 :         while (eptr) {
    2002           26007 :             score = zzlGetScore(sptr);
    2003                 : 
    2004                 :             /* Abort when the node is no longer in range. */
    2005           26007 :             if (!zslValueLteMax(score,&range)) {
    2006                 :                 break;
    2007                 :             } else {
    2008           25605 :                 count++;
    2009           25605 :                 zzlNext(zl,&eptr,&sptr);
    2010                 :             }
    2011                 :         }
    2012             602 :     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
    2013             602 :         zset *zs = zobj->ptr;
    2014             602 :         zskiplist *zsl = zs->zsl;
    2015                 :         zskiplistNode *zn;
    2016                 :         unsigned long rank;
    2017                 : 
    2018                 :         /* Find first element in range */
    2019             602 :         zn = zslFirstInRange(zsl, range);
    2020                 : 
    2021                 :         /* Use rank of first element, if any, to determine preliminary count */
    2022             602 :         if (zn != NULL) {
    2023             600 :             rank = zslGetRank(zsl, zn->score, zn->obj);
    2024             600 :             count = (zsl->length - (rank - 1));
    2025                 : 
    2026                 :             /* Find last element in range */
    2027             600 :             zn = zslLastInRange(zsl, range);
    2028                 : 
    2029                 :             /* Use rank of last element, if any, to determine the actual count */
    2030             600 :             if (zn != NULL) {
    2031             600 :                 rank = zslGetRank(zsl, zn->score, zn->obj);
    2032             600 :                 count -= (zsl->length - rank);
    2033                 :             }
    2034                 :         }
    2035                 :     } else {
    2036               0 :         redisPanic("Unknown sorted set encoding");
    2037                 :     }
    2038                 : 
    2039            1204 :     addReplyLongLong(c, count);
    2040                 : }
    2041                 : 
    2042            4004 : void zcardCommand(redisClient *c) {
    2043            4004 :     robj *key = c->argv[1];
    2044                 :     robj *zobj;
    2045                 : 
    2046            8006 :     if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
    2047            4002 :         checkType(c,zobj,REDIS_ZSET)) return;
    2048                 : 
    2049            4002 :     addReplyLongLong(c,zsetLength(zobj));
    2050                 : }
    2051                 : 
    2052           46078 : void zscoreCommand(redisClient *c) {
    2053           46078 :     robj *key = c->argv[1];
    2054                 :     robj *zobj;
    2055                 :     double score;
    2056                 : 
    2057           92156 :     if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
    2058           46078 :         checkType(c,zobj,REDIS_ZSET)) return;
    2059                 : 
    2060           46078 :     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
    2061           25867 :         if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
    2062           25867 :             addReplyDouble(c,score);
    2063                 :         else
    2064               0 :             addReply(c,shared.nullbulk);
    2065           20211 :     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
    2066           20211 :         zset *zs = zobj->ptr;
    2067                 :         dictEntry *de;
    2068                 : 
    2069           20211 :         c->argv[2] = tryObjectEncoding(c->argv[2]);
    2070           20211 :         de = dictFind(zs->dict,c->argv[2]);
    2071           20211 :         if (de != NULL) {
    2072           20211 :             score = *(double*)dictGetVal(de);
    2073           20211 :             addReplyDouble(c,score);
    2074                 :         } else {
    2075               0 :             addReply(c,shared.nullbulk);
    2076                 :         }
    2077                 :     } else {
    2078               0 :         redisPanic("Unknown sorted set encoding");
    2079                 :     }
    2080                 : }
    2081                 : 
    2082            4020 : void zrankGenericCommand(redisClient *c, int reverse) {
    2083            4020 :     robj *key = c->argv[1];
    2084            4020 :     robj *ele = c->argv[2];
    2085                 :     robj *zobj;
    2086                 :     unsigned long llen;
    2087                 :     unsigned long rank;
    2088                 : 
    2089            8040 :     if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
    2090            4020 :         checkType(c,zobj,REDIS_ZSET)) return;
    2091            4020 :     llen = zsetLength(zobj);
    2092                 : 
    2093            4020 :     redisAssertWithInfo(c,ele,ele->encoding == REDIS_ENCODING_RAW);
    2094            4020 :     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
    2095            2010 :         unsigned char *zl = zobj->ptr;
    2096                 :         unsigned char *eptr, *sptr;
    2097                 : 
    2098            2010 :         eptr = ziplistIndex(zl,0);
    2099            2010 :         redisAssertWithInfo(c,zobj,eptr != NULL);
    2100            2010 :         sptr = ziplistNext(zl,eptr);
    2101            2010 :         redisAssertWithInfo(c,zobj,sptr != NULL);
    2102                 : 
    2103            2010 :         rank = 1;
    2104          103395 :         while(eptr != NULL) {
    2105          202766 :             if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
    2106                 :                 break;
    2107           99375 :             rank++;
    2108           99375 :             zzlNext(zl,&eptr,&sptr);
    2109                 :         }
    2110                 : 
    2111            2010 :         if (eptr != NULL) {
    2112            2008 :             if (reverse)
    2113               3 :                 addReplyLongLong(c,llen-rank);
    2114                 :             else
    2115            2005 :                 addReplyLongLong(c,rank-1);
    2116                 :         } else {
    2117               2 :             addReply(c,shared.nullbulk);
    2118                 :         }
    2119            2010 :     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
    2120            2010 :         zset *zs = zobj->ptr;
    2121            2010 :         zskiplist *zsl = zs->zsl;
    2122                 :         dictEntry *de;
    2123                 :         double score;
    2124                 : 
    2125            2010 :         ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
    2126            2010 :         de = dictFind(zs->dict,ele);
    2127            2010 :         if (de != NULL) {
    2128            2008 :             score = *(double*)dictGetVal(de);
    2129            2008 :             rank = zslGetRank(zsl,score,ele);
    2130            2008 :             redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
    2131            2008 :             if (reverse)
    2132               3 :                 addReplyLongLong(c,llen-rank);
    2133                 :             else
    2134            2005 :                 addReplyLongLong(c,rank-1);
    2135                 :         } else {
    2136               2 :             addReply(c,shared.nullbulk);
    2137                 :         }
    2138                 :     } else {
    2139               0 :         redisPanic("Unknown sorted set encoding");
    2140                 :     }
    2141                 : }
    2142                 : 
    2143            4012 : void zrankCommand(redisClient *c) {
    2144            4012 :     zrankGenericCommand(c, 0);
    2145            4012 : }
    2146                 : 
    2147               8 : void zrevrankCommand(redisClient *c) {
    2148               8 :     zrankGenericCommand(c, 1);
    2149               8 : }

Generated by: LCOV version 1.7