1 : #include "redis.h"
2 :
3 : /*-----------------------------------------------------------------------------
4 : * Pubsub low level API
5 : *----------------------------------------------------------------------------*/
6 :
7 8 : void freePubsubPattern(void *p) {
8 8 : pubsubPattern *pat = p;
9 :
10 8 : decrRefCount(pat->pattern);
11 8 : zfree(pat);
12 8 : }
13 :
14 8 : int listMatchPubsubPattern(void *a, void *b) {
15 8 : pubsubPattern *pa = a, *pb = b;
16 :
17 16 : return (pa->client == pb->client) &&
18 8 : (equalStringObjects(pa->pattern,pb->pattern));
19 : }
20 :
21 : /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
22 : * 0 if the client was already subscribed to that channel. */
23 14 : int pubsubSubscribeChannel(redisClient *c, robj *channel) {
24 : struct dictEntry *de;
25 14 : list *clients = NULL;
26 14 : int retval = 0;
27 :
28 : /* Add the channel to the client -> channels hash table */
29 14 : if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
30 12 : retval = 1;
31 12 : incrRefCount(channel);
32 : /* Add the client to the channel -> list of clients hash table */
33 12 : de = dictFind(server.pubsub_channels,channel);
34 12 : if (de == NULL) {
35 11 : clients = listCreate();
36 11 : dictAdd(server.pubsub_channels,channel,clients);
37 11 : incrRefCount(channel);
38 : } else {
39 1 : clients = dictGetVal(de);
40 : }
41 12 : listAddNodeTail(clients,c);
42 : }
43 : /* Notify the client */
44 14 : addReply(c,shared.mbulkhdr[3]);
45 14 : addReply(c,shared.subscribebulk);
46 14 : addReplyBulk(c,channel);
47 14 : addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
48 14 : return retval;
49 : }
50 :
51 : /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
52 : * 0 if the client was not subscribed to the specified channel. */
53 15 : int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
54 : struct dictEntry *de;
55 : list *clients;
56 : listNode *ln;
57 15 : int retval = 0;
58 :
59 : /* Remove the channel from the client -> channels hash table */
60 15 : incrRefCount(channel); /* channel may be just a pointer to the same object
61 : we have in the hash tables. Protect it... */
62 15 : if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
63 12 : retval = 1;
64 : /* Remove the client from the channel -> clients list hash table */
65 12 : de = dictFind(server.pubsub_channels,channel);
66 12 : redisAssertWithInfo(c,NULL,de != NULL);
67 12 : clients = dictGetVal(de);
68 12 : ln = listSearchKey(clients,c);
69 12 : redisAssertWithInfo(c,NULL,ln != NULL);
70 12 : listDelNode(clients,ln);
71 12 : if (listLength(clients) == 0) {
72 : /* Free the list and associated hash entry at all if this was
73 : * the latest client, so that it will be possible to abuse
74 : * Redis PUBSUB creating millions of channels. */
75 11 : dictDelete(server.pubsub_channels,channel);
76 : }
77 : }
78 : /* Notify the client */
79 15 : if (notify) {
80 8 : addReply(c,shared.mbulkhdr[3]);
81 8 : addReply(c,shared.unsubscribebulk);
82 8 : addReplyBulk(c,channel);
83 16 : addReplyLongLong(c,dictSize(c->pubsub_channels)+
84 8 : listLength(c->pubsub_patterns));
85 :
86 : }
87 15 : decrRefCount(channel); /* it is finally safe to release it */
88 15 : return retval;
89 : }
90 :
91 : /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */
92 8 : int pubsubSubscribePattern(redisClient *c, robj *pattern) {
93 8 : int retval = 0;
94 :
95 8 : if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
96 8 : retval = 1;
97 : pubsubPattern *pat;
98 8 : listAddNodeTail(c->pubsub_patterns,pattern);
99 8 : incrRefCount(pattern);
100 8 : pat = zmalloc(sizeof(*pat));
101 8 : pat->pattern = getDecodedObject(pattern);
102 8 : pat->client = c;
103 8 : listAddNodeTail(server.pubsub_patterns,pat);
104 : }
105 : /* Notify the client */
106 8 : addReply(c,shared.mbulkhdr[3]);
107 8 : addReply(c,shared.psubscribebulk);
108 8 : addReplyBulk(c,pattern);
109 8 : addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
110 8 : return retval;
111 : }
112 :
113 : /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
114 : * 0 if the client was not subscribed to the specified channel. */
115 11 : int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
116 : listNode *ln;
117 : pubsubPattern pat;
118 11 : int retval = 0;
119 :
120 11 : incrRefCount(pattern); /* Protect the object. May be the same we remove */
121 11 : if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
122 8 : retval = 1;
123 8 : listDelNode(c->pubsub_patterns,ln);
124 8 : pat.client = c;
125 8 : pat.pattern = pattern;
126 8 : ln = listSearchKey(server.pubsub_patterns,&pat);
127 8 : listDelNode(server.pubsub_patterns,ln);
128 : }
129 : /* Notify the client */
130 11 : if (notify) {
131 8 : addReply(c,shared.mbulkhdr[3]);
132 8 : addReply(c,shared.punsubscribebulk);
133 8 : addReplyBulk(c,pattern);
134 16 : addReplyLongLong(c,dictSize(c->pubsub_channels)+
135 8 : listLength(c->pubsub_patterns));
136 : }
137 11 : decrRefCount(pattern);
138 11 : return retval;
139 : }
140 :
141 : /* Unsubscribe from all the channels. Return the number of channels the
142 : * client was subscribed from. */
143 97 : int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
144 97 : dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
145 : dictEntry *de;
146 97 : int count = 0;
147 :
148 204 : while((de = dictNext(di)) != NULL) {
149 10 : robj *channel = dictGetKey(de);
150 :
151 10 : count += pubsubUnsubscribeChannel(c,channel,notify);
152 : }
153 97 : dictReleaseIterator(di);
154 97 : return count;
155 : }
156 :
157 : /* Unsubscribe from all the patterns. Return the number of patterns the
158 : * client was subscribed from. */
159 97 : int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
160 : listNode *ln;
161 : listIter li;
162 97 : int count = 0;
163 :
164 97 : listRewind(c->pubsub_patterns,&li);
165 200 : while ((ln = listNext(&li)) != NULL) {
166 6 : robj *pattern = ln->value;
167 :
168 6 : count += pubsubUnsubscribePattern(c,pattern,notify);
169 : }
170 97 : return count;
171 : }
172 :
173 : /* Publish a message */
174 75381 : int pubsubPublishMessage(robj *channel, robj *message) {
175 75381 : int receivers = 0;
176 : struct dictEntry *de;
177 : listNode *ln;
178 : listIter li;
179 :
180 : /* Send to clients listening for that channel */
181 75381 : de = dictFind(server.pubsub_channels,channel);
182 75381 : if (de) {
183 75361 : list *list = dictGetVal(de);
184 : listNode *ln;
185 : listIter li;
186 :
187 75361 : listRewind(list,&li);
188 226084 : while ((ln = listNext(&li)) != NULL) {
189 75362 : redisClient *c = ln->value;
190 :
191 75362 : addReply(c,shared.mbulkhdr[3]);
192 75362 : addReply(c,shared.messagebulk);
193 75362 : addReplyBulk(c,channel);
194 75362 : addReplyBulk(c,message);
195 75362 : receivers++;
196 : }
197 : }
198 : /* Send to clients listening to matching channels */
199 75381 : if (listLength(server.pubsub_patterns)) {
200 9 : listRewind(server.pubsub_patterns,&li);
201 9 : channel = getDecodedObject(channel);
202 33 : while ((ln = listNext(&li)) != NULL) {
203 15 : pubsubPattern *pat = ln->value;
204 :
205 30 : if (stringmatchlen((char*)pat->pattern->ptr,
206 15 : sdslen(pat->pattern->ptr),
207 : (char*)channel->ptr,
208 15 : sdslen(channel->ptr),0)) {
209 6 : addReply(pat->client,shared.mbulkhdr[4]);
210 6 : addReply(pat->client,shared.pmessagebulk);
211 6 : addReplyBulk(pat->client,pat->pattern);
212 6 : addReplyBulk(pat->client,channel);
213 6 : addReplyBulk(pat->client,message);
214 6 : receivers++;
215 : }
216 : }
217 9 : decrRefCount(channel);
218 : }
219 75381 : return receivers;
220 : }
221 :
222 : /*-----------------------------------------------------------------------------
223 : * Pubsub commands implementation
224 : *----------------------------------------------------------------------------*/
225 :
226 9 : void subscribeCommand(redisClient *c) {
227 : int j;
228 :
229 23 : for (j = 1; j < c->argc; j++)
230 14 : pubsubSubscribeChannel(c,c->argv[j]);
231 9 : }
232 :
233 4 : void unsubscribeCommand(redisClient *c) {
234 4 : if (c->argc == 1) {
235 1 : pubsubUnsubscribeAllChannels(c,1);
236 1 : return;
237 : } else {
238 : int j;
239 :
240 8 : for (j = 1; j < c->argc; j++)
241 5 : pubsubUnsubscribeChannel(c,c->argv[j],1);
242 : }
243 : }
244 :
245 5 : void psubscribeCommand(redisClient *c) {
246 : int j;
247 :
248 13 : for (j = 1; j < c->argc; j++)
249 8 : pubsubSubscribePattern(c,c->argv[j]);
250 5 : }
251 :
252 4 : void punsubscribeCommand(redisClient *c) {
253 4 : if (c->argc == 1) {
254 1 : pubsubUnsubscribeAllPatterns(c,1);
255 1 : return;
256 : } else {
257 : int j;
258 :
259 8 : for (j = 1; j < c->argc; j++)
260 5 : pubsubUnsubscribePattern(c,c->argv[j],1);
261 : }
262 : }
263 :
264 75381 : void publishCommand(redisClient *c) {
265 75381 : int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
266 75381 : if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
267 75381 : addReplyLongLong(c,receivers);
268 75381 : }
|