1 : #include "redis.h"
2 :
3 : /*-----------------------------------------------------------------------------
4 : * List API
5 : *----------------------------------------------------------------------------*/
6 :
7 : /* Check the argument length to see if it requires us to convert the ziplist
8 : * to a real list. Only check raw-encoded objects because integer encoded
9 : * objects are never too long. */
10 124044 : void listTypeTryConversion(robj *subject, robj *value) {
11 124044 : if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
12 98699 : if (value->encoding == REDIS_ENCODING_RAW &&
13 31900 : sdslen(value->ptr) > server.list_max_ziplist_value)
14 2864 : listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
15 : }
16 :
17 124018 : void listTypePush(robj *subject, robj *value, int where) {
18 : /* Check if we need to convert the ziplist */
19 124018 : listTypeTryConversion(subject,value);
20 203889 : if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
21 79871 : ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
22 88 : listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
23 :
24 124018 : if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
25 79783 : int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
26 79783 : value = getDecodedObject(value);
27 159566 : subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
28 79783 : decrRefCount(value);
29 44235 : } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
30 44235 : if (where == REDIS_HEAD) {
31 18689 : listAddNodeHead(subject->ptr,value);
32 : } else {
33 25546 : listAddNodeTail(subject->ptr,value);
34 : }
35 44235 : incrRefCount(value);
36 : } else {
37 0 : redisPanic("Unknown list encoding");
38 : }
39 124018 : }
40 :
41 7260 : robj *listTypePop(robj *subject, int where) {
42 7260 : robj *value = NULL;
43 7260 : if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
44 : unsigned char *p;
45 : unsigned char *vstr;
46 : unsigned int vlen;
47 : long long vlong;
48 5488 : int pos = (where == REDIS_HEAD) ? 0 : -1;
49 5488 : p = ziplistIndex(subject->ptr,pos);
50 5488 : if (ziplistGet(p,&vstr,&vlen,&vlong)) {
51 5488 : if (vstr) {
52 396 : value = createStringObject((char*)vstr,vlen);
53 : } else {
54 5092 : value = createStringObjectFromLongLong(vlong);
55 : }
56 : /* We only need to delete an element when it exists */
57 5488 : subject->ptr = ziplistDelete(subject->ptr,&p);
58 : }
59 1772 : } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
60 1772 : list *list = subject->ptr;
61 : listNode *ln;
62 1772 : if (where == REDIS_HEAD) {
63 870 : ln = listFirst(list);
64 : } else {
65 902 : ln = listLast(list);
66 : }
67 1772 : if (ln != NULL) {
68 1772 : value = listNodeValue(ln);
69 1772 : incrRefCount(value);
70 1772 : listDelNode(list,ln);
71 : }
72 : } else {
73 0 : redisPanic("Unknown list encoding");
74 : }
75 7260 : return value;
76 : }
77 :
78 141205 : unsigned long listTypeLength(robj *subject) {
79 141205 : if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
80 90995 : return ziplistLen(subject->ptr);
81 50210 : } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
82 50210 : return listLength((list*)subject->ptr);
83 : } else {
84 0 : redisPanic("Unknown list encoding");
85 : }
86 : }
87 :
88 : /* Initialize an iterator at the specified index. */
89 22627 : listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) {
90 22627 : listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
91 22627 : li->subject = subject;
92 22627 : li->encoding = subject->encoding;
93 22627 : li->direction = direction;
94 22627 : if (li->encoding == REDIS_ENCODING_ZIPLIST) {
95 18405 : li->zi = ziplistIndex(subject->ptr,index);
96 4222 : } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
97 4222 : li->ln = listIndex(subject->ptr,index);
98 : } else {
99 0 : redisPanic("Unknown list encoding");
100 : }
101 22627 : return li;
102 : }
103 :
104 : /* Clean up the iterator. */
105 16451 : void listTypeReleaseIterator(listTypeIterator *li) {
106 22627 : zfree(li);
107 16451 : }
108 :
109 : /* Stores pointer to current the entry in the provided entry structure
110 : * and advances the position of the iterator. Returns 1 when the current
111 : * entry is in fact an entry, 0 otherwise. */
112 162255 : int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
113 : /* Protect from converting when iterating */
114 162255 : redisAssert(li->subject->encoding == li->encoding);
115 :
116 162255 : entry->li = li;
117 162255 : if (li->encoding == REDIS_ENCODING_ZIPLIST) {
118 73678 : entry->zi = li->zi;
119 73678 : if (entry->zi != NULL) {
120 55284 : if (li->direction == REDIS_TAIL)
121 55278 : li->zi = ziplistNext(li->subject->ptr,li->zi);
122 : else
123 6 : li->zi = ziplistPrev(li->subject->ptr,li->zi);
124 55284 : return 1;
125 : }
126 88577 : } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
127 88577 : entry->ln = li->ln;
128 88577 : if (entry->ln != NULL) {
129 84366 : if (li->direction == REDIS_TAIL)
130 84360 : li->ln = li->ln->next;
131 : else
132 6 : li->ln = li->ln->prev;
133 84366 : return 1;
134 : }
135 : } else {
136 0 : redisPanic("Unknown list encoding");
137 : }
138 22605 : return 0;
139 : }
140 :
141 : /* Return entry or NULL at the current position of the iterator. */
142 133315 : robj *listTypeGet(listTypeEntry *entry) {
143 133315 : listTypeIterator *li = entry->li;
144 133315 : robj *value = NULL;
145 133315 : if (li->encoding == REDIS_ENCODING_ZIPLIST) {
146 : unsigned char *vstr;
147 : unsigned int vlen;
148 : long long vlong;
149 49619 : redisAssert(entry->zi != NULL);
150 49619 : if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
151 49619 : if (vstr) {
152 8790 : value = createStringObject((char*)vstr,vlen);
153 : } else {
154 40829 : value = createStringObjectFromLongLong(vlong);
155 : }
156 : }
157 83696 : } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
158 83696 : redisAssert(entry->ln != NULL);
159 83696 : value = listNodeValue(entry->ln);
160 83696 : incrRefCount(value);
161 : } else {
162 0 : redisPanic("Unknown list encoding");
163 : }
164 133315 : return value;
165 : }
166 :
167 14 : void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
168 14 : robj *subject = entry->li->subject;
169 14 : if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
170 7 : value = getDecodedObject(value);
171 7 : if (where == REDIS_TAIL) {
172 3 : unsigned char *next = ziplistNext(subject->ptr,entry->zi);
173 :
174 : /* When we insert after the current element, but the current element
175 : * is the tail of the list, we need to do a push. */
176 3 : if (next == NULL) {
177 2 : subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
178 : } else {
179 4 : subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
180 : }
181 : } else {
182 8 : subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
183 : }
184 7 : decrRefCount(value);
185 7 : } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
186 7 : if (where == REDIS_TAIL) {
187 3 : listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
188 : } else {
189 4 : listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
190 : }
191 7 : incrRefCount(value);
192 : } else {
193 0 : redisPanic("Unknown list encoding");
194 : }
195 14 : }
196 :
197 : /* Compare the given object with the entry at the current position. */
198 6335 : int listTypeEqual(listTypeEntry *entry, robj *o) {
199 6335 : listTypeIterator *li = entry->li;
200 6335 : if (li->encoding == REDIS_ENCODING_ZIPLIST) {
201 5665 : redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
202 11330 : return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
203 670 : } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
204 670 : return equalStringObjects(o,listNodeValue(entry->ln));
205 : } else {
206 0 : redisPanic("Unknown list encoding");
207 : }
208 : }
209 :
210 : /* Delete the element pointed to. */
211 2991 : void listTypeDelete(listTypeEntry *entry) {
212 2991 : listTypeIterator *li = entry->li;
213 2991 : if (li->encoding == REDIS_ENCODING_ZIPLIST) {
214 2446 : unsigned char *p = entry->zi;
215 2446 : li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
216 :
217 : /* Update position of the iterator depending on the direction */
218 2446 : if (li->direction == REDIS_TAIL)
219 2443 : li->zi = p;
220 : else
221 3 : li->zi = ziplistPrev(li->subject->ptr,p);
222 545 : } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
223 : listNode *next;
224 545 : if (li->direction == REDIS_TAIL)
225 542 : next = entry->ln->next;
226 : else
227 3 : next = entry->ln->prev;
228 545 : listDelNode(li->subject->ptr,entry->ln);
229 545 : li->ln = next;
230 : } else {
231 0 : redisPanic("Unknown list encoding");
232 : }
233 2991 : }
234 :
235 3032 : void listTypeConvert(robj *subject, int enc) {
236 : listTypeIterator *li;
237 : listTypeEntry entry;
238 3032 : redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
239 :
240 3032 : if (enc == REDIS_ENCODING_LINKEDLIST) {
241 3032 : list *l = listCreate();
242 3032 : listSetFreeMethod(l,decrRefCount);
243 :
244 : /* listTypeGet returns a robj with incremented refcount */
245 3032 : li = listTypeInitIterator(subject,0,REDIS_TAIL);
246 3032 : while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
247 : listTypeReleaseIterator(li);
248 :
249 3032 : subject->encoding = REDIS_ENCODING_LINKEDLIST;
250 3032 : zfree(subject->ptr);
251 3032 : subject->ptr = l;
252 : } else {
253 0 : redisPanic("Unsupported list conversion");
254 : }
255 3032 : }
256 :
257 : /*-----------------------------------------------------------------------------
258 : * List Commands
259 : *----------------------------------------------------------------------------*/
260 :
261 121890 : void pushGenericCommand(redisClient *c, int where) {
262 121890 : int j, waiting = 0, pushed = 0;
263 121890 : robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
264 121890 : int may_have_waiting_clients = (lobj == NULL);
265 :
266 121890 : if (lobj && lobj->type != REDIS_LIST) {
267 3 : addReply(c,shared.wrongtypeerr);
268 3 : return;
269 : }
270 :
271 245834 : for (j = 2; j < c->argc; j++) {
272 123947 : c->argv[j] = tryObjectEncoding(c->argv[j]);
273 123947 : if (may_have_waiting_clients) {
274 15170 : if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
275 19 : waiting++;
276 19 : continue;
277 : } else {
278 15151 : may_have_waiting_clients = 0;
279 : }
280 : }
281 123928 : if (!lobj) {
282 15151 : lobj = createZiplistObject();
283 15151 : dbAdd(c->db,c->argv[1],lobj);
284 : }
285 123928 : listTypePush(lobj,c->argv[j],where);
286 123928 : pushed++;
287 : }
288 121887 : addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
289 121887 : if (pushed) signalModifiedKey(c->db,c->argv[1]);
290 121887 : server.dirty += pushed;
291 :
292 : /* Alter the replication of the command accordingly to the number of
293 : * list elements delivered to clients waiting into a blocking operation.
294 : * We do that only if there were waiting clients, and only if still some
295 : * element was pushed into the list (othewise dirty is 0 and nothign will
296 : * be propagated). */
297 121887 : if (waiting && pushed) {
298 : /* CMD KEY a b C D E */
299 1 : for (j = 0; j < waiting; j++) decrRefCount(c->argv[j+2]);
300 1 : memmove(c->argv+2,c->argv+2+waiting,sizeof(robj*)*pushed);
301 1 : c->argc -= waiting;
302 : }
303 : }
304 :
305 52284 : void lpushCommand(redisClient *c) {
306 52284 : pushGenericCommand(c,REDIS_HEAD);
307 52284 : }
308 :
309 69606 : void rpushCommand(redisClient *c) {
310 69606 : pushGenericCommand(c,REDIS_TAIL);
311 69606 : }
312 :
313 30 : void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
314 : robj *subject;
315 : listTypeIterator *iter;
316 : listTypeEntry entry;
317 30 : int inserted = 0;
318 :
319 58 : if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
320 28 : checkType(c,subject,REDIS_LIST)) return;
321 :
322 28 : if (refval != NULL) {
323 : /* Note: we expect refval to be string-encoded because it is *not* the
324 : * last argument of the multi-bulk LINSERT. */
325 20 : redisAssertWithInfo(c,refval,refval->encoding == REDIS_ENCODING_RAW);
326 :
327 : /* We're not sure if this value can be inserted yet, but we cannot
328 : * convert the list inside the iterator. We don't want to loop over
329 : * the list twice (once to see if the value can be inserted and once
330 : * to do the actual insert), so we assume this value can be inserted
331 : * and convert the ziplist to a regular list if necessary. */
332 20 : listTypeTryConversion(subject,val);
333 :
334 : /* Seek refval from head to tail */
335 20 : iter = listTypeInitIterator(subject,0,REDIS_TAIL);
336 602 : while (listTypeNext(iter,&entry)) {
337 576 : if (listTypeEqual(&entry,refval)) {
338 14 : listTypeInsert(&entry,val,where);
339 14 : inserted = 1;
340 14 : break;
341 : }
342 : }
343 : listTypeReleaseIterator(iter);
344 :
345 20 : if (inserted) {
346 : /* Check if the length exceeds the ziplist length threshold. */
347 21 : if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
348 7 : ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
349 2 : listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
350 14 : signalModifiedKey(c->db,c->argv[1]);
351 14 : server.dirty++;
352 : } else {
353 : /* Notify client of a failed insert */
354 6 : addReply(c,shared.cnegone);
355 6 : return;
356 : }
357 : } else {
358 8 : listTypePush(subject,val,where);
359 8 : signalModifiedKey(c->db,c->argv[1]);
360 8 : server.dirty++;
361 : }
362 :
363 22 : addReplyLongLong(c,listTypeLength(subject));
364 : }
365 :
366 5 : void lpushxCommand(redisClient *c) {
367 5 : c->argv[2] = tryObjectEncoding(c->argv[2]);
368 5 : pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
369 5 : }
370 :
371 5 : void rpushxCommand(redisClient *c) {
372 5 : c->argv[2] = tryObjectEncoding(c->argv[2]);
373 5 : pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
374 5 : }
375 :
376 20 : void linsertCommand(redisClient *c) {
377 20 : c->argv[4] = tryObjectEncoding(c->argv[4]);
378 20 : if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
379 9 : pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
380 11 : } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
381 11 : pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD);
382 : } else {
383 0 : addReply(c,shared.syntaxerr);
384 : }
385 20 : }
386 :
387 2247 : void llenCommand(redisClient *c) {
388 2247 : robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
389 2247 : if (o == NULL || checkType(c,o,REDIS_LIST)) return;
390 2235 : addReplyLongLong(c,listTypeLength(o));
391 : }
392 :
393 48891 : void lindexCommand(redisClient *c) {
394 48891 : robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
395 48891 : if (o == NULL || checkType(c,o,REDIS_LIST)) return;
396 : long index;
397 48889 : robj *value = NULL;
398 :
399 48889 : if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
400 : return;
401 :
402 48889 : if (o->encoding == REDIS_ENCODING_ZIPLIST) {
403 : unsigned char *p;
404 : unsigned char *vstr;
405 : unsigned int vlen;
406 : long long vlong;
407 18962 : p = ziplistIndex(o->ptr,index);
408 18962 : if (ziplistGet(p,&vstr,&vlen,&vlong)) {
409 18962 : if (vstr) {
410 4310 : value = createStringObject((char*)vstr,vlen);
411 : } else {
412 14652 : value = createStringObjectFromLongLong(vlong);
413 : }
414 18962 : addReplyBulk(c,value);
415 18962 : decrRefCount(value);
416 : } else {
417 0 : addReply(c,shared.nullbulk);
418 : }
419 29927 : } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
420 29927 : listNode *ln = listIndex(o->ptr,index);
421 29927 : if (ln != NULL) {
422 29927 : value = listNodeValue(ln);
423 29927 : addReplyBulk(c,value);
424 : } else {
425 0 : addReply(c,shared.nullbulk);
426 : }
427 : } else {
428 0 : redisPanic("Unknown list encoding");
429 : }
430 : }
431 :
432 8 : void lsetCommand(redisClient *c) {
433 8 : robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
434 8 : if (o == NULL || checkType(c,o,REDIS_LIST)) return;
435 : long index;
436 6 : robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
437 :
438 6 : if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
439 : return;
440 :
441 6 : listTypeTryConversion(o,value);
442 6 : if (o->encoding == REDIS_ENCODING_ZIPLIST) {
443 3 : unsigned char *p, *zl = o->ptr;
444 3 : p = ziplistIndex(zl,index);
445 3 : if (p == NULL) {
446 1 : addReply(c,shared.outofrangeerr);
447 : } else {
448 2 : o->ptr = ziplistDelete(o->ptr,&p);
449 2 : value = getDecodedObject(value);
450 4 : o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
451 2 : decrRefCount(value);
452 2 : addReply(c,shared.ok);
453 2 : signalModifiedKey(c->db,c->argv[1]);
454 2 : server.dirty++;
455 : }
456 3 : } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
457 3 : listNode *ln = listIndex(o->ptr,index);
458 3 : if (ln == NULL) {
459 1 : addReply(c,shared.outofrangeerr);
460 : } else {
461 2 : decrRefCount((robj*)listNodeValue(ln));
462 2 : listNodeValue(ln) = value;
463 2 : incrRefCount(value);
464 2 : addReply(c,shared.ok);
465 2 : signalModifiedKey(c->db,c->argv[1]);
466 2 : server.dirty++;
467 : }
468 : } else {
469 0 : redisPanic("Unknown list encoding");
470 : }
471 : }
472 :
473 7225 : void popGenericCommand(redisClient *c, int where) {
474 7225 : robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
475 7225 : if (o == NULL || checkType(c,o,REDIS_LIST)) return;
476 :
477 7219 : robj *value = listTypePop(o,where);
478 7219 : if (value == NULL) {
479 0 : addReply(c,shared.nullbulk);
480 : } else {
481 7219 : addReplyBulk(c,value);
482 7219 : decrRefCount(value);
483 7219 : if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
484 7219 : signalModifiedKey(c->db,c->argv[1]);
485 7219 : server.dirty++;
486 : }
487 : }
488 :
489 3620 : void lpopCommand(redisClient *c) {
490 3620 : popGenericCommand(c,REDIS_HEAD);
491 3620 : }
492 :
493 3605 : void rpopCommand(redisClient *c) {
494 3605 : popGenericCommand(c,REDIS_TAIL);
495 3605 : }
496 :
497 2175 : void lrangeCommand(redisClient *c) {
498 : robj *o;
499 : long start, end, llen, rangelen;
500 :
501 4350 : if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
502 2175 : (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
503 :
504 4343 : if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
505 2168 : || checkType(c,o,REDIS_LIST)) return;
506 2168 : llen = listTypeLength(o);
507 :
508 : /* convert negative indexes */
509 2168 : if (start < 0) start = llen+start;
510 2168 : if (end < 0) end = llen+end;
511 2168 : if (start < 0) start = 0;
512 :
513 : /* Invariant: start >= 0, so this test will be true when end < 0.
514 : * The range is empty when start > end or start >= length. */
515 2168 : if (start > end || start >= llen) {
516 4 : addReply(c,shared.emptymultibulk);
517 4 : return;
518 : }
519 2164 : if (end >= llen) end = llen-1;
520 2164 : rangelen = (end-start)+1;
521 :
522 : /* Return the result in form of a multi-bulk reply */
523 2164 : addReplyMultiBulkLen(c,rangelen);
524 2164 : if (o->encoding == REDIS_ENCODING_ZIPLIST) {
525 1116 : unsigned char *p = ziplistIndex(o->ptr,start);
526 : unsigned char *vstr;
527 : unsigned int vlen;
528 : long long vlong;
529 :
530 13910 : while(rangelen--) {
531 11678 : ziplistGet(p,&vstr,&vlen,&vlong);
532 11678 : if (vstr) {
533 168 : addReplyBulkCBuffer(c,vstr,vlen);
534 : } else {
535 11510 : addReplyBulkLongLong(c,vlong);
536 : }
537 11678 : p = ziplistNext(o->ptr,p);
538 : }
539 1048 : } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
540 : listNode *ln;
541 :
542 : /* If we are nearest to the end of the list, reach the element
543 : * starting from tail and going backward, as it is faster. */
544 1048 : if (start > llen/2) start -= llen;
545 1048 : ln = listIndex(o->ptr,start);
546 :
547 13867 : while(rangelen--) {
548 11771 : addReplyBulk(c,ln->value);
549 11771 : ln = ln->next;
550 : }
551 : } else {
552 0 : redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
553 : }
554 : }
555 :
556 2028 : void ltrimCommand(redisClient *c) {
557 : robj *o;
558 : long start, end, llen, j, ltrim, rtrim;
559 : list *list;
560 : listNode *ln;
561 :
562 4056 : if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
563 2028 : (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
564 :
565 4056 : if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
566 2028 : checkType(c,o,REDIS_LIST)) return;
567 2028 : llen = listTypeLength(o);
568 :
569 : /* convert negative indexes */
570 2028 : if (start < 0) start = llen+start;
571 2028 : if (end < 0) end = llen+end;
572 2028 : if (start < 0) start = 0;
573 :
574 : /* Invariant: start >= 0, so this test will be true when end < 0.
575 : * The range is empty when start > end or start >= length. */
576 2030 : if (start > end || start >= llen) {
577 : /* Out of range start or start > end result in empty list */
578 2 : ltrim = llen;
579 2 : rtrim = 0;
580 : } else {
581 2026 : if (end >= llen) end = llen-1;
582 2026 : ltrim = start;
583 2026 : rtrim = llen-end-1;
584 : }
585 :
586 : /* Remove list elements to perform the trim */
587 2028 : if (o->encoding == REDIS_ENCODING_ZIPLIST) {
588 1014 : o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
589 1014 : o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
590 1014 : } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
591 1014 : list = o->ptr;
592 15969 : for (j = 0; j < ltrim; j++) {
593 14955 : ln = listFirst(list);
594 14955 : listDelNode(list,ln);
595 : }
596 6490 : for (j = 0; j < rtrim; j++) {
597 5476 : ln = listLast(list);
598 5476 : listDelNode(list,ln);
599 : }
600 : } else {
601 0 : redisPanic("Unknown list encoding");
602 : }
603 2028 : if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
604 2028 : signalModifiedKey(c->db,c->argv[1]);
605 2028 : server.dirty++;
606 2028 : addReply(c,shared.ok);
607 : }
608 :
609 3124 : void lremCommand(redisClient *c) {
610 : robj *subject, *obj;
611 3124 : obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
612 : long toremove;
613 3124 : long removed = 0;
614 : listTypeEntry entry;
615 :
616 3124 : if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
617 : return;
618 :
619 3124 : subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
620 3124 : if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
621 :
622 : /* Make sure obj is raw when we're dealing with a ziplist */
623 3124 : if (subject->encoding == REDIS_ENCODING_ZIPLIST)
624 2549 : obj = getDecodedObject(obj);
625 :
626 : listTypeIterator *li;
627 3124 : if (toremove < 0) {
628 4 : toremove = -toremove;
629 4 : li = listTypeInitIterator(subject,-1,REDIS_HEAD);
630 : } else {
631 3120 : li = listTypeInitIterator(subject,0,REDIS_TAIL);
632 : }
633 :
634 8875 : while (listTypeNext(li,&entry)) {
635 5759 : if (listTypeEqual(&entry,obj)) {
636 2991 : listTypeDelete(&entry);
637 2991 : server.dirty++;
638 2991 : removed++;
639 2991 : if (toremove && removed == toremove) break;
640 : }
641 : }
642 : listTypeReleaseIterator(li);
643 :
644 : /* Clean up raw encoded object */
645 3124 : if (subject->encoding == REDIS_ENCODING_ZIPLIST)
646 2549 : decrRefCount(obj);
647 :
648 3124 : if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
649 3124 : addReplyLongLong(c,removed);
650 3124 : if (removed) signalModifiedKey(c->db,c->argv[1]);
651 : }
652 :
653 : /* This is the semantic of this command:
654 : * RPOPLPUSH srclist dstlist:
655 : * IF LLEN(srclist) > 0
656 : * element = RPOP srclist
657 : * LPUSH dstlist element
658 : * RETURN element
659 : * ELSE
660 : * RETURN nil
661 : * END
662 : * END
663 : *
664 : * The idea is to be able to get an element from a list in a reliable way
665 : * since the element is not just returned but pushed against another list
666 : * as well. This command was originally proposed by Ezra Zygmuntowicz.
667 : */
668 :
669 30 : void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
670 30 : if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
671 : /* Create the list if the key does not exist */
672 26 : if (!dstobj) {
673 13 : dstobj = createZiplistObject();
674 13 : dbAdd(c->db,dstkey,dstobj);
675 : } else {
676 13 : signalModifiedKey(c->db,dstkey);
677 : }
678 26 : listTypePush(dstobj,value,REDIS_HEAD);
679 : /* Additionally propagate this PUSH operation together with
680 : * the operation performed by the command. */
681 : {
682 26 : robj **argv = zmalloc(sizeof(robj*)*3);
683 26 : argv[0] = createStringObject("LPUSH",5);
684 26 : argv[1] = dstkey;
685 26 : argv[2] = value;
686 26 : incrRefCount(argv[1]);
687 26 : incrRefCount(argv[2]);
688 26 : alsoPropagate(server.lpushCommand,c->db->id,argv,3,
689 : REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
690 : }
691 : }
692 : /* Always send the pushed value to the client. */
693 30 : addReplyBulk(c,value);
694 30 : }
695 :
696 24 : void rpoplpushCommand(redisClient *c) {
697 : robj *sobj, *value;
698 46 : if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
699 22 : checkType(c,sobj,REDIS_LIST)) return;
700 :
701 21 : if (listTypeLength(sobj) == 0) {
702 0 : addReply(c,shared.nullbulk);
703 : } else {
704 21 : robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
705 21 : robj *touchedkey = c->argv[1];
706 :
707 21 : if (dobj && checkType(c,dobj,REDIS_LIST)) return;
708 19 : value = listTypePop(sobj,REDIS_TAIL);
709 : /* We saved touched key, and protect it, since rpoplpushHandlePush
710 : * may change the client command argument vector. */
711 19 : incrRefCount(touchedkey);
712 19 : rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
713 :
714 : /* listTypePop returns an object with its refcount incremented */
715 19 : decrRefCount(value);
716 :
717 : /* Delete the source list when it is empty */
718 19 : if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
719 19 : signalModifiedKey(c->db,touchedkey);
720 19 : decrRefCount(touchedkey);
721 19 : server.dirty++;
722 :
723 : /* Replicate this as a simple RPOP since the LPUSH side is replicated
724 : * by rpoplpushHandlePush() call if needed (it may not be needed
725 : * if a client is blocking wait a push against the list). */
726 19 : rewriteClientCommandVector(c,2,
727 : resetRefCount(createStringObject("RPOP",4)),
728 19 : c->argv[1]);
729 : }
730 : }
731 :
732 : /*-----------------------------------------------------------------------------
733 : * Blocking POP operations
734 : *----------------------------------------------------------------------------*/
735 :
736 : /* Currently Redis blocking operations support is limited to list POP ops,
737 : * so the current implementation is not fully generic, but it is also not
738 : * completely specific so it will not require a rewrite to support new
739 : * kind of blocking operations in the future.
740 : *
741 : * Still it's important to note that list blocking operations can be already
742 : * used as a notification mechanism in order to implement other blocking
743 : * operations at application level, so there must be a very strong evidence
744 : * of usefulness and generality before new blocking operations are implemented.
745 : *
746 : * This is how the current blocking POP works, we use BLPOP as example:
747 : * - If the user calls BLPOP and the key exists and contains a non empty list
748 : * then LPOP is called instead. So BLPOP is semantically the same as LPOP
749 : * if there is not to block.
750 : * - If instead BLPOP is called and the key does not exists or the list is
751 : * empty we need to block. In order to do so we remove the notification for
752 : * new data to read in the client socket (so that we'll not serve new
753 : * requests if the blocking request is not served). Also we put the client
754 : * in a dictionary (db->blocking_keys) mapping keys to a list of clients
755 : * blocking for this keys.
756 : * - If a PUSH operation against a key with blocked clients waiting is
757 : * performed, we serve the first in the list: basically instead to push
758 : * the new element inside the list we return it to the (first / oldest)
759 : * blocking client, unblock the client, and remove it form the list.
760 : *
761 : * The above comment and the source code should be enough in order to understand
762 : * the implementation and modify / fix it later.
763 : */
764 :
765 : /* Set a client in blocking mode for the specified key, with the specified
766 : * timeout */
767 29 : void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
768 : dictEntry *de;
769 : list *l;
770 : int j;
771 :
772 29 : c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
773 29 : c->bpop.count = numkeys;
774 29 : c->bpop.timeout = timeout;
775 29 : c->bpop.target = target;
776 :
777 29 : if (target != NULL) {
778 15 : incrRefCount(target);
779 : }
780 :
781 64 : for (j = 0; j < numkeys; j++) {
782 : /* Add the key in the client structure, to map clients -> keys */
783 35 : c->bpop.keys[j] = keys[j];
784 35 : incrRefCount(keys[j]);
785 :
786 : /* And in the other "side", to map keys -> clients */
787 35 : de = dictFind(c->db->blocking_keys,keys[j]);
788 35 : if (de == NULL) {
789 : int retval;
790 :
791 : /* For every key we take a list of clients blocked for it */
792 34 : l = listCreate();
793 34 : retval = dictAdd(c->db->blocking_keys,keys[j],l);
794 34 : incrRefCount(keys[j]);
795 34 : redisAssertWithInfo(c,keys[j],retval == DICT_OK);
796 : } else {
797 1 : l = dictGetVal(de);
798 : }
799 35 : listAddNodeTail(l,c);
800 : }
801 : /* Mark the client as a blocked client */
802 29 : c->flags |= REDIS_BLOCKED;
803 29 : server.bpop_blocked_clients++;
804 29 : }
805 :
806 : /* Unblock a client that's waiting in a blocking operation such as BLPOP */
807 29 : void unblockClientWaitingData(redisClient *c) {
808 : dictEntry *de;
809 : list *l;
810 : int j;
811 :
812 29 : redisAssertWithInfo(c,NULL,c->bpop.keys != NULL);
813 : /* The client may wait for multiple keys, so unblock it for every key. */
814 64 : for (j = 0; j < c->bpop.count; j++) {
815 : /* Remove this client from the list of clients waiting for this key. */
816 35 : de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
817 35 : redisAssertWithInfo(c,c->bpop.keys[j],de != NULL);
818 35 : l = dictGetVal(de);
819 35 : listDelNode(l,listSearchKey(l,c));
820 : /* If the list is empty we need to remove it to avoid wasting memory */
821 35 : if (listLength(l) == 0)
822 34 : dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
823 35 : decrRefCount(c->bpop.keys[j]);
824 : }
825 :
826 : /* Cleanup the client structure */
827 29 : zfree(c->bpop.keys);
828 29 : c->bpop.keys = NULL;
829 29 : if (c->bpop.target) decrRefCount(c->bpop.target);
830 29 : c->bpop.target = NULL;
831 29 : c->flags &= ~REDIS_BLOCKED;
832 29 : c->flags |= REDIS_UNBLOCKED;
833 29 : server.bpop_blocked_clients--;
834 29 : listAddNodeTail(server.unblocked_clients,c);
835 29 : }
836 :
837 : /* This should be called from any function PUSHing into lists.
838 : * 'c' is the "pushing client", 'key' is the key it is pushing data against,
839 : * 'ele' is the element pushed.
840 : *
841 : * If the function returns 0 there was no client waiting for a list push
842 : * against this key.
843 : *
844 : * If the function returns 1 there was a client waiting for a list push
845 : * against this key, the element was passed to this client thus it's not
846 : * needed to actually add it to the list and the caller should return asap. */
847 15200 : int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
848 : struct dictEntry *de;
849 : redisClient *receiver;
850 : int numclients;
851 : list *clients;
852 : listNode *ln;
853 : robj *dstkey, *dstobj;
854 :
855 15200 : de = dictFind(c->db->blocking_keys,key);
856 15200 : if (de == NULL) return 0;
857 24 : clients = dictGetVal(de);
858 24 : numclients = listLength(clients);
859 :
860 : /* Try to handle the push as long as there are clients waiting for a push.
861 : * Note that "numclients" is used because the list of clients waiting for a
862 : * push on "key" is deleted by unblockClient() when empty.
863 : *
864 : * This loop will have more than 1 iteration when there is a BRPOPLPUSH
865 : * that cannot push the target list because it does not contain a list. If
866 : * this happens, it simply tries the next client waiting for a push. */
867 50 : while (numclients--) {
868 25 : ln = listFirst(clients);
869 25 : redisAssertWithInfo(c,key,ln != NULL);
870 25 : receiver = ln->value;
871 25 : dstkey = receiver->bpop.target;
872 :
873 : /* Protect receiver->bpop.target, that will be freed by
874 : * the next unblockClientWaitingData() call. */
875 25 : if (dstkey) incrRefCount(dstkey);
876 :
877 : /* This should remove the first element of the "clients" list. */
878 25 : unblockClientWaitingData(receiver);
879 :
880 25 : if (dstkey == NULL) {
881 : /* BRPOP/BLPOP */
882 12 : addReplyMultiBulkLen(receiver,2);
883 12 : addReplyBulk(receiver,key);
884 12 : addReplyBulk(receiver,ele);
885 12 : return 1; /* Serve just the first client as in B[RL]POP semantics */
886 : } else {
887 : /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
888 13 : dstobj = lookupKeyWrite(receiver->db,dstkey);
889 13 : if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
890 11 : rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
891 11 : decrRefCount(dstkey);
892 11 : return 1;
893 : }
894 2 : decrRefCount(dstkey);
895 : }
896 : }
897 :
898 1 : return 0;
899 : }
900 :
901 66 : int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
902 : long tval;
903 :
904 66 : if (getLongFromObjectOrReply(c,object,&tval,
905 : "timeout is not an integer or out of range") != REDIS_OK)
906 2 : return REDIS_ERR;
907 :
908 64 : if (tval < 0) {
909 2 : addReplyError(c,"timeout is negative");
910 2 : return REDIS_ERR;
911 : }
912 :
913 62 : if (tval > 0) tval += server.unixtime;
914 62 : *timeout = tval;
915 :
916 62 : return REDIS_OK;
917 : }
918 :
919 : /* Blocking RPOP/LPOP */
920 43 : void blockingPopGenericCommand(redisClient *c, int where) {
921 : robj *o;
922 : time_t timeout;
923 : int j;
924 :
925 43 : if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
926 : return;
927 :
928 66 : for (j = 1; j < c->argc-1; j++) {
929 51 : o = lookupKeyWrite(c->db,c->argv[j]);
930 51 : if (o != NULL) {
931 24 : if (o->type != REDIS_LIST) {
932 2 : addReply(c,shared.wrongtypeerr);
933 2 : return;
934 : } else {
935 22 : if (listTypeLength(o) != 0) {
936 : /* Non empty list, this is like a non normal [LR]POP. */
937 22 : robj *value = listTypePop(o,where);
938 22 : redisAssert(value != NULL);
939 :
940 22 : addReplyMultiBulkLen(c,2);
941 22 : addReplyBulk(c,c->argv[j]);
942 22 : addReplyBulk(c,value);
943 22 : decrRefCount(value);
944 22 : if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[j]);
945 22 : signalModifiedKey(c->db,c->argv[j]);
946 22 : server.dirty++;
947 :
948 : /* Replicate it as an [LR]POP instead of B[LR]POP. */
949 22 : rewriteClientCommandVector(c,2,
950 : (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
951 22 : c->argv[j]);
952 22 : return;
953 : }
954 : }
955 : }
956 : }
957 :
958 : /* If we are inside a MULTI/EXEC and the list is empty the only thing
959 : * we can do is treating it as a timeout (even with timeout 0). */
960 15 : if (c->flags & REDIS_MULTI) {
961 1 : addReply(c,shared.nullmultibulk);
962 1 : return;
963 : }
964 :
965 : /* If the list is empty or the key does not exists we must block */
966 14 : blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
967 : }
968 :
969 25 : void blpopCommand(redisClient *c) {
970 25 : blockingPopGenericCommand(c,REDIS_HEAD);
971 25 : }
972 :
973 18 : void brpopCommand(redisClient *c) {
974 18 : blockingPopGenericCommand(c,REDIS_TAIL);
975 18 : }
976 :
977 23 : void brpoplpushCommand(redisClient *c) {
978 : time_t timeout;
979 :
980 23 : if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
981 : return;
982 :
983 23 : robj *key = lookupKeyWrite(c->db, c->argv[1]);
984 :
985 23 : if (key == NULL) {
986 16 : if (c->flags & REDIS_MULTI) {
987 :
988 : /* Blocking against an empty list in a multi state
989 : * returns immediately. */
990 1 : addReply(c, shared.nullbulk);
991 : } else {
992 : /* The list is empty and the client blocks. */
993 15 : blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
994 : }
995 : } else {
996 7 : if (key->type != REDIS_LIST) {
997 1 : addReply(c, shared.wrongtypeerr);
998 : } else {
999 :
1000 : /* The list exists and has elements, so
1001 : * the regular rpoplpushCommand is executed. */
1002 6 : redisAssertWithInfo(c,key,listTypeLength(key) > 0);
1003 6 : rpoplpushCommand(c);
1004 : }
1005 : }
1006 : }
|