antirez 442 days ago. 98522 views.
EDIT: In case you missed it, Disque source code is now available at http://github.com/antirez/disque

It is a few months that I spend ~ 15-20% of my time, mostly hours stolen to nights and weekends, working to a new system. It’s a message broker and it’s called Disque. I’ve an implementation of 80% of what was in the original specification, but still I don’t feel like it’s ready to be released. Since I can’t ship, I’ll at least blog… so that’s the story of how it started and a few details about what it is.

~ First steps ~

Many developers use Redis as a message queue, often wrappered via some library abstracting away Redis low level primitives, other times directly building a simple, ad-hoc queue, using the Redis raw API. This use case is covered mainly using blocking list operations, and list push operations. Redis apparently is at the same time the best and the worst system to use like that. It’s good because it is fast, easy to inspect, deploy and use, and in many environments it was already one piece of the infrastructure. However it has disadvantages because Redis mutable data structures are very different than immutable messages. Redis HA / Cluster tradeoffs are totally biased towards large mutable values, but the same tradeoffs are not the best ones to deal with messages.

One thing that is important to guarantee for a message broker is that a message is delivered either at least one time, or at most one time. In short given that to guarantee an exact single delivery of a message (where for delivery we intent a message that was received *and* processed by a worker) is practically impossible, the choices are that the message broker is able to guarantee either 0 or 1 deliveries, or 1 to infinite deliveries. This is often referred as at-most-once semantics, and at-least-once semantics. There are use cases for the first, but the most interesting and practical semantics is the latter, that is, to guarantee that a message is delivered at least one time, and deliver multiple times if there are failures.

So a few months ago I started to think at some client-side protocol to use a set of Redis masters (without replication or clustering whatsoever) in a way that provides these guarantees. Sometimes with small changes in the way Redis is used for an use case, it is possible to end with a better system. For example for distributed locks I tried to document an algorithm which is trivial to implement but more robust than the single-instance + failover implementation (http://redis.io/topics/distlock).

However after a few days of work my design draft suggested that it was a better bet to design an ad-hoc system, since the client-side algorithm ended being too complex, non optimal, and certain things I absolutely wanted were impossible or very hard to do. To add more things to Redis sounded like a bad idea, it does a lot of things already, and to cover messaging well I needed things which are very different than the way Redis operates. But why to design a new system given that the world is full of message brokers? Because an impressive number of users were using Redis instead of systems specifically designed for this goal, and this was strange. A few can be wrong, but so many need to get some reason. Maybe Redis low barrier of entry, easy API, speed, were not what most people were accustomed to when they looked at the message brokers landscape. It seems populated by solutions that are either too simple, asking the application to do too much, or too complex, but super full featured. Maybe there is some space for the “Redis of messaging”?

~ Redis brutally forked ~

For the first time in my life I didn’t started straight away to write code. For weeks I looked at the design from time to time, converted it into a new system and not a Redis client library, and tried to understand, as an user, what would make me very happy in a message broker. The original use case remained the same: delayed jobs. Disque is a general system, but 90% of times in the design the “reference” was an user that has to solve the problem of sending messages that are likely jobs to process. If something was against this use case, it was removed.

When the design was ready, I finally started to code. But where to start? “vi main.c”? Fortunately Redis is, in part, a framework to write distributed systems in C. I had a protocol, network libraries, clients handling, node-to-node message bus. To rewrite all this from scratch sounded like a huge waste. At the same time I wanted Disque to be able to completely diverge from Redis in any details possible if this is needed, and I wanted it to be a side project without impacts on Redis itself. So instead of trying the huge undertake of splitting Redis into an actual separated framework, and the Redis implementation, I took a more pragmatic approach: I forked the code, and removed everything that was Redis specific from the source code, in order to end with a skeleton. At this point I was ready to implement my specification.

~ What is Disque? ~

After a few months of very non intense work and just 200 commits I’ve finally a system that no longer looks like a toy: it looked like a toy for many weeks so I was afraid of even talking about it, since the probability of me just deleting the source tree was big. Now that most of the idea is working code with tests, I’m finally sure this will be released in the future, and to talk about the tradeoffs I took in the design.

Disque is a distributed system, by default. Since it is an AP system, it made no sense to have like in Redis a single-node mode and a distributed mode. A single Disque node is just a particular case of a cluster, having just one node.
So this was of the important points in the design: fault tolerant, resistant to partitions, and available no matter how many nodes are still up, aka AP. I also wanted a system that was inherently able to scale in different scenarios, both when the issue is many producers and consumers with many queues, and when instead all this producers and consumers are all focusing on a single queue, that may be distributed into multiple nodes.

My requirements were telling me aloud one thing… that Disque was going to make a big design sacrifice. Message ordering. Disque only provides best-effort ordering. However because of this sacrifice, there is a lot to gain… tradeoffs are interesting since sometimes they totally open the design space.

I could continue recounting you what Disque is like that, however a few months ago I saw a comment in Hacker News, written by Jacques Chester, see https://news.ycombinator.com/item?id=8709146 [EDIT: SORRY I made an error cut&pasting the wrong name of Adrian (Hi Adrian, sorry for misquoting you!)]. Jacques, that happens to work for Pivotal like me, was commenting how different messaging systems have very different set of features, properties, and without the details it is almost impossible to evaluate the different choices, and to evaluate if one is faster than the other because it has a better implementation, or simple offers a lot less guarantees. So he wrote a set of questions one should ask when evaluating a messaging system. I’ll use his questions, and add a few more, in order to describe what Disque is, in the hope that I don’t end just hand waving, but providing some actual information.

Q: Are messages delivered at least once?

In Disque you can chose at least once delivery (the default), or at most once delivery. This property can be set per message. At most once delivery is just a special case of at least once delivery, setting the “retry” parameter of the message to 0, and replicating the message to a single node.

Q: Are messages acknowledged by consumers?

Yes, the only way for a consumer to tell the system the message got delivered correctly, is to acknowledge it.

Q: Are messages delivered multiple times if not acknowledged?

Yes, Disque will automatically deliver the message again, after a “retry” time, forever (up to the max TTL time for the message).
When messages are acknowledged, the acknowledge is propagated to the nodes having a copy of the message. If the system believes everybody was reached, the message is finally garbage collected and removed. Acknowledged messages are also evicted during memory pressure.

Nodes run a best-effort algorithm to avoid to queue the same message multiple times, in order to approximate single delivery better. However during failures, multiple nodes may re-deliver the same message multiple times at the same time.

Q: Is queueing durable or ephemeral.


Q: Is durability achieved by writing every message to disk first, or by replicating messages across servers?

By default Disque runs in-memory only, and uses synchronous replication to achieve durability (however you can ask, per message, to use asynchronous replication). It is possible to turn AOF (similarly to Redis) if desired, if the setup is likely to see a mass-reboot or alike. When the system is upgraded it is possible to write the AOF on disk just for the upgrade in order to don’t lose the state after a restart even if normally disk persistence is not used.

Q: Is queueing partially/totally consistent across a group of servers or divided up for maximal throughput?

Divided up for throughput, however message ordering is preserved in a best-effort way. Each message has an immutable “ctime” which is a wall-clock milliseconds timestamp plus an incremental ID for messages generated in the same millisecond. Nodes use this ctime in order to sort messages for delivery.

Q: Can messages be dropped entirely under pressure? (aka best effort)

No, however new messages may be refused if there is no space in memory. When 75% of memory is in use, nodes receiving messages try to externally replicate them, just to outer nodes, without taking a copy, but it many not work if also the other nodes are in an out of memory condition.

Q: Can consumers and producers look into the queue, or is it totally opaque?

There are commands to “PEEK” into queues.

Q: Is queueing unordered, FIFO or prioritised?

Best-effort FIFO-ish as explained.

Q: Is there a broker or no broker?

Broker as a set of masters. Clients can talk to whatever node they want.

Q: Does the broker own independent, named queues (topics, routes etc) or do producers and consumers need to coordinate their connections?

Named queues. Producers and consumers does not need to coordinate, since nodes use federation to discover routes inside the cluster and pass messages as they are needed by consumers. However the client is provided with hints in case it is willing to relocate where more consumers are.

Q: Is message posting transactional?

Yes, once the command to add a message returns, the system guarantees that there are the desired number of copies inside the cluster.

Q: Is message receiving transactional?

I guess not, since Disque will try to deliver the same message again if not acknowledged.

Q: Do consumers block on receive or can they check for new messages?

Both behaviors are supported, by default it blocks.

Q: Do producers block on send or can they check for queue fullness? 

The producer may ask to get an error when adding a new message if the message length is already greater than a specified value in the local node it is pushing the message.

Moreover the producer may ask to replicate the message asynchronously if it want to run away ASAP and let the cluster replicate the message in a best-effort way.

There is no way to block the consumer if there are too many messages in the queue, and unblock it as soon as there are less messages.

Q: Are delayed jobs supported?

Yes, with second granularity, up to years. However they’ll use memory.

Q: Can consumers and producers connect to different nodes?


I hope with this post Disque is a bit less vaporware. Sure, without looking at the code it is hard to tell, but if your best feature is out you can already complain at least.
How much of the above is already implemented and working well? Everything but AOF disk persistence, and a few minor things I want to refine in the API, so first release should not be too far, but working at it so rarely it is hard to get super fast.
blog comments powered by Disqus