Available since: 5.0.0
Time complexity: For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other hand, when XREADGROUP blocks, XADD will pay the O(N) time needed to serve the N clients blocked on the stream waiting for new data.
If you are new to streams, we recommend reading our introduction to Redis Streams. Make sure you understand the concept of a consumer group from the introduction, so that following how this command works will be simpler.
The difference between this command and the vanilla XREAD is that this one supports consumer groups.
Without consumer groups, just using XREAD, all the clients are served with all the entries arriving in a stream. With consumer groups, using XREADGROUP, it is possible to create groups of clients that consume different parts of the messages arriving in a given stream. If, for instance, the stream gets the new entries A, B, and C and there are two consumers reading via a consumer group, one client may get the messages A and C, and the other the message B, and so forth.
Within a consumer group, a given consumer (that is, just a client consuming messages from the stream) has to identify itself with a unique consumer name, which is just a string.
One of the guarantees of consumer groups is that a given consumer can only see the history of messages that were delivered to it, so a message has just a single owner. However, there is a special feature called message claiming that allows other consumers to claim messages in case of a non-recoverable failure of some consumer. In order to implement such semantics, consumer groups require explicit acknowledgment of the messages successfully processed by the consumer, via the XACK command. This is needed because the stream tracks, for each consumer group, who is processing which message.
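The ownership and acknowledgment rules above can be illustrated with a toy in-memory model. This is only a sketch for building intuition: the class `ToyConsumerGroup`, its methods, and the consumer names are hypothetical, and it says nothing about how Redis implements streams internally.

```python
class ToyConsumerGroup:
    """Toy in-memory model of one consumer group on one stream.

    Illustration only: this is not how Redis implements streams.
    """

    def __init__(self):
        self.entries = []    # entries in arrival order
        self.cursor = 0      # index of the first entry never delivered to anyone
        self.pending = {}    # entry -> name of the consumer that owns it

    def add(self, entry):
        self.entries.append(entry)

    def read_new(self, consumer, count=1):
        """Deliver up to `count` never-delivered entries to `consumer`."""
        batch = self.entries[self.cursor:self.cursor + count]
        self.cursor += len(batch)
        for entry in batch:
            self.pending[entry] = consumer   # each entry gets a single owner
        return batch

    def history(self, consumer):
        """Entries delivered to `consumer` and not yet acknowledged."""
        return [e for e in self.entries if self.pending.get(e) == consumer]

    def ack(self, entry):
        """Acknowledge an entry so that it is no longer tracked as pending."""
        self.pending.pop(entry, None)


group = ToyConsumerGroup()
for entry in ("A", "B", "C"):
    group.add(entry)

first = group.read_new("consumer-1")    # consumer-1 gets A
second = group.read_new("consumer-2")   # consumer-2 gets B
third = group.read_new("consumer-1")    # consumer-1 gets C

group.ack("A")
print(group.history("consumer-1"))  # ['C']
print(group.history("consumer-2"))  # ['B']
```

Each consumer only ever sees its own history, and an entry disappears from that history once it is acknowledged.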
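This is how to decide whether you need a consumer group: if you have a stream and multiple clients, and you want all the clients to get all the messages, you do not need one. If instead you want the messages arriving in the stream to be divided among your clients, so that each client gets a subset of them, you need a consumer group.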
From the point of view of the syntax, the commands are almost the same; however, XREADGROUP requires a special and mandatory option:
GROUP <group-name> <consumer-name>
The group name is just the name of a consumer group associated with the stream. The group is created using the XGROUP command. The consumer name is the string that is used by the client to identify itself inside the group. The consumer is automatically created inside the consumer group the first time it is seen. Different clients should select different consumer names.
When you read with XREADGROUP, the server will remember that a given message was delivered to you: the message will be stored inside the consumer group in what is called a Pending Entries List (PEL), that is, a list of message IDs delivered but not yet acknowledged.
The NOACK subcommand can be used to avoid adding the message to the PEL in cases where reliability is not a requirement and occasional message loss is acceptable. This is equivalent to acknowledging the message when it is read.
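The ID to specify in the STREAMS option when using XREADGROUP can be one of the following two:
The special > ID, which means that the consumer wants to receive only messages that were never delivered to any other consumer. It just means: give me new messages.
Any other ID, such as 0 or any other valid ID. In this case the command returns the entries that are pending for the consumer sending the command and that have IDs greater than the one provided. In other words, if the ID is not >, then the command will just let the client access its pending entries: messages delivered to it, but not yet acknowledged. Note that in this case, both BLOCK and NOACK are ignored.
Like XREAD, the XREADGROUP command can be used in a blocking way. There are no differences in this regard.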
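To make the option order concrete, here is a minimal sketch that assembles the argument list for the two kinds of read described above. The helper name `xreadgroup_args` and the group, consumer, and stream names are made up for illustration; the sketch only builds the argument list and does not talk to a server.

```python
def xreadgroup_args(group, consumer, streams, count=None, block_ms=None, noack=False):
    """Build the argument list for an XREADGROUP call.

    `streams` maps each stream key to the ID to read from: ">" for
    never-delivered messages, or any other ID (such as "0") for this
    consumer's pending entries.
    """
    args = ["XREADGROUP", "GROUP", group, consumer]
    if count is not None:
        args += ["COUNT", str(count)]
    if block_ms is not None:
        args += ["BLOCK", str(block_ms)]
    if noack:
        args.append("NOACK")
    # All keys come first, then all IDs, in the same order.
    args.append("STREAMS")
    args += list(streams.keys())
    args += list(streams.values())
    return args


new_messages = xreadgroup_args("mygroup", "alice", {"mystream": ">"}, count=10, block_ms=2000)
own_pending = xreadgroup_args("mygroup", "alice", {"mystream": "0"}, count=10)

print(" ".join(new_messages))  # XREADGROUP GROUP mygroup alice COUNT 10 BLOCK 2000 STREAMS mystream >
print(" ".join(own_pending))   # XREADGROUP GROUP mygroup alice COUNT 10 STREAMS mystream 0
```

The second call leaves out BLOCK and NOACK on purpose, since they have no effect when reading pending entries.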
Normally you use the command like this in order to get new messages and process them. In pseudo-code:
    WHILE true
        entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
        if entries == nil
            puts "Timeout... try again"
            CONTINUE
        end

        FOREACH entries AS stream_entries
            FOREACH stream_entries as message
                process_message(message.id,message.fields)

                # ACK the message as processed
                XACK mystream $GroupName message.id
            END
        END
    END
In this way the example consumer code will fetch only new messages, process them, and acknowledge them via XACK. However, the example code above is not complete, because it does not handle recovering after a crash. If we crash in the middle of processing messages, our messages will remain in the pending entries list, so after a restart we can access our history by initially giving XREADGROUP an ID of 0 and performing the same loop. Once the reply to an ID of 0 is an empty set of messages, we know that we have processed and acknowledged all the pending messages: we can start to use > as the ID, in order to get the new messages and rejoin the consumers that are processing new things.
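The recovery step described above could be sketched in Python roughly as follows. This is a sketch under stated assumptions, not a reference implementation: `client` is assumed to expose `xreadgroup` and `xack` methods in the style of the redis-py library, replies are assumed to arrive as a list of `[key, [(id, fields), ...]]` pairs, and `drain_pending` and `process` are hypothetical names.

```python
def drain_pending(client, stream, group, consumer, process, count=10):
    """Re-process this consumer's pending entries until none are left.

    `client` is assumed to expose redis-py style `xreadgroup` and `xack`
    methods; `process` is a hypothetical callback taking (id, fields).
    Returns the number of entries that were processed and acknowledged.
    """
    handled = 0
    while True:
        # An ID of "0" asks for our own history of unacknowledged messages.
        reply = client.xreadgroup(group, consumer, {stream: "0"}, count=count)
        entries = [entry for _key, batch in (reply or []) for entry in batch]
        if not entries:
            return handled  # nothing pending: safe to switch to ">"
        for message_id, fields in entries:
            process(message_id, fields)
            # Acknowledge only after processing succeeded, so that a crash
            # here leaves the message in the pending entries list.
            client.xack(stream, group, message_id)
            handled += 1
```

A consumer would call something like this once at startup, using the same consumer name it had before the crash, and then enter the main loop that reads with the > ID.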
To see how the command actually replies, please check the XREAD command page.
Array reply, specifically:
The command returns an array of results: each element of the returned array is a two-element array containing the key name and the entries reported for that key. The entries reported are full stream entries, having IDs and the list of all the fields and values. Fields and values are guaranteed to be reported in the same order they were added by XADD.
When BLOCK is used, on timeout a null reply is returned.
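As an illustration of the reply shape just described, the following sketch walks a reply represented as nested Python lists. The representation is an assumption (client libraries may decode replies into other structures), `iter_entries` is a hypothetical helper, and the sample entry is made up.

```python
def iter_entries(reply):
    """Yield (key, entry_id, fields) for every entry in an XREADGROUP reply.

    Assumes the raw nested-array shape:
        [[key, [[id, [field1, value1, ...]], ...]], ...]
    A null reply (None here), as returned on a BLOCK timeout, yields nothing.
    """
    if reply is None:
        return
    for key, entries in reply:
        for entry_id, flat in entries:
            # Pair up the flat field/value list, preserving the original order.
            fields = list(zip(flat[0::2], flat[1::2]))
            yield key, entry_id, fields


# Made-up sample reply: one stream key with a single entry.
sample = [["mystream", [["1526569495631-0", ["sensor", "7", "temp", "19.8"]]]]]
for key, entry_id, fields in iter_entries(sample):
    print(key, entry_id, fields)
```

Keeping the fields as an ordered list of pairs, rather than a dictionary, mirrors the guarantee that fields and values come back in the order they were added.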
Reading the Redis Streams introduction is highly recommended in order to understand more about the overall behavior and semantics of streams.