LCOV - code coverage report
Current view: directory - redis/src - pubsub.c (source / functions) Found Hit Coverage
Test: redis.info Lines: 150 150 100.0 %
Date: 2012-04-04 Functions: 14 14 100.0 %
Colors: not hit hit

       1                 : #include "redis.h"
       2                 : 
       3                 : /*-----------------------------------------------------------------------------
       4                 :  * Pubsub low level API
       5                 :  *----------------------------------------------------------------------------*/
       6                 : 
       7               8 : void freePubsubPattern(void *p) {
       8               8 :     pubsubPattern *pat = p;
       9                 : 
      10               8 :     decrRefCount(pat->pattern);
      11               8 :     zfree(pat);
      12               8 : }
      13                 : 
      14               8 : int listMatchPubsubPattern(void *a, void *b) {
      15               8 :     pubsubPattern *pa = a, *pb = b;
      16                 : 
      17              16 :     return (pa->client == pb->client) &&
      18               8 :            (equalStringObjects(pa->pattern,pb->pattern));
      19                 : }
      20                 : 
      21                 : /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
      22                 :  * 0 if the client was already subscribed to that channel. */
      23              14 : int pubsubSubscribeChannel(redisClient *c, robj *channel) {
      24                 :     struct dictEntry *de;
      25              14 :     list *clients = NULL;
      26              14 :     int retval = 0;
      27                 : 
      28                 :     /* Add the channel to the client -> channels hash table */
      29              14 :     if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
      30              12 :         retval = 1;
      31              12 :         incrRefCount(channel);
      32                 :         /* Add the client to the channel -> list of clients hash table */
      33              12 :         de = dictFind(server.pubsub_channels,channel);
      34              12 :         if (de == NULL) {
      35              11 :             clients = listCreate();
      36              11 :             dictAdd(server.pubsub_channels,channel,clients);
      37              11 :             incrRefCount(channel);
      38                 :         } else {
      39               1 :             clients = dictGetVal(de);
      40                 :         }
      41              12 :         listAddNodeTail(clients,c);
      42                 :     }
      43                 :     /* Notify the client */
      44              14 :     addReply(c,shared.mbulkhdr[3]);
      45              14 :     addReply(c,shared.subscribebulk);
      46              14 :     addReplyBulk(c,channel);
      47              14 :     addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
      48              14 :     return retval;
      49                 : }
      50                 : 
      51                 : /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
      52                 :  * 0 if the client was not subscribed to the specified channel. */
      53              15 : int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
      54                 :     struct dictEntry *de;
      55                 :     list *clients;
      56                 :     listNode *ln;
      57              15 :     int retval = 0;
      58                 : 
      59                 :     /* Remove the channel from the client -> channels hash table */
      60              15 :     incrRefCount(channel); /* channel may be just a pointer to the same object
      61                 :                             we have in the hash tables. Protect it... */
      62              15 :     if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
      63              12 :         retval = 1;
      64                 :         /* Remove the client from the channel -> clients list hash table */
      65              12 :         de = dictFind(server.pubsub_channels,channel);
      66              12 :         redisAssertWithInfo(c,NULL,de != NULL);
      67              12 :         clients = dictGetVal(de);
      68              12 :         ln = listSearchKey(clients,c);
      69              12 :         redisAssertWithInfo(c,NULL,ln != NULL);
      70              12 :         listDelNode(clients,ln);
      71              12 :         if (listLength(clients) == 0) {
      72                 :             /* Free the list and the associated hash entry altogether if this
      73                 :              * was the last client, so that it will not be possible to abuse
      74                 :              * Redis PUBSUB by creating millions of channels. */
      75              11 :             dictDelete(server.pubsub_channels,channel);
      76                 :         }
      77                 :     }
      78                 :     /* Notify the client */
      79              15 :     if (notify) {
      80               8 :         addReply(c,shared.mbulkhdr[3]);
      81               8 :         addReply(c,shared.unsubscribebulk);
      82               8 :         addReplyBulk(c,channel);
      83              16 :         addReplyLongLong(c,dictSize(c->pubsub_channels)+
      84               8 :                        listLength(c->pubsub_patterns));
      85                 : 
      86                 :     }
      87              15 :     decrRefCount(channel); /* it is finally safe to release it */
      88              15 :     return retval;
      89                 : }
      90                 : 
      91                 : /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
      92               8 : int pubsubSubscribePattern(redisClient *c, robj *pattern) {
      93               8 :     int retval = 0;
      94                 : 
      95               8 :     if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
      96               8 :         retval = 1;
      97                 :         pubsubPattern *pat;
      98               8 :         listAddNodeTail(c->pubsub_patterns,pattern);
      99               8 :         incrRefCount(pattern);
     100               8 :         pat = zmalloc(sizeof(*pat));
     101               8 :         pat->pattern = getDecodedObject(pattern);
     102               8 :         pat->client = c;
     103               8 :         listAddNodeTail(server.pubsub_patterns,pat);
     104                 :     }
     105                 :     /* Notify the client */
     106               8 :     addReply(c,shared.mbulkhdr[3]);
     107               8 :     addReply(c,shared.psubscribebulk);
     108               8 :     addReplyBulk(c,pattern);
     109               8 :     addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
     110               8 :     return retval;
     111                 : }
     112                 : 
     113                 : /* Unsubscribe a client from a pattern. Returns 1 if the operation succeeded, or
     114                 :  * 0 if the client was not subscribed to the specified pattern. */
     115              11 : int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
     116                 :     listNode *ln;
     117                 :     pubsubPattern pat;
     118              11 :     int retval = 0;
     119                 : 
     120              11 :     incrRefCount(pattern); /* Protect the object. May be the same we remove */
     121              11 :     if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
     122               8 :         retval = 1;
     123               8 :         listDelNode(c->pubsub_patterns,ln);
     124               8 :         pat.client = c;
     125               8 :         pat.pattern = pattern;
     126               8 :         ln = listSearchKey(server.pubsub_patterns,&pat);
     127               8 :         listDelNode(server.pubsub_patterns,ln);
     128                 :     }
     129                 :     /* Notify the client */
     130              11 :     if (notify) {
     131               8 :         addReply(c,shared.mbulkhdr[3]);
     132               8 :         addReply(c,shared.punsubscribebulk);
     133               8 :         addReplyBulk(c,pattern);
     134              16 :         addReplyLongLong(c,dictSize(c->pubsub_channels)+
     135               8 :                        listLength(c->pubsub_patterns));
     136                 :     }
     137              11 :     decrRefCount(pattern);
     138              11 :     return retval;
     139                 : }
     140                 : 
     141                 : /* Unsubscribe from all the channels. Return the number of channels the
     142                 :  * client was subscribed to. */
     143              97 : int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
     144              97 :     dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
     145                 :     dictEntry *de;
     146              97 :     int count = 0;
     147                 : 
     148             204 :     while((de = dictNext(di)) != NULL) {
     149              10 :         robj *channel = dictGetKey(de);
     150                 : 
     151              10 :         count += pubsubUnsubscribeChannel(c,channel,notify);
     152                 :     }
     153              97 :     dictReleaseIterator(di);
     154              97 :     return count;
     155                 : }
     156                 : 
     157                 : /* Unsubscribe from all the patterns. Return the number of patterns the
     158                 :  * client was subscribed to. */
     159              97 : int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
     160                 :     listNode *ln;
     161                 :     listIter li;
     162              97 :     int count = 0;
     163                 : 
     164              97 :     listRewind(c->pubsub_patterns,&li);
     165             200 :     while ((ln = listNext(&li)) != NULL) {
     166               6 :         robj *pattern = ln->value;
     167                 : 
     168               6 :         count += pubsubUnsubscribePattern(c,pattern,notify);
     169                 :     }
     170              97 :     return count;
     171                 : }
     172                 : 
     173                 : /* Publish a message */
     174           75381 : int pubsubPublishMessage(robj *channel, robj *message) {
     175           75381 :     int receivers = 0;
     176                 :     struct dictEntry *de;
     177                 :     listNode *ln;
     178                 :     listIter li;
     179                 : 
     180                 :     /* Send to clients listening for that channel */
     181           75381 :     de = dictFind(server.pubsub_channels,channel);
     182           75381 :     if (de) {
     183           75361 :         list *list = dictGetVal(de);
     184                 :         listNode *ln;
     185                 :         listIter li;
     186                 : 
     187           75361 :         listRewind(list,&li);
     188          226084 :         while ((ln = listNext(&li)) != NULL) {
     189           75362 :             redisClient *c = ln->value;
     190                 : 
     191           75362 :             addReply(c,shared.mbulkhdr[3]);
     192           75362 :             addReply(c,shared.messagebulk);
     193           75362 :             addReplyBulk(c,channel);
     194           75362 :             addReplyBulk(c,message);
     195           75362 :             receivers++;
     196                 :         }
     197                 :     }
     198                 :     /* Send to clients subscribed to patterns matching the channel */
     199           75381 :     if (listLength(server.pubsub_patterns)) {
     200               9 :         listRewind(server.pubsub_patterns,&li);
     201               9 :         channel = getDecodedObject(channel);
     202              33 :         while ((ln = listNext(&li)) != NULL) {
     203              15 :             pubsubPattern *pat = ln->value;
     204                 : 
     205              30 :             if (stringmatchlen((char*)pat->pattern->ptr,
     206              15 :                                 sdslen(pat->pattern->ptr),
     207                 :                                 (char*)channel->ptr,
     208              15 :                                 sdslen(channel->ptr),0)) {
     209               6 :                 addReply(pat->client,shared.mbulkhdr[4]);
     210               6 :                 addReply(pat->client,shared.pmessagebulk);
     211               6 :                 addReplyBulk(pat->client,pat->pattern);
     212               6 :                 addReplyBulk(pat->client,channel);
     213               6 :                 addReplyBulk(pat->client,message);
     214               6 :                 receivers++;
     215                 :             }
     216                 :         }
     217               9 :         decrRefCount(channel);
     218                 :     }
     219           75381 :     return receivers;
     220                 : }
     221                 : 
     222                 : /*-----------------------------------------------------------------------------
     223                 :  * Pubsub commands implementation
     224                 :  *----------------------------------------------------------------------------*/
     225                 : 
     226               9 : void subscribeCommand(redisClient *c) {
     227                 :     int j;
     228                 : 
     229              23 :     for (j = 1; j < c->argc; j++)
     230              14 :         pubsubSubscribeChannel(c,c->argv[j]);
     231               9 : }
     232                 : 
     233               4 : void unsubscribeCommand(redisClient *c) {
     234               4 :     if (c->argc == 1) {
     235               1 :         pubsubUnsubscribeAllChannels(c,1);
     236               1 :         return;
     237                 :     } else {
     238                 :         int j;
     239                 : 
     240               8 :         for (j = 1; j < c->argc; j++)
     241               5 :             pubsubUnsubscribeChannel(c,c->argv[j],1);
     242                 :     }
     243                 : }
     244                 : 
     245               5 : void psubscribeCommand(redisClient *c) {
     246                 :     int j;
     247                 : 
     248              13 :     for (j = 1; j < c->argc; j++)
     249               8 :         pubsubSubscribePattern(c,c->argv[j]);
     250               5 : }
     251                 : 
     252               4 : void punsubscribeCommand(redisClient *c) {
     253               4 :     if (c->argc == 1) {
     254               1 :         pubsubUnsubscribeAllPatterns(c,1);
     255               1 :         return;
     256                 :     } else {
     257                 :         int j;
     258                 : 
     259               8 :         for (j = 1; j < c->argc; j++)
     260               5 :             pubsubUnsubscribePattern(c,c->argv[j],1);
     261                 :     }
     262                 : }
     263                 : 
     264           75381 : void publishCommand(redisClient *c) {
     265           75381 :     int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
     266           75381 :     if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
     267           75381 :     addReplyLongLong(c,receivers);
     268           75381 : }

Generated by: LCOV version 1.7