1 : #include "redis.h"
2 :
3 : /* ================================ MULTI/EXEC ============================== */
4 :
5 : /* Client state initialization for MULTI/EXEC */
6 279 : void initClientMultiState(redisClient *c) {
7 306 : c->mstate.commands = NULL;
8 306 : c->mstate.count = 0;
9 279 : }
10 :
11 : /* Release all the resources associated with MULTI/EXEC state */
12 145 : void freeClientMultiState(redisClient *c) {
13 : int j;
14 :
15 175 : for (j = 0; j < c->mstate.count; j++) {
16 : int i;
17 30 : multiCmd *mc = c->mstate.commands+j;
18 :
19 83 : for (i = 0; i < mc->argc; i++)
20 53 : decrRefCount(mc->argv[i]);
21 30 : zfree(mc->argv);
22 : }
23 145 : zfree(c->mstate.commands);
24 145 : }
25 :
26 : /* Add a new command into the MULTI commands queue */
27 30 : void queueMultiCommand(redisClient *c) {
28 : multiCmd *mc;
29 : int j;
30 :
31 30 : c->mstate.commands = zrealloc(c->mstate.commands,
32 30 : sizeof(multiCmd)*(c->mstate.count+1));
33 30 : mc = c->mstate.commands+c->mstate.count;
34 30 : mc->cmd = c->cmd;
35 30 : mc->argc = c->argc;
36 30 : mc->argv = zmalloc(sizeof(robj*)*c->argc);
37 30 : memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
38 88 : for (j = 0; j < c->argc; j++)
39 58 : incrRefCount(mc->argv[j]);
40 30 : c->mstate.count++;
41 30 : }
42 :
43 3 : void discardTransaction(redisClient *c) {
44 3 : freeClientMultiState(c);
45 : initClientMultiState(c);
46 3 : c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);;
47 3 : unwatchAllKeys(c);
48 3 : }
49 :
50 29 : void multiCommand(redisClient *c) {
51 29 : if (c->flags & REDIS_MULTI) {
52 1 : addReplyError(c,"MULTI calls can not be nested");
53 1 : return;
54 : }
55 28 : c->flags |= REDIS_MULTI;
56 28 : addReply(c,shared.ok);
57 : }
58 :
59 3 : void discardCommand(redisClient *c) {
60 3 : if (!(c->flags & REDIS_MULTI)) {
61 0 : addReplyError(c,"DISCARD without MULTI");
62 0 : return;
63 : }
64 3 : discardTransaction(c);
65 3 : addReply(c,shared.ok);
66 : }
67 :
68 : /* Send a MULTI command to all the slaves and AOF file. Check the execCommand
69 : * implememntation for more information. */
70 17 : void execCommandReplicateMulti(redisClient *c) {
71 17 : robj *multistring = createStringObject("MULTI",5);
72 :
73 17 : if (server.aof_state != REDIS_AOF_OFF)
74 0 : feedAppendOnlyFile(server.multiCommand,c->db->id,&multistring,1);
75 17 : if (listLength(server.slaves))
76 0 : replicationFeedSlaves(server.slaves,c->db->id,&multistring,1);
77 17 : decrRefCount(multistring);
78 17 : }
79 :
80 24 : void execCommand(redisClient *c) {
81 : int j;
82 : robj **orig_argv;
83 : int orig_argc;
84 : struct redisCommand *orig_cmd;
85 :
86 24 : if (!(c->flags & REDIS_MULTI)) {
87 0 : addReplyError(c,"EXEC without MULTI");
88 0 : return;
89 : }
90 :
91 : /* Check if we need to abort the EXEC if some WATCHed key was touched.
92 : * A failed EXEC will return a multi bulk nil object. */
93 24 : if (c->flags & REDIS_DIRTY_CAS) {
94 7 : freeClientMultiState(c);
95 : initClientMultiState(c);
96 7 : c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
97 7 : unwatchAllKeys(c);
98 7 : addReply(c,shared.nullmultibulk);
99 7 : return;
100 : }
101 :
102 : /* Replicate a MULTI request now that we are sure the block is executed.
103 : * This way we'll deliver the MULTI/..../EXEC block as a whole and
104 : * both the AOF and the replication link will have the same consistency
105 : * and atomicity guarantees. */
106 17 : execCommandReplicateMulti(c);
107 :
108 : /* Exec all the queued commands */
109 17 : unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
110 17 : orig_argv = c->argv;
111 17 : orig_argc = c->argc;
112 17 : orig_cmd = c->cmd;
113 17 : addReplyMultiBulkLen(c,c->mstate.count);
114 39 : for (j = 0; j < c->mstate.count; j++) {
115 22 : c->argc = c->mstate.commands[j].argc;
116 22 : c->argv = c->mstate.commands[j].argv;
117 22 : c->cmd = c->mstate.commands[j].cmd;
118 22 : call(c,REDIS_CALL_FULL);
119 :
120 : /* Commands may alter argc/argv, restore mstate. */
121 22 : c->mstate.commands[j].argc = c->argc;
122 22 : c->mstate.commands[j].argv = c->argv;
123 22 : c->mstate.commands[j].cmd = c->cmd;
124 : }
125 17 : c->argv = orig_argv;
126 17 : c->argc = orig_argc;
127 17 : c->cmd = orig_cmd;
128 17 : freeClientMultiState(c);
129 : initClientMultiState(c);
130 17 : c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
131 : /* Make sure the EXEC command is always replicated / AOF, since we
132 : * always send the MULTI command (we can't know beforehand if the
133 : * next operations will contain at least a modification to the DB). */
134 17 : server.dirty++;
135 : }
136 :
137 : /* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
138 : *
139 : * The implementation uses a per-DB hash table mapping keys to list of clients
140 : * WATCHing those keys, so that given a key that is going to be modified
141 : * we can mark all the associated clients as dirty.
142 : *
143 : * Also every client contains a list of WATCHed keys so that's possible to
144 : * un-watch such keys when the client is freed or when UNWATCH is called. */
145 :
146 : /* In the client->watched_keys list we need to use watchedKey structures
147 : * as in order to identify a key in Redis we need both the key name and the
148 : * DB */
149 : typedef struct watchedKey {
150 : robj *key;
151 : redisDb *db;
152 : } watchedKey;
153 :
154 : /* Watch for the specified key */
155 23 : void watchForKey(redisClient *c, robj *key) {
156 23 : list *clients = NULL;
157 : listIter li;
158 : listNode *ln;
159 : watchedKey *wk;
160 :
161 : /* Check if we are already watching for this key */
162 23 : listRewind(c->watched_keys,&li);
163 62 : while((ln = listNext(&li))) {
164 16 : wk = listNodeValue(ln);
165 16 : if (wk->db == c->db && equalStringObjects(key,wk->key))
166 : return; /* Key already watched */
167 : }
168 : /* This key is not already watched in this DB. Let's add it */
169 23 : clients = dictFetchValue(c->db->watched_keys,key);
170 23 : if (!clients) {
171 23 : clients = listCreate();
172 23 : dictAdd(c->db->watched_keys,key,clients);
173 23 : incrRefCount(key);
174 : }
175 23 : listAddNodeTail(clients,c);
176 : /* Add the new key to the lits of keys watched by this client */
177 23 : wk = zmalloc(sizeof(*wk));
178 23 : wk->key = key;
179 23 : wk->db = c->db;
180 23 : incrRefCount(key);
181 23 : listAddNodeTail(c->watched_keys,wk);
182 : }
183 :
184 : /* Unwatch all the keys watched by this client. To clean the EXEC dirty
185 : * flag is up to the caller. */
186 125 : void unwatchAllKeys(redisClient *c) {
187 : listIter li;
188 : listNode *ln;
189 :
190 125 : if (listLength(c->watched_keys) == 0) return;
191 16 : listRewind(c->watched_keys,&li);
192 55 : while((ln = listNext(&li))) {
193 : list *clients;
194 : watchedKey *wk;
195 :
196 : /* Lookup the watched key -> clients list and remove the client
197 : * from the list */
198 23 : wk = listNodeValue(ln);
199 23 : clients = dictFetchValue(wk->db->watched_keys, wk->key);
200 23 : redisAssertWithInfo(c,NULL,clients != NULL);
201 23 : listDelNode(clients,listSearchKey(clients,c));
202 : /* Kill the entry at all if this was the only client */
203 23 : if (listLength(clients) == 0)
204 23 : dictDelete(wk->db->watched_keys, wk->key);
205 : /* Remove this watched key from the client->watched list */
206 23 : listDelNode(c->watched_keys,ln);
207 23 : decrRefCount(wk->key);
208 23 : zfree(wk);
209 : }
210 : }
211 :
212 : /* "Touch" a key, so that if this key is being WATCHed by some client the
213 : * next EXEC will fail. */
214 1154479 : void touchWatchedKey(redisDb *db, robj *key) {
215 : list *clients;
216 : listIter li;
217 : listNode *ln;
218 :
219 1154479 : if (dictSize(db->watched_keys) == 0) return;
220 8 : clients = dictFetchValue(db->watched_keys, key);
221 8 : if (!clients) return;
222 :
223 : /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
224 : /* Check if we are already watching for this key */
225 8 : listRewind(clients,&li);
226 24 : while((ln = listNext(&li))) {
227 8 : redisClient *c = listNodeValue(ln);
228 :
229 8 : c->flags |= REDIS_DIRTY_CAS;
230 : }
231 : }
232 :
233 : /* On FLUSHDB or FLUSHALL all the watched keys that are present before the
234 : * flush but will be deleted as effect of the flushing operation should
235 : * be touched. "dbid" is the DB that's getting the flush. -1 if it is
236 : * a FLUSHALL operation (all the DBs flushed). */
237 51 : void touchWatchedKeysOnFlush(int dbid) {
238 : listIter li1, li2;
239 : listNode *ln;
240 :
241 : /* For every client, check all the waited keys */
242 51 : listRewind(server.clients,&li1);
243 155 : while((ln = listNext(&li1))) {
244 53 : redisClient *c = listNodeValue(ln);
245 53 : listRewind(c->watched_keys,&li2);
246 110 : while((ln = listNext(&li2))) {
247 4 : watchedKey *wk = listNodeValue(ln);
248 :
249 : /* For every watched key matching the specified DB, if the
250 : * key exists, mark the client as dirty, as the key will be
251 : * removed. */
252 4 : if (dbid == -1 || wk->db->id == dbid) {
253 4 : if (dictFind(wk->db->dict, wk->key->ptr) != NULL)
254 2 : c->flags |= REDIS_DIRTY_CAS;
255 : }
256 : }
257 : }
258 51 : }
259 :
260 18 : void watchCommand(redisClient *c) {
261 : int j;
262 :
263 18 : if (c->flags & REDIS_MULTI) {
264 1 : addReplyError(c,"WATCH inside MULTI is not allowed");
265 1 : return;
266 : }
267 40 : for (j = 1; j < c->argc; j++)
268 23 : watchForKey(c,c->argv[j]);
269 17 : addReply(c,shared.ok);
270 : }
271 :
272 2 : void unwatchCommand(redisClient *c) {
273 2 : unwatchAllKeys(c);
274 2 : c->flags &= (~REDIS_DIRTY_CAS);
275 2 : addReply(c,shared.ok);
276 2 : }
|