LCOV - code coverage report
Current view: directory - redis/src - networking.c (source / functions) Found Hit Coverage
Test: redis.info Lines: 701 607 86.6 %
Date: 2012-04-04 Functions: 58 53 91.4 %
Colors: not hit hit

       1                 : #include "redis.h"
       2                 : #include <sys/uio.h>
       3                 : 
       4                 : static void setProtocolError(redisClient *c, int pos);
       5                 : 
       6                 : /* To evaluate the output buffer size of a client we need to get size of
       7                 :  * allocated objects, however we can't use zmalloc_size() directly on sds
       8                 :  * strings because of the trick they use to work (the header is before the
       9                 :  * returned pointer), so we use this helper function. */
      10         6681824 : size_t zmalloc_size_sds(sds s) {
      11         6681824 :     return zmalloc_size(s-sizeof(struct sdshdr));
      12                 : }
      13                 : 
      14             133 : void *dupClientReplyValue(void *o) {
      15             133 :     incrRefCount((robj*)o);
      16             133 :     return o;
      17                 : }
      18                 : 
      19              12 : int listMatchObjects(void *a, void *b) {
      20              12 :     return equalStringObjects(a,b);
      21                 : }
      22                 : 
      23             255 : redisClient *createClient(int fd) {
      24             255 :     redisClient *c = zmalloc(sizeof(redisClient));
      25                 : 
      26                 :     /* passing -1 as fd it is possible to create a non connected client.
      27                 :      * This is useful since all the Redis commands need to be executed
      28                 :      * in the context of a client. When commands are executed in other
      29                 :      * contexts (for instance a Lua script) we need a non connected client. */
      30             255 :     if (fd != -1) {
      31             202 :         anetNonBlock(NULL,fd);
      32             202 :         anetTcpNoDelay(NULL,fd);
      33             202 :         if (aeCreateFileEvent(server.el,fd,AE_READABLE,
      34                 :             readQueryFromClient, c) == AE_ERR)
      35                 :         {
      36               0 :             close(fd);
      37               0 :             zfree(c);
      38               0 :             return NULL;
      39                 :         }
      40                 :     }
      41                 : 
      42             255 :     selectDb(c,0);
      43             255 :     c->fd = fd;
      44             255 :     c->bufpos = 0;
      45             255 :     c->querybuf = sdsempty();
      46             255 :     c->querybuf_peak = 0;
      47             255 :     c->reqtype = 0;
      48             255 :     c->argc = 0;
      49             255 :     c->argv = NULL;
      50             255 :     c->cmd = c->lastcmd = NULL;
      51             255 :     c->multibulklen = 0;
      52             255 :     c->bulklen = -1;
      53             255 :     c->sentlen = 0;
      54             255 :     c->flags = 0;
      55             255 :     c->ctime = c->lastinteraction = server.unixtime;
      56             255 :     c->authenticated = 0;
      57             255 :     c->replstate = REDIS_REPL_NONE;
      58             255 :     c->reply = listCreate();
      59             255 :     c->reply_bytes = 0;
      60             255 :     c->obuf_soft_limit_reached_time = 0;
      61             255 :     listSetFreeMethod(c->reply,decrRefCount);
      62             255 :     listSetDupMethod(c->reply,dupClientReplyValue);
      63             255 :     c->bpop.keys = NULL;
      64             255 :     c->bpop.count = 0;
      65             255 :     c->bpop.timeout = 0;
      66             255 :     c->bpop.target = NULL;
      67             255 :     c->io_keys = listCreate();
      68             255 :     c->watched_keys = listCreate();
      69             255 :     listSetFreeMethod(c->io_keys,decrRefCount);
      70             255 :     c->pubsub_channels = dictCreate(&setDictType,NULL);
      71             255 :     c->pubsub_patterns = listCreate();
      72             255 :     listSetFreeMethod(c->pubsub_patterns,decrRefCount);
      73             255 :     listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
      74             255 :     if (fd != -1) listAddNodeTail(server.clients,c);
      75             255 :     initClientMultiState(c);
      76             255 :     return c;
      77                 : }
      78                 : 
      79                 : /* This function is called every time we are going to transmit new data
      80                 :  * to the client. The behavior is the following:
      81                 :  *
      82                 :  * If the client should receive new data (normal clients will) the function
      83                 :  * returns REDIS_OK, and makes sure to install the write handler in our event
      84                 :  * loop so that when the socket is writable new data gets written.
      85                 :  *
      86                 :  * If the client should not receive new data, because it is a fake client
      87                 :  * or a slave, or because the setup of the write handler failed, the function
      88                 :  * returns REDIS_ERR.
      89                 :  *
      90                 :  * Typically gets called every time a reply is built, before adding more
      91                 :  * data to the clients output buffers. If the function returns REDIS_ERR no
      92                 :  * data should be appended to the output buffers. */
      93         6517067 : int prepareClientToWrite(redisClient *c) {
      94         6517067 :     if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
      95         6516845 :     if (c->fd <= 0) return REDIS_ERR; /* Fake client */
      96         8648068 :     if (c->bufpos == 0 && listLength(c->reply) == 0 &&
      97         1066007 :         (c->replstate == REDIS_REPL_NONE ||
      98                 :          c->replstate == REDIS_REPL_ONLINE) &&
      99                 :         aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
     100         1066005 :         sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
     101         6516056 :     return REDIS_OK;
     102                 : }
     103                 : 
     104                 : /* Create a duplicate of the last object in the reply list when
     105                 :  * it is not exclusively owned by the reply list. */
     106         3334602 : robj *dupLastObjectIfNeeded(list *reply) {
     107                 :     robj *new, *cur;
     108                 :     listNode *ln;
     109         3334602 :     redisAssert(listLength(reply) > 0);
     110         3334602 :     ln = listLast(reply);
     111         3334602 :     cur = listNodeValue(ln);
     112         3334602 :     if (cur->refcount > 1) {
     113            2592 :         new = dupStringObject(cur);
     114            2592 :         decrRefCount(cur);
     115            2592 :         listNodeValue(ln) = new;
     116                 :     }
     117         3334602 :     return listNodeValue(ln);
     118                 : }
     119                 : 
     120                 : /* -----------------------------------------------------------------------------
     121                 :  * Low level functions to add more data to output buffers.
     122                 :  * -------------------------------------------------------------------------- */
     123                 : 
     124         6514801 : int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
     125         6514801 :     size_t available = sizeof(c->buf)-c->bufpos;
     126                 : 
     127         6514801 :     if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;
     128                 : 
     129                 :     /* If there already are entries in the reply list, we cannot
     130                 :      * add anything more to the static buffer. */
     131         6514801 :     if (listLength(c->reply) > 0) return REDIS_ERR;
     132                 : 
     133                 :     /* Check that the buffer has enough space available for this string. */
     134         3176133 :     if (len > available) return REDIS_ERR;
     135                 : 
     136         3174661 :     memcpy(c->buf+c->bufpos,s,len);
     137         3174661 :     c->bufpos+=len;
     138         3174661 :     return REDIS_OK;
     139                 : }
     140                 : 
     141         3308405 : void _addReplyObjectToList(redisClient *c, robj *o) {
     142                 :     robj *tail;
     143                 : 
     144         3308405 :     if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
     145                 : 
     146         3308405 :     if (listLength(c->reply) == 0) {
     147            1472 :         incrRefCount(o);
     148            1472 :         listAddNodeTail(c->reply,o);
     149            1472 :         c->reply_bytes += zmalloc_size_sds(o->ptr);
     150                 :     } else {
     151         3306933 :         tail = listNodeValue(listLast(c->reply));
     152                 : 
     153                 :         /* Append to this object when possible. */
     154         9915316 :         if (tail->ptr != NULL &&
     155         9916464 :             sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
     156                 :         {
     157         3302895 :             c->reply_bytes -= zmalloc_size_sds(tail->ptr);
     158         3302895 :             tail = dupLastObjectIfNeeded(c->reply);
     159         6605790 :             tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
     160         3302895 :             c->reply_bytes += zmalloc_size_sds(tail->ptr);
     161                 :         } else {
     162            4038 :             incrRefCount(o);
     163            4038 :             listAddNodeTail(c->reply,o);
     164            4038 :             c->reply_bytes += zmalloc_size_sds(o->ptr);
     165                 :         }
     166                 :     }
     167         3308405 :     asyncCloseClientOnOutputBufferLimitReached(c);
     168                 : }
     169                 : 
     170                 : /* This method takes responsibility over the sds. When it is no longer
     171                 :  * needed it will be free'd, otherwise it ends up in a robj. */
     172               4 : void _addReplySdsToList(redisClient *c, sds s) {
     173                 :     robj *tail;
     174                 : 
     175               4 :     if (c->flags & REDIS_CLOSE_AFTER_REPLY) {
     176               0 :         sdsfree(s);
     177               0 :         return;
     178                 :     }
     179                 : 
     180               4 :     if (listLength(c->reply) == 0) {
     181               0 :         listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
     182               0 :         c->reply_bytes += zmalloc_size_sds(s);
     183                 :     } else {
     184               4 :         tail = listNodeValue(listLast(c->reply));
     185                 : 
     186                 :         /* Append to this object when possible. */
     187              12 :         if (tail->ptr != NULL &&
     188               8 :             sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
     189                 :         {
     190               4 :             c->reply_bytes -= zmalloc_size_sds(tail->ptr);
     191               4 :             tail = dupLastObjectIfNeeded(c->reply);
     192               4 :             tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
     193               4 :             c->reply_bytes += zmalloc_size_sds(tail->ptr);
     194               4 :             sdsfree(s);
     195                 :         } else {
     196               0 :             listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
     197               0 :             c->reply_bytes += zmalloc_size_sds(s);
     198                 :         }
     199                 :     }
     200               4 :     asyncCloseClientOnOutputBufferLimitReached(c);
     201                 : }
     202                 : 
     203           31731 : void _addReplyStringToList(redisClient *c, char *s, size_t len) {
     204                 :     robj *tail;
     205                 : 
     206           31731 :     if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
     207                 : 
     208           31731 :     if (listLength(c->reply) == 0) {
     209               0 :         robj *o = createStringObject(s,len);
     210                 : 
     211               0 :         listAddNodeTail(c->reply,o);
     212               0 :         c->reply_bytes += zmalloc_size_sds(o->ptr);
     213                 :     } else {
     214           31731 :         tail = listNodeValue(listLast(c->reply));
     215                 : 
     216                 :         /* Append to this object when possible. */
     217           95139 :         if (tail->ptr != NULL &&
     218           63410 :             sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
     219                 :         {
     220           31703 :             c->reply_bytes -= zmalloc_size_sds(tail->ptr);
     221           31703 :             tail = dupLastObjectIfNeeded(c->reply);
     222           31703 :             tail->ptr = sdscatlen(tail->ptr,s,len);
     223           31703 :             c->reply_bytes += zmalloc_size_sds(tail->ptr);
     224                 :         } else {
     225              28 :             robj *o = createStringObject(s,len);
     226                 : 
     227              28 :             listAddNodeTail(c->reply,o);
     228              28 :             c->reply_bytes += zmalloc_size_sds(o->ptr);
     229                 :         }
     230                 :     }
     231           31731 :     asyncCloseClientOnOutputBufferLimitReached(c);
     232                 : }
     233                 : 
     234                 : /* -----------------------------------------------------------------------------
     235                 :  * Higher level functions to queue data on the client output buffer.
     236                 :  * The following functions are the ones that commands implementations will call.
     237                 :  * -------------------------------------------------------------------------- */
     238                 : 
     239         5685021 : void addReply(redisClient *c, robj *obj) {
     240         5685021 :     if (prepareClientToWrite(c) != REDIS_OK) return;
     241                 : 
     242                 :     /* This is an important place where we can avoid copy-on-write
     243                 :      * when there is a saving child running, avoiding touching the
     244                 :      * refcount field of the object if it's not needed.
     245                 :      *
     246                 :      * If the encoding is RAW and there is room in the static buffer
     247                 :      * we'll be able to send the object to the client without
     248                 :      * messing with its page. */
     249         5684445 :     if (obj->encoding == REDIS_ENCODING_RAW) {
     250        11014716 :         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
     251         3245694 :             _addReplyObjectToList(c,obj);
     252          177087 :     } else if (obj->encoding == REDIS_ENCODING_INT) {
     253                 :         /* Optimization: if there is room in the static buffer for 32 bytes
     254                 :          * (more than the max chars a 64 bit integer can take as string) we
     255                 :          * avoid decoding the object and go for the lower level approach. */
     256          177087 :         if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
     257                 :             char buf[32];
     258                 :             int len;
     259                 : 
     260          114364 :             len = ll2string(buf,sizeof(buf),(long)obj->ptr);
     261          114364 :             if (_addReplyToBuffer(c,buf,len) == REDIS_OK)
     262                 :                 return;
     263                 :             /* else... continue with the normal code path, but should never
     264                 :              * happen actually since we verified there is room. */
     265                 :         }
     266           62723 :         obj = getDecodedObject(obj);
     267          125446 :         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
     268           62711 :             _addReplyObjectToList(c,obj);
     269           62723 :         decrRefCount(obj);
     270                 :     } else {
     271               0 :         redisPanic("Wrong obj->encoding in addReply()");
     272                 :     }
     273                 : }
     274                 : 
     275           18354 : void addReplySds(redisClient *c, sds s) {
     276           18354 :     if (prepareClientToWrite(c) != REDIS_OK) {
     277                 :         /* The caller expects the sds to be free'd. */
     278               0 :         sdsfree(s);
     279               0 :         return;
     280                 :     }
     281           18354 :     if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
     282           18350 :         sdsfree(s);
     283                 :     } else {
     284                 :         /* This method free's the sds when it is no longer needed. */
     285               4 :         _addReplySdsToList(c,s);
     286                 :     }
     287                 : }
     288                 : 
     289          812215 : void addReplyString(redisClient *c, char *s, size_t len) {
     290          812215 :     if (prepareClientToWrite(c) != REDIS_OK) return;
     291          812002 :     if (_addReplyToBuffer(c,s,len) != REDIS_OK)
     292           31731 :         _addReplyStringToList(c,s,len);
     293                 : }
     294                 : 
     295              83 : void addReplyErrorLength(redisClient *c, char *s, size_t len) {
     296              83 :     addReplyString(c,"-ERR ",5);
     297              83 :     addReplyString(c,s,len);
     298              83 :     addReplyString(c,"\r\n",2);
     299              83 : }
     300                 : 
     301              74 : void addReplyError(redisClient *c, char *err) {
     302              74 :     addReplyErrorLength(c,err,strlen(err));
     303              74 : }
     304                 : 
     305               9 : void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
     306                 :     size_t l, j;
     307                 :     va_list ap;
     308               9 :     va_start(ap,fmt);
     309               9 :     sds s = sdscatvprintf(sdsempty(),fmt,ap);
     310               9 :     va_end(ap);
     311                 :     /* Make sure there are no newlines in the string, otherwise invalid protocol
     312                 :      * is emitted. */
     313               9 :     l = sdslen(s);
     314             643 :     for (j = 0; j < l; j++) {
     315             634 :         if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
     316                 :     }
     317               9 :     addReplyErrorLength(c,s,sdslen(s));
     318               9 :     sdsfree(s);
     319               9 : }
     320                 : 
     321          129452 : void addReplyStatusLength(redisClient *c, char *s, size_t len) {
     322          129452 :     addReplyString(c,"+",1);
     323          129452 :     addReplyString(c,s,len);
     324          129452 :     addReplyString(c,"\r\n",2);
     325          129452 : }
     326                 : 
     327          126052 : void addReplyStatus(redisClient *c, char *status) {
     328          126052 :     addReplyStatusLength(c,status,strlen(status));
     329          126052 : }
     330                 : 
     331            3400 : void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
     332                 :     va_list ap;
     333            3400 :     va_start(ap,fmt);
     334            3400 :     sds s = sdscatvprintf(sdsempty(),fmt,ap);
     335            3400 :     va_end(ap);
     336            3400 :     addReplyStatusLength(c,s,sdslen(s));
     337            3400 :     sdsfree(s);
     338            3400 : }
     339                 : 
     340                 : /* Adds an empty object to the reply list that will contain the multi bulk
     341                 :  * length, which is not known when this function is called. */
     342            1477 : void *addDeferredMultiBulkLength(redisClient *c) {
     343                 :     /* Note that we install the write event here even if the object is not
     344                 :      * ready to be sent, since we are sure that before returning to the
     345                 :      * event loop setDeferredMultiBulkLength() will be called. */
     346            1477 :     if (prepareClientToWrite(c) != REDIS_OK) return NULL;
     347            1477 :     listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
     348            1477 :     return listLast(c->reply);
     349                 : }
     350                 : 
     351                 : /* Populate the length object and try glueing it to the next chunk. */
     352            1477 : void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
     353            1477 :     listNode *ln = (listNode*)node;
     354                 :     robj *len, *next;
     355                 : 
     356                 :     /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
     357            1477 :     if (node == NULL) return;
     358                 : 
     359            1477 :     len = listNodeValue(ln);
     360            1477 :     len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
     361            1477 :     c->reply_bytes += zmalloc_size_sds(len->ptr);
     362            1477 :     if (ln->next != NULL) {
     363            1471 :         next = listNodeValue(ln->next);
     364                 : 
     365                 :         /* Only glue when the next node is non-NULL (an sds in this case) */
     366            1471 :         if (next->ptr != NULL) {
     367            2942 :             len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
     368            1471 :             listDelNode(c->reply,ln->next);
     369                 :         }
     370                 :     }
     371            1477 :     asyncCloseClientOnOutputBufferLimitReached(c);
     372                 : }
     373                 : 
     374                 : /* Add a double as a bulk reply */
     375           46396 : void addReplyDouble(redisClient *c, double d) {
     376                 :     char dbuf[128], sbuf[128];
     377                 :     int dlen, slen;
     378           92792 :     dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
     379           92792 :     slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
     380           46396 :     addReplyString(c,sbuf,slen);
     381           46396 : }
     382                 : 
     383                 : /* Add a long long as integer reply or bulk len / multi bulk count.
     384                 :  * Basically this is used to output <prefix><long long><crlf>. */
     385         1845661 : void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix) {
     386                 :     char buf[128];
     387                 :     int len;
     388                 : 
     389                 :     /* Things like $3\r\n or *2\r\n are emitted very often by the protocol
     390                 :      * so we have a few shared objects to use if the integer is small
     391                 :      * like it is most of the time. */
     392         1845661 :     if (prefix == '*' && ll < REDIS_SHARED_BULKHDR_LEN) {
     393          278361 :         addReply(c,shared.mbulkhdr[ll]);
     394          278361 :         return;
     395         1567300 :     } else if (prefix == '$' && ll < REDIS_SHARED_BULKHDR_LEN) {
     396         1311288 :         addReply(c,shared.bulkhdr[ll]);
     397         1311288 :         return;
     398                 :     }
     399                 : 
     400          256012 :     buf[0] = prefix;
     401          256012 :     len = ll2string(buf+1,sizeof(buf)-1,ll);
     402          256012 :     buf[len+1] = '\r';
     403          256012 :     buf[len+2] = '\n';
     404          256012 :     addReplyString(c,buf,len+3);
     405                 : }
     406                 : 
     407          340313 : void addReplyLongLong(redisClient *c, long long ll) {
     408          340313 :     if (ll == 0)
     409           17659 :         addReply(c,shared.czero);
     410          322654 :     else if (ll == 1)
     411          196269 :         addReply(c,shared.cone);
     412                 :     else
     413          126385 :         addReplyLongLongWithPrefix(c,ll,':');
     414          340313 : }
     415                 : 
     416          278387 : void addReplyMultiBulkLen(redisClient *c, long length) {
     417          278387 :     addReplyLongLongWithPrefix(c,length,'*');
     418          278387 : }
     419                 : 
     420                 : /* Create the length prefix of a bulk reply, example: $2234 */
     421         1319687 : void addReplyBulkLen(redisClient *c, robj *obj) {
     422                 :     size_t len;
     423                 : 
     424         1319687 :     if (obj->encoding == REDIS_ENCODING_RAW) {
     425         2285226 :         len = sdslen(obj->ptr);
     426                 :     } else {
     427          177074 :         long n = (long)obj->ptr;
     428                 : 
     429                 :         /* Compute how many bytes this integer will take as a radix 10 string */
     430          177074 :         len = 1;
     431          177074 :         if (n < 0) {
     432            3179 :             len++;
     433            3179 :             n = -n;
     434                 :         }
     435         1167572 :         while((n = n/10) != 0) {
     436          990498 :             len++;
     437                 :         }
     438                 :     }
     439         1319687 :     addReplyLongLongWithPrefix(c,len,'$');
     440         1319687 : }
     441                 : 
     442                 : /* Add a Redis Object as a bulk reply */
     443         1319687 : void addReplyBulk(redisClient *c, robj *obj) {
     444         1319687 :     addReplyBulkLen(c,obj);
     445         1319687 :     addReply(c,obj);
     446         1319687 :     addReply(c,shared.crlf);
     447         1319687 : }
     448                 : 
     449                 : /* Add a C buffer as bulk reply */
     450          121202 : void addReplyBulkCBuffer(redisClient *c, void *p, size_t len) {
     451          121202 :     addReplyLongLongWithPrefix(c,len,'$');
     452          121202 :     addReplyString(c,p,len);
     453          121202 :     addReply(c,shared.crlf);
     454          121202 : }
     455                 : 
     456                 : /* Add a C NUL-terminated string as bulk reply */
     457              14 : void addReplyBulkCString(redisClient *c, char *s) {
     458              14 :     if (s == NULL) {
     459               0 :         addReply(c,shared.nullbulk);
     460                 :     } else {
     461              14 :         addReplyBulkCBuffer(c,s,strlen(s));
     462                 :     }
     463              14 : }
     464                 : 
     465                 : /* Add a long long as a bulk reply */
     466           44782 : void addReplyBulkLongLong(redisClient *c, long long ll) {
     467                 :     char buf[64];
     468                 :     int len;
     469                 : 
     470           44782 :     len = ll2string(buf,64,ll);
     471           44782 :     addReplyBulkCBuffer(c,buf,len);
     472           44782 : }
     473                 : 
     474                 : /* Copy 'src' client output buffers into 'dst' client output buffers.
     475                 :  * The function takes care of freeing the old output buffers of the
     476                 :  * destination client. */
     477               2 : void copyClientOutputBuffer(redisClient *dst, redisClient *src) {
     478               2 :     listRelease(dst->reply);
     479               2 :     dst->reply = listDup(src->reply);
     480               2 :     memcpy(dst->buf,src->buf,src->bufpos);
     481               2 :     dst->bufpos = src->bufpos;
     482               2 :     dst->reply_bytes = src->reply_bytes;
     483               2 : }
     484                 : 
     485             193 : static void acceptCommonHandler(int fd) {
     486                 :     redisClient *c;
     487             193 :     if ((c = createClient(fd)) == NULL) {
     488               0 :         redisLog(REDIS_WARNING,"Error allocating resoures for the client");
     489               0 :         close(fd); /* May be already closed, just ignore errors */
     490               0 :         return;
     491                 :     }
     492                 :     /* If maxclient directive is set and this is one client more... close the
     493                 :      * connection. Note that we create the client instead of checking for
     494                 :      * this condition beforehand, since now the socket is already set in nonblocking
     495                 :      * mode and we can send an error for free using the Kernel I/O */
     496             193 :     if (listLength(server.clients) > server.maxclients) {
     497               0 :         char *err = "-ERR max number of clients reached\r\n";
     498                 : 
     499                 :         /* That's a best effort error message, don't check write errors */
     500               0 :         if (write(c->fd,err,strlen(err)) == -1) {
     501                 :             /* Nothing to do, Just to avoid the warning... */
     502                 :         }
     503               0 :         server.stat_rejected_conn++;
     504               0 :         freeClient(c);
     505               0 :         return;
     506                 :     }
     507             193 :     server.stat_numconnections++;
     508                 : }
     509                 : 
     510             193 : void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     511                 :     int cport, cfd;
     512                 :     char cip[128];
     513                 :     REDIS_NOTUSED(el);
     514                 :     REDIS_NOTUSED(mask);
     515                 :     REDIS_NOTUSED(privdata);
     516                 : 
     517             193 :     cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
     518             193 :     if (cfd == AE_ERR) {
     519               0 :         redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
     520               0 :         return;
     521                 :     }
     522             193 :     redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
     523             193 :     acceptCommonHandler(cfd);
     524                 : }
     525                 : 
     526               0 : void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     527                 :     int cfd;
     528                 :     REDIS_NOTUSED(el);
     529                 :     REDIS_NOTUSED(mask);
     530                 :     REDIS_NOTUSED(privdata);
     531                 : 
     532               0 :     cfd = anetUnixAccept(server.neterr, fd);
     533               0 :     if (cfd == AE_ERR) {
     534               0 :         redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
     535               0 :         return;
     536                 :     }
     537               0 :     redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
     538               0 :     acceptCommonHandler(cfd);
     539                 : }
     540                 : 
     541                 : 
     542                 : static void freeClientArgv(redisClient *c) {
     543                 :     int j;
     544         6671761 :     for (j = 0; j < c->argc; j++)
     545         4921023 :         decrRefCount(c->argv[j]);
     546         1750738 :     c->argc = 0;
     547         1750738 :     c->cmd = NULL;
     548                 : }
     549                 : 
     550                 : /* Close all the slaves connections. This is useful in chained replication
     551                 :  * when we resync with our own master and want to force all our slaves to
     552                 :  * resync with us as well. */
     553              14 : void disconnectSlaves(void) {
     554              28 :     while (listLength(server.slaves)) {
     555               0 :         listNode *ln = listFirst(server.slaves);
     556               0 :         freeClient((redisClient*)ln->value);
     557                 :     }
     558              14 : }
     559                 : 
     560              96 : void freeClient(redisClient *c) {
     561                 :     listNode *ln;
     562                 : 
     563                 :     /* If this is marked as current client unset it */
     564              96 :     if (server.current_client == c) server.current_client = NULL;
     565                 : 
     566                 :     /* Note that if the client we are freeing is blocked into a blocking
     567                 :      * call, we have to set querybuf to NULL *before* to call
     568                 :      * unblockClientWaitingData() to avoid processInputBuffer() will get
     569                 :      * called. Also it is important to remove the file events after
     570                 :      * this, because this call adds the READABLE event. */
     571              96 :     sdsfree(c->querybuf);
     572              96 :     c->querybuf = NULL;
     573              96 :     if (c->flags & REDIS_BLOCKED)
     574               1 :         unblockClientWaitingData(c);
     575                 : 
     576                 :     /* UNWATCH all the keys */
     577              96 :     unwatchAllKeys(c);
     578              96 :     listRelease(c->watched_keys);
     579                 :     /* Unsubscribe from all the pubsub channels */
     580              96 :     pubsubUnsubscribeAllChannels(c,0);
     581              96 :     pubsubUnsubscribeAllPatterns(c,0);
     582              96 :     dictRelease(c->pubsub_channels);
     583              96 :     listRelease(c->pubsub_patterns);
     584                 :     /* Obvious cleanup */
     585              96 :     aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
     586              96 :     aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
     587              96 :     listRelease(c->reply);
     588                 :     freeClientArgv(c);
     589              96 :     close(c->fd);
     590                 :     /* Remove from the list of clients */
     591              96 :     ln = listSearchKey(server.clients,c);
     592              96 :     redisAssert(ln != NULL);
     593              96 :     listDelNode(server.clients,ln);
     594                 :     /* When client was just unblocked because of a blocking operation,
     595                 :      * remove it from the list with unblocked clients. */
     596              96 :     if (c->flags & REDIS_UNBLOCKED) {
     597               1 :         ln = listSearchKey(server.unblocked_clients,c);
     598               1 :         redisAssert(ln != NULL);
     599               1 :         listDelNode(server.unblocked_clients,ln);
     600                 :     }
     601              96 :     listRelease(c->io_keys);
     602                 :     /* Master/slave cleanup.
     603                 :      * Case 1: we lost the connection with a slave. */
     604              96 :     if (c->flags & REDIS_SLAVE) {
     605               4 :         if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
     606               0 :             close(c->repldbfd);
     607               4 :         list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
     608               4 :         ln = listSearchKey(l,c);
     609               4 :         redisAssert(ln != NULL);
     610               4 :         listDelNode(l,ln);
     611                 :     }
     612                 : 
     613                 :     /* Case 2: we lost the connection with the master. */
     614              96 :     if (c->flags & REDIS_MASTER) {
     615               5 :         server.master = NULL;
     616               5 :         server.repl_state = REDIS_REPL_CONNECT;
     617               5 :         server.repl_down_since = server.unixtime;
     618                 :         /* We lost connection with our master, force our slaves to resync
     619                 :          * with us as well to load the new data set.
     620                 :          *
     621                 :          * If server.masterhost is NULL the user called SLAVEOF NO ONE so
     622                 :          * slave resync is not needed. */
     623               5 :         if (server.masterhost != NULL) disconnectSlaves();
     624                 :     }
     625                 : 
     626                 :     /* If this client was scheduled for async freeing we need to remove it
     627                 :      * from the queue. */
     628              96 :     if (c->flags & REDIS_CLOSE_ASAP) {
     629               0 :         ln = listSearchKey(server.clients_to_close,c);
     630               0 :         redisAssert(ln != NULL);
     631               0 :         listDelNode(server.clients_to_close,ln);
     632                 :     }
     633                 : 
     634                 :     /* Release memory */
     635              96 :     zfree(c->argv);
     636              96 :     freeClientMultiState(c);
     637              96 :     zfree(c);
     638              96 : }
     639                 : 
     640                 : /* Schedule a client to free it at a safe time in the serverCron() function.
     641                 :  * This function is useful when we need to terminate a client but we are in
     642                 :  * a context where calling freeClient() is not possible, because the client
     643                 :  * should be valid for the continuation of the flow of the program. */
     644               2 : void freeClientAsync(redisClient *c) {
     645               2 :     if (c->flags & REDIS_CLOSE_ASAP) return;
     646               2 :     c->flags |= REDIS_CLOSE_ASAP;
     647               2 :     listAddNodeTail(server.clients_to_close,c);
     648                 : }
     649                 : 
     650            2402 : void freeClientsInAsyncFreeQueue(void) {
     651            4806 :     while (listLength(server.clients_to_close)) {
     652               2 :         listNode *ln = listFirst(server.clients_to_close);
     653               2 :         redisClient *c = listNodeValue(ln);
     654                 : 
     655               2 :         c->flags &= ~REDIS_CLOSE_ASAP;
     656               2 :         freeClient(c);
     657               2 :         listDelNode(server.clients_to_close,ln);
     658                 :     }
     659            2402 : }
     660                 : 
     661         1067319 : void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
     662         1067319 :     redisClient *c = privdata;
     663         1067319 :     int nwritten = 0, totwritten = 0, objlen;
     664                 :     size_t objmem;
     665                 :     robj *o;
     666                 :     REDIS_NOTUSED(el);
     667                 :     REDIS_NOTUSED(mask);
     668                 : 
     669         3203449 :     while(c->bufpos > 0 || listLength(c->reply)) {
     670         1070130 :         if (c->bufpos > 0) {
     671         1064525 :             if (c->flags & REDIS_MASTER) {
     672                 :                 /* Don't reply to a master */
     673           54222 :                 nwritten = c->bufpos - c->sentlen;
     674                 :             } else {
     675         1010303 :                 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
     676         1010303 :                 if (nwritten <= 0) break;
     677                 :             }
     678         1064525 :             c->sentlen += nwritten;
     679         1064525 :             totwritten += nwritten;
     680                 : 
     681                 :             /* If the buffer was sent, set bufpos to zero to continue with
     682                 :              * the remainder of the reply. */
     683         1064525 :             if (c->sentlen == c->bufpos) {
     684         1064525 :                 c->bufpos = 0;
     685         1064525 :                 c->sentlen = 0;
     686                 :             }
     687                 :         } else {
     688            5605 :             o = listNodeValue(listFirst(c->reply));
     689           11210 :             objlen = sdslen(o->ptr);
     690            5605 :             objmem = zmalloc_size_sds(o->ptr);
     691                 : 
     692            5605 :             if (objlen == 0) {
     693               0 :                 listDelNode(c->reply,listFirst(c->reply));
     694               0 :                 continue;
     695                 :             }
     696                 : 
     697            5605 :             if (c->flags & REDIS_MASTER) {
     698                 :                 /* Don't reply to a master */
     699               0 :                 nwritten = objlen - c->sentlen;
     700                 :             } else {
     701            5605 :                 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
     702            5605 :                 if (nwritten <= 0) break;
     703                 :             }
     704            5600 :             c->sentlen += nwritten;
     705            5600 :             totwritten += nwritten;
     706                 : 
     707                 :             /* If we fully sent the object on head go to the next one */
     708            5600 :             if (c->sentlen == objlen) {
     709            5550 :                 listDelNode(c->reply,listFirst(c->reply));
     710            5550 :                 c->sentlen = 0;
     711            5550 :                 c->reply_bytes -= objmem;
     712                 :             }
     713                 :         }
     714                 :         /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
     715                 :          * bytes, in a single threaded server it's a good idea to serve
     716                 :          * other clients as well, even if a very large request comes from
     717                 :          * super fast link that is always able to accept data (in real world
     718                 :          * scenario think about 'KEYS *' against the loopback interface).
     719                 :          *
     720                 :          * However if we are over the maxmemory limit we ignore that and
     721                 :          * just deliver as much data as it is possible to deliver. */
     722         1071439 :         if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
     723            1314 :             (server.maxmemory == 0 ||
     724               0 :              zmalloc_used_memory() < server.maxmemory)) break;
     725                 :     }
     726         1067319 :     if (nwritten == -1) {
     727               5 :         if (errno == EAGAIN) {
     728               5 :             nwritten = 0;
     729                 :         } else {
     730               0 :             redisLog(REDIS_VERBOSE,
     731                 :                 "Error writing to client: %s", strerror(errno));
     732               0 :             freeClient(c);
     733               0 :             return;
     734                 :         }
     735                 :     }
     736         1067319 :     if (totwritten > 0) c->lastinteraction = server.unixtime;
     737         1067319 :     if (c->bufpos == 0 && listLength(c->reply) == 0) {
     738         1066005 :         c->sentlen = 0;
     739         1066005 :         aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
     740                 : 
     741                 :         /* Close connection after entire reply has been sent. */
     742         1066005 :         if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);
     743                 :     }
     744                 : }
     745                 : 
     746                 : /* resetClient prepare the client to process the next command */
     747         1750642 : void resetClient(redisClient *c) {
     748                 :     freeClientArgv(c);
     749         1750642 :     c->reqtype = 0;
     750         1750642 :     c->multibulklen = 0;
     751         1750642 :     c->bulklen = -1;
     752                 :     /* We clear the ASKING flag as well if we are not inside a MULTI. */
     753         1750642 :     if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
     754         1750642 : }
     755                 : 
     756          247781 : int processInlineBuffer(redisClient *c) {
     757          247781 :     char *newline = strstr(c->querybuf,"\r\n");
     758                 :     int argc, j;
     759                 :     sds *argv;
     760                 :     size_t querylen;
     761                 : 
     762                 :     /* Nothing to do without a \r\n */
     763          247781 :     if (newline == NULL) {
     764           38072 :         if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
     765               2 :             addReplyError(c,"Protocol error: too big inline request");
     766               2 :             setProtocolError(c,0);
     767                 :         }
     768           19036 :         return REDIS_ERR;
     769                 :     }
     770                 : 
     771                 :     /* Split the input buffer up to the \r\n */
     772          228745 :     querylen = newline-(c->querybuf);
     773          228745 :     argv = sdssplitlen(c->querybuf,querylen," ",1,&argc);
     774                 : 
     775                 :     /* Leave data after the first line of the query in the buffer */
     776          228745 :     c->querybuf = sdsrange(c->querybuf,querylen+2,-1);
     777                 : 
     778                 :     /* Setup argv array on client structure */
     779          228745 :     if (c->argv) zfree(c->argv);
     780          228745 :     c->argv = zmalloc(sizeof(robj*)*argc);
     781                 : 
     782                 :     /* Create redis objects for all arguments. */
     783          786185 :     for (c->argc = 0, j = 0; j < argc; j++) {
     784         1114880 :         if (sdslen(argv[j])) {
     785          557431 :             c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
     786          557431 :             c->argc++;
     787                 :         } else {
     788               9 :             sdsfree(argv[j]);
     789                 :         }
     790                 :     }
     791          228745 :     zfree(argv);
     792          228745 :     return REDIS_OK;
     793                 : }
     794                 : 
     795                 : /* Helper function. Trims query buffer to make the function that processes
     796                 :  * multi bulk requests idempotent. */
     797               9 : static void setProtocolError(redisClient *c, int pos) {
     798               9 :     if (server.verbosity >= REDIS_VERBOSE) {
     799               9 :         sds client = getClientInfoString(c);
     800               9 :         redisLog(REDIS_VERBOSE,
     801                 :             "Protocol error from client: %s", client);
     802               9 :         sdsfree(client);
     803                 :     }
     804               9 :     c->flags |= REDIS_CLOSE_AFTER_REPLY;
     805               9 :     c->querybuf = sdsrange(c->querybuf,pos,-1);
     806               9 : }
     807                 : 
     808         1535975 : int processMultibulkBuffer(redisClient *c) {
     809         1535975 :     char *newline = NULL;
     810         1535975 :     int pos = 0, ok;
     811                 :     long long ll;
     812                 : 
     813         1535975 :     if (c->multibulklen == 0) {
     814                 :         /* The client should have been reset */
     815         1531316 :         redisAssertWithInfo(c,NULL,c->argc == 0);
     816                 : 
     817                 :         /* Multi bulk length cannot be read without a \r\n */
     818         1531316 :         newline = strchr(c->querybuf,'\r');
     819         1531316 :         if (newline == NULL) {
     820           18786 :             if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
     821               1 :                 addReplyError(c,"Protocol error: too big mbulk count string");
     822               1 :                 setProtocolError(c,0);
     823                 :             }
     824            9393 :             return REDIS_ERR;
     825                 :         }
     826                 : 
     827                 :         /* Buffer should also contain \n */
     828         3043846 :         if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
     829              16 :             return REDIS_ERR;
     830                 : 
     831                 :         /* We know for sure there is a whole line since newline != NULL,
     832                 :          * so go ahead and find out the multi bulk length. */
     833         1521907 :         redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
     834         1521907 :         ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
     835         1521907 :         if (!ok || ll > 1024*1024) {
     836               1 :             addReplyError(c,"Protocol error: invalid multibulk length");
     837               1 :             setProtocolError(c,pos);
     838               1 :             return REDIS_ERR;
     839                 :         }
     840                 : 
     841         1521906 :         pos = (newline-c->querybuf)+2;
     842         1521906 :         if (ll <= 0) {
     843               1 :             c->querybuf = sdsrange(c->querybuf,pos,-1);
     844               1 :             return REDIS_OK;
     845                 :         }
     846                 : 
     847         1521905 :         c->multibulklen = ll;
     848                 : 
     849                 :         /* Setup argv array on client structure */
     850         1521905 :         if (c->argv) zfree(c->argv);
     851         1521905 :         c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
     852                 :     }
     853                 : 
     854         1526564 :     redisAssertWithInfo(c,NULL,c->multibulklen > 0);
     855         5879833 :     while(c->multibulklen) {
     856                 :         /* Read bulk length if unknown */
     857         4357934 :         if (c->bulklen == -1) {
     858         4353524 :             newline = strchr(c->querybuf+pos,'\r');
     859         4353524 :             if (newline == NULL) {
     860             428 :                 if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
     861               0 :                     addReplyError(c,"Protocol error: too big bulk count string");
     862               0 :                     setProtocolError(c,0);
     863                 :                 }
     864                 :                 break;
     865                 :             }
     866                 : 
     867                 :             /* Buffer should also contain \n */
     868         8706620 :             if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
     869                 :                 break;
     870                 : 
     871         4353275 :             if (c->querybuf[pos] != '$') {
     872               2 :                 addReplyErrorFormat(c,
     873                 :                     "Protocol error: expected '$', got '%c'",
     874               2 :                     c->querybuf[pos]);
     875               2 :                 setProtocolError(c,pos);
     876               2 :                 return REDIS_ERR;
     877                 :             }
     878                 : 
     879         4353273 :             ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
     880         4353273 :             if (!ok || ll < 0 || ll > 512*1024*1024) {
     881               3 :                 addReplyError(c,"Protocol error: invalid bulk length");
     882               3 :                 setProtocolError(c,pos);
     883               3 :                 return REDIS_ERR;
     884                 :             }
     885                 : 
     886         4353270 :             pos += newline-(c->querybuf+pos)+2;
     887         4353270 :             if (ll >= REDIS_MBULK_BIG_ARG) {
     888                 :                 /* If we are going to read a large object from network
     889                 :                  * try to make it likely that it will start at c->querybuf
     890                 :                  * boundary so that we can optimized object creation
     891                 :                  * avoiding a large copy of data. */
     892             459 :                 c->querybuf = sdsrange(c->querybuf,pos,-1);
     893             459 :                 pos = 0;
     894                 :                 /* Hint the sds library about the amount of bytes this string is
     895                 :                  * going to contain. */
     896             459 :                 c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2);
     897                 :             }
     898         4353270 :             c->bulklen = ll;
     899                 :         }
     900                 : 
     901                 :         /* Read bulk argument */
     902         8715360 :         if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
     903                 :             /* Not enough data (+2 == trailing \r\n) */
     904                 :             break;
     905                 :         } else {
     906                 :             /* Optimization: if the buffer contanins JUST our bulk element
     907                 :              * instead of creating a new object by *copying* the sds we
     908                 :              * just use the current sds string. */
     909         4355538 :             if (pos == 0 &&
     910            1351 :                 c->bulklen >= REDIS_MBULK_BIG_ARG &&
     911             918 :                 (signed) sdslen(c->querybuf) == c->bulklen+2)
     912                 :             {
     913             459 :                 c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
     914             459 :                 sdsIncrLen(c->querybuf,-2); /* remove CRLF */
     915             459 :                 c->querybuf = sdsempty();
     916                 :                 /* Assume that if we saw a fat argument we'll see another one
     917                 :                  * likely... */
     918             459 :                 c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);
     919             459 :                 pos = 0;
     920                 :             } else {
     921         8705620 :                 c->argv[c->argc++] =
     922         4352810 :                     createStringObject(c->querybuf+pos,c->bulklen);
     923         4352810 :                 pos += c->bulklen+2;
     924                 :             }
     925         4353269 :             c->bulklen = -1;
     926         4353269 :             c->multibulklen--;
     927                 :         }
     928                 :     }
     929                 : 
     930                 :     /* Trim to pos */
     931         1526559 :     if (pos) c->querybuf = sdsrange(c->querybuf,pos,-1);
     932                 : 
     933                 :     /* We're done when c->multibulk == 0 */
     934         1526559 :     if (c->multibulklen == 0) return REDIS_OK;
     935                 : 
     936                 :     /* Still not read to process the command */
     937            4660 :     return REDIS_ERR;
     938                 : }
     939                 : 
     940         1031988 : void processInputBuffer(redisClient *c) {
     941                 :     /* Keep processing while there is something in the input buffer */
     942         6597254 :     while(sdslen(c->querybuf)) {
     943                 :         /* Immediately abort if the client is in the middle of something. */
     944         1783763 :         if (c->flags & REDIS_BLOCKED) return;
     945                 : 
     946                 :         /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
     947                 :          * written to the client. Make sure to not let the reply grow after
     948                 :          * this flag has been set (i.e. don't process more commands). */
     949         1783761 :         if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
     950                 : 
     951                 :         /* Determine request type when unknown. */
     952         1783756 :         if (!c->reqtype) {
     953         1750703 :             if (c->querybuf[0] == '*') {
     954         1521908 :                 c->reqtype = REDIS_REQ_MULTIBULK;
     955                 :             } else {
     956          228795 :                 c->reqtype = REDIS_REQ_INLINE;
     957                 :             }
     958                 :         }
     959                 : 
     960         1783756 :         if (c->reqtype == REDIS_REQ_INLINE) {
     961          247781 :             if (processInlineBuffer(c) != REDIS_OK) break;
     962         1535975 :         } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
     963         1535975 :             if (processMultibulkBuffer(c) != REDIS_OK) break;
     964                 :         } else {
     965               0 :             redisPanic("Unknown request type");
     966                 :         }
     967                 : 
     968                 :         /* Multibulk processing could see a <= 0 length. */
     969         1750645 :         if (c->argc == 0) {
     970               2 :             resetClient(c);
     971                 :         } else {
     972                 :             /* Only reset the client when the command was executed. */
     973         1750643 :             if (processCommand(c) == REDIS_OK)
     974         1750640 :                 resetClient(c);
     975                 :         }
     976                 :     }
     977                 : }
     978                 : 
     979         1032068 : void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
     980         1032068 :     redisClient *c = (redisClient*) privdata;
     981                 :     int nread, readlen;
     982                 :     size_t qblen;
     983                 :     REDIS_NOTUSED(el);
     984                 :     REDIS_NOTUSED(mask);
     985                 : 
     986         1032068 :     server.current_client = c;
     987         1032068 :     readlen = REDIS_IOBUF_LEN;
     988                 :     /* If this is a multi bulk request, and we are processing a bulk reply
     989                 :      * that is large enough, try to maximize the probabilty that the query
     990                 :      * buffer contains excatly the SDS string representing the object, even
     991                 :      * at the risk of requring more read(2) calls. This way the function
     992                 :      * processMultiBulkBuffer() can avoid copying buffers to create the
     993                 :      * Redis Object representing the argument. */
     994         1041139 :     if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
     995            9071 :         && c->bulklen >= REDIS_MBULK_BIG_ARG)
     996                 :     {
     997            6898 :         int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
     998                 : 
     999            3449 :         if (remaining < readlen) readlen = remaining;
    1000                 :     }
    1001                 : 
    1002         2064136 :     qblen = sdslen(c->querybuf);
    1003         1032068 :     if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    1004         1032068 :     c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    1005         2064136 :     nread = read(fd, c->querybuf+qblen, readlen);
    1006         1032068 :     if (nread == -1) {
    1007              10 :         if (errno == EAGAIN) {
    1008               0 :             nread = 0;
    1009                 :         } else {
    1010              10 :             redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
    1011              10 :             freeClient(c);
    1012              10 :             return;
    1013                 :         }
    1014         1032058 :     } else if (nread == 0) {
    1015              72 :         redisLog(REDIS_VERBOSE, "Client closed connection");
    1016              72 :         freeClient(c);
    1017              72 :         return;
    1018                 :     }
    1019         1031986 :     if (nread) {
    1020         1031986 :         sdsIncrLen(c->querybuf,nread);
    1021         1031986 :         c->lastinteraction = server.unixtime;
    1022                 :     } else {
    1023               0 :         server.current_client = NULL;
    1024               0 :         return;
    1025                 :     }
    1026         2063972 :     if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
    1027               0 :         sds ci = getClientInfoString(c), bytes = sdsempty();
    1028                 : 
    1029               0 :         bytes = sdscatrepr(bytes,c->querybuf,64);
    1030               0 :         redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
    1031               0 :         sdsfree(ci);
    1032               0 :         sdsfree(bytes);
    1033               0 :         freeClient(c);
    1034               0 :         return;
    1035                 :     }
    1036         1031986 :     processInputBuffer(c);
    1037         1031986 :     server.current_client = NULL;
    1038                 : }
    1039                 : 
    1040            9168 : void getClientsMaxBuffers(unsigned long *longest_output_list,
    1041                 :                           unsigned long *biggest_input_buffer) {
    1042                 :     redisClient *c;
    1043                 :     listNode *ln;
    1044                 :     listIter li;
    1045            9168 :     unsigned long lol = 0, bib = 0;
    1046                 : 
    1047            9168 :     listRewind(server.clients,&li);
    1048           27614 :     while ((ln = listNext(&li)) != NULL) {
    1049            9278 :         c = listNodeValue(ln);
    1050                 : 
    1051            9278 :         if (listLength(c->reply) > lol) lol = listLength(c->reply);
    1052           18558 :         if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf);
    1053                 :     }
    1054            9168 :     *longest_output_list = lol;
    1055            9168 :     *biggest_input_buffer = bib;
    1056            9168 : }
    1057                 : 
    1058                 : /* Turn a Redis client into an sds string representing its state. */
    1059          150722 : sds getClientInfoString(redisClient *client) {
    1060                 :     char ip[32], flags[16], events[3], *p;
    1061                 :     int port;
    1062                 :     int emask;
    1063                 : 
    1064          150722 :     anetPeerToString(client->fd,ip,&port);
    1065          150722 :     p = flags;
    1066          150722 :     if (client->flags & REDIS_SLAVE) {
    1067               0 :         if (client->flags & REDIS_MONITOR)
    1068               0 :             *p++ = 'O';
    1069                 :         else
    1070               0 :             *p++ = 'S';
    1071                 :     }
    1072          150722 :     if (client->flags & REDIS_MASTER) *p++ = 'M';
    1073          150722 :     if (client->flags & REDIS_MULTI) *p++ = 'x';
    1074          150722 :     if (client->flags & REDIS_BLOCKED) *p++ = 'b';
    1075          150722 :     if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
    1076          150722 :     if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
    1077          150722 :     if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
    1078          150722 :     if (client->flags & REDIS_CLOSE_ASAP) *p++ = 'A';
    1079          150722 :     if (p == flags) *p++ = 'N';
    1080          150722 :     *p++ = '\0';
    1081                 : 
    1082          150722 :     emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd);
    1083          150722 :     p = events;
    1084          150722 :     if (emask & AE_READABLE) *p++ = 'r';
    1085          150722 :     if (emask & AE_WRITABLE) *p++ = 'w';
    1086          150722 :     *p = '\0';
    1087         1205773 :     return sdscatprintf(sdsempty(),
    1088                 :         "addr=%s:%d fd=%d age=%ld idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu qbuf-free=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s",
    1089                 :         ip,port,client->fd,
    1090                 :         (long)(server.unixtime - client->ctime),
    1091                 :         (long)(server.unixtime - client->lastinteraction),
    1092                 :         flags,
    1093          150722 :         client->db->id,
    1094          301444 :         (int) dictSize(client->pubsub_channels),
    1095          150722 :         (int) listLength(client->pubsub_patterns),
    1096                 :         (unsigned long) sdslen(client->querybuf),
    1097                 :         (unsigned long) sdsavail(client->querybuf),
    1098                 :         (unsigned long) client->bufpos,
    1099          150722 :         (unsigned long) listLength(client->reply),
    1100                 :         getClientOutputBufferMemoryUsage(client),
    1101                 :         events,
    1102          301441 :         client->lastcmd ? client->lastcmd->name : "NULL");
    1103                 : }
    1104                 : 
    1105           75357 : sds getAllClientsInfoString(void) {
    1106                 :     listNode *ln;
    1107                 :     listIter li;
    1108                 :     redisClient *client;
    1109           75357 :     sds o = sdsempty();
    1110                 : 
    1111           75357 :     listRewind(server.clients,&li);
    1112          301425 :     while ((ln = listNext(&li)) != NULL) {
    1113                 :         sds cs;
    1114                 : 
    1115          150711 :         client = listNodeValue(ln);
    1116          150711 :         cs = getClientInfoString(client);
    1117          150711 :         o = sdscatsds(o,cs);
    1118          150711 :         sdsfree(cs);
    1119          150711 :         o = sdscatlen(o,"\n",1);
    1120                 :     }
    1121           75357 :     return o;
    1122                 : }
    1123                 : 
    1124           75357 : void clientCommand(redisClient *c) {
    1125                 :     listNode *ln;
    1126                 :     listIter li;
    1127                 :     redisClient *client;
    1128                 : 
    1129          150714 :     if (!strcasecmp(c->argv[1]->ptr,"list") && c->argc == 2) {
    1130           75357 :         sds o = getAllClientsInfoString();
    1131           75357 :         addReplyBulkCBuffer(c,o,sdslen(o));
    1132           75357 :         sdsfree(o);
    1133               0 :     } else if (!strcasecmp(c->argv[1]->ptr,"kill") && c->argc == 3) {
    1134               0 :         listRewind(server.clients,&li);
    1135               0 :         while ((ln = listNext(&li)) != NULL) {
    1136                 :             char ip[32], addr[64];
    1137                 :             int port;
    1138                 : 
    1139               0 :             client = listNodeValue(ln);
    1140               0 :             if (anetPeerToString(client->fd,ip,&port) == -1) continue;
    1141               0 :             snprintf(addr,sizeof(addr),"%s:%d",ip,port);
    1142               0 :             if (strcmp(addr,c->argv[2]->ptr) == 0) {
    1143               0 :                 addReply(c,shared.ok);
    1144               0 :                 if (c == client) {
    1145               0 :                     client->flags |= REDIS_CLOSE_AFTER_REPLY;
    1146                 :                 } else {
    1147               0 :                     freeClient(client);
    1148                 :                 }
    1149                 :                 return;
    1150                 :             }
    1151                 :         }
    1152               0 :         addReplyError(c,"No such client");
    1153                 :     } else {
    1154               0 :         addReplyError(c, "Syntax error, try CLIENT (LIST | KILL ip:port)");
    1155                 :     }
    1156                 : }
    1157                 : 
    1158                 : /* Rewrite the command vector of the client. All the new objects ref count
    1159                 :  * is incremented. The old command vector is freed, and the old objects
    1160                 :  * ref count is decremented. */
    1161           10472 : void rewriteClientCommandVector(redisClient *c, int argc, ...) {
    1162                 :     va_list ap;
    1163                 :     int j;
    1164                 :     robj **argv; /* The new argument vector */
    1165                 : 
    1166           10472 :     argv = zmalloc(sizeof(robj*)*argc);
    1167           10472 :     va_start(ap,argc);
    1168           41822 :     for (j = 0; j < argc; j++) {
    1169                 :         robj *a;
    1170                 :         
    1171           31350 :         a = va_arg(ap, robj*);
    1172           31350 :         argv[j] = a;
    1173           31350 :         incrRefCount(a);
    1174                 :     }
    1175                 :     /* We free the objects in the original vector at the end, so we are
    1176                 :      * sure that if the same objects are reused in the new vector the
    1177                 :      * refcount gets incremented before it gets decremented. */
    1178           10472 :     for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
    1179           10472 :     zfree(c->argv);
    1180                 :     /* Replace argv and argc with our new versions. */
    1181           10472 :     c->argv = argv;
    1182           10472 :     c->argc = argc;
    1183           10472 :     c->cmd = lookupCommand(c->argv[0]->ptr);
    1184           10472 :     redisAssertWithInfo(c,NULL,c->cmd != NULL);
    1185           10472 :     va_end(ap);
    1186           10472 : }
    1187                 : 
    1188                 : /* Rewrite a single item in the command vector.
    1189                 :  * The new val ref count is incremented, and the old decremented. */
    1190              42 : void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) {
    1191                 :     robj *oldval;
    1192                 :    
    1193              42 :     redisAssertWithInfo(c,NULL,i < c->argc);
    1194              42 :     oldval = c->argv[i];
    1195              42 :     c->argv[i] = newval;
    1196              42 :     incrRefCount(newval);
    1197              42 :     decrRefCount(oldval);
    1198                 : 
    1199                 :     /* If this is the command name make sure to fix c->cmd. */
    1200              42 :     if (i == 0) {
    1201              21 :         c->cmd = lookupCommand(c->argv[0]->ptr);
    1202              21 :         redisAssertWithInfo(c,NULL,c->cmd != NULL);
    1203                 :     }
    1204              42 : }
    1205                 : 
    1206                 : /* This function returns the number of bytes that Redis is virtually
    1207                 :  * using to store the reply still not read by the client.
    1208                 :  * It is "virtual" since the reply output list may contain objects that
    1209                 :  * are shared and are not really using additional memory.
    1210                 :  *
    1211                 :  * The function returns the total sum of the length of all the objects
    1212                 :  * stored in the output list, plus the memory used to allocate every
    1213                 :  * list node. The static reply buffer is not taken into account since it
    1214                 :  * is allocated anyway.
    1215                 :  *
    1216                 :  * Note: this function is very fast so can be called as many time as
    1217                 :  * the caller wishes. The main usage of this function currently is
    1218                 :  * enforcing the client output length limits. */
    1219               0 : unsigned long getClientOutputBufferMemoryUsage(redisClient *c) {
    1220         3484157 :     unsigned long list_item_size = sizeof(listNode)+sizeof(robj);
    1221                 : 
    1222         3484157 :     return c->reply_bytes + (list_item_size*listLength(c->reply));
    1223                 : }
    1224                 : 
    1225                 : /* Get the class of a client, used in order to envorce limits to different
    1226                 :  * classes of clients.
    1227                 :  *
    1228                 :  * The function will return one of the following:
    1229                 :  * REDIS_CLIENT_LIMIT_CLASS_NORMAL -> Normal client
    1230                 :  * REDIS_CLIENT_LIMIT_CLASS_SLAVE  -> Slave or client executing MONITOR command
    1231                 :  * REDIS_CLIENT_LIMIT_CLASS_PUBSUB -> Client subscribed to Pub/Sub channels
    1232                 :  */
    1233               0 : int getClientLimitClass(redisClient *c) {
    1234         3333435 :     if (c->flags & REDIS_SLAVE) return REDIS_CLIENT_LIMIT_CLASS_SLAVE;
    1235         1205950 :     if (dictSize(c->pubsub_channels) || listLength(c->pubsub_patterns))
    1236          470975 :         return REDIS_CLIENT_LIMIT_CLASS_PUBSUB;
    1237          734975 :     return REDIS_CLIENT_LIMIT_CLASS_NORMAL;
    1238                 : }
    1239                 : 
    1240               6 : int getClientLimitClassByName(char *name) {
    1241               6 :     if (!strcasecmp(name,"normal")) return REDIS_CLIENT_LIMIT_CLASS_NORMAL;
    1242               6 :     else if (!strcasecmp(name,"slave")) return REDIS_CLIENT_LIMIT_CLASS_SLAVE;
    1243               6 :     else if (!strcasecmp(name,"pubsub")) return REDIS_CLIENT_LIMIT_CLASS_PUBSUB;
    1244               0 :     else return -1;
    1245                 : }
    1246                 : 
    1247               0 : char *getClientLimitClassName(int class) {
    1248               0 :     switch(class) {
    1249               0 :     case REDIS_CLIENT_LIMIT_CLASS_NORMAL:   return "normal";
    1250               0 :     case REDIS_CLIENT_LIMIT_CLASS_SLAVE:    return "slave";
    1251               0 :     case REDIS_CLIENT_LIMIT_CLASS_PUBSUB:   return "pubsub";
    1252               0 :     default:                                return NULL;
    1253                 :     }
    1254                 : }
    1255                 : 
    1256                 : /* The function checks if the client reached output buffer soft or hard
    1257                 :  * limit, and also update the state needed to check the soft limit as
    1258                 :  * a side effect.
    1259                 :  *
    1260                 :  * Return value: non-zero if the client reached the soft or the hard limit.
    1261                 :  *               Otherwise zero is returned. */
    1262         3333435 : int checkClientOutputBufferLimits(redisClient *c) {
    1263         3333435 :     int soft = 0, hard = 0, class;
    1264         3333435 :     unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
    1265                 : 
    1266         3333435 :     class = getClientLimitClass(c);
    1267         5480106 :     if (server.client_obuf_limits[class].hard_limit_bytes &&
    1268         2146671 :         used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
    1269               1 :         hard = 1;
    1270         5912709 :     if (server.client_obuf_limits[class].soft_limit_bytes &&
    1271         2579274 :         used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
    1272          413587 :         soft = 1;
    1273                 : 
    1274                 :     /* We need to check if the soft limit is reached continuously for the
    1275                 :      * specified amount of seconds. */
    1276         3333435 :     if (soft) {
    1277          413587 :         if (c->obuf_soft_limit_reached_time == 0) {
    1278               2 :             c->obuf_soft_limit_reached_time = server.unixtime;
    1279               2 :             soft = 0; /* First time we see the soft limit reached */
    1280                 :         } else {
    1281          413585 :             time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
    1282                 : 
    1283          413585 :             if (elapsed <=
    1284                 :                 server.client_obuf_limits[class].soft_limit_seconds) {
    1285          413584 :                 soft = 0; /* The client still did not reached the max number of
    1286                 :                              seconds for the soft limit to be considered
    1287                 :                              reached. */
    1288                 :             }
    1289                 :         }
    1290                 :     } else {
    1291         2919848 :         c->obuf_soft_limit_reached_time = 0;
    1292                 :     }
    1293         3333435 :     return soft || hard;
    1294                 : }
    1295                 : 
    1296                 : /* Asynchronously close a client if soft or hard limit is reached on the
    1297                 :  * output buffer size. The caller can check if the client will be closed
    1298                 :  * checking if the client REDIS_CLOSE_ASAP flag is set.
    1299                 :  *
    1300                 :  * Note: we need to close the client asynchronously because this function is
    1301                 :  * called from contexts where the client can't be freed safely, i.e. from the
    1302                 :  * lower level functions pushing data inside the client output buffers. */
    1303         3341617 : void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
    1304         3341617 :     if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
    1305         3333435 :     if (checkClientOutputBufferLimits(c)) {
    1306               2 :         sds client = getClientInfoString(c);
    1307                 : 
    1308               2 :         freeClientAsync(c);
    1309               2 :         redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
    1310               2 :         sdsfree(client);
    1311                 :     }
    1312                 : }
    1313                 : 
    1314                 : /* Helper function used by freeMemoryIfNeeded() in order to flush slaves
    1315                 :  * output buffers without returning control to the event loop. */
    1316               0 : void flushSlavesOutputBuffers(void) {
    1317                 :     listIter li;
    1318                 :     listNode *ln;
    1319                 : 
    1320               0 :     listRewind(server.slaves,&li);
    1321               0 :     while((ln = listNext(&li))) {
    1322               0 :         redisClient *slave = listNodeValue(ln);
    1323                 :         int events;
    1324                 : 
    1325               0 :         events = aeGetFileEvents(server.el,slave->fd);
    1326               0 :         if (events & AE_WRITABLE &&
    1327               0 :             slave->replstate == REDIS_REPL_ONLINE &&
    1328               0 :             listLength(slave->reply))
    1329                 :         {
    1330               0 :             sendReplyToClient(server.el,slave->fd,slave,0);
    1331                 :         }
    1332                 :     }
    1333               0 : }

Generated by: LCOV version 1.7