LCOV - code coverage report
Current view: directory - redis/src - multi.c (source / functions) Found Hit Coverage
Test: redis.info Lines: 140 134 95.7 %
Date: 2012-04-04 Functions: 14 14 100.0 %
Colors: not hit hit

       1                 : #include "redis.h"
       2                 : 
       3                 : /* ================================ MULTI/EXEC ============================== */
       4                 : 
       5                 : /* Client state initialization for MULTI/EXEC */
       6             279 : void initClientMultiState(redisClient *c) {
       7             306 :     c->mstate.commands = NULL;
       8             306 :     c->mstate.count = 0;
       9             279 : }
      10                 : 
      11                 : /* Release all the resources associated with MULTI/EXEC state */
      12             145 : void freeClientMultiState(redisClient *c) {
      13                 :     int j;
      14                 : 
      15             175 :     for (j = 0; j < c->mstate.count; j++) {
      16                 :         int i;
      17              30 :         multiCmd *mc = c->mstate.commands+j;
      18                 : 
      19              83 :         for (i = 0; i < mc->argc; i++)
      20              53 :             decrRefCount(mc->argv[i]);
      21              30 :         zfree(mc->argv);
      22                 :     }
      23             145 :     zfree(c->mstate.commands);
      24             145 : }
      25                 : 
      26                 : /* Add a new command into the MULTI commands queue */
      27              30 : void queueMultiCommand(redisClient *c) {
      28                 :     multiCmd *mc;
      29                 :     int j;
      30                 : 
      31              30 :     c->mstate.commands = zrealloc(c->mstate.commands,
      32              30 :             sizeof(multiCmd)*(c->mstate.count+1));
      33              30 :     mc = c->mstate.commands+c->mstate.count;
      34              30 :     mc->cmd = c->cmd;
      35              30 :     mc->argc = c->argc;
      36              30 :     mc->argv = zmalloc(sizeof(robj*)*c->argc);
      37              30 :     memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
      38              88 :     for (j = 0; j < c->argc; j++)
      39              58 :         incrRefCount(mc->argv[j]);
      40              30 :     c->mstate.count++;
      41              30 : }
      42                 : 
      43               3 : void discardTransaction(redisClient *c) {
      44               3 :     freeClientMultiState(c);
      45                 :     initClientMultiState(c);
      46               3 :     c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);;
      47               3 :     unwatchAllKeys(c);
      48               3 : }
      49                 : 
      50              29 : void multiCommand(redisClient *c) {
      51              29 :     if (c->flags & REDIS_MULTI) {
      52               1 :         addReplyError(c,"MULTI calls can not be nested");
      53               1 :         return;
      54                 :     }
      55              28 :     c->flags |= REDIS_MULTI;
      56              28 :     addReply(c,shared.ok);
      57                 : }
      58                 : 
      59               3 : void discardCommand(redisClient *c) {
      60               3 :     if (!(c->flags & REDIS_MULTI)) {
      61               0 :         addReplyError(c,"DISCARD without MULTI");
      62               0 :         return;
      63                 :     }
      64               3 :     discardTransaction(c);
      65               3 :     addReply(c,shared.ok);
      66                 : }
      67                 : 
      68                 : /* Send a MULTI command to all the slaves and AOF file. Check the execCommand
      69                 :  * implementation for more information. */
      70              17 : void execCommandReplicateMulti(redisClient *c) {
      71              17 :     robj *multistring = createStringObject("MULTI",5);
      72                 : 
      73              17 :     if (server.aof_state != REDIS_AOF_OFF)
      74               0 :         feedAppendOnlyFile(server.multiCommand,c->db->id,&multistring,1);
      75              17 :     if (listLength(server.slaves))
      76               0 :         replicationFeedSlaves(server.slaves,c->db->id,&multistring,1);
      77              17 :     decrRefCount(multistring);
      78              17 : }
      79                 : 
      80              24 : void execCommand(redisClient *c) {
      81                 :     int j;
      82                 :     robj **orig_argv;
      83                 :     int orig_argc;
      84                 :     struct redisCommand *orig_cmd;
      85                 : 
      86              24 :     if (!(c->flags & REDIS_MULTI)) {
      87               0 :         addReplyError(c,"EXEC without MULTI");
      88               0 :         return;
      89                 :     }
      90                 : 
      91                 :     /* Check if we need to abort the EXEC if some WATCHed key was touched.
      92                 :      * A failed EXEC will return a multi bulk nil object. */
      93              24 :     if (c->flags & REDIS_DIRTY_CAS) {
      94               7 :         freeClientMultiState(c);
      95                 :         initClientMultiState(c);
      96               7 :         c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
      97               7 :         unwatchAllKeys(c);
      98               7 :         addReply(c,shared.nullmultibulk);
      99               7 :         return;
     100                 :     }
     101                 : 
     102                 :     /* Replicate a MULTI request now that we are sure the block is executed.
     103                 :      * This way we'll deliver the MULTI/..../EXEC block as a whole and
     104                 :      * both the AOF and the replication link will have the same consistency
     105                 :      * and atomicity guarantees. */
     106              17 :     execCommandReplicateMulti(c);
     107                 : 
     108                 :     /* Exec all the queued commands */
     109              17 :     unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
     110              17 :     orig_argv = c->argv;
     111              17 :     orig_argc = c->argc;
     112              17 :     orig_cmd = c->cmd;
     113              17 :     addReplyMultiBulkLen(c,c->mstate.count);
     114              39 :     for (j = 0; j < c->mstate.count; j++) {
     115              22 :         c->argc = c->mstate.commands[j].argc;
     116              22 :         c->argv = c->mstate.commands[j].argv;
     117              22 :         c->cmd = c->mstate.commands[j].cmd;
     118              22 :         call(c,REDIS_CALL_FULL);
     119                 : 
     120                 :         /* Commands may alter argc/argv, restore mstate. */
     121              22 :         c->mstate.commands[j].argc = c->argc;
     122              22 :         c->mstate.commands[j].argv = c->argv;
     123              22 :         c->mstate.commands[j].cmd = c->cmd;
     124                 :     }
     125              17 :     c->argv = orig_argv;
     126              17 :     c->argc = orig_argc;
     127              17 :     c->cmd = orig_cmd;
     128              17 :     freeClientMultiState(c);
     129                 :     initClientMultiState(c);
     130              17 :     c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
     131                 :     /* Make sure the EXEC command is always replicated / AOF, since we
     132                 :      * always send the MULTI command (we can't know beforehand if the
     133                 :      * next operations will contain at least a modification to the DB). */
     134              17 :     server.dirty++;
     135                 : }
     136                 : 
     137                 : /* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
     138                 :  *
     139                 :  * The implementation uses a per-DB hash table mapping keys to lists of clients
     140                 :  * WATCHing those keys, so that given a key that is going to be modified
     141                 :  * we can mark all the associated clients as dirty.
     142                 :  *
     143                 :  * Also every client contains a list of WATCHed keys so that it's possible to
     144                 :  * un-watch such keys when the client is freed or when UNWATCH is called. */
     145                 : 
     146                 : /* In the client->watched_keys list we need to use watchedKey structures,
     147                 :  * because identifying a key in Redis requires both the key name and the
     148                 :  * DB. */
     149                 : typedef struct watchedKey {
     150                 :     robj *key;
     151                 :     redisDb *db;
     152                 : } watchedKey;
     153                 : 
     154                 : /* Watch for the specified key */
     155              23 : void watchForKey(redisClient *c, robj *key) {
     156              23 :     list *clients = NULL;
     157                 :     listIter li;
     158                 :     listNode *ln;
     159                 :     watchedKey *wk;
     160                 : 
     161                 :     /* Check if we are already watching for this key */
     162              23 :     listRewind(c->watched_keys,&li);
     163              62 :     while((ln = listNext(&li))) {
     164              16 :         wk = listNodeValue(ln);
     165              16 :         if (wk->db == c->db && equalStringObjects(key,wk->key))
     166                 :             return; /* Key already watched */
     167                 :     }
     168                 :     /* This key is not already watched in this DB. Let's add it */
     169              23 :     clients = dictFetchValue(c->db->watched_keys,key);
     170              23 :     if (!clients) { 
     171              23 :         clients = listCreate();
     172              23 :         dictAdd(c->db->watched_keys,key,clients);
     173              23 :         incrRefCount(key);
     174                 :     }
     175              23 :     listAddNodeTail(clients,c);
     176                 :     /* Add the new key to the list of keys watched by this client */
     177              23 :     wk = zmalloc(sizeof(*wk));
     178              23 :     wk->key = key;
     179              23 :     wk->db = c->db;
     180              23 :     incrRefCount(key);
     181              23 :     listAddNodeTail(c->watched_keys,wk);
     182                 : }
     183                 : 
     184                 : /* Unwatch all the keys watched by this client. Clearing the EXEC dirty
     185                 :  * flag is up to the caller. */
     186             125 : void unwatchAllKeys(redisClient *c) {
     187                 :     listIter li;
     188                 :     listNode *ln;
     189                 : 
     190             125 :     if (listLength(c->watched_keys) == 0) return;
     191              16 :     listRewind(c->watched_keys,&li);
     192              55 :     while((ln = listNext(&li))) {
     193                 :         list *clients;
     194                 :         watchedKey *wk;
     195                 : 
     196                 :         /* Lookup the watched key -> clients list and remove the client
     197                 :          * from the list */
     198              23 :         wk = listNodeValue(ln);
     199              23 :         clients = dictFetchValue(wk->db->watched_keys, wk->key);
     200              23 :         redisAssertWithInfo(c,NULL,clients != NULL);
     201              23 :         listDelNode(clients,listSearchKey(clients,c));
     202                 :         /* Kill the entry at all if this was the only client */
     203              23 :         if (listLength(clients) == 0)
     204              23 :             dictDelete(wk->db->watched_keys, wk->key);
     205                 :         /* Remove this watched key from the client->watched list */
     206              23 :         listDelNode(c->watched_keys,ln);
     207              23 :         decrRefCount(wk->key);
     208              23 :         zfree(wk);
     209                 :     }
     210                 : }
     211                 : 
     212                 : /* "Touch" a key, so that if this key is being WATCHed by some client the
     213                 :  * next EXEC will fail. */
     214         1154479 : void touchWatchedKey(redisDb *db, robj *key) {
     215                 :     list *clients;
     216                 :     listIter li;
     217                 :     listNode *ln;
     218                 : 
     219         1154479 :     if (dictSize(db->watched_keys) == 0) return;
     220               8 :     clients = dictFetchValue(db->watched_keys, key);
     221               8 :     if (!clients) return;
     222                 : 
     223                 :     /* Mark all the clients watching this key as REDIS_DIRTY_CAS, so that
     224                 :      * their next EXEC will fail. */
     225               8 :     listRewind(clients,&li);
     226              24 :     while((ln = listNext(&li))) {
     227               8 :         redisClient *c = listNodeValue(ln);
     228                 : 
     229               8 :         c->flags |= REDIS_DIRTY_CAS;
     230                 :     }
     231                 : }
     232                 : 
     233                 : /* On FLUSHDB or FLUSHALL all the watched keys that are present before the
     234                 :  * flush but will be deleted as effect of the flushing operation should
     235                 :  * be touched. "dbid" is the DB that's getting the flush. -1 if it is
     236                 :  * a FLUSHALL operation (all the DBs flushed). */
     237              51 : void touchWatchedKeysOnFlush(int dbid) {
     238                 :     listIter li1, li2;
     239                 :     listNode *ln;
     240                 : 
     241                 :     /* For every client, check all the watched keys */
     242              51 :     listRewind(server.clients,&li1);
     243             155 :     while((ln = listNext(&li1))) {
     244              53 :         redisClient *c = listNodeValue(ln);
     245              53 :         listRewind(c->watched_keys,&li2);
     246             110 :         while((ln = listNext(&li2))) {
     247               4 :             watchedKey *wk = listNodeValue(ln);
     248                 : 
     249                 :             /* For every watched key matching the specified DB, if the
     250                 :              * key exists, mark the client as dirty, as the key will be
     251                 :              * removed. */
     252               4 :             if (dbid == -1 || wk->db->id == dbid) {
     253               4 :                 if (dictFind(wk->db->dict, wk->key->ptr) != NULL)
     254               2 :                     c->flags |= REDIS_DIRTY_CAS;
     255                 :             }
     256                 :         }
     257                 :     }
     258              51 : }
     259                 : 
     260              18 : void watchCommand(redisClient *c) {
     261                 :     int j;
     262                 : 
     263              18 :     if (c->flags & REDIS_MULTI) {
     264               1 :         addReplyError(c,"WATCH inside MULTI is not allowed");
     265               1 :         return;
     266                 :     }
     267              40 :     for (j = 1; j < c->argc; j++)
     268              23 :         watchForKey(c,c->argv[j]);
     269              17 :     addReply(c,shared.ok);
     270                 : }
     271                 : 
     272               2 : void unwatchCommand(redisClient *c) {
     273               2 :     unwatchAllKeys(c);
     274               2 :     c->flags &= (~REDIS_DIRTY_CAS);
     275               2 :     addReply(c,shared.ok);
     276               2 : }

Generated by: LCOV version 1.7