Pipelines and transactions
Learn how to use Redis pipelines and transactions
Redis lets you send a sequence of commands to the server together in a batch. There are two types of batch that you can use:
- Pipelines avoid network and processing overhead by sending several commands to the server together in a single communication. The server then sends back a single communication with all the responses. See the Pipelining page for more information.
- Transactions guarantee that all the included commands will execute to completion without being interrupted by commands from other clients. See the Transactions page for more information.
Execute a pipeline
To execute commands in a pipeline, you first create a pipeline object
and then add commands to it using methods that resemble the standard
command methods (for example, set() and get()). The commands are
buffered in the pipeline and only execute when you call either the exec()
or the query() method on the pipeline object. Use exec() when you only need
to know whether the pipeline succeeded. If you need the results from the
commands, use query() instead, which returns
the results from all the commands in order.
Note that the command methods for a pipeline always return the original pipeline object, so you can "chain" several commands together, as the example below shows:
mod pipe_trans_tests {
    use redis::Commands;

    /// Runs the pipeline and transaction examples against the Redis server
    /// at `redis://127.0.0.1`.
    ///
    /// Each step prints its outcome. On any Redis error the function prints
    /// the error and returns early instead of panicking.
    fn run() {
        let mut r = match redis::Client::open("redis://127.0.0.1") {
            Ok(client) => match client.get_connection() {
                Ok(conn) => conn,
                Err(e) => {
                    println!("Failed to connect to Redis: {e}");
                    return;
                }
            },
            Err(e) => {
                println!("Failed to create Redis client: {e}");
                return;
            }
        };

        // Remove keys left over from a previous run so the output below is
        // reproducible.
        for i in 0..8 {
            let key = format!("seat:{i}");
            let deleted: redis::RedisResult<()> = r.del(&key);
            if let Err(e) = deleted {
                println!("Failed to delete key: {e}");
                return;
            }
        }

        for i in 1..4 {
            let key = format!("counter:{i}");
            let deleted: redis::RedisResult<()> = r.del(&key);
            if let Err(e) = deleted {
                println!("Failed to delete key: {e}");
                return;
            }
        }

        // Check the success of the pipeline without checking the results
        // individually.
        match redis::pipe()
            .set("seat:0", "#0")
            .set("seat:1", "#1")
            .set("seat:2", "#2")
            .set("seat:3", "#3")
            .set("seat:4", "#4")
            .exec(&mut r)
        {
            Ok(_) => {
                println!("Pipe executed successfully");
            }
            Err(e) => {
                println!("Error executing pipe: {e}");
                return;
            }
        };

        // Check the success of the pipeline and the results individually.
        let (seat_0, seat_1, seat_2, seat_3, seat_4): (String, String, String, String, String) =
            match redis::pipe()
                .get("seat:0")
                .get("seat:1")
                .get("seat:2")
                .get("seat:3")
                .get("seat:4")
                .query(&mut r)
            {
                Ok(res) => res,
                Err(e) => {
                    println!("Error executing pipe: {e}");
                    return;
                }
            };

        println!("{seat_0}, {seat_1}, {seat_2}, {seat_3}, {seat_4}");
        // >>> #0, #1, #2, #3, #4

        // Use `ignore()` to ignore the result of specific commands.
        let (seat_5, seat_6, seat_7): (String, String, String) = match redis::pipe()
            .set("seat:5", "#5").ignore()
            .set("seat:6", "#6").ignore()
            .set("seat:7", "#7").ignore()
            .get("seat:5")
            .get("seat:6")
            .get("seat:7")
            .query(&mut r)
        {
            Ok(res) => res,
            Err(e) => {
                println!("Error executing pipe: {e}");
                return;
            }
        };

        println!("{seat_5}, {seat_6}, {seat_7}");
        // >>> #5, #6, #7

        // Calling `atomic()` on the pipeline runs its commands as a
        // transaction.
        match redis::pipe()
            .atomic()
            .incr("counter:1", 1)
            .incr("counter:2", 2)
            .incr("counter:3", 3)
            .exec(&mut r)
        {
            Ok(_) => {
                println!("Transaction executed successfully");
            }
            Err(e) => {
                println!("Error executing transaction: {e}");
                return;
            }
        };

        let key = "shellpath";

        // Seed the key that the watched transaction below will update.
        let seeded: redis::RedisResult<()> = r.set(key, "/usr/syscmds/");
        if let Err(e) = seeded {
            println!("Error setting shellpath: {e}");
            return;
        }

        // `transaction()` watches `key` while the closure runs. Read the
        // current value through the connection, but queue the update on the
        // pipeline. Errors inside the closure are propagated with `?` so
        // they surface in `outcome` rather than panicking.
        let outcome: redis::RedisResult<((),)> = redis::transaction(&mut r, &[key], |r, pipe| {
            let mut path: String = r.get(key)?;
            path.push_str(":/usr/mycmds/");
            pipe.set(key, path).query(r)
        });
        if let Err(e) = outcome {
            println!("Error executing transaction: {e}");
            return;
        }

        let fetched: redis::RedisResult<String> = r.get("shellpath");
        match fetched {
            Ok(res) => {
                println!("{res}");
                // >>> /usr/syscmds/:/usr/mycmds/
            }
            Err(e) => {
                println!("Error getting shellpath: {e}");
            }
        };
    }
}
Execute a transaction
You can execute a simple transaction by calling the atomic() method on a pipeline before you execute it.
mod pipe_trans_tests {
    use redis::Commands;

