As a new Enterprise Technical Account Manager at Redis, one of my first tasks was to learn more about Redis. So I started digging in, and quickly discovered Redis Streams. As a big fan of streaming-based applications, I am thrilled to share what I’ve learned about how to use Redis Streams and Java.
Redis Streams is a Redis data type that represents a log, so you can add new information and message in an append-only mode (Note: This is not 100% accurate, since you can remove messages from the log, but it’s close enough.) Redis Streams lets you build “Kafka-like” applications, which can:
In addition, Redis Streams has the concept of consumer groups. Redis Streams consumer groups, like the similar concept in Apache Kafka, allows client applications to consume messages in a distributed fashion (multiple clients), making it easy to scale and create highly available systems.
So, while it may be tempting to compare Redis Streams and Redis Pub/Sub and decide that one is better than the other, these two features aim to accomplish different things. If you’re evaluating Pub/Sub and Redis Streams and it’s not immediately clear, you might want to think either more about your problem to be solved or re-read the documentation on both.
(Enroll in the Redis University: Redis Streams course to learn more.)
The best way to learn how to use Redis Streams and Java, is to build a sample application. The redis-streams-101-java GitHub repository contains sample code that shows how to post messages to a Stream and consume messages using a consumer group. To get started, you’ll need Redis 5.x, Java 8 or later, Apache Maven 3.5.x, and Git.
Redis has many Java clients developed by the community, as you can see on Redis.io. My current favorite for working with Redis Streams is Lettuce, so what I use in this sample app. Let’s walk through the steps involved in creating the sample project:
Add the dependency below to your project file:
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
Import the following classes:
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
Then connect with:
RedisClient redisClient = RedisClient.create("redis://password@host:port"); // change to reflect your environment
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> syncCommands = connection.sync();
When your application is done with the connection, disconnect using the following code:
connection.close();
redisClient.shutdown();
Once you have a connection, you can send a message. In this example, I let Redis generate the message ID, which is time-based, and build the body using a map representing Internet of Things weather data capturing wind speed and direction in real-time:
public static void main(String[] args) {
RedisClient redisClient = RedisClient.create("redis://localhost:6379"); // change to reflect your environment
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> syncCommands = connection.sync();
Map<String, String> messageBody = new HashMap<>();
messageBody.put( "speed", "15" );
messageBody.put( "direction", "270" );
messageBody.put( "sensor_ts", String.valueOf(System.currentTimeMillis()) );
String messageId = syncCommands.xadd(
"weather_sensor:wind",
messageBody);
System.out.println( String.format("Message %s : %s posted", messageId, messageBody) );
connection.close();
redisClient.shutdown();
}
Here’s what’s happening in the code:
(The complete producer code is available here.)
Redis Streams offers several ways to consume and read messages using the commands: XRANGE, XREVRANGE, XREAD, XREADGROUP. To focus on how to build an application with Apache Kafka, let’s use the XREADGROUP command from Lettuce.
The consumer group allows developers to create a group of clients that will cooperate to consume messages from the streams (for scale and high availability). It is also a way to associate the client to specific applications roles; for example:
Each of these consumer groups will act independently, and each of this group could have multiple “consumers” (clients).
Here’s how it works in Java:
...
try {
// WARNING: Streams must exist before creating the group
// This will not be necessary in Lettuce 5.2, see https://github.com/lettuce-io/lettuce-core/issues/898
syncCommands.xgroupCreate( XReadArgs.StreamOffset.from("weather_sensor:wind", "0-0"), "application_1" );
}
catch (RedisBusyException redisBusyException) {
System.out.println( String.format("\t Group '%s' already exists","application_1"));
}
System.out.println("Waiting for new messages");
while(true) {
List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
Consumer.from("application_1", "consumer_1"),
XReadArgs.StreamOffset.lastConsumed("weather_sensor:wind")
);
if (!messages.isEmpty()) {
for (StreamMessage<String, String> message : messages) {
System.out.println(message);
// Confirm that the message has been processed using XACK
syncCommands.xack(STREAMS_KEY, "application_1", message.getId());
}
}
}
...
This code is a subset of the main() method. I removed the connection management part to make it more readable. Let’s take a look at the code.
The complete consumer code is available here.
Now that you have a better understanding of the code, let’s run the producer and consumer. You can run this from your IDE, or using Maven, but here’s how it works in the Maven CLI. Start by opening two terminals, one to produce messages and one to consume them, then follow these steps:
Step 1: Clone and build the project:
> git clone https://github.com/tgrall/redis-streams-101-java.git
> cd redis-streams-101-java
> mvn clean verify
Step 2: Post a new message:
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer"
Step 3: Consume messages
Open a new terminal and run this command:
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Consumer"
The consumer will start and consume the message you just posted, and wait for any new messages.
Step 4: In the first terminal, post 100 new messages:
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer" -Dexec.args="100"
The consumer will receive and print all the messages.
Step 5: Kill the consumer and post more messages
Let’s do another test: Stop the consumer using a simple Ctrl+C and then post five new messages:
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer" -Dexec.args="5"
The messages are not yet consumed by any application, but are still stored in Redis Streams. So when you start the consumer, it consumes these new messages:
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Consumer"
This is one of the differences between Redis Streams and Redis Pub/Sub. The producer application has published many messages while the consumer application was not running. Since the consumer is run with StreamOffset.lastConsumed(), when the consumer is starting, it looks to the last consumed ID, and starts to read the streams from there. This method generates a XGROUPREAD command with the group.
This small project was designed to show you how to use Lettuce, a Java client for Redis, to publish messages to a stream, create a consumer group, and consume messages using the consumer group.
This is a very basic example, and in upcoming posts I plan to dive into how to work with multiple consumers and how to configure the consumer group and consumers to control which messages you want to read.