Grokking NATS Consumers: Pull-based

In parts one and two of this series, we studied the behavior of push-based consumers and how their configuration affects that behavior. In this post we are going to contrast them with pull-based consumers, highlighting the subtle behavioral differences as well as the use cases each consumer type is appropriate for.

Control your flow

Recall that messages are pushed (buffered) to a subscriber of a push-based consumer even before NextMsg is called. The number of messages is determined by the MaxAckPending value set on the consumer. As messages are ack'ed, more messages are pushed automatically to the subscriber. Likewise, if AckWait is exceeded for a message that has been sent to the subscriber, the server will redeliver that message. This can lead to some confusing outcomes, including duplicate and out-of-order messages, if these behaviors are not well understood.

In contrast, a pull-based consumer allows subscribers to fetch N messages at their own pace. This could be one message at a time, 100 or seven. The N could change on every fetch. The subscriber is in control of how many messages it wants to handle rather than the server proactively pushing them.

We can see this non-push behavior by inspecting our consumer info, as we did in the previous two parts. We can create a pull consumer implicitly using the JetStream convenience method.

sub, _ := js.PullSubscribe("events.*.1", "pull-consumer")

In part one we defined a stream of events with four kinds of subjects. All of the consumer examples have matched this specific subject pattern. The second argument here is a durable name. 🤔 We will come back to this.

For completeness, here is the explicit way using js.AddConsumer.

js.AddConsumer("events", &nats.ConsumerConfig{
  Durable:       "pull-consumer",
  FilterSubject: "events.*.1",
  AckPolicy:     nats.AckExplicitPolicy,
})

Here is the resulting consumer info:

{
 "stream_name": "events",
 "name": "pull-consumer",
 "created": "2022-01-27T12:28:54.149443Z",
 "config": {
  "durable_name": "pull-consumer",
  "deliver_policy": "all",
  "ack_policy": "explicit",
  "ack_wait": 30000000000,
  "max_deliver": -1,
  "filter_subject": "events.*.1",
  "replay_policy": "instant",
  "max_waiting": 512,
  "max_ack_pending": 65536
 },
 "delivered": {
  "consumer_seq": 0,
  "stream_seq": 0
 },
 "ack_floor": {
  "consumer_seq": 0,
  "stream_seq": 0
 },
 "num_ack_pending": 0,
 "num_redelivered": 0,
 "num_waiting": 0,
 "num_pending": 734,
 "cluster": {
  "leader": "NCD6JGBOEQVGIBOBXOHOS2BEHBSN57NC33HXZPXTBFASSNQIC4RRGCYT"
 }
}

Much of this will look similar to the push-based consumer info, so I will highlight the main differences.

First and importantly, nothing has been delivered! delivered.consumer_seq and delivered.stream_seq are zero because we have not fetched any messages with our subscription yet.

There is no config.deliver_subject. How can this be? Each call to sub.Fetch sends an explicit request to the server, and the server replies over an internal reply subject (like the standard request-reply pattern) with metadata and the batch of messages. There is no need to explicitly set a deliver subject as there is with a push consumer.
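To make the request-reply shape concrete, here is a small sketch of what a pull request looks like on the wire. It assumes the JetStream API subject pattern `$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>` and a JSON payload with `batch` and `expires` fields. The `pullRequest` type and the helper names are made up for illustration, and the client library may set additional fields or adjust the values it sends.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// pullRequest is a hypothetical type mirroring two JSON fields of a
// JetStream "next message" request. Real requests can carry more fields.
type pullRequest struct {
	Batch   int           `json:"batch,omitempty"`
	Expires time.Duration `json:"expires,omitempty"` // encoded as nanoseconds
}

// nextSubject returns the API subject a pull request is published to.
func nextSubject(stream, consumer string) string {
	return fmt.Sprintf("$JS.API.CONSUMER.MSG.NEXT.%s.%s", stream, consumer)
}

// encodePull builds the request payload for a batch size and a wait time.
func encodePull(batch int, maxWait time.Duration) (string, error) {
	b, err := json.Marshal(pullRequest{Batch: batch, Expires: maxWait})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	payload, err := encodePull(5, 3*time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Println(nextSubject("events", "pull-consumer"))
	fmt.Println(payload)
}
```

The request is published with a reply subject (an inbox), and the server delivers the batch there, which is why the consumer itself needs no deliver subject.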

There is a new field, config.max_waiting, that we had not seen in push consumers. This one is a bit subtle, but it defines the maximum number of in-flight fetch requests for the consumer (not the number of in-flight messages being fetched). 🤨 You may be thinking "why/how would I have more than one call to Fetch at a time?"

Implicit queue groups

Since it is the subscribers who are fetching messages rather than the server pushing them, the control of message distribution is on the subscribers. Specifically, multiple subscriptions can be bound to the same pull-consumer to fetch messages concurrently.

Since we already created our first subscription, sub, above, and assuming the stream is populated, we can fetch some messages (error handling ignored for brevity).

msgs, _ := sub.Fetch(5)

for _, msg := range msgs {
  msg.Ack()
  meta, _ := msg.Metadata()
  log.Printf(
    "got msg %d/%d on subject %s",
    meta.Sequence.Stream,
    meta.Sequence.Consumer,
    msg.Subject,
  )
}

We are simply ack'ing each message, extracting its JetStream metadata, and printing out the stream sequence, consumer sequence, and subject.

got msg 1/1 on subject events.a.1
got msg 3/2 on subject events.a.1
got msg 4/3 on subject events.b.1
got msg 5/4 on subject events.a.1
got msg 6/5 on subject events.c.1

We can see that all the subjects match the expected filter. The message with stream sequence 2 does not appear, so it looks to have been filtered out.

Let's create another subscription and do the same thing:

sub2, _ := js.PullSubscribe("events.*.1", "pull-consumer")

If we fetch another 5 messages like above, what do we expect?

got msg 7/6 on subject events.a.1
got msg 9/7 on subject events.a.1
got msg 10/8 on subject events.b.1
got msg 11/9 on subject events.a.1
got msg 13/10 on subject events.a.1

There was no need to declare up front that these subscriptions form a queue group, as we had to do with the push consumer. Since subscribers bound to a pull consumer dictate the control flow, consumption can be scaled up or down simply by changing the number of subscribers. There is no special requirement of a "queue group" for pull consumers.
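The distribution behavior can be pictured with a toy model that uses no NATS at all. In this sketch the `consumer` type is a hypothetical stand-in for the server-side consumer state (a shared cursor over the stream), and each goroutine plays the role of a subscriber fetching batches. It ignores acks and redelivery entirely, so treat it as an illustration of the idea rather than how the server works.

```go
package main

import (
	"fmt"
	"sync"
)

// consumer is a toy stand-in for pull consumer state: a shared cursor
// over a stream of sequence numbers.
type consumer struct {
	mu   sync.Mutex
	next int // next sequence to hand out
	last int // last sequence in the stream
}

// fetch hands out up to batch sequences and advances the shared cursor.
func (c *consumer) fetch(batch int) []int {
	c.mu.Lock()
	defer c.mu.Unlock()
	var out []int
	for len(out) < batch && c.next <= c.last {
		out = append(out, c.next)
		c.next++
	}
	return out
}

// drain runs the given number of workers until the stream is exhausted
// and returns how many times each sequence was received.
func drain(c *consumer, workers, batch int) map[int]int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		seen = make(map[int]int)
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				msgs := c.fetch(batch)
				if len(msgs) == 0 {
					return // nothing left to fetch
				}
				mu.Lock()
				for _, seq := range msgs {
					seen[seq]++
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return seen
}

func main() {
	c := &consumer{next: 1, last: 20}
	seen := drain(c, 3, 5)
	fmt.Println("distinct messages received:", len(seen))
}
```

Changing the number of workers changes how the batches are split, but every sequence is still handed out exactly once, with no group declared anywhere.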

Batch size and timeout

In the above example, I glossed over the Fetch method, which takes a batch size. This is the maximum number of messages the subscriber will receive if available on the server. This means that even if only one message is available, the fetch will return. If zero messages are available, the Fetch call will block waiting up to the MaxWait time (default is 3 seconds). To customize the MaxWait time, you can pass it as an option to Fetch:

msgs, err := sub.Fetch(5, nats.MaxWait(time.Second))
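The batch and timeout semantics described here can be modeled on a plain Go channel. The `fetch` function below is a hypothetical toy, not how nats.go implements Fetch: it blocks up to maxWait for the first message, then takes whatever else is immediately available, up to the batch size.

```go
package main

import (
	"fmt"
	"time"
)

// fetch models the semantics described in the text: wait up to maxWait
// for the first message, then collect more without blocking.
func fetch(src <-chan string, batch int, maxWait time.Duration) []string {
	var msgs []string
	if batch <= 0 {
		return msgs
	}
	timer := time.NewTimer(maxWait)
	defer timer.Stop()

	// Block until the first message arrives or the wait time elapses.
	select {
	case m, ok := <-src:
		if !ok {
			return msgs // source closed
		}
		msgs = append(msgs, m)
	case <-timer.C:
		return msgs // nothing arrived within maxWait
	}

	// Take whatever else is already available, up to the batch size.
	for len(msgs) < batch {
		select {
		case m, ok := <-src:
			if !ok {
				return msgs
			}
			msgs = append(msgs, m)
		default:
			return msgs
		}
	}
	return msgs
}

func main() {
	src := make(chan string, 10)
	src <- "events.a.1"
	src <- "events.b.1"

	// Two messages are available, so this returns without waiting.
	fmt.Println(len(fetch(src, 5, time.Second)))
	// Nothing is available, so this returns empty after about 50ms.
	fmt.Println(len(fetch(src, 5, 50*time.Millisecond)))
}
```

With the real client, the empty case surfaces as an error from Fetch rather than an empty slice, so a fetch loop typically treats a timeout as "try again" rather than as a failure.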

So then what is the best batch size? Since the call returns as soon as at least one message is available, you need to decide the maximum number of messages you can process in one fetch, ensuring that you can ack all of them within the AckWait window (default is 30 seconds).
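As a rough back-of-the-envelope aid, the sketch below estimates an upper bound for the batch size. It assumes messages in a batch are processed sequentially, that you have a realistic worst-case per-message processing time, and that you want to leave some headroom under AckWait. The function name and the headroom percentage are my own choices for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// maxBatchSize returns the largest batch whose messages can all be
// processed and acked sequentially within headroomPct percent of ackWait.
// A result of 0 means even a single message does not fit in that budget.
func maxBatchSize(ackWait, perMsg time.Duration, headroomPct int) int {
	if ackWait <= 0 || perMsg <= 0 || headroomPct <= 0 {
		return 0
	}
	budget := ackWait * time.Duration(headroomPct) / 100
	return int(budget / perMsg)
}

func main() {
	// 30s AckWait, 500ms worst-case per message, use at most 80% of the window.
	fmt.Println(maxBatchSize(30*time.Second, 500*time.Millisecond, 80)) // 48
}
```

If the result is smaller than the throughput you need, that is a signal to raise AckWait or to add subscribers rather than to increase the batch size.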

At the other extreme, if the messages coming in exceed what a single subscriber can reasonably process (while maintaining acceptable latency), then it is straightforward to deploy more subscribers bound to the pull consumer (as noted above). Each subscriber will then fetch separate batches of messages and process them concurrently, which would increase the throughput by some factor.

As a reminder, although the messages in a given batch will remain ordered, processing of those messages across batches has no ordering guarantees since subscribers will be operating independently.

Which consumer type is better?

Well, as with most things in software design... it depends 😆 Having worked with pull consumers more recently, I find the API easier to use, the control over when messages are requested more predictable (and comforting), and the implicit ability to scale wonderful to have.

It is important to note that a push consumer as a queue group with explicit ack is functionally the same as a pull consumer, but the control flow is different. It is different enough that the latter is more optimized and will use fewer resources (both server and client).

For most use cases where control flow matters or subscriptions are expected to be long-lived, pull consumers should be used.

So when are push consumers useful? They are a good fit for short-lived consumers, for reading short segments of a stream, or when subscribers have only a loose ack requirement.

As an anecdote, I am currently implementing an event store on top of NATS, leveraging streams for storage and the KV layer for snapshots. The standard replay operation is to get the latest snapshot of some state given an ID, and fetch the events that occurred after the snapshot was taken (which would occur when there are multiple concurrent actors). Typically this could be a handful of events depending on how active the entity is and the snapshot frequency. In this case an ephemeral, ordered push consumer is perfect for reading these short sequences of events.

In all other cases, I would reach for a pull consumer by default.

Takeaways

  • Pull consumers invert the control of flow by allowing a subscriber to fetch messages rather than having them implicitly pushed.
  • Scaling pull consumers is implicit, rather than requiring a distinct queue group like with push consumers. Simply create/deploy additional pull subscribers with the same durable name.
  • The Fetch batch size is the maximum number of messages you want to handle in a single call. The call only blocks if zero messages are available.
  • Default to using pull consumers as they are easier to use and understand. Choose push consumers for very specific, short-lived use cases, or ones where control flow matters less.

Special thanks to Jean-Noel Moyne of the NATS core team for providing feedback and improvements to a draft of this post.


Please reach out on Slack or Twitter if you have questions or find an issue with the content so I can correct it.