1 : #include "redis.h"
2 : #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
3 : #include <math.h> /* isnan() */
4 :
5 8 : redisSortOperation *createSortOperation(int type, robj *pattern) {
6 8 : redisSortOperation *so = zmalloc(sizeof(*so));
7 8 : so->type = type;
8 8 : so->pattern = pattern;
9 8 : return so;
10 : }
11 :
12 : /* Return the value associated to the key with a name obtained
13 : * substituting the first occurence of '*' in 'pattern' with 'subst'.
14 : * The returned object will always have its refcount increased by 1
15 : * when it is non-NULL. */
16 86258 : robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
17 : char *p, *f;
18 : sds spat, ssub;
19 : robj keyobj, fieldobj, *o;
20 : int prefixlen, sublen, postfixlen, fieldlen;
21 : /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
22 : struct {
23 : int len;
24 : int free;
25 : char buf[REDIS_SORTKEY_MAX+1];
26 : } keyname, fieldname;
27 :
28 : /* If the pattern is "#" return the substitution object itself in order
29 : * to implement the "SORT ... GET #" feature. */
30 86258 : spat = pattern->ptr;
31 86258 : if (spat[0] == '#' && spat[1] == '\0') {
32 51 : incrRefCount(subst);
33 51 : return subst;
34 : }
35 :
36 : /* The substitution object may be specially encoded. If so we create
37 : * a decoded object on the fly. Otherwise getDecodedObject will just
38 : * increment the ref count, that we'll decrement later. */
39 86207 : subst = getDecodedObject(subst);
40 :
41 86207 : ssub = subst->ptr;
42 86207 : if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
43 86207 : p = strchr(spat,'*');
44 86207 : if (!p) {
45 16 : decrRefCount(subst);
46 16 : return NULL;
47 : }
48 :
49 : /* Find out if we're dealing with a hash dereference. */
50 86191 : if ((f = strstr(p+1, "->")) != NULL) {
51 32064 : fieldlen = sdslen(spat)-(f-spat);
52 : /* this also copies \0 character */
53 32064 : memcpy(fieldname.buf,f+2,fieldlen-1);
54 32064 : fieldname.len = fieldlen-2;
55 : } else {
56 54127 : fieldlen = 0;
57 : }
58 :
59 86191 : prefixlen = p-spat;
60 86191 : sublen = sdslen(ssub);
61 86191 : postfixlen = sdslen(spat)-(prefixlen+1)-fieldlen;
62 86191 : memcpy(keyname.buf,spat,prefixlen);
63 86191 : memcpy(keyname.buf+prefixlen,ssub,sublen);
64 86191 : memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
65 86191 : keyname.buf[prefixlen+sublen+postfixlen] = '\0';
66 86191 : keyname.len = prefixlen+sublen+postfixlen;
67 86191 : decrRefCount(subst);
68 :
69 : /* Lookup substituted key */
70 86191 : initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(struct sdshdr)));
71 86191 : o = lookupKeyRead(db,&keyobj);
72 86191 : if (o == NULL) return NULL;
73 :
74 86164 : if (fieldlen > 0) {
75 32064 : if (o->type != REDIS_HASH || fieldname.len < 1) return NULL;
76 :
77 : /* Retrieve value from hash by the field name. This operation
78 : * already increases the refcount of the returned object. */
79 32064 : initStaticStringObject(fieldobj,((char*)&fieldname)+(sizeof(struct sdshdr)));
80 32064 : o = hashTypeGetObject(o, &fieldobj);
81 : } else {
82 54100 : if (o->type != REDIS_STRING) return NULL;
83 :
84 : /* Every object that this function returns needs to have its refcount
85 : * increased. sortCommand decreases it again. */
86 54100 : incrRefCount(o);
87 : }
88 :
89 86164 : return o;
90 : }
91 :
92 : /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
93 : * the additional parameter is not standard but a BSD-specific we have to
94 : * pass sorting parameters via the global 'server' structure */
95 653570 : int sortCompare(const void *s1, const void *s2) {
96 653570 : const redisSortObject *so1 = s1, *so2 = s2;
97 : int cmp;
98 :
99 653570 : if (!server.sort_alpha) {
100 : /* Numeric sorting. Here it's trivial as we precomputed scores */
101 653483 : if (so1->u.score > so2->u.score) {
102 344292 : cmp = 1;
103 309191 : } else if (so1->u.score < so2->u.score) {
104 309117 : cmp = -1;
105 : } else {
106 : /* Objects have the same score, but we don't want the comparison
107 : * to be undefined, so we compare objects lexicographycally.
108 : * This way the result of SORT is deterministic. */
109 74 : cmp = compareStringObjects(so1->obj,so2->obj);
110 : }
111 : } else {
112 : /* Alphanumeric sorting */
113 87 : if (server.sort_bypattern) {
114 0 : if (!so1->u.cmpobj || !so2->u.cmpobj) {
115 : /* At least one compare object is NULL */
116 0 : if (so1->u.cmpobj == so2->u.cmpobj)
117 0 : cmp = 0;
118 0 : else if (so1->u.cmpobj == NULL)
119 0 : cmp = -1;
120 : else
121 0 : cmp = 1;
122 : } else {
123 : /* We have both the objects, use strcoll */
124 0 : cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
125 : }
126 : } else {
127 : /* Compare elements directly. */
128 87 : cmp = compareStringObjects(so1->obj,so2->obj);
129 : }
130 : }
131 653570 : return server.sort_desc ? -cmp : cmp;
132 : }
133 :
134 : /* The SORT command is the most complex command in Redis. Warning: this code
135 : * is optimized for speed and a bit less for readability */
136 439 : void sortCommand(redisClient *c) {
137 : list *operations;
138 439 : unsigned int outputlen = 0;
139 439 : int desc = 0, alpha = 0;
140 439 : long limit_start = 0, limit_count = -1, start, end;
141 439 : int j, dontsort = 0, vectorlen;
142 439 : int getop = 0; /* GET operation counter */
143 439 : int int_convertion_error = 0;
144 439 : robj *sortval, *sortby = NULL, *storekey = NULL;
145 : redisSortObject *vector; /* Resulting vector to sort */
146 :
147 : /* Lookup the key to sort. It must be of the right types */
148 439 : sortval = lookupKeyRead(c->db,c->argv[1]);
149 440 : if (sortval && sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
150 1 : sortval->type != REDIS_ZSET)
151 : {
152 0 : addReply(c,shared.wrongtypeerr);
153 0 : return;
154 : }
155 :
156 : /* Create a list of operations to perform for every sorted element.
157 : * Operations can be GET/DEL/INCR/DECR */
158 439 : operations = listCreate();
159 439 : listSetFreeMethod(operations,zfree);
160 439 : j = 2;
161 :
162 : /* Now we need to protect sortval incrementing its count, in the future
163 : * SORT may have options able to overwrite/delete keys during the sorting
164 : * and the sorted key itself may get destroied */
165 439 : if (sortval)
166 436 : incrRefCount(sortval);
167 : else
168 3 : sortval = createListObject();
169 :
170 : /* The SORT command has an SQL-alike syntax, parse it */
171 1193 : while(j < c->argc) {
172 754 : int leftargs = c->argc-j-1;
173 754 : if (!strcasecmp(c->argv[j]->ptr,"asc")) {
174 0 : desc = 0;
175 754 : } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
176 3 : desc = 1;
177 751 : } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
178 4 : alpha = 1;
179 747 : } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
180 814 : if ((getLongFromObjectOrReply(c, c->argv[j+1], &limit_start, NULL) != REDIS_OK) ||
181 407 : (getLongFromObjectOrReply(c, c->argv[j+2], &limit_count, NULL) != REDIS_OK)) return;
182 407 : j+=2;
183 340 : } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
184 7 : storekey = c->argv[j+1];
185 7 : j++;
186 333 : } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
187 325 : sortby = c->argv[j+1];
188 : /* If the BY pattern does not contain '*', i.e. it is constant,
189 : * we don't need to sort nor to lookup the weight keys. */
190 325 : if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
191 325 : j++;
192 8 : } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
193 8 : listAddNodeTail(operations,createSortOperation(
194 8 : REDIS_SORT_GET,c->argv[j+1]));
195 8 : getop++;
196 8 : j++;
197 : } else {
198 0 : decrRefCount(sortval);
199 0 : listRelease(operations);
200 0 : addReply(c,shared.syntaxerr);
201 0 : return;
202 : }
203 754 : j++;
204 : }
205 :
206 : /* If we have STORE we need to force sorting for deterministic output
207 : * and replication. We use alpha sorting since this is guaranteed to
208 : * work with any input. */
209 439 : if (storekey && dontsort) {
210 1 : dontsort = 0;
211 1 : alpha = 1;
212 1 : sortby = NULL;
213 : }
214 :
215 : /* Destructively convert encoded sorted sets for SORT. */
216 439 : if (sortval->type == REDIS_ZSET)
217 1 : zsetConvert(sortval, REDIS_ENCODING_SKIPLIST);
218 :
219 : /* Load the sorting vector with all the objects to sort */
220 439 : switch(sortval->type) {
221 422 : case REDIS_LIST: vectorlen = listTypeLength(sortval); break;
222 16 : case REDIS_SET: vectorlen = setTypeSize(sortval); break;
223 1 : case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
224 0 : default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */
225 : }
226 439 : vector = zmalloc(sizeof(redisSortObject)*vectorlen);
227 439 : j = 0;
228 :
229 439 : if (sortval->type == REDIS_LIST) {
230 422 : listTypeIterator *li = listTypeInitIterator(sortval,0,REDIS_TAIL);
231 : listTypeEntry entry;
232 74018 : while(listTypeNext(li,&entry)) {
233 73174 : vector[j].obj = listTypeGet(&entry);
234 73174 : vector[j].u.score = 0;
235 73174 : vector[j].u.cmpobj = NULL;
236 73174 : j++;
237 : }
238 422 : listTypeReleaseIterator(li);
239 17 : } else if (sortval->type == REDIS_SET) {
240 16 : setTypeIterator *si = setTypeInitIterator(sortval);
241 : robj *ele;
242 33169 : while((ele = setTypeNextObject(si)) != NULL) {
243 33137 : vector[j].obj = ele;
244 33137 : vector[j].u.score = 0;
245 33137 : vector[j].u.cmpobj = NULL;
246 33137 : j++;
247 : }
248 16 : setTypeReleaseIterator(si);
249 1 : } else if (sortval->type == REDIS_ZSET) {
250 1 : dict *set = ((zset*)sortval->ptr)->dict;
251 : dictIterator *di;
252 : dictEntry *setele;
253 1 : di = dictGetIterator(set);
254 7 : while((setele = dictNext(di)) != NULL) {
255 5 : vector[j].obj = dictGetKey(setele);
256 5 : vector[j].u.score = 0;
257 5 : vector[j].u.cmpobj = NULL;
258 5 : j++;
259 : }
260 1 : dictReleaseIterator(di);
261 : } else {
262 0 : redisPanic("Unknown type");
263 : }
264 439 : redisAssertWithInfo(c,sortval,j == vectorlen);
265 :
266 : /* Now it's time to load the right scores in the sorting vector */
267 439 : if (dontsort == 0) {
268 96626 : for (j = 0; j < vectorlen; j++) {
269 : robj *byval;
270 96289 : if (sortby) {
271 : /* lookup value to sort by */
272 86156 : byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
273 86156 : if (!byval) continue;
274 : } else {
275 : /* use object itself to sort by */
276 10133 : byval = vector[j].obj;
277 : }
278 :
279 96265 : if (alpha) {
280 34 : if (sortby) vector[j].u.cmpobj = getDecodedObject(byval);
281 : } else {
282 96231 : if (byval->encoding == REDIS_ENCODING_RAW) {
283 : char *eptr;
284 :
285 44120 : vector[j].u.score = strtod(byval->ptr,&eptr);
286 88238 : if (eptr[0] != '\0' || errno == ERANGE ||
287 44118 : isnan(vector[j].u.score))
288 : {
289 2 : int_convertion_error = 1;
290 : }
291 52111 : } else if (byval->encoding == REDIS_ENCODING_INT) {
292 : /* Don't need to decode the object if it's
293 : * integer-encoded (the only encoding supported) so
294 : * far. We can just cast it */
295 52111 : vector[j].u.score = (long)byval->ptr;
296 : } else {
297 0 : redisAssertWithInfo(c,sortval,1 != 1);
298 : }
299 : }
300 :
301 : /* when the object was retrieved using lookupKeyByPattern,
302 : * its refcount needs to be decreased. */
303 96265 : if (sortby) {
304 86132 : decrRefCount(byval);
305 : }
306 : }
307 : }
308 :
309 : /* We are ready to sort the vector... perform a bit of sanity check
310 : * on the LIMIT option too. We'll use a partial version of quicksort. */
311 439 : start = (limit_start < 0) ? 0 : limit_start;
312 439 : end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
313 439 : if (start >= vectorlen) {
314 4 : start = vectorlen-1;
315 4 : end = vectorlen-2;
316 : }
317 439 : if (end >= vectorlen) end = vectorlen-1;
318 :
319 439 : server.sort_dontsort = dontsort;
320 439 : if (dontsort == 0) {
321 337 : server.sort_desc = desc;
322 337 : server.sort_alpha = alpha;
323 337 : server.sort_bypattern = sortby ? 1 : 0;
324 543 : if (sortby && (start != 0 || end != vectorlen-1))
325 206 : pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
326 : else
327 131 : qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
328 : }
329 :
330 : /* Send command output to the output buffer, performing the specified
331 : * GET/DEL/INCR/DECR operations if any. */
332 439 : outputlen = getop ? getop*(end-start+1) : end-start+1;
333 439 : if (int_convertion_error) {
334 2 : addReplyError(c,"One or more scores can't be converted into double");
335 437 : } else if (storekey == NULL) {
336 : /* STORE option not specified, sent the sorting result to client */
337 430 : addReplyMultiBulkLen(c,outputlen);
338 48678 : for (j = start; j <= end; j++) {
339 : listNode *ln;
340 : listIter li;
341 :
342 48248 : if (!getop) addReplyBulk(c,vector[j].obj);
343 48248 : listRewind(operations,&li);
344 48248 : while((ln = listNext(&li))) {
345 102 : redisSortOperation *sop = ln->value;
346 102 : robj *val = lookupKeyByPattern(c->db,sop->pattern,
347 204 : vector[j].obj);
348 :
349 102 : if (sop->type == REDIS_SORT_GET) {
350 102 : if (!val) {
351 19 : addReply(c,shared.nullbulk);
352 : } else {
353 83 : addReplyBulk(c,val);
354 83 : decrRefCount(val);
355 : }
356 : } else {
357 : /* Always fails */
358 0 : redisAssertWithInfo(c,sortval,sop->type == REDIS_SORT_GET);
359 : }
360 : }
361 : }
362 : } else {
363 7 : robj *sobj = createZiplistObject();
364 :
365 : /* STORE option specified, set the sorting result as a List object */
366 63 : for (j = start; j <= end; j++) {
367 : listNode *ln;
368 : listIter li;
369 :
370 56 : if (!getop) {
371 56 : listTypePush(sobj,vector[j].obj,REDIS_TAIL);
372 : } else {
373 0 : listRewind(operations,&li);
374 0 : while((ln = listNext(&li))) {
375 0 : redisSortOperation *sop = ln->value;
376 0 : robj *val = lookupKeyByPattern(c->db,sop->pattern,
377 0 : vector[j].obj);
378 :
379 0 : if (sop->type == REDIS_SORT_GET) {
380 0 : if (!val) val = createStringObject("",0);
381 :
382 : /* listTypePush does an incrRefCount, so we should take care
383 : * care of the incremented refcount caused by either
384 : * lookupKeyByPattern or createStringObject("",0) */
385 0 : listTypePush(sobj,val,REDIS_TAIL);
386 0 : decrRefCount(val);
387 : } else {
388 : /* Always fails */
389 0 : redisAssertWithInfo(c,sortval,sop->type == REDIS_SORT_GET);
390 : }
391 : }
392 : }
393 : }
394 7 : if (outputlen) {
395 3 : setKey(c->db,storekey,sobj);
396 3 : server.dirty += outputlen;
397 4 : } else if (dbDelete(c->db,storekey)) {
398 2 : signalModifiedKey(c->db,storekey);
399 2 : server.dirty++;
400 : }
401 7 : decrRefCount(sobj);
402 7 : addReplyLongLong(c,outputlen);
403 : }
404 :
405 : /* Cleanup */
406 439 : if (sortval->type == REDIS_LIST || sortval->type == REDIS_SET)
407 106749 : for (j = 0; j < vectorlen; j++)
408 106311 : decrRefCount(vector[j].obj);
409 439 : decrRefCount(sortval);
410 439 : listRelease(operations);
411 106755 : for (j = 0; j < vectorlen; j++) {
412 106316 : if (alpha && vector[j].u.cmpobj)
413 0 : decrRefCount(vector[j].u.cmpobj);
414 : }
415 439 : zfree(vector);
416 : }
417 :
418 :
|