Grokking NATS Consumers: Intro and push-based
The NATS team introduced the first preview of JetStream in early 2020. It became generally available roughly one year later in March 2021 as part of the NATS server 2.2.0 release.
A primary attraction of NATS is its simplicity, in its protocol, usage, and operations. The core of NATS is a publish-subscribe messaging system with at-most-once delivery semantics. JetStream adds persistence to these messages, providing at-least-once semantics.
New primitives
The two new primitives that were introduced are streams and consumers. The linked docs provide a much more comprehensive explanation of each concept, but in a nutshell, each stream is assigned a set of subjects and is responsible for storing messages in the order they are received. Consumers act like stateful views on top of streams and can specify the subset of subjects the consumer is interested in or how subscribers are expected to consume the messages.
This post is not about streams, since I personally think configuring them is straightforward (but happy to do a deep dive on those as well!). However, it is about consumers, since I have been confused a handful of times with how to properly set them up. I don't know if this is inherent in the design of the consumer management protocol or in how the client API exposes it. In my case, I use Go, which is the reference implementation.
I am writing this post as an excuse to explore them deeper and (hopefully) debunk my current assumption that the API is confusing. It is quite possible clarity will ensue by the end. If that is the case, then I hope it is clear to you as well (and worth the read).
Review of subscriptions
NATS already has this notion of a subscription and the act of subscribing to a subject. Obviously this comes from the publish-subscribe language. Literally, the only thing you need to create a subscription is the subject and the message handler.
// Create async subscription.
sub1, _ := nc.Subscribe("events.>", func(msg *nats.Msg) {
// Handle msg...
})
Since core NATS provides an at-most-once guarantee, everything is in flight and subscriptions must be connected in order to receive messages. When a message is published on a subject, whoever is connected at that time will receive the message. Any subscriber created after the publish will not get the message.
// Publish after sub1 is setup, so it will be received.
nc.Publish("events.something-happened", []byte("..."))
// sub2 won't receive the published message since it subscribed
// after the fact. Time and order of operations matter!
sub2, _ := nc.Subscribe("events.>", func(msg *nats.Msg) {
// Handle msg...
})
This binary behavior makes understanding subscriptions simple. Either the subscription is connected or it is not. If it is not, it will not get the messages later.
At-most-once guarantees
Quickly revisiting the at-most-once guarantee, there are two perspectives to consider. If a message is published and there exists a subscription showing interest in the message's subject, NATS will make a best effort to deliver the message. However, due to the possibility of network interruptions, delivery can't be guaranteed. Core NATS does not implement any kind of acknowledgement (ack) when clients publish messages to the server or when messages are delivered to subscribers. Thus, if a network interruption occurs (beyond some tolerance window), messages will be dropped. Likewise, the server does not attempt to retry delivery.
The second part of at-most-once was alluded to above: if you are a late subscriber (or the client needs to be upgraded and thus disconnected and reconnected), you won't even have the option to observe the previous messages. There are strategies such as a queue group which can be gradually upgraded to prevent dropping any messages, but for use cases where all the messages matter, this can be tricky to get right.
All this said, I want to reinforce that this behavior and these guarantees are by design and should not be seen as a limitation. Core NATS provides incredibly reliable and low-latency connectivity between clients. This may be a post for another time, but when you use NATS you avoid needing a handful of common point-to-point technology add-ons such as a service mesh and others, but I digress..
Ok, back to the topic!
What really is a consumer
With JetStream, streams can be created that support persisting messages. Consumers are created as views on top of a stream and keep track of messages that have been delivered to subscribed clients. A client no longer needs to be connected at the time a message is published. It can connect later and receive messages it hasn't received yet. In fact, it can replay old messages if desired.
There are a plethora of use cases when considering a durable stream of messages, which could represent commands, queries, or events.
So what is a consumer? The NATS docs describe them as follows.
Consumers can be conceived as 'views' into a stream, with their own 'cursor'. Consumers iterate or consume over all or a subset of the messages stored in the stream, according to their 'subject filter' and 'replay policy', and can be used by one or multiple client applications.
The key thing to understand is that a consumer doesn't actually do any consuming. As the last sentence implies, client applications are still required to subscribe. So a consumer acts as a contract between the target stream and the subscriptions.
There are currently 18 options you can set on a consumer, however like a stream's 22 options, most are optional or have good defaults. That said, understanding the right combination of options for specific use cases can be tricky.
Learn by example
Since some of the configuration options are mutually exclusive or conditioned on other options being set, this series is going to go through a few examples to build up the understanding. I will also publish runnable code with the examples, so you can explore it yourself. For this post I will highlight specific code sections here (without boilerplate) along with the explanation.
This first post will focus on push-consumers, however first we need a stream to consume from! Let's create one.
The default policy for a stream is limits-based which defines message retention in terms of message age, max messages, or max size (in bytes). By default, there are no limits which provides a nice stable state for us to explore consumers with.
As an aside, I consider limits-based streams to be stateless from the standpoint of consumers. No matter how many consumers there are, how messages are ack'ed, etc. a new consumer can always be created unaffected by what other consumers have done. There are other stream retention policies (work queue and interest-based), but their retention is largely influenced by the behavior of the consumers. This makes the interactions more subtle to understand, but this will be discussed in later posts.
Ok back to our simple stream..
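The post doesn't show the stream-creation code itself (it lives in the linked repo), but a minimal sketch looks like the following. This is my own reconstruction based on the stream name and subjects used below; it assumes js is a nats.JetStreamContext obtained from nc.JetStream().

```go
// A sketch: create the "events" stream bound to the events.> subject
// space. Omitting limits keeps the default limits-based retention with
// no limits set, as described above.
js.AddStream(&nats.StreamConfig{
	Name:     "events",
	Subjects: []string{"events.>"},
})
```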
We will populate the stream with 1000 messages having four subjects, events.a.1, events.b.1, events.c.1, and events.c.2 (see the code link for details).
To establish the baseline understanding, we will start with the simplest consumer API made available, which is the SubscribeSync method.
sub, _ := js.SubscribeSync("events.*.1")
But what actually happens when you create this subscription? I thought we needed to create a consumer?
To maintain parity with the simple API provided with core NATS (the Subscribe methods on nats.Conn), there are equivalent methods on js, which is the nats.JetStream interface. These methods transparently create or bind to consumers depending on the options provided.
For the example above, it creates an ephemeral consumer. To really understand what is going on, let's get and inspect the consumer info and do a deep dive.
Consumer info
Since we know there is only one consumer, we can fetch the name and info dynamically and print it out as JSON.
// Fetch the first (and only) consumer name associated with the stream.
consumerName := <-js.ConsumerNames("events")
// Get the consumer info associated with the name.
consumerInfo, _ := js.ConsumerInfo("events", consumerName)
bytes, _ := json.MarshalIndent(consumerInfo, "", " ")
fmt.Println(string(bytes))
The output follows. If you do this yourself, note that a handful of the values may be different because they are randomly generated or time-based.
If this looks intimidating, don't worry! We are going to walk through it.
{
"stream_name": "events",
"name": "IlCHNTCc",
"created": "2022-01-08T12:37:12.517776Z",
"config": {
"deliver_subject": "_INBOX.ER6XAUhBpDgIOFRJa4UB2P",
"filter_subject": "events.*.1",
"deliver_policy": "all",
"ack_policy": "explicit",
"ack_wait": 30000000000,
"max_deliver": -1,
"replay_policy": "instant",
"max_ack_pending": 65536
},
"delivered": {
"consumer_seq": 621,
"stream_seq": 846,
"last_active": "2022-01-08T12:37:12.519254Z"
},
"ack_floor": {
"consumer_seq": 0,
"stream_seq": 0
},
"num_ack_pending": 621,
"num_redelivered": 0,
"num_waiting": 0,
"num_pending": 113,
"cluster": {
"leader": "NCGAGIH76YVEYZZVGANBGBOKEUAFZOLNPWO55LQG7SU2JXMUPEYDAYYJ"
},
"push_bound": true
}
stream_name indicates the stream the consumer is bound to (all consumers are bound to exactly one stream). But we didn't specify the stream name when we created the subscription?!? There is indeed an option called nats.BindStream that can be passed to any of the js.Subscribe methods. However, if it is not specified, the NATS client actually queries the server to see if the subject pattern (in this case events.*.1) overlaps with any of the subject patterns bound to a stream. In this case, the events stream binds the subject pattern events.>, which is a superset of events.*.1, thus we know all events having this subject pattern will be in the events stream. This works because streams cannot have overlapping subject spaces.
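This overlap check relies on NATS subject matching: subjects are dot-delimited tokens, * matches exactly one token, and > matches one or more trailing tokens. Here is a simplified toy matcher (my own sketch, not the client's or server's implementation) that shows why events.> covers events.*.1 subjects:

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether subject matches pattern using simplified NATS
// wildcard semantics: "*" matches exactly one token, ">" matches one or
// more trailing tokens. A toy sketch, not the real implementation.
func matches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return len(s) > i // at least one remaining token
		}
		if i >= len(s) {
			return false // subject ran out of tokens
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	return len(p) == len(s)
}

func main() {
	// The stream binds events.>, a superset of our filter events.*.1.
	fmt.Println(matches("events.>", "events.a.1"))   // true
	fmt.Println(matches("events.*.1", "events.a.1")) // true
	fmt.Println(matches("events.*.1", "events.c.2")) // false
}
```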
name is the unique name of the consumer (relative to the stream). As noted above, if no consumer is created ahead of time, a call to js.Subscribe will (by default) result in the client creating an ephemeral consumer. This is why it looks like a random string.. because it is. The nats.Durable option can be used to explicitly set the consumer name. Yet another option is nats.Bind(), which can be used to bind to an existing consumer (and won't attempt to create it).
created is simply the timestamp when the consumer was created.
The config block contains the consumer's configuration. Some values are not present, indicating the default/zero-value options (see nats.ConsumerConfig for a list of the options).
config.deliver_subject being set indicates this consumer is push-based. Again, since we did not set this value explicitly, one is randomly chosen for us. This subject is the actual subject the subscription is going to receive messages on, not events.*.1, which we passed to SubscribeSync. That subject instead sets the config.filter_subject option, which specifies a filter on top of the stream. Given our four event subjects above, we will receive all but events.c.2. To set an explicit deliver subject, the subscription option nats.DeliverSubject can be provided.
While config.filter_subject dictates the filtered subset of all messages in the stream, config.deliver_policy defines what messages should be delivered with respect to sequence or time. The default is all, which means all messages in the stream matching the filter from the beginning. Other options include last, new, start_sequence, start_time, and last_per_subject, and of course there are corresponding subscription options here and here. (Seeing a pattern??)
config.ack_policy defines the expectation of a subscriber acknowledging messages once processed. The value of explicit means a subscriber must call msg.Ack() to tell the server it has processed the message (or msg.Nak() to request redelivery). Other options include none and all, with corresponding subscription options.
If ack'ing is required, config.ack_wait indicates how long the server will wait to receive the ack before it attempts to re-deliver a message. This can be specified at subscribe time here.
config.max_deliver indicates the maximum number of times delivery of a message will be attempted if it is not ack'ed. The default is -1, which means it will retry forever. nats.MaxDeliver can be used to specify this.
config.replay_policy provides the option of replaying messages instantly (as fast as possible) or at the original speed, which simulates the gaps between messages as they were originally ingested into the stream. The options are here.
config.max_ack_pending indicates how many messages can be in-flight at one time without an acknowledgement. If acks are required, having many messages in-flight means that re-delivery could result in out-of-order messages, assuming there is one subscriber. If this value is set to 1, then this is forcing serial processing and ack'ing of messages. However, if you have this specific need (strict ordering), there is a specialized option nats.OrderedConsumer which configures the consumer optimally for this use case. Otherwise, if you want to tweak this config option, you can use nats.MaxAckPending.
Reflecting on all of this
Before diving into the rest of the consumer info, let's reflect on the configuration a bit more. If we ignore the js.Subscribe convenience methods for a minute and the available subscription options, how could we achieve the above setup?
We create a consumer on the stream with the same configuration discussed above followed by a standard subscription to the deliver subject.
// Create the consumer with the same config options above.
js.AddConsumer("events", &nats.ConsumerConfig{
	Durable:        "IlCHNTCc",
	DeliverSubject: "_INBOX.ER6XAUhBpDgIOFRJa4UB2P",
	DeliverPolicy:  nats.DeliverAllPolicy,
	AckPolicy:      nats.AckExplicitPolicy,
	AckWait:        30 * time.Second,
	MaxDeliver:     -1,
	FilterSubject:  "events.*.1",
	ReplayPolicy:   nats.ReplayInstantPolicy,
	MaxAckPending:  65536,
})
// Create a subscription on the deliver subject.
sub, _ := nc.SubscribeSync("_INBOX.ER6XAUhBpDgIOFRJa4UB2P")
Are there any differences between these two options? They are simply two different APIs, with the higher-level js.Subscribe methods adding a bit of convenience, specifically in cleaning up ephemeral consumers when Unsubscribe is called on the subscription. However, this could be done manually using js.DeleteConsumer if a consumer is manually created.
Consumer state
The rest of the information in ConsumerInfo is a snapshot of which messages in the stream have been delivered, ack'ed, and are pending.
The delivered block provides two sequence numbers and the timestamp when the most recent delivery occurred. To understand why there are two sequence numbers, recall that every message published to a stream gets a sequence number. Also recall that a consumer is like a view of a stream, in this case with a filter_subject being applied. As a result the consumer maintains its own sequence number relative to this filter applied to the stream. The stream_seq indicates the latest message sequence number in the stream that has been assessed for delivery pre-filter, and the consumer_seq is the sequence number of the latest message delivered matching the filter.
On every message delivered from a stream, there is metadata that can be extracted which, among other information, has both the stream and consumer sequence numbers. This could be useful if a specific message needs to be identified in the stream independent of the consumer.
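To make the relationship between the two sequence numbers concrete, here is a toy model (my own sketch; subject matching is reduced to a suffix check for brevity): every message appended to the stream advances the stream sequence, while the consumer sequence only advances for messages matching the filter.

```go
package main

import (
	"fmt"
	"strings"
)

// msg pairs a delivered message with both sequence numbers.
type msg struct {
	streamSeq   uint64
	consumerSeq uint64
	subject     string
}

// deliver simulates a filtered consumer walking a stream of subjects.
func deliver(subjects []string, filterSuffix string) []msg {
	var delivered []msg
	var streamSeq, consumerSeq uint64
	for _, subj := range subjects {
		streamSeq++ // every stored message gets a stream sequence
		if strings.HasSuffix(subj, filterSuffix) {
			consumerSeq++ // only filtered messages advance this one
			delivered = append(delivered, msg{streamSeq, consumerSeq, subj})
		}
	}
	return delivered
}

func main() {
	subjects := []string{"events.a.1", "events.c.2", "events.b.1", "events.c.1"}
	for _, m := range deliver(subjects, ".1") {
		fmt.Printf("stream_seq=%d consumer_seq=%d subject=%s\n",
			m.streamSeq, m.consumerSeq, m.subject)
	}
}
```

Note how the two sequences diverge as soon as a message (events.c.2) is filtered out, which is exactly why delivered.stream_seq (846) is larger than delivered.consumer_seq (621) in the output above.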
Following this, the ack_floor block is a tad more easily understood. I defer to the definition given by Derek Collison in a Slack conversation:
AckFloor is the highest acked message that is contiguous, so if consumer starts at say seq 10, and you ack 10, 11 and then 12 the floor is 12. If you then ack 14 but skip 13 (or it has not come in yet) the floor will remain 12 and the system knows about 14 being acked. If 13 gets acked the floor moves to 14.
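The rule in this quote can be sketched in code. This is a simplified model of the bookkeeping, not the server's implementation:

```go
package main

import "fmt"

// ackFloor computes the highest contiguously-acked sequence, starting
// from startSeq, given the set of acked sequence numbers.
func ackFloor(startSeq uint64, acked map[uint64]bool) uint64 {
	floor := startSeq - 1 // nothing acked yet
	for acked[floor+1] {
		floor++ // advance while acks are contiguous
	}
	return floor
}

func main() {
	acked := map[uint64]bool{10: true, 11: true, 12: true}
	fmt.Println(ackFloor(10, acked)) // 12

	acked[14] = true // ack 14 but skip 13
	fmt.Println(ackFloor(10, acked)) // still 12; 14 is tracked separately

	acked[13] = true // the gap is filled
	fmt.Println(ackFloor(10, acked)) // 14
}
```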
The num_ack_pending field indicates how many messages have been delivered, but the server has not received an ack for yet.
num_redelivered indicates the number of messages that have been re-delivered at least once, but have not been ack'ed yet. Note this value can increase and decrease. As an aside, I initially thought this was a growing sum of the number of redeliveries of all messages, but it is not. However, the naming and semantics are actively being discussed.
num_waiting is only populated for pull-based consumers. We will revisit this in a later post since we are focused on push-based right now.
num_pending is the number of messages identified on the server that match the consumer filter (thus far) and are pending delivery.
Finishing up the last two fields, cluster describes the current leader that handles transacting reads and writes for the stream and its consumers (they are managed by the same consensus group using the Raft algorithm).
push_bound is true if at least one subscription exists for the deliver_subject.
Reflecting on consumer state
Given these definitions, we can analyze the content of the consumer info a bit more. Here is just a subset of the output from above so you don't have to keep scrolling 😉
...
"max_ack_pending": 65536
},
"delivered": {
"consumer_seq": 621,
"stream_seq": 846,
"last_active": "2022-01-08T12:37:12.519254Z"
},
"ack_floor": {
"consumer_seq": 0,
"stream_seq": 0
},
"num_ack_pending": 621,
"num_redelivered": 0,
"num_waiting": 0,
"num_pending": 113,
...
So we know there are 1000 messages in the stream. The delivered block indicates 621 messages have already been delivered to the subscription. But wait, we haven't called sub.NextMsg on the subscription yet?! This is the core behavior of the push-based consumer.
Once a subscription is established, the server will deliver the messages without waiting to be asked.. they are pushed to the client.
Since we haven't processed any of the messages yet, we also see that 621 messages are pending acknowledgement. Since our ack timeout has not yet passed (at the time of printing the info), none have been re-delivered. Likewise, the ack floor is zero.
There are also 113 messages that match the consumer filter, but have not yet been delivered.
I also included max_ack_pending to remind us that since we only have 1000 messages in the stream, all can be in-flight without acknowledgement. If a consumer has more unacknowledged messages than this max, the server will pause delivery until num_ack_pending drops below max_ack_pending.
Onward
I will break here since it is a lot to consume, but future posts will cover:
- Queue groups for push-based consumers
- Pull-based consumers (single and queue)
- Interactions with work queue and interest-based stream policies
Please reach out on Slack or Twitter if you have questions or find an issue with the content so I can correct it.