Redis Streams

Introduction to Redis streams

A Redis stream is a data structure that acts like an append-only log but also implements several operations to overcome some of the limits of a typical append-only log. These include random access in O(1) time and complex consumption strategies, such as consumer groups. You can use streams to record and simultaneously syndicate events in real time. Examples of Redis stream use cases include:

  • Event sourcing (e.g., tracking user actions, clicks, etc.)
  • Sensor monitoring (e.g., readings from devices in the field)
  • Notifications (e.g., storing a record of each user's notifications in a separate stream)

Redis generates a unique ID for each stream entry. You can use these IDs to retrieve their associated entries later or to read and process all subsequent entries in the stream. Note that because these IDs are related to time, the IDs shown here will differ from the ones you see in your own Redis instance.

Redis streams support several trimming strategies (to prevent streams from growing unbounded) and more than one consumption strategy (see XREAD, XREADGROUP, and XRANGE).

Basic commands

  • XADD adds a new entry to a stream.
  • XREAD reads one or more entries, starting at a given position and moving forward in time.
  • XRANGE returns a range of entries between two supplied entry IDs.
  • XLEN returns the length of a stream.

See the complete list of stream commands.

Examples

  • When our racers pass a checkpoint, we use XADD to add a stream entry for each racer that includes the racer's name, speed, position, and location ID.

  • Read two stream entries starting at ID 1692632086370-0, using XRANGE with its COUNT option.

  • Read up to 100 new stream entries, starting at the end of the stream, and block for up to 300 ms if no entries are being written, using XREAD with its COUNT and BLOCK options.
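The command listings for these examples were not preserved. The following minimal sketch only assembles the argument lists for the three commands. The helpers xadd_args, xrange_args and xread_args are hypothetical names, not part of any client library. The XADD field values come from the XADD example later in this section. Nothing here talks to a server. With redis-py, a list like this could be sent with r.execute_command(*args), but that step is not shown.

```python
# Hypothetical helpers: they only build argument lists, they do not contact Redis.

def xadd_args(key, fields, entry_id="*"):
    """Build XADD key <id> field value [field value ...]."""
    args = ["XADD", key, entry_id]
    for field, value in fields.items():  # dicts keep insertion order
        args += [field, str(value)]
    return args


def xrange_args(key, start, end="+", count=None):
    """Build XRANGE key start end [COUNT n]."""
    args = ["XRANGE", key, start, end]
    if count is not None:
        args += ["COUNT", str(count)]
    return args


def xread_args(key, last_id="$", count=None, block_ms=None):
    """Build XREAD [COUNT n] [BLOCK ms] STREAMS key id ('$' = only new entries)."""
    args = ["XREAD"]
    if count is not None:
        args += ["COUNT", str(count)]
    if block_ms is not None:
        args += ["BLOCK", str(block_ms)]
    return args + ["STREAMS", key, last_id]


add = xadd_args("race:france",
                {"rider": "Castilla", "speed": 29.9, "position": 1, "location_id": 2})
rng = xrange_args("race:france", "1692632086370-0", "+", count=2)
read = xread_args("race:france", "$", count=100, block_ms=300)

print(" ".join(add))   # XADD race:france * rider Castilla speed 29.9 position 1 location_id 2
print(" ".join(rng))   # XRANGE race:france 1692632086370-0 + COUNT 2
print(" ".join(read))  # XREAD COUNT 100 BLOCK 300 STREAMS race:france $
```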

Performance

Adding an entry to a stream is O(1). Accessing any single entry is O(n), where n is the length of the ID. Since stream IDs are typically short and of a fixed length, this effectively reduces to a constant-time lookup. The reason is that streams are implemented as radix trees, where the cost of a lookup depends on the length of the key rather than on the number of entries.
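Here is a small Python sketch of the fixed-length point. It assumes each ID is stored as a 16-byte key: the milliseconds part and the sequence part, each packed as an unsigned 64-bit big-endian integer. That is our reading of how the Redis source encodes IDs for its radix tree, but treat the layout as an internal detail, not a documented API. The function name encode_id is made up for this example.

```python
import struct


def encode_id(ms, seq):
    """Pack a stream ID as two unsigned 64-bit big-endian integers (16 bytes)."""
    return struct.pack(">QQ", ms, seq)


# A few IDs, deliberately out of order.
ids = [(1692632147973, 0), (1692632086370, 1), (10, 0), (9, 0), (1692632086370, 0)]
keys = [encode_id(ms, seq) for ms, seq in ids]

# Every key has the same length, so a radix-tree walk inspects at most 16 bytes,
# however many entries the stream holds.
print({len(k) for k in keys})              # {16}

# Byte-wise order matches numeric (ms, seq) order; plain string order would not.
print(encode_id(9, 0) < encode_id(10, 0))  # True
print("9-0" < "10-0")                      # False
```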

Simply put, Redis streams provide highly efficient inserts and reads. See each command's time complexity for the details.

Streams basics

Streams are an append-only data structure. The fundamental write command, called XADD, appends a new entry to the specified stream.

Each stream entry consists of one or more field-value pairs, somewhat like a dictionary or a Redis hash.

For example, the call XADD race:france * rider Castilla speed 29.9 position 1 location_id 2 adds the entry rider: Castilla, speed: 29.9, position: 1, location_id: 2 to the stream at key race:france. It uses an auto-generated entry ID, which the command returns, in this case 1692632147973-0. The first argument is the key name, race:france. The second argument is the entry ID that identifies every entry inside a stream. Here we passed * because we want the server to generate a new ID for us. New IDs increase monotonically: every new entry has a higher ID than all the entries added before it. Auto-generation of IDs by the server is almost always what you want, and reasons to specify an ID explicitly are very rare. We'll talk more about this later.

The fact that each stream entry has an ID is another similarity with log files, where line numbers, or the byte offset inside the file, can be used to identify a given entry. Returning to our XADD example, after the key name and ID, the remaining arguments are the field-value pairs that make up the stream entry.

You can get the number of items in a stream with the XLEN command, for example XLEN race:france.

Entry IDs

The entry ID returned by the XADD command, which uniquely identifies each entry within a given stream, is composed of two parts:

<millisecondsTime>-<sequenceNumber>

The milliseconds time part is the local time of the Redis node generating the stream ID. However, if the current milliseconds time is smaller than the time of the previous entry, Redis uses the previous entry's time instead. This keeps IDs monotonically increasing even if the clock jumps backward. The sequence number distinguishes entries created in the same millisecond. Since the sequence number is 64 bits wide, in practical terms there is no limit to the number of entries that can be generated within the same millisecond.
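The rule above can be modeled in a few lines of Python. StreamIdGenerator is a hypothetical class, not Redis code. It takes the current time as an argument so that a backward clock jump can be simulated. It also ignores the corner case where the sequence number itself would overflow.

```python
class StreamIdGenerator:
    """Model of how auto-generated stream IDs stay monotonic."""

    def __init__(self):
        self.last = (0, 0)  # (milliseconds, sequence) of the newest entry

    def next_id(self, now_ms):
        last_ms, last_seq = self.last
        if now_ms > last_ms:
            new = (now_ms, 0)  # a newer millisecond starts at sequence 0
        else:
            # Same millisecond, or the clock went backward: keep the previous
            # millisecond value and bump the sequence number instead.
            new = (last_ms, last_seq + 1)
        self.last = new
        return f"{new[0]}-{new[1]}"


gen = StreamIdGenerator()
print(gen.next_id(1692632147973))  # 1692632147973-0
print(gen.next_id(1692632147973))  # 1692632147973-1 (same millisecond)
print(gen.next_id(1692632147970))  # 1692632147973-2 (clock jumped back 3 ms)
print(gen.next_id(1692632147980))  # 1692632147980-0 (clock moved forward)
```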

The format of such IDs may look strange at first, and the gentle reader may wonder why the time is part of the ID. The reason is that Redis streams support range queries by ID. Because the ID is related to the time the entry is generated, this gives the ability to query for time ranges basically for free. We will see this soon while covering the XRANGE command.

If for some reason the user needs incremental IDs that are not related to time but are associated with an ID from another, external system, the XADD command can take an explicit ID, as previously mentioned. The explicit ID replaces the * wildcard that triggers auto-generation.

Note that in this case the minimum ID is 0-1, and the command will not accept an ID equal to or smaller than a previous one.
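Here is a minimal model of that check. It assumes IDs arrive as <millisecondsTime>-<sequenceNumber> strings. The names parse_id and xadd_accepts are illustrative, not part of any client. IDs must be compared as numbers, not as strings. A brand-new stream starts with a top ID of 0-0, which is why 0-1 is the smallest ID it accepts.

```python
def parse_id(text):
    """Split '<ms>-<seq>' into a tuple of ints so IDs compare numerically."""
    ms, seq = text.split("-")
    return int(ms), int(seq)


def xadd_accepts(top_id, new_id):
    """Model of XADD's explicit-ID check: the new ID must exceed the top ID."""
    return parse_id(new_id) > parse_id(top_id)


NEW_STREAM_TOP = "0-0"  # a brand-new stream behaves as if its top ID were 0-0
print(xadd_accepts(NEW_STREAM_TOP, "0-0"))  # False
print(xadd_accepts(NEW_STREAM_TOP, "0-1"))  # True
print(xadd_accepts("0-2", "0-2"))           # False: equal IDs are rejected
print(xadd_accepts("9-0", "10-0"))          # True, although "10-0" < "9-0" as strings
```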

If you're running Redis 7 or later, you can also provide an explicit ID that consists of the milliseconds part only, written as <millisecondsTime>-* (for example, 1692632147973-*). In this case, Redis generates the sequence portion of the ID automatically.
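Here is a sketch of how the sequence part could be filled in for <millisecondsTime>-*. It is written as a hypothetical helper, resolve_partial_id, which takes the stream's current top ID. It follows the same rule as auto-generation:

* It uses the next sequence number within the same millisecond.
* It starts at 0 for a newer millisecond.
* It rejects older milliseconds.

Treat it as a model of the behavior, not as the Redis implementation.

```python
def resolve_partial_id(top_id, requested):
    """Model of XADD '<ms>-*': fill in the sequence number automatically."""
    top_ms, top_seq = (int(part) for part in top_id.split("-"))
    ms_text, star = requested.split("-")
    if star != "*":
        raise ValueError("only the '<ms>-*' form is modeled here")
    ms = int(ms_text)
    if ms == top_ms:
        seq = top_seq + 1   # same millisecond as the newest entry
    elif ms > top_ms:
        seq = 0             # a newer millisecond starts at sequence 0
    else:
        raise ValueError("ID would be smaller than the stream's top item")
    return f"{ms}-{seq}"


print(resolve_partial_id("1692632147973-0", "1692632147973-*"))  # 1692632147973-1
print(resolve_partial_id("1692632147973-1", "1692632147999-*"))  # 1692632147999-0
print(resolve_partial_id("0-0", "0-*"))                          # 0-1
```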

Getting data from Streams

Now we are finally able to append entries in our stream via XADD. However, while appending data to a stream is quite obvious, the way streams can be queried in order to extract data is not so obvious. If we continue with the analogy of the log file, one obvious way is to mimic what we normally do with the Unix command tail -f, that is, we may start to listen in order to get the new messages that are appended to the stream. Note that unlike the blocking list operations of Redis, where a given element will reach a single client which is blocking in a pop style operation like BLPOP, with streams we want multiple consumers to see the new messages appended to the stream (the same way many tail -f processes can see what is added to a log). Using the traditional terminology we want the streams to be able to fan out messages to multiple clients.

However, this is just one potential access mode. We could also see a stream in quite a different way: not as a messaging system, but as a time series store. In this case, maybe it's also useful to get the new messages appended, but another natural query mode is to get messages by ranges of time, or alternatively to iterate the messages using a cursor to incrementally check all the history. This is definitely another useful access mode.

Finally, if we see a stream from the point of view of consumers, we may want to access the stream in yet another way. We can treat it as a stream of messages that can be partitioned among multiple consumers, so that each consumer in a group sees only a subset of the messages arriving in a single stream. In this way, it is possible to scale the message processing across different consumers, without single consumers having to process all the messages: each consumer will just get different messages to process. This is basically what Kafka (TM) does with consumer groups. Reading messages via consumer groups is yet another interesting mode of reading from a Redis Stream.
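The difference between fan-out and consumer-group delivery can be sketched with plain Python lists. Both functions are illustrative models, not Redis APIs:

* read_after mimics a tail -f style reader that remembers the last ID it saw.
* split_among hands entries out round-robin. This is an assumption for the sake of the example; in Redis, new entries go to whichever consumer asks next.

```python
from itertools import cycle

# Entry IDs as (milliseconds, sequence) tuples; payloads omitted.
stream = [(1, 0), (1, 1), (2, 0), (3, 0)]


def read_after(entries, last_seen):
    """Fan-out, XREAD style: a reader gets every entry newer than its cursor."""
    return [e for e in entries if e > last_seen]


def split_among(consumers, entries):
    """Consumer-group style: each entry goes to exactly one consumer.

    Round-robin assignment is an assumption made for illustration only.
    """
    delivered = {name: [] for name in consumers}
    for name, entry in zip(cycle(consumers), entries):
        delivered[name].append(entry)
    return delivered


# Two independent readers both see the full stream.
alice = read_after(stream, (0, 0))
bob = read_after(stream, (0, 0))
print(alice == bob == stream)  # True

# A group of two consumers shares the work instead.
print(split_among(["c1", "c2"], stream))
# {'c1': [(1, 0), (2, 0)], 'c2': [(1, 1), (3, 0)]}
```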

Redis Streams support all three of the query modes described above via different commands. The next sections will show them all, starting from the simplest and most direct to use: range queries.

Querying by range: XRANGE and XREVRANGE

To query the stream by range we are only required to specify two IDs, start and end. The range returned will include the elements having start or end as ID, so the range is inclusive. The two special IDs - and + respectively mean the smallest and the greatest ID possible.
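Here is a model of the inclusive range, with - and + standing for the smallest and greatest possible IDs. The entries and helper names are hypothetical. IDs are parsed into integer pairs so they compare numerically.

```python
MAX_U64 = 2**64 - 1


def parse_id(text):
    """Turn '<ms>-<seq>' into an (int, int) tuple so IDs compare numerically."""
    ms, seq = text.split("-")
    return int(ms), int(seq)


def xrange(entries, start, end):
    """Inclusive range over (id, fields) pairs; '-' and '+' are the extremes."""
    lo = (0, 0) if start == "-" else parse_id(start)
    hi = (MAX_U64, MAX_U64) if end == "+" else parse_id(end)
    return [(entry_id, fields) for entry_id, fields in entries
            if lo <= parse_id(entry_id) <= hi]


# Hypothetical entries, already in ID order as they are in a real stream.
entries = [
    ("1-0", {"rider": "A"}),
    ("2-0", {"rider": "B"}),
    ("3-0", {"rider": "C"}),
]
print([i for i, _ in xrange(entries, "1-0", "2-0")])  # ['1-0', '2-0']: both ends included
print(len(xrange(entries, "-", "+")))                  # 3: the whole stream
```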

Each entry returned is an array of two items: the ID and the list of field-value pairs. We already said that entry IDs are related to time. The part to the left of the - character is the Unix time in milliseconds of the local node that created the stream entry, at the moment the entry was created. (Note, however, that streams are replicated with fully specified XADD commands, so the replicas will have IDs identical to the master's.) This means that I can query a range of time using XRANGE. To do so, I can omit the sequence part of the ID. If omitted, it is assumed to be 0 at the start of the range and the maximum sequence number at the end. This way, by querying with just two Unix times in milliseconds, we get all the entries generated in that time range, inclusively. For instance, to query a two-millisecond period, I pass two millisecond timestamps, with no sequence part, as the start and end of the range.
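The default sequence numbers for incomplete IDs can be sketched as follows. range_bound and time_range are made-up helpers and the IDs are hypothetical. The maximum sequence number is assumed to be 2**64 - 1, matching the 64-bit sequence described earlier.

```python
MAX_SEQ = 2**64 - 1


def range_bound(token, is_start):
    """Expand one XRANGE bound; a bare millisecond time gets a default sequence."""
    if "-" in token:
        ms, seq = token.split("-")
        return int(ms), int(seq)
    # Sequence omitted: 0 for the start of the range, the maximum for the end.
    return (int(token), 0) if is_start else (int(token), MAX_SEQ)


def time_range(ids, start_ms, end_ms):
    """Return the (ms, seq) IDs whose time falls in [start_ms, end_ms]."""
    lo = range_bound(str(start_ms), is_start=True)
    hi = range_bound(str(end_ms), is_start=False)
    return [i for i in ids if lo <= i <= hi]


# Hypothetical IDs, including several entries that share a millisecond.
ids = [(1000, 0), (1001, 0), (1001, 7), (1002, 3), (1003, 0)]
print(time_range(ids, 1001, 1002))  # [(1001, 0), (1001, 7), (1002, 3)]
```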

A range that narrow may contain only a single entry. However, in real data sets I could query for ranges of hours, or there could be many items in just two milliseconds, and the result returned could be huge. For this reason, XRANGE supports an optional COUNT option at the end. By specifying a count, I get just the first N items. If I want more, I can take the last ID returned, increment its sequence part by one, and query again. Let's see this in the following example. Assume that the stream race:france was populated with 4 items. To start my iteration, getting 2 items per command, I start with the full range but with a count of 2: XRANGE race:france - + COUNT 2.
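The increment-and-repeat loop can be sketched in Python over an in-memory list of IDs. All helpers are hypothetical. next_id returns the smallest ID greater than the one given. In the unlikely case that the sequence part is already at its 64-bit maximum, it rolls over to the next millisecond.

```python
MAX_SEQ = 2**64 - 1


def parse_id(text):
    ms, seq = text.split("-")
    return int(ms), int(seq)


def next_id(text):
    """Smallest ID strictly greater than text: bump the sequence part by one."""
    ms, seq = parse_id(text)
    if seq == MAX_SEQ:  # sequence would overflow, so move to the next millisecond
        return f"{ms + 1}-0"
    return f"{ms}-{seq + 1}"


def xrange_count(ids, start, count):
    """Model of XRANGE key <start> + COUNT <count> over sorted ID strings."""
    lo = parse_id(start)
    return [i for i in ids if parse_id(i) >= lo][:count]


def paginate(ids, count):
    """Fetch pages of `count` IDs, restarting just after the last ID seen."""
    pages, start = [], "0-0"  # "0-0" plays the role of "-" here
    while True:
        page = xrange_count(ids, start, count)
        if not page:
            break
        pages.append(page)
        start = next_id(page[-1])
    return pages


# Four hypothetical entries, fetched two at a time.
print(paginate(["1-0", "1-1", "2-0", "3-0"], 2))  # [['1-0', '1-1'], ['2-0', '3-0']]
```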

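To continue the iteration with the next two items, I pick the last ID returned, that is 1692632094485-0, and add the prefix ( to it. The resulting exclusive range start, (1692632094485-0 in this case, can now be used as the start argument of the next call: XRANGE race:france (1692632094485-0 + COUNT 2. Exclusive ranges with the ( prefix require Redis 6.2 or later. On older versions, increment the sequence part of the last ID instead.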
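With exclusive ranges, the loop no longer needs to compute the next ID itself. This sketch models how XRANGE handles its bounds (-, + and the ( prefix) over a hypothetical list of IDs. The functions are illustrative, not a client API.

```python
MAX_U64 = 2**64 - 1


def parse_id(text):
    ms, seq = text.split("-")
    return int(ms), int(seq)


def bound(token):
    """Return (id_tuple, exclusive) for one XRANGE bound."""
    if token == "-":
        return (0, 0), False
    if token == "+":
        return (MAX_U64, MAX_U64), False
    if token.startswith("("):
        return parse_id(token[1:]), True
    return parse_id(token), False


def xrange(ids, start, end, count=None):
    """Model of XRANGE over sorted ID strings, honoring the '(' prefix."""
    lo, lo_excl = bound(start)
    hi, hi_excl = bound(end)
    hits = []
    for entry_id in ids:
        key = parse_id(entry_id)
        above = key > lo if lo_excl else key >= lo
        below = key < hi if hi_excl else key <= hi
        if above and below:
            hits.append(entry_id)
    return hits if count is None else hits[:count]


# Page through four hypothetical entries two at a time.
ids = ["1-0", "1-1", "2-0", "3-0"]
page = xrange(ids, "-", "+", count=2)
pages = []
while page:
    pages.append(page)
    page = xrange(ids, "(" + page[-1], "+", count=2)
print(pages)  # [['1-0', '1-1'], ['2-0', '3-0']]
```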