Below is a command to clone the source code for the application used in this tutorial:
git clone --branch v1.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions
Redis Streams is more cost-effective than using Kafka or other similar technologies. With sub-millisecond latency and a lightweight Streams log data structure, Redis is easier to deploy, develop, and operate.
There are a few important things to note here:
isolated: true is set in order to use the blocking version of XREADGROUP in isolated execution mode.
It's a best practice to validate all incoming messages to make sure you can work with them.
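As a minimal sketch of the validation advice above, the helper below checks an ORDERS_STREAM entry before it is processed. The field names (orderId, orderAmount, userId) come from this tutorial; the `OrderMessage` type and the `parseOrderMessage` function are hypothetical names introduced for illustration and are not part of the tutorial's repository.

```typescript
// Shape of an ORDERS_STREAM entry after validation (field names from the tutorial).
interface OrderMessage {
  orderId: string;
  orderAmount: number;
  userId: string;
}

// Hypothetical helper: returns a typed message, or null when the entry is unusable.
function parseOrderMessage(
  raw: Record<string, string | undefined>,
): OrderMessage | null {
  const { orderId, orderAmount, userId } = raw;
  if (!orderId || !userId || orderAmount === undefined) {
    return null;
  }
  // Stream field values arrive as strings, so convert the amount explicitly.
  const amount = Number(orderAmount);
  if (orderAmount.trim() === '' || !Number.isFinite(amount) || amount < 0) {
    return null;
  }
  return { orderId, orderAmount: amount, userId };
}
```

A consumer could call `parseOrderMessage(messageItem.message)` and skip or log any entry for which it returns null, rather than letting a malformed entry fail halfway through payment processing.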
If you use Redis Cloud, you will find that Redis Streams is available on the same multi-tenant data platform you already use for caching. Redis Cloud also has high availability, message persistence, support for multiple clients, and resiliency with primary/secondary data replication… all built in.
When building a microservices application, people use a variety of options for communication between services. Among them:
In an event-driven microservices architecture, you might have some services that publish an API, and other services that are simply producers and consumers of events with no external API.
Consider the following scenario: You have an e-commerce application with an architecture that is broken down into different microservices, such as create an order, create an invoice, process a payment, fulfill an order, and so on. Microservices allow you to separate these commands into different services to scale independently, enabling more customers to get their orders processed quickly and simultaneously, which results in a better user experience, higher sales volume, and less-cranky customer service staff.
When you use microservices, you need a tool for interservice communication. Initially, you might consider using a product like Kafka for streaming, but setting it up is rather complicated. What many people don't know about Redis is that it supports streams in the same way Kafka does. Given that you are likely already using Redis for caching, it makes sense to also use it for stream processing. To reduce the complexity of application architecture and maintenance, Redis is a great option for interservice communication. Here we break down the process of using Redis with streams for interservice communication.
The e-commerce microservices application discussed in the rest of this tutorial uses the following architecture:
products service: handles querying products from the database and returning them to the frontend
orders service: handles validating and creating orders
order history service: handles querying a customer's order history
payments service: handles processing orders for payment
digital identity service: handles storing digital identity and calculating identity score
api gateway: unifies services under a single endpoint
mongodb: serves as the primary database, storing orders, order history, products, etc.
redis: serves as the stream processor and caching database
This diagram illustrates how Redis Streams is used as the message broker between the orders service and the payments service:
The following event flow diagram illustrates how the orders service and payments service communicate through Redis with streams:
Let's outline the streams and events used below:
1. The orders service inserts order data into the database.
2. The orders service also appends minimal data (orderId, orderAmount, and userId) to the ORDERS_STREAM to signal new order creation (i.e., it acts as the PRODUCER of the ORDERS_STREAM).
3. The payments service listens to the ORDERS_STREAM and processes payments for new orders, then inserts payment data into the database (i.e., it acts as the CONSUMER of the ORDERS_STREAM).
4. The payments service appends minimal data (orderId, paymentId, orderStatusCode, and userId) to the PAYMENTS_STREAM to signal a new payment (i.e., it acts as the PRODUCER of the PAYMENTS_STREAM).
5. The orders service listens to the PAYMENTS_STREAM and updates the orderStatus and paymentId for orders in the database accordingly as the order payment is fulfilled (i.e., it acts as the CONSUMER of the PAYMENTS_STREAM).
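The event flow above can be traced without a running Redis server. The sketch below is not Redis and not the tutorial's code: `InMemoryStream` is a hypothetical, heavily simplified stand-in with one consumer group per stream, whose `add`, `readNew`, and `ack` methods loosely mirror XADD, XREADGROUP with the `>` ID, and XACK. The ids and the `PAYMENT_1` value are placeholders.

```typescript
type Entry<T> = { id: string; message: T };

// Hypothetical in-memory stand-in for one stream with a single consumer group.
class InMemoryStream<T> {
  private entries: Entry<T>[] = [];
  private nextIndex = 0; // first entry not yet delivered to the group
  private pending = new Set<string>(); // delivered but not yet acknowledged
  private seq = 0;

  // Loosely like XADD with '*': append an entry and return a generated id.
  add(message: T): string {
    this.seq += 1;
    const id = `${this.seq}-0`;
    this.entries.push({ id, message });
    return id;
  }

  // Loosely like XREADGROUP with '>': return entries never delivered before.
  readNew(count: number): Entry<T>[] {
    const batch = this.entries.slice(this.nextIndex, this.nextIndex + count);
    this.nextIndex += batch.length;
    batch.forEach((entry) => this.pending.add(entry.id));
    return batch;
  }

  // Loosely like XACK: mark a delivered entry as processed.
  ack(id: string): boolean {
    return this.pending.delete(id);
  }

  pendingCount(): number {
    return this.pending.size;
  }
}

type OrderEvent = { orderId: string; orderAmount: string; userId: string };
type PaymentEvent = {
  orderId: string;
  paymentId: string;
  orderStatusCode: string;
  userId: string;
};

const ordersStream = new InMemoryStream<OrderEvent>();
const paymentsStream = new InMemoryStream<PaymentEvent>();

// Step 2: the orders service signals a new order.
ordersStream.add({ orderId: 'ORDER_1', orderAmount: '22885.00', userId: 'USER_1' });

// Steps 3 and 4: the payments service consumes the order and signals a payment.
for (const { id, message } of ordersStream.readNew(100)) {
  paymentsStream.add({
    orderId: message.orderId,
    paymentId: 'PAYMENT_1', // placeholder id for illustration
    orderStatusCode: '3', // payment successful, as in the sample data
    userId: message.userId,
  });
  ordersStream.ack(id);
}

// Step 5: the orders service consumes the payment event.
const orderUpdates = paymentsStream.readNew(100).map(({ id, message }) => {
  paymentsStream.ack(id);
  return message;
});
```

Each service only ever touches a stream, never the other service directly, which is the decoupling the numbered steps describe.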
The e-commerce microservices application consists of a frontend, built using Next.js with TailwindCSS. The application backend uses Node.js. The data is stored in Redis and MongoDB. Below you will find screenshots of the frontend of the e-commerce app:
Dashboard: Shows the list of products with search functionality
Shopping Cart: Add products to the cart, then check out using the "Buy Now" button
Order history: Once an order is placed, the Orders link in the top navigation bar shows the order status and history
We use Redis to broker the events sent between the orders service and the payments service.
Let's look at some of the code in the orders service to understand how it works:
2. The orders service appends minimal data to the ORDERS_STREAM to signal new order creation.
3. The payments service listens to the ORDERS_STREAM.
4. The payments service appends minimal data to the PAYMENTS_STREAM to signal that a payment has been fulfilled.
5. The orders service listens to the PAYMENTS_STREAM and updates the order when payments are fulfilled.
For the purposes of our application, we update the order status in both Redis and the primary database from the same service. (For simplicity, we are not using any synchronization technique between the databases; the focus here is on how the data is stored and accessed in Redis.) Another common pattern is to have your services write to one database, and then separately use a CDC (change data capture) mechanism to update the other database. For example, you could write directly to Redis, then use Triggers and Functions to synchronize Redis and the primary database in the background.
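One way to picture the "update both databases from the same service" approach is sketched below. It is an assumption-laden illustration, not the tutorial's implementation: `OrderStore` is a hypothetical interface standing in for the Redis and MongoDB update calls, and `updateOrderStatusInAllStores` is a hypothetical helper that reports which stores failed so the caller can retry or alert.

```typescript
interface OrderStatusUpdate {
  orderId: string;
  paymentId: string;
  orderStatusCode: number;
  userId: string;
}

// Hypothetical abstraction over a database client (for example Redis or MongoDB).
interface OrderStore {
  name: string;
  updateOrderStatus(update: OrderStatusUpdate): Promise<void>;
}

// Applies the same update to every store and returns the names of the stores
// whose write failed. There is no transaction across stores, so a failure in
// one store leaves the others updated.
async function updateOrderStatusInAllStores(
  update: OrderStatusUpdate,
  stores: OrderStore[],
): Promise<string[]> {
  const failed: string[] = [];
  for (const store of stores) {
    try {
      await store.updateOrderStatus(update);
    } catch {
      failed.push(store.name);
    }
  }
  return failed;
}
```

Because the two writes are independent, the stores can drift apart if one write fails, which is the gap a CDC-based approach is meant to close.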
That's all there is to it! You now know how to use Redis for streaming as both a producer and a consumer. Hopefully, you can draw some inspiration from this tutorial and apply it to your own event streaming application.
//sample order data
{
  "orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
  "products": [
    {
      "productId": 11000,
      "qty": 3,
      "productPrice": 3995,
      "productData": {
        "productDisplayName": "Puma Men Slick 3HD Yellow Black Watches",
        "variantName": "Slick 3HD Yellow",
        "brandName": "Puma",
        "ageGroup": "Adults-Men",
        "gender": "Men"
        //...
      }
    },
    {
      "productId": 11001,
      "qty": 2,
      "productPrice": 5450,
      "productData": {
        "productDisplayName": "Puma Men Top Fluctuation Red Black Watches",
        "variantName": "Top Fluctuation Red",
        "brandName": "Puma",
        "ageGroup": "Adults-Men",
        "gender": "Men"
        //...
      }
    }
  ],
  "userId": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "orderStatusCode": 1, //order created
  "createdBy": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "statusCode": 1
}
//sample payment data
{
  "paymentId": "6403212956a976300afbaac1",
  "orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
  "orderAmount": 22885,
  "paidAmount": 22885,
  "orderStatusCode": 3, //payment successful
  "userId": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "createdOn": {
    "$date": {
      "$numberLong": "1677926697841"
    }
  },
  "createdBy": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "statusCode": 1
}
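The orderAmount in the sample payment (22885) is consistent with summing qty × productPrice over the products in the sample order. The tutorial does not show how the amount is calculated, so the sketch below is an assumption that happens to reproduce the sample figure; `OrderLine` and `computeOrderAmount` are hypothetical names.

```typescript
interface OrderLine {
  productId: number;
  qty: number;
  productPrice: number;
}

// Assumed calculation: total = sum of quantity times unit price per product.
function computeOrderAmount(products: OrderLine[]): number {
  return products.reduce(
    (total, product) => total + product.qty * product.productPrice,
    0,
  );
}

// Product lines copied from the sample order data.
const sampleProducts: OrderLine[] = [
  { productId: 11000, qty: 3, productPrice: 3995 },
  { productId: 11001, qty: 2, productPrice: 5450 },
];

// 3 * 3995 + 2 * 5450 = 11985 + 10900 = 22885
const sampleOrderAmount = computeOrderAmount(sampleProducts);
```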
{
  //order collection update
  "orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
  "paymentId": "6403212956a976300afbaac1",
  "orderStatusCode": 3 //payment success
  //...
}
const addOrderIdToStream = async (
  orderId: string,
  orderAmount: number,
  userId: string,
) => {
  const nodeRedisClient = getNodeRedisClient();
  if (orderId && nodeRedisClient) {
    const streamKeyName = 'ORDERS_STREAM';
    const entry = {
      orderId: orderId,
      orderAmount: orderAmount.toFixed(2),
      userId: userId,
    };
    const id = '*'; //* = auto generate
    //xAdd adds entry to specified stream
    await nodeRedisClient.xAdd(streamKeyName, id, entry);
  }
};
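When the `*` ID is passed, Redis generates the entry ID itself, in the form `<millisecondsTime>-<sequenceNumber>`. As a small sketch of what that ID carries, the hypothetical helper below (`parseStreamId` is not part of node-redis or the tutorial) splits an ID into its two parts. The example ID in the usage note is made up for illustration.

```typescript
interface StreamIdParts {
  timestampMs: number; // milliseconds part of the entry id
  sequence: number; // sequence number for entries created in the same millisecond
}

// Hypothetical helper: returns null if the id is not of the form "<ms>-<seq>".
function parseStreamId(id: string): StreamIdParts | null {
  const match = /^(\d+)-(\d+)$/.exec(id);
  if (!match) {
    return null;
  }
  return { timestampMs: Number(match[1]), sequence: Number(match[2]) };
}
```

For example, `parseStreamId('1677926697841-0')` yields a `timestampMs` of 1677926697841 and a `sequence` of 0, and `new Date(timestampMs)` then gives the approximate time the entry was appended.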
// Below is some code for how you would use Redis to listen for the stream events:
import { commandOptions } from 'redis'; // node-redis v4

async function listenToStream(
  onMessage: (message: any, messageId: string) => Promise<void>,
) {
  // using node-redis
  const nodeRedisClient = getNodeRedisClient();
  const streamKeyName = 'ORDERS_STREAM'; //stream name
  const groupName = 'ORDERS_CON_GROUP'; // listening consumer group name (custom)
  const consumerName = 'PAYMENTS_CON'; // listening consumer name (custom)
  const readMaxCount = 100;

  // Create the stream and its consumer group if the stream does not exist yet
  if (!(await nodeRedisClient.exists(streamKeyName))) {
    const idPosition = '0'; //0 = start, $ = end or any specific id
    await nodeRedisClient.xGroupCreate(streamKeyName, groupName, idPosition, {
      MKSTREAM: true,
    });
  }

  // setup a loop to listen for stream events
  while (true) {
    // read a batch of messages (several streams can be read in one call)
    const dataArr = await nodeRedisClient.xReadGroup(
      commandOptions({
        isolated: true,
      }),
      groupName,
      consumerName,
      [
        {
          // you can specify multiple streams in array
          key: streamKeyName,
          id: '>', // Next entry ID that no consumer in this group has read
        },
      ],
      {
        COUNT: readMaxCount, // Read n entries at a time
        BLOCK: 0, // timeout in milliseconds; 0 = block indefinitely if there are none
      },
    );

    for (const data of dataArr ?? []) {
      for (const messageItem of data.messages) {
        // process the message received (in our case, perform payment)
        await onMessage(messageItem.message, messageItem.id);

        // acknowledge individual messages after processing
        await nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
      }
    }
  }
}
// `listenToStream` listens for events and calls the `onMessage` callback to further handle the events.
listenToStream(processPaymentForNewOrders);
const processPaymentForNewOrders: IMessageHandler = async (
  message,
  messageId,
) => {
  /*
   message = {
      orderId: "",
      orderAmount: "",
      userId: "",
    }
  */
  // process payment for new orderId and insert "payments" data to database
};
const addPaymentIdToStream = async (
  orderId: string,
  paymentId: string,
  orderStatus: number,
  userId: string,
) => {
  const nodeRedisClient = getNodeRedisClient();
  if (orderId && nodeRedisClient) {
    const streamKeyName = 'PAYMENTS_STREAM';
    const entry = {
      orderId: orderId,
      paymentId: paymentId,
      orderStatusCode: orderStatus.toString(),
      userId: userId,
    };
    const id = '*'; //* = auto generate
    //xAdd adds entry to specified stream
    await nodeRedisClient.xAdd(streamKeyName, id, entry);
  }
};
// Below is some code for how you would use Redis to listen for the stream events:
import { commandOptions } from 'redis'; // node-redis v4

async function listenToStream(
  onMessage: (message: any, messageId: string) => Promise<void>,
) {
  // using node-redis
  const nodeRedisClient = getNodeRedisClient();
  const streamKeyName = 'PAYMENTS_STREAM'; //stream name
  const groupName = 'PAYMENTS_CON_GROUP'; //listening consumer group name (custom)
  const consumerName = 'ORDERS_CON'; //listening consumer name (custom)
  const readMaxCount = 100;

  // Create the stream and its consumer group if the stream does not exist yet
  if (!(await nodeRedisClient.exists(streamKeyName))) {
    const idPosition = '0'; //0 = start, $ = end or any specific id
    await nodeRedisClient.xGroupCreate(streamKeyName, groupName, idPosition, {
      MKSTREAM: true,
    });
  }

  // setup a loop to listen for stream events
  while (true) {
    // read a batch of messages (several streams can be read in one call)
    const dataArr = await nodeRedisClient.xReadGroup(
      commandOptions({
        isolated: true,
      }),
      groupName,
      consumerName,
      [
        {
          // you can specify multiple streams in array
          key: streamKeyName,
          id: '>', // Next entry ID that no consumer in this group has read
        },
      ],
      {
        COUNT: readMaxCount, // Read n entries at a time
        BLOCK: 0, // timeout in milliseconds; 0 = block indefinitely if there are none
      },
    );

    for (const data of dataArr ?? []) {
      for (const messageItem of data.messages) {
        //process the message received (in our case, updateOrderStatus)
        await onMessage(messageItem.message, messageItem.id);

        // acknowledge individual messages after processing
        await nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
      }
    }
  }
}
// `listenToStream` listens for events and calls the `onMessage` callback to further handle the events.
listenToStream(updateOrderStatus);
const updateOrderStatus: IMessageHandler = async (message, messageId) => {
  /*
   message = {
      orderId: "",
      paymentId: "",
      orderStatusCode:"",
      userId: "",
    }
  */
  // updates orderStatus and paymentId in database accordingly for the order which has fulfilled payment
  // updateOrderStatusInRedis(orderId,paymentId,orderStatusCode,userId)
  // updateOrderStatusInMongoDB(orderId,paymentId,orderStatusCode,userId)
};