LCOV - code coverage report
Current view: directory - redis/src - cluster.c (source / functions) Found Hit Coverage
Test: redis.info Lines: 905 89 9.8 %
Date: 2012-04-04 Functions: 47 5 10.6 %
Colors: not hit hit

       1                 : #include "redis.h"
       2                 : #include "endianconv.h"
       3                 : 
       4                 : #include <arpa/inet.h>
       5                 : #include <fcntl.h>
       6                 : #include <unistd.h>
       7                 : 
       8                 : void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
       9                 : void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
      10                 : void clusterSendPing(clusterLink *link, int type);
      11                 : void clusterSendFail(char *nodename);
      12                 : void clusterUpdateState(void);
      13                 : int clusterNodeGetSlotBit(clusterNode *n, int slot);
      14                 : sds clusterGenNodesDescription(void);
      15                 : clusterNode *clusterLookupNode(char *name);
      16                 : int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
      17                 : int clusterAddSlot(clusterNode *n, int slot);
      18                 : 
      19                 : /* -----------------------------------------------------------------------------
      20                 :  * Initialization
      21                 :  * -------------------------------------------------------------------------- */
      22                 : 
      23               0 : int clusterLoadConfig(char *filename) {
      24               0 :     FILE *fp = fopen(filename,"r");
      25                 :     char *line;
      26                 :     int maxline, j;
      27                 :    
      28               0 :     if (fp == NULL) return REDIS_ERR;
      29                 : 
      30                 :     /* Parse the file. Note that single liens of the cluster config file can
      31                 :      * be really long as they include all the hash slots of the node.
      32                 :      * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
      33                 :      * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
      34               0 :     maxline = 1024+REDIS_CLUSTER_SLOTS*16;
      35               0 :     line = zmalloc(maxline);
      36               0 :     while(fgets(line,maxline,fp) != NULL) {
      37                 :         int argc;
      38               0 :         sds *argv = sdssplitargs(line,&argc);
      39                 :         clusterNode *n, *master;
      40                 :         char *p, *s;
      41                 : 
      42                 :         /* Create this node if it does not exist */
      43               0 :         n = clusterLookupNode(argv[0]);
      44               0 :         if (!n) {
      45               0 :             n = createClusterNode(argv[0],0);
      46               0 :             clusterAddNode(n);
      47                 :         }
      48                 :         /* Address and port */
      49               0 :         if ((p = strchr(argv[1],':')) == NULL) goto fmterr;
      50               0 :         *p = '\0';
      51               0 :         memcpy(n->ip,argv[1],strlen(argv[1])+1);
      52               0 :         n->port = atoi(p+1);
      53                 : 
      54                 :         /* Parse flags */
      55               0 :         p = s = argv[2];
      56               0 :         while(p) {
      57               0 :             p = strchr(s,',');
      58               0 :             if (p) *p = '\0';
      59               0 :             if (!strcasecmp(s,"myself")) {
      60               0 :                 redisAssert(server.cluster.myself == NULL);
      61               0 :                 server.cluster.myself = n;
      62               0 :                 n->flags |= REDIS_NODE_MYSELF;
      63               0 :             } else if (!strcasecmp(s,"master")) {
      64               0 :                 n->flags |= REDIS_NODE_MASTER;
      65               0 :             } else if (!strcasecmp(s,"slave")) {
      66               0 :                 n->flags |= REDIS_NODE_SLAVE;
      67               0 :             } else if (!strcasecmp(s,"fail?")) {
      68               0 :                 n->flags |= REDIS_NODE_PFAIL;
      69               0 :             } else if (!strcasecmp(s,"fail")) {
      70               0 :                 n->flags |= REDIS_NODE_FAIL;
      71               0 :             } else if (!strcasecmp(s,"handshake")) {
      72               0 :                 n->flags |= REDIS_NODE_HANDSHAKE;
      73               0 :             } else if (!strcasecmp(s,"noaddr")) {
      74               0 :                 n->flags |= REDIS_NODE_NOADDR;
      75               0 :             } else if (!strcasecmp(s,"noflags")) {
      76                 :                 /* nothing to do */
      77                 :             } else {
      78               0 :                 redisPanic("Unknown flag in redis cluster config file");
      79                 :             }
      80               0 :             if (p) s = p+1;
      81                 :         }
      82                 : 
      83                 :         /* Get master if any. Set the master and populate master's
      84                 :          * slave list. */
      85               0 :         if (argv[3][0] != '-') {
      86               0 :             master = clusterLookupNode(argv[3]);
      87               0 :             if (!master) {
      88               0 :                 master = createClusterNode(argv[3],0);
      89               0 :                 clusterAddNode(master);
      90                 :             }
      91               0 :             n->slaveof = master;
      92               0 :             clusterNodeAddSlave(master,n);
      93                 :         }
      94                 : 
      95                 :         /* Set ping sent / pong received timestamps */
      96               0 :         if (atoi(argv[4])) n->ping_sent = time(NULL);
      97               0 :         if (atoi(argv[5])) n->pong_received = time(NULL);
      98                 : 
      99                 :         /* Populate hash slots served by this instance. */
     100               0 :         for (j = 7; j < argc; j++) {
     101                 :             int start, stop;
     102                 : 
     103               0 :             if (argv[j][0] == '[') {
     104                 :                 /* Here we handle migrating / importing slots */
     105                 :                 int slot;
     106                 :                 char direction;
     107                 :                 clusterNode *cn;
     108                 : 
     109               0 :                 p = strchr(argv[j],'-');
     110               0 :                 redisAssert(p != NULL);
     111               0 :                 *p = '\0';
     112               0 :                 direction = p[1]; /* Either '>' or '<' */
     113               0 :                 slot = atoi(argv[j]+1);
     114               0 :                 p += 3;
     115               0 :                 cn = clusterLookupNode(p);
     116               0 :                 if (!cn) {
     117               0 :                     cn = createClusterNode(p,0);
     118               0 :                     clusterAddNode(cn);
     119                 :                 }
     120               0 :                 if (direction == '>') {
     121               0 :                     server.cluster.migrating_slots_to[slot] = cn;
     122                 :                 } else {
     123               0 :                     server.cluster.importing_slots_from[slot] = cn;
     124                 :                 }
     125               0 :                 continue;
     126               0 :             } else if ((p = strchr(argv[j],'-')) != NULL) {
     127               0 :                 *p = '\0';
     128               0 :                 start = atoi(argv[j]);
     129               0 :                 stop = atoi(p+1);
     130                 :             } else {
     131               0 :                 start = stop = atoi(argv[j]);
     132                 :             }
     133               0 :             while(start <= stop) clusterAddSlot(n, start++);
     134                 :         }
     135                 : 
     136               0 :         sdssplitargs_free(argv,argc);
     137                 :     }
     138               0 :     zfree(line);
     139               0 :     fclose(fp);
     140                 : 
     141                 :     /* Config sanity check */
     142               0 :     redisAssert(server.cluster.myself != NULL);
     143               0 :     redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
     144               0 :         server.cluster.myself->name);
     145               0 :     clusterUpdateState();
     146               0 :     return REDIS_OK;
     147                 : 
     148                 : fmterr:
     149               0 :     redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
     150               0 :     fclose(fp);
     151               0 :     exit(1);
     152                 : }
     153                 : 
     154                 : /* Cluster node configuration is exactly the same as CLUSTER NODES output.
     155                 :  *
     156                 :  * This function writes the node config and returns 0, on error -1
     157                 :  * is returned. */
     158               0 : int clusterSaveConfig(void) {
     159               0 :     sds ci = clusterGenNodesDescription();
     160                 :     int fd;
     161                 :     
     162               0 :     if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644))
     163                 :         == -1) goto err;
     164               0 :     if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
     165               0 :     close(fd);
     166               0 :     sdsfree(ci);
     167               0 :     return 0;
     168                 : 
     169                 : err:
     170               0 :     sdsfree(ci);
     171               0 :     return -1;
     172                 : }
     173                 : 
     174               0 : void clusterSaveConfigOrDie(void) {
     175               0 :     if (clusterSaveConfig() == -1) {
     176               0 :         redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
     177               0 :         exit(1);
     178                 :     }
     179               0 : }
     180                 : 
     181               0 : void clusterInit(void) {
     182               0 :     int saveconf = 0;
     183                 : 
     184               0 :     server.cluster.myself = NULL;
     185               0 :     server.cluster.state = REDIS_CLUSTER_FAIL;
     186               0 :     server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
     187               0 :     server.cluster.node_timeout = 15;
     188                 :     memset(server.cluster.migrating_slots_to,0,
     189                 :         sizeof(server.cluster.migrating_slots_to));
     190                 :     memset(server.cluster.importing_slots_from,0,
     191                 :         sizeof(server.cluster.importing_slots_from));
     192                 :     memset(server.cluster.slots,0,
     193                 :         sizeof(server.cluster.slots));
     194               0 :     if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
     195                 :         /* No configuration found. We will just use the random name provided
     196                 :          * by the createClusterNode() function. */
     197               0 :         server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
     198               0 :         redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
     199               0 :             server.cluster.myself->name);
     200               0 :         clusterAddNode(server.cluster.myself);
     201               0 :         saveconf = 1;
     202                 :     }
     203               0 :     if (saveconf) clusterSaveConfigOrDie();
     204                 :     /* We need a listening TCP port for our cluster messaging needs */
     205               0 :     server.cfd = anetTcpServer(server.neterr,
     206                 :             server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
     207               0 :     if (server.cfd == -1) {
     208               0 :         redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
     209               0 :         exit(1);
     210                 :     }
     211               0 :     if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
     212               0 :         clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
     213               0 :     server.cluster.slots_to_keys = zslCreate();
     214               0 : }
     215                 : 
     216                 : /* -----------------------------------------------------------------------------
     217                 :  * CLUSTER communication link
     218                 :  * -------------------------------------------------------------------------- */
     219                 : 
     220               0 : clusterLink *createClusterLink(clusterNode *node) {
     221               0 :     clusterLink *link = zmalloc(sizeof(*link));
     222               0 :     link->sndbuf = sdsempty();
     223               0 :     link->rcvbuf = sdsempty();
     224               0 :     link->node = node;
     225               0 :     link->fd = -1;
     226               0 :     return link;
     227                 : }
     228                 : 
     229                 : /* Free a cluster link, but does not free the associated node of course.
     230                 :  * Just this function will make sure that the original node associated
     231                 :  * with this link will have the 'link' field set to NULL. */
     232               0 : void freeClusterLink(clusterLink *link) {
     233               0 :     if (link->fd != -1) {
     234               0 :         aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
     235               0 :         aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
     236                 :     }
     237               0 :     sdsfree(link->sndbuf);
     238               0 :     sdsfree(link->rcvbuf);
     239               0 :     if (link->node)
     240               0 :         link->node->link = NULL;
     241               0 :     close(link->fd);
     242               0 :     zfree(link);
     243               0 : }
     244                 : 
     245               0 : void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     246                 :     int cport, cfd;
     247                 :     char cip[128];
     248                 :     clusterLink *link;
     249                 :     REDIS_NOTUSED(el);
     250                 :     REDIS_NOTUSED(mask);
     251                 :     REDIS_NOTUSED(privdata);
     252                 : 
     253               0 :     cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
     254               0 :     if (cfd == AE_ERR) {
     255               0 :         redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
     256               0 :         return;
     257                 :     }
     258               0 :     redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
     259                 :     /* We need to create a temporary node in order to read the incoming
     260                 :      * packet in a valid contest. This node will be released once we
     261                 :      * read the packet and reply. */
     262               0 :     link = createClusterLink(NULL);
     263               0 :     link->fd = cfd;
     264               0 :     aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
     265                 : }
     266                 : 
     267                 : /* -----------------------------------------------------------------------------
     268                 :  * Key space handling
     269                 :  * -------------------------------------------------------------------------- */
     270                 : 
     271                 : /* We have 4096 hash slots. The hash slot of a given key is obtained
     272                 :  * as the least significant 12 bits of the crc16 of the key. */
     273               0 : unsigned int keyHashSlot(char *key, int keylen) {
     274               0 :     return crc16(key,keylen) & 0x0FFF;
     275                 : }
     276                 : 
     277                 : /* -----------------------------------------------------------------------------
     278                 :  * CLUSTER node API
     279                 :  * -------------------------------------------------------------------------- */
     280                 : 
     281                 : /* Create a new cluster node, with the specified flags.
     282                 :  * If "nodename" is NULL this is considered a first handshake and a random
     283                 :  * node name is assigned to this node (it will be fixed later when we'll
     284                 :  * receive the first pong).
     285                 :  *
     286                 :  * The node is created and returned to the user, but it is not automatically
     287                 :  * added to the nodes hash table. */
     288               0 : clusterNode *createClusterNode(char *nodename, int flags) {
     289               0 :     clusterNode *node = zmalloc(sizeof(*node));
     290                 : 
     291               0 :     if (nodename)
     292               0 :         memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
     293                 :     else
     294               0 :         getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN);
     295               0 :     node->flags = flags;
     296               0 :     memset(node->slots,0,sizeof(node->slots));
     297               0 :     node->numslaves = 0;
     298               0 :     node->slaves = NULL;
     299               0 :     node->slaveof = NULL;
     300               0 :     node->ping_sent = node->pong_received = 0;
     301               0 :     node->configdigest = NULL;
     302               0 :     node->configdigest_ts = 0;
     303               0 :     node->link = NULL;
     304               0 :     return node;
     305                 : }
     306                 : 
     307               0 : int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
     308                 :     int j;
     309                 : 
     310               0 :     for (j = 0; j < master->numslaves; j++) {
     311               0 :         if (master->slaves[j] == slave) {
     312               0 :             memmove(master->slaves+j,master->slaves+(j+1),
     313               0 :                 (master->numslaves-1)-j);
     314               0 :             master->numslaves--;
     315               0 :             return REDIS_OK;
     316                 :         }
     317                 :     }
     318               0 :     return REDIS_ERR;
     319                 : }
     320                 : 
     321               0 : int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
     322                 :     int j;
     323                 : 
     324                 :     /* If it's already a slave, don't add it again. */
     325               0 :     for (j = 0; j < master->numslaves; j++)
     326               0 :         if (master->slaves[j] == slave) return REDIS_ERR;
     327               0 :     master->slaves = zrealloc(master->slaves,
     328               0 :         sizeof(clusterNode*)*(master->numslaves+1));
     329               0 :     master->slaves[master->numslaves] = slave;
     330               0 :     master->numslaves++;
     331               0 :     return REDIS_OK;
     332                 : }
     333                 : 
     334               0 : void clusterNodeResetSlaves(clusterNode *n) {
     335               0 :     zfree(n->slaves);
     336               0 :     n->numslaves = 0;
     337               0 : }
     338                 : 
     339               0 : void freeClusterNode(clusterNode *n) {
     340                 :     sds nodename;
     341                 :     
     342               0 :     nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
     343               0 :     redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
     344               0 :     sdsfree(nodename);
     345               0 :     if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
     346               0 :     if (n->link) freeClusterLink(n->link);
     347               0 :     zfree(n);
     348               0 : }
     349                 : 
     350                 : /* Add a node to the nodes hash table */
     351               0 : int clusterAddNode(clusterNode *node) {
     352                 :     int retval;
     353                 :     
     354               0 :     retval = dictAdd(server.cluster.nodes,
     355               0 :             sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
     356               0 :     return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
     357                 : }
     358                 : 
     359                 : /* Node lookup by name */
     360               0 : clusterNode *clusterLookupNode(char *name) {
     361               0 :     sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
     362                 :     struct dictEntry *de;
     363                 : 
     364               0 :     de = dictFind(server.cluster.nodes,s);
     365               0 :     sdsfree(s);
     366               0 :     if (de == NULL) return NULL;
     367               0 :     return dictGetVal(de);
     368                 : }
     369                 : 
     370                 : /* This is only used after the handshake. When we connect a given IP/PORT
     371                 :  * as a result of CLUSTER MEET we don't have the node name yet, so we
     372                 :  * pick a random one, and will fix it when we receive the PONG request using
     373                 :  * this function. */
     374               0 : void clusterRenameNode(clusterNode *node, char *newname) {
     375                 :     int retval;
     376               0 :     sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
     377                 :    
     378               0 :     redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
     379                 :         node->name, newname);
     380               0 :     retval = dictDelete(server.cluster.nodes, s);
     381               0 :     sdsfree(s);
     382               0 :     redisAssert(retval == DICT_OK);
     383               0 :     memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
     384               0 :     clusterAddNode(node);
     385               0 : }
     386                 : 
     387                 : /* -----------------------------------------------------------------------------
     388                 :  * CLUSTER messages exchange - PING/PONG and gossip
     389                 :  * -------------------------------------------------------------------------- */
     390                 : 
     391                 : /* Process the gossip section of PING or PONG packets.
     392                 :  * Note that this function assumes that the packet is already sanity-checked
     393                 :  * by the caller, not in the content of the gossip section, but in the
     394                 :  * length. */
     395               0 : void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
     396               0 :     uint16_t count = ntohs(hdr->count);
     397               0 :     clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
     398               0 :     clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
     399                 : 
     400               0 :     while(count--) {
     401               0 :         sds ci = sdsempty();
     402               0 :         uint16_t flags = ntohs(g->flags);
     403                 :         clusterNode *node;
     404                 : 
     405               0 :         if (flags == 0) ci = sdscat(ci,"noflags,");
     406               0 :         if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
     407               0 :         if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
     408               0 :         if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
     409               0 :         if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
     410               0 :         if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
     411               0 :         if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
     412               0 :         if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
     413               0 :         if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
     414                 : 
     415               0 :         redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
     416                 :             g->nodename,
     417                 :             g->ip,
     418               0 :             ntohs(g->port),
     419                 :             ci);
     420               0 :         sdsfree(ci);
     421                 : 
     422                 :         /* Update our state accordingly to the gossip sections */
     423               0 :         node = clusterLookupNode(g->nodename);
     424               0 :         if (node != NULL) {
     425                 :             /* We already know this node. Let's start updating the last
     426                 :              * time PONG figure if it is newer than our figure.
     427                 :              * Note that it's not a problem if we have a PING already 
     428                 :              * in progress against this node. */
     429               0 :             if (node->pong_received < (signed) ntohl(g->pong_received)) {
     430               0 :                  redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
     431               0 :                 node->pong_received = ntohl(g->pong_received);
     432                 :             }
     433                 :             /* Mark this node as FAILED if we think it is possibly failing
     434                 :              * and another node also thinks it's failing. */
     435               0 :             if (node->flags & REDIS_NODE_PFAIL &&
     436               0 :                 (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
     437                 :             {
     438               0 :                 redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
     439               0 :                 node->flags &= ~REDIS_NODE_PFAIL;
     440               0 :                 node->flags |= REDIS_NODE_FAIL;
     441                 :                 /* Broadcast the failing node name to everybody */
     442               0 :                 clusterSendFail(node->name);
     443               0 :                 clusterUpdateState();
     444               0 :                 clusterSaveConfigOrDie();
     445                 :             }
     446                 :         } else {
     447                 :             /* If it's not in NOADDR state and we don't have it, we
     448                 :              * start an handshake process against this IP/PORT pairs.
     449                 :              *
     450                 :              * Note that we require that the sender of this gossip message
     451                 :              * is a well known node in our cluster, otherwise we risk
     452                 :              * joining another cluster. */
     453               0 :             if (sender && !(flags & REDIS_NODE_NOADDR)) {
     454                 :                 clusterNode *newnode;
     455                 : 
     456               0 :                 redisLog(REDIS_DEBUG,"Adding the new node");
     457               0 :                 newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
     458               0 :                 memcpy(newnode->ip,g->ip,sizeof(g->ip));
     459               0 :                 newnode->port = ntohs(g->port);
     460               0 :                 clusterAddNode(newnode);
     461                 :             }
     462                 :         }
     463                 : 
     464                 :         /* Next node */
     465               0 :         g++;
     466                 :     }
     467               0 : }
     468                 : 
     469                 : /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
     470               0 : void nodeIp2String(char *buf, clusterLink *link) {
     471                 :     struct sockaddr_in sa;
     472               0 :     socklen_t salen = sizeof(sa);
     473                 : 
     474               0 :     if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
     475               0 :         redisPanic("getpeername() failed.");
     476               0 :     strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
     477               0 : }
     478                 : 
     479                 : 
     480                 : /* Update the node address to the IP address that can be extracted
     481                 :  * from link->fd, and at the specified port. */
     482               0 : void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
     483                 :     /* TODO */
     484               0 : }
     485                 : 
     486                 : /* When this function is called, there is a packet to process starting
     487                 :  * at link->rcvbuf. Releasing the buffer is up to the caller, so this
     488                 :  * function should just handle the higher level stuff of processing the
     489                 :  * packet, modifying the cluster state if needed.
     490                 :  *
     491                 :  * The function returns 1 if the link is still valid after the packet
     492                 :  * was processed, otherwise 0 if the link was freed since the packet
     493                 :  * processing lead to some inconsistency error (for instance a PONG
     494                 :  * received from the wrong sender ID). */
     495               0 : int clusterProcessPacket(clusterLink *link) {
     496               0 :     clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
     497               0 :     uint32_t totlen = ntohl(hdr->totlen);
     498               0 :     uint16_t type = ntohs(hdr->type);
     499                 :     clusterNode *sender;
     500                 : 
     501               0 :     redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
     502                 :         type, (unsigned long) totlen);
     503                 : 
     504                 :     /* Perform sanity checks */
     505               0 :     if (totlen < 8) return 1;
     506               0 :     if (totlen > sdslen(link->rcvbuf)) return 1;
     507               0 :     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
     508                 :         type == CLUSTERMSG_TYPE_MEET)
     509                 :     {
     510               0 :         uint16_t count = ntohs(hdr->count);
     511                 :         uint32_t explen; /* expected length of this packet */
     512                 : 
     513               0 :         explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
     514               0 :         explen += (sizeof(clusterMsgDataGossip)*count);
     515               0 :         if (totlen != explen) return 1;
     516                 :     }
     517               0 :     if (type == CLUSTERMSG_TYPE_FAIL) {
     518               0 :         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
     519                 : 
     520               0 :         explen += sizeof(clusterMsgDataFail);
     521               0 :         if (totlen != explen) return 1;
     522                 :     }
     523               0 :     if (type == CLUSTERMSG_TYPE_PUBLISH) {
     524               0 :         uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
     525                 : 
     526               0 :         explen += sizeof(clusterMsgDataPublish) +
     527               0 :                 ntohl(hdr->data.publish.msg.channel_len) +
     528               0 :                 ntohl(hdr->data.publish.msg.message_len);
     529               0 :         if (totlen != explen) return 1;
     530                 :     }
     531                 : 
     532                 :     /* Ready to process the packet. Dispatch by type. */
     533               0 :     sender = clusterLookupNode(hdr->sender);
     534               0 :     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
     535               0 :         int update_config = 0;
     536               0 :         redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
     537                 : 
     538                 :         /* Add this node if it is new for us and the msg type is MEET.
     539                 :          * In this stage we don't try to add the node with the right
     540                 :          * flags, slaveof pointer, and so forth, as these details will be
     541                 :          * resolved when we'll receive PONGs from the server. */
     542               0 :         if (!sender && type == CLUSTERMSG_TYPE_MEET) {
     543                 :             clusterNode *node;
     544                 : 
     545               0 :             node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
     546               0 :             nodeIp2String(node->ip,link);
     547               0 :             node->port = ntohs(hdr->port);
     548               0 :             clusterAddNode(node);
     549               0 :             update_config = 1;
     550                 :         }
     551                 : 
     552                 :         /* Get info from the gossip section */
     553               0 :         clusterProcessGossipSection(hdr,link);
     554                 : 
     555                 :         /* Anyway reply with a PONG */
     556               0 :         clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
     557                 : 
     558                 :         /* Update config if needed */
     559               0 :         if (update_config) clusterSaveConfigOrDie();
     560               0 :     } else if (type == CLUSTERMSG_TYPE_PONG) {
     561               0 :         int update_state = 0;
     562               0 :         int update_config = 0;
     563                 : 
     564               0 :         redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
     565               0 :         if (link->node) {
     566               0 :             if (link->node->flags & REDIS_NODE_HANDSHAKE) {
     567                 :                 /* If we already have this node, try to change the
     568                 :                  * IP/port of the node with the new one. */
     569               0 :                 if (sender) {
     570               0 :                     redisLog(REDIS_WARNING,
     571                 :                         "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
     572               0 :                     nodeUpdateAddress(sender,link,ntohs(hdr->port));
     573               0 :                     freeClusterNode(link->node); /* will free the link too */
     574               0 :                     return 0;
     575                 :                 }
     576                 : 
     577                 :                 /* First thing to do is replacing the random name with the
     578                 :                  * right node name if this was a handshake stage. */
     579               0 :                 clusterRenameNode(link->node, hdr->sender);
     580               0 :                 redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
     581               0 :                     link->node->name);
     582               0 :                 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
     583               0 :                 update_config = 1;
     584               0 :             } else if (memcmp(link->node->name,hdr->sender,
     585                 :                         REDIS_CLUSTER_NAMELEN) != 0)
     586                 :             {
     587                 :                 /* If the reply has a non matching node ID we
     588                 :                  * disconnect this node and set it as not having an associated
     589                 :                  * address. */
     590               0 :                 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
     591               0 :                 link->node->flags |= REDIS_NODE_NOADDR;
     592               0 :                 freeClusterLink(link);
     593               0 :                 update_config = 1;
     594                 :                 /* FIXME: remove this node if we already have it.
     595                 :                  *
     596                 :                  * If we already have it but the IP is different, use
     597                 :                  * the new one if the old node is in FAIL, PFAIL, or NOADDR
     598                 :                  * status... */
     599               0 :                 return 0;
     600                 :             }
     601                 :         }
     602                 :         /* Update our info about the node */
     603               0 :         if (link->node) link->node->pong_received = time(NULL);
     604                 : 
     605                 :         /* Update master/slave info */
     606               0 :         if (sender) {
     607               0 :             if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
     608                 :                 sizeof(hdr->slaveof)))
     609                 :             {
     610               0 :                 sender->flags &= ~REDIS_NODE_SLAVE;
     611               0 :                 sender->flags |= REDIS_NODE_MASTER;
     612               0 :                 sender->slaveof = NULL;
     613                 :             } else {
     614               0 :                 clusterNode *master = clusterLookupNode(hdr->slaveof);
     615                 : 
     616               0 :                 sender->flags &= ~REDIS_NODE_MASTER;
     617               0 :                 sender->flags |= REDIS_NODE_SLAVE;
     618               0 :                 if (sender->numslaves) clusterNodeResetSlaves(sender);
     619               0 :                 if (master) clusterNodeAddSlave(master,sender);
     620                 :             }
     621                 :         }
     622                 : 
     623                 :         /* Update our info about served slots if this new node is serving
     624                 :          * slots that are not served from our point of view. */
     625               0 :         if (sender && sender->flags & REDIS_NODE_MASTER) {
     626                 :             int newslots, j;
     627                 : 
     628               0 :             newslots =
     629               0 :                 memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
     630               0 :             memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
     631               0 :             if (newslots) {
     632               0 :                 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
     633               0 :                     if (clusterNodeGetSlotBit(sender,j)) {
     634               0 :                         if (server.cluster.slots[j] == sender) continue;
     635               0 :                         if (server.cluster.slots[j] == NULL ||
     636               0 :                             server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
     637                 :                         {
     638               0 :                             server.cluster.slots[j] = sender;
     639               0 :                             update_state = update_config = 1;
     640                 :                         }
     641                 :                     }
     642                 :                 }
     643                 :             }
     644                 :         }
     645                 : 
     646                 :         /* Get info from the gossip section */
     647               0 :         clusterProcessGossipSection(hdr,link);
     648                 : 
     649                 :         /* Update the cluster state if needed */
     650               0 :         if (update_state) clusterUpdateState();
     651               0 :         if (update_config) clusterSaveConfigOrDie();
     652               0 :     } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
     653                 :         clusterNode *failing;
     654                 : 
     655               0 :         failing = clusterLookupNode(hdr->data.fail.about.nodename);
     656               0 :         if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
     657                 :         {
     658               0 :             redisLog(REDIS_NOTICE,
     659                 :                 "FAIL message received from %.40s about %.40s",
     660                 :                 hdr->sender, hdr->data.fail.about.nodename);
     661               0 :             failing->flags |= REDIS_NODE_FAIL;
     662               0 :             failing->flags &= ~REDIS_NODE_PFAIL;
     663               0 :             clusterUpdateState();
     664               0 :             clusterSaveConfigOrDie();
     665                 :         }
     666               0 :     } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
     667                 :         robj *channel, *message;
     668                 :         uint32_t channel_len, message_len;
     669                 : 
     670                 :         /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
     671               0 :         if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) {
     672               0 :             channel_len = ntohl(hdr->data.publish.msg.channel_len);
     673               0 :             message_len = ntohl(hdr->data.publish.msg.message_len);
     674               0 :             channel = createStringObject(
     675                 :                         (char*)hdr->data.publish.msg.bulk_data,channel_len);
     676               0 :             message = createStringObject(
     677                 :                         (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len);
     678               0 :             pubsubPublishMessage(channel,message);
     679               0 :             decrRefCount(channel);
     680               0 :             decrRefCount(message);
     681                 :         }
     682                 :     } else {
     683               0 :         redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
     684                 :     }
     685               0 :     return 1;
     686                 : }
     687                 : 
     688                 : /* This function is called when we detect the link with this node is lost.
     689                 :    We set the node as no longer connected. The Cluster Cron will detect
     690                 :    the disconnected node and will try to get it connected again.
     691                 :    
     692                 :    Instead, if the node is a temporary node used to accept a query, we
     693                 :    completely free the node on error. */
     694               0 : void handleLinkIOError(clusterLink *link) {
     695               0 :     freeClusterLink(link);
     696               0 : }
     697                 : 
     698                 : /* Send data. This is handled using a trivial send buffer that gets
     699                 :  * consumed by write(). We don't try to optimize this for speed too much
     700                 :  * as this is a very low traffic channel. */
     701               0 : void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     702               0 :     clusterLink *link = (clusterLink*) privdata;
     703                 :     ssize_t nwritten;
     704                 :     REDIS_NOTUSED(el);
     705                 :     REDIS_NOTUSED(mask);
     706                 : 
     707               0 :     nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
     708               0 :     if (nwritten <= 0) {
     709               0 :         redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
     710                 :             strerror(errno));
     711                 :         handleLinkIOError(link);
     712                 :         return;
     713                 :     }
     714               0 :     link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
     715               0 :     if (sdslen(link->sndbuf) == 0)
     716               0 :         aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
     717                 : }
     718                 : 
     719                 : /* Read data. Try to read the first field of the header first to check the
     720                 :  * full length of the packet. When a whole packet is in memory this function
     721                 :  * will call the function to process the packet. And so forth. */
     722               0 : void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     723                 :     char buf[1024];
     724                 :     ssize_t nread;
     725                 :     clusterMsg *hdr;
     726               0 :     clusterLink *link = (clusterLink*) privdata;
     727                 :     int readlen;
     728                 :     REDIS_NOTUSED(el);
     729                 :     REDIS_NOTUSED(mask);
     730                 : 
     731                 : again:
     732               0 :     if (sdslen(link->rcvbuf) >= 4) {
     733               0 :         hdr = (clusterMsg*) link->rcvbuf;
     734               0 :         readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
     735                 :     } else {
     736               0 :         readlen = 4 - sdslen(link->rcvbuf);
     737                 :     }
     738                 : 
     739               0 :     nread = read(fd,buf,readlen);
     740               0 :     if (nread == -1 && errno == EAGAIN) return; /* Just no data */
     741                 : 
     742               0 :     if (nread <= 0) {
     743                 :         /* I/O error... */
     744               0 :         redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
     745               0 :             (nread == 0) ? "connection closed" : strerror(errno));
     746                 :         handleLinkIOError(link);
     747                 :         return;
     748                 :     } else {
     749                 :         /* Read data and recast the pointer to the new buffer. */
     750               0 :         link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
     751               0 :         hdr = (clusterMsg*) link->rcvbuf;
     752                 :     }
     753                 : 
     754                 :     /* Total length obtained? read the payload now instead of burning
     755                 :      * cycles waiting for a new event to fire. */
     756               0 :     if (sdslen(link->rcvbuf) == 4) goto again;
     757                 : 
     758                 :     /* Whole packet in memory? We can process it. */
     759               0 :     if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
     760               0 :         if (clusterProcessPacket(link)) {
     761               0 :             sdsfree(link->rcvbuf);
     762               0 :             link->rcvbuf = sdsempty();
     763                 :         }
     764                 :     }
     765                 : }
     766                 : 
     767                 : /* Put stuff into the send buffer. */
     768               0 : void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
     769               0 :     if (sdslen(link->sndbuf) == 0 && msglen != 0)
     770               0 :         aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
     771                 :                     clusterWriteHandler,link);
     772                 : 
     773               0 :     link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
     774               0 : }
     775                 : 
     776                 : /* Send a message to all the nodes with a reliable link */
     777               0 : void clusterBroadcastMessage(void *buf, size_t len) {
     778                 :     dictIterator *di;
     779                 :     dictEntry *de;
     780                 : 
     781               0 :     di = dictGetIterator(server.cluster.nodes);
     782               0 :     while((de = dictNext(di)) != NULL) {
     783               0 :         clusterNode *node = dictGetVal(de);
     784                 : 
     785               0 :         if (!node->link) continue;
     786               0 :         if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
     787               0 :         clusterSendMessage(node->link,buf,len);
     788                 :     }
     789               0 :     dictReleaseIterator(di);
     790               0 : }
     791                 : 
     792                 : /* Build the message header */
     793               0 : void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
     794               0 :     int totlen = 0;
     795                 : 
     796                 :     memset(hdr,0,sizeof(*hdr));
     797               0 :     hdr->type = htons(type);
     798               0 :     memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
     799               0 :     memcpy(hdr->myslots,server.cluster.myself->slots,
     800                 :         sizeof(hdr->myslots));
     801               0 :     memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
     802               0 :     if (server.cluster.myself->slaveof != NULL) {
     803               0 :         memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
     804                 :                                     REDIS_CLUSTER_NAMELEN);
     805                 :     }
     806               0 :     hdr->port = htons(server.port);
     807               0 :     hdr->state = server.cluster.state;
     808               0 :     memset(hdr->configdigest,0,32); /* FIXME: set config digest */
     809                 : 
     810               0 :     if (type == CLUSTERMSG_TYPE_FAIL) {
     811               0 :         totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
     812               0 :         totlen += sizeof(clusterMsgDataFail);
     813                 :     }
     814               0 :     hdr->totlen = htonl(totlen);
     815                 :     /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
     816               0 : }
     817                 : 
     818                 : /* Send a PING or PONG packet to the specified node, making sure to add enough
     819                 :  * gossip information. */
     820               0 : void clusterSendPing(clusterLink *link, int type) {
     821                 :     unsigned char buf[1024];
     822               0 :     clusterMsg *hdr = (clusterMsg*) buf;
     823               0 :     int gossipcount = 0, totlen;
     824                 :     /* freshnodes is the number of nodes we can still use to populate the
     825                 :      * gossip section of the ping packet. Basically we start with the nodes
     826                 :      * we have in memory minus two (ourself and the node we are sending the
     827                 :      * message to). Every time we add a node we decrement the counter, so when
     828                 :      * it drops to <= zero we know there is no more gossip info we can
     829                 :      * send. */
     830               0 :     int freshnodes = dictSize(server.cluster.nodes)-2;
     831                 : 
     832               0 :     if (link->node && type == CLUSTERMSG_TYPE_PING)
     833               0 :         link->node->ping_sent = time(NULL);
     834               0 :     clusterBuildMessageHdr(hdr,type);
     835                 :         
     836                 :     /* Populate the gossip fields */
     837               0 :     while(freshnodes > 0 && gossipcount < 3) {
     838               0 :         struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
     839               0 :         clusterNode *this = dictGetVal(de);
     840                 :         clusterMsgDataGossip *gossip;
     841                 :         int j;
     842                 : 
     843                 :         /* Not interesting to gossip about ourself.
     844                 :          * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
     845               0 :         if (this == server.cluster.myself ||
     846               0 :             this->flags & REDIS_NODE_HANDSHAKE) {
     847               0 :                 freshnodes--; /* otherwise we may loop forever. */
     848               0 :                 continue;
     849                 :         }
     850                 : 
     851                 :         /* Check if we already added this node */
     852               0 :         for (j = 0; j < gossipcount; j++) {
     853               0 :             if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
     854                 :                     REDIS_CLUSTER_NAMELEN) == 0) break;
     855                 :         }
     856               0 :         if (j != gossipcount) continue;
     857                 : 
     858                 :         /* Add it */
     859               0 :         freshnodes--;
     860               0 :         gossip = &(hdr->data.ping.gossip[gossipcount]);
     861               0 :         memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
     862               0 :         gossip->ping_sent = htonl(this->ping_sent);
     863               0 :         gossip->pong_received = htonl(this->pong_received);
     864               0 :         memcpy(gossip->ip,this->ip,sizeof(this->ip));
     865               0 :         gossip->port = htons(this->port);
     866               0 :         gossip->flags = htons(this->flags);
     867               0 :         gossipcount++;
     868                 :     }
     869               0 :     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
     870               0 :     totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
     871               0 :     hdr->count = htons(gossipcount);
     872               0 :     hdr->totlen = htonl(totlen);
     873               0 :     clusterSendMessage(link,buf,totlen);
     874               0 : }
     875                 : 
     876                 : /* Send a PUBLISH message.
     877                 :  *
     878                 :  * If link is NULL, then the message is broadcast to the whole cluster. */
     879               0 : void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
     880                 :     unsigned char buf[4096], *payload;
     881               0 :     clusterMsg *hdr = (clusterMsg*) buf;
     882                 :     uint32_t totlen;
     883                 :     uint32_t channel_len, message_len;
     884                 : 
     885               0 :     channel = getDecodedObject(channel);
     886               0 :     message = getDecodedObject(message);
     887               0 :     channel_len = sdslen(channel->ptr);
     888               0 :     message_len = sdslen(message->ptr);
     889                 : 
     890               0 :     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
     891               0 :     totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
     892               0 :     totlen += sizeof(clusterMsgDataPublish) + channel_len + message_len;
     893                 : 
     894               0 :     hdr->data.publish.msg.channel_len = htonl(channel_len);
     895               0 :     hdr->data.publish.msg.message_len = htonl(message_len);
     896               0 :     hdr->totlen = htonl(totlen);
     897                 : 
     898                 :     /* Try to use the local buffer if possible */
     899               0 :     if (totlen < sizeof(buf)) {
     900               0 :         payload = buf;
     901                 :     } else {
     902               0 :         payload = zmalloc(totlen);
     903               0 :         hdr = (clusterMsg*) payload;
     904                 :         memcpy(payload,hdr,sizeof(hdr));
     905                 :     }
     906               0 :     memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
     907               0 :     memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
     908               0 :         message->ptr,sdslen(message->ptr));
     909                 : 
     910               0 :     if (link)
     911               0 :         clusterSendMessage(link,payload,totlen);
     912                 :     else
     913               0 :         clusterBroadcastMessage(payload,totlen);
     914                 : 
     915               0 :     decrRefCount(channel);
     916               0 :     decrRefCount(message);
     917               0 :     if (payload != buf) zfree(payload);
     918               0 : }
     919                 : 
     920                 : /* Send a FAIL message to all the nodes we are able to contact.
     921                 :  * The FAIL message is sent when we detect that a node is failing
     922                 :  * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
     923                 :  * we switch the node state to REDIS_NODE_FAIL and ask all the other
     924                 :  * nodes to do the same ASAP. */
     925               0 : void clusterSendFail(char *nodename) {
     926                 :     unsigned char buf[1024];
     927               0 :     clusterMsg *hdr = (clusterMsg*) buf;
     928                 : 
     929               0 :     clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
     930               0 :     memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
     931               0 :     clusterBroadcastMessage(buf,ntohl(hdr->totlen));
     932               0 : }
     933                 : 
     934                 : /* -----------------------------------------------------------------------------
     935                 :  * CLUSTER Pub/Sub support
     936                 :  *
     937                 :  * For now we do very little, just propagating PUBLISH messages across the whole
     938                 :  * cluster. In the future we'll try to get smarter and avoid propagating those
     939                 :  * messages to hosts without receivers for a given channel.
     940                 :  * -------------------------------------------------------------------------- */
     941               0 : void clusterPropagatePublish(robj *channel, robj *message) {
     942               0 :     clusterSendPublish(NULL, channel, message);
     943               0 : }
     944                 : 
     945                 : /* -----------------------------------------------------------------------------
     946                 :  * CLUSTER cron job
     947                 :  * -------------------------------------------------------------------------- */
     948                 : 
     949                 : /* This is executed 1 time every second */
     950               0 : void clusterCron(void) {
     951                 :     dictIterator *di;
     952                 :     dictEntry *de;
     953                 :     int j;
     954               0 :     time_t min_ping_sent = 0;
     955               0 :     clusterNode *min_ping_node = NULL;
     956                 : 
     957                 :     /* Check if we have disconnected nodes and reestablish the connection. */
     958               0 :     di = dictGetIterator(server.cluster.nodes);
     959               0 :     while((de = dictNext(di)) != NULL) {
     960               0 :         clusterNode *node = dictGetVal(de);
     961                 : 
     962               0 :         if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
     963               0 :         if (node->link == NULL) {
     964                 :             int fd;
     965                 :             clusterLink *link;
     966                 : 
     967               0 :             fd = anetTcpNonBlockConnect(server.neterr, node->ip,
     968                 :                 node->port+REDIS_CLUSTER_PORT_INCR);
     969               0 :             if (fd == -1) continue;
     970               0 :             link = createClusterLink(node);
     971               0 :             link->fd = fd;
     972               0 :             node->link = link;
     973               0 :             aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
     974                 :             /* If the node is flagged as MEET, we send a MEET message instead
     975                 :              * of a PING one, to force the receiver to add us in its node
     976                 :              * table. */
     977               0 :             clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
     978                 :                     CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
     979                 :             /* We can clear the flag after the first packet is sent.
     980                 :              * If we never receive a PONG, we'll never send new packets
     981                 :              * to this node. Instead after the PONG is received and we
     982                 :              * are no longer in meet/handshake status, we want to send
     983                 :              * normal PING packets. */
     984               0 :             node->flags &= ~REDIS_NODE_MEET;
     985                 : 
     986               0 :             redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
     987                 :         }
     988                 :     }
     989               0 :     dictReleaseIterator(di);
     990                 : 
     991                 :     /* Ping some random node. Check a few random nodes and ping the one with
     992                 :      * the oldest ping_sent time */
     993               0 :     for (j = 0; j < 5; j++) {
     994               0 :         de = dictGetRandomKey(server.cluster.nodes);
     995               0 :         clusterNode *this = dictGetVal(de);
     996                 : 
     997               0 :         if (this->link == NULL) continue;
     998               0 :         if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
     999               0 :         if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
    1000               0 :             min_ping_node = this;
    1001               0 :             min_ping_sent = this->ping_sent;
    1002                 :         }
    1003                 :     }
    1004               0 :     if (min_ping_node) {
    1005               0 :         redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
    1006               0 :         clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
    1007                 :     }
    1008                 : 
    1009                 :     /* Iterate nodes to check if we need to flag something as failing */
    1010               0 :     di = dictGetIterator(server.cluster.nodes);
    1011               0 :     while((de = dictNext(di)) != NULL) {
    1012               0 :         clusterNode *node = dictGetVal(de);
    1013                 :         int delay;
    1014                 : 
    1015               0 :         if (node->flags &
    1016                 :             (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
    1017               0 :                 continue;
    1018                 :         /* Check only if we already sent a ping and have not received
    1019                 :          * a reply yet. */
    1020               0 :         if (node->ping_sent == 0 ||
    1021               0 :             node->ping_sent <= node->pong_received) continue;
    1022                 : 
    1023               0 :         delay = time(NULL) - node->pong_received;
    1024               0 :         if (delay < server.cluster.node_timeout) {
    1025                 :             /* The PFAIL condition can be reversed without external
    1026                 :              * help if it is not transitive (that is, if it does not
    1027                 :              * turn into a FAIL state).
    1028                 :              *
    1029                 :              * The FAIL condition is also reversible if there are no slaves
    1030                 :              * for this host, so no slave election should be in progress.
    1031                 :              *
    1032                 :              * TODO: consider all the implications of resurrecting a
    1033                 :              * FAIL node. */
    1034               0 :             if (node->flags & REDIS_NODE_PFAIL) {
    1035               0 :                 node->flags &= ~REDIS_NODE_PFAIL;
    1036               0 :             } else if (node->flags & REDIS_NODE_FAIL && !node->numslaves) {
    1037               0 :                 node->flags &= ~REDIS_NODE_FAIL;
    1038               0 :                 clusterUpdateState();
    1039                 :             }
    1040                 :         } else {
    1041                 :             /* Timeout reached. Set the node as possibly failing if it is
    1042                 :              * not already in this state. */
    1043               0 :             if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
    1044               0 :                 redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
    1045                 :                     node->name);
    1046               0 :                 node->flags |= REDIS_NODE_PFAIL;
    1047                 :             }
    1048                 :         }
    1049                 :     }
    1050               0 :     dictReleaseIterator(di);
    1051               0 : }
    1052                 : 
    1053                 : /* -----------------------------------------------------------------------------
    1054                 :  * Slots management
    1055                 :  * -------------------------------------------------------------------------- */
    1056                 : 
    1057                 : /* Mark 'slot' as served in the slots bitmap of node 'n'. Returns 1 if the bit was already set, 0 otherwise. */
    1058               0 : int clusterNodeSetSlotBit(clusterNode *n, int slot) {
    1059               0 :     off_t idx = slot/8;          /* byte of the bitmap holding the slot */
    1060               0 :     int mask = 1<<(slot&7);      /* bit of the slot inside that byte */
    1061               0 :     int was_set = (n->slots[idx] & mask) ? 1 : 0;
    1062               0 :     n->slots[idx] |= mask;
    1063               0 :     return was_set;
    1064                 : }
    1065                 : 
    1066                 : /* Unmark 'slot' in the slots bitmap of node 'n'. Returns 1 if the bit was set before the call, 0 otherwise. */
    1067               0 : int clusterNodeClearSlotBit(clusterNode *n, int slot) {
    1068               0 :     off_t idx = slot/8;          /* byte of the bitmap holding the slot */
    1069               0 :     int mask = 1<<(slot&7);      /* bit of the slot inside that byte */
    1070               0 :     int was_set = (n->slots[idx] & mask) ? 1 : 0;
    1071               0 :     n->slots[idx] &= ~mask;
    1072               0 :     return was_set;
    1073                 : }
    1074                 : 
    1075                 : /* Test 'slot' in the slots bitmap of node 'n': returns 1 if the bit is set, 0 if it is clear. */
    1076               0 : int clusterNodeGetSlotBit(clusterNode *n, int slot) {
    1077               0 :     off_t idx = slot/8;
    1078               0 :     int mask = 1<<(slot&7);
    1079               0 :     return (n->slots[idx] & mask) ? 1 : 0;
    1080                 : }
    1081                 : 
    1082                 : /* Assign 'slot' to node 'n': set the bit in the bitmap of the node and
    1083                 :  * record 'n' as the owner in server.cluster.slots[]. Returns REDIS_OK on
    1084                 :  * success. REDIS_ERR is returned, leaving the owner table untouched, when
    1085                 :  * the bit for 'slot' was already set in the bitmap of 'n'. */
    1086               0 : int clusterAddSlot(clusterNode *n, int slot) {
    1087               0 :     int already_set = clusterNodeSetSlotBit(n,slot);
    1088               0 :     if (already_set) return REDIS_ERR;
    1089               0 :     server.cluster.slots[slot] = n;
    1090               0 :     return REDIS_OK;
    1091                 : }
    1092                 : 
    1093                 : /* Mark 'slot' as unassigned, clearing its bit in the bitmap of the owner.
    1094                 :  * Returns REDIS_OK if the slot had an owner, or REDIS_ERR (nothing is
    1095                 :  * modified) if the slot was already unassigned. */
    1096               0 : int clusterDelSlot(int slot) {
    1097               0 :     clusterNode *owner = server.cluster.slots[slot];
    1098                 : 
    1099               0 :     if (owner == NULL) return REDIS_ERR;
    1100               0 :     redisAssert(clusterNodeClearSlotBit(owner,slot) == 1); /* owner table and node bitmap must agree */
    1101               0 :     server.cluster.slots[slot] = NULL;
    1102               0 :     return REDIS_OK;
    1103                 : }
    1104                 : 
    1105                 : /* -----------------------------------------------------------------------------
    1106                 :  * Cluster state evaluation function
    1107                 :  * -------------------------------------------------------------------------- */
    1108               0 : void clusterUpdateState(void) { /* Recompute server.cluster.state from the slots table. */
    1109               0 :     int all_served = 1;
    1110                 :     int slot;
    1111                 : 
    1112               0 :     for (slot = 0; slot < REDIS_CLUSTER_SLOTS; slot++) {
    1113               0 :         if (!server.cluster.slots[slot] ||
    1114               0 :             (server.cluster.slots[slot]->flags & REDIS_NODE_FAIL))
    1115                 :         {
    1116               0 :             all_served = 0; /* unassigned slot, or owner flagged as FAIL */
    1117               0 :             break;
    1118                 :         }
    1119                 :     }
    1120               0 :     if (all_served) {
    1121               0 :         if (server.cluster.state != REDIS_CLUSTER_NEEDHELP) {
    1122               0 :             server.cluster.state = REDIS_CLUSTER_OK;
    1123                 :         } else {
    1124               0 :             server.cluster.state = REDIS_CLUSTER_NEEDHELP; /* NEEDHELP is left as it is */
    1125                 :         }
    1126                 :     } else {
    1127               0 :         server.cluster.state = REDIS_CLUSTER_FAIL;
    1128                 :     }
    1129               0 : }
    1130                 : 
    1131                 : /* -----------------------------------------------------------------------------
    1132                 :  * CLUSTER command
    1133                 :  * -------------------------------------------------------------------------- */
    1134                 : 
    1135               0 : sds clusterGenNodesDescription(void) {
    1136               0 :     sds ci = sdsempty();
    1137                 :     dictIterator *di;
    1138                 :     dictEntry *de;
    1139                 :     int j, start;
    1140                 :     /* Returns a new sds string with one '\n' terminated line per known node: name, ip:port, flags, master name or "-", ping_sent, pong_received, link state, then the served slots. */
    1141               0 :     di = dictGetIterator(server.cluster.nodes);
    1142               0 :     while((de = dictNext(di)) != NULL) {
    1143               0 :         clusterNode *node = dictGetVal(de);
    1144                 : 
    1145                 :         /* Node coordinates */
    1146               0 :         ci = sdscatprintf(ci,"%.40s %s:%d ",
    1147                 :             node->name,
    1148                 :             node->ip,
    1149                 :             node->port);
    1150                 : 
    1151                 :         /* Flags: comma separated list, the trailing comma is turned into the field separator. */
    1152               0 :         if (node->flags == 0) ci = sdscat(ci,"noflags,");
    1153               0 :         if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
    1154               0 :         if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
    1155               0 :         if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
    1156               0 :         if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
    1157               0 :         if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
    1158               0 :         if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
    1159               0 :         if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
    1160               0 :         if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
    1161                 : 
    1162                 :         /* Slave of... or just "-" */
    1163               0 :         if (node->slaveof)
    1164               0 :             ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
    1165                 :         else
    1166               0 :             ci = sdscatprintf(ci,"- ");
    1167                 : 
    1168                 :         /* Latency from the POV of this node, link status */
    1169               0 :         ci = sdscatprintf(ci,"%ld %ld %s",
    1170                 :             (long) node->ping_sent,
    1171                 :             (long) node->pong_received,
    1172               0 :             (node->link || node->flags & REDIS_NODE_MYSELF) ?
    1173                 :                         "connected" : "disconnected");
    1174                 : 
    1175                 :         /* Slots served by this instance, as single slots or "first-last" ranges */
    1176               0 :         start = -1;
    1177               0 :         for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
    1178                 :             int bit;
    1179                 : 
    1180               0 :             if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
    1181               0 :                 if (start == -1) start = j;
    1182                 :             }
    1183               0 :             if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) {
    1184               0 :                 if (bit && j == REDIS_CLUSTER_SLOTS-1) j++;
    1185                 :                 /* Above: j is advanced only when the last slot itself is served, so that j-1 is always the last served slot of the range. */
    1186               0 :                 if (start == j-1) {
    1187               0 :                     ci = sdscatprintf(ci," %d",start);
    1188                 :                 } else {
    1189               0 :                     ci = sdscatprintf(ci," %d-%d",start,j-1);
    1190                 :                 }
    1191               0 :                 start = -1;
    1192                 :             }
    1193                 :         }
    1194                 : 
    1195                 :         /* Just for MYSELF node we also dump info about slots that
    1196                 :          * we are migrating to other instances or importing from other
    1197                 :          * instances. */
    1198               0 :         if (node->flags & REDIS_NODE_MYSELF) {
    1199               0 :             for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
    1200               0 :                 if (server.cluster.migrating_slots_to[j]) {
    1201               0 :                     ci = sdscatprintf(ci," [%d->-%.40s]",j,
    1202               0 :                         server.cluster.migrating_slots_to[j]->name);
    1203               0 :                 } else if (server.cluster.importing_slots_from[j]) {
    1204               0 :                     ci = sdscatprintf(ci," [%d-<-%.40s]",j,
    1205               0 :                         server.cluster.importing_slots_from[j]->name);
    1206                 :                 }
    1207                 :             }
    1208                 :         }
    1209               0 :         ci = sdscatlen(ci,"\n",1);
    1210                 :     }
    1211               0 :     dictReleaseIterator(di);
    1212               0 :     return ci;
    1213                 : }
    1214                 : 
    1215               0 : int getSlotOrReply(redisClient *c, robj *o) {
    1216                 :     long long slot;
    1217                 :     /* Parse 'o' as a hash slot in the range 0..REDIS_CLUSTER_SLOTS-1. On failure an error is sent to 'c' and -1 is returned. */
    1218               0 :     if (getLongLongFromObject(o,&slot) != REDIS_OK ||
    1219               0 :         slot < 0 || slot >= REDIS_CLUSTER_SLOTS)
    1220                 :     {
    1221               0 :         addReplyError(c,"Invalid or out of range slot");
    1222               0 :         return -1;
    1223                 :     }
    1224               0 :     return (int) slot;
    1225                 : }
    1226                 : 
    1227               0 : void clusterCommand(redisClient *c) { /* CLUSTER <subcommand> [args ...] command implementation. */
    1228               0 :     if (server.cluster_enabled == 0) {
    1229               0 :         addReplyError(c,"This instance has cluster support disabled");
    1230               0 :         return;
    1231                 :     }
    1232                 :     /* Dispatch on the subcommand name (case insensitive) and on the number of arguments. */
    1233               0 :     if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
    1234                 :         clusterNode *n;
    1235                 :         struct sockaddr_in sa;
    1236                 :         long long port;
    1237                 : 
    1238                 :         /* Perform sanity checks on IP/port. The port parser used here does not reply by itself, so a bad port produces a single error reply. */
    1239               0 :         if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
    1240               0 :             addReplyError(c,"Invalid IP address in MEET");
    1241               0 :             return;
    1242                 :         }
    1243               0 :         if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK ||
    1244               0 :                     port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
    1245                 :         {
    1246               0 :             addReplyError(c,"Invalid TCP port specified");
    1247               0 :             return;
    1248                 :         }
    1249                 : 
    1250                 :         /* Finally add the node to the cluster with a random name, this 
    1251                 :          * will get fixed in the first handshake (ping/pong). */
    1252               0 :         n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
    1253               0 :         strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
    1254               0 :         n->port = port;
    1255               0 :         clusterAddNode(n);
    1256               0 :         addReply(c,shared.ok);
    1257               0 :     } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
    1258                 :         robj *o;
    1259               0 :         sds ci = clusterGenNodesDescription();
    1260                 : 
    1261               0 :         o = createObject(REDIS_STRING,ci);
    1262               0 :         addReplyBulk(c,o);
    1263               0 :         decrRefCount(o);
    1264               0 :     } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
    1265               0 :                !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
    1266                 :     {
    1267                 :         /* CLUSTER ADDSLOTS <slot> [slot] ... */
    1268                 :         /* CLUSTER DELSLOTS <slot> [slot] ... */
    1269                 :         int j, slot;
    1270               0 :         unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
    1271               0 :         int del = !strcasecmp(c->argv[1]->ptr,"delslots");
    1272                 : 
    1273                 :         memset(slots,0,REDIS_CLUSTER_SLOTS);
    1274                 :         /* Check that all the arguments are parsable and that all the
    1275                 :          * slots are not already busy. */
    1276               0 :         for (j = 2; j < c->argc; j++) {
    1277               0 :             if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
    1278               0 :                 zfree(slots);
    1279               0 :                 return;
    1280                 :             }
    1281               0 :             if (del && server.cluster.slots[slot] == NULL) {
    1282               0 :                 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
    1283               0 :                 zfree(slots);
    1284               0 :                 return;
    1285               0 :             } else if (!del && server.cluster.slots[slot]) {
    1286               0 :                 addReplyErrorFormat(c,"Slot %d is already busy", slot);
    1287               0 :                 zfree(slots);
    1288               0 :                 return;
    1289                 :             }
    1290               0 :             if (slots[slot]++ == 1) {
    1291               0 :                 addReplyErrorFormat(c,"Slot %d specified multiple times",
    1292                 :                     (int)slot);
    1293               0 :                 zfree(slots);
    1294               0 :                 return;
    1295                 :             }
    1296                 :         }
    1297               0 :         for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
    1298               0 :             if (slots[j]) {
    1299                 :                 int retval;
    1300                 : 
    1301                 :                 /* If this slot was set as importing we can clear this 
    1302                 :                  * state as now we are the real owner of the slot. */
    1303               0 :                 if (server.cluster.importing_slots_from[j])
    1304               0 :                     server.cluster.importing_slots_from[j] = NULL;
    1305                 : 
    1306               0 :                 retval = del ? clusterDelSlot(j) :
    1307               0 :                                clusterAddSlot(server.cluster.myself,j);
    1308               0 :                 redisAssertWithInfo(c,NULL,retval == REDIS_OK);
    1309                 :             }
    1310                 :         }
    1311               0 :         zfree(slots);
    1312               0 :         clusterUpdateState();
    1313               0 :         clusterSaveConfigOrDie();
    1314               0 :         addReply(c,shared.ok);
    1315               0 :     } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
    1316                 :         /* SETSLOT 10 MIGRATING <node ID> */
    1317                 :         /* SETSLOT 10 IMPORTING <node ID> */
    1318                 :         /* SETSLOT 10 STABLE */
    1319                 :         /* SETSLOT 10 NODE <node ID> */
    1320                 :         int slot;
    1321                 :         clusterNode *n;
    1322                 : 
    1323               0 :         if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
    1324                 : 
    1325               0 :         if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
    1326               0 :             if (server.cluster.slots[slot] != server.cluster.myself) {
    1327               0 :                 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
    1328               0 :                 return;
    1329                 :             }
    1330               0 :             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
    1331               0 :                 addReplyErrorFormat(c,"I don't know about node %s",
    1332               0 :                     (char*)c->argv[4]->ptr);
    1333               0 :                 return;
    1334                 :             }
    1335               0 :             server.cluster.migrating_slots_to[slot] = n;
    1336               0 :         } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
    1337               0 :             if (server.cluster.slots[slot] == server.cluster.myself) {
    1338               0 :                 addReplyErrorFormat(c,
    1339                 :                     "I'm already the owner of hash slot %u",slot);
    1340               0 :                 return;
    1341                 :             }
    1342               0 :             if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
    1343               0 :                 addReplyErrorFormat(c,"I don't know about node %s",
    1344               0 :                     (char*)c->argv[4]->ptr);
    1345               0 :                 return;
    1346                 :             }
    1347               0 :             server.cluster.importing_slots_from[slot] = n;
    1348               0 :         } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
    1349                 :             /* CLUSTER SETSLOT <SLOT> STABLE */
    1350               0 :             server.cluster.importing_slots_from[slot] = NULL;
    1351               0 :             server.cluster.migrating_slots_to[slot] = NULL;
    1352               0 :         } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
    1353                 :             /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
    1354               0 :             n = clusterLookupNode(c->argv[4]->ptr);
    1355                 :             /* An unknown node ID is an error: reply and stop, 'n' is NULL and must not be used below. */
    1356               0 :             if (!n) { addReplyErrorFormat(c,"Unknown node %s",
    1357               0 :                 (char*)c->argv[4]->ptr); return; }
    1358                 :             /* If this hash slot was served by 'myself' before to switch
    1359                 :              * make sure there are no longer local keys for this hash slot. */
    1360               0 :             if (server.cluster.slots[slot] == server.cluster.myself &&
    1361               0 :                 n != server.cluster.myself)
    1362                 :             {
    1363                 :                 int numkeys;
    1364                 :                 robj **keys;
    1365                 : 
    1366               0 :                 keys = zmalloc(sizeof(robj*)*1);
    1367               0 :                 numkeys = GetKeysInSlot(slot, keys, 1);
    1368               0 :                 zfree(keys);
    1369               0 :                 if (numkeys != 0) {
    1370               0 :                     addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot);
    1371               0 :                     return;
    1372                 :                 }
    1373                 :             }
    1374                 :             /* If this node was the slot owner and the slot was marked as
    1375                 :              * migrating, assigning the slot to another node will clear
    1376                 :              * the migrating status. */
    1377               0 :             if (server.cluster.slots[slot] == server.cluster.myself &&
    1378               0 :                 server.cluster.migrating_slots_to[slot])
    1379               0 :                 server.cluster.migrating_slots_to[slot] = NULL;
    1380                 : 
    1381                 :             /* If this node was importing this slot, assigning the slot to
    1382                 :              * itself also clears the importing status. */
    1383               0 :             if (n == server.cluster.myself && server.cluster.importing_slots_from[slot])
    1384               0 :                 server.cluster.importing_slots_from[slot] = NULL;
    1385                 : 
    1386               0 :             clusterDelSlot(slot);
    1387               0 :             clusterAddSlot(n,slot);
    1388                 :         } else {
    1389               0 :             addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments");
    1390               0 :             return;
    1391                 :         }
    1392               0 :         clusterSaveConfigOrDie();
    1393               0 :         addReply(c,shared.ok);
    1394               0 :     } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
    1395               0 :         char *statestr[] = {"ok","fail","needhelp"};
    1396               0 :         int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
    1397                 :         int j;
    1398                 : 
    1399               0 :         for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
    1400               0 :             clusterNode *n = server.cluster.slots[j];
    1401                 : 
    1402               0 :             if (n == NULL) continue;
    1403               0 :             slots_assigned++;
    1404               0 :             if (n->flags & REDIS_NODE_FAIL) {
    1405               0 :                 slots_fail++;
    1406               0 :             } else if (n->flags & REDIS_NODE_PFAIL) {
    1407               0 :                 slots_pfail++;
    1408                 :             } else {
    1409               0 :                 slots_ok++;
    1410                 :             }
    1411                 :         }
    1412                 : 
    1413               0 :         sds info = sdscatprintf(sdsempty(),
    1414                 :             "cluster_state:%s\r\n"
    1415                 :             "cluster_slots_assigned:%d\r\n"
    1416                 :             "cluster_slots_ok:%d\r\n"
    1417                 :             "cluster_slots_pfail:%d\r\n"
    1418                 :             "cluster_slots_fail:%d\r\n"
    1419                 :             "cluster_known_nodes:%lu\r\n"
    1420               0 :             , statestr[server.cluster.state],
    1421                 :             slots_assigned,
    1422                 :             slots_ok,
    1423                 :             slots_pfail,
    1424                 :             slots_fail,
    1425               0 :             dictSize(server.cluster.nodes)
    1426               0 :         );
    1427               0 :         addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
    1428                 :             (unsigned long)sdslen(info)));
    1429               0 :         addReplySds(c,info);
    1430               0 :         addReply(c,shared.crlf);
    1431               0 :     } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
    1432               0 :         sds key = c->argv[2]->ptr;
    1433                 : 
    1434               0 :         addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
    1435               0 :     } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
    1436                 :         long long maxkeys, slot;
    1437                 :         unsigned int numkeys, j;
    1438                 :         robj **keys;
    1439                 : 
    1440               0 :         if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != REDIS_OK)
    1441                 :             return;
    1442               0 :         if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL) != REDIS_OK)
    1443                 :             return;
    1444               0 :         if (slot < 0 || slot >= REDIS_CLUSTER_SLOTS || maxkeys < 0 ||
    1445               0 :             maxkeys > 1024*1024) {
    1446               0 :             addReplyError(c,"Invalid slot or number of keys");
    1447               0 :             return;
    1448                 :         }
    1449                 : 
    1450               0 :         keys = zmalloc(sizeof(robj*)*maxkeys);
    1451               0 :         numkeys = GetKeysInSlot(slot, keys, maxkeys);
    1452               0 :         addReplyMultiBulkLen(c,numkeys);
    1453               0 :         for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
    1454               0 :         zfree(keys);
    1455                 :     } else {
    1456               0 :         addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
    1457                 :     }
    1458                 : }
    1459                 : 
    1460                 : /* -----------------------------------------------------------------------------
    1461                 :  * DUMP, RESTORE and MIGRATE commands
    1462                 :  * -------------------------------------------------------------------------- */
    1463                 : 
    1464                 : /* Serialize the object 'o' in DUMP format into 'payload', which is
    1465                 :  * initialized here as a buffer-backed rio starting from an empty sds. */
    1466               4 : void createDumpPayload(rio *payload, robj *o) {
    1467                 :     unsigned char version[2];
    1468                 :     uint64_t checksum;
    1469                 : 
    1470                 :     /* Body: the object type followed by the serialized object, both
    1471                 :      * written with the RDB save functions. */
    1472               4 :     rioInitWithBuffer(payload,sdsempty());
    1473               4 :     redisAssert(rdbSaveObjectType(payload,o));
    1474               4 :     redisAssert(rdbSaveObject(payload,o));
    1475                 : 
    1476                 :     /* Footer appended after the body:
    1477                 :      * ----------------+---------------------+---------------+
    1478                 :      * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
    1479                 :      * ----------------+---------------------+---------------+
    1480                 :      * Both the RDB version and the CRC are stored in little endian.
    1481                 :      */
    1482                 : 
    1483                 :     /* RDB version, low byte first */
    1484               4 :     version[0] = REDIS_RDB_VERSION & 0xff;
    1485               4 :     version[1] = (REDIS_RDB_VERSION >> 8) & 0xff;
    1486               4 :     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,version,sizeof(version));
    1487                 : 
    1488                 :     /* CRC64 of everything written so far, RDB version included */
    1489               8 :     checksum = crc64((unsigned char*)payload->io.buffer.ptr,
    1490                 :                      sdslen(payload->io.buffer.ptr));
    1491                 :     memrev64ifbe(&checksum);
    1492               4 :     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&checksum,sizeof(checksum));
    1493               4 : }
    1494                 : 
    1495                 : /* Verify that the RDB version of the dump payload matches the one of this Redis
    1496                 :  * instance and that the checksum is ok.
    1497                 :  * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR
    1498                 :  * is returned. */
    1499               3 : int verifyDumpPayload(unsigned char *p, size_t len) {
    1500                 :     unsigned char *footer;
    1501                 :     uint16_t rdbver;
    1502                 :     uint64_t crc;
    1503                 : 
    1504                 :     /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
    1505               3 :     if (len < 10) return REDIS_ERR;
    1506               3 :     footer = p+(len-10);
    1507                 : 
    1508                 :     /* Verify RDB version */
    1509               3 :     rdbver = (footer[1] << 8) | footer[0];
    1510               3 :     if (rdbver != REDIS_RDB_VERSION) return REDIS_ERR;
    1511                 : 
    1512                 :     /* Verify CRC64 */
    1513               3 :     crc = crc64(p,len-8);
    1514                 :     memrev64ifbe(&crc);
    1515               3 :     return (memcmp(&crc,footer+2,8) == 0) ? REDIS_OK : REDIS_ERR;
    1516                 : }
    1517                 : 
    1518                 : /* DUMP keyname
    1519                 :  * DUMP is actually not used by Redis Cluster but it is the obvious
    1520                 :  * complement of RESTORE and can be useful for different applications. */
    1521               3 : void dumpCommand(redisClient *c) {
    1522                 :     robj *o, *dumpobj;
    1523                 :     rio payload;
    1524                 : 
    1525                 :     /* Check if the key is here. */
    1526               3 :     if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
    1527               1 :         addReply(c,shared.nullbulk);
    1528               1 :         return;
    1529                 :     }
    1530                 : 
    1531                 :     /* Create the DUMP encoded representation. */
    1532               2 :     createDumpPayload(&payload,o);
    1533                 : 
    1534                 :     /* Transfer to the client */
    1535               2 :     dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
    1536               2 :     addReplyBulk(c,dumpobj);
    1537               2 :     decrRefCount(dumpobj);
    1538               2 :     return;
    1539                 : }
    1540                 : 
    1541                 : /* RESTORE key ttl serialized-value */
    1542               4 : void restoreCommand(redisClient *c) {
    1543                 :     long ttl;
    1544                 :     rio payload;
    1545                 :     int type;
    1546                 :     robj *obj;
    1547                 : 
    1548                 :     /* Make sure this key does not already exist here... */
    1549               4 :     if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
    1550               1 :         addReplyError(c,"Target key name is busy.");
    1551               1 :         return;
    1552                 :     }
    1553                 : 
    1554                 :     /* Check if the TTL value makes sense */
    1555               3 :     if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
    1556                 :         return;
    1557               3 :     } else if (ttl < 0) {
    1558               0 :         addReplyError(c,"Invalid TTL value, must be >= 0");
    1559               0 :         return;
    1560                 :     }
    1561                 : 
    1562                 :     /* Verify RDB version and data checksum. */
    1563               6 :     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == REDIS_ERR) {
    1564               0 :         addReplyError(c,"DUMP payload version or checksum are wrong");
    1565               0 :         return;
    1566                 :     }
    1567                 : 
    1568               3 :     rioInitWithBuffer(&payload,c->argv[3]->ptr);
    1569               6 :     if (((type = rdbLoadObjectType(&payload)) == -1) ||
    1570               3 :         ((obj = rdbLoadObject(type,&payload)) == NULL))
    1571                 :     {
    1572               0 :         addReplyError(c,"Bad data format");
    1573               0 :         return;
    1574                 :     }
    1575                 : 
    1576                 :     /* Create the key and set the TTL if any */
    1577               3 :     dbAdd(c->db,c->argv[1],obj);
    1578               3 :     if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
    1579               3 :     signalModifiedKey(c->db,c->argv[1]);
    1580               3 :     addReply(c,shared.ok);
    1581               3 :     server.dirty++;
    1582                 : }
    1583                 : 
    1584                 : /* MIGRATE host port key dbid timeout */
    1585               2 : void migrateCommand(redisClient *c) {
    1586                 :     int fd;
    1587                 :     long timeout;
    1588                 :     long dbid;
    1589                 :     long long ttl;
    1590                 :     robj *o;
    1591                 :     rio cmd, payload;
    1592                 : 
    1593                 :     /* Sanity check */
    1594               2 :     if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
    1595                 :         return;
    1596               2 :     if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
    1597                 :         return;
    1598               2 :     if (timeout <= 0) timeout = 1;
    1599                 : 
    1600                 :     /* Check if the key is here. If not we reply with success as there is
    1601                 :      * nothing to migrate (for instance the key expired in the meantime), but
    1602                 :      * we include such information in the reply string. */
    1603               2 :     if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
    1604               0 :         addReplySds(c,sdsnew("+NOKEY\r\n"));
    1605               0 :         return;
    1606                 :     }
    1607                 :     
    1608                 :     /* Connect */
    1609               4 :     fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
    1610               2 :                 atoi(c->argv[2]->ptr));
    1611               2 :     if (fd == -1) {
    1612               0 :         addReplyErrorFormat(c,"Can't connect to target node: %s",
    1613                 :             server.neterr);
    1614               0 :         return;
    1615                 :     }
    1616               2 :     if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
    1617               0 :         addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
    1618               0 :         return;
    1619                 :     }
    1620                 : 
    1621                 :     /* Create RESTORE payload and generate the protocol to call the command. */
    1622               2 :     rioInitWithBuffer(&cmd,sdsempty());
    1623               2 :     redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
    1624               2 :     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
    1625               2 :     redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
    1626                 : 
    1627               2 :     ttl = getExpire(c->db,c->argv[3])-mstime();
    1628               2 :     if (ttl < 1) ttl = 1;
    1629               2 :     redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
    1630               2 :     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
    1631               2 :     redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
    1632               4 :     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
    1633               2 :     redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
    1634                 : 
    1635                 :     /* Finally the last argument that is the serialized object payload
    1636                 :      * in the DUMP format. */
    1637               2 :     createDumpPayload(&payload,o);
    1638               4 :     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
    1639                 :                                 sdslen(payload.io.buffer.ptr)));
    1640               2 :     sdsfree(payload.io.buffer.ptr);
    1641                 : 
    1642                 :     /* Transfer the query to the other node in 64K chunks. */
    1643                 :     {
    1644               2 :         sds buf = cmd.io.buffer.ptr;
    1645               2 :         size_t pos = 0, towrite;
    1646               2 :         int nwritten = 0;
    1647                 : 
    1648               6 :         while ((towrite = sdslen(buf)-pos) > 0) {
    1649               2 :             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
    1650               2 :             nwritten = syncWrite(fd,buf+pos,towrite,timeout);
    1651               2 :             if (nwritten != (signed)towrite) goto socket_wr_err;
    1652               2 :             pos += nwritten;
    1653                 :         }
    1654                 :     }
    1655                 : 
    1656                 :     /* Read back the reply. */
    1657                 :     {
    1658                 :         char buf1[1024];
    1659                 :         char buf2[1024];
    1660                 : 
    1661                 :         /* Read the two replies */
    1662               2 :         if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
    1663                 :             goto socket_rd_err;
    1664               1 :         if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
    1665                 :             goto socket_rd_err;
    1666               1 :         if (buf1[0] == '-' || buf2[0] == '-') {
    1667               0 :             addReplyErrorFormat(c,"Target instance replied with error: %s",
    1668               0 :                 (buf1[0] == '-') ? buf1+1 : buf2+1);
    1669                 :         } else {
    1670                 :             robj *aux;
    1671                 : 
    1672               1 :             dbDelete(c->db,c->argv[3]);
    1673               1 :             signalModifiedKey(c->db,c->argv[3]);
    1674               1 :             addReply(c,shared.ok);
    1675               1 :             server.dirty++;
    1676                 : 
    1677                 :             /* Translate MIGRATE as DEL for replication/AOF. */
    1678               1 :             aux = createStringObject("DEL",3);
    1679               1 :             rewriteClientCommandVector(c,2,aux,c->argv[3]);
    1680               1 :             decrRefCount(aux);
    1681                 :         }
    1682                 :     }
    1683                 : 
    1684               1 :     sdsfree(cmd.io.buffer.ptr);
    1685               1 :     close(fd);
    1686               1 :     return;
    1687                 : 
    1688                 : socket_wr_err:
    1689               0 :     addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
    1690               0 :     sdsfree(cmd.io.buffer.ptr);
    1691               0 :     close(fd);
    1692               0 :     return;
    1693                 : 
    1694                 : socket_rd_err:
    1695               1 :     addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n"));
    1696               1 :     sdsfree(cmd.io.buffer.ptr);
    1697               1 :     close(fd);
    1698               1 :     return;
    1699                 : }
    1700                 : 
    1701                 : /* The ASKING command is required after a -ASK redirection.
    1702                 :  * The client should issue ASKING before actually sending the command to
    1703                 :  * the target instance. See the Redis Cluster specification for more
    1704                 :  * information. */
    1705               0 : void askingCommand(redisClient *c) {
    1706               0 :     if (server.cluster_enabled == 0) {
    1707               0 :         addReplyError(c,"This instance has cluster support disabled");
    1708               0 :         return;
    1709                 :     }
    1710               0 :     c->flags |= REDIS_ASKING;
    1711               0 :     addReply(c,shared.ok);
    1712                 : }
    1713                 : 
    1714                 : /* -----------------------------------------------------------------------------
    1715                 :  * Cluster functions related to serving / redirecting clients
    1716                 :  * -------------------------------------------------------------------------- */
    1717                 : 
    1718                 : /* Return the pointer to the cluster node that is able to serve the query
    1719                 :  * as all the keys belong to hash slots for which the node is in charge.
    1720                 :  *
    1721                 :  * If the returned node should be used only for this request, the *ask
    1722                 :  * integer is set to '1', otherwise to '0'. This is used in order to
    1723                 :  * let the caller know if we should reply with -MOVED or with -ASK.
    1724                 :  *
    1725                 :  * If the request contains more than a single key NULL is returned,
    1726                 :  * however a request with more than one key argument where the key is always
    1727                 :  * the same is valid, like in: RPOPLPUSH mylist mylist. */
    1728               0 : clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) {
    1729               0 :     clusterNode *n = NULL;
    1730               0 :     robj *firstkey = NULL;
    1731                 :     multiState *ms, _ms;
    1732                 :     multiCmd mc;
    1733               0 :     int i, slot = 0;
    1734                 : 
    1735                 :     /* We handle all the cases as if they were EXEC commands, so we have
    1736                 :      * a common code path for everything */
    1737               0 :     if (cmd->proc == execCommand) {
    1738                 :         /* If REDIS_MULTI flag is not set EXEC is just going to return an
    1739                 :          * error. */
    1740               0 :         if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
    1741               0 :         ms = &c->mstate;
    1742                 :     } else {
    1743                 :         /* In order to have a single codepath create a fake Multi State
    1744                 :          * structure if the client is not in MULTI/EXEC state, this way
    1745                 :          * we have a single codepath below. */
    1746               0 :         ms = &_ms;
    1747               0 :         _ms.commands = &mc;
    1748               0 :         _ms.count = 1;
    1749               0 :         mc.argv = argv;
    1750               0 :         mc.argc = argc;
    1751               0 :         mc.cmd = cmd;
    1752                 :     }
    1753                 : 
    1754                 :     /* Check that all the keys are the same key, and get the slot and
    1755                 :      * node for this key. */
    1756               0 :     for (i = 0; i < ms->count; i++) {
    1757                 :         struct redisCommand *mcmd;
    1758                 :         robj **margv;
    1759                 :         int margc, *keyindex, numkeys, j;
    1760                 : 
    1761               0 :         mcmd = ms->commands[i].cmd;
    1762               0 :         margc = ms->commands[i].argc;
    1763               0 :         margv = ms->commands[i].argv;
    1764                 : 
    1765               0 :         keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
    1766                 :                                       REDIS_GETKEYS_ALL);
    1767               0 :         for (j = 0; j < numkeys; j++) {
    1768               0 :             if (firstkey == NULL) {
    1769                 :                 /* This is the first key we see. Check what is the slot
    1770                 :                  * and node. */
    1771               0 :                 firstkey = margv[keyindex[j]];
    1772                 : 
    1773               0 :                 slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr));
    1774               0 :                 n = server.cluster.slots[slot];
    1775               0 :                 redisAssertWithInfo(c,firstkey,n != NULL);
    1776                 :             } else {
    1777                 :                 /* If it is not the first key, make sure it is exactly
    1778                 :                  * the same key as the first we saw. */
    1779               0 :                 if (!equalStringObjects(firstkey,margv[keyindex[j]])) {
    1780               0 :                     decrRefCount(firstkey);
    1781               0 :                     getKeysFreeResult(keyindex);
    1782               0 :                     return NULL;
    1783                 :                 }
    1784                 :             }
    1785                 :         }
    1786               0 :         getKeysFreeResult(keyindex);
    1787                 :     }
    1788               0 :     if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */
    1789                 :     /* No key at all in command? then we can serve the request
    1790                 :      * without redirections. */
    1791               0 :     if (n == NULL) return server.cluster.myself;
    1792               0 :     if (hashslot) *hashslot = slot;
    1793                 :     /* This request is about a slot we are migrating into another instance?
    1794                 :      * Then we need to check if we have the key. If we have it we can reply.
    1795                 :      * If instead it is a new key, we pass the request to the node that is
    1796                 :      * receiving the slot. */
    1797               0 :     if (n == server.cluster.myself &&
    1798               0 :         server.cluster.migrating_slots_to[slot] != NULL)
    1799                 :     {
    1800               0 :         if (lookupKeyRead(&server.db[0],firstkey) == NULL) {
    1801               0 :             if (ask) *ask = 1;
    1802               0 :             return server.cluster.migrating_slots_to[slot];
    1803                 :         }
    1804                 :     }
    1805                 :     /* Handle the case in which we are receiving this hash slot from
    1806                 :      * another instance, so we'll accept the query even if in the table
    1807                 :      * it is assigned to a different node, but only if the client
    1808                 :      * issued an ASKING command before. */
    1809               0 :     if (server.cluster.importing_slots_from[slot] != NULL &&
    1810               0 :         c->flags & REDIS_ASKING) {
    1811               0 :         return server.cluster.myself;
    1812                 :     }
    1813                 :     /* It's not a -ASK case. Base case: just return the right node. */
    1814               0 :     return n;
    1815                 : }

Generated by: LCOV version 1.7