1 : #include "redis.h"
2 : #include <sys/uio.h>
3 :
4 : static void setProtocolError(redisClient *c, int pos);
5 :
6 : /* To evaluate the output buffer size of a client we need to get size of
7 : * allocated objects, however we can't used zmalloc_size() directly on sds
8 : * strings because of the trick they use to work (the header is before the
9 : * returned pointer), so we use this helper function. */
10 6681824 : size_t zmalloc_size_sds(sds s) {
11 6681824 : return zmalloc_size(s-sizeof(struct sdshdr));
12 : }
13 :
14 133 : void *dupClientReplyValue(void *o) {
15 133 : incrRefCount((robj*)o);
16 133 : return o;
17 : }
18 :
19 12 : int listMatchObjects(void *a, void *b) {
20 12 : return equalStringObjects(a,b);
21 : }
22 :
23 255 : redisClient *createClient(int fd) {
24 255 : redisClient *c = zmalloc(sizeof(redisClient));
25 :
26 : /* passing -1 as fd it is possible to create a non connected client.
27 : * This is useful since all the Redis commands needs to be executed
28 : * in the context of a client. When commands are executed in other
29 : * contexts (for instance a Lua script) we need a non connected client. */
30 255 : if (fd != -1) {
31 202 : anetNonBlock(NULL,fd);
32 202 : anetTcpNoDelay(NULL,fd);
33 202 : if (aeCreateFileEvent(server.el,fd,AE_READABLE,
34 : readQueryFromClient, c) == AE_ERR)
35 : {
36 0 : close(fd);
37 0 : zfree(c);
38 0 : return NULL;
39 : }
40 : }
41 :
42 255 : selectDb(c,0);
43 255 : c->fd = fd;
44 255 : c->bufpos = 0;
45 255 : c->querybuf = sdsempty();
46 255 : c->querybuf_peak = 0;
47 255 : c->reqtype = 0;
48 255 : c->argc = 0;
49 255 : c->argv = NULL;
50 255 : c->cmd = c->lastcmd = NULL;
51 255 : c->multibulklen = 0;
52 255 : c->bulklen = -1;
53 255 : c->sentlen = 0;
54 255 : c->flags = 0;
55 255 : c->ctime = c->lastinteraction = server.unixtime;
56 255 : c->authenticated = 0;
57 255 : c->replstate = REDIS_REPL_NONE;
58 255 : c->reply = listCreate();
59 255 : c->reply_bytes = 0;
60 255 : c->obuf_soft_limit_reached_time = 0;
61 255 : listSetFreeMethod(c->reply,decrRefCount);
62 255 : listSetDupMethod(c->reply,dupClientReplyValue);
63 255 : c->bpop.keys = NULL;
64 255 : c->bpop.count = 0;
65 255 : c->bpop.timeout = 0;
66 255 : c->bpop.target = NULL;
67 255 : c->io_keys = listCreate();
68 255 : c->watched_keys = listCreate();
69 255 : listSetFreeMethod(c->io_keys,decrRefCount);
70 255 : c->pubsub_channels = dictCreate(&setDictType,NULL);
71 255 : c->pubsub_patterns = listCreate();
72 255 : listSetFreeMethod(c->pubsub_patterns,decrRefCount);
73 255 : listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
74 255 : if (fd != -1) listAddNodeTail(server.clients,c);
75 255 : initClientMultiState(c);
76 255 : return c;
77 : }
78 :
79 : /* This function is called every time we are going to transmit new data
80 : * to the client. The behavior is the following:
81 : *
82 : * If the client should receive new data (normal clients will) the function
83 : * returns REDIS_OK, and make sure to install the write handler in our event
84 : * loop so that when the socket is writable new data gets written.
85 : *
86 : * If the client should not receive new data, because it is a fake client
87 : * or a slave, or because the setup of the write handler failed, the function
88 : * returns REDIS_ERR.
89 : *
90 : * Typically gets called every time a reply is built, before adding more
91 : * data to the clients output buffers. If the function returns REDIS_ERR no
92 : * data should be appended to the output buffers. */
93 6517067 : int prepareClientToWrite(redisClient *c) {
94 6517067 : if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
95 6516845 : if (c->fd <= 0) return REDIS_ERR; /* Fake client */
96 8648068 : if (c->bufpos == 0 && listLength(c->reply) == 0 &&
97 1066007 : (c->replstate == REDIS_REPL_NONE ||
98 : c->replstate == REDIS_REPL_ONLINE) &&
99 : aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
100 1066005 : sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
101 6516056 : return REDIS_OK;
102 : }
103 :
104 : /* Create a duplicate of the last object in the reply list when
105 : * it is not exclusively owned by the reply list. */
106 3334602 : robj *dupLastObjectIfNeeded(list *reply) {
107 : robj *new, *cur;
108 : listNode *ln;
109 3334602 : redisAssert(listLength(reply) > 0);
110 3334602 : ln = listLast(reply);
111 3334602 : cur = listNodeValue(ln);
112 3334602 : if (cur->refcount > 1) {
113 2592 : new = dupStringObject(cur);
114 2592 : decrRefCount(cur);
115 2592 : listNodeValue(ln) = new;
116 : }
117 3334602 : return listNodeValue(ln);
118 : }
119 :
120 : /* -----------------------------------------------------------------------------
121 : * Low level functions to add more data to output buffers.
122 : * -------------------------------------------------------------------------- */
123 :
124 6514801 : int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
125 6514801 : size_t available = sizeof(c->buf)-c->bufpos;
126 :
127 6514801 : if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;
128 :
129 : /* If there already are entries in the reply list, we cannot
130 : * add anything more to the static buffer. */
131 6514801 : if (listLength(c->reply) > 0) return REDIS_ERR;
132 :
133 : /* Check that the buffer has enough space available for this string. */
134 3176133 : if (len > available) return REDIS_ERR;
135 :
136 3174661 : memcpy(c->buf+c->bufpos,s,len);
137 3174661 : c->bufpos+=len;
138 3174661 : return REDIS_OK;
139 : }
140 :
141 3308405 : void _addReplyObjectToList(redisClient *c, robj *o) {
142 : robj *tail;
143 :
144 3308405 : if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
145 :
146 3308405 : if (listLength(c->reply) == 0) {
147 1472 : incrRefCount(o);
148 1472 : listAddNodeTail(c->reply,o);
149 1472 : c->reply_bytes += zmalloc_size_sds(o->ptr);
150 : } else {
151 3306933 : tail = listNodeValue(listLast(c->reply));
152 :
153 : /* Append to this object when possible. */
154 9915316 : if (tail->ptr != NULL &&
155 9916464 : sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
156 : {
157 3302895 : c->reply_bytes -= zmalloc_size_sds(tail->ptr);
158 3302895 : tail = dupLastObjectIfNeeded(c->reply);
159 6605790 : tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
160 3302895 : c->reply_bytes += zmalloc_size_sds(tail->ptr);
161 : } else {
162 4038 : incrRefCount(o);
163 4038 : listAddNodeTail(c->reply,o);
164 4038 : c->reply_bytes += zmalloc_size_sds(o->ptr);
165 : }
166 : }
167 3308405 : asyncCloseClientOnOutputBufferLimitReached(c);
168 : }
169 :
170 : /* This method takes responsibility over the sds. When it is no longer
171 : * needed it will be free'd, otherwise it ends up in a robj. */
172 4 : void _addReplySdsToList(redisClient *c, sds s) {
173 : robj *tail;
174 :
175 4 : if (c->flags & REDIS_CLOSE_AFTER_REPLY) {
176 0 : sdsfree(s);
177 0 : return;
178 : }
179 :
180 4 : if (listLength(c->reply) == 0) {
181 0 : listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
182 0 : c->reply_bytes += zmalloc_size_sds(s);
183 : } else {
184 4 : tail = listNodeValue(listLast(c->reply));
185 :
186 : /* Append to this object when possible. */
187 12 : if (tail->ptr != NULL &&
188 8 : sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
189 : {
190 4 : c->reply_bytes -= zmalloc_size_sds(tail->ptr);
191 4 : tail = dupLastObjectIfNeeded(c->reply);
192 4 : tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
193 4 : c->reply_bytes += zmalloc_size_sds(tail->ptr);
194 4 : sdsfree(s);
195 : } else {
196 0 : listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
197 0 : c->reply_bytes += zmalloc_size_sds(s);
198 : }
199 : }
200 4 : asyncCloseClientOnOutputBufferLimitReached(c);
201 : }
202 :
203 31731 : void _addReplyStringToList(redisClient *c, char *s, size_t len) {
204 : robj *tail;
205 :
206 31731 : if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
207 :
208 31731 : if (listLength(c->reply) == 0) {
209 0 : robj *o = createStringObject(s,len);
210 :
211 0 : listAddNodeTail(c->reply,o);
212 0 : c->reply_bytes += zmalloc_size_sds(o->ptr);
213 : } else {
214 31731 : tail = listNodeValue(listLast(c->reply));
215 :
216 : /* Append to this object when possible. */
217 95139 : if (tail->ptr != NULL &&
218 63410 : sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
219 : {
220 31703 : c->reply_bytes -= zmalloc_size_sds(tail->ptr);
221 31703 : tail = dupLastObjectIfNeeded(c->reply);
222 31703 : tail->ptr = sdscatlen(tail->ptr,s,len);
223 31703 : c->reply_bytes += zmalloc_size_sds(tail->ptr);
224 : } else {
225 28 : robj *o = createStringObject(s,len);
226 :
227 28 : listAddNodeTail(c->reply,o);
228 28 : c->reply_bytes += zmalloc_size_sds(o->ptr);
229 : }
230 : }
231 31731 : asyncCloseClientOnOutputBufferLimitReached(c);
232 : }
233 :
234 : /* -----------------------------------------------------------------------------
235 : * Higher level functions to queue data on the client output buffer.
236 : * The following functions are the ones that commands implementations will call.
237 : * -------------------------------------------------------------------------- */
238 :
239 5685021 : void addReply(redisClient *c, robj *obj) {
240 5685021 : if (prepareClientToWrite(c) != REDIS_OK) return;
241 :
242 : /* This is an important place where we can avoid copy-on-write
243 : * when there is a saving child running, avoiding touching the
244 : * refcount field of the object if it's not needed.
245 : *
246 : * If the encoding is RAW and there is room in the static buffer
247 : * we'll be able to send the object to the client without
248 : * messing with its page. */
249 5684445 : if (obj->encoding == REDIS_ENCODING_RAW) {
250 11014716 : if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
251 3245694 : _addReplyObjectToList(c,obj);
252 177087 : } else if (obj->encoding == REDIS_ENCODING_INT) {
253 : /* Optimization: if there is room in the static buffer for 32 bytes
254 : * (more than the max chars a 64 bit integer can take as string) we
255 : * avoid decoding the object and go for the lower level approach. */
256 177087 : if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
257 : char buf[32];
258 : int len;
259 :
260 114364 : len = ll2string(buf,sizeof(buf),(long)obj->ptr);
261 114364 : if (_addReplyToBuffer(c,buf,len) == REDIS_OK)
262 : return;
263 : /* else... continue with the normal code path, but should never
264 : * happen actually since we verified there is room. */
265 : }
266 62723 : obj = getDecodedObject(obj);
267 125446 : if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
268 62711 : _addReplyObjectToList(c,obj);
269 62723 : decrRefCount(obj);
270 : } else {
271 0 : redisPanic("Wrong obj->encoding in addReply()");
272 : }
273 : }
274 :
275 18354 : void addReplySds(redisClient *c, sds s) {
276 18354 : if (prepareClientToWrite(c) != REDIS_OK) {
277 : /* The caller expects the sds to be free'd. */
278 0 : sdsfree(s);
279 0 : return;
280 : }
281 18354 : if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
282 18350 : sdsfree(s);
283 : } else {
284 : /* This method free's the sds when it is no longer needed. */
285 4 : _addReplySdsToList(c,s);
286 : }
287 : }
288 :
289 812215 : void addReplyString(redisClient *c, char *s, size_t len) {
290 812215 : if (prepareClientToWrite(c) != REDIS_OK) return;
291 812002 : if (_addReplyToBuffer(c,s,len) != REDIS_OK)
292 31731 : _addReplyStringToList(c,s,len);
293 : }
294 :
295 83 : void addReplyErrorLength(redisClient *c, char *s, size_t len) {
296 83 : addReplyString(c,"-ERR ",5);
297 83 : addReplyString(c,s,len);
298 83 : addReplyString(c,"\r\n",2);
299 83 : }
300 :
301 74 : void addReplyError(redisClient *c, char *err) {
302 74 : addReplyErrorLength(c,err,strlen(err));
303 74 : }
304 :
305 9 : void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
306 : size_t l, j;
307 : va_list ap;
308 9 : va_start(ap,fmt);
309 9 : sds s = sdscatvprintf(sdsempty(),fmt,ap);
310 9 : va_end(ap);
311 : /* Make sure there are no newlines in the string, otherwise invalid protocol
312 : * is emitted. */
313 9 : l = sdslen(s);
314 643 : for (j = 0; j < l; j++) {
315 634 : if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
316 : }
317 9 : addReplyErrorLength(c,s,sdslen(s));
318 9 : sdsfree(s);
319 9 : }
320 :
321 129452 : void addReplyStatusLength(redisClient *c, char *s, size_t len) {
322 129452 : addReplyString(c,"+",1);
323 129452 : addReplyString(c,s,len);
324 129452 : addReplyString(c,"\r\n",2);
325 129452 : }
326 :
327 126052 : void addReplyStatus(redisClient *c, char *status) {
328 126052 : addReplyStatusLength(c,status,strlen(status));
329 126052 : }
330 :
331 3400 : void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
332 : va_list ap;
333 3400 : va_start(ap,fmt);
334 3400 : sds s = sdscatvprintf(sdsempty(),fmt,ap);
335 3400 : va_end(ap);
336 3400 : addReplyStatusLength(c,s,sdslen(s));
337 3400 : sdsfree(s);
338 3400 : }
339 :
340 : /* Adds an empty object to the reply list that will contain the multi bulk
341 : * length, which is not known when this function is called. */
342 1477 : void *addDeferredMultiBulkLength(redisClient *c) {
343 : /* Note that we install the write event here even if the object is not
344 : * ready to be sent, since we are sure that before returning to the
345 : * event loop setDeferredMultiBulkLength() will be called. */
346 1477 : if (prepareClientToWrite(c) != REDIS_OK) return NULL;
347 1477 : listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
348 1477 : return listLast(c->reply);
349 : }
350 :
351 : /* Populate the length object and try glueing it to the next chunk. */
352 1477 : void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
353 1477 : listNode *ln = (listNode*)node;
354 : robj *len, *next;
355 :
356 : /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
357 1477 : if (node == NULL) return;
358 :
359 1477 : len = listNodeValue(ln);
360 1477 : len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
361 1477 : c->reply_bytes += zmalloc_size_sds(len->ptr);
362 1477 : if (ln->next != NULL) {
363 1471 : next = listNodeValue(ln->next);
364 :
365 : /* Only glue when the next node is non-NULL (an sds in this case) */
366 1471 : if (next->ptr != NULL) {
367 2942 : len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
368 1471 : listDelNode(c->reply,ln->next);
369 : }
370 : }
371 1477 : asyncCloseClientOnOutputBufferLimitReached(c);
372 : }
373 :
374 : /* Add a duble as a bulk reply */
375 46396 : void addReplyDouble(redisClient *c, double d) {
376 : char dbuf[128], sbuf[128];
377 : int dlen, slen;
378 92792 : dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
379 92792 : slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
380 46396 : addReplyString(c,sbuf,slen);
381 46396 : }
382 :
383 : /* Add a long long as integer reply or bulk len / multi bulk count.
384 : * Basically this is used to output <prefix><long long><crlf>. */
385 1845661 : void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix) {
386 : char buf[128];
387 : int len;
388 :
389 : /* Things like $3\r\n or *2\r\n are emitted very often by the protocol
390 : * so we have a few shared objects to use if the integer is small
391 : * like it is most of the times. */
392 1845661 : if (prefix == '*' && ll < REDIS_SHARED_BULKHDR_LEN) {
393 278361 : addReply(c,shared.mbulkhdr[ll]);
394 278361 : return;
395 1567300 : } else if (prefix == '$' && ll < REDIS_SHARED_BULKHDR_LEN) {
396 1311288 : addReply(c,shared.bulkhdr[ll]);
397 1311288 : return;
398 : }
399 :
400 256012 : buf[0] = prefix;
401 256012 : len = ll2string(buf+1,sizeof(buf)-1,ll);
402 256012 : buf[len+1] = '\r';
403 256012 : buf[len+2] = '\n';
404 256012 : addReplyString(c,buf,len+3);
405 : }
406 :
407 340313 : void addReplyLongLong(redisClient *c, long long ll) {
408 340313 : if (ll == 0)
409 17659 : addReply(c,shared.czero);
410 322654 : else if (ll == 1)
411 196269 : addReply(c,shared.cone);
412 : else
413 126385 : addReplyLongLongWithPrefix(c,ll,':');
414 340313 : }
415 :
416 278387 : void addReplyMultiBulkLen(redisClient *c, long length) {
417 278387 : addReplyLongLongWithPrefix(c,length,'*');
418 278387 : }
419 :
420 : /* Create the length prefix of a bulk reply, example: $2234 */
421 1319687 : void addReplyBulkLen(redisClient *c, robj *obj) {
422 : size_t len;
423 :
424 1319687 : if (obj->encoding == REDIS_ENCODING_RAW) {
425 2285226 : len = sdslen(obj->ptr);
426 : } else {
427 177074 : long n = (long)obj->ptr;
428 :
429 : /* Compute how many bytes will take this integer as a radix 10 string */
430 177074 : len = 1;
431 177074 : if (n < 0) {
432 3179 : len++;
433 3179 : n = -n;
434 : }
435 1167572 : while((n = n/10) != 0) {
436 990498 : len++;
437 : }
438 : }
439 1319687 : addReplyLongLongWithPrefix(c,len,'$');
440 1319687 : }
441 :
442 : /* Add a Redis Object as a bulk reply */
443 1319687 : void addReplyBulk(redisClient *c, robj *obj) {
444 1319687 : addReplyBulkLen(c,obj);
445 1319687 : addReply(c,obj);
446 1319687 : addReply(c,shared.crlf);
447 1319687 : }
448 :
449 : /* Add a C buffer as bulk reply */
450 121202 : void addReplyBulkCBuffer(redisClient *c, void *p, size_t len) {
451 121202 : addReplyLongLongWithPrefix(c,len,'$');
452 121202 : addReplyString(c,p,len);
453 121202 : addReply(c,shared.crlf);
454 121202 : }
455 :
456 : /* Add a C nul term string as bulk reply */
457 14 : void addReplyBulkCString(redisClient *c, char *s) {
458 14 : if (s == NULL) {
459 0 : addReply(c,shared.nullbulk);
460 : } else {
461 14 : addReplyBulkCBuffer(c,s,strlen(s));
462 : }
463 14 : }
464 :
465 : /* Add a long long as a bulk reply */
466 44782 : void addReplyBulkLongLong(redisClient *c, long long ll) {
467 : char buf[64];
468 : int len;
469 :
470 44782 : len = ll2string(buf,64,ll);
471 44782 : addReplyBulkCBuffer(c,buf,len);
472 44782 : }
473 :
474 : /* Copy 'src' client output buffers into 'dst' client output buffers.
475 : * The function takes care of freeing the old output buffers of the
476 : * destination client. */
477 2 : void copyClientOutputBuffer(redisClient *dst, redisClient *src) {
478 2 : listRelease(dst->reply);
479 2 : dst->reply = listDup(src->reply);
480 2 : memcpy(dst->buf,src->buf,src->bufpos);
481 2 : dst->bufpos = src->bufpos;
482 2 : dst->reply_bytes = src->reply_bytes;
483 2 : }
484 :
485 193 : static void acceptCommonHandler(int fd) {
486 : redisClient *c;
487 193 : if ((c = createClient(fd)) == NULL) {
488 0 : redisLog(REDIS_WARNING,"Error allocating resoures for the client");
489 0 : close(fd); /* May be already closed, just ingore errors */
490 0 : return;
491 : }
492 : /* If maxclient directive is set and this is one client more... close the
493 : * connection. Note that we create the client instead to check before
494 : * for this condition, since now the socket is already set in nonblocking
495 : * mode and we can send an error for free using the Kernel I/O */
496 193 : if (listLength(server.clients) > server.maxclients) {
497 0 : char *err = "-ERR max number of clients reached\r\n";
498 :
499 : /* That's a best effort error message, don't check write errors */
500 0 : if (write(c->fd,err,strlen(err)) == -1) {
501 : /* Nothing to do, Just to avoid the warning... */
502 : }
503 0 : server.stat_rejected_conn++;
504 0 : freeClient(c);
505 0 : return;
506 : }
507 193 : server.stat_numconnections++;
508 : }
509 :
510 193 : void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
511 : int cport, cfd;
512 : char cip[128];
513 : REDIS_NOTUSED(el);
514 : REDIS_NOTUSED(mask);
515 : REDIS_NOTUSED(privdata);
516 :
517 193 : cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
518 193 : if (cfd == AE_ERR) {
519 0 : redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
520 0 : return;
521 : }
522 193 : redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
523 193 : acceptCommonHandler(cfd);
524 : }
525 :
526 0 : void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
527 : int cfd;
528 : REDIS_NOTUSED(el);
529 : REDIS_NOTUSED(mask);
530 : REDIS_NOTUSED(privdata);
531 :
532 0 : cfd = anetUnixAccept(server.neterr, fd);
533 0 : if (cfd == AE_ERR) {
534 0 : redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
535 0 : return;
536 : }
537 0 : redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
538 0 : acceptCommonHandler(cfd);
539 : }
540 :
541 :
542 : static void freeClientArgv(redisClient *c) {
543 : int j;
544 6671761 : for (j = 0; j < c->argc; j++)
545 4921023 : decrRefCount(c->argv[j]);
546 1750738 : c->argc = 0;
547 1750738 : c->cmd = NULL;
548 : }
549 :
550 : /* Close all the slaves connections. This is useful in chained replication
551 : * when we resync with our own master and want to force all our slaves to
552 : * resync with us as well. */
553 14 : void disconnectSlaves(void) {
554 28 : while (listLength(server.slaves)) {
555 0 : listNode *ln = listFirst(server.slaves);
556 0 : freeClient((redisClient*)ln->value);
557 : }
558 14 : }
559 :
560 96 : void freeClient(redisClient *c) {
561 : listNode *ln;
562 :
563 : /* If this is marked as current client unset it */
564 96 : if (server.current_client == c) server.current_client = NULL;
565 :
566 : /* Note that if the client we are freeing is blocked into a blocking
567 : * call, we have to set querybuf to NULL *before* to call
568 : * unblockClientWaitingData() to avoid processInputBuffer() will get
569 : * called. Also it is important to remove the file events after
570 : * this, because this call adds the READABLE event. */
571 96 : sdsfree(c->querybuf);
572 96 : c->querybuf = NULL;
573 96 : if (c->flags & REDIS_BLOCKED)
574 1 : unblockClientWaitingData(c);
575 :
576 : /* UNWATCH all the keys */
577 96 : unwatchAllKeys(c);
578 96 : listRelease(c->watched_keys);
579 : /* Unsubscribe from all the pubsub channels */
580 96 : pubsubUnsubscribeAllChannels(c,0);
581 96 : pubsubUnsubscribeAllPatterns(c,0);
582 96 : dictRelease(c->pubsub_channels);
583 96 : listRelease(c->pubsub_patterns);
584 : /* Obvious cleanup */
585 96 : aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
586 96 : aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
587 96 : listRelease(c->reply);
588 : freeClientArgv(c);
589 96 : close(c->fd);
590 : /* Remove from the list of clients */
591 96 : ln = listSearchKey(server.clients,c);
592 96 : redisAssert(ln != NULL);
593 96 : listDelNode(server.clients,ln);
594 : /* When client was just unblocked because of a blocking operation,
595 : * remove it from the list with unblocked clients. */
596 96 : if (c->flags & REDIS_UNBLOCKED) {
597 1 : ln = listSearchKey(server.unblocked_clients,c);
598 1 : redisAssert(ln != NULL);
599 1 : listDelNode(server.unblocked_clients,ln);
600 : }
601 96 : listRelease(c->io_keys);
602 : /* Master/slave cleanup.
603 : * Case 1: we lost the connection with a slave. */
604 96 : if (c->flags & REDIS_SLAVE) {
605 4 : if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
606 0 : close(c->repldbfd);
607 4 : list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
608 4 : ln = listSearchKey(l,c);
609 4 : redisAssert(ln != NULL);
610 4 : listDelNode(l,ln);
611 : }
612 :
613 : /* Case 2: we lost the connection with the master. */
614 96 : if (c->flags & REDIS_MASTER) {
615 5 : server.master = NULL;
616 5 : server.repl_state = REDIS_REPL_CONNECT;
617 5 : server.repl_down_since = server.unixtime;
618 : /* We lost connection with our master, force our slaves to resync
619 : * with us as well to load the new data set.
620 : *
621 : * If server.masterhost is NULL the user called SLAVEOF NO ONE so
622 : * slave resync is not needed. */
623 5 : if (server.masterhost != NULL) disconnectSlaves();
624 : }
625 :
626 : /* If this client was scheduled for async freeing we need to remove it
627 : * from the queue. */
628 96 : if (c->flags & REDIS_CLOSE_ASAP) {
629 0 : ln = listSearchKey(server.clients_to_close,c);
630 0 : redisAssert(ln != NULL);
631 0 : listDelNode(server.clients_to_close,ln);
632 : }
633 :
634 : /* Release memory */
635 96 : zfree(c->argv);
636 96 : freeClientMultiState(c);
637 96 : zfree(c);
638 96 : }
639 :
640 : /* Schedule a client to free it at a safe time in the serverCron() function.
641 : * This function is useful when we need to terminate a client but we are in
642 : * a context where calling freeClient() is not possible, because the client
643 : * should be valid for the continuation of the flow of the program. */
644 2 : void freeClientAsync(redisClient *c) {
645 2 : if (c->flags & REDIS_CLOSE_ASAP) return;
646 2 : c->flags |= REDIS_CLOSE_ASAP;
647 2 : listAddNodeTail(server.clients_to_close,c);
648 : }
649 :
650 2402 : void freeClientsInAsyncFreeQueue(void) {
651 4806 : while (listLength(server.clients_to_close)) {
652 2 : listNode *ln = listFirst(server.clients_to_close);
653 2 : redisClient *c = listNodeValue(ln);
654 :
655 2 : c->flags &= ~REDIS_CLOSE_ASAP;
656 2 : freeClient(c);
657 2 : listDelNode(server.clients_to_close,ln);
658 : }
659 2402 : }
660 :
661 1067319 : void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
662 1067319 : redisClient *c = privdata;
663 1067319 : int nwritten = 0, totwritten = 0, objlen;
664 : size_t objmem;
665 : robj *o;
666 : REDIS_NOTUSED(el);
667 : REDIS_NOTUSED(mask);
668 :
669 3203449 : while(c->bufpos > 0 || listLength(c->reply)) {
670 1070130 : if (c->bufpos > 0) {
671 1064525 : if (c->flags & REDIS_MASTER) {
672 : /* Don't reply to a master */
673 54222 : nwritten = c->bufpos - c->sentlen;
674 : } else {
675 1010303 : nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
676 1010303 : if (nwritten <= 0) break;
677 : }
678 1064525 : c->sentlen += nwritten;
679 1064525 : totwritten += nwritten;
680 :
681 : /* If the buffer was sent, set bufpos to zero to continue with
682 : * the remainder of the reply. */
683 1064525 : if (c->sentlen == c->bufpos) {
684 1064525 : c->bufpos = 0;
685 1064525 : c->sentlen = 0;
686 : }
687 : } else {
688 5605 : o = listNodeValue(listFirst(c->reply));
689 11210 : objlen = sdslen(o->ptr);
690 5605 : objmem = zmalloc_size_sds(o->ptr);
691 :
692 5605 : if (objlen == 0) {
693 0 : listDelNode(c->reply,listFirst(c->reply));
694 0 : continue;
695 : }
696 :
697 5605 : if (c->flags & REDIS_MASTER) {
698 : /* Don't reply to a master */
699 0 : nwritten = objlen - c->sentlen;
700 : } else {
701 5605 : nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
702 5605 : if (nwritten <= 0) break;
703 : }
704 5600 : c->sentlen += nwritten;
705 5600 : totwritten += nwritten;
706 :
707 : /* If we fully sent the object on head go to the next one */
708 5600 : if (c->sentlen == objlen) {
709 5550 : listDelNode(c->reply,listFirst(c->reply));
710 5550 : c->sentlen = 0;
711 5550 : c->reply_bytes -= objmem;
712 : }
713 : }
714 : /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
715 : * bytes, in a single threaded server it's a good idea to serve
716 : * other clients as well, even if a very large request comes from
717 : * super fast link that is always able to accept data (in real world
718 : * scenario think about 'KEYS *' against the loopback interface).
719 : *
720 : * However if we are over the maxmemory limit we ignore that and
721 : * just deliver as much data as it is possible to deliver. */
722 1071439 : if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
723 1314 : (server.maxmemory == 0 ||
724 0 : zmalloc_used_memory() < server.maxmemory)) break;
725 : }
726 1067319 : if (nwritten == -1) {
727 5 : if (errno == EAGAIN) {
728 5 : nwritten = 0;
729 : } else {
730 0 : redisLog(REDIS_VERBOSE,
731 : "Error writing to client: %s", strerror(errno));
732 0 : freeClient(c);
733 0 : return;
734 : }
735 : }
736 1067319 : if (totwritten > 0) c->lastinteraction = server.unixtime;
737 1067319 : if (c->bufpos == 0 && listLength(c->reply) == 0) {
738 1066005 : c->sentlen = 0;
739 1066005 : aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
740 :
741 : /* Close connection after entire reply has been sent. */
742 1066005 : if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);
743 : }
744 : }
745 :
746 : /* resetClient prepare the client to process the next command */
747 1750642 : void resetClient(redisClient *c) {
748 : freeClientArgv(c);
749 1750642 : c->reqtype = 0;
750 1750642 : c->multibulklen = 0;
751 1750642 : c->bulklen = -1;
752 : /* We clear the ASKING flag as well if we are not inside a MULTI. */
753 1750642 : if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
754 1750642 : }
755 :
756 247781 : int processInlineBuffer(redisClient *c) {
757 247781 : char *newline = strstr(c->querybuf,"\r\n");
758 : int argc, j;
759 : sds *argv;
760 : size_t querylen;
761 :
762 : /* Nothing to do without a \r\n */
763 247781 : if (newline == NULL) {
764 38072 : if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
765 2 : addReplyError(c,"Protocol error: too big inline request");
766 2 : setProtocolError(c,0);
767 : }
768 19036 : return REDIS_ERR;
769 : }
770 :
771 : /* Split the input buffer up to the \r\n */
772 228745 : querylen = newline-(c->querybuf);
773 228745 : argv = sdssplitlen(c->querybuf,querylen," ",1,&argc);
774 :
775 : /* Leave data after the first line of the query in the buffer */
776 228745 : c->querybuf = sdsrange(c->querybuf,querylen+2,-1);
777 :
778 : /* Setup argv array on client structure */
779 228745 : if (c->argv) zfree(c->argv);
780 228745 : c->argv = zmalloc(sizeof(robj*)*argc);
781 :
782 : /* Create redis objects for all arguments. */
783 786185 : for (c->argc = 0, j = 0; j < argc; j++) {
784 1114880 : if (sdslen(argv[j])) {
785 557431 : c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
786 557431 : c->argc++;
787 : } else {
788 9 : sdsfree(argv[j]);
789 : }
790 : }
791 228745 : zfree(argv);
792 228745 : return REDIS_OK;
793 : }
794 :
795 : /* Helper function. Trims query buffer to make the function that processes
796 : * multi bulk requests idempotent. */
797 9 : static void setProtocolError(redisClient *c, int pos) {
798 9 : if (server.verbosity >= REDIS_VERBOSE) {
799 9 : sds client = getClientInfoString(c);
800 9 : redisLog(REDIS_VERBOSE,
801 : "Protocol error from client: %s", client);
802 9 : sdsfree(client);
803 : }
804 9 : c->flags |= REDIS_CLOSE_AFTER_REPLY;
805 9 : c->querybuf = sdsrange(c->querybuf,pos,-1);
806 9 : }
807 :
808 1535975 : int processMultibulkBuffer(redisClient *c) {
809 1535975 : char *newline = NULL;
810 1535975 : int pos = 0, ok;
811 : long long ll;
812 :
813 1535975 : if (c->multibulklen == 0) {
814 : /* The client should have been reset */
815 1531316 : redisAssertWithInfo(c,NULL,c->argc == 0);
816 :
817 : /* Multi bulk length cannot be read without a \r\n */
818 1531316 : newline = strchr(c->querybuf,'\r');
819 1531316 : if (newline == NULL) {
820 18786 : if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
821 1 : addReplyError(c,"Protocol error: too big mbulk count string");
822 1 : setProtocolError(c,0);
823 : }
824 9393 : return REDIS_ERR;
825 : }
826 :
827 : /* Buffer should also contain \n */
828 3043846 : if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
829 16 : return REDIS_ERR;
830 :
831 : /* We know for sure there is a whole line since newline != NULL,
832 : * so go ahead and find out the multi bulk length. */
833 1521907 : redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
834 1521907 : ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
835 1521907 : if (!ok || ll > 1024*1024) {
836 1 : addReplyError(c,"Protocol error: invalid multibulk length");
837 1 : setProtocolError(c,pos);
838 1 : return REDIS_ERR;
839 : }
840 :
841 1521906 : pos = (newline-c->querybuf)+2;
842 1521906 : if (ll <= 0) {
843 1 : c->querybuf = sdsrange(c->querybuf,pos,-1);
844 1 : return REDIS_OK;
845 : }
846 :
847 1521905 : c->multibulklen = ll;
848 :
849 : /* Setup argv array on client structure */
850 1521905 : if (c->argv) zfree(c->argv);
851 1521905 : c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
852 : }
853 :
854 1526564 : redisAssertWithInfo(c,NULL,c->multibulklen > 0);
855 5879833 : while(c->multibulklen) {
856 : /* Read bulk length if unknown */
857 4357934 : if (c->bulklen == -1) {
858 4353524 : newline = strchr(c->querybuf+pos,'\r');
859 4353524 : if (newline == NULL) {
860 428 : if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
861 0 : addReplyError(c,"Protocol error: too big bulk count string");
862 0 : setProtocolError(c,0);
863 : }
864 : break;
865 : }
866 :
867 : /* Buffer should also contain \n */
868 8706620 : if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
869 : break;
870 :
871 4353275 : if (c->querybuf[pos] != '$') {
872 2 : addReplyErrorFormat(c,
873 : "Protocol error: expected '$', got '%c'",
874 2 : c->querybuf[pos]);
875 2 : setProtocolError(c,pos);
876 2 : return REDIS_ERR;
877 : }
878 :
879 4353273 : ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
880 4353273 : if (!ok || ll < 0 || ll > 512*1024*1024) {
881 3 : addReplyError(c,"Protocol error: invalid bulk length");
882 3 : setProtocolError(c,pos);
883 3 : return REDIS_ERR;
884 : }
885 :
886 4353270 : pos += newline-(c->querybuf+pos)+2;
887 4353270 : if (ll >= REDIS_MBULK_BIG_ARG) {
888 : /* If we are going to read a large object from network
889 : * try to make it likely that it will start at c->querybuf
890 : * boundary so that we can optimized object creation
891 : * avoiding a large copy of data. */
892 459 : c->querybuf = sdsrange(c->querybuf,pos,-1);
893 459 : pos = 0;
894 : /* Hint the sds library about the amount of bytes this string is
895 : * going to contain. */
896 459 : c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2);
897 : }
898 4353270 : c->bulklen = ll;
899 : }
900 :
901 : /* Read bulk argument */
902 8715360 : if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
903 : /* Not enough data (+2 == trailing \r\n) */
904 : break;
905 : } else {
906 : /* Optimization: if the buffer contanins JUST our bulk element
907 : * instead of creating a new object by *copying* the sds we
908 : * just use the current sds string. */
909 4355538 : if (pos == 0 &&
910 1351 : c->bulklen >= REDIS_MBULK_BIG_ARG &&
911 918 : (signed) sdslen(c->querybuf) == c->bulklen+2)
912 : {
913 459 : c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
914 459 : sdsIncrLen(c->querybuf,-2); /* remove CRLF */
915 459 : c->querybuf = sdsempty();
916 : /* Assume that if we saw a fat argument we'll see another one
917 : * likely... */
918 459 : c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);
919 459 : pos = 0;
920 : } else {
921 8705620 : c->argv[c->argc++] =
922 4352810 : createStringObject(c->querybuf+pos,c->bulklen);
923 4352810 : pos += c->bulklen+2;
924 : }
925 4353269 : c->bulklen = -1;
926 4353269 : c->multibulklen--;
927 : }
928 : }
929 :
930 : /* Trim to pos */
931 1526559 : if (pos) c->querybuf = sdsrange(c->querybuf,pos,-1);
932 :
933 : /* We're done when c->multibulk == 0 */
934 1526559 : if (c->multibulklen == 0) return REDIS_OK;
935 :
936 : /* Still not read to process the command */
937 4660 : return REDIS_ERR;
938 : }
939 :
940 1031988 : void processInputBuffer(redisClient *c) {
941 : /* Keep processing while there is something in the input buffer */
942 6597254 : while(sdslen(c->querybuf)) {
943 : /* Immediately abort if the client is in the middle of something. */
944 1783763 : if (c->flags & REDIS_BLOCKED) return;
945 :
946 : /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
947 : * written to the client. Make sure to not let the reply grow after
948 : * this flag has been set (i.e. don't process more commands). */
949 1783761 : if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
950 :
951 : /* Determine request type when unknown. */
952 1783756 : if (!c->reqtype) {
953 1750703 : if (c->querybuf[0] == '*') {
954 1521908 : c->reqtype = REDIS_REQ_MULTIBULK;
955 : } else {
956 228795 : c->reqtype = REDIS_REQ_INLINE;
957 : }
958 : }
959 :
960 1783756 : if (c->reqtype == REDIS_REQ_INLINE) {
961 247781 : if (processInlineBuffer(c) != REDIS_OK) break;
962 1535975 : } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
963 1535975 : if (processMultibulkBuffer(c) != REDIS_OK) break;
964 : } else {
965 0 : redisPanic("Unknown request type");
966 : }
967 :
968 : /* Multibulk processing could see a <= 0 length. */
969 1750645 : if (c->argc == 0) {
970 2 : resetClient(c);
971 : } else {
972 : /* Only reset the client when the command was executed. */
973 1750643 : if (processCommand(c) == REDIS_OK)
974 1750640 : resetClient(c);
975 : }
976 : }
977 : }
978 :
979 1032068 : void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
980 1032068 : redisClient *c = (redisClient*) privdata;
981 : int nread, readlen;
982 : size_t qblen;
983 : REDIS_NOTUSED(el);
984 : REDIS_NOTUSED(mask);
985 :
986 1032068 : server.current_client = c;
987 1032068 : readlen = REDIS_IOBUF_LEN;
988 : /* If this is a multi bulk request, and we are processing a bulk reply
989 : * that is large enough, try to maximize the probabilty that the query
990 : * buffer contains excatly the SDS string representing the object, even
991 : * at the risk of requring more read(2) calls. This way the function
992 : * processMultiBulkBuffer() can avoid copying buffers to create the
993 : * Redis Object representing the argument. */
994 1041139 : if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
995 9071 : && c->bulklen >= REDIS_MBULK_BIG_ARG)
996 : {
997 6898 : int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
998 :
999 3449 : if (remaining < readlen) readlen = remaining;
1000 : }
1001 :
1002 2064136 : qblen = sdslen(c->querybuf);
1003 1032068 : if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
1004 1032068 : c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
1005 2064136 : nread = read(fd, c->querybuf+qblen, readlen);
1006 1032068 : if (nread == -1) {
1007 10 : if (errno == EAGAIN) {
1008 0 : nread = 0;
1009 : } else {
1010 10 : redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
1011 10 : freeClient(c);
1012 10 : return;
1013 : }
1014 1032058 : } else if (nread == 0) {
1015 72 : redisLog(REDIS_VERBOSE, "Client closed connection");
1016 72 : freeClient(c);
1017 72 : return;
1018 : }
1019 1031986 : if (nread) {
1020 1031986 : sdsIncrLen(c->querybuf,nread);
1021 1031986 : c->lastinteraction = server.unixtime;
1022 : } else {
1023 0 : server.current_client = NULL;
1024 0 : return;
1025 : }
1026 2063972 : if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
1027 0 : sds ci = getClientInfoString(c), bytes = sdsempty();
1028 :
1029 0 : bytes = sdscatrepr(bytes,c->querybuf,64);
1030 0 : redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
1031 0 : sdsfree(ci);
1032 0 : sdsfree(bytes);
1033 0 : freeClient(c);
1034 0 : return;
1035 : }
1036 1031986 : processInputBuffer(c);
1037 1031986 : server.current_client = NULL;
1038 : }
1039 :
1040 9168 : void getClientsMaxBuffers(unsigned long *longest_output_list,
1041 : unsigned long *biggest_input_buffer) {
1042 : redisClient *c;
1043 : listNode *ln;
1044 : listIter li;
1045 9168 : unsigned long lol = 0, bib = 0;
1046 :
1047 9168 : listRewind(server.clients,&li);
1048 27614 : while ((ln = listNext(&li)) != NULL) {
1049 9278 : c = listNodeValue(ln);
1050 :
1051 9278 : if (listLength(c->reply) > lol) lol = listLength(c->reply);
1052 18558 : if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf);
1053 : }
1054 9168 : *longest_output_list = lol;
1055 9168 : *biggest_input_buffer = bib;
1056 9168 : }
1057 :
1058 : /* Turn a Redis client into an sds string representing its state. */
1059 150722 : sds getClientInfoString(redisClient *client) {
1060 : char ip[32], flags[16], events[3], *p;
1061 : int port;
1062 : int emask;
1063 :
1064 150722 : anetPeerToString(client->fd,ip,&port);
1065 150722 : p = flags;
1066 150722 : if (client->flags & REDIS_SLAVE) {
1067 0 : if (client->flags & REDIS_MONITOR)
1068 0 : *p++ = 'O';
1069 : else
1070 0 : *p++ = 'S';
1071 : }
1072 150722 : if (client->flags & REDIS_MASTER) *p++ = 'M';
1073 150722 : if (client->flags & REDIS_MULTI) *p++ = 'x';
1074 150722 : if (client->flags & REDIS_BLOCKED) *p++ = 'b';
1075 150722 : if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
1076 150722 : if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
1077 150722 : if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
1078 150722 : if (client->flags & REDIS_CLOSE_ASAP) *p++ = 'A';
1079 150722 : if (p == flags) *p++ = 'N';
1080 150722 : *p++ = '\0';
1081 :
1082 150722 : emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd);
1083 150722 : p = events;
1084 150722 : if (emask & AE_READABLE) *p++ = 'r';
1085 150722 : if (emask & AE_WRITABLE) *p++ = 'w';
1086 150722 : *p = '\0';
1087 1205773 : return sdscatprintf(sdsempty(),
1088 : "addr=%s:%d fd=%d age=%ld idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu qbuf-free=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s",
1089 : ip,port,client->fd,
1090 : (long)(server.unixtime - client->ctime),
1091 : (long)(server.unixtime - client->lastinteraction),
1092 : flags,
1093 150722 : client->db->id,
1094 301444 : (int) dictSize(client->pubsub_channels),
1095 150722 : (int) listLength(client->pubsub_patterns),
1096 : (unsigned long) sdslen(client->querybuf),
1097 : (unsigned long) sdsavail(client->querybuf),
1098 : (unsigned long) client->bufpos,
1099 150722 : (unsigned long) listLength(client->reply),
1100 : getClientOutputBufferMemoryUsage(client),
1101 : events,
1102 301441 : client->lastcmd ? client->lastcmd->name : "NULL");
1103 : }
1104 :
1105 75357 : sds getAllClientsInfoString(void) {
1106 : listNode *ln;
1107 : listIter li;
1108 : redisClient *client;
1109 75357 : sds o = sdsempty();
1110 :
1111 75357 : listRewind(server.clients,&li);
1112 301425 : while ((ln = listNext(&li)) != NULL) {
1113 : sds cs;
1114 :
1115 150711 : client = listNodeValue(ln);
1116 150711 : cs = getClientInfoString(client);
1117 150711 : o = sdscatsds(o,cs);
1118 150711 : sdsfree(cs);
1119 150711 : o = sdscatlen(o,"\n",1);
1120 : }
1121 75357 : return o;
1122 : }
1123 :
1124 75357 : void clientCommand(redisClient *c) {
1125 : listNode *ln;
1126 : listIter li;
1127 : redisClient *client;
1128 :
1129 150714 : if (!strcasecmp(c->argv[1]->ptr,"list") && c->argc == 2) {
1130 75357 : sds o = getAllClientsInfoString();
1131 75357 : addReplyBulkCBuffer(c,o,sdslen(o));
1132 75357 : sdsfree(o);
1133 0 : } else if (!strcasecmp(c->argv[1]->ptr,"kill") && c->argc == 3) {
1134 0 : listRewind(server.clients,&li);
1135 0 : while ((ln = listNext(&li)) != NULL) {
1136 : char ip[32], addr[64];
1137 : int port;
1138 :
1139 0 : client = listNodeValue(ln);
1140 0 : if (anetPeerToString(client->fd,ip,&port) == -1) continue;
1141 0 : snprintf(addr,sizeof(addr),"%s:%d",ip,port);
1142 0 : if (strcmp(addr,c->argv[2]->ptr) == 0) {
1143 0 : addReply(c,shared.ok);
1144 0 : if (c == client) {
1145 0 : client->flags |= REDIS_CLOSE_AFTER_REPLY;
1146 : } else {
1147 0 : freeClient(client);
1148 : }
1149 : return;
1150 : }
1151 : }
1152 0 : addReplyError(c,"No such client");
1153 : } else {
1154 0 : addReplyError(c, "Syntax error, try CLIENT (LIST | KILL ip:port)");
1155 : }
1156 : }
1157 :
1158 : /* Rewrite the command vector of the client. All the new objects ref count
1159 : * is incremented. The old command vector is freed, and the old objects
1160 : * ref count is decremented. */
1161 10472 : void rewriteClientCommandVector(redisClient *c, int argc, ...) {
1162 : va_list ap;
1163 : int j;
1164 : robj **argv; /* The new argument vector */
1165 :
1166 10472 : argv = zmalloc(sizeof(robj*)*argc);
1167 10472 : va_start(ap,argc);
1168 41822 : for (j = 0; j < argc; j++) {
1169 : robj *a;
1170 :
1171 31350 : a = va_arg(ap, robj*);
1172 31350 : argv[j] = a;
1173 31350 : incrRefCount(a);
1174 : }
1175 : /* We free the objects in the original vector at the end, so we are
1176 : * sure that if the same objects are reused in the new vector the
1177 : * refcount gets incremented before it gets decremented. */
1178 10472 : for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
1179 10472 : zfree(c->argv);
1180 : /* Replace argv and argc with our new versions. */
1181 10472 : c->argv = argv;
1182 10472 : c->argc = argc;
1183 10472 : c->cmd = lookupCommand(c->argv[0]->ptr);
1184 10472 : redisAssertWithInfo(c,NULL,c->cmd != NULL);
1185 10472 : va_end(ap);
1186 10472 : }
1187 :
1188 : /* Rewrite a single item in the command vector.
1189 : * The new val ref count is incremented, and the old decremented. */
1190 42 : void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) {
1191 : robj *oldval;
1192 :
1193 42 : redisAssertWithInfo(c,NULL,i < c->argc);
1194 42 : oldval = c->argv[i];
1195 42 : c->argv[i] = newval;
1196 42 : incrRefCount(newval);
1197 42 : decrRefCount(oldval);
1198 :
1199 : /* If this is the command name make sure to fix c->cmd. */
1200 42 : if (i == 0) {
1201 21 : c->cmd = lookupCommand(c->argv[0]->ptr);
1202 21 : redisAssertWithInfo(c,NULL,c->cmd != NULL);
1203 : }
1204 42 : }
1205 :
1206 : /* This function returns the number of bytes that Redis is virtually
1207 : * using to store the reply still not read by the client.
1208 : * It is "virtual" since the reply output list may contain objects that
1209 : * are shared and are not really using additional memory.
1210 : *
1211 : * The function returns the total sum of the length of all the objects
1212 : * stored in the output list, plus the memory used to allocate every
1213 : * list node. The static reply buffer is not taken into account since it
1214 : * is allocated anyway.
1215 : *
1216 : * Note: this function is very fast so can be called as many time as
1217 : * the caller wishes. The main usage of this function currently is
1218 : * enforcing the client output length limits. */
1219 0 : unsigned long getClientOutputBufferMemoryUsage(redisClient *c) {
1220 3484157 : unsigned long list_item_size = sizeof(listNode)+sizeof(robj);
1221 :
1222 3484157 : return c->reply_bytes + (list_item_size*listLength(c->reply));
1223 : }
1224 :
1225 : /* Get the class of a client, used in order to envorce limits to different
1226 : * classes of clients.
1227 : *
1228 : * The function will return one of the following:
1229 : * REDIS_CLIENT_LIMIT_CLASS_NORMAL -> Normal client
1230 : * REDIS_CLIENT_LIMIT_CLASS_SLAVE -> Slave or client executing MONITOR command
1231 : * REDIS_CLIENT_LIMIT_CLASS_PUBSUB -> Client subscribed to Pub/Sub channels
1232 : */
1233 0 : int getClientLimitClass(redisClient *c) {
1234 3333435 : if (c->flags & REDIS_SLAVE) return REDIS_CLIENT_LIMIT_CLASS_SLAVE;
1235 1205950 : if (dictSize(c->pubsub_channels) || listLength(c->pubsub_patterns))
1236 470975 : return REDIS_CLIENT_LIMIT_CLASS_PUBSUB;
1237 734975 : return REDIS_CLIENT_LIMIT_CLASS_NORMAL;
1238 : }
1239 :
1240 6 : int getClientLimitClassByName(char *name) {
1241 6 : if (!strcasecmp(name,"normal")) return REDIS_CLIENT_LIMIT_CLASS_NORMAL;
1242 6 : else if (!strcasecmp(name,"slave")) return REDIS_CLIENT_LIMIT_CLASS_SLAVE;
1243 6 : else if (!strcasecmp(name,"pubsub")) return REDIS_CLIENT_LIMIT_CLASS_PUBSUB;
1244 0 : else return -1;
1245 : }
1246 :
1247 0 : char *getClientLimitClassName(int class) {
1248 0 : switch(class) {
1249 0 : case REDIS_CLIENT_LIMIT_CLASS_NORMAL: return "normal";
1250 0 : case REDIS_CLIENT_LIMIT_CLASS_SLAVE: return "slave";
1251 0 : case REDIS_CLIENT_LIMIT_CLASS_PUBSUB: return "pubsub";
1252 0 : default: return NULL;
1253 : }
1254 : }
1255 :
1256 : /* The function checks if the client reached output buffer soft or hard
1257 : * limit, and also update the state needed to check the soft limit as
1258 : * a side effect.
1259 : *
1260 : * Return value: non-zero if the client reached the soft or the hard limit.
1261 : * Otherwise zero is returned. */
1262 3333435 : int checkClientOutputBufferLimits(redisClient *c) {
1263 3333435 : int soft = 0, hard = 0, class;
1264 3333435 : unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
1265 :
1266 3333435 : class = getClientLimitClass(c);
1267 5480106 : if (server.client_obuf_limits[class].hard_limit_bytes &&
1268 2146671 : used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
1269 1 : hard = 1;
1270 5912709 : if (server.client_obuf_limits[class].soft_limit_bytes &&
1271 2579274 : used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
1272 413587 : soft = 1;
1273 :
1274 : /* We need to check if the soft limit is reached continuously for the
1275 : * specified amount of seconds. */
1276 3333435 : if (soft) {
1277 413587 : if (c->obuf_soft_limit_reached_time == 0) {
1278 2 : c->obuf_soft_limit_reached_time = server.unixtime;
1279 2 : soft = 0; /* First time we see the soft limit reached */
1280 : } else {
1281 413585 : time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
1282 :
1283 413585 : if (elapsed <=
1284 : server.client_obuf_limits[class].soft_limit_seconds) {
1285 413584 : soft = 0; /* The client still did not reached the max number of
1286 : seconds for the soft limit to be considered
1287 : reached. */
1288 : }
1289 : }
1290 : } else {
1291 2919848 : c->obuf_soft_limit_reached_time = 0;
1292 : }
1293 3333435 : return soft || hard;
1294 : }
1295 :
1296 : /* Asynchronously close a client if soft or hard limit is reached on the
1297 : * output buffer size. The caller can check if the client will be closed
1298 : * checking if the client REDIS_CLOSE_ASAP flag is set.
1299 : *
1300 : * Note: we need to close the client asynchronously because this function is
1301 : * called from contexts where the client can't be freed safely, i.e. from the
1302 : * lower level functions pushing data inside the client output buffers. */
1303 3341617 : void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
1304 3341617 : if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
1305 3333435 : if (checkClientOutputBufferLimits(c)) {
1306 2 : sds client = getClientInfoString(c);
1307 :
1308 2 : freeClientAsync(c);
1309 2 : redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
1310 2 : sdsfree(client);
1311 : }
1312 : }
1313 :
1314 : /* Helper function used by freeMemoryIfNeeded() in order to flush slaves
1315 : * output buffers without returning control to the event loop. */
1316 0 : void flushSlavesOutputBuffers(void) {
1317 : listIter li;
1318 : listNode *ln;
1319 :
1320 0 : listRewind(server.slaves,&li);
1321 0 : while((ln = listNext(&li))) {
1322 0 : redisClient *slave = listNodeValue(ln);
1323 : int events;
1324 :
1325 0 : events = aeGetFileEvents(server.el,slave->fd);
1326 0 : if (events & AE_WRITABLE &&
1327 0 : slave->replstate == REDIS_REPL_ONLINE &&
1328 0 : listLength(slave->reply))
1329 : {
1330 0 : sendReplyToClient(server.el,slave->fd,slave,0);
1331 : }
1332 : }
1333 0 : }
|