Grokking NATS Consumers: Push-based queue groups

In the previous article, I provided a quick intro to JetStream and unpacked some of the mystery of push-based consumer configuration and behavior. This second part covers push-based queue groups.

As an FYI, the terms "consumer group", "deliver group", or just "group" may be used throughout, all of which refer to a queue group.

Why do we need queue groups?

There are two problems queue groups solve. The first, which both core NATS and JetStream support, is the need to load balance and scale message processing.

[Diagram of queue group message distribution. Credit: NATS.io website]

The default behavior of NATS is to broadcast published messages to interested subscribers as quickly as possible. In a queue group setup, NATS treats the group as one logical subscriber and distributes messages across the members of the group (rather than giving each one a copy).

Another framing of a queue group is as a form of high availability (HA) for subscriptions. Members can cleanly disconnect and reconnect, or new members can join, without interrupting processing (as long as at least one member is connected). One could even emulate auto-scaling behavior.

One great use case for core NATS queue groups is services where clients send a request and expect a reply. A queue group can be set up with as many members as the scale requires.
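As a minimal sketch using core NATS (the greet subject and reply here are made up for illustration):

nc, _ := nats.Connect(nats.DefaultURL)

// Each member subscribes with the same subject and group name.
// NATS delivers each request to exactly one member of the group.
nc.QueueSubscribe("greet", "greeters", func(m *nats.Msg) {
  m.Respond([]byte("hello!"))
})

// A client sends a request and waits for a single reply.
reply, _ := nc.Request("greet", nil, time.Second)
fmt.Println(string(reply.Data))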

Why wouldn't we always define subscriptions as queue groups? The main reason is that concurrent message processing gives up message order. For eventing/streaming use cases, messages may need to be processed in the order they were published.

JetStream consumer benefits

If core NATS supports queue groups, what advantages does JetStream provide? There are three:

  • Message persistence
  • At-least-once delivery
  • Ordered processing supported

The first two points were discussed in the previous article, but I will frame them to contrast with core NATS queue groups.

Message persistence

This one is fairly obvious since we know that all messages published to the subject space bound to a stream are persisted. If consumers are not set up or are unavailable, these messages will remain (adhering to the stream retention policy). This provides temporal decoupling between streams and consumers.
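For example, retention can be made explicit when a stream is created (the limits below are made up; by default, a stream retains messages indefinitely under the limits policy):

js.AddStream(&nats.StreamConfig{
  Name:     "events",
  Subjects: []string{"events.>"},
  MaxAge:   24 * time.Hour, // discard messages older than a day...
  MaxMsgs:  1_000_000,      // ...or beyond a count limit
})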

At-least-once delivery

The happy path for a (core NATS) queue group is that there are no abrupt member disconnections, all messages are processed quickly enough to keep up with published messages, and there are no unrecoverable errors during message processing.

If you can guarantee all of these things in your application, you won't drop any messages, and core NATS queue groups would be sufficient.

But of course, I am being facetious. Except maybe on a local network with trivial processing logic, it is nearly impossible to make these guarantees. 😅 The question is the probability of one of these failure cases occurring and whether your use case can tolerate that risk.

The first guarantee I stated is a fallacy. The network is not reliable; disconnections will happen. NATS clients do automatically reconnect, but any messages that were in-flight or being processed by a particular subscriber may be dropped.

The second guarantee can be achieved from a processing logic standpoint; however, "processing sufficiently quickly" needs to take into account end-to-end latency, including network delays. NATS identifies slow consumers and will automatically disconnect them to protect itself. Although this is generally a clean disconnect, it is unclear whether the messages in-flight to that subscription have been dropped or not.

The third guarantee could also be achieved (or at least with high confidence) assuming the message schema/structure is known and won't change unexpectedly, and the processing doesn't do any I/O (disk or network). 😏 The processing logic can do its own recovery/retries, but it needs to do something, since the message won't be redelivered.

So how does JetStream handle these failure cases? It embraces the fact that message redelivery will need to happen at some point. Message acknowledgements are used to tell the server that a message has been successfully processed.

In all the scenarios above, messages wouldn't be dropped. Instead, the semantics are that each in-flight message has a timer set to the ack wait time. If a subscriber acks a message, the server clears the timer and the message won't be redelivered. However, if the timer runs out, the server will redeliver the message to one of the members of the queue group (possibly the same one that didn't ack the first time).

What this means is that regardless of how or why the ack wait time has been exceeded (network, processing time, errors, etc.), the server will always redeliver until the message is ack'ed (or the max deliver attempts have been reached).

It is important to note that if a message has been processed but the ack was not received by the server (again, the network), the server will redeliver. NATS supports windowed deduplication on the publisher side to provide some guarantees around publisher retries. However, it is on the consumer to handle deduplication or, better yet, implement idempotent processing in case an unexpected redelivery occurs.
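As a sketch of idempotent processing, a handler could key off the message's stream sequence, exposed by Metadata (the subject, group, and in-memory set here are illustrative; a real application would use a durable store):

// Stand-in for a durable record of processed messages.
processed := make(map[uint64]bool)

js.QueueSubscribe("events.*.1", "db-writer", func(m *nats.Msg) {
  meta, _ := m.Metadata()
  seq := meta.Sequence.Stream

  // Skip work already done. A redelivery can arrive even after
  // successful processing if the original ack was lost in transit.
  if !processed[seq] {
    // ... process the message ...
    processed[seq] = true
  }

  // Ack (again, if necessary) so the server stops redelivering.
  m.Ack()
})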

Ordered processing supported

As noted above, a trade-off with queue groups is that the publish order of messages is lost since messages are processed concurrently. This is true by default for JetStream consumers; however, the max ack pending configuration can be set to one. This means that only one message will ever be in-flight at a given time across the queue group.

js.QueueSubscribeSync(
    "events.>",
    "db-writer",
    nats.MaxAckPending(1),
)
This method is introduced in more detail below.

Doing this with a queue group makes the processing highly available rather than relying on a single subscription. If any of the members disconnect or fail, other members will continue to receive messages in order.

Setting up a queue group

Like in part one, we will set up a test stream and populate it with 1000 events:

js.AddStream(&nats.StreamConfig{
  Name:     "events",
  Subjects: []string{"events.>"},
})
Assume this is a generic stream accreting domain events from an application.
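To populate it, we can publish events in a loop (the subject scheme is made up, but note that some subjects match the events.*.1 filter used below):

// Publish 1000 events spread across a few entity IDs.
for i := 0; i < 1000; i++ {
  subject := fmt.Sprintf("events.order.%d", i%5)
  js.Publish(subject, []byte(fmt.Sprintf("event-%d", i)))
}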

To create a queue group, we will use the analogous method to SubscribeSync:

js.QueueSubscribeSync(
  "events.*.1",
  "db-writer",
)

The second argument, db-writer, is the name of the group (let's assume this consumer takes events and writes them to a database to support ad-hoc querying). As with single subscription consumers, there can be many queue group consumers, each with a distinct name.
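Since each group is its own consumer, an independent group can process the same subjects for a different purpose (the name here is hypothetical):

// A second, independently named group receives its own copy of
// the matching messages, distributed across its own members.
js.QueueSubscribeSync(
  "events.*.1",
  "metrics-aggregator",
)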

Now that these are set up, let's check out the consumer info.
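It can be fetched programmatically with the Go client (the NATS CLI equivalent is nats consumer info events db-writer):

info, _ := js.ConsumerInfo("events", "db-writer")
fmt.Printf("%+v\n", info)

Serialized as JSON, it looks like this: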

{
 "stream_name": "events",
 "name": "db-writer",
 "created": "2022-01-21T11:52:23.267086Z",
 "config": {
  "durable_name": "db-writer",
  "deliver_subject": "_INBOX.5KNAwtzQRkLE3QRZpqcdZI",
  "deliver_group": "db-writer",
  "deliver_policy": "all",
  "ack_policy": "explicit",
  "ack_wait": 30000000000,
  "max_deliver": -1,
  "filter_subject": "events.*.1",
  "replay_policy": "instant",
  "max_ack_pending": 65536
 },
 "delivered": {
  "consumer_seq": 361,
  "stream_seq": 491,
  "last_active": "2022-01-21T11:52:23.268503Z"
 },
 "ack_floor": {
  "consumer_seq": 0,
  "stream_seq": 0
 },
 "num_ack_pending": 361,
 "num_redelivered": 0,
 "num_waiting": 0,
 "num_pending": 373,
 "cluster": {
  "leader": "NBOQU6CZ4DQM7AIX5E5QBQ3NQ5BIDVW6QY6BVYYCUE3T7AZWFEEYALJM"
 },
 "push_bound": true
}

In part one, we enumerated nearly all of these fields, so if you need a refresher, please refer to this section of part one. We do, however, have a new field to look at!

config.deliver_group is the name of the group we defined above. The default behavior of QueueSubscribeSync is to use the group name as the durable name (and thus the consumer name) as well.

Wait a minute. 🧐 Why is the durable name set?

Recall that when we used SubscribeSync in part one, this created an ephemeral consumer. The consumer was given a random name and no durable name was set. When the subscription unsubscribed, the corresponding consumer was deleted.

Why isn't the behavior the same for QueueSubscribeSync?

Since the expected usage is for a group of subscribers, we need to know the name of the consumer (rather than relying on a randomly generated one). This allows multiple instances of js.QueueSubscribeSync("events.*.1", "db-writer") to be set up. The consumer will be created transparently on the first call, while the rest simply bind to the existing consumer.
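In other words, members can be brought up with identical calls (a sketch):

// The first call creates the "db-writer" consumer, while later
// calls (possibly in other processes) bind to it as additional members.
sub1, _ := js.QueueSubscribeSync("events.*.1", "db-writer")
sub2, _ := js.QueueSubscribeSync("events.*.1", "db-writer")

// Each member receives a disjoint share of the messages.
m1, _ := sub1.NextMsg(time.Second)
m2, _ := sub2.NextMsg(time.Second)
m1.Ack()
m2.Ack()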

Another reason is that if two members subscribed and one of them unsubscribed, we wouldn't want that member's unsubscribe to delete the consumer while the other is still connected.

For these reasons, the behavior is different so consumers aren't deleted unexpectedly while still in use.

Explicit configuration

Setting up an explicit queue group consumer is just as simple as the non-group version. The only addition is the DeliverGroup field.

// Create the consumer with the same config options as above.
js.AddConsumer("events", &nats.ConsumerConfig{
  Durable:        "db-writer",
  DeliverSubject: "_INBOX.5KNAwtzQRkLE3QRZpqcdZI",
  DeliverGroup:   "db-writer",
  DeliverPolicy:  nats.DeliverAllPolicy,
  AckPolicy:      nats.AckExplicitPolicy,
  AckWait:        30 * time.Second,
  MaxDeliver:     -1,
  FilterSubject:  "events.*.1",
  ReplayPolicy:   nats.ReplayInstantPolicy,
  MaxAckPending:  65536,
})

How do we set up subscriptions for it now? As mentioned above, core NATS also supports queue groups and has corresponding subscription methods.

// Create a subscription with the deliver subject and queue name.
nc.QueueSubscribeSync(
  "_INBOX.5KNAwtzQRkLE3QRZpqcdZI",
  "db-writer",
)

Multiple threads/processes/containers can now be deployed with this subscription, and NATS will transparently distribute messages across the members of the group. Likewise, any member can be removed and the remaining members will continue to get the messages. And since this queue group is backed by a persistent stream, we can guarantee messages won't be dropped!

Takeaways

  • Core NATS queue groups are ideal for service deployments (request-reply interactions) since they are low latency and highly available. Requests are often retried by clients if a timeout occurs, so dropped messages are not really an issue for this use case.
  • JetStream queue groups are ideal for queuing and streaming workloads where we've accepted a message/unit of work from a client. We need to persist it and ensure the message has been processed with respect to the stream's retention policy.
  • Use MaxAckPending(1) in a JetStream queue group to enforce ordered delivery/processing of messages.
  • It is possible for a message to be redelivered if the ack to the server failed for some reason (the likelihood varies with your network's stability). Where possible, implement message processing to be idempotent.

Thanks for reading! In the next post, we will move on to pull-based consumers.

Please reach out on Slack or Twitter if you have questions or find an issue with the content so I can correct it.