LCOV - code coverage report
Current view: directory - redis/src - replication.c (source / functions) Found Hit Coverage
Test: redis.info Lines: 321 161 50.2 %
Date: 2012-04-04 Functions: 12 9 75.0 %
Colors: not hit hit

       1                 : #include "redis.h"
       2                 : 
       3                 : #include <sys/time.h>
       4                 : #include <unistd.h>
       5                 : #include <fcntl.h>
       6                 : #include <sys/stat.h>
       7                 : 
       8                 : /* ---------------------------------- MASTER -------------------------------- */
       9                 : 
      10          141387 : void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
      11                 :     listNode *ln;
      12                 :     listIter li;
      13                 :     int j;
      14                 : 
      15          141387 :     listRewind(slaves,&li);
      16          554123 :     while((ln = listNext(&li))) {
      17          271349 :         redisClient *slave = ln->value;
      18                 : 
      19                 :         /* Don't feed slaves that are still waiting for BGSAVE to start */
      20          271349 :         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
      21                 : 
      22                 :         /* Feed slaves that are waiting for the initial SYNC (so these commands
      23                 :          * are queued in the output buffer until the initial SYNC completes),
      24                 :          * or are already in sync with the master. */
      25          271349 :         if (slave->slaveseldb != dictid) {
      26                 :             robj *selectcmd;
      27                 : 
      28           28681 :             if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
      29            9548 :                 selectcmd = shared.select[dictid];
      30            9548 :                 incrRefCount(selectcmd);
      31                 :             } else {
      32           19133 :                 selectcmd = createObject(REDIS_STRING,
      33           19133 :                     sdscatprintf(sdsempty(),"select %d\r\n",dictid));
      34                 :             }
      35           28681 :             addReply(slave,selectcmd);
      36           28681 :             decrRefCount(selectcmd);
      37           28681 :             slave->slaveseldb = dictid;
      38                 :         }
      39          271349 :         addReplyMultiBulkLen(slave,argc);
      40          271349 :         for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
      41                 :     }
      42          141387 : }
      43                 : 
      44               0 : void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
      45                 :     listNode *ln;
      46                 :     listIter li;
      47                 :     int j, port;
      48               0 :     sds cmdrepr = sdsnew("+");
      49                 :     robj *cmdobj;
      50                 :     char ip[32];
      51                 :     struct timeval tv;
      52                 : 
      53               0 :     gettimeofday(&tv,NULL);
      54               0 :     cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
      55               0 :     if (c->flags & REDIS_LUA_CLIENT) {
      56               0 :         cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ", dictid);
      57                 :     } else {
      58               0 :         anetPeerToString(c->fd,ip,&port);
      59               0 :         cmdrepr = sdscatprintf(cmdrepr,"[%d %s:%d] ", dictid,ip,port);
      60                 :     }
      61                 : 
      62               0 :     for (j = 0; j < argc; j++) {
      63               0 :         if (argv[j]->encoding == REDIS_ENCODING_INT) {
      64               0 :             cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
      65                 :         } else {
      66               0 :             cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
      67               0 :                         sdslen(argv[j]->ptr));
      68                 :         }
      69               0 :         if (j != argc-1)
      70               0 :             cmdrepr = sdscatlen(cmdrepr," ",1);
      71                 :     }
      72               0 :     cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
      73               0 :     cmdobj = createObject(REDIS_STRING,cmdrepr);
      74                 : 
      75               0 :     listRewind(monitors,&li);
      76               0 :     while((ln = listNext(&li))) {
      77               0 :         redisClient *monitor = ln->value;
      78               0 :         addReply(monitor,cmdobj);
      79                 :     }
      80               0 :     decrRefCount(cmdobj);
      81               0 : }
      82                 : 
      83               9 : void syncCommand(redisClient *c) {
      84                 :     /* ignore SYNC if already slave or in monitor mode */
      85               9 :     if (c->flags & REDIS_SLAVE) return;
      86                 : 
      87                 :     /* Refuse SYNC requests if we are a slave but the link with our master
      88                 :      * is not ok... */
      89               9 :     if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
      90               0 :         addReplyError(c,"Can't SYNC while not connected with my master");
      91               0 :         return;
      92                 :     }
      93                 : 
      94                 :     /* SYNC can't be issued when the server has pending data to send to
      95                 :      * the client about already issued commands. We need a fresh reply
      96                 :      * buffer registering the differences between the BGSAVE and the current
      97                 :      * dataset, so that we can copy to other slaves if needed. */
      98               9 :     if (listLength(c->reply) != 0) {
      99               0 :         addReplyError(c,"SYNC is invalid with pending input");
     100               0 :         return;
     101                 :     }
     102                 : 
     103               9 :     redisLog(REDIS_NOTICE,"Slave ask for synchronization");
     104                 :     /* Here we need to check if there is a background saving operation
     105                 :      * in progress, or if it is required to start one */
     106               9 :     if (server.rdb_child_pid != -1) {
     107                 :         /* Ok a background save is in progress. Let's check if it is a good
     108                 :          * one for replication, i.e. if there is another slave that is
     109                 :          * registering differences since the server forked to save */
     110                 :         redisClient *slave;
     111                 :         listNode *ln;
     112                 :         listIter li;
     113                 : 
     114               2 :         listRewind(server.slaves,&li);
     115               4 :         while((ln = listNext(&li))) {
     116               2 :             slave = ln->value;
     117               2 :             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
     118                 :         }
     119               2 :         if (ln) {
     120                 :             /* Perfect, the server is already registering differences for
     121                 :              * another slave. Set the right state, and copy the buffer. */
     122               2 :             copyClientOutputBuffer(c,slave);
     123               2 :             c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
     124               2 :             redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
     125                 :         } else {
     126                 :             /* No way, we need to wait for the next BGSAVE in order to
     127                 :              * register differences */
     128               0 :             c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
     129               0 :             redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
     130                 :         }
     131                 :     } else {
     132                 :         /* Ok we don't have a BGSAVE in progress, let's start one */
     133               7 :         redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
     134               7 :         if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
     135               0 :             redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
     136               0 :             addReplyError(c,"Unable to perform background save");
     137               0 :             return;
     138                 :         }
     139               7 :         c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
     140                 :     }
     141               9 :     c->repldbfd = -1;
     142               9 :     c->flags |= REDIS_SLAVE;
     143               9 :     c->slaveseldb = 0;
     144               9 :     listAddNodeTail(server.slaves,c);
     145               9 :     return;
     146                 : }
     147                 : 
     148            2451 : void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
     149            2451 :     redisClient *slave = privdata;
     150                 :     REDIS_NOTUSED(el);
     151                 :     REDIS_NOTUSED(mask);
     152                 :     char buf[REDIS_IOBUF_LEN];
     153                 :     ssize_t nwritten, buflen;
     154                 : 
     155            2451 :     if (slave->repldboff == 0) {
     156                 :         /* Write the bulk write count before transferring the DB. In theory here
     157                 :          * we don't know how much room there is in the output buffer of the
     158                 :          * socket, but in practice SO_SNDLOWAT (the minimum count for output
     159                 :          * operations) will never be smaller than the few bytes we need. */
     160                 :         sds bulkcount;
     161                 : 
     162               9 :         bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
     163                 :             slave->repldbsize);
     164              18 :         if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
     165                 :         {
     166               0 :             sdsfree(bulkcount);
     167               0 :             freeClient(slave);
     168               0 :             return;
     169                 :         }
     170               9 :         sdsfree(bulkcount);
     171                 :     }
     172            2451 :     lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
     173            4902 :     buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
     174            2451 :     if (buflen <= 0) {
     175               0 :         redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
     176               0 :             (buflen == 0) ? "premature EOF" : strerror(errno));
     177               0 :         freeClient(slave);
     178               0 :         return;
     179                 :     }
     180            2451 :     if ((nwritten = write(fd,buf,buflen)) == -1) {
     181               0 :         redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s",
     182                 :             strerror(errno));
     183               0 :         freeClient(slave);
     184               0 :         return;
     185                 :     }
     186            2451 :     slave->repldboff += nwritten;
     187            2451 :     if (slave->repldboff == slave->repldbsize) {
     188               9 :         close(slave->repldbfd);
     189               9 :         slave->repldbfd = -1;
     190               9 :         aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
     191               9 :         slave->replstate = REDIS_REPL_ONLINE;
     192               9 :         if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
     193                 :             sendReplyToClient, slave) == AE_ERR) {
     194               0 :             freeClient(slave);
     195               0 :             return;
     196                 :         }
     197               9 :         addReplySds(slave,sdsempty());
     198               9 :         redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
     199                 :     }
     200                 : }
     201                 : 
     202                 : /* This function is called at the end of every background saving.
     203                 :  * The argument bgsaveerr is REDIS_OK if the background saving succeeded
     204                 :  * otherwise REDIS_ERR is passed to the function.
     205                 :  *
     206                 :  * The goal of this function is to handle slaves waiting for a successful
     207                 :  * background saving in order to perform non-blocking synchronization. */
     208               8 : void updateSlavesWaitingBgsave(int bgsaveerr) {
     209                 :     listNode *ln;
     210               8 :     int startbgsave = 0;
     211                 :     listIter li;
     212                 : 
     213               8 :     listRewind(server.slaves,&li);
     214              25 :     while((ln = listNext(&li))) {
     215               9 :         redisClient *slave = ln->value;
     216                 : 
     217               9 :         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
     218               0 :             startbgsave = 1;
     219               0 :             slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
     220               9 :         } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
     221                 :             struct redis_stat buf;
     222                 : 
     223               9 :             if (bgsaveerr != REDIS_OK) {
     224               0 :                 freeClient(slave);
     225               0 :                 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
     226               0 :                 continue;
     227                 :             }
     228              27 :             if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
     229               9 :                 redis_fstat(slave->repldbfd,&buf) == -1) {
     230               0 :                 freeClient(slave);
     231               0 :                 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
     232               0 :                 continue;
     233                 :             }
     234               9 :             slave->repldboff = 0;
     235               9 :             slave->repldbsize = buf.st_size;
     236               9 :             slave->replstate = REDIS_REPL_SEND_BULK;
     237               9 :             aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
     238               9 :             if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
     239               0 :                 freeClient(slave);
     240               0 :                 continue;
     241                 :             }
     242                 :         }
     243                 :     }
     244               8 :     if (startbgsave) {
     245               0 :         if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
     246                 :             listIter li;
     247                 : 
     248               0 :             listRewind(server.slaves,&li);
     249               0 :             redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
     250               0 :             while((ln = listNext(&li))) {
     251               0 :                 redisClient *slave = ln->value;
     252                 : 
     253               0 :                 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
     254               0 :                     freeClient(slave);
     255                 :             }
     256                 :         }
     257                 :     }
     258               8 : }
     259                 : 
     260                 : /* ----------------------------------- SLAVE -------------------------------- */
     261                 : 
     262                 : /* Abort the async download of the bulk dataset while SYNC-ing with master */
     263               0 : void replicationAbortSyncTransfer(void) {
     264               0 :     redisAssert(server.repl_state == REDIS_REPL_TRANSFER);
     265                 : 
     266               0 :     aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
     267               0 :     close(server.repl_transfer_s);
     268               0 :     close(server.repl_transfer_fd);
     269               0 :     unlink(server.repl_transfer_tmpfile);
     270               0 :     zfree(server.repl_transfer_tmpfile);
     271               0 :     server.repl_state = REDIS_REPL_CONNECT;
     272               0 : }
     273                 : 
     274                 : /* Asynchronously read the SYNC payload we receive from a master */
     275            9796 : void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
     276                 :     char buf[4096];
     277                 :     ssize_t nread, readlen;
     278                 :     REDIS_NOTUSED(el);
     279                 :     REDIS_NOTUSED(privdata);
     280                 :     REDIS_NOTUSED(mask);
     281                 : 
     282                 :     /* If repl_transfer_left == -1 we still have to read the bulk length
     283                 :      * from the master reply. */
     284            9796 :     if (server.repl_transfer_left == -1) {
     285               9 :         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
     286               0 :             redisLog(REDIS_WARNING,
     287                 :                 "I/O error reading bulk count from MASTER: %s",
     288                 :                 strerror(errno));
     289               0 :             goto error;
     290                 :         }
     291                 : 
     292               9 :         if (buf[0] == '-') {
     293               0 :             redisLog(REDIS_WARNING,
     294                 :                 "MASTER aborted replication with an error: %s",
     295                 :                 buf+1);
     296               0 :             goto error;
     297               9 :         } else if (buf[0] == '\0') {
     298                 :             /* At this stage just a newline works as a PING in order to keep
     299                 :              * the connection alive. So we refresh our last interaction
     300                 :              * timestamp. */
     301               0 :             server.repl_transfer_lastio = server.unixtime;
     302               0 :             return;
     303               9 :         } else if (buf[0] != '$') {
     304               0 :             redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
     305               0 :             goto error;
     306                 :         }
     307               9 :         server.repl_transfer_left = strtol(buf+1,NULL,10);
     308               9 :         redisLog(REDIS_NOTICE,
     309                 :             "MASTER <-> SLAVE sync: receiving %ld bytes from master",
     310                 :             server.repl_transfer_left);
     311               9 :         return;
     312                 :     }
     313                 : 
     314                 :     /* Read bulk data */
     315            9787 :     readlen = (server.repl_transfer_left < (signed)sizeof(buf)) ?
     316                 :         server.repl_transfer_left : (signed)sizeof(buf);
     317           19574 :     nread = read(fd,buf,readlen);
     318            9787 :     if (nread <= 0) {
     319               0 :         redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
     320               0 :             (nread == -1) ? strerror(errno) : "connection lost");
     321               0 :         replicationAbortSyncTransfer();
     322               0 :         return;
     323                 :     }
     324            9787 :     server.repl_transfer_lastio = server.unixtime;
     325            9787 :     if (write(server.repl_transfer_fd,buf,nread) != nread) {
     326               0 :         redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
     327               0 :         goto error;
     328                 :     }
     329            9787 :     server.repl_transfer_left -= nread;
     330                 :     /* Check if the transfer is now complete */
     331            9787 :     if (server.repl_transfer_left == 0) {
     332               9 :         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
     333               0 :             redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
     334               0 :             replicationAbortSyncTransfer();
     335               0 :             return;
     336                 :         }
     337               9 :         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
     338               9 :         emptyDb();
     339                 :         /* Before loading the DB into memory we need to delete the readable
     340                 :          * handler, otherwise it will get called recursively since
     341                 :          * rdbLoad() will call the event loop to process events from time to
     342                 :          * time for non blocking loading. */
     343               9 :         aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
     344               9 :         if (rdbLoad(server.rdb_filename) != REDIS_OK) {
     345               0 :             redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
     346               0 :             replicationAbortSyncTransfer();
     347               0 :             return;
     348                 :         }
     349                 :         /* Final setup of the connected slave <- master link */
     350               9 :         zfree(server.repl_transfer_tmpfile);
     351               9 :         close(server.repl_transfer_fd);
     352               9 :         server.master = createClient(server.repl_transfer_s);
     353               9 :         server.master->flags |= REDIS_MASTER;
     354               9 :         server.master->authenticated = 1;
     355               9 :         server.repl_state = REDIS_REPL_CONNECTED;
     356               9 :         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
     357                 :         /* Restart the AOF subsystem now that we finished the sync. This
     358                 :          * will trigger an AOF rewrite, and when done will start appending
     359                 :          * to the new file. */
     360               9 :         if (server.aof_state != REDIS_AOF_OFF) {
     361               0 :             int retry = 10;
     362                 : 
     363               0 :             stopAppendOnly();
     364               0 :             while (retry-- && startAppendOnly() == REDIS_ERR) {
     365               0 :                 redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchrnization! Trying it again in one second.");
     366               0 :                 sleep(1);
     367                 :             }
     368               0 :             if (!retry) {
     369               0 :                 redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
     370               0 :                 exit(1);
     371                 :             }
     372                 :         }
     373                 :     }
     374                 : 
     375                 :     return;
     376                 : 
     377                 : error:
     378               0 :     replicationAbortSyncTransfer();
     379               0 :     return;
     380                 : }
     381                 : 
     382               9 : void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
     383                 :     char buf[1024], tmpfile[256];
     384               9 :     int dfd, maxtries = 5;
     385                 :     REDIS_NOTUSED(el);
     386                 :     REDIS_NOTUSED(privdata);
     387                 :     REDIS_NOTUSED(mask);
     388                 : 
     389                 :     /* If this event fired after the user turned the instance into a master
     390                 :      * with SLAVEOF NO ONE we must just return ASAP. */
     391               9 :     if (server.repl_state == REDIS_REPL_NONE) {
     392               0 :         close(fd);
     393               0 :         return;
     394                 :     }
     395                 : 
     396               9 :     redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
     397                 :     /* This event should only be triggered once since it is used to have a
     398                 :      * non-blocking connect(2) to the master. It has been triggered when this
     399                 :      * function is called, so we can delete it. */
     400               9 :     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
     401                 : 
     402                 :     /* AUTH with the master if required. */
     403               9 :     if(server.masterauth) {
     404                 :         char authcmd[1024];
     405                 :         size_t authlen;
     406                 : 
     407               0 :         authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",server.masterauth);
     408               0 :         if (syncWrite(fd,authcmd,authlen,server.repl_syncio_timeout*1000) == -1) {
     409               0 :             redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
     410                 :                 strerror(errno));
     411               0 :             goto error;
     412                 :         }
     413                 :         /* Read the AUTH result.  */
     414               0 :         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
     415               0 :             redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
     416                 :                 strerror(errno));
     417               0 :             goto error;
     418                 :         }
     419               0 :         if (buf[0] != '+') {
     420               0 :             redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
     421               0 :             goto error;
     422                 :         }
     423                 :     }
     424                 : 
     425                 :     /* Issue the SYNC command */
     426               9 :     if (syncWrite(fd,"SYNC \r\n",7,server.repl_syncio_timeout*1000) == -1) {
     427               0 :         redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
     428                 :             strerror(errno));
     429               0 :         goto error;
     430                 :     }
     431                 : 
     432                 :     /* Prepare a suitable temp file for bulk transfer */
     433               9 :     while(maxtries--) {
     434               9 :         snprintf(tmpfile,256,
     435                 :             "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
     436               9 :         dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
     437               9 :         if (dfd != -1) break;
     438               0 :         sleep(1);
     439                 :     }
     440               9 :     if (dfd == -1) {
     441               0 :         redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
     442               0 :         goto error;
     443                 :     }
     444                 : 
     445                 :     /* Setup the non blocking download of the bulk file. */
     446               9 :     if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
     447                 :             == AE_ERR)
     448                 :     {
     449               0 :         redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
     450               0 :         goto error;
     451                 :     }
     452                 : 
     453               9 :     server.repl_state = REDIS_REPL_TRANSFER;
     454               9 :     server.repl_transfer_left = -1;
     455               9 :     server.repl_transfer_fd = dfd;
     456               9 :     server.repl_transfer_lastio = server.unixtime;
     457               9 :     server.repl_transfer_tmpfile = zstrdup(tmpfile);
     458               9 :     return;
     459                 : 
     460                 : error:
     461               0 :     server.repl_state = REDIS_REPL_CONNECT;
     462               0 :     close(fd);
     463               0 :     return;
     464                 : }
     465                 : 
     466               9 : int connectWithMaster(void) {
     467                 :     int fd;
     468                 : 
     469               9 :     fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
     470               9 :     if (fd == -1) {
     471               0 :         redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
     472                 :             strerror(errno));
     473               0 :         return REDIS_ERR;
     474                 :     }
     475                 : 
     476               9 :     if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
     477                 :             AE_ERR)
     478                 :     {
     479               0 :         close(fd);
     480               0 :         redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
     481               0 :         return REDIS_ERR;
     482                 :     }
     483                 : 
     484               9 :     server.repl_transfer_lastio = server.unixtime;
     485               9 :     server.repl_transfer_s = fd;
     486               9 :     server.repl_state = REDIS_REPL_CONNECTING;
     487               9 :     return REDIS_OK;
     488                 : }
     489                 : 
     490                 : /* This function can be called when a non blocking connection is currently
     491                 :  * in progress to undo it. */
     492               0 : void undoConnectWithMaster(void) {
     493               0 :     int fd = server.repl_transfer_s;
     494                 : 
     495               0 :     redisAssert(server.repl_state == REDIS_REPL_CONNECTING);
     496               0 :     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
     497               0 :     close(fd);
     498               0 :     server.repl_transfer_s = -1;
     499               0 :     server.repl_state = REDIS_REPL_CONNECT;
     500               0 : }
     501                 : 
     502               9 : void slaveofCommand(redisClient *c) {
     503               9 :     if (!strcasecmp(c->argv[1]->ptr,"no") &&
     504               0 :         !strcasecmp(c->argv[2]->ptr,"one")) {
     505               0 :         if (server.masterhost) {
     506               0 :             sdsfree(server.masterhost);
     507               0 :             server.masterhost = NULL;
     508               0 :             if (server.master) freeClient(server.master);
     509               0 :             if (server.repl_state == REDIS_REPL_TRANSFER)
     510               0 :                 replicationAbortSyncTransfer();
     511               0 :             else if (server.repl_state == REDIS_REPL_CONNECTING)
     512               0 :                 undoConnectWithMaster();
     513               0 :             server.repl_state = REDIS_REPL_NONE;
     514               0 :             redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
     515                 :         }
     516                 :     } else {
     517                 :         long port;
     518                 : 
     519               9 :         if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
     520                 :             return;
     521                 : 
     522                 :         /* Check if we are already attached to the specified slave */
     523               9 :         if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
     524               0 :             && server.masterport == port) {
     525               0 :             redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
     526               0 :             addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
     527               0 :             return;
     528                 :         }
     529                 :         /* There was no previous master or the user specified a different one,
     530                 :          * we can continue. */
     531               9 :         sdsfree(server.masterhost);
     532               9 :         server.masterhost = sdsdup(c->argv[1]->ptr);
     533               9 :         server.masterport = port;
     534               9 :         if (server.master) freeClient(server.master);
     535               9 :         disconnectSlaves(); /* Force our slaves to resync with us as well. */
     536               9 :         if (server.repl_state == REDIS_REPL_TRANSFER)
     537               0 :             replicationAbortSyncTransfer();
     538               9 :         server.repl_state = REDIS_REPL_CONNECT;
     539               9 :         redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
     540                 :             server.masterhost, server.masterport);
     541                 :     }
     542               9 :     addReply(c,shared.ok);
     543                 : }
     544                 : 
     545                 : /* --------------------------- REPLICATION CRON  ---------------------------- */
     546                 : 
     547             271 : void replicationCron(void) {
     548                 :     /* Non blocking connection timeout? */
     549             271 :     if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTING &&
     550               0 :         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
     551                 :     {
     552               0 :         redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
     553               0 :         undoConnectWithMaster();
     554                 :     }
     555                 : 
     556                 :     /* Bulk transfer I/O timeout? */
     557             271 :     if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER &&
     558               0 :         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
     559                 :     {
     560               0 :         redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER...");
     561               0 :         replicationAbortSyncTransfer();
     562                 :     }
     563                 : 
     564                 :     /* Timed out master when we are an already connected slave? */
     565             342 :     if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
     566              71 :         (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
     567                 :     {
     568               0 :         redisLog(REDIS_WARNING,"MASTER time out: no data nor PING received...");
     569               0 :         freeClient(server.master);
     570                 :     }
     571                 : 
     572                 :     /* Check if we should connect to a MASTER */
     573             271 :     if (server.repl_state == REDIS_REPL_CONNECT) {
     574               9 :         redisLog(REDIS_NOTICE,"Connecting to MASTER...");
     575               9 :         if (connectWithMaster() == REDIS_OK) {
     576               9 :             redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
     577                 :         }
     578                 :     }
     579                 :     
     580                 :     /* If we have attached slaves, PING them from time to time.
     581                 :      * So slaves can implement an explicit timeout to masters, and will
     582                 :      * be able to detect a link disconnection even if the TCP connection
     583                 :      * will not actually go down. */
     584             271 :     if (!(server.cronloops % (server.repl_ping_slave_period*10))) {
     585                 :         listIter li;
     586                 :         listNode *ln;
     587                 : 
     588              59 :         listRewind(server.slaves,&li);
     589             121 :         while((ln = listNext(&li))) {
     590               3 :             redisClient *slave = ln->value;
     591                 : 
     592                 :             /* Don't ping slaves that are in the middle of a bulk transfer
     593                 :              * with the master for first synchronization. */
     594               3 :             if (slave->replstate == REDIS_REPL_SEND_BULK) continue;
     595               3 :             if (slave->replstate == REDIS_REPL_ONLINE) {
     596                 :                 /* If the slave is online send a normal ping */
     597               3 :                 addReplySds(slave,sdsnew("*1\r\n$4\r\nPING\r\n"));
     598                 :             } else {
     599                 :                 /* Otherwise we are in the pre-synchronization stage.
     600                 :                  * Just a newline will do the work of refreshing the
     601                 :                  * connection last interaction time, and at the same time
     602                 :                  * we'll be sure that being a single char there are no
     603                 :                  * short-write problems. */
     604               0 :                 if (write(slave->fd, "\n", 1) == -1) {
     605                 :                     /* Don't worry, it's just a ping. */
     606                 :                 }
     607                 :             }
     608                 :         }
     609                 :     }
     610             271 : }

Generated by: LCOV version 1.7