1 : #include "redis.h"
2 : #include "endianconv.h"
3 :
4 : #include <arpa/inet.h>
5 : #include <fcntl.h>
6 : #include <unistd.h>
7 :
8 : void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
9 : void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
10 : void clusterSendPing(clusterLink *link, int type);
11 : void clusterSendFail(char *nodename);
12 : void clusterUpdateState(void);
13 : int clusterNodeGetSlotBit(clusterNode *n, int slot);
14 : sds clusterGenNodesDescription(void);
15 : clusterNode *clusterLookupNode(char *name);
16 : int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
17 : int clusterAddSlot(clusterNode *n, int slot);
18 :
19 : /* -----------------------------------------------------------------------------
20 : * Initialization
21 : * -------------------------------------------------------------------------- */
22 :
23 0 : int clusterLoadConfig(char *filename) {
24 0 : FILE *fp = fopen(filename,"r");
25 : char *line;
26 : int maxline, j;
27 :
28 0 : if (fp == NULL) return REDIS_ERR;
29 :
30 : /* Parse the file. Note that single liens of the cluster config file can
31 : * be really long as they include all the hash slots of the node.
32 : * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
33 : * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
34 0 : maxline = 1024+REDIS_CLUSTER_SLOTS*16;
35 0 : line = zmalloc(maxline);
36 0 : while(fgets(line,maxline,fp) != NULL) {
37 : int argc;
38 0 : sds *argv = sdssplitargs(line,&argc);
39 : clusterNode *n, *master;
40 : char *p, *s;
41 :
42 : /* Create this node if it does not exist */
43 0 : n = clusterLookupNode(argv[0]);
44 0 : if (!n) {
45 0 : n = createClusterNode(argv[0],0);
46 0 : clusterAddNode(n);
47 : }
48 : /* Address and port */
49 0 : if ((p = strchr(argv[1],':')) == NULL) goto fmterr;
50 0 : *p = '\0';
51 0 : memcpy(n->ip,argv[1],strlen(argv[1])+1);
52 0 : n->port = atoi(p+1);
53 :
54 : /* Parse flags */
55 0 : p = s = argv[2];
56 0 : while(p) {
57 0 : p = strchr(s,',');
58 0 : if (p) *p = '\0';
59 0 : if (!strcasecmp(s,"myself")) {
60 0 : redisAssert(server.cluster.myself == NULL);
61 0 : server.cluster.myself = n;
62 0 : n->flags |= REDIS_NODE_MYSELF;
63 0 : } else if (!strcasecmp(s,"master")) {
64 0 : n->flags |= REDIS_NODE_MASTER;
65 0 : } else if (!strcasecmp(s,"slave")) {
66 0 : n->flags |= REDIS_NODE_SLAVE;
67 0 : } else if (!strcasecmp(s,"fail?")) {
68 0 : n->flags |= REDIS_NODE_PFAIL;
69 0 : } else if (!strcasecmp(s,"fail")) {
70 0 : n->flags |= REDIS_NODE_FAIL;
71 0 : } else if (!strcasecmp(s,"handshake")) {
72 0 : n->flags |= REDIS_NODE_HANDSHAKE;
73 0 : } else if (!strcasecmp(s,"noaddr")) {
74 0 : n->flags |= REDIS_NODE_NOADDR;
75 0 : } else if (!strcasecmp(s,"noflags")) {
76 : /* nothing to do */
77 : } else {
78 0 : redisPanic("Unknown flag in redis cluster config file");
79 : }
80 0 : if (p) s = p+1;
81 : }
82 :
83 : /* Get master if any. Set the master and populate master's
84 : * slave list. */
85 0 : if (argv[3][0] != '-') {
86 0 : master = clusterLookupNode(argv[3]);
87 0 : if (!master) {
88 0 : master = createClusterNode(argv[3],0);
89 0 : clusterAddNode(master);
90 : }
91 0 : n->slaveof = master;
92 0 : clusterNodeAddSlave(master,n);
93 : }
94 :
95 : /* Set ping sent / pong received timestamps */
96 0 : if (atoi(argv[4])) n->ping_sent = time(NULL);
97 0 : if (atoi(argv[5])) n->pong_received = time(NULL);
98 :
99 : /* Populate hash slots served by this instance. */
100 0 : for (j = 7; j < argc; j++) {
101 : int start, stop;
102 :
103 0 : if (argv[j][0] == '[') {
104 : /* Here we handle migrating / importing slots */
105 : int slot;
106 : char direction;
107 : clusterNode *cn;
108 :
109 0 : p = strchr(argv[j],'-');
110 0 : redisAssert(p != NULL);
111 0 : *p = '\0';
112 0 : direction = p[1]; /* Either '>' or '<' */
113 0 : slot = atoi(argv[j]+1);
114 0 : p += 3;
115 0 : cn = clusterLookupNode(p);
116 0 : if (!cn) {
117 0 : cn = createClusterNode(p,0);
118 0 : clusterAddNode(cn);
119 : }
120 0 : if (direction == '>') {
121 0 : server.cluster.migrating_slots_to[slot] = cn;
122 : } else {
123 0 : server.cluster.importing_slots_from[slot] = cn;
124 : }
125 0 : continue;
126 0 : } else if ((p = strchr(argv[j],'-')) != NULL) {
127 0 : *p = '\0';
128 0 : start = atoi(argv[j]);
129 0 : stop = atoi(p+1);
130 : } else {
131 0 : start = stop = atoi(argv[j]);
132 : }
133 0 : while(start <= stop) clusterAddSlot(n, start++);
134 : }
135 :
136 0 : sdssplitargs_free(argv,argc);
137 : }
138 0 : zfree(line);
139 0 : fclose(fp);
140 :
141 : /* Config sanity check */
142 0 : redisAssert(server.cluster.myself != NULL);
143 0 : redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
144 0 : server.cluster.myself->name);
145 0 : clusterUpdateState();
146 0 : return REDIS_OK;
147 :
148 : fmterr:
149 0 : redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
150 0 : fclose(fp);
151 0 : exit(1);
152 : }
153 :
154 : /* Cluster node configuration is exactly the same as CLUSTER NODES output.
155 : *
156 : * This function writes the node config and returns 0, on error -1
157 : * is returned. */
158 0 : int clusterSaveConfig(void) {
159 0 : sds ci = clusterGenNodesDescription();
160 : int fd;
161 :
162 0 : if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644))
163 : == -1) goto err;
164 0 : if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
165 0 : close(fd);
166 0 : sdsfree(ci);
167 0 : return 0;
168 :
169 : err:
170 0 : sdsfree(ci);
171 0 : return -1;
172 : }
173 :
174 0 : void clusterSaveConfigOrDie(void) {
175 0 : if (clusterSaveConfig() == -1) {
176 0 : redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
177 0 : exit(1);
178 : }
179 0 : }
180 :
181 0 : void clusterInit(void) {
182 0 : int saveconf = 0;
183 :
184 0 : server.cluster.myself = NULL;
185 0 : server.cluster.state = REDIS_CLUSTER_FAIL;
186 0 : server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
187 0 : server.cluster.node_timeout = 15;
188 : memset(server.cluster.migrating_slots_to,0,
189 : sizeof(server.cluster.migrating_slots_to));
190 : memset(server.cluster.importing_slots_from,0,
191 : sizeof(server.cluster.importing_slots_from));
192 : memset(server.cluster.slots,0,
193 : sizeof(server.cluster.slots));
194 0 : if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
195 : /* No configuration found. We will just use the random name provided
196 : * by the createClusterNode() function. */
197 0 : server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
198 0 : redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
199 0 : server.cluster.myself->name);
200 0 : clusterAddNode(server.cluster.myself);
201 0 : saveconf = 1;
202 : }
203 0 : if (saveconf) clusterSaveConfigOrDie();
204 : /* We need a listening TCP port for our cluster messaging needs */
205 0 : server.cfd = anetTcpServer(server.neterr,
206 : server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
207 0 : if (server.cfd == -1) {
208 0 : redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr);
209 0 : exit(1);
210 : }
211 0 : if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
212 0 : clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
213 0 : server.cluster.slots_to_keys = zslCreate();
214 0 : }
215 :
216 : /* -----------------------------------------------------------------------------
217 : * CLUSTER communication link
218 : * -------------------------------------------------------------------------- */
219 :
220 0 : clusterLink *createClusterLink(clusterNode *node) {
221 0 : clusterLink *link = zmalloc(sizeof(*link));
222 0 : link->sndbuf = sdsempty();
223 0 : link->rcvbuf = sdsempty();
224 0 : link->node = node;
225 0 : link->fd = -1;
226 0 : return link;
227 : }
228 :
229 : /* Free a cluster link, but does not free the associated node of course.
230 : * Just this function will make sure that the original node associated
231 : * with this link will have the 'link' field set to NULL. */
232 0 : void freeClusterLink(clusterLink *link) {
233 0 : if (link->fd != -1) {
234 0 : aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
235 0 : aeDeleteFileEvent(server.el, link->fd, AE_READABLE);
236 : }
237 0 : sdsfree(link->sndbuf);
238 0 : sdsfree(link->rcvbuf);
239 0 : if (link->node)
240 0 : link->node->link = NULL;
241 0 : close(link->fd);
242 0 : zfree(link);
243 0 : }
244 :
245 0 : void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
246 : int cport, cfd;
247 : char cip[128];
248 : clusterLink *link;
249 : REDIS_NOTUSED(el);
250 : REDIS_NOTUSED(mask);
251 : REDIS_NOTUSED(privdata);
252 :
253 0 : cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
254 0 : if (cfd == AE_ERR) {
255 0 : redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
256 0 : return;
257 : }
258 0 : redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
259 : /* We need to create a temporary node in order to read the incoming
260 : * packet in a valid contest. This node will be released once we
261 : * read the packet and reply. */
262 0 : link = createClusterLink(NULL);
263 0 : link->fd = cfd;
264 0 : aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
265 : }
266 :
267 : /* -----------------------------------------------------------------------------
268 : * Key space handling
269 : * -------------------------------------------------------------------------- */
270 :
/* Return the hash slot (0..4095) of a key: the crc16 of the 'keylen' key
 * bytes, keeping only its 12 least significant bits. */
unsigned int keyHashSlot(char *key, int keylen) {
    unsigned int crc = crc16(key,keylen);

    return crc & 0xFFF;
}
276 :
277 : /* -----------------------------------------------------------------------------
278 : * CLUSTER node API
279 : * -------------------------------------------------------------------------- */
280 :
281 : /* Create a new cluster node, with the specified flags.
282 : * If "nodename" is NULL this is considered a first handshake and a random
283 : * node name is assigned to this node (it will be fixed later when we'll
284 : * receive the first pong).
285 : *
286 : * The node is created and returned to the user, but it is not automatically
287 : * added to the nodes hash table. */
288 0 : clusterNode *createClusterNode(char *nodename, int flags) {
289 0 : clusterNode *node = zmalloc(sizeof(*node));
290 :
291 0 : if (nodename)
292 0 : memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
293 : else
294 0 : getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN);
295 0 : node->flags = flags;
296 0 : memset(node->slots,0,sizeof(node->slots));
297 0 : node->numslaves = 0;
298 0 : node->slaves = NULL;
299 0 : node->slaveof = NULL;
300 0 : node->ping_sent = node->pong_received = 0;
301 0 : node->configdigest = NULL;
302 0 : node->configdigest_ts = 0;
303 0 : node->link = NULL;
304 0 : return node;
305 : }
306 :
307 0 : int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
308 : int j;
309 :
310 0 : for (j = 0; j < master->numslaves; j++) {
311 0 : if (master->slaves[j] == slave) {
312 0 : memmove(master->slaves+j,master->slaves+(j+1),
313 0 : (master->numslaves-1)-j);
314 0 : master->numslaves--;
315 0 : return REDIS_OK;
316 : }
317 : }
318 0 : return REDIS_ERR;
319 : }
320 :
321 0 : int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
322 : int j;
323 :
324 : /* If it's already a slave, don't add it again. */
325 0 : for (j = 0; j < master->numslaves; j++)
326 0 : if (master->slaves[j] == slave) return REDIS_ERR;
327 0 : master->slaves = zrealloc(master->slaves,
328 0 : sizeof(clusterNode*)*(master->numslaves+1));
329 0 : master->slaves[master->numslaves] = slave;
330 0 : master->numslaves++;
331 0 : return REDIS_OK;
332 : }
333 :
334 0 : void clusterNodeResetSlaves(clusterNode *n) {
335 0 : zfree(n->slaves);
336 0 : n->numslaves = 0;
337 0 : }
338 :
339 0 : void freeClusterNode(clusterNode *n) {
340 : sds nodename;
341 :
342 0 : nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
343 0 : redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK);
344 0 : sdsfree(nodename);
345 0 : if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
346 0 : if (n->link) freeClusterLink(n->link);
347 0 : zfree(n);
348 0 : }
349 :
350 : /* Add a node to the nodes hash table */
351 0 : int clusterAddNode(clusterNode *node) {
352 : int retval;
353 :
354 0 : retval = dictAdd(server.cluster.nodes,
355 0 : sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
356 0 : return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
357 : }
358 :
359 : /* Node lookup by name */
360 0 : clusterNode *clusterLookupNode(char *name) {
361 0 : sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN);
362 : struct dictEntry *de;
363 :
364 0 : de = dictFind(server.cluster.nodes,s);
365 0 : sdsfree(s);
366 0 : if (de == NULL) return NULL;
367 0 : return dictGetVal(de);
368 : }
369 :
370 : /* This is only used after the handshake. When we connect a given IP/PORT
371 : * as a result of CLUSTER MEET we don't have the node name yet, so we
372 : * pick a random one, and will fix it when we receive the PONG request using
373 : * this function. */
374 0 : void clusterRenameNode(clusterNode *node, char *newname) {
375 : int retval;
376 0 : sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN);
377 :
378 0 : redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s",
379 : node->name, newname);
380 0 : retval = dictDelete(server.cluster.nodes, s);
381 0 : sdsfree(s);
382 0 : redisAssert(retval == DICT_OK);
383 0 : memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN);
384 0 : clusterAddNode(node);
385 0 : }
386 :
387 : /* -----------------------------------------------------------------------------
388 : * CLUSTER messages exchange - PING/PONG and gossip
389 : * -------------------------------------------------------------------------- */
390 :
391 : /* Process the gossip section of PING or PONG packets.
392 : * Note that this function assumes that the packet is already sanity-checked
393 : * by the caller, not in the content of the gossip section, but in the
394 : * length. */
395 0 : void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
396 0 : uint16_t count = ntohs(hdr->count);
397 0 : clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
398 0 : clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
399 :
400 0 : while(count--) {
401 0 : sds ci = sdsempty();
402 0 : uint16_t flags = ntohs(g->flags);
403 : clusterNode *node;
404 :
405 0 : if (flags == 0) ci = sdscat(ci,"noflags,");
406 0 : if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
407 0 : if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
408 0 : if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
409 0 : if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
410 0 : if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
411 0 : if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
412 0 : if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
413 0 : if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
414 :
415 0 : redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
416 : g->nodename,
417 : g->ip,
418 0 : ntohs(g->port),
419 : ci);
420 0 : sdsfree(ci);
421 :
422 : /* Update our state accordingly to the gossip sections */
423 0 : node = clusterLookupNode(g->nodename);
424 0 : if (node != NULL) {
425 : /* We already know this node. Let's start updating the last
426 : * time PONG figure if it is newer than our figure.
427 : * Note that it's not a problem if we have a PING already
428 : * in progress against this node. */
429 0 : if (node->pong_received < (signed) ntohl(g->pong_received)) {
430 0 : redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
431 0 : node->pong_received = ntohl(g->pong_received);
432 : }
433 : /* Mark this node as FAILED if we think it is possibly failing
434 : * and another node also thinks it's failing. */
435 0 : if (node->flags & REDIS_NODE_PFAIL &&
436 0 : (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)))
437 : {
438 0 : redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name);
439 0 : node->flags &= ~REDIS_NODE_PFAIL;
440 0 : node->flags |= REDIS_NODE_FAIL;
441 : /* Broadcast the failing node name to everybody */
442 0 : clusterSendFail(node->name);
443 0 : clusterUpdateState();
444 0 : clusterSaveConfigOrDie();
445 : }
446 : } else {
447 : /* If it's not in NOADDR state and we don't have it, we
448 : * start an handshake process against this IP/PORT pairs.
449 : *
450 : * Note that we require that the sender of this gossip message
451 : * is a well known node in our cluster, otherwise we risk
452 : * joining another cluster. */
453 0 : if (sender && !(flags & REDIS_NODE_NOADDR)) {
454 : clusterNode *newnode;
455 :
456 0 : redisLog(REDIS_DEBUG,"Adding the new node");
457 0 : newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
458 0 : memcpy(newnode->ip,g->ip,sizeof(g->ip));
459 0 : newnode->port = ntohs(g->port);
460 0 : clusterAddNode(newnode);
461 : }
462 : }
463 :
464 : /* Next node */
465 0 : g++;
466 : }
467 0 : }
468 :
469 : /* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */
470 0 : void nodeIp2String(char *buf, clusterLink *link) {
471 : struct sockaddr_in sa;
472 0 : socklen_t salen = sizeof(sa);
473 :
474 0 : if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1)
475 0 : redisPanic("getpeername() failed.");
476 0 : strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip));
477 0 : }
478 :
479 :
480 : /* Update the node address to the IP address that can be extracted
481 : * from link->fd, and at the specified port. */
482 0 : void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) {
483 : /* TODO */
484 0 : }
485 :
486 : /* When this function is called, there is a packet to process starting
487 : * at node->rcvbuf. Releasing the buffer is up to the caller, so this
488 : * function should just handle the higher level stuff of processing the
489 : * packet, modifying the cluster state if needed.
490 : *
491 : * The function returns 1 if the link is still valid after the packet
492 : * was processed, otherwise 0 if the link was freed since the packet
493 : * processing lead to some inconsistency error (for instance a PONG
494 : * received from the wrong sender ID). */
495 0 : int clusterProcessPacket(clusterLink *link) {
496 0 : clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
497 0 : uint32_t totlen = ntohl(hdr->totlen);
498 0 : uint16_t type = ntohs(hdr->type);
499 : clusterNode *sender;
500 :
501 0 : redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
502 : type, (unsigned long) totlen);
503 :
504 : /* Perform sanity checks */
505 0 : if (totlen < 8) return 1;
506 0 : if (totlen > sdslen(link->rcvbuf)) return 1;
507 0 : if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
508 : type == CLUSTERMSG_TYPE_MEET)
509 : {
510 0 : uint16_t count = ntohs(hdr->count);
511 : uint32_t explen; /* expected length of this packet */
512 :
513 0 : explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
514 0 : explen += (sizeof(clusterMsgDataGossip)*count);
515 0 : if (totlen != explen) return 1;
516 : }
517 0 : if (type == CLUSTERMSG_TYPE_FAIL) {
518 0 : uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
519 :
520 0 : explen += sizeof(clusterMsgDataFail);
521 0 : if (totlen != explen) return 1;
522 : }
523 0 : if (type == CLUSTERMSG_TYPE_PUBLISH) {
524 0 : uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
525 :
526 0 : explen += sizeof(clusterMsgDataPublish) +
527 0 : ntohl(hdr->data.publish.msg.channel_len) +
528 0 : ntohl(hdr->data.publish.msg.message_len);
529 0 : if (totlen != explen) return 1;
530 : }
531 :
532 : /* Ready to process the packet. Dispatch by type. */
533 0 : sender = clusterLookupNode(hdr->sender);
534 0 : if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
535 0 : int update_config = 0;
536 0 : redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
537 :
538 : /* Add this node if it is new for us and the msg type is MEET.
539 : * In this stage we don't try to add the node with the right
540 : * flags, slaveof pointer, and so forth, as this details will be
541 : * resolved when we'll receive PONGs from the server. */
542 0 : if (!sender && type == CLUSTERMSG_TYPE_MEET) {
543 : clusterNode *node;
544 :
545 0 : node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
546 0 : nodeIp2String(node->ip,link);
547 0 : node->port = ntohs(hdr->port);
548 0 : clusterAddNode(node);
549 0 : update_config = 1;
550 : }
551 :
552 : /* Get info from the gossip section */
553 0 : clusterProcessGossipSection(hdr,link);
554 :
555 : /* Anyway reply with a PONG */
556 0 : clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
557 :
558 : /* Update config if needed */
559 0 : if (update_config) clusterSaveConfigOrDie();
560 0 : } else if (type == CLUSTERMSG_TYPE_PONG) {
561 0 : int update_state = 0;
562 0 : int update_config = 0;
563 :
564 0 : redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
565 0 : if (link->node) {
566 0 : if (link->node->flags & REDIS_NODE_HANDSHAKE) {
567 : /* If we already have this node, try to change the
568 : * IP/port of the node with the new one. */
569 0 : if (sender) {
570 0 : redisLog(REDIS_WARNING,
571 : "Handshake error: we already know node %.40s, updating the address if needed.", sender->name);
572 0 : nodeUpdateAddress(sender,link,ntohs(hdr->port));
573 0 : freeClusterNode(link->node); /* will free the link too */
574 0 : return 0;
575 : }
576 :
577 : /* First thing to do is replacing the random name with the
578 : * right node name if this was an handshake stage. */
579 0 : clusterRenameNode(link->node, hdr->sender);
580 0 : redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
581 0 : link->node->name);
582 0 : link->node->flags &= ~REDIS_NODE_HANDSHAKE;
583 0 : update_config = 1;
584 0 : } else if (memcmp(link->node->name,hdr->sender,
585 : REDIS_CLUSTER_NAMELEN) != 0)
586 : {
587 : /* If the reply has a non matching node ID we
588 : * disconnect this node and set it as not having an associated
589 : * address. */
590 0 : redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
591 0 : link->node->flags |= REDIS_NODE_NOADDR;
592 0 : freeClusterLink(link);
593 0 : update_config = 1;
594 : /* FIXME: remove this node if we already have it.
595 : *
596 : * If we already have it but the IP is different, use
597 : * the new one if the old node is in FAIL, PFAIL, or NOADDR
598 : * status... */
599 0 : return 0;
600 : }
601 : }
602 : /* Update our info about the node */
603 0 : if (link->node) link->node->pong_received = time(NULL);
604 :
605 : /* Update master/slave info */
606 0 : if (sender) {
607 0 : if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
608 : sizeof(hdr->slaveof)))
609 : {
610 0 : sender->flags &= ~REDIS_NODE_SLAVE;
611 0 : sender->flags |= REDIS_NODE_MASTER;
612 0 : sender->slaveof = NULL;
613 : } else {
614 0 : clusterNode *master = clusterLookupNode(hdr->slaveof);
615 :
616 0 : sender->flags &= ~REDIS_NODE_MASTER;
617 0 : sender->flags |= REDIS_NODE_SLAVE;
618 0 : if (sender->numslaves) clusterNodeResetSlaves(sender);
619 0 : if (master) clusterNodeAddSlave(master,sender);
620 : }
621 : }
622 :
623 : /* Update our info about served slots if this new node is serving
624 : * slots that are not served from our point of view. */
625 0 : if (sender && sender->flags & REDIS_NODE_MASTER) {
626 : int newslots, j;
627 :
628 0 : newslots =
629 0 : memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0;
630 0 : memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots));
631 0 : if (newslots) {
632 0 : for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
633 0 : if (clusterNodeGetSlotBit(sender,j)) {
634 0 : if (server.cluster.slots[j] == sender) continue;
635 0 : if (server.cluster.slots[j] == NULL ||
636 0 : server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
637 : {
638 0 : server.cluster.slots[j] = sender;
639 0 : update_state = update_config = 1;
640 : }
641 : }
642 : }
643 : }
644 : }
645 :
646 : /* Get info from the gossip section */
647 0 : clusterProcessGossipSection(hdr,link);
648 :
649 : /* Update the cluster state if needed */
650 0 : if (update_state) clusterUpdateState();
651 0 : if (update_config) clusterSaveConfigOrDie();
652 0 : } else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
653 : clusterNode *failing;
654 :
655 0 : failing = clusterLookupNode(hdr->data.fail.about.nodename);
656 0 : if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
657 : {
658 0 : redisLog(REDIS_NOTICE,
659 : "FAIL message received from %.40s about %.40s",
660 : hdr->sender, hdr->data.fail.about.nodename);
661 0 : failing->flags |= REDIS_NODE_FAIL;
662 0 : failing->flags &= ~REDIS_NODE_PFAIL;
663 0 : clusterUpdateState();
664 0 : clusterSaveConfigOrDie();
665 : }
666 0 : } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
667 : robj *channel, *message;
668 : uint32_t channel_len, message_len;
669 :
670 : /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
671 0 : if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) {
672 0 : channel_len = ntohl(hdr->data.publish.msg.channel_len);
673 0 : message_len = ntohl(hdr->data.publish.msg.message_len);
674 0 : channel = createStringObject(
675 : (char*)hdr->data.publish.msg.bulk_data,channel_len);
676 0 : message = createStringObject(
677 : (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len);
678 0 : pubsubPublishMessage(channel,message);
679 0 : decrRefCount(channel);
680 0 : decrRefCount(message);
681 : }
682 : } else {
683 0 : redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
684 : }
685 0 : return 1;
686 : }
687 :
688 : /* This function is called when we detect the link with this node is lost.
689 : We set the node as no longer connected. The Cluster Cron will detect
690 : this connection and will try to get it connected again.
691 :
692 : Instead if the node is a temporary node used to accept a query, we
693 : completely free the node on error. */
694 0 : void handleLinkIOError(clusterLink *link) {
695 0 : freeClusterLink(link);
696 0 : }
697 :
698 : /* Send data. This is handled using a trivial send buffer that gets
699 : * consumed by write(). We don't try to optimize this for speed too much
700 : * as this is a very low traffic channel. */
701 0 : void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
702 0 : clusterLink *link = (clusterLink*) privdata;
703 : ssize_t nwritten;
704 : REDIS_NOTUSED(el);
705 : REDIS_NOTUSED(mask);
706 :
707 0 : nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
708 0 : if (nwritten <= 0) {
709 0 : redisLog(REDIS_NOTICE,"I/O error writing to node link: %s",
710 : strerror(errno));
711 : handleLinkIOError(link);
712 : return;
713 : }
714 0 : link->sndbuf = sdsrange(link->sndbuf,nwritten,-1);
715 0 : if (sdslen(link->sndbuf) == 0)
716 0 : aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
717 : }
718 :
719 : /* Read data. Try to read the first field of the header first to check the
720 : * full length of the packet. When a whole packet is in memory this function
721 : * will call the function to process the packet. And so forth. */
722 0 : void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
723 : char buf[1024];
724 : ssize_t nread;
725 : clusterMsg *hdr;
726 0 : clusterLink *link = (clusterLink*) privdata;
727 : int readlen;
728 : REDIS_NOTUSED(el);
729 : REDIS_NOTUSED(mask);
730 :
731 : again:
732 0 : if (sdslen(link->rcvbuf) >= 4) {
733 0 : hdr = (clusterMsg*) link->rcvbuf;
734 0 : readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf);
735 : } else {
736 0 : readlen = 4 - sdslen(link->rcvbuf);
737 : }
738 :
739 0 : nread = read(fd,buf,readlen);
740 0 : if (nread == -1 && errno == EAGAIN) return; /* Just no data */
741 :
742 0 : if (nread <= 0) {
743 : /* I/O error... */
744 0 : redisLog(REDIS_NOTICE,"I/O error reading from node link: %s",
745 0 : (nread == 0) ? "connection closed" : strerror(errno));
746 : handleLinkIOError(link);
747 : return;
748 : } else {
749 : /* Read data and recast the pointer to the new buffer. */
750 0 : link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
751 0 : hdr = (clusterMsg*) link->rcvbuf;
752 : }
753 :
754 : /* Total length obtained? read the payload now instead of burning
755 : * cycles waiting for a new event to fire. */
756 0 : if (sdslen(link->rcvbuf) == 4) goto again;
757 :
758 : /* Whole packet in memory? We can process it. */
759 0 : if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) {
760 0 : if (clusterProcessPacket(link)) {
761 0 : sdsfree(link->rcvbuf);
762 0 : link->rcvbuf = sdsempty();
763 : }
764 : }
765 : }
766 :
767 : /* Put stuff into the send buffer. */
768 0 : void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
769 0 : if (sdslen(link->sndbuf) == 0 && msglen != 0)
770 0 : aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
771 : clusterWriteHandler,link);
772 :
773 0 : link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
774 0 : }
775 :
776 : /* Send a message to all the nodes with a reliable link */
777 0 : void clusterBroadcastMessage(void *buf, size_t len) {
778 : dictIterator *di;
779 : dictEntry *de;
780 :
781 0 : di = dictGetIterator(server.cluster.nodes);
782 0 : while((de = dictNext(di)) != NULL) {
783 0 : clusterNode *node = dictGetVal(de);
784 :
785 0 : if (!node->link) continue;
786 0 : if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
787 0 : clusterSendMessage(node->link,buf,len);
788 : }
789 0 : dictReleaseIterator(di);
790 0 : }
791 :
792 : /* Build the message header */
793 0 : void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
794 0 : int totlen = 0;
795 :
796 : memset(hdr,0,sizeof(*hdr));
797 0 : hdr->type = htons(type);
798 0 : memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN);
799 0 : memcpy(hdr->myslots,server.cluster.myself->slots,
800 : sizeof(hdr->myslots));
801 0 : memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN);
802 0 : if (server.cluster.myself->slaveof != NULL) {
803 0 : memcpy(hdr->slaveof,server.cluster.myself->slaveof->name,
804 : REDIS_CLUSTER_NAMELEN);
805 : }
806 0 : hdr->port = htons(server.port);
807 0 : hdr->state = server.cluster.state;
808 0 : memset(hdr->configdigest,0,32); /* FIXME: set config digest */
809 :
810 0 : if (type == CLUSTERMSG_TYPE_FAIL) {
811 0 : totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
812 0 : totlen += sizeof(clusterMsgDataFail);
813 : }
814 0 : hdr->totlen = htonl(totlen);
815 : /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */
816 0 : }
817 :
818 : /* Send a PING or PONG packet to the specified node, making sure to add enough
819 : * gossip informations. */
820 0 : void clusterSendPing(clusterLink *link, int type) {
821 : unsigned char buf[1024];
822 0 : clusterMsg *hdr = (clusterMsg*) buf;
823 0 : int gossipcount = 0, totlen;
824 : /* freshnodes is the number of nodes we can still use to populate the
825 : * gossip section of the ping packet. Basically we start with the nodes
826 : * we have in memory minus two (ourself and the node we are sending the
827 : * message to). Every time we add a node we decrement the counter, so when
828 : * it will drop to <= zero we know there is no more gossip info we can
829 : * send. */
830 0 : int freshnodes = dictSize(server.cluster.nodes)-2;
831 :
832 0 : if (link->node && type == CLUSTERMSG_TYPE_PING)
833 0 : link->node->ping_sent = time(NULL);
834 0 : clusterBuildMessageHdr(hdr,type);
835 :
836 : /* Populate the gossip fields */
837 0 : while(freshnodes > 0 && gossipcount < 3) {
838 0 : struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
839 0 : clusterNode *this = dictGetVal(de);
840 : clusterMsgDataGossip *gossip;
841 : int j;
842 :
843 : /* Not interesting to gossip about ourself.
844 : * Nor to send gossip info about HANDSHAKE state nodes (zero info). */
845 0 : if (this == server.cluster.myself ||
846 0 : this->flags & REDIS_NODE_HANDSHAKE) {
847 0 : freshnodes--; /* otherwise we may loop forever. */
848 0 : continue;
849 : }
850 :
851 : /* Check if we already added this node */
852 0 : for (j = 0; j < gossipcount; j++) {
853 0 : if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
854 : REDIS_CLUSTER_NAMELEN) == 0) break;
855 : }
856 0 : if (j != gossipcount) continue;
857 :
858 : /* Add it */
859 0 : freshnodes--;
860 0 : gossip = &(hdr->data.ping.gossip[gossipcount]);
861 0 : memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
862 0 : gossip->ping_sent = htonl(this->ping_sent);
863 0 : gossip->pong_received = htonl(this->pong_received);
864 0 : memcpy(gossip->ip,this->ip,sizeof(this->ip));
865 0 : gossip->port = htons(this->port);
866 0 : gossip->flags = htons(this->flags);
867 0 : gossipcount++;
868 : }
869 0 : totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
870 0 : totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
871 0 : hdr->count = htons(gossipcount);
872 0 : hdr->totlen = htonl(totlen);
873 0 : clusterSendMessage(link,buf,totlen);
874 0 : }
875 :
876 : /* Send a PUBLISH message.
877 : *
878 : * If link is NULL, then the message is broadcasted to the whole cluster. */
879 0 : void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
880 : unsigned char buf[4096], *payload;
881 0 : clusterMsg *hdr = (clusterMsg*) buf;
882 : uint32_t totlen;
883 : uint32_t channel_len, message_len;
884 :
885 0 : channel = getDecodedObject(channel);
886 0 : message = getDecodedObject(message);
887 0 : channel_len = sdslen(channel->ptr);
888 0 : message_len = sdslen(message->ptr);
889 :
890 0 : clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
891 0 : totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
892 0 : totlen += sizeof(clusterMsgDataPublish) + channel_len + message_len;
893 :
894 0 : hdr->data.publish.msg.channel_len = htonl(channel_len);
895 0 : hdr->data.publish.msg.message_len = htonl(message_len);
896 0 : hdr->totlen = htonl(totlen);
897 :
898 : /* Try to use the local buffer if possible */
899 0 : if (totlen < sizeof(buf)) {
900 0 : payload = buf;
901 : } else {
902 0 : payload = zmalloc(totlen);
903 0 : hdr = (clusterMsg*) payload;
904 : memcpy(payload,hdr,sizeof(hdr));
905 : }
906 0 : memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
907 0 : memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
908 0 : message->ptr,sdslen(message->ptr));
909 :
910 0 : if (link)
911 0 : clusterSendMessage(link,payload,totlen);
912 : else
913 0 : clusterBroadcastMessage(payload,totlen);
914 :
915 0 : decrRefCount(channel);
916 0 : decrRefCount(message);
917 0 : if (payload != buf) zfree(payload);
918 0 : }
919 :
920 : /* Send a FAIL message to all the nodes we are able to contact.
921 : * The FAIL message is sent when we detect that a node is failing
922 : * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this:
923 : * we switch the node state to REDIS_NODE_FAIL and ask all the other
924 : * nodes to do the same ASAP. */
925 0 : void clusterSendFail(char *nodename) {
926 : unsigned char buf[1024];
927 0 : clusterMsg *hdr = (clusterMsg*) buf;
928 :
929 0 : clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
930 0 : memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN);
931 0 : clusterBroadcastMessage(buf,ntohl(hdr->totlen));
932 0 : }
933 :
934 : /* -----------------------------------------------------------------------------
935 : * CLUSTER Pub/Sub support
936 : *
937 : * For now we do very little, just propagating PUBLISH messages across the whole
938 : * cluster. In the future we'll try to get smarter and avoiding propagating those
939 : * messages to hosts without receives for a given channel.
940 : * -------------------------------------------------------------------------- */
941 0 : void clusterPropagatePublish(robj *channel, robj *message) {
942 0 : clusterSendPublish(NULL, channel, message);
943 0 : }
944 :
945 : /* -----------------------------------------------------------------------------
946 : * CLUSTER cron job
947 : * -------------------------------------------------------------------------- */
948 :
949 : /* This is executed 1 time every second */
950 0 : void clusterCron(void) {
951 : dictIterator *di;
952 : dictEntry *de;
953 : int j;
954 0 : time_t min_ping_sent = 0;
955 0 : clusterNode *min_ping_node = NULL;
956 :
957 : /* Check if we have disconnected nodes and reestablish the connection. */
958 0 : di = dictGetIterator(server.cluster.nodes);
959 0 : while((de = dictNext(di)) != NULL) {
960 0 : clusterNode *node = dictGetVal(de);
961 :
962 0 : if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
963 0 : if (node->link == NULL) {
964 : int fd;
965 : clusterLink *link;
966 :
967 0 : fd = anetTcpNonBlockConnect(server.neterr, node->ip,
968 : node->port+REDIS_CLUSTER_PORT_INCR);
969 0 : if (fd == -1) continue;
970 0 : link = createClusterLink(node);
971 0 : link->fd = fd;
972 0 : node->link = link;
973 0 : aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
974 : /* If the node is flagged as MEET, we send a MEET message instead
975 : * of a PING one, to force the receiver to add us in its node
976 : * table. */
977 0 : clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
978 : CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
979 : /* We can clear the flag after the first packet is sent.
980 : * If we'll never receive a PONG, we'll never send new packets
981 : * to this node. Instead after the PONG is received and we
982 : * are no longer in meet/handshake status, we want to send
983 : * normal PING packets. */
984 0 : node->flags &= ~REDIS_NODE_MEET;
985 :
986 0 : redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
987 : }
988 : }
989 0 : dictReleaseIterator(di);
990 :
991 : /* Ping some random node. Check a few random nodes and ping the one with
992 : * the oldest ping_sent time */
993 0 : for (j = 0; j < 5; j++) {
994 0 : de = dictGetRandomKey(server.cluster.nodes);
995 0 : clusterNode *this = dictGetVal(de);
996 :
997 0 : if (this->link == NULL) continue;
998 0 : if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
999 0 : if (min_ping_node == NULL || min_ping_sent > this->ping_sent) {
1000 0 : min_ping_node = this;
1001 0 : min_ping_sent = this->ping_sent;
1002 : }
1003 : }
1004 0 : if (min_ping_node) {
1005 0 : redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name);
1006 0 : clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING);
1007 : }
1008 :
1009 : /* Iterate nodes to check if we need to flag something as failing */
1010 0 : di = dictGetIterator(server.cluster.nodes);
1011 0 : while((de = dictNext(di)) != NULL) {
1012 0 : clusterNode *node = dictGetVal(de);
1013 : int delay;
1014 :
1015 0 : if (node->flags &
1016 : (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
1017 0 : continue;
1018 : /* Check only if we already sent a ping and did not received
1019 : * a reply yet. */
1020 0 : if (node->ping_sent == 0 ||
1021 0 : node->ping_sent <= node->pong_received) continue;
1022 :
1023 0 : delay = time(NULL) - node->pong_received;
1024 0 : if (delay < server.cluster.node_timeout) {
1025 : /* The PFAIL condition can be reversed without external
1026 : * help if it is not transitive (that is, if it does not
1027 : * turn into a FAIL state).
1028 : *
1029 : * The FAIL condition is also reversible if there are no slaves
1030 : * for this host, so no slave election should be in progress.
1031 : *
1032 : * TODO: consider all the implications of resurrecting a
1033 : * FAIL node. */
1034 0 : if (node->flags & REDIS_NODE_PFAIL) {
1035 0 : node->flags &= ~REDIS_NODE_PFAIL;
1036 0 : } else if (node->flags & REDIS_NODE_FAIL && !node->numslaves) {
1037 0 : node->flags &= ~REDIS_NODE_FAIL;
1038 0 : clusterUpdateState();
1039 : }
1040 : } else {
1041 : /* Timeout reached. Set the noad se possibly failing if it is
1042 : * not already in this state. */
1043 0 : if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
1044 0 : redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
1045 : node->name);
1046 0 : node->flags |= REDIS_NODE_PFAIL;
1047 : }
1048 : }
1049 : }
1050 0 : dictReleaseIterator(di);
1051 0 : }
1052 :
1053 : /* -----------------------------------------------------------------------------
1054 : * Slots management
1055 : * -------------------------------------------------------------------------- */
1056 :
1057 : /* Set the slot bit and return the old value. */
1058 0 : int clusterNodeSetSlotBit(clusterNode *n, int slot) {
1059 0 : off_t byte = slot/8;
1060 0 : int bit = slot&7;
1061 0 : int old = (n->slots[byte] & (1<<bit)) != 0;
1062 0 : n->slots[byte] |= 1<<bit;
1063 0 : return old;
1064 : }
1065 :
1066 : /* Clear the slot bit and return the old value. */
1067 0 : int clusterNodeClearSlotBit(clusterNode *n, int slot) {
1068 0 : off_t byte = slot/8;
1069 0 : int bit = slot&7;
1070 0 : int old = (n->slots[byte] & (1<<bit)) != 0;
1071 0 : n->slots[byte] &= ~(1<<bit);
1072 0 : return old;
1073 : }
1074 :
1075 : /* Return the slot bit from the cluster node structure. */
1076 0 : int clusterNodeGetSlotBit(clusterNode *n, int slot) {
1077 0 : off_t byte = slot/8;
1078 0 : int bit = slot&7;
1079 0 : return (n->slots[byte] & (1<<bit)) != 0;
1080 : }
1081 :
1082 : /* Add the specified slot to the list of slots that node 'n' will
1083 : * serve. Return REDIS_OK if the operation ended with success.
1084 : * If the slot is already assigned to another instance this is considered
1085 : * an error and REDIS_ERR is returned. */
1086 0 : int clusterAddSlot(clusterNode *n, int slot) {
1087 0 : if (clusterNodeSetSlotBit(n,slot) != 0)
1088 0 : return REDIS_ERR;
1089 0 : server.cluster.slots[slot] = n;
1090 0 : return REDIS_OK;
1091 : }
1092 :
1093 : /* Delete the specified slot marking it as unassigned.
1094 : * Returns REDIS_OK if the slot was assigned, otherwise if the slot was
1095 : * already unassigned REDIS_ERR is returned. */
1096 0 : int clusterDelSlot(int slot) {
1097 0 : clusterNode *n = server.cluster.slots[slot];
1098 :
1099 0 : if (!n) return REDIS_ERR;
1100 0 : redisAssert(clusterNodeClearSlotBit(n,slot) == 1);
1101 0 : server.cluster.slots[slot] = NULL;
1102 0 : return REDIS_OK;
1103 : }
1104 :
1105 : /* -----------------------------------------------------------------------------
1106 : * Cluster state evaluation function
1107 : * -------------------------------------------------------------------------- */
1108 0 : void clusterUpdateState(void) {
1109 0 : int ok = 1;
1110 : int j;
1111 :
1112 0 : for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1113 0 : if (server.cluster.slots[j] == NULL ||
1114 0 : server.cluster.slots[j]->flags & (REDIS_NODE_FAIL))
1115 : {
1116 0 : ok = 0;
1117 0 : break;
1118 : }
1119 : }
1120 0 : if (ok) {
1121 0 : if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) {
1122 0 : server.cluster.state = REDIS_CLUSTER_NEEDHELP;
1123 : } else {
1124 0 : server.cluster.state = REDIS_CLUSTER_OK;
1125 : }
1126 : } else {
1127 0 : server.cluster.state = REDIS_CLUSTER_FAIL;
1128 : }
1129 0 : }
1130 :
1131 : /* -----------------------------------------------------------------------------
1132 : * CLUSTER command
1133 : * -------------------------------------------------------------------------- */
1134 :
1135 0 : sds clusterGenNodesDescription(void) {
1136 0 : sds ci = sdsempty();
1137 : dictIterator *di;
1138 : dictEntry *de;
1139 : int j, start;
1140 :
1141 0 : di = dictGetIterator(server.cluster.nodes);
1142 0 : while((de = dictNext(di)) != NULL) {
1143 0 : clusterNode *node = dictGetVal(de);
1144 :
1145 : /* Node coordinates */
1146 0 : ci = sdscatprintf(ci,"%.40s %s:%d ",
1147 : node->name,
1148 : node->ip,
1149 : node->port);
1150 :
1151 : /* Flags */
1152 0 : if (node->flags == 0) ci = sdscat(ci,"noflags,");
1153 0 : if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
1154 0 : if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
1155 0 : if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
1156 0 : if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
1157 0 : if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
1158 0 : if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
1159 0 : if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
1160 0 : if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
1161 :
1162 : /* Slave of... or just "-" */
1163 0 : if (node->slaveof)
1164 0 : ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
1165 : else
1166 0 : ci = sdscatprintf(ci,"- ");
1167 :
1168 : /* Latency from the POV of this node, link status */
1169 0 : ci = sdscatprintf(ci,"%ld %ld %s",
1170 : (long) node->ping_sent,
1171 : (long) node->pong_received,
1172 0 : (node->link || node->flags & REDIS_NODE_MYSELF) ?
1173 : "connected" : "disconnected");
1174 :
1175 : /* Slots served by this instance */
1176 0 : start = -1;
1177 0 : for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1178 : int bit;
1179 :
1180 0 : if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
1181 0 : if (start == -1) start = j;
1182 : }
1183 0 : if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) {
1184 0 : if (j == REDIS_CLUSTER_SLOTS-1) j++;
1185 :
1186 0 : if (start == j-1) {
1187 0 : ci = sdscatprintf(ci," %d",start);
1188 : } else {
1189 0 : ci = sdscatprintf(ci," %d-%d",start,j-1);
1190 : }
1191 0 : start = -1;
1192 : }
1193 : }
1194 :
1195 : /* Just for MYSELF node we also dump info about slots that
1196 : * we are migrating to other instances or importing from other
1197 : * instances. */
1198 0 : if (node->flags & REDIS_NODE_MYSELF) {
1199 0 : for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1200 0 : if (server.cluster.migrating_slots_to[j]) {
1201 0 : ci = sdscatprintf(ci," [%d->-%.40s]",j,
1202 0 : server.cluster.migrating_slots_to[j]->name);
1203 0 : } else if (server.cluster.importing_slots_from[j]) {
1204 0 : ci = sdscatprintf(ci," [%d-<-%.40s]",j,
1205 0 : server.cluster.importing_slots_from[j]->name);
1206 : }
1207 : }
1208 : }
1209 0 : ci = sdscatlen(ci,"\n",1);
1210 : }
1211 0 : dictReleaseIterator(di);
1212 0 : return ci;
1213 : }
1214 :
1215 0 : int getSlotOrReply(redisClient *c, robj *o) {
1216 : long long slot;
1217 :
1218 0 : if (getLongLongFromObject(o,&slot) != REDIS_OK ||
1219 0 : slot < 0 || slot > REDIS_CLUSTER_SLOTS)
1220 : {
1221 0 : addReplyError(c,"Invalid or out of range slot");
1222 0 : return -1;
1223 : }
1224 0 : return (int) slot;
1225 : }
1226 :
1227 0 : void clusterCommand(redisClient *c) {
1228 0 : if (server.cluster_enabled == 0) {
1229 0 : addReplyError(c,"This instance has cluster support disabled");
1230 0 : return;
1231 : }
1232 :
1233 0 : if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
1234 : clusterNode *n;
1235 : struct sockaddr_in sa;
1236 : long port;
1237 :
1238 : /* Perform sanity checks on IP/port */
1239 0 : if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) {
1240 0 : addReplyError(c,"Invalid IP address in MEET");
1241 0 : return;
1242 : }
1243 0 : if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK ||
1244 0 : port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR))
1245 : {
1246 0 : addReplyError(c,"Invalid TCP port specified");
1247 0 : return;
1248 : }
1249 :
1250 : /* Finally add the node to the cluster with a random name, this
1251 : * will get fixed in the first handshake (ping/pong). */
1252 0 : n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
1253 0 : strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip));
1254 0 : n->port = port;
1255 0 : clusterAddNode(n);
1256 0 : addReply(c,shared.ok);
1257 0 : } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
1258 : robj *o;
1259 0 : sds ci = clusterGenNodesDescription();
1260 :
1261 0 : o = createObject(REDIS_STRING,ci);
1262 0 : addReplyBulk(c,o);
1263 0 : decrRefCount(o);
1264 0 : } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
1265 0 : !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
1266 : {
1267 : /* CLUSTER ADDSLOTS <slot> [slot] ... */
1268 : /* CLUSTER DELSLOTS <slot> [slot] ... */
1269 : int j, slot;
1270 0 : unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
1271 0 : int del = !strcasecmp(c->argv[1]->ptr,"delslots");
1272 :
1273 : memset(slots,0,REDIS_CLUSTER_SLOTS);
1274 : /* Check that all the arguments are parsable and that all the
1275 : * slots are not already busy. */
1276 0 : for (j = 2; j < c->argc; j++) {
1277 0 : if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
1278 0 : zfree(slots);
1279 0 : return;
1280 : }
1281 0 : if (del && server.cluster.slots[slot] == NULL) {
1282 0 : addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
1283 0 : zfree(slots);
1284 0 : return;
1285 0 : } else if (!del && server.cluster.slots[slot]) {
1286 0 : addReplyErrorFormat(c,"Slot %d is already busy", slot);
1287 0 : zfree(slots);
1288 0 : return;
1289 : }
1290 0 : if (slots[slot]++ == 1) {
1291 0 : addReplyErrorFormat(c,"Slot %d specified multiple times",
1292 : (int)slot);
1293 0 : zfree(slots);
1294 0 : return;
1295 : }
1296 : }
1297 0 : for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1298 0 : if (slots[j]) {
1299 : int retval;
1300 :
1301 : /* If this slot was set as importing we can clear this
1302 : * state as now we are the real owner of the slot. */
1303 0 : if (server.cluster.importing_slots_from[j])
1304 0 : server.cluster.importing_slots_from[j] = NULL;
1305 :
1306 0 : retval = del ? clusterDelSlot(j) :
1307 0 : clusterAddSlot(server.cluster.myself,j);
1308 0 : redisAssertWithInfo(c,NULL,retval == REDIS_OK);
1309 : }
1310 : }
1311 0 : zfree(slots);
1312 0 : clusterUpdateState();
1313 0 : clusterSaveConfigOrDie();
1314 0 : addReply(c,shared.ok);
1315 0 : } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
1316 : /* SETSLOT 10 MIGRATING <node ID> */
1317 : /* SETSLOT 10 IMPORTING <node ID> */
1318 : /* SETSLOT 10 STABLE */
1319 : /* SETSLOT 10 NODE <node ID> */
1320 : int slot;
1321 : clusterNode *n;
1322 :
1323 0 : if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
1324 :
1325 0 : if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
1326 0 : if (server.cluster.slots[slot] != server.cluster.myself) {
1327 0 : addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
1328 0 : return;
1329 : }
1330 0 : if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1331 0 : addReplyErrorFormat(c,"I don't know about node %s",
1332 0 : (char*)c->argv[4]->ptr);
1333 0 : return;
1334 : }
1335 0 : server.cluster.migrating_slots_to[slot] = n;
1336 0 : } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
1337 0 : if (server.cluster.slots[slot] == server.cluster.myself) {
1338 0 : addReplyErrorFormat(c,
1339 : "I'm already the owner of hash slot %u",slot);
1340 0 : return;
1341 : }
1342 0 : if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
1343 0 : addReplyErrorFormat(c,"I don't know about node %s",
1344 0 : (char*)c->argv[3]->ptr);
1345 0 : return;
1346 : }
1347 0 : server.cluster.importing_slots_from[slot] = n;
1348 0 : } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
1349 : /* CLUSTER SETSLOT <SLOT> STABLE */
1350 0 : server.cluster.importing_slots_from[slot] = NULL;
1351 0 : server.cluster.migrating_slots_to[slot] = NULL;
1352 0 : } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
1353 : /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
1354 0 : clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
1355 :
1356 0 : if (!n) addReplyErrorFormat(c,"Unknown node %s",
1357 0 : (char*)c->argv[4]->ptr);
1358 : /* If this hash slot was served by 'myself' before to switch
1359 : * make sure there are no longer local keys for this hash slot. */
1360 0 : if (server.cluster.slots[slot] == server.cluster.myself &&
1361 0 : n != server.cluster.myself)
1362 : {
1363 : int numkeys;
1364 : robj **keys;
1365 :
1366 0 : keys = zmalloc(sizeof(robj*)*1);
1367 0 : numkeys = GetKeysInSlot(slot, keys, 1);
1368 0 : zfree(keys);
1369 0 : if (numkeys != 0) {
1370 0 : addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot);
1371 0 : return;
1372 : }
1373 : }
1374 : /* If this node was the slot owner and the slot was marked as
1375 : * migrating, assigning the slot to another node will clear
1376 : * the migratig status. */
1377 0 : if (server.cluster.slots[slot] == server.cluster.myself &&
1378 0 : server.cluster.migrating_slots_to[slot])
1379 0 : server.cluster.migrating_slots_to[slot] = NULL;
1380 :
1381 : /* If this node was importing this slot, assigning the slot to
1382 : * itself also clears the importing status. */
1383 0 : if (n == server.cluster.myself && server.cluster.importing_slots_from[slot])
1384 0 : server.cluster.importing_slots_from[slot] = NULL;
1385 :
1386 0 : clusterDelSlot(slot);
1387 0 : clusterAddSlot(n,slot);
1388 : } else {
1389 0 : addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments");
1390 0 : return;
1391 : }
1392 0 : clusterSaveConfigOrDie();
1393 0 : addReply(c,shared.ok);
1394 0 : } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
1395 0 : char *statestr[] = {"ok","fail","needhelp"};
1396 0 : int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
1397 : int j;
1398 :
1399 0 : for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
1400 0 : clusterNode *n = server.cluster.slots[j];
1401 :
1402 0 : if (n == NULL) continue;
1403 0 : slots_assigned++;
1404 0 : if (n->flags & REDIS_NODE_FAIL) {
1405 0 : slots_fail++;
1406 0 : } else if (n->flags & REDIS_NODE_PFAIL) {
1407 0 : slots_pfail++;
1408 : } else {
1409 0 : slots_ok++;
1410 : }
1411 : }
1412 :
1413 0 : sds info = sdscatprintf(sdsempty(),
1414 : "cluster_state:%s\r\n"
1415 : "cluster_slots_assigned:%d\r\n"
1416 : "cluster_slots_ok:%d\r\n"
1417 : "cluster_slots_pfail:%d\r\n"
1418 : "cluster_slots_fail:%d\r\n"
1419 : "cluster_known_nodes:%lu\r\n"
1420 0 : , statestr[server.cluster.state],
1421 : slots_assigned,
1422 : slots_ok,
1423 : slots_pfail,
1424 : slots_fail,
1425 0 : dictSize(server.cluster.nodes)
1426 0 : );
1427 0 : addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
1428 : (unsigned long)sdslen(info)));
1429 0 : addReplySds(c,info);
1430 0 : addReply(c,shared.crlf);
1431 0 : } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
1432 0 : sds key = c->argv[2]->ptr;
1433 :
1434 0 : addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
1435 0 : } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
1436 : long long maxkeys, slot;
1437 : unsigned int numkeys, j;
1438 : robj **keys;
1439 :
1440 0 : if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != REDIS_OK)
1441 : return;
1442 0 : if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL) != REDIS_OK)
1443 : return;
1444 0 : if (slot < 0 || slot >= REDIS_CLUSTER_SLOTS || maxkeys < 0 ||
1445 0 : maxkeys > 1024*1024) {
1446 0 : addReplyError(c,"Invalid slot or number of keys");
1447 0 : return;
1448 : }
1449 :
1450 0 : keys = zmalloc(sizeof(robj*)*maxkeys);
1451 0 : numkeys = GetKeysInSlot(slot, keys, maxkeys);
1452 0 : addReplyMultiBulkLen(c,numkeys);
1453 0 : for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
1454 0 : zfree(keys);
1455 : } else {
1456 0 : addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
1457 : }
1458 : }
1459 :
1460 : /* -----------------------------------------------------------------------------
1461 : * DUMP, RESTORE and MIGRATE commands
1462 : * -------------------------------------------------------------------------- */
1463 :
1464 : /* Generates a DUMP-format representation of the object 'o', adding it to the
1465 : * io stream pointed by 'rio'. This function can't fail. */
1466 4 : void createDumpPayload(rio *payload, robj *o) {
1467 : unsigned char buf[2];
1468 : uint64_t crc;
1469 :
1470 : /* Serialize the object in a RDB-like format. It consist of an object type
1471 : * byte followed by the serialized object. This is understood by RESTORE. */
1472 4 : rioInitWithBuffer(payload,sdsempty());
1473 4 : redisAssert(rdbSaveObjectType(payload,o));
1474 4 : redisAssert(rdbSaveObject(payload,o));
1475 :
1476 : /* Write the footer, this is how it looks like:
1477 : * ----------------+---------------------+---------------+
1478 : * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
1479 : * ----------------+---------------------+---------------+
1480 : * RDB version and CRC are both in little endian.
1481 : */
1482 :
1483 : /* RDB version */
1484 4 : buf[0] = REDIS_RDB_VERSION & 0xff;
1485 4 : buf[1] = (REDIS_RDB_VERSION >> 8) & 0xff;
1486 4 : payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
1487 :
1488 : /* CRC64 */
1489 8 : crc = crc64((unsigned char*)payload->io.buffer.ptr,
1490 : sdslen(payload->io.buffer.ptr));
1491 : memrev64ifbe(&crc);
1492 4 : payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
1493 4 : }
1494 :
1495 : /* Verify that the RDB version of the dump payload matches the one of this Redis
1496 : * instance and that the checksum is ok.
1497 : * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR
1498 : * is returned. */
1499 3 : int verifyDumpPayload(unsigned char *p, size_t len) {
1500 : unsigned char *footer;
1501 : uint16_t rdbver;
1502 : uint64_t crc;
1503 :
1504 : /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
1505 3 : if (len < 10) return REDIS_ERR;
1506 3 : footer = p+(len-10);
1507 :
1508 : /* Verify RDB version */
1509 3 : rdbver = (footer[1] << 8) | footer[0];
1510 3 : if (rdbver != REDIS_RDB_VERSION) return REDIS_ERR;
1511 :
1512 : /* Verify CRC64 */
1513 3 : crc = crc64(p,len-8);
1514 : memrev64ifbe(&crc);
1515 3 : return (memcmp(&crc,footer+2,8) == 0) ? REDIS_OK : REDIS_ERR;
1516 : }
1517 :
1518 : /* DUMP keyname
1519 : * DUMP is actually not used by Redis Cluster but it is the obvious
1520 : * complement of RESTORE and can be useful for different applications. */
1521 3 : void dumpCommand(redisClient *c) {
1522 : robj *o, *dumpobj;
1523 : rio payload;
1524 :
1525 : /* Check if the key is here. */
1526 3 : if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
1527 1 : addReply(c,shared.nullbulk);
1528 1 : return;
1529 : }
1530 :
1531 : /* Create the DUMP encoded representation. */
1532 2 : createDumpPayload(&payload,o);
1533 :
1534 : /* Transfer to the client */
1535 2 : dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
1536 2 : addReplyBulk(c,dumpobj);
1537 2 : decrRefCount(dumpobj);
1538 2 : return;
1539 : }
1540 :
1541 : /* RESTORE key ttl serialized-value */
1542 4 : void restoreCommand(redisClient *c) {
1543 : long ttl;
1544 : rio payload;
1545 : int type;
1546 : robj *obj;
1547 :
1548 : /* Make sure this key does not already exist here... */
1549 4 : if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
1550 1 : addReplyError(c,"Target key name is busy.");
1551 1 : return;
1552 : }
1553 :
1554 : /* Check if the TTL value makes sense */
1555 3 : if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) {
1556 : return;
1557 3 : } else if (ttl < 0) {
1558 0 : addReplyError(c,"Invalid TTL value, must be >= 0");
1559 0 : return;
1560 : }
1561 :
1562 : /* Verify RDB version and data checksum. */
1563 6 : if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == REDIS_ERR) {
1564 0 : addReplyError(c,"DUMP payload version or checksum are wrong");
1565 0 : return;
1566 : }
1567 :
1568 3 : rioInitWithBuffer(&payload,c->argv[3]->ptr);
1569 6 : if (((type = rdbLoadObjectType(&payload)) == -1) ||
1570 3 : ((obj = rdbLoadObject(type,&payload)) == NULL))
1571 : {
1572 0 : addReplyError(c,"Bad data format");
1573 0 : return;
1574 : }
1575 :
1576 : /* Create the key and set the TTL if any */
1577 3 : dbAdd(c->db,c->argv[1],obj);
1578 3 : if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
1579 3 : signalModifiedKey(c->db,c->argv[1]);
1580 3 : addReply(c,shared.ok);
1581 3 : server.dirty++;
1582 : }
1583 :
1584 : /* MIGRATE host port key dbid timeout */
1585 2 : void migrateCommand(redisClient *c) {
1586 : int fd;
1587 : long timeout;
1588 : long dbid;
1589 : long long ttl;
1590 : robj *o;
1591 : rio cmd, payload;
1592 :
1593 : /* Sanity check */
1594 2 : if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
1595 : return;
1596 2 : if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
1597 : return;
1598 2 : if (timeout <= 0) timeout = 1;
1599 :
1600 : /* Check if the key is here. If not we reply with success as there is
1601 : * nothing to migrate (for instance the key expired in the meantime), but
1602 : * we include such information in the reply string. */
1603 2 : if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
1604 0 : addReplySds(c,sdsnew("+NOKEY\r\n"));
1605 0 : return;
1606 : }
1607 :
1608 : /* Connect */
1609 4 : fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
1610 2 : atoi(c->argv[2]->ptr));
1611 2 : if (fd == -1) {
1612 0 : addReplyErrorFormat(c,"Can't connect to target node: %s",
1613 : server.neterr);
1614 0 : return;
1615 : }
1616 2 : if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
1617 0 : addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
1618 0 : return;
1619 : }
1620 :
1621 : /* Create RESTORE payload and generate the protocol to call the command. */
1622 2 : rioInitWithBuffer(&cmd,sdsempty());
1623 2 : redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
1624 2 : redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
1625 2 : redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
1626 :
1627 2 : ttl = getExpire(c->db,c->argv[3])-mstime();
1628 2 : if (ttl < 1) ttl = 1;
1629 2 : redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
1630 2 : redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
1631 2 : redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
1632 4 : redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
1633 2 : redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
1634 :
1635 : /* Finally the last argument that is the serailized object payload
1636 : * in the DUMP format. */
1637 2 : createDumpPayload(&payload,o);
1638 4 : redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
1639 : sdslen(payload.io.buffer.ptr)));
1640 2 : sdsfree(payload.io.buffer.ptr);
1641 :
1642 : /* Tranfer the query to the other node in 64K chunks. */
1643 : {
1644 2 : sds buf = cmd.io.buffer.ptr;
1645 2 : size_t pos = 0, towrite;
1646 2 : int nwritten = 0;
1647 :
1648 6 : while ((towrite = sdslen(buf)-pos) > 0) {
1649 2 : towrite = (towrite > (64*1024) ? (64*1024) : towrite);
1650 2 : nwritten = syncWrite(fd,buf+pos,towrite,timeout);
1651 2 : if (nwritten != (signed)towrite) goto socket_wr_err;
1652 2 : pos += nwritten;
1653 : }
1654 : }
1655 :
1656 : /* Read back the reply. */
1657 : {
1658 : char buf1[1024];
1659 : char buf2[1024];
1660 :
1661 : /* Read the two replies */
1662 2 : if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
1663 : goto socket_rd_err;
1664 1 : if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
1665 : goto socket_rd_err;
1666 1 : if (buf1[0] == '-' || buf2[0] == '-') {
1667 0 : addReplyErrorFormat(c,"Target instance replied with error: %s",
1668 0 : (buf1[0] == '-') ? buf1+1 : buf2+1);
1669 : } else {
1670 : robj *aux;
1671 :
1672 1 : dbDelete(c->db,c->argv[3]);
1673 1 : signalModifiedKey(c->db,c->argv[3]);
1674 1 : addReply(c,shared.ok);
1675 1 : server.dirty++;
1676 :
1677 : /* Translate MIGRATE as DEL for replication/AOF. */
1678 1 : aux = createStringObject("DEL",3);
1679 1 : rewriteClientCommandVector(c,2,aux,c->argv[3]);
1680 1 : decrRefCount(aux);
1681 : }
1682 : }
1683 :
1684 1 : sdsfree(cmd.io.buffer.ptr);
1685 1 : close(fd);
1686 1 : return;
1687 :
1688 : socket_wr_err:
1689 0 : addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
1690 0 : sdsfree(cmd.io.buffer.ptr);
1691 0 : close(fd);
1692 0 : return;
1693 :
1694 : socket_rd_err:
1695 1 : addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n"));
1696 1 : sdsfree(cmd.io.buffer.ptr);
1697 1 : close(fd);
1698 1 : return;
1699 : }
1700 :
1701 : /* The ASKING command is required after a -ASK redirection.
1702 : * The client should issue ASKING before to actualy send the command to
1703 : * the target instance. See the Redis Cluster specification for more
1704 : * information. */
1705 0 : void askingCommand(redisClient *c) {
1706 0 : if (server.cluster_enabled == 0) {
1707 0 : addReplyError(c,"This instance has cluster support disabled");
1708 0 : return;
1709 : }
1710 0 : c->flags |= REDIS_ASKING;
1711 0 : addReply(c,shared.ok);
1712 : }
1713 :
1714 : /* -----------------------------------------------------------------------------
1715 : * Cluster functions related to serving / redirecting clients
1716 : * -------------------------------------------------------------------------- */
1717 :
1718 : /* Return the pointer to the cluster node that is able to serve the query
1719 : * as all the keys belong to hash slots for which the node is in charge.
1720 : *
1721 : * If the returned node should be used only for this request, the *ask
1722 : * integer is set to '1', otherwise to '0'. This is used in order to
1723 : * let the caller know if we should reply with -MOVED or with -ASK.
1724 : *
1725 : * If the request contains more than a single key NULL is returned,
1726 : * however a request with more then a key argument where the key is always
1727 : * the same is valid, like in: RPOPLPUSH mylist mylist.*/
1728 0 : clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) {
1729 0 : clusterNode *n = NULL;
1730 0 : robj *firstkey = NULL;
1731 : multiState *ms, _ms;
1732 : multiCmd mc;
1733 0 : int i, slot = 0;
1734 :
1735 : /* We handle all the cases as if they were EXEC commands, so we have
1736 : * a common code path for everything */
1737 0 : if (cmd->proc == execCommand) {
1738 : /* If REDIS_MULTI flag is not set EXEC is just going to return an
1739 : * error. */
1740 0 : if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
1741 0 : ms = &c->mstate;
1742 : } else {
1743 : /* In order to have a single codepath create a fake Multi State
1744 : * structure if the client is not in MULTI/EXEC state, this way
1745 : * we have a single codepath below. */
1746 0 : ms = &_ms;
1747 0 : _ms.commands = &mc;
1748 0 : _ms.count = 1;
1749 0 : mc.argv = argv;
1750 0 : mc.argc = argc;
1751 0 : mc.cmd = cmd;
1752 : }
1753 :
1754 : /* Check that all the keys are the same key, and get the slot and
1755 : * node for this key. */
1756 0 : for (i = 0; i < ms->count; i++) {
1757 : struct redisCommand *mcmd;
1758 : robj **margv;
1759 : int margc, *keyindex, numkeys, j;
1760 :
1761 0 : mcmd = ms->commands[i].cmd;
1762 0 : margc = ms->commands[i].argc;
1763 0 : margv = ms->commands[i].argv;
1764 :
1765 0 : keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
1766 : REDIS_GETKEYS_ALL);
1767 0 : for (j = 0; j < numkeys; j++) {
1768 0 : if (firstkey == NULL) {
1769 : /* This is the first key we see. Check what is the slot
1770 : * and node. */
1771 0 : firstkey = margv[keyindex[j]];
1772 :
1773 0 : slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr));
1774 0 : n = server.cluster.slots[slot];
1775 0 : redisAssertWithInfo(c,firstkey,n != NULL);
1776 : } else {
1777 : /* If it is not the first key, make sure it is exactly
1778 : * the same key as the first we saw. */
1779 0 : if (!equalStringObjects(firstkey,margv[keyindex[j]])) {
1780 0 : decrRefCount(firstkey);
1781 0 : getKeysFreeResult(keyindex);
1782 0 : return NULL;
1783 : }
1784 : }
1785 : }
1786 0 : getKeysFreeResult(keyindex);
1787 : }
1788 0 : if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */
1789 : /* No key at all in command? then we can serve the request
1790 : * without redirections. */
1791 0 : if (n == NULL) return server.cluster.myself;
1792 0 : if (hashslot) *hashslot = slot;
1793 : /* This request is about a slot we are migrating into another instance?
1794 : * Then we need to check if we have the key. If we have it we can reply.
1795 : * If instead is a new key, we pass the request to the node that is
1796 : * receiving the slot. */
1797 0 : if (n == server.cluster.myself &&
1798 0 : server.cluster.migrating_slots_to[slot] != NULL)
1799 : {
1800 0 : if (lookupKeyRead(&server.db[0],firstkey) == NULL) {
1801 0 : if (ask) *ask = 1;
1802 0 : return server.cluster.migrating_slots_to[slot];
1803 : }
1804 : }
1805 : /* Handle the case in which we are receiving this hash slot from
1806 : * another instance, so we'll accept the query even if in the table
1807 : * it is assigned to a different node, but only if the client
1808 : * issued an ASKING command before. */
1809 0 : if (server.cluster.importing_slots_from[slot] != NULL &&
1810 0 : c->flags & REDIS_ASKING) {
1811 0 : return server.cluster.myself;
1812 : }
1813 : /* It's not a -ASK case. Base case: just return the right node. */
1814 0 : return n;
1815 : }
|