This tutorial uses Lettuce, which is an unsupported Redis library. For production applications, we recommend using Jedis.
RedisGears is a dynamic server-side data processing engine, where the "server" part is Redis itself. RedisGears is distributed as a Redis module. You can start a Redis instance preconfigured with Gears using the official Docker image:
docker run -p 6379:6379 redislabs/redisgears:latest
Or, as I do most of the time, using the "redismod" image, which includes Gears and all the other Redis, Inc. supported modules:
docker run -p 6379:6379 redislabs/redismod
RedisGears was built to provide a data processing engine inside of Redis, with more formal semantics than the simpler Lua server-side scripting. Think of it as a more flexible map-reduce engine for Redis. It supports transaction, batch, and event-driven processing of Redis data. Gears allows you to localize computation and data, and provides a built-in coordinator to facilitate processing distributed data in a clustered environment.
In RedisGears, the main unit of processing is the RedisGears function, which can (currently) be written in Python (more languages are being worked on). These functions run on their own thread, separate from Redis' main thread, and can be executed in response to keyspace events or imperatively as a result of external commands. The functions are "registered" (deployed) to the Gears engine, and have an associated name and a registration ID.
During registration we pick a specific reader for our function which defines how the function gets its initial data:
KeysReader: Redis keys and values.
KeysOnlyReader: Redis keys.
StreamReader: Redis Stream messages.
PythonReader: Arbitrary Python generator.
ShardsIDReader: Shard ID.
CommandReader: Command arguments from application client.
Depending on the reader type, Gears functions can either be run immediately, on demand, as batch jobs, or in an event-driven manner by registering them to trigger automatically on various types of events.
The Python function rate_limit takes 3 parameters:
key: The Redis key backing the counter for a given user.
max_requests: The request quota for the user.
expiry: The number of seconds in the future to set the counter TTL.
def rate_limit(key, max_requests, expiry):
    requests = execute('GET', key)
    requests = int(requests) if requests else -1
    max_requests = int(max_requests)
    expiry = int(expiry)
    if (requests == -1) or (requests < max_requests):
        with atomic():
            execute('INCR', key)
            execute('EXPIRE', key, expiry)
        return False
    else:
        return True

# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')
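The decision logic of the script can be tried outside Redis before deploying it. The following is a minimal sketch, not the Gears runtime: the in-memory store and the execute and atomic stand-ins are hypothetical replacements for the built-ins that RedisGears provides, and TTL handling is deliberately left out. The rate_limit body is copied unchanged from the script.

```python
from contextlib import contextmanager

# Hypothetical in-memory stand-in for the Redis keyspace (local experimentation only).
store = {}


def execute(command, *args):
    """Stand-in for the RedisGears execute() built-in; covers GET, INCR and EXPIRE only."""
    if command == 'GET':
        return store.get(args[0])
    if command == 'INCR':
        store[args[0]] = str(int(store.get(args[0], '0')) + 1)
        return int(store[args[0]])
    if command == 'EXPIRE':
        return 1  # TTL handling is not simulated in this sketch
    raise ValueError('unsupported command: ' + command)


@contextmanager
def atomic():
    """Stand-in for the RedisGears atomic() context manager; nothing to lock here."""
    yield


def rate_limit(key, max_requests, expiry):
    requests = execute('GET', key)
    requests = int(requests) if requests else -1
    max_requests = int(max_requests)
    expiry = int(expiry)
    if (requests == -1) or (requests < max_requests):
        with atomic():
            execute('INCR', key)
            execute('EXPIRE', key, expiry)
        return False
    else:
        return True


# A CommandReader record is a list: the trigger name followed by the arguments,
# which is why the map step in the script reads x[1], x[2] and x[3].
record = ['RateLimiter', 'rl_localhost:47', '20', '59']
decisions = [rate_limit(record[1], record[2], record[3]) for _ in range(22)]
print(decisions.count(False), decisions.count(True))  # prints "20 2"
```

With a quota of 20, the first 20 calls return False (allowed) and the remaining two return True (denied), which is the same pattern the curl test later in this tutorial produces.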
Place the script under src/main/resources/scripts. Now, let's break it down:
In the rate_limit function, similarly to what we did in the previous implementation, we:
1. Retrieve the current number of requests for the passed key by execute-ing the GET command.
2. Cast the result to an int and, if not found, default to -1.
3. Cast max_requests and expiry to int.
4. If the quota hasn't been exceeded, perform the INCR/EXPIRE commands in a transaction (with atomic():) and return False (no rate limiting - request is allowed).
5. Otherwise, return True (deny the request).
In the # Function registration section, we:
1. Instantiate the GearsBuilder (GB) using the CommandReader reader. The GearsBuilder "builds" the context of the function, in parameters, transformations, triggers, etc.
2. Use the map method to perform a one-to-one mapping of records to the params of the rate_limit function via a mapper function callback.
3. Invoke the register action to register the function as an event handler. The event in our case is the trigger 'RateLimiter'.
In order to use our RedisGears function from our Spring Boot application we need to do a few things:
LettuceMod is a Java client for Redis Modules based on Lettuce, created by Julien Ruaux. It supports several Redis modules, RedisGears among them, in standalone or cluster configurations.
To use LettuceMod we'll add the dependency to the Maven POM as shown:
<dependency>
<groupId>com.redis</groupId>
<artifactId>spring-lettucemod</artifactId>
<version>1.7.0</version>
</dependency>
To access any of the LettuceMod supported modules we will inject a StatefulRedisModulesConnection in our FixedWindowRateLimiterApplication class as follows:
@Autowired
StatefulRedisModulesConnection<String, String> connection;
Add the matching import statement:
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
We'll start by writing a function to determine whether the function with the trigger RateLimiter has been registered. It takes a List of Registrations and digs deep to extract the value of the trigger argument using the Java Streams API:
private Optional<String> getGearsRegistrationIdForTrigger(List<Registration> registrations, String trigger) {
    // Compare from the known non-null side so registrations without a "trigger" argument do not throw.
    return registrations.stream()
        .filter(r -> trigger.equals(r.getData().getArgs().get("trigger")))
        .findFirst()
        .map(Registration::getId);
}
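The lookup is a plain "first match or nothing" search. As a rough illustration of the same idea in Python, assuming each registration is represented as a dictionary shaped after the Java accessors (id, data, args), which is an assumption made for this sketch and not the actual reply format; the ids and the non-trigger arguments below are invented:

```python
def registration_id_for_trigger(registrations, trigger):
    """Return the id of the first registration whose 'trigger' argument matches, else None."""
    for registration in registrations:
        # Registrations made with other readers may carry no 'trigger' argument at all.
        args = registration.get("data", {}).get("args", {})
        if args.get("trigger") == trigger:
            return registration["id"]
    return None


# Hypothetical sample data: one registration without a trigger, one with it.
registrations = [
    {"id": "reg-1", "data": {"args": {"regex": "*"}}},
    {"id": "reg-2", "data": {"args": {"trigger": "RateLimiter"}}},
]

found = registration_id_for_trigger(registrations, "RateLimiter")
missing = registration_id_for_trigger(registrations, "SomethingElse")
print(found, missing)  # prints "reg-2 None"
```

The first entry has no trigger argument, which is the case a lookup needs to tolerate when functions registered with other readers are present on the same server.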
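In the @PostConstruct annotated loadGearsScript method, we:
1. Retrieve an instance of RedisGearsCommands from the previously injected StatefulRedisModulesConnection.
2. Get the currently registered Gears functions via the dumpregistrations method.
3. Pass the list of registrations to getGearsRegistrationIdForTrigger.
4. If no registration is found, register the function: the script is loaded from the classpath into a String named py, and we call the pyexecute method passing the py script payload.
@PostConstruct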
public void loadGearsScript() throws IOException {
    String py = StreamUtils.copyToString(new ClassPathResource("scripts/rateLimiter.py").getInputStream(),
        Charset.defaultCharset());
    RedisGearsCommands<String, String> gears = connection.sync();
    List<Registration> registrations = gears.dumpregistrations();
    Optional<String> maybeRegistrationId = getGearsRegistrationIdForTrigger(registrations, "RateLimiter");
    if (maybeRegistrationId.isEmpty()) {
        try {
            ExecutionResults er = gears.pyexecute(py);
            if (er.isOk()) {
                logger.info("RateLimiter.py has been registered");
            } else if (er.isError()) {
                logger.error(String.format("Could not register RateLimiter.py -> %s", Arrays.toString(er.getErrors().toArray())));
            }
        } catch (RedisCommandExecutionException rcee) {
            logger.error(String.format("Could not register RateLimiter.py -> %s", rcee.getMessage()));
        }
    } else {
        logger.info("RateLimiter.py has already been registered");
    }
}
Next, we'll modify the filter to include the StatefulRedisModulesConnection as well as the quota, the value that we need to pass to the function:
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
    private StatefulRedisModulesConnection<String, String> connection;
    private Long maxRequestPerMinute;

    public RateLimiterHandlerFilterFunction(StatefulRedisModulesConnection<String, String> connection,
            Long maxRequestPerMinute) {
        this.connection = connection;
        this.maxRequestPerMinute = maxRequestPerMinute;
    }
Now we can modify the filter method to use the function. Gears functions are invoked by triggering the correct event, RateLimiter, and passing the parameters required by the function: the key, the quota, and the TTL in seconds. As we have done previously, if the function returns false we let the request through; otherwise we return an HTTP 429:
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
    int currentMinute = LocalTime.now().getMinute();
    String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
    RedisGearsCommands<String, String> gears = connection.sync();
    List<Object> results = gears.trigger("RateLimiter", key, Long.toString(maxRequestPerMinute), "59");
    if (!results.isEmpty() && !Boolean.parseBoolean((String) results.get(0))) {
        return next.handle(request);
    } else {
        return ServerResponse.status(TOO_MANY_REQUESTS).build();
    }
}
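The key built in the filter embeds only the minute of the hour, so the same key name comes around again sixty minutes later; the 59-second TTL is what keeps a stale counter from carrying over into that later window. A small Python sketch of the key derivation, where window_key is a hypothetical helper mirroring the String.format call and "localhost" stands in for whatever requestAddress returns:

```python
from datetime import datetime, timedelta


def window_key(address, now):
    """Build the per-minute counter key the same way the Java filter does."""
    return "rl_%s:%s" % (address, now.minute)


t = datetime(2021, 9, 10, 4, 47, 24)

current = window_key("localhost", t)
same_minute = window_key("localhost", t + timedelta(seconds=30))  # 04:47:54
next_minute = window_key("localhost", t + timedelta(seconds=40))  # 04:48:04
next_hour = window_key("localhost", t + timedelta(hours=1))       # 05:47:24

print(current, same_minute, next_minute, next_hour)
```

Requests at 04:47:24 and 04:47:54 share the counter rl_localhost:47, the request at 04:48:04 starts a fresh counter, and the request one hour later maps back to rl_localhost:47, by which time the earlier counter has long since expired.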
Once again, we use a curl loop to test the limiter:
for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
You should see the 21st request being rejected:
➜ for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.064786 s
PONG :: HTTP 200, 4 bytes, 0.009926 s
PONG :: HTTP 200, 4 bytes, 0.009546 s
PONG :: HTTP 200, 4 bytes, 0.010189 s
PONG :: HTTP 200, 4 bytes, 0.009399 s
PONG :: HTTP 200, 4 bytes, 0.009210 s
PONG :: HTTP 200, 4 bytes, 0.008333 s
PONG :: HTTP 200, 4 bytes, 0.008009 s
PONG :: HTTP 200, 4 bytes, 0.008919 s
PONG :: HTTP 200, 4 bytes, 0.009271 s
PONG :: HTTP 200, 4 bytes, 0.007515 s
PONG :: HTTP 200, 4 bytes, 0.007057 s
PONG :: HTTP 200, 4 bytes, 0.008373 s
PONG :: HTTP 200, 4 bytes, 0.007573 s
PONG :: HTTP 200, 4 bytes, 0.008209 s
PONG :: HTTP 200, 4 bytes, 0.009080 s
PONG :: HTTP 200, 4 bytes, 0.007595 s
PONG :: HTTP 200, 4 bytes, 0.007955 s
PONG :: HTTP 200, 4 bytes, 0.007693 s
PONG :: HTTP 200, 4 bytes, 0.008743 s
:: HTTP 429, 0 bytes, 0.007226 s
:: HTTP 429, 0 bytes, 0.007388 s
If we run Redis in monitor mode, we should see the calls to RG.TRIGGER and, under each of them, the calls to GET, INCR and EXPIRE for allowed requests:
1631249244.006212 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.006995 [0 ?:0] "GET" "rl_localhost:47"
1631249244.007182 [0 ?:0] "INCR" "rl_localhost:47"
1631249244.007269 [0 ?:0] "EXPIRE" "rl_localhost:47" "59"
And for rate-limited requests you should see only the call to GET:
1631249244.538478 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.538809 [0 ?:0] "GET" "rl_localhost:47"
The complete code for this implementation is under the branch with_gears.