    /// Runs the pipeline and transaction examples against the Redis server
    /// at `redis://127.0.0.1`.
    ///
    /// Each step prints its outcome. On any Redis error the function prints
    /// the error and returns early instead of panicking.
    fn run() {
        let mut r = match redis::Client::open("redis://127.0.0.1") {
            Ok(client) => match client.get_connection() {
                Ok(conn) => conn,
                Err(e) => {
                    println!("Failed to connect to Redis: {e}");
                    return;
                }
            },
            Err(e) => {
                println!("Failed to create Redis client: {e}");
                return;
            }
        };

        // Remove keys left over from a previous run so the output below is
        // reproducible.
        for i in 0..8 {
            let key = format!("seat:{i}");
            let deleted: redis::RedisResult<()> = r.del(&key);
            if let Err(e) = deleted {
                println!("Failed to delete key: {e}");
                return;
            }
        }

        for i in 1..4 {
            let key = format!("counter:{i}");
            let deleted: redis::RedisResult<()> = r.del(&key);
            if let Err(e) = deleted {
                println!("Failed to delete key: {e}");
                return;
            }
        }

        // Check the success of the pipeline without checking the results
        // individually.
        match redis::pipe()
            .set("seat:0", "#0")
            .set("seat:1", "#1")
            .set("seat:2", "#2")
            .set("seat:3", "#3")
            .set("seat:4", "#4")
            .exec(&mut r)
        {
            Ok(_) => {
                println!("Pipe executed successfully");
            }
            Err(e) => {
                println!("Error executing pipe: {e}");
                return;
            }
        };

        // Check the success of the pipeline and the results individually.
        let (seat_0, seat_1, seat_2, seat_3, seat_4): (String, String, String, String, String) =
            match redis::pipe()
                .get("seat:0")
                .get("seat:1")
                .get("seat:2")
                .get("seat:3")
                .get("seat:4")
                .query(&mut r)
            {
                Ok(res) => res,
                Err(e) => {
                    println!("Error executing pipe: {e}");
                    return;
                }
            };

        println!("{seat_0}, {seat_1}, {seat_2}, {seat_3}, {seat_4}");
        // >>> #0, #1, #2, #3, #4

        // Use `ignore()` to ignore the result of specific commands.
        let (seat_5, seat_6, seat_7): (String, String, String) = match redis::pipe()
            .set("seat:5", "#5").ignore()
            .set("seat:6", "#6").ignore()
            .set("seat:7", "#7").ignore()
            .get("seat:5")
            .get("seat:6")
            .get("seat:7")
            .query(&mut r)
        {
            Ok(res) => res,
            Err(e) => {
                println!("Error executing pipe: {e}");
                return;
            }
        };

        println!("{seat_5}, {seat_6}, {seat_7}");
        // >>> #5, #6, #7

        // Calling `atomic()` on the pipeline runs its commands as a
        // transaction.
        match redis::pipe()
            .atomic()
            .incr("counter:1", 1)
            .incr("counter:2", 2)
            .incr("counter:3", 3)
            .exec(&mut r)
        {
            Ok(_) => {
                println!("Transaction executed successfully");
            }
            Err(e) => {
                println!("Error executing transaction: {e}");
                return;
            }
        };

        let key = "shellpath";

        // Seed the key that the watched transaction below will update.
        let seeded: redis::RedisResult<()> = r.set(key, "/usr/syscmds/");
        if let Err(e) = seeded {
            println!("Error setting shellpath: {e}");
            return;
        }

        // `transaction()` watches `key` while the closure runs. Read the
        // current value through the connection, but queue the update on the
        // pipeline. Errors inside the closure are propagated with `?` so
        // they surface in `outcome` rather than panicking.
        let outcome: redis::RedisResult<((),)> = redis::transaction(&mut r, &[key], |r, pipe| {
            let mut path: String = r.get(key)?;
            path.push_str(":/usr/mycmds/");
            pipe.set(key, path).query(r)
        });
        if let Err(e) = outcome {
            println!("Error executing transaction: {e}");
            return;
        }

        let fetched: redis::RedisResult<String> = r.get("shellpath");
        match fetched {
            Ok(res) => {
                println!("{res}");
                // >>> /usr/syscmds/:/usr/mycmds/
            }
            Err(e) => {
                println!("Error getting shellpath: {e}");
            }
        };
    }
}
Watch keys for changes
Redis supports optimistic locking to avoid inconsistent updates to different keys. The basic idea is to watch for changes to any keys that you use in a transaction while you are processing the updates. If the watched keys do change, you must restart the updates with the latest data from the keys. See Transactions for more information about optimistic locking.
The example below shows how to use the transaction() function to
automatically retry a transaction when watched keys are modified.
Pass the list of keys you want to watch and a closure representing the transaction.
The closure receives the original connection and a pipeline as parameters.
Use the connection to read the latest values from the watched keys,
but always use the pipeline to add all the commands that make up the watched transaction.
If the watched keys are modified during the transaction, the transaction() function
automatically retries the transaction until it succeeds.
mod pipe_trans_tests {
    use redis::Commands;

    /// Runs the pipeline and transaction examples against the Redis server
    /// at `redis://127.0.0.1`.
    ///
    /// Each step prints its outcome. On any Redis error the function prints
    /// the error and returns early instead of panicking.
    fn run() {
        let mut r = match redis::Client::open("redis://127.0.0.1") {
            Ok(client) => match client.get_connection() {
                Ok(conn) => conn,
                Err(e) => {
                    println!("Failed to connect to Redis: {e}");
                    return;
                }
            },
            Err(e) => {
                println!("Failed to create Redis client: {e}");
                return;
            }
        };

        // Remove keys left over from a previous run so the output below is
        // reproducible.
        for i in 0..8 {
            let key = format!("seat:{i}");
            let deleted: redis::RedisResult<()> = r.del(&key);
            if let Err(e) = deleted {
                println!("Failed to delete key: {e}");
                return;
            }
        }

        for i in 1..4 {
            let key = format!("counter:{i}");
            let deleted: redis::RedisResult<()> = r.del(&key);
            if let Err(e) = deleted {
                println!("Failed to delete key: {e}");
                return;
            }
        }

        // Check the success of the pipeline without checking the results
        // individually.
        match redis::pipe()
            .set("seat:0", "#0")
            .set("seat:1", "#1")
            .set("seat:2", "#2")
            .set("seat:3", "#3")
            .set("seat:4", "#4")
            .exec(&mut r)
        {
            Ok(_) => {
                println!("Pipe executed successfully");
            }
            Err(e) => {
                println!("Error executing pipe: {e}");
                return;
            }
        };

        // Check the success of the pipeline and the results individually.
        let (seat_0, seat_1, seat_2, seat_3, seat_4): (String, String, String, String, String) =
            match redis::pipe()
                .get("seat:0")
                .get("seat:1")
                .get("seat:2")
                .get("seat:3")
                .get("seat:4")
                .query(&mut r)
            {
                Ok(res) => res,
                Err(e) => {
                    println!("Error executing pipe: {e}");
                    return;
                }
            };

        println!("{seat_0}, {seat_1}, {seat_2}, {seat_3}, {seat_4}");
        // >>> #0, #1, #2, #3, #4

        // Use `ignore()` to ignore the result of specific commands.
        let (seat_5, seat_6, seat_7): (String, String, String) = match redis::pipe()
            .set("seat:5", "#5").ignore()
            .set("seat:6", "#6").ignore()
            .set("seat:7", "#7").ignore()
            .get("seat:5")
            .get("seat:6")
            .get("seat:7")
            .query(&mut r)
        {
            Ok(res) => res,
            Err(e) => {
                println!("Error executing pipe: {e}");
                return;
            }
        };

        println!("{seat_5}, {seat_6}, {seat_7}");
        // >>> #5, #6, #7

        // Calling `atomic()` on the pipeline runs its commands as a
        // transaction.
        match redis::pipe()
            .atomic()
            .incr("counter:1", 1)
            .incr("counter:2", 2)
            .incr("counter:3", 3)
            .exec(&mut r)
        {
            Ok(_) => {
                println!("Transaction executed successfully");
            }
            Err(e) => {
                println!("Error executing transaction: {e}");
                return;
            }
        };

        let key = "shellpath";

        // Seed the key that the watched transaction below will update.
        let seeded: redis::RedisResult<()> = r.set(key, "/usr/syscmds/");
        if let Err(e) = seeded {
            println!("Error setting shellpath: {e}");
            return;
        }

        // `transaction()` watches `key` while the closure runs. Read the
        // current value through the connection, but queue the update on the
        // pipeline. Errors inside the closure are propagated with `?` so
        // they surface in `outcome` rather than panicking.
        let outcome: redis::RedisResult<((),)> = redis::transaction(&mut r, &[key], |r, pipe| {
            let mut path: String = r.get(key)?;
            path.push_str(":/usr/mycmds/");
            pipe.set(key, path).query(r)
        });
        if let Err(e) = outcome {
            println!("Error executing transaction: {e}");
            return;
        }

        let fetched: redis::RedisResult<String> = r.get("shellpath");
        match fetched {
            Ok(res) => {
                println!("{res}");
                // >>> /usr/syscmds/:/usr/mycmds/
            }
            Err(e) => {
                println!("Error getting shellpath: {e}");
            }
        };
    }
}