Redis Streams
Introduction to Redis streams
A Redis stream is a data structure that acts like an append-only log but also implements several operations to overcome some of the limits of a typical append-only log. These include random access in O(1) time and complex consumption strategies, such as consumer groups. You can use streams to record and simultaneously syndicate events in real time. Examples of Redis stream use cases include:
- Event sourcing (e.g., tracking user actions, clicks, etc.)
- Sensor monitoring (e.g., readings from devices in the field)
- Notifications (e.g., storing a record of each user's notifications in a separate stream)
Redis generates a unique ID for each stream entry. You can use these IDs to retrieve their associated entries later or to read and process all subsequent entries in the stream. Note that because these IDs are related to time, the ones shown here may vary and will be different from the IDs you see in your own Redis instance.
Redis streams support several trimming strategies (to prevent streams from growing unbounded) and more than one consumption strategy (see XREAD
, XREADGROUP
, and XRANGE
).
Basic commands
XADD
adds a new entry to a stream.XREAD
reads one or more entries, starting at a given position and moving forward in time.XRANGE
returns a range of entries between two supplied entry IDs.XLEN
returns the length of a stream.
See the complete list of stream commands.
Examples
-
When our racers pass a checkpoint, we add a stream entry for each racer that includes the racer's name, speed, position, and location ID:
> XADD race:france * rider Castilla speed 30.2 position 1 location_id 1 "1692632086370-0" > XADD race:france * rider Norem speed 28.8 position 3 location_id 1 "1692632094485-0" > XADD race:france * rider Prickett speed 29.7 position 2 location_id 1 "1692632102976-0"
Are you tired of using redis-cli? Try Redis Insight - the developer GUI for Redis.""" Code samples for Stream doc pages: https://redis.io/docs/latest/develop/data-types/streams/ """ import redis r = redis.Redis(decode_responses=True) res1 = r.xadd( "race:france", {"rider": "Castilla", "speed": 30.2, "position": 1, "location_id": 1}, ) print(res1) # >>> 1692629576966-0 res2 = r.xadd( "race:france", {"rider": "Norem", "speed": 28.8, "position": 3, "location_id": 1}, ) print(res2) # >>> 1692629594113-0 res3 = r.xadd( "race:france", {"rider": "Prickett", "speed": 29.7, "position": 2, "location_id": 1}, ) print(res3) # >>> 1692629613374-0 res4 = r.xrange("race:france", "1691765278160-0", "+", 2) print( res4 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ) # ] res5 = r.xread(streams={"race:france": 0}, count=100, block=300) print( res5 ) # >>> [ # ['race:france', # [('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ), # ('1692629613374-0', # {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'} # )] # ] # ] res6 = r.xadd( "race:france", {"rider": "Castilla", "speed": 29.9, "position": 1, "location_id": 2}, ) print(res6) # >>> 1692629676124-0 res7 = r.xlen("race:france") print(res7) # >>> 4 res8 = r.xadd("race:usa", {"racer": "Castilla"}, id="0-1") print(res8) # >>> 0-1 res9 = r.xadd("race:usa", {"racer": "Norem"}, id="0-2") print(res9) # >>> 0-2 try: res10 = r.xadd("race:usa", {"racer": "Prickett"}, id="0-1") print(res10) # >>> 0-1 except redis.exceptions.ResponseError as e: print(e) # >>> WRONGID # Not yet implemented res11 = r.xrange("race:france", "-", "+") print( res11 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ), # ('1692629613374-0', # {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'} # ), # ('1692629676124-0', # {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'} # ) # ] res12 = r.xrange("race:france", 1692629576965, 1692629576967) print( res12 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ) # ] res13 = r.xrange("race:france", "-", "+", 2) print( res13 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ) # ] res14 = r.xrange("race:france", "(1692629594113-0", "+", 2) print( res14 ) # >>> [ # ('1692629613374-0', # {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'} # ), # ('1692629676124-0', # {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'} # ) # ] res15 = r.xrange("race:france", "(1692629676124-0", "+", 2) print(res15) # >>> [] res16 = r.xrevrange("race:france", "+", "-", 1) print( res16 ) # >>> [ # ('1692629676124-0', # {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'} # ) # ] res17 = r.xread(streams={"race:france": 0}, count=2) print( res17 ) # >>> [ # ['race:france', [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ) # ] # ] # ] res18 = r.xgroup_create("race:france", "france_riders", "$") print(res18) # >>> True res19 = r.xgroup_create("race:italy", "italy_riders", "$", mkstream=True) print(res19) # >>> True r.xadd("race:italy", {"rider": "Castilla"}) r.xadd("race:italy", {"rider": "Royce"}) r.xadd("race:italy", {"rider": "Sam-Bodden"}) r.xadd("race:italy", {"rider": "Prickett"}) r.xadd("race:italy", {"rider": "Norem"}) res20 = r.xreadgroup( streams={"race:italy": ">"}, consumername="Alice", groupname="italy_riders", count=1, ) print(res20) # >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] res21 = r.xreadgroup( streams={"race:italy": 0}, consumername="Alice", groupname="italy_riders", count=1, ) print(res21) # >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] res22 = r.xack("race:italy", "italy_riders", "1692629925771-0") print(res22) # >>> 1 res23 = r.xreadgroup( streams={"race:italy": 0}, consumername="Alice", groupname="italy_riders", count=1, ) print(res23) # >>> [['race:italy', []]] res24 = r.xreadgroup( streams={"race:italy": ">"}, consumername="Bob", groupname="italy_riders", count=2, ) print( res24 ) # >>> [ # ['race:italy', [ # ('1692629925789-0', # {'rider': 'Royce'} # ), # ('1692629925790-0', # {'rider': 'Sam-Bodden'} # ) # ] # ] # ] res25 = r.xpending("race:italy", "italy_riders") print( res25 ) # >>> { # 'pending': 2, 'min': '1692629925789-0', 'max': '1692629925790-0', # 'consumers': [{'name': 'Bob', 'pending': 2}] # } res26 = r.xpending_range("race:italy", "italy_riders", "-", "+", 10) print( res26 ) # >>> [ # { # 'message_id': '1692629925789-0', 'consumer': 'Bob', # 'time_since_delivered': 31084, 'times_delivered': 1 # }, # { # 'message_id': '1692629925790-0', 'consumer': 'Bob', # 'time_since_delivered': 31084, 'times_delivered': 1 # } # ] res27 = r.xrange("race:italy", "1692629925789-0", "1692629925789-0") print(res27) # >>> [('1692629925789-0', {'rider': 'Royce'})] res28 = r.xclaim("race:italy", "italy_riders", "Alice", 60000, ["1692629925789-0"]) print(res28) # >>> [('1692629925789-0', {'rider': 'Royce'})] res29 = r.xautoclaim("race:italy", "italy_riders", "Alice", 1, "0-0", 1) print(res29) # >>> ['1692629925790-0', [('1692629925789-0', {'rider': 'Royce'})]] res30 = r.xautoclaim("race:italy", "italy_riders", "Alice", 1, "(1692629925789-0", 1) print(res30) # >>> ['0-0', [('1692629925790-0', {'rider': 'Sam-Bodden'})]] res31 = r.xinfo_stream("race:italy") print( res31 ) # >>> { # 'length': 5, 'radix-tree-keys': 1, 'radix-tree-nodes': 2, # 'last-generated-id': '1692629926436-0', 'groups': 1, # 'first-entry': ('1692629925771-0', {'rider': 'Castilla'}), # 'last-entry': ('1692629926436-0', {'rider': 'Norem'}) # } res32 = r.xinfo_groups("race:italy") print( res32 ) # >>> [ # { # 'name': 'italy_riders', 'consumers': 2, 'pending': 2, # 'last-delivered-id': '1692629925790-0' # } # ] res33 = r.xinfo_consumers("race:italy", "italy_riders") print( res33 ) # >>> [ # {'name': 'Alice', 'pending': 2, 'idle': 199332}, # {'name': 'Bob', 'pending': 0, 'idle': 489170} # ] r.xadd("race:italy", {"rider": "Jones"}, maxlen=2) r.xadd("race:italy", {"rider": "Wood"}, maxlen=2) r.xadd("race:italy", {"rider": "Henshaw"}, maxlen=2) res34 = r.xlen("race:italy") print(res34) # >>> 8 res35 = r.xrange("race:italy", "-", "+") print( res35 ) # >>> [ # ('1692629925771-0', {'rider': 'Castilla'}), # ('1692629925789-0', {'rider': 'Royce'}), # ('1692629925790-0', {'rider': 'Sam-Bodden'}), # ('1692629925791-0', {'rider': 'Prickett'}), # ('1692629926436-0', {'rider': 'Norem'}), # ('1692630612602-0', {'rider': 'Jones'}), # ('1692630641947-0', {'rider': 'Wood'}), # ('1692630648281-0', {'rider': 'Henshaw'}) # ] r.xadd("race:italy", {"rider": "Smith"}, maxlen=2, approximate=False) res36 = r.xrange("race:italy", "-", "+") print( res36 ) # >>> [ # ('1692630648281-0', {'rider': 'Henshaw'}), # ('1692631018238-0', {'rider': 'Smith'}) # ] res37 = r.xtrim("race:italy", maxlen=10, approximate=False) print(res37) # >>> 0 res38 = r.xtrim("race:italy", maxlen=10) print(res38) # >>> 0 res39 = r.xrange("race:italy", "-", "+") print( res39 ) # >>> [ # ('1692630648281-0', {'rider': 'Henshaw'}), # ('1692631018238-0', {'rider': 'Smith'}) # ] res40 = r.xdel("race:italy", "1692631018238-0") print(res40) # >>> 1 res41 = r.xrange("race:italy", "-", "+") print(res41) # >>> [('1692630648281-0', {'rider': 'Henshaw'})]
import assert from 'assert'; import { createClient } from 'redis'; const client = await createClient(); await client.connect(); const res1 = await client.xAdd( 'race:france', '*', { 'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1' } ); console.log(res1); // >>> 1700073067968-0 N.B. actual values will differ from these examples const res2 = await client.xAdd( 'race:france', '*', { 'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1' }, ); console.log(res2); // >>> 1692629594113-0 const res3 = await client.xAdd( 'race:france', '*', { 'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1' }, ); console.log(res3); // >>> 1692629613374-0 const res4 = await client.xRange('race:france', '1691765278160-0', '+', {COUNT: 2}); console.log(res4); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'})] const res5 = await client.xRead({ key: 'race:france', id: '0-0' }, { count: 100, block: 300 }); console.log(res5); // >>> [['race:france', [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}), ('1692629613374-0', {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'})]]] const res6 = await client.xAdd( 'race:france', '*', { 'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2' } ); console.log(res6); // >>> 1692629676124-0 const res7 = await client.xLen('race:france'); console.log(res7); // >>> 4 const res8 = await client.xAdd('race:usa', '0-1', { 'racer': 'Castilla' }); console.log(res8); // >>> 0-1 const res9 = await client.xAdd('race:usa', '0-2', { 'racer': 'Norem' }); console.log(res9); // >>> 0-2 try { const res10 = await client.xAdd('race:usa', '0-1', { 'racer': 'Prickett' }); console.log(res10); // >>> 0-1 } catch (error) { console.error(error); // >>> WRONGID } const res11a = await client.xAdd('race:usa', '0-*', { racer: 'Norem' }); console.log(res11a); // >>> 0-3 const res11 = await client.xRange('race:france', '-', '+'); console.log(res11); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}), ('1692629613374-0', {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'}), ('1692629676124-0', {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'})] const res12 = await client.xRange('race:france', '1692629576965', '1692629576967'); console.log(res12); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'})] const res13 = await client.xRange('race:france', '-', '+', {COUNT: 2}); console.log(res13); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'})] const res14 = await client.xRange('race:france', '(1692629594113-0', '+', {COUNT: 2}); console.log(res14); // >>> [('1692629613374-0', {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'}), ('1692629676124-0', {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'})] const res15 = await client.xRange('race:france', '(1692629676124-0', '+', {COUNT: 2}); console.log(res15); // >>> [] const res16 = await client.xRevRange('race:france', '+', '-', {COUNT: 1}); console.log( res16 ); // >>> [('1692629676124-0', {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'})] const res17 = await client.xRead({ key: 'race:france', id: '0-0' }, { count: 2 }); console.log(res17); // >>> [['race:france', [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'})]]] const res18 = await client.xGroupCreate('race:france', 'france_riders', '$'); console.log(res18); // >>> True const res19 = await client.xGroupCreate('race:italy', 'italy_riders', '$', { 'MKSTREAM': true }); console.log(res19); // >>> True await client.xAdd('race:italy', '*', { 'rider': 'Castilla' }); await client.xAdd('race:italy', '*', { 'rider': 'Royce' }); await client.xAdd('race:italy', '*', { 'rider': 'Sam-Bodden' }); await client.xAdd('race:italy', '*', { 'rider': 'Prickett' }); await client.xAdd('race:italy', '*', { 'rider': 'Norem' }); const res20 = await client.xReadGroup( 'italy_riders', 'Alice', { key: 'race:italy', id: '>' }, { 'COUNT': 1 } ); console.log(res20); // >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] const res21 = await client.xReadGroup( 'italy_riders', 'Alice', { key: 'race:italy', id: '0' }, { 'COUNT': 1 } ); console.log(res21); // >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] const res22 = await client.xAck('race:italy', 'italy_riders', '1692629925771-0') console.log(res22); // >>> 1 const res23 = await client.xReadGroup( 'italy_riders', 'Alice', { key: 'race:italy', id: '0' }, { 'COUNT': 1 } ); console.log(res23); // >>> [['race:italy', []]] const res24 = await client.xReadGroup( 'italy_riders', 'Bob', { key: 'race:italy', id: '>' }, { 'COUNT': 2 } ); console.log(res24); // >>> [['race:italy', [('1692629925789-0', {'rider': 'Royce'}), ('1692629925790-0', {'rider': 'Sam-Bodden'})]]] const res25 = await client.xPending('race:italy', 'italy_riders'); console.log(res25); // >>> {'pending': 2, 'min': '1692629925789-0', 'max': '1692629925790-0', 'consumers': [{'name': 'Bob', 'pending': 2}]} const res26 = await client.xPendingRange('race:italy', 'italy_riders', '-', '+', 10); console.log(res26); // >>> [{'message_id': '1692629925789-0', 'consumer': 'Bob', 'time_since_delivered': 31084, 'times_delivered': 1}, {'message_id': '1692629925790-0', 'consumer': 'Bob', 'time_since_delivered': 31084, 'times_delivered': 1}] const res27 = await client.xRange('race:italy', '1692629925789-0', '1692629925789-0'); console.log(res27); // >>> [('1692629925789-0', {'rider': 'Royce'})] const res28 = await client.xClaim( 'race:italy', 'italy_riders', 'Alice', 60000, ['1692629925789-0'] ); console.log(res28); // >>> [('1692629925789-0', {'rider': 'Royce'})] const res29 = await client.xAutoClaim('race:italy', 'italy_riders', 'Alice', 1, '0-0', 1); console.log(res29); // >>> ['1692629925790-0', [('1692629925789-0', {'rider': 'Royce'})]] const res30 = await client.xAutoClaim( 'race:italy', 'italy_riders', 'Alice', 1, '(1692629925789-0', 1 ); console.log(res30); // >>> ['0-0', [('1692629925790-0', {'rider': 'Sam-Bodden'})]] const res31 = await client.xInfoStream('race:italy'); console.log(res31); // >>> {'length': 5, 'radix-tree-keys': 1, 'radix-tree-nodes': 2, 'last-generated-id': '1692629926436-0', 'groups': 1, 'first-entry': ('1692629925771-0', {'rider': 'Castilla'}), 'last-entry': ('1692629926436-0', {'rider': 'Norem'})} const res32 = await client.xInfoGroups('race:italy'); console.log(res32); // >>> [{'name': 'italy_riders', 'consumers': 2, 'pending': 2, 'last-delivered-id': '1692629925790-0'}] const res33 = await client.xInfoConsumers('race:italy', 'italy_riders'); console.log(res33); // >>> [{'name': 'Alice', 'pending': 2, 'idle': 199332}, {'name': 'Bob', 'pending': 0, 'idle': 489170}] await client.xAdd('race:italy', '*', { 'rider': 'Jones' }, { 'MAXLEN': 2 }); await client.xAdd('race:italy', '*', { 'rider': 'Wood' }, { 'MAXLEN': 2 }); await client.xAdd('race:italy', '*', { 'rider': 'Henshaw' }, { 'MAXLEN': 2 }); const res34 = await client.xLen('race:italy'); console.log(res34); // >>> 8 const res35 = await client.xRange('race:italy', '-', '+'); console.log(res35); // >>> [('1692629925771-0', {'rider': 'Castilla'}), ('1692629925789-0', {'rider': 'Royce'}), ('1692629925790-0', {'rider': 'Sam-Bodden'}), ('1692629925791-0', {'rider': 'Prickett'}), ('1692629926436-0', {'rider': 'Norem'}), ('1692630612602-0', {'rider': 'Jones'}), ('1692630641947-0', {'rider': 'Wood'}), ('1692630648281-0', {'rider': 'Henshaw'})] await client.xAdd('race:italy', '*', { 'rider': 'Smith' }, { 'MAXLEN': 2, 'APPROXIMATE': false }); const res36 = await client.xRange('race:italy', '-', '+'); console.log(res36); // >>> [('1692630648281-0', {'rider': 'Henshaw'}), ('1692631018238-0', {'rider': 'Smith'})] const res37 = await client.xTrim('race:italy', 'MAXLEN', 10, { 'APPROXIMATE': false }); console.log(res37); // >>> 0 const res38 = await client.xTrim('race:italy', "MAXLEN", 10); console.log(res38); // >>> 0 const res39 = await client.xRange('race:italy', '-', '+'); console.log(res39); // >>> [('1692630648281-0', {'rider': 'Henshaw'}), ('1692631018238-0', {'rider': 'Smith'})] const res40 = await client.xDel('race:italy', '1692631018238-0'); console.log(res40); // >>> 1 const res41 = await client.xRange('race:italy', '-', '+'); console.log(res41); // >>> [('1692630648281-0', {'rider': 'Henshaw'})]
package io.redis.examples; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.UnifiedJedis; public class StreamsExample { public void run(){ UnifiedJedis jedis = new UnifiedJedis("redis://localhost:6379"); StreamEntryID res1 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Castilla");put("speed","30.2");put("position","1");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res1); // >>> 1701760582225-0 StreamEntryID res2 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Norem");put("speed","28.8");put("position","3");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res2); // >>> 1701760582225-1 StreamEntryID res3 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Prickett");put("speed","29.7");put("position","2");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res3); // >>> 1701760582226-0 List<StreamEntry> res4 = jedis.xrange("race:france","1701760582225-0","+",2); System.out.println(res4); // >>> [1701760841292-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701760841292-1 {rider=Norem, speed=28.8, location_id=1, position=3}] List<Map.Entry<String, List<StreamEntry>>> res5= jedis.xread(XReadParams.xReadParams().block(300).count(100),new HashMap<String,StreamEntryID>(){{put("race:france",new StreamEntryID());}}); System.out.println( res5 ); // >>> [race:france=[1701761996660-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701761996661-0 {rider=Norem, speed=28.8, location_id=1, position=3}, 1701761996661-1 {rider=Prickett, speed=29.7, location_id=1, position=2}]] StreamEntryID res6 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Castilla");put("speed","29.9");put("position","2");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res6); // >>> 1701762285679-0 long res7 = jedis.xlen("race:france"); System.out.println(res7); // >>> 4 StreamEntryID res8 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Castilla");}},XAddParams.xAddParams().id("0-1")); System.out.println(res8); // >>> 0-1 StreamEntryID res9 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Norem");}},XAddParams.xAddParams().id("0-2")); System.out.println(res9); // >>> 0-2 try { StreamEntryID res10 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Prickett");}},XAddParams.xAddParams().id("0-1")); System.out.println(res10); // >>> 0-1 } catch (JedisDataException e){ System.out.println(e); // >>> ERR The ID specified in XADD is equal or smaller than the target stream top item } StreamEntryID res11 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Norem");}},XAddParams.xAddParams().id("0-*")); System.out.println(res11); List<StreamEntry> res12 = jedis.xrange("race:france","-","+"); System.out.println( res12 ); // >>> [1701764734160-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764734160-1 {rider=Norem, speed=28.8, location_id=1, position=3}, 1701764734161-0 {rider=Prickett, speed=29.7, location_id=1, position=2}, 1701764734162-0 {rider=Castilla, speed=29.9, location_id=1, position=2}] List<StreamEntry> res13 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()-1000),String.valueOf(System.currentTimeMillis()+1000)); System.out.println( res13 ); // >>> [1701764734160-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764734160-1 {rider=Norem, speed=28.8, location_id=1, position=3}, 1701764734161-0 {rider=Prickett, speed=29.7, location_id=1, position=2}, 1701764734162-0 {rider=Castilla, speed=29.9, location_id=1, position=2}] List<StreamEntry> res14 = jedis.xrange("race:france","-","+",2); System.out.println(res14); // >>> [1701764887638-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764887638-1 {rider=Norem, speed=28.8, location_id=1, position=3}] List<StreamEntry> res15 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()-1000)+"-0","+",2); System.out.println(res15); // >>> [1701764887638-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764887638-1 {rider=Norem, speed=28.8, location_id=1, position=3}] List<StreamEntry> res16 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()+1000)+"-0","+",2); System.out.println(res16); // >>> [] List<StreamEntry> res17 = jedis.xrevrange("race:france","+","-",1); System.out.println(res17); // >>> [1701765218592-0 {rider=Castilla, speed=29.9, location_id=1, position=2}] List<Map.Entry<String, List<StreamEntry>>> res18= jedis.xread(XReadParams.xReadParams().count(2),new HashMap<String,StreamEntryID>(){{put("race:france",new StreamEntryID());}}); System.out.println( res18 ); // >>> [race:france=[1701765384638-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701765384638-1 {rider=Norem, speed=28.8, location_id=1, position=3}]] String res19 = jedis.xgroupCreate("race:france","france_riders",StreamEntryID.LAST_ENTRY,false); System.out.println(res19); // >>> OK String res20 = jedis.xgroupCreate("race:italy","italy_riders",StreamEntryID.LAST_ENTRY,true); System.out.println(res20); // >>> OK StreamEntryID id1 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Castilaa");}},XAddParams.xAddParams()); StreamEntryID id2 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Royce");}},XAddParams.xAddParams()); StreamEntryID id3 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Sam-Bodden");}},XAddParams.xAddParams()); StreamEntryID id4 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Prickett");}},XAddParams.xAddParams()); StreamEntryID id5 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Norem");}},XAddParams.xAddParams()); List<Map.Entry<String, List<StreamEntry>>> res21 = jedis.xreadGroup("italy_riders","Alice", XReadGroupParams.xReadGroupParams().count(1),new HashMap<String,StreamEntryID>(){{put("race:italy",StreamEntryID.UNRECEIVED_ENTRY);}}); System.out.println(res21); // >>> [race:italy=[1701766299006-0 {rider=Castilaa}]] List<Map.Entry<String, List<StreamEntry>>> res22 = jedis.xreadGroup("italy_riders","Alice", XReadGroupParams.xReadGroupParams().count(1),new HashMap<String,StreamEntryID>(){{put("race:italy",new StreamEntryID());}}); System.out.println(res22); // >>> [race:italy=[1701766299006-0 {rider=Castilaa}]] long res23 = jedis.xack("race:italy","italy_riders",id1); System.out.println(res23); // >>> 1 List<Map.Entry<String, List<StreamEntry>>> res24 = jedis.xreadGroup("italy_riders","Alice", XReadGroupParams.xReadGroupParams().count(1),new HashMap<String,StreamEntryID>(){{put("race:italy",new StreamEntryID());}}); System.out.println(res24); // >>> [race:italy=[]] List<Map.Entry<String, List<StreamEntry>>> res25 = jedis.xreadGroup("italy_riders","Bob", XReadGroupParams.xReadGroupParams().count(2),new HashMap<String,StreamEntryID>(){{put("race:italy",StreamEntryID.UNRECEIVED_ENTRY);}}); System.out.println(res25); // >>> [race:italy=[1701767632261-1 {rider=Royce}, 1701767632262-0 {rider=Sam-Bodden}]] StreamPendingSummary res26 = jedis.xpending("race:italy","italy_riders"); System.out.println(res26.getConsumerMessageCount()); // >>> {Bob=2} List<StreamPendingEntry> res27 = jedis.xpending("race:italy","italy_riders",XPendingParams.xPendingParams().start(StreamEntryID.MINIMUM_ID).end(StreamEntryID.MAXIMUM_ID).count(10)); System.out.println(res27); // >>> [1701768567412-1 Bob idle:0 times:1, 1701768567412-2 Bob idle:0 times:1] List<StreamEntry> res28 = jedis.xrange("race:italy",id2.toString(),id2.toString()); System.out.println(res28); // >>> [1701768744819-1 {rider=Royce}] List<StreamEntry> res29 = jedis.xclaim("race:italy","italy_riders","Alice", 0L, XClaimParams.xClaimParams().time(60000),id2); System.out.println(res29); // >>> [1701769004195-1 {rider=Royce}] Map.Entry<StreamEntryID, List<StreamEntry>> res30 = jedis.xautoclaim("race:italy","italy_riders","Alice",1L,new StreamEntryID("0-0"),XAutoClaimParams.xAutoClaimParams().count(1)); System.out.println(res30); // >>> [1701769266831-2=[1701769266831-1 {rider=Royce}] Map.Entry<StreamEntryID, List<StreamEntry>> res31 = jedis.xautoclaim("race:italy","italy_riders","Alice",1L,new StreamEntryID(id2.toString()),XAutoClaimParams.xAutoClaimParams().count(1)); System.out.println(res31); // >>> [0-0=[1701769605847-2 {rider=Sam-Bodden}] StreamInfo res32 = jedis.xinfoStream("race:italy"); System.out.println( res32.getStreamInfo() ); // >>> {radix-tree-keys=1, radix-tree-nodes=2, entries-added=5, length=5, groups=1, max-deleted-entry-id=0-0, first-entry=1701769637612-0 {rider=Castilaa}, last-generated-id=1701769637612-4, last-entry=1701769637612-4 {rider=Norem}, recorded-first-entry-id=1701769637612-0} List<StreamGroupInfo> res33 = jedis.xinfoGroups("race:italy"); for (StreamGroupInfo a : res33){ System.out.println( a.getGroupInfo() ); // >>> {last-delivered-id=1701770253659-0, lag=2, pending=2, name=italy_riders, consumers=2, entries-read=3} } List<StreamConsumersInfo> res34 = jedis.xinfoConsumers("race:italy","italy_riders"); for (StreamConsumerInfo a : res34){ System.out.println( a.getConsumerInfo() ); // {inactive=1, idle=1, pending=1, name=Alice} , {inactive=3, idle=3, pending=1, name=Bob} } jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Jones");}},XAddParams.xAddParams().maxLen(10)); jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Wood");}},XAddParams.xAddParams().maxLen(10)); jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Henshaw");}},XAddParams.xAddParams().maxLen(10)); long res35 = jedis.xlen("race:italy"); System.out.println(res35); // >>> 8 List<StreamEntry> res36 = jedis.xrange("race:italy","-","+"); System.out.println(res36); // >>> [1701771219852-0 {rider=Castilaa}, 1701771219852-1 {rider=Royce}, 1701771219853-0 {rider=Sam-Bodden}, 1701771219853-1 {rider=Prickett}, 1701771219853-2 {rider=Norem}, 1701771219858-0 {rider=Jones}, 1701771219858-1 {rider=Wood}, 1701771219859-0 {rider=Henshaw}] StreamEntryID id6 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Smith");}},XAddParams.xAddParams().maxLen(2)); List<StreamEntry> res37 = jedis.xrange("race:italy","-","+"); System.out.println(res37); // >>> [1701771067332-1 {rider=Henshaw}, 1701771067332-2 {rider=Smith}] long res38 = jedis.xtrim("race:italy",XTrimParams.xTrimParams().maxLen(10).exactTrimming()); System.out.println(res38); /// >>> 0 long res39 = jedis.xtrim("race:italy",XTrimParams.xTrimParams().maxLen(10)); System.out.println(res39); /// >>> 0 List<StreamEntry> res40 = jedis.xrange("race:italy","-","+"); System.out.println(res40); // >>> [1701771356428-2 {rider=Henshaw}, 1701771356429-0 {rider=Smith}] long res41 = jedis.xdel("race:italy",id6); System.out.println(res41); // >>> 1 List<StreamEntry> res42 = jedis.xrange("race:italy","-","+"); System.out.println(res42); // >>> [1701771517639-1 {rider=Henshaw}] } }
using System.Runtime.CompilerServices; using NRedisStack.Tests; using StackExchange.Redis; public class StreamTutorial { [SkipIfRedis(Is.OSSCluster)] public void run() { var muxer = ConnectionMultiplexer.Connect("localhost:6379"); var db = muxer.GetDatabase(); RedisValue res1 = db.StreamAdd( "race:france", new NameValueEntry[] { new NameValueEntry("rider", "Castilla"), new NameValueEntry("speed", 30.2), new NameValueEntry("position", 1), new NameValueEntry("location_id", 1) } ); Console.WriteLine(res1); // >>> 1712668482289-0 RedisValue res2 = db.StreamAdd( "race:france", new NameValueEntry[] { new NameValueEntry("rider", "Norem"), new NameValueEntry("speed", 28.8), new NameValueEntry("position", 3), new NameValueEntry("location_id", 1) } ); Console.WriteLine(res2); // >>> 1712668766534-1 RedisValue res3 = db.StreamAdd( "race:france", new NameValueEntry[]{ new NameValueEntry("rider", "Prickett"), new NameValueEntry("speed", 29.7), new NameValueEntry("position", 2), new NameValueEntry("location_id", 1) } ); Console.WriteLine(res3); // >>> 1712669055705-0 // Tests for 'xadd' step. StreamEntry[] res4 = db.StreamRange("race:france", "1712668482289-0", "+", 2); foreach (StreamEntry entry in res4) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // Tests for 'xrange' step. StreamEntry[] res5 = db.StreamRead("race:france", 0, 100); foreach (StreamEntry entry in res4) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1] // Tests for 'xread_block' step. RedisValue res6 = db.StreamAdd( "race:france", new NameValueEntry[]{ new NameValueEntry("rider", "Castilla"), new NameValueEntry("speed", 29.9), new NameValueEntry("position", 1), new NameValueEntry("location_id", 2) } ); Console.WriteLine(res6); // >>> 1712675674750-0 // Tests for 'xadd_2' step. long res7 = db.StreamLength("race:france"); Console.WriteLine(res7); // >>> 4 // Tests for 'xlen' step. RedisValue res8 = db.StreamAdd( "race:usa", new NameValueEntry[] { new NameValueEntry("racer", "Castilla") }, "0-1" ); Console.WriteLine(res8); // >>> 0-1 RedisValue res9 = db.StreamAdd( "race:usa", new NameValueEntry[]{ new NameValueEntry("racer", "Norem") }, "0-2" ); Console.WriteLine(res9); // >>> 0-2 // Tests for 'xadd_id' step. try { RedisValue res10 = db.StreamAdd( "race:usa", new NameValueEntry[]{ new NameValueEntry("racer", "Prickett") }, "0-1" ); } catch (RedisServerException ex) { Console.WriteLine(ex); // >>> ERR The ID specified in XADD is equal or smaller than the target stream top item } // Tests for 'xadd_bad_id' step. RedisValue res11 = ""; Version version = muxer.GetServer("localhost:6379").Version; if (version.Major >= 7) { res11 = db.StreamAdd( "race:usa", new NameValueEntry[]{ new NameValueEntry("rider", "Norem") }, "0-*" ); Console.WriteLine(res11); // >>> "0-3" } // Tests for 'xadd_7' step. StreamEntry[] res12 = db.StreamRange("race:france", "-", "+"); foreach (StreamEntry entry in res12) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1] // >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2] // Tests for 'xrange_all' step. StreamEntry[] res13 = db.StreamRange("race:france", 1712668482289, 1712668482291); foreach (StreamEntry entry in res13) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // Tests for 'xrange_time' step. StreamEntry[] res14 = db.StreamRange("race:france", "-", "+", 2); foreach (StreamEntry entry in res14) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // Tests for 'xrange_step_1' step. StreamEntry[] res15 = db.StreamRange("race:france", "(1712668766534-1", "+", 2); foreach (StreamEntry entry in res15) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1] // >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2] // Tests for 'xrange_step_2' step. StreamEntry[] res16 = db.StreamRange("race:france", "(1712675674750-0", "+", 2); foreach (StreamEntry entry in res16) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> <empty array> // Tests for 'xrange_empty' step. StreamEntry[] res17 = db.StreamRange("race:france", "+", "-", 1, Order.Descending); foreach (StreamEntry entry in res17) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2] // Tests for 'xrevrange' step. StreamEntry[] res18 = db.StreamRead("race:france", 0, 2); foreach (StreamEntry entry in res18) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // Tests for 'xread' step. bool res19 = db.StreamCreateConsumerGroup("race:france", "france_riders", "$"); Console.WriteLine(res19); // >>> true // Tests for 'xgroup_create' step. bool res20 = db.StreamCreateConsumerGroup("race:italy", "italy_riders", "$", true); Console.WriteLine(res20); // >>> true // Tests for 'xgroup_create_mkstream' step. RedisValue groupRes = db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Castilla") } ); // 1712744323758-0 groupRes = db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Royce") } ); // 1712744358384-0 groupRes = db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Sam-Bodden") } ); // 1712744379676-0 groupRes = db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Prickett") } ); // 1712744399401-0 groupRes = db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Norem") } ); // 1712744413117-0 StreamEntry[] res21 = db.StreamReadGroup("race:italy", "italy_riders", "Alice", ">", 1); foreach (StreamEntry entry in res21) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712744323758-0: [rider: Castilla] // Tests for 'xgroup_read' step. StreamEntry[] res22 = db.StreamReadGroup("race:italy", "italy_riders", "Alice", "0"); foreach (StreamEntry entry in res22) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); // >>> 1712744323758-0: [rider: Castilla] } // Tests for 'xgroup_read_id' step. long res23 = db.StreamAcknowledge("race:italy", "italy_riders", "1712744323758-0"); Console.WriteLine(res23); // >>> 1 StreamEntry[] res24 = db.StreamReadGroup("race:italy", "italy_riders", "Alice", "0"); foreach (StreamEntry entry in res24) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> <empty array> // Tests for 'xack' step. StreamEntry[] res25 = db.StreamReadGroup("race:italy", "italy_riders", "Bob", ">", 2); foreach (StreamEntry entry in res25) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712744358384-0: [rider: Royce] // >>> 1712744379676-0: [rider: Sam-Bodden] // Tests for 'xgroup_read_bob' step. StreamPendingInfo res26 = db.StreamPending("race:italy", "italy_riders"); Console.WriteLine($"pending: {res26.PendingMessageCount}, min: {res26.LowestPendingMessageId}, max: {res26.HighestPendingMessageId}, consumers:[{string.Join(", ", res26.Consumers.Select(c => $"{c.Name}: {c.PendingMessageCount}"))}]"); // >>> pending: 2, min: 1712747506906-0, max: 1712747506907-0, consumers:[name: Bob, pending:2] // Tests for 'xpending' step. StreamPendingMessageInfo[] res27 = db.StreamPendingMessages( "race:italy", "italy_riders", 10, "", "-", "+" ); foreach (StreamPendingMessageInfo info in res27) { Console.WriteLine($"message_id: {info.MessageId}, consumer: {info.ConsumerName}, time_since_delivered: {info.IdleTimeInMilliseconds}, times_delivered: {info.DeliveryCount}"); } // >>> message_id: min: 1712747506906-0, consumer: Bob, time_since_delivered: 31084, times_delivered: 1 // >>> message_id: min: 1712747506907-0, consumer: Bob, time_since_delivered: 31084, times_delivered: 1 // Tests for 'xpending_plus_minus' step. StreamEntry[] res28 = db.StreamRange("race:italy", "1712744358384-0", "1712744358384-0"); foreach (StreamEntry entry in res28) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712744358384-0: [rider: Royce] // Tests for 'xrange_pending' step. StreamEntry[] res29 = db.StreamClaim( "race:italy", "italy_riders", "Alice", 60000, new RedisValue[] { 1712744358384 - 0 } ); foreach (StreamEntry entry in res29) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712744358384-0: [rider: Royce] // Tests for 'xclaim' step. StreamAutoClaimResult res30 = db.StreamAutoClaim( "race:italy", "italy_riders", "Alice", 1, "0-0", 1 ); Console.WriteLine($"{res30.NextStartId}, ({string.Join(", ", res30.ClaimedEntries.Select(entry => $"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"))})"); // >>> 1712744379676-0, (1712744358384-0: [rider: Royce]) // Tests for 'xautoclaim' step. StreamAutoClaimResult res31 = db.StreamAutoClaim( "race:italy", "italy_riders", "Alice", 1, "(1712744358384-0", 1 ); Console.WriteLine($"{res31.NextStartId}, ({string.Join(", ", res31.ClaimedEntries.Select(entry => $"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"))})"); // >>> 0-0, (1712744379676-0: [rider: Sam-Bodden]) // Tests for 'xautoclaim_cursor' step. StreamInfo res32 = db.StreamInfo("race:italy"); Console.WriteLine($"length: {res32.Length}, radix-tree-keys: {res32.RadixTreeKeys}, radix-tree-nodes: {res32.RadixTreeNodes}, last-generated-id: {res32.LastGeneratedId}, first-entry: {$"{res32.FirstEntry.Id}: [{string.Join(", ", res32.FirstEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}, last-entry: {$"{res32.LastEntry.Id}: [{string.Join(", ", res32.LastEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}"); // >>> length: 5, radix-tree-keys: 1, radix-tree-nodes: 2, last-generated-id: 1712756762686-1, first-entry: 1712756762685-0: [rider: Castilla], last-entry: 1712756762686-1: [rider: Norem] // Tests for 'xinfo' step. StreamGroupInfo[] res33 = db.StreamGroupInfo("race:italy"); foreach (StreamGroupInfo info in res33) { Console.WriteLine($"name: {info.Name}, consumers: {info.ConsumerCount}, pending: {info.PendingMessageCount}, last-delivered-id: {info.LastDeliveredId}"); } // >>> name: italy_riders, consumers: 2, pending: 2, last-delivered-id: 1712757192730-2 // Tests for 'xinfo_groups' step. StreamConsumerInfo[] res34 = db.StreamConsumerInfo("race:italy", "italy_riders"); foreach (StreamConsumerInfo info in res34) { Console.WriteLine($"name: {info.Name}, pending: {info.PendingMessageCount}, idle: {info.IdleTimeInMilliseconds}"); } // >>> name: Alice, pending: 1, idle: 7717 // >>> name: Bob, pending: 0, idle: 7722 // Tests for 'xinfo_consumers' step. db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Jones") }, null, 2, true ); db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Wood") }, null, 2, true ); db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Henshaw") }, null, 2, true ); long res35 = db.StreamLength("race:italy"); Console.WriteLine(res35); // >>> 8 StreamEntry[] res36 = db.StreamRange("race:italy", "-", "+"); foreach (StreamEntry entry in res36) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712758336128-0: [rider: Castilla] // >>> 1712758336128-1: [rider: Royce] // >>> 1712758336128-2: [rider: Sam-Bodden] // >>> 1712758336129-0: [rider: Prickett] // >>> 1712758336139-0: [rider: Norem] // >>> 1712758340854-0: [rider: Jones] // >>> 1712758341645-0: [rider: Wood] // >>> 1712758342134-0: [rider: Henshaw] db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Smith") }, null, 2, false ); StreamEntry[] res37 = db.StreamRange("race:italy", "-", "+"); foreach (StreamEntry entry in res37) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // 1712758746476-1: [rider: Henshaw] // 1712758746477-0: [rider: Smith] // Tests for 'maxlen' step. long res38 = db.StreamTrim("race:italy", 10, false); Console.WriteLine(res38); // >>> 0 // Tests for 'xtrim' step. long res39 = db.StreamTrim("race:italy", 10, true); Console.WriteLine(res39); // >>> 0 // Tests for 'xtrim2' step. StreamEntry[] res40 = db.StreamRange("race:italy", "-", "+"); foreach (StreamEntry entry in res40) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712759694003-0: [rider: Henshaw] // >>> 1712759694003-1: [rider: Smith] long res41 = db.StreamDelete("race:italy", new RedisValue[] { "1712759694003-1" }); Console.WriteLine(res41); // >>> 1 StreamEntry[] res42 = db.StreamRange("race:italy", "-", "+"); foreach (StreamEntry entry in res42) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712759694003-0: [rider: Henshaw] // Tests for 'xdel' step. } }
-
Read two stream entries starting at ID
1692632086370-0
:> XRANGE race:france 1692632086370-0 + COUNT 2 1) 1) "1692632086370-0" 2) 1) "rider" 2) "Castilla" 3) "speed" 4) "30.2" 5) "position" 6) "1" 7) "location_id" 8) "1" 2) 1) "1692632094485-0" 2) 1) "rider" 2) "Norem" 3) "speed" 4) "28.8" 5) "position" 6) "3" 7) "location_id" 8) "1"
Are you tired of using redis-cli? Try Redis Insight - the developer GUI for Redis.""" Code samples for Stream doc pages: https://redis.io/docs/latest/develop/data-types/streams/ """ import redis r = redis.Redis(decode_responses=True) res1 = r.xadd( "race:france", {"rider": "Castilla", "speed": 30.2, "position": 1, "location_id": 1}, ) print(res1) # >>> 1692629576966-0 res2 = r.xadd( "race:france", {"rider": "Norem", "speed": 28.8, "position": 3, "location_id": 1}, ) print(res2) # >>> 1692629594113-0 res3 = r.xadd( "race:france", {"rider": "Prickett", "speed": 29.7, "position": 2, "location_id": 1}, ) print(res3) # >>> 1692629613374-0 res4 = r.xrange("race:france", "1691765278160-0", "+", 2) print( res4 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ) # ] res5 = r.xread(streams={"race:france": 0}, count=100, block=300) print( res5 ) # >>> [ # ['race:france', # [('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ), # ('1692629613374-0', # {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'} # )] # ] # ] res6 = r.xadd( "race:france", {"rider": "Castilla", "speed": 29.9, "position": 1, "location_id": 2}, ) print(res6) # >>> 1692629676124-0 res7 = r.xlen("race:france") print(res7) # >>> 4 res8 = r.xadd("race:usa", {"racer": "Castilla"}, id="0-1") print(res8) # >>> 0-1 res9 = r.xadd("race:usa", {"racer": "Norem"}, id="0-2") print(res9) # >>> 0-2 try: res10 = r.xadd("race:usa", {"racer": "Prickett"}, id="0-1") print(res10) # >>> 0-1 except redis.exceptions.ResponseError as e: print(e) # >>> WRONGID # Not yet implemented res11 = r.xrange("race:france", "-", "+") print( res11 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ), # ('1692629613374-0', # {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'} # ), # ('1692629676124-0', # {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'} # ) # ] res12 = r.xrange("race:france", 1692629576965, 1692629576967) print( res12 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ) # ] res13 = r.xrange("race:france", "-", "+", 2) print( res13 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ) # ] res14 = r.xrange("race:france", "(1692629594113-0", "+", 2) print( res14 ) # >>> [ # ('1692629613374-0', # {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'} # ), # ('1692629676124-0', # {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'} # ) # ] res15 = r.xrange("race:france", "(1692629676124-0", "+", 2) print(res15) # >>> [] res16 = r.xrevrange("race:france", "+", "-", 1) print( res16 ) # >>> [ # ('1692629676124-0', # {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'} # ) # ] res17 = r.xread(streams={"race:france": 0}, count=2) print( res17 ) # >>> [ # ['race:france', [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ) # ] # ] # ] res18 = r.xgroup_create("race:france", "france_riders", "$") print(res18) # >>> True res19 = r.xgroup_create("race:italy", "italy_riders", "$", mkstream=True) print(res19) # >>> True r.xadd("race:italy", {"rider": "Castilla"}) r.xadd("race:italy", {"rider": "Royce"}) r.xadd("race:italy", {"rider": "Sam-Bodden"}) r.xadd("race:italy", {"rider": "Prickett"}) r.xadd("race:italy", {"rider": "Norem"}) res20 = r.xreadgroup( streams={"race:italy": ">"}, consumername="Alice", groupname="italy_riders", count=1, ) print(res20) # >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] res21 = r.xreadgroup( streams={"race:italy": 0}, consumername="Alice", groupname="italy_riders", count=1, ) print(res21) # >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] res22 = r.xack("race:italy", "italy_riders", "1692629925771-0") print(res22) # >>> 1 res23 = r.xreadgroup( streams={"race:italy": 0}, consumername="Alice", groupname="italy_riders", count=1, ) print(res23) # >>> [['race:italy', []]] res24 = r.xreadgroup( streams={"race:italy": ">"}, consumername="Bob", groupname="italy_riders", count=2, ) print( res24 ) # >>> [ # ['race:italy', [ # ('1692629925789-0', # {'rider': 'Royce'} # ), # ('1692629925790-0', # {'rider': 'Sam-Bodden'} # ) # ] # ] # ] res25 = r.xpending("race:italy", "italy_riders") print( res25 ) # >>> { # 'pending': 2, 'min': '1692629925789-0', 'max': '1692629925790-0', # 'consumers': [{'name': 'Bob', 'pending': 2}] # } res26 = r.xpending_range("race:italy", "italy_riders", "-", "+", 10) print( res26 ) # >>> [ # { # 'message_id': '1692629925789-0', 'consumer': 'Bob', # 'time_since_delivered': 31084, 'times_delivered': 1 # }, # { # 'message_id': '1692629925790-0', 'consumer': 'Bob', # 'time_since_delivered': 31084, 'times_delivered': 1 # } # ] res27 = r.xrange("race:italy", "1692629925789-0", "1692629925789-0") print(res27) # >>> [('1692629925789-0', {'rider': 'Royce'})] res28 = r.xclaim("race:italy", "italy_riders", "Alice", 60000, ["1692629925789-0"]) print(res28) # >>> [('1692629925789-0', {'rider': 'Royce'})] res29 = r.xautoclaim("race:italy", "italy_riders", "Alice", 1, "0-0", 1) print(res29) # >>> ['1692629925790-0', [('1692629925789-0', {'rider': 'Royce'})]] res30 = r.xautoclaim("race:italy", "italy_riders", "Alice", 1, "(1692629925789-0", 1) print(res30) # >>> ['0-0', [('1692629925790-0', {'rider': 'Sam-Bodden'})]] res31 = r.xinfo_stream("race:italy") print( res31 ) # >>> { # 'length': 5, 'radix-tree-keys': 1, 'radix-tree-nodes': 2, # 'last-generated-id': '1692629926436-0', 'groups': 1, # 'first-entry': ('1692629925771-0', {'rider': 'Castilla'}), # 'last-entry': ('1692629926436-0', {'rider': 'Norem'}) # } res32 = r.xinfo_groups("race:italy") print( res32 ) # >>> [ # { # 'name': 'italy_riders', 'consumers': 2, 'pending': 2, # 'last-delivered-id': '1692629925790-0' # } # ] res33 = r.xinfo_consumers("race:italy", "italy_riders") print( res33 ) # >>> [ # {'name': 'Alice', 'pending': 2, 'idle': 199332}, # {'name': 'Bob', 'pending': 0, 'idle': 489170} # ] r.xadd("race:italy", {"rider": "Jones"}, maxlen=2) r.xadd("race:italy", {"rider": "Wood"}, maxlen=2) r.xadd("race:italy", {"rider": "Henshaw"}, maxlen=2) res34 = r.xlen("race:italy") print(res34) # >>> 8 res35 = r.xrange("race:italy", "-", "+") print( res35 ) # >>> [ # ('1692629925771-0', {'rider': 'Castilla'}), # ('1692629925789-0', {'rider': 'Royce'}), # ('1692629925790-0', {'rider': 'Sam-Bodden'}), # ('1692629925791-0', {'rider': 'Prickett'}), # ('1692629926436-0', {'rider': 'Norem'}), # ('1692630612602-0', {'rider': 'Jones'}), # ('1692630641947-0', {'rider': 'Wood'}), # ('1692630648281-0', {'rider': 'Henshaw'}) # ] r.xadd("race:italy", {"rider": "Smith"}, maxlen=2, approximate=False) res36 = r.xrange("race:italy", "-", "+") print( res36 ) # >>> [ # ('1692630648281-0', {'rider': 'Henshaw'}), # ('1692631018238-0', {'rider': 'Smith'}) # ] res37 = r.xtrim("race:italy", maxlen=10, approximate=False) print(res37) # >>> 0 res38 = r.xtrim("race:italy", maxlen=10) print(res38) # >>> 0 res39 = r.xrange("race:italy", "-", "+") print( res39 ) # >>> [ # ('1692630648281-0', {'rider': 'Henshaw'}), # ('1692631018238-0', {'rider': 'Smith'}) # ] res40 = r.xdel("race:italy", "1692631018238-0") print(res40) # >>> 1 res41 = r.xrange("race:italy", "-", "+") print(res41) # >>> [('1692630648281-0', {'rider': 'Henshaw'})]
import assert from 'assert'; import { createClient } from 'redis'; const client = await createClient(); await client.connect(); const res1 = await client.xAdd( 'race:france', '*', { 'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1' } ); console.log(res1); // >>> 1700073067968-0 N.B. actual values will differ from these examples const res2 = await client.xAdd( 'race:france', '*', { 'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1' }, ); console.log(res2); // >>> 1692629594113-0 const res3 = await client.xAdd( 'race:france', '*', { 'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1' }, ); console.log(res3); // >>> 1692629613374-0 const res4 = await client.xRange('race:france', '1691765278160-0', '+', {COUNT: 2}); console.log(res4); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'})] const res5 = await client.xRead({ key: 'race:france', id: '0-0' }, { count: 100, block: 300 }); console.log(res5); // >>> [['race:france', [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}), ('1692629613374-0', {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'})]]] const res6 = await client.xAdd( 'race:france', '*', { 'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2' } ); console.log(res6); // >>> 1692629676124-0 const res7 = await client.xLen('race:france'); console.log(res7); // >>> 4 const res8 = await client.xAdd('race:usa', '0-1', { 'racer': 'Castilla' }); console.log(res8); // >>> 0-1 const res9 = await client.xAdd('race:usa', '0-2', { 'racer': 'Norem' }); console.log(res9); // >>> 0-2 try { const res10 = await client.xAdd('race:usa', '0-1', { 'racer': 'Prickett' }); console.log(res10); // >>> 0-1 } catch (error) { console.error(error); // >>> WRONGID } const res11a = await client.xAdd('race:usa', '0-*', { racer: 'Norem' }); console.log(res11a); // >>> 0-3 const res11 = await client.xRange('race:france', '-', '+'); console.log(res11); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}), ('1692629613374-0', {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'}), ('1692629676124-0', {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'})] const res12 = await client.xRange('race:france', '1692629576965', '1692629576967'); console.log(res12); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'})] const res13 = await client.xRange('race:france', '-', '+', {COUNT: 2}); console.log(res13); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'})] const res14 = await client.xRange('race:france', '(1692629594113-0', '+', {COUNT: 2}); console.log(res14); // >>> [('1692629613374-0', {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'}), ('1692629676124-0', {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'})] const res15 = await client.xRange('race:france', '(1692629676124-0', '+', {COUNT: 2}); console.log(res15); // >>> [] const res16 = await client.xRevRange('race:france', '+', '-', {COUNT: 1}); console.log( res16 ); // >>> [('1692629676124-0', {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'})] const res17 = await client.xRead({ key: 'race:france', id: '0-0' }, { count: 2 }); console.log(res17); // >>> [['race:france', [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'})]]] const res18 = await client.xGroupCreate('race:france', 'france_riders', '$'); console.log(res18); // >>> True const res19 = await client.xGroupCreate('race:italy', 'italy_riders', '$', { 'MKSTREAM': true }); console.log(res19); // >>> True await client.xAdd('race:italy', '*', { 'rider': 'Castilla' }); await client.xAdd('race:italy', '*', { 'rider': 'Royce' }); await client.xAdd('race:italy', '*', { 'rider': 'Sam-Bodden' }); await client.xAdd('race:italy', '*', { 'rider': 'Prickett' }); await client.xAdd('race:italy', '*', { 'rider': 'Norem' }); const res20 = await client.xReadGroup( 'italy_riders', 'Alice', { key: 'race:italy', id: '>' }, { 'COUNT': 1 } ); console.log(res20); // >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] const res21 = await client.xReadGroup( 'italy_riders', 'Alice', { key: 'race:italy', id: '0' }, { 'COUNT': 1 } ); console.log(res21); // >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] const res22 = await client.xAck('race:italy', 'italy_riders', '1692629925771-0') console.log(res22); // >>> 1 const res23 = await client.xReadGroup( 'italy_riders', 'Alice', { key: 'race:italy', id: '0' }, { 'COUNT': 1 } ); console.log(res23); // >>> [['race:italy', []]] const res24 = await client.xReadGroup( 'italy_riders', 'Bob', { key: 'race:italy', id: '>' }, { 'COUNT': 2 } ); console.log(res24); // >>> [['race:italy', [('1692629925789-0', {'rider': 'Royce'}), ('1692629925790-0', {'rider': 'Sam-Bodden'})]]] const res25 = await client.xPending('race:italy', 'italy_riders'); console.log(res25); // >>> {'pending': 2, 'min': '1692629925789-0', 'max': '1692629925790-0', 'consumers': [{'name': 'Bob', 'pending': 2}]} const res26 = await client.xPendingRange('race:italy', 'italy_riders', '-', '+', 10); console.log(res26); // >>> [{'message_id': '1692629925789-0', 'consumer': 'Bob', 'time_since_delivered': 31084, 'times_delivered': 1}, {'message_id': '1692629925790-0', 'consumer': 'Bob', 'time_since_delivered': 31084, 'times_delivered': 1}] const res27 = await client.xRange('race:italy', '1692629925789-0', '1692629925789-0'); console.log(res27); // >>> [('1692629925789-0', {'rider': 'Royce'})] const res28 = await client.xClaim( 'race:italy', 'italy_riders', 'Alice', 60000, ['1692629925789-0'] ); console.log(res28); // >>> [('1692629925789-0', {'rider': 'Royce'})] const res29 = await client.xAutoClaim('race:italy', 'italy_riders', 'Alice', 1, '0-0', 1); console.log(res29); // >>> ['1692629925790-0', [('1692629925789-0', {'rider': 'Royce'})]] const res30 = await client.xAutoClaim( 'race:italy', 'italy_riders', 'Alice', 1, '(1692629925789-0', 1 ); console.log(res30); // >>> ['0-0', [('1692629925790-0', {'rider': 'Sam-Bodden'})]] const res31 = await client.xInfoStream('race:italy'); console.log(res31); // >>> {'length': 5, 'radix-tree-keys': 1, 'radix-tree-nodes': 2, 'last-generated-id': '1692629926436-0', 'groups': 1, 'first-entry': ('1692629925771-0', {'rider': 'Castilla'}), 'last-entry': ('1692629926436-0', {'rider': 'Norem'})} const res32 = await client.xInfoGroups('race:italy'); console.log(res32); // >>> [{'name': 'italy_riders', 'consumers': 2, 'pending': 2, 'last-delivered-id': '1692629925790-0'}] const res33 = await client.xInfoConsumers('race:italy', 'italy_riders'); console.log(res33); // >>> [{'name': 'Alice', 'pending': 2, 'idle': 199332}, {'name': 'Bob', 'pending': 0, 'idle': 489170}] await client.xAdd('race:italy', '*', { 'rider': 'Jones' }, { 'MAXLEN': 2 }); await client.xAdd('race:italy', '*', { 'rider': 'Wood' }, { 'MAXLEN': 2 }); await client.xAdd('race:italy', '*', { 'rider': 'Henshaw' }, { 'MAXLEN': 2 }); const res34 = await client.xLen('race:italy'); console.log(res34); // >>> 8 const res35 = await client.xRange('race:italy', '-', '+'); console.log(res35); // >>> [('1692629925771-0', {'rider': 'Castilla'}), ('1692629925789-0', {'rider': 'Royce'}), ('1692629925790-0', {'rider': 'Sam-Bodden'}), ('1692629925791-0', {'rider': 'Prickett'}), ('1692629926436-0', {'rider': 'Norem'}), ('1692630612602-0', {'rider': 'Jones'}), ('1692630641947-0', {'rider': 'Wood'}), ('1692630648281-0', {'rider': 'Henshaw'})] await client.xAdd('race:italy', '*', { 'rider': 'Smith' }, { 'MAXLEN': 2, 'APPROXIMATE': false }); const res36 = await client.xRange('race:italy', '-', '+'); console.log(res36); // >>> [('1692630648281-0', {'rider': 'Henshaw'}), ('1692631018238-0', {'rider': 'Smith'})] const res37 = await client.xTrim('race:italy', 'MAXLEN', 10, { 'APPROXIMATE': false }); console.log(res37); // >>> 0 const res38 = await client.xTrim('race:italy', "MAXLEN", 10); console.log(res38); // >>> 0 const res39 = await client.xRange('race:italy', '-', '+'); console.log(res39); // >>> [('1692630648281-0', {'rider': 'Henshaw'}), ('1692631018238-0', {'rider': 'Smith'})] const res40 = await client.xDel('race:italy', '1692631018238-0'); console.log(res40); // >>> 1 const res41 = await client.xRange('race:italy', '-', '+'); console.log(res41); // >>> [('1692630648281-0', {'rider': 'Henshaw'})]
package io.redis.examples; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.UnifiedJedis; public class StreamsExample { public void run(){ UnifiedJedis jedis = new UnifiedJedis("redis://localhost:6379"); StreamEntryID res1 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Castilla");put("speed","30.2");put("position","1");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res1); // >>> 1701760582225-0 StreamEntryID res2 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Norem");put("speed","28.8");put("position","3");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res2); // >>> 1701760582225-1 StreamEntryID res3 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Prickett");put("speed","29.7");put("position","2");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res3); // >>> 1701760582226-0 List<StreamEntry> res4 = jedis.xrange("race:france","1701760582225-0","+",2); System.out.println(res4); // >>> [1701760841292-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701760841292-1 {rider=Norem, speed=28.8, location_id=1, position=3}] List<Map.Entry<String, List<StreamEntry>>> res5= jedis.xread(XReadParams.xReadParams().block(300).count(100),new HashMap<String,StreamEntryID>(){{put("race:france",new StreamEntryID());}}); System.out.println( res5 ); // >>> [race:france=[1701761996660-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701761996661-0 {rider=Norem, speed=28.8, location_id=1, position=3}, 1701761996661-1 {rider=Prickett, speed=29.7, location_id=1, position=2}]] StreamEntryID res6 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Castilla");put("speed","29.9");put("position","2");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res6); // >>> 1701762285679-0 long res7 = jedis.xlen("race:france"); System.out.println(res7); // >>> 4 StreamEntryID res8 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Castilla");}},XAddParams.xAddParams().id("0-1")); System.out.println(res8); // >>> 0-1 StreamEntryID res9 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Norem");}},XAddParams.xAddParams().id("0-2")); System.out.println(res9); // >>> 0-2 try { StreamEntryID res10 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Prickett");}},XAddParams.xAddParams().id("0-1")); System.out.println(res10); // >>> 0-1 } catch (JedisDataException e){ System.out.println(e); // >>> ERR The ID specified in XADD is equal or smaller than the target stream top item } StreamEntryID res11 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Norem");}},XAddParams.xAddParams().id("0-*")); System.out.println(res11); List<StreamEntry> res12 = jedis.xrange("race:france","-","+"); System.out.println( res12 ); // >>> [1701764734160-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764734160-1 {rider=Norem, speed=28.8, location_id=1, position=3}, 1701764734161-0 {rider=Prickett, speed=29.7, location_id=1, position=2}, 1701764734162-0 {rider=Castilla, speed=29.9, location_id=1, position=2}] List<StreamEntry> res13 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()-1000),String.valueOf(System.currentTimeMillis()+1000)); System.out.println( res13 ); // >>> [1701764734160-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764734160-1 {rider=Norem, speed=28.8, location_id=1, position=3}, 1701764734161-0 {rider=Prickett, speed=29.7, location_id=1, position=2}, 1701764734162-0 {rider=Castilla, speed=29.9, location_id=1, position=2}] List<StreamEntry> res14 = jedis.xrange("race:france","-","+",2); System.out.println(res14); // >>> [1701764887638-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764887638-1 {rider=Norem, speed=28.8, location_id=1, position=3}] List<StreamEntry> res15 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()-1000)+"-0","+",2); System.out.println(res15); // >>> [1701764887638-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764887638-1 {rider=Norem, speed=28.8, location_id=1, position=3}] List<StreamEntry> res16 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()+1000)+"-0","+",2); System.out.println(res16); // >>> [] List<StreamEntry> res17 = jedis.xrevrange("race:france","+","-",1); System.out.println(res17); // >>> [1701765218592-0 {rider=Castilla, speed=29.9, location_id=1, position=2}] List<Map.Entry<String, List<StreamEntry>>> res18= jedis.xread(XReadParams.xReadParams().count(2),new HashMap<String,StreamEntryID>(){{put("race:france",new StreamEntryID());}}); System.out.println( res18 ); // >>> [race:france=[1701765384638-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701765384638-1 {rider=Norem, speed=28.8, location_id=1, position=3}]] String res19 = jedis.xgroupCreate("race:france","france_riders",StreamEntryID.LAST_ENTRY,false); System.out.println(res19); // >>> OK String res20 = jedis.xgroupCreate("race:italy","italy_riders",StreamEntryID.LAST_ENTRY,true); System.out.println(res20); // >>> OK StreamEntryID id1 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Castilaa");}},XAddParams.xAddParams()); StreamEntryID id2 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Royce");}},XAddParams.xAddParams()); StreamEntryID id3 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Sam-Bodden");}},XAddParams.xAddParams()); StreamEntryID id4 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Prickett");}},XAddParams.xAddParams()); StreamEntryID id5 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Norem");}},XAddParams.xAddParams()); List<Map.Entry<String, List<StreamEntry>>> res21 = jedis.xreadGroup("italy_riders","Alice", XReadGroupParams.xReadGroupParams().count(1),new HashMap<String,StreamEntryID>(){{put("race:italy",StreamEntryID.UNRECEIVED_ENTRY);}}); System.out.println(res21); // >>> [race:italy=[1701766299006-0 {rider=Castilaa}]] List<Map.Entry<String, List<StreamEntry>>> res22 = jedis.xreadGroup("italy_riders","Alice", XReadGroupParams.xReadGroupParams().count(1),new HashMap<String,StreamEntryID>(){{put("race:italy",new StreamEntryID());}}); System.out.println(res22); // >>> [race:italy=[1701766299006-0 {rider=Castilaa}]] long res23 = jedis.xack("race:italy","italy_riders",id1); System.out.println(res23); // >>> 1 List<Map.Entry<String, List<StreamEntry>>> res24 = jedis.xreadGroup("italy_riders","Alice", XReadGroupParams.xReadGroupParams().count(1),new HashMap<String,StreamEntryID>(){{put("race:italy",new StreamEntryID());}}); System.out.println(res24); // >>> [race:italy=[]] List<Map.Entry<String, List<StreamEntry>>> res25 = jedis.xreadGroup("italy_riders","Bob", XReadGroupParams.xReadGroupParams().count(2),new HashMap<String,StreamEntryID>(){{put("race:italy",StreamEntryID.UNRECEIVED_ENTRY);}}); System.out.println(res25); // >>> [race:italy=[1701767632261-1 {rider=Royce}, 1701767632262-0 {rider=Sam-Bodden}]] StreamPendingSummary res26 = jedis.xpending("race:italy","italy_riders"); System.out.println(res26.getConsumerMessageCount()); // >>> {Bob=2} List<StreamPendingEntry> res27 = jedis.xpending("race:italy","italy_riders",XPendingParams.xPendingParams().start(StreamEntryID.MINIMUM_ID).end(StreamEntryID.MAXIMUM_ID).count(10)); System.out.println(res27); // >>> [1701768567412-1 Bob idle:0 times:1, 1701768567412-2 Bob idle:0 times:1] List<StreamEntry> res28 = jedis.xrange("race:italy",id2.toString(),id2.toString()); System.out.println(res28); // >>> [1701768744819-1 {rider=Royce}] List<StreamEntry> res29 = jedis.xclaim("race:italy","italy_riders","Alice", 0L, XClaimParams.xClaimParams().time(60000),id2); System.out.println(res29); // >>> [1701769004195-1 {rider=Royce}] Map.Entry<StreamEntryID, List<StreamEntry>> res30 = jedis.xautoclaim("race:italy","italy_riders","Alice",1L,new StreamEntryID("0-0"),XAutoClaimParams.xAutoClaimParams().count(1)); System.out.println(res30); // >>> [1701769266831-2=[1701769266831-1 {rider=Royce}] Map.Entry<StreamEntryID, List<StreamEntry>> res31 = jedis.xautoclaim("race:italy","italy_riders","Alice",1L,new StreamEntryID(id2.toString()),XAutoClaimParams.xAutoClaimParams().count(1)); System.out.println(res31); // >>> [0-0=[1701769605847-2 {rider=Sam-Bodden}] StreamInfo res32 = jedis.xinfoStream("race:italy"); System.out.println( res32.getStreamInfo() ); // >>> {radix-tree-keys=1, radix-tree-nodes=2, entries-added=5, length=5, groups=1, max-deleted-entry-id=0-0, first-entry=1701769637612-0 {rider=Castilaa}, last-generated-id=1701769637612-4, last-entry=1701769637612-4 {rider=Norem}, recorded-first-entry-id=1701769637612-0} List<StreamGroupInfo> res33 = jedis.xinfoGroups("race:italy"); for (StreamGroupInfo a : res33){ System.out.println( a.getGroupInfo() ); // >>> {last-delivered-id=1701770253659-0, lag=2, pending=2, name=italy_riders, consumers=2, entries-read=3} } List<StreamConsumersInfo> res34 = jedis.xinfoConsumers("race:italy","italy_riders"); for (StreamConsumerInfo a : res34){ System.out.println( a.getConsumerInfo() ); // {inactive=1, idle=1, pending=1, name=Alice} , {inactive=3, idle=3, pending=1, name=Bob} } jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Jones");}},XAddParams.xAddParams().maxLen(10)); jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Wood");}},XAddParams.xAddParams().maxLen(10)); jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Henshaw");}},XAddParams.xAddParams().maxLen(10)); long res35 = jedis.xlen("race:italy"); System.out.println(res35); // >>> 8 List<StreamEntry> res36 = jedis.xrange("race:italy","-","+"); System.out.println(res36); // >>> [1701771219852-0 {rider=Castilaa}, 1701771219852-1 {rider=Royce}, 1701771219853-0 {rider=Sam-Bodden}, 1701771219853-1 {rider=Prickett}, 1701771219853-2 {rider=Norem}, 1701771219858-0 {rider=Jones}, 1701771219858-1 {rider=Wood}, 1701771219859-0 {rider=Henshaw}] StreamEntryID id6 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Smith");}},XAddParams.xAddParams().maxLen(2)); List<StreamEntry> res37 = jedis.xrange("race:italy","-","+"); System.out.println(res37); // >>> [1701771067332-1 {rider=Henshaw}, 1701771067332-2 {rider=Smith}] long res38 = jedis.xtrim("race:italy",XTrimParams.xTrimParams().maxLen(10).exactTrimming()); System.out.println(res38); /// >>> 0 long res39 = jedis.xtrim("race:italy",XTrimParams.xTrimParams().maxLen(10)); System.out.println(res39); /// >>> 0 List<StreamEntry> res40 = jedis.xrange("race:italy","-","+"); System.out.println(res40); // >>> [1701771356428-2 {rider=Henshaw}, 1701771356429-0 {rider=Smith}] long res41 = jedis.xdel("race:italy",id6); System.out.println(res41); // >>> 1 List<StreamEntry> res42 = jedis.xrange("race:italy","-","+"); System.out.println(res42); // >>> [1701771517639-1 {rider=Henshaw}] } }
using System.Runtime.CompilerServices; using NRedisStack.Tests; using StackExchange.Redis; public class StreamTutorial { [SkipIfRedis(Is.OSSCluster)] public void run() { var muxer = ConnectionMultiplexer.Connect("localhost:6379"); var db = muxer.GetDatabase(); RedisValue res1 = db.StreamAdd( "race:france", new NameValueEntry[] { new NameValueEntry("rider", "Castilla"), new NameValueEntry("speed", 30.2), new NameValueEntry("position", 1), new NameValueEntry("location_id", 1) } ); Console.WriteLine(res1); // >>> 1712668482289-0 RedisValue res2 = db.StreamAdd( "race:france", new NameValueEntry[] { new NameValueEntry("rider", "Norem"), new NameValueEntry("speed", 28.8), new NameValueEntry("position", 3), new NameValueEntry("location_id", 1) } ); Console.WriteLine(res2); // >>> 1712668766534-1 RedisValue res3 = db.StreamAdd( "race:france", new NameValueEntry[]{ new NameValueEntry("rider", "Prickett"), new NameValueEntry("speed", 29.7), new NameValueEntry("position", 2), new NameValueEntry("location_id", 1) } ); Console.WriteLine(res3); // >>> 1712669055705-0 // Tests for 'xadd' step. StreamEntry[] res4 = db.StreamRange("race:france", "1712668482289-0", "+", 2); foreach (StreamEntry entry in res4) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // Tests for 'xrange' step. StreamEntry[] res5 = db.StreamRead("race:france", 0, 100); foreach (StreamEntry entry in res4) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1] // Tests for 'xread_block' step. RedisValue res6 = db.StreamAdd( "race:france", new NameValueEntry[]{ new NameValueEntry("rider", "Castilla"), new NameValueEntry("speed", 29.9), new NameValueEntry("position", 1), new NameValueEntry("location_id", 2) } ); Console.WriteLine(res6); // >>> 1712675674750-0 // Tests for 'xadd_2' step. long res7 = db.StreamLength("race:france"); Console.WriteLine(res7); // >>> 4 // Tests for 'xlen' step. RedisValue res8 = db.StreamAdd( "race:usa", new NameValueEntry[] { new NameValueEntry("racer", "Castilla") }, "0-1" ); Console.WriteLine(res8); // >>> 0-1 RedisValue res9 = db.StreamAdd( "race:usa", new NameValueEntry[]{ new NameValueEntry("racer", "Norem") }, "0-2" ); Console.WriteLine(res9); // >>> 0-2 // Tests for 'xadd_id' step. try { RedisValue res10 = db.StreamAdd( "race:usa", new NameValueEntry[]{ new NameValueEntry("racer", "Prickett") }, "0-1" ); } catch (RedisServerException ex) { Console.WriteLine(ex); // >>> ERR The ID specified in XADD is equal or smaller than the target stream top item } // Tests for 'xadd_bad_id' step. RedisValue res11 = ""; Version version = muxer.GetServer("localhost:6379").Version; if (version.Major >= 7) { res11 = db.StreamAdd( "race:usa", new NameValueEntry[]{ new NameValueEntry("rider", "Norem") }, "0-*" ); Console.WriteLine(res11); // >>> "0-3" } // Tests for 'xadd_7' step. StreamEntry[] res12 = db.StreamRange("race:france", "-", "+"); foreach (StreamEntry entry in res12) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1] // >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2] // Tests for 'xrange_all' step. StreamEntry[] res13 = db.StreamRange("race:france", 1712668482289, 1712668482291); foreach (StreamEntry entry in res13) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // Tests for 'xrange_time' step. StreamEntry[] res14 = db.StreamRange("race:france", "-", "+", 2); foreach (StreamEntry entry in res14) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // Tests for 'xrange_step_1' step. StreamEntry[] res15 = db.StreamRange("race:france", "(1712668766534-1", "+", 2); foreach (StreamEntry entry in res15) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1] // >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2] // Tests for 'xrange_step_2' step. StreamEntry[] res16 = db.StreamRange("race:france", "(1712675674750-0", "+", 2); foreach (StreamEntry entry in res16) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> <empty array> // Tests for 'xrange_empty' step. StreamEntry[] res17 = db.StreamRange("race:france", "+", "-", 1, Order.Descending); foreach (StreamEntry entry in res17) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2] // Tests for 'xrevrange' step. StreamEntry[] res18 = db.StreamRead("race:france", 0, 2); foreach (StreamEntry entry in res18) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // Tests for 'xread' step. bool res19 = db.StreamCreateConsumerGroup("race:france", "france_riders", "$"); Console.WriteLine(res19); // >>> true // Tests for 'xgroup_create' step. bool res20 = db.StreamCreateConsumerGroup("race:italy", "italy_riders", "$", true); Console.WriteLine(res20); // >>> true // Tests for 'xgroup_create_mkstream' step. RedisValue groupRes = db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Castilla") } ); // 1712744323758-0 groupRes = db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Royce") } ); // 1712744358384-0 groupRes = db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Sam-Bodden") } ); // 1712744379676-0 groupRes = db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Prickett") } ); // 1712744399401-0 groupRes = db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Norem") } ); // 1712744413117-0 StreamEntry[] res21 = db.StreamReadGroup("race:italy", "italy_riders", "Alice", ">", 1); foreach (StreamEntry entry in res21) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712744323758-0: [rider: Castilla] // Tests for 'xgroup_read' step. StreamEntry[] res22 = db.StreamReadGroup("race:italy", "italy_riders", "Alice", "0"); foreach (StreamEntry entry in res22) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); // >>> 1712744323758-0: [rider: Castilla] } // Tests for 'xgroup_read_id' step. long res23 = db.StreamAcknowledge("race:italy", "italy_riders", "1712744323758-0"); Console.WriteLine(res23); // >>> 1 StreamEntry[] res24 = db.StreamReadGroup("race:italy", "italy_riders", "Alice", "0"); foreach (StreamEntry entry in res24) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> <empty array> // Tests for 'xack' step. StreamEntry[] res25 = db.StreamReadGroup("race:italy", "italy_riders", "Bob", ">", 2); foreach (StreamEntry entry in res25) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712744358384-0: [rider: Royce] // >>> 1712744379676-0: [rider: Sam-Bodden] // Tests for 'xgroup_read_bob' step. StreamPendingInfo res26 = db.StreamPending("race:italy", "italy_riders"); Console.WriteLine($"pending: {res26.PendingMessageCount}, min: {res26.LowestPendingMessageId}, max: {res26.HighestPendingMessageId}, consumers:[{string.Join(", ", res26.Consumers.Select(c => $"{c.Name}: {c.PendingMessageCount}"))}]"); // >>> pending: 2, min: 1712747506906-0, max: 1712747506907-0, consumers:[name: Bob, pending:2] // Tests for 'xpending' step. StreamPendingMessageInfo[] res27 = db.StreamPendingMessages( "race:italy", "italy_riders", 10, "", "-", "+" ); foreach (StreamPendingMessageInfo info in res27) { Console.WriteLine($"message_id: {info.MessageId}, consumer: {info.ConsumerName}, time_since_delivered: {info.IdleTimeInMilliseconds}, times_delivered: {info.DeliveryCount}"); } // >>> message_id: min: 1712747506906-0, consumer: Bob, time_since_delivered: 31084, times_delivered: 1 // >>> message_id: min: 1712747506907-0, consumer: Bob, time_since_delivered: 31084, times_delivered: 1 // Tests for 'xpending_plus_minus' step. StreamEntry[] res28 = db.StreamRange("race:italy", "1712744358384-0", "1712744358384-0"); foreach (StreamEntry entry in res28) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712744358384-0: [rider: Royce] // Tests for 'xrange_pending' step. StreamEntry[] res29 = db.StreamClaim( "race:italy", "italy_riders", "Alice", 60000, new RedisValue[] { 1712744358384 - 0 } ); foreach (StreamEntry entry in res29) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712744358384-0: [rider: Royce] // Tests for 'xclaim' step. StreamAutoClaimResult res30 = db.StreamAutoClaim( "race:italy", "italy_riders", "Alice", 1, "0-0", 1 ); Console.WriteLine($"{res30.NextStartId}, ({string.Join(", ", res30.ClaimedEntries.Select(entry => $"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"))})"); // >>> 1712744379676-0, (1712744358384-0: [rider: Royce]) // Tests for 'xautoclaim' step. StreamAutoClaimResult res31 = db.StreamAutoClaim( "race:italy", "italy_riders", "Alice", 1, "(1712744358384-0", 1 ); Console.WriteLine($"{res31.NextStartId}, ({string.Join(", ", res31.ClaimedEntries.Select(entry => $"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"))})"); // >>> 0-0, (1712744379676-0: [rider: Sam-Bodden]) // Tests for 'xautoclaim_cursor' step. StreamInfo res32 = db.StreamInfo("race:italy"); Console.WriteLine($"length: {res32.Length}, radix-tree-keys: {res32.RadixTreeKeys}, radix-tree-nodes: {res32.RadixTreeNodes}, last-generated-id: {res32.LastGeneratedId}, first-entry: {$"{res32.FirstEntry.Id}: [{string.Join(", ", res32.FirstEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}, last-entry: {$"{res32.LastEntry.Id}: [{string.Join(", ", res32.LastEntry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"}"); // >>> length: 5, radix-tree-keys: 1, radix-tree-nodes: 2, last-generated-id: 1712756762686-1, first-entry: 1712756762685-0: [rider: Castilla], last-entry: 1712756762686-1: [rider: Norem] // Tests for 'xinfo' step. StreamGroupInfo[] res33 = db.StreamGroupInfo("race:italy"); foreach (StreamGroupInfo info in res33) { Console.WriteLine($"name: {info.Name}, consumers: {info.ConsumerCount}, pending: {info.PendingMessageCount}, last-delivered-id: {info.LastDeliveredId}"); } // >>> name: italy_riders, consumers: 2, pending: 2, last-delivered-id: 1712757192730-2 // Tests for 'xinfo_groups' step. StreamConsumerInfo[] res34 = db.StreamConsumerInfo("race:italy", "italy_riders"); foreach (StreamConsumerInfo info in res34) { Console.WriteLine($"name: {info.Name}, pending: {info.PendingMessageCount}, idle: {info.IdleTimeInMilliseconds}"); } // >>> name: Alice, pending: 1, idle: 7717 // >>> name: Bob, pending: 0, idle: 7722 // Tests for 'xinfo_consumers' step. db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Jones") }, null, 2, true ); db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Wood") }, null, 2, true ); db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Henshaw") }, null, 2, true ); long res35 = db.StreamLength("race:italy"); Console.WriteLine(res35); // >>> 8 StreamEntry[] res36 = db.StreamRange("race:italy", "-", "+"); foreach (StreamEntry entry in res36) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712758336128-0: [rider: Castilla] // >>> 1712758336128-1: [rider: Royce] // >>> 1712758336128-2: [rider: Sam-Bodden] // >>> 1712758336129-0: [rider: Prickett] // >>> 1712758336139-0: [rider: Norem] // >>> 1712758340854-0: [rider: Jones] // >>> 1712758341645-0: [rider: Wood] // >>> 1712758342134-0: [rider: Henshaw] db.StreamAdd( "race:italy", new NameValueEntry[] { new NameValueEntry("rider", "Smith") }, null, 2, false ); StreamEntry[] res37 = db.StreamRange("race:italy", "-", "+"); foreach (StreamEntry entry in res37) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // 1712758746476-1: [rider: Henshaw] // 1712758746477-0: [rider: Smith] // Tests for 'maxlen' step. long res38 = db.StreamTrim("race:italy", 10, false); Console.WriteLine(res38); // >>> 0 // Tests for 'xtrim' step. long res39 = db.StreamTrim("race:italy", 10, true); Console.WriteLine(res39); // >>> 0 // Tests for 'xtrim2' step. StreamEntry[] res40 = db.StreamRange("race:italy", "-", "+"); foreach (StreamEntry entry in res40) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712759694003-0: [rider: Henshaw] // >>> 1712759694003-1: [rider: Smith] long res41 = db.StreamDelete("race:italy", new RedisValue[] { "1712759694003-1" }); Console.WriteLine(res41); // >>> 1 StreamEntry[] res42 = db.StreamRange("race:italy", "-", "+"); foreach (StreamEntry entry in res42) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712759694003-0: [rider: Henshaw] // Tests for 'xdel' step. } }
-
Read up to 100 new stream entries, starting at the end of the stream, and block for up to 300 ms if no entries are being written:
> XREAD COUNT 100 BLOCK 300 STREAMS race:france $ (nil)
Are you tired of using redis-cli? Try Redis Insight - the developer GUI for Redis.""" Code samples for Stream doc pages: https://redis.io/docs/latest/develop/data-types/streams/ """ import redis r = redis.Redis(decode_responses=True) res1 = r.xadd( "race:france", {"rider": "Castilla", "speed": 30.2, "position": 1, "location_id": 1}, ) print(res1) # >>> 1692629576966-0 res2 = r.xadd( "race:france", {"rider": "Norem", "speed": 28.8, "position": 3, "location_id": 1}, ) print(res2) # >>> 1692629594113-0 res3 = r.xadd( "race:france", {"rider": "Prickett", "speed": 29.7, "position": 2, "location_id": 1}, ) print(res3) # >>> 1692629613374-0 res4 = r.xrange("race:france", "1691765278160-0", "+", 2) print( res4 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ) # ] res5 = r.xread(streams={"race:france": 0}, count=100, block=300) print( res5 ) # >>> [ # ['race:france', # [('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ), # ('1692629613374-0', # {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'} # )] # ] # ] res6 = r.xadd( "race:france", {"rider": "Castilla", "speed": 29.9, "position": 1, "location_id": 2}, ) print(res6) # >>> 1692629676124-0 res7 = r.xlen("race:france") print(res7) # >>> 4 res8 = r.xadd("race:usa", {"racer": "Castilla"}, id="0-1") print(res8) # >>> 0-1 res9 = r.xadd("race:usa", {"racer": "Norem"}, id="0-2") print(res9) # >>> 0-2 try: res10 = r.xadd("race:usa", {"racer": "Prickett"}, id="0-1") print(res10) # >>> 0-1 except redis.exceptions.ResponseError as e: print(e) # >>> WRONGID # Not yet implemented res11 = r.xrange("race:france", "-", "+") print( res11 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ), # ('1692629613374-0', # {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'} # ), # ('1692629676124-0', # {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'} # ) # ] res12 = r.xrange("race:france", 1692629576965, 1692629576967) print( res12 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ) # ] res13 = r.xrange("race:france", "-", "+", 2) print( res13 ) # >>> [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ) # ] res14 = r.xrange("race:france", "(1692629594113-0", "+", 2) print( res14 ) # >>> [ # ('1692629613374-0', # {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'} # ), # ('1692629676124-0', # {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'} # ) # ] res15 = r.xrange("race:france", "(1692629676124-0", "+", 2) print(res15) # >>> [] res16 = r.xrevrange("race:france", "+", "-", 1) print( res16 ) # >>> [ # ('1692629676124-0', # {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'} # ) # ] res17 = r.xread(streams={"race:france": 0}, count=2) print( res17 ) # >>> [ # ['race:france', [ # ('1692629576966-0', # {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'} # ), # ('1692629594113-0', # {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'} # ) # ] # ] # ] res18 = r.xgroup_create("race:france", "france_riders", "$") print(res18) # >>> True res19 = r.xgroup_create("race:italy", "italy_riders", "$", mkstream=True) print(res19) # >>> True r.xadd("race:italy", {"rider": "Castilla"}) r.xadd("race:italy", {"rider": "Royce"}) r.xadd("race:italy", {"rider": "Sam-Bodden"}) r.xadd("race:italy", {"rider": "Prickett"}) r.xadd("race:italy", {"rider": "Norem"}) res20 = r.xreadgroup( streams={"race:italy": ">"}, consumername="Alice", groupname="italy_riders", count=1, ) print(res20) # >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] res21 = r.xreadgroup( streams={"race:italy": 0}, consumername="Alice", groupname="italy_riders", count=1, ) print(res21) # >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] res22 = r.xack("race:italy", "italy_riders", "1692629925771-0") print(res22) # >>> 1 res23 = r.xreadgroup( streams={"race:italy": 0}, consumername="Alice", groupname="italy_riders", count=1, ) print(res23) # >>> [['race:italy', []]] res24 = r.xreadgroup( streams={"race:italy": ">"}, consumername="Bob", groupname="italy_riders", count=2, ) print( res24 ) # >>> [ # ['race:italy', [ # ('1692629925789-0', # {'rider': 'Royce'} # ), # ('1692629925790-0', # {'rider': 'Sam-Bodden'} # ) # ] # ] # ] res25 = r.xpending("race:italy", "italy_riders") print( res25 ) # >>> { # 'pending': 2, 'min': '1692629925789-0', 'max': '1692629925790-0', # 'consumers': [{'name': 'Bob', 'pending': 2}] # } res26 = r.xpending_range("race:italy", "italy_riders", "-", "+", 10) print( res26 ) # >>> [ # { # 'message_id': '1692629925789-0', 'consumer': 'Bob', # 'time_since_delivered': 31084, 'times_delivered': 1 # }, # { # 'message_id': '1692629925790-0', 'consumer': 'Bob', # 'time_since_delivered': 31084, 'times_delivered': 1 # } # ] res27 = r.xrange("race:italy", "1692629925789-0", "1692629925789-0") print(res27) # >>> [('1692629925789-0', {'rider': 'Royce'})] res28 = r.xclaim("race:italy", "italy_riders", "Alice", 60000, ["1692629925789-0"]) print(res28) # >>> [('1692629925789-0', {'rider': 'Royce'})] res29 = r.xautoclaim("race:italy", "italy_riders", "Alice", 1, "0-0", 1) print(res29) # >>> ['1692629925790-0', [('1692629925789-0', {'rider': 'Royce'})]] res30 = r.xautoclaim("race:italy", "italy_riders", "Alice", 1, "(1692629925789-0", 1) print(res30) # >>> ['0-0', [('1692629925790-0', {'rider': 'Sam-Bodden'})]] res31 = r.xinfo_stream("race:italy") print( res31 ) # >>> { # 'length': 5, 'radix-tree-keys': 1, 'radix-tree-nodes': 2, # 'last-generated-id': '1692629926436-0', 'groups': 1, # 'first-entry': ('1692629925771-0', {'rider': 'Castilla'}), # 'last-entry': ('1692629926436-0', {'rider': 'Norem'}) # } res32 = r.xinfo_groups("race:italy") print( res32 ) # >>> [ # { # 'name': 'italy_riders', 'consumers': 2, 'pending': 2, # 'last-delivered-id': '1692629925790-0' # } # ] res33 = r.xinfo_consumers("race:italy", "italy_riders") print( res33 ) # >>> [ # {'name': 'Alice', 'pending': 2, 'idle': 199332}, # {'name': 'Bob', 'pending': 0, 'idle': 489170} # ] r.xadd("race:italy", {"rider": "Jones"}, maxlen=2) r.xadd("race:italy", {"rider": "Wood"}, maxlen=2) r.xadd("race:italy", {"rider": "Henshaw"}, maxlen=2) res34 = r.xlen("race:italy") print(res34) # >>> 8 res35 = r.xrange("race:italy", "-", "+") print( res35 ) # >>> [ # ('1692629925771-0', {'rider': 'Castilla'}), # ('1692629925789-0', {'rider': 'Royce'}), # ('1692629925790-0', {'rider': 'Sam-Bodden'}), # ('1692629925791-0', {'rider': 'Prickett'}), # ('1692629926436-0', {'rider': 'Norem'}), # ('1692630612602-0', {'rider': 'Jones'}), # ('1692630641947-0', {'rider': 'Wood'}), # ('1692630648281-0', {'rider': 'Henshaw'}) # ] r.xadd("race:italy", {"rider": "Smith"}, maxlen=2, approximate=False) res36 = r.xrange("race:italy", "-", "+") print( res36 ) # >>> [ # ('1692630648281-0', {'rider': 'Henshaw'}), # ('1692631018238-0', {'rider': 'Smith'}) # ] res37 = r.xtrim("race:italy", maxlen=10, approximate=False) print(res37) # >>> 0 res38 = r.xtrim("race:italy", maxlen=10) print(res38) # >>> 0 res39 = r.xrange("race:italy", "-", "+") print( res39 ) # >>> [ # ('1692630648281-0', {'rider': 'Henshaw'}), # ('1692631018238-0', {'rider': 'Smith'}) # ] res40 = r.xdel("race:italy", "1692631018238-0") print(res40) # >>> 1 res41 = r.xrange("race:italy", "-", "+") print(res41) # >>> [('1692630648281-0', {'rider': 'Henshaw'})]
import assert from 'assert'; import { createClient } from 'redis'; const client = await createClient(); await client.connect(); const res1 = await client.xAdd( 'race:france', '*', { 'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1' } ); console.log(res1); // >>> 1700073067968-0 N.B. actual values will differ from these examples const res2 = await client.xAdd( 'race:france', '*', { 'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1' }, ); console.log(res2); // >>> 1692629594113-0 const res3 = await client.xAdd( 'race:france', '*', { 'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1' }, ); console.log(res3); // >>> 1692629613374-0 const res4 = await client.xRange('race:france', '1691765278160-0', '+', {COUNT: 2}); console.log(res4); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'})] const res5 = await client.xRead({ key: 'race:france', id: '0-0' }, { count: 100, block: 300 }); console.log(res5); // >>> [['race:france', [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}), ('1692629613374-0', {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'})]]] const res6 = await client.xAdd( 'race:france', '*', { 'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2' } ); console.log(res6); // >>> 1692629676124-0 const res7 = await client.xLen('race:france'); console.log(res7); // >>> 4 const res8 = await client.xAdd('race:usa', '0-1', { 'racer': 'Castilla' }); console.log(res8); // >>> 0-1 const res9 = await client.xAdd('race:usa', '0-2', { 'racer': 'Norem' }); console.log(res9); // >>> 0-2 try { const res10 = await client.xAdd('race:usa', '0-1', { 'racer': 'Prickett' }); console.log(res10); // >>> 0-1 } catch (error) { console.error(error); // >>> WRONGID } const res11a = await client.xAdd('race:usa', '0-*', { racer: 'Norem' }); console.log(res11a); // >>> 0-3 const res11 = await client.xRange('race:france', '-', '+'); console.log(res11); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'}), ('1692629613374-0', {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'}), ('1692629676124-0', {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'})] const res12 = await client.xRange('race:france', '1692629576965', '1692629576967'); console.log(res12); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'})] const res13 = await client.xRange('race:france', '-', '+', {COUNT: 2}); console.log(res13); // >>> [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'})] const res14 = await client.xRange('race:france', '(1692629594113-0', '+', {COUNT: 2}); console.log(res14); // >>> [('1692629613374-0', {'rider': 'Prickett', 'speed': '29.7', 'position': '2', 'location_id': '1'}), ('1692629676124-0', {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'})] const res15 = await client.xRange('race:france', '(1692629676124-0', '+', {COUNT: 2}); console.log(res15); // >>> [] const res16 = await client.xRevRange('race:france', '+', '-', {COUNT: 1}); console.log( res16 ); // >>> [('1692629676124-0', {'rider': 'Castilla', 'speed': '29.9', 'position': '1', 'location_id': '2'})] const res17 = await client.xRead({ key: 'race:france', id: '0-0' }, { count: 2 }); console.log(res17); // >>> [['race:france', [('1692629576966-0', {'rider': 'Castilla', 'speed': '30.2', 'position': '1', 'location_id': '1'}), ('1692629594113-0', {'rider': 'Norem', 'speed': '28.8', 'position': '3', 'location_id': '1'})]]] const res18 = await client.xGroupCreate('race:france', 'france_riders', '$'); console.log(res18); // >>> True const res19 = await client.xGroupCreate('race:italy', 'italy_riders', '$', { 'MKSTREAM': true }); console.log(res19); // >>> True await client.xAdd('race:italy', '*', { 'rider': 'Castilla' }); await client.xAdd('race:italy', '*', { 'rider': 'Royce' }); await client.xAdd('race:italy', '*', { 'rider': 'Sam-Bodden' }); await client.xAdd('race:italy', '*', { 'rider': 'Prickett' }); await client.xAdd('race:italy', '*', { 'rider': 'Norem' }); const res20 = await client.xReadGroup( 'italy_riders', 'Alice', { key: 'race:italy', id: '>' }, { 'COUNT': 1 } ); console.log(res20); // >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] const res21 = await client.xReadGroup( 'italy_riders', 'Alice', { key: 'race:italy', id: '0' }, { 'COUNT': 1 } ); console.log(res21); // >>> [['race:italy', [('1692629925771-0', {'rider': 'Castilla'})]]] const res22 = await client.xAck('race:italy', 'italy_riders', '1692629925771-0') console.log(res22); // >>> 1 const res23 = await client.xReadGroup( 'italy_riders', 'Alice', { key: 'race:italy', id: '0' }, { 'COUNT': 1 } ); console.log(res23); // >>> [['race:italy', []]] const res24 = await client.xReadGroup( 'italy_riders', 'Bob', { key: 'race:italy', id: '>' }, { 'COUNT': 2 } ); console.log(res24); // >>> [['race:italy', [('1692629925789-0', {'rider': 'Royce'}), ('1692629925790-0', {'rider': 'Sam-Bodden'})]]] const res25 = await client.xPending('race:italy', 'italy_riders'); console.log(res25); // >>> {'pending': 2, 'min': '1692629925789-0', 'max': '1692629925790-0', 'consumers': [{'name': 'Bob', 'pending': 2}]} const res26 = await client.xPendingRange('race:italy', 'italy_riders', '-', '+', 10); console.log(res26); // >>> [{'message_id': '1692629925789-0', 'consumer': 'Bob', 'time_since_delivered': 31084, 'times_delivered': 1}, {'message_id': '1692629925790-0', 'consumer': 'Bob', 'time_since_delivered': 31084, 'times_delivered': 1}] const res27 = await client.xRange('race:italy', '1692629925789-0', '1692629925789-0'); console.log(res27); // >>> [('1692629925789-0', {'rider': 'Royce'})] const res28 = await client.xClaim( 'race:italy', 'italy_riders', 'Alice', 60000, ['1692629925789-0'] ); console.log(res28); // >>> [('1692629925789-0', {'rider': 'Royce'})] const res29 = await client.xAutoClaim('race:italy', 'italy_riders', 'Alice', 1, '0-0', 1); console.log(res29); // >>> ['1692629925790-0', [('1692629925789-0', {'rider': 'Royce'})]] const res30 = await client.xAutoClaim( 'race:italy', 'italy_riders', 'Alice', 1, '(1692629925789-0', 1 ); console.log(res30); // >>> ['0-0', [('1692629925790-0', {'rider': 'Sam-Bodden'})]] const res31 = await client.xInfoStream('race:italy'); console.log(res31); // >>> {'length': 5, 'radix-tree-keys': 1, 'radix-tree-nodes': 2, 'last-generated-id': '1692629926436-0', 'groups': 1, 'first-entry': ('1692629925771-0', {'rider': 'Castilla'}), 'last-entry': ('1692629926436-0', {'rider': 'Norem'})} const res32 = await client.xInfoGroups('race:italy'); console.log(res32); // >>> [{'name': 'italy_riders', 'consumers': 2, 'pending': 2, 'last-delivered-id': '1692629925790-0'}] const res33 = await client.xInfoConsumers('race:italy', 'italy_riders'); console.log(res33); // >>> [{'name': 'Alice', 'pending': 2, 'idle': 199332}, {'name': 'Bob', 'pending': 0, 'idle': 489170}] await client.xAdd('race:italy', '*', { 'rider': 'Jones' }, { 'MAXLEN': 2 }); await client.xAdd('race:italy', '*', { 'rider': 'Wood' }, { 'MAXLEN': 2 }); await client.xAdd('race:italy', '*', { 'rider': 'Henshaw' }, { 'MAXLEN': 2 }); const res34 = await client.xLen('race:italy'); console.log(res34); // >>> 8 const res35 = await client.xRange('race:italy', '-', '+'); console.log(res35); // >>> [('1692629925771-0', {'rider': 'Castilla'}), ('1692629925789-0', {'rider': 'Royce'}), ('1692629925790-0', {'rider': 'Sam-Bodden'}), ('1692629925791-0', {'rider': 'Prickett'}), ('1692629926436-0', {'rider': 'Norem'}), ('1692630612602-0', {'rider': 'Jones'}), ('1692630641947-0', {'rider': 'Wood'}), ('1692630648281-0', {'rider': 'Henshaw'})] await client.xAdd('race:italy', '*', { 'rider': 'Smith' }, { 'MAXLEN': 2, 'APPROXIMATE': false }); const res36 = await client.xRange('race:italy', '-', '+'); console.log(res36); // >>> [('1692630648281-0', {'rider': 'Henshaw'}), ('1692631018238-0', {'rider': 'Smith'})] const res37 = await client.xTrim('race:italy', 'MAXLEN', 10, { 'APPROXIMATE': false }); console.log(res37); // >>> 0 const res38 = await client.xTrim('race:italy', "MAXLEN", 10); console.log(res38); // >>> 0 const res39 = await client.xRange('race:italy', '-', '+'); console.log(res39); // >>> [('1692630648281-0', {'rider': 'Henshaw'}), ('1692631018238-0', {'rider': 'Smith'})] const res40 = await client.xDel('race:italy', '1692631018238-0'); console.log(res40); // >>> 1 const res41 = await client.xRange('race:italy', '-', '+'); console.log(res41); // >>> [('1692630648281-0', {'rider': 'Henshaw'})]
package io.redis.examples; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.UnifiedJedis; public class StreamsExample { public void run(){ UnifiedJedis jedis = new UnifiedJedis("redis://localhost:6379"); StreamEntryID res1 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Castilla");put("speed","30.2");put("position","1");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res1); // >>> 1701760582225-0 StreamEntryID res2 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Norem");put("speed","28.8");put("position","3");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res2); // >>> 1701760582225-1 StreamEntryID res3 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Prickett");put("speed","29.7");put("position","2");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res3); // >>> 1701760582226-0 List<StreamEntry> res4 = jedis.xrange("race:france","1701760582225-0","+",2); System.out.println(res4); // >>> [1701760841292-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701760841292-1 {rider=Norem, speed=28.8, location_id=1, position=3}] List<Map.Entry<String, List<StreamEntry>>> res5= jedis.xread(XReadParams.xReadParams().block(300).count(100),new HashMap<String,StreamEntryID>(){{put("race:france",new StreamEntryID());}}); System.out.println( res5 ); // >>> [race:france=[1701761996660-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701761996661-0 {rider=Norem, speed=28.8, location_id=1, position=3}, 1701761996661-1 {rider=Prickett, speed=29.7, location_id=1, position=2}]] StreamEntryID res6 = jedis.xadd("race:france",new HashMap<String,String>(){{put("rider","Castilla");put("speed","29.9");put("position","2");put("location_id","1");}} , XAddParams.xAddParams()); System.out.println(res6); // >>> 1701762285679-0 long res7 = jedis.xlen("race:france"); System.out.println(res7); // >>> 4 StreamEntryID res8 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Castilla");}},XAddParams.xAddParams().id("0-1")); System.out.println(res8); // >>> 0-1 StreamEntryID res9 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Norem");}},XAddParams.xAddParams().id("0-2")); System.out.println(res9); // >>> 0-2 try { StreamEntryID res10 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Prickett");}},XAddParams.xAddParams().id("0-1")); System.out.println(res10); // >>> 0-1 } catch (JedisDataException e){ System.out.println(e); // >>> ERR The ID specified in XADD is equal or smaller than the target stream top item } StreamEntryID res11 = jedis.xadd("race:usa", new HashMap<String,String>(){{put("racer","Norem");}},XAddParams.xAddParams().id("0-*")); System.out.println(res11); List<StreamEntry> res12 = jedis.xrange("race:france","-","+"); System.out.println( res12 ); // >>> [1701764734160-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764734160-1 {rider=Norem, speed=28.8, location_id=1, position=3}, 1701764734161-0 {rider=Prickett, speed=29.7, location_id=1, position=2}, 1701764734162-0 {rider=Castilla, speed=29.9, location_id=1, position=2}] List<StreamEntry> res13 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()-1000),String.valueOf(System.currentTimeMillis()+1000)); System.out.println( res13 ); // >>> [1701764734160-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764734160-1 {rider=Norem, speed=28.8, location_id=1, position=3}, 1701764734161-0 {rider=Prickett, speed=29.7, location_id=1, position=2}, 1701764734162-0 {rider=Castilla, speed=29.9, location_id=1, position=2}] List<StreamEntry> res14 = jedis.xrange("race:france","-","+",2); System.out.println(res14); // >>> [1701764887638-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764887638-1 {rider=Norem, speed=28.8, location_id=1, position=3}] List<StreamEntry> res15 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()-1000)+"-0","+",2); System.out.println(res15); // >>> [1701764887638-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701764887638-1 {rider=Norem, speed=28.8, location_id=1, position=3}] List<StreamEntry> res16 = jedis.xrange("race:france",String.valueOf(System.currentTimeMillis()+1000)+"-0","+",2); System.out.println(res16); // >>> [] List<StreamEntry> res17 = jedis.xrevrange("race:france","+","-",1); System.out.println(res17); // >>> [1701765218592-0 {rider=Castilla, speed=29.9, location_id=1, position=2}] List<Map.Entry<String, List<StreamEntry>>> res18= jedis.xread(XReadParams.xReadParams().count(2),new HashMap<String,StreamEntryID>(){{put("race:france",new StreamEntryID());}}); System.out.println( res18 ); // >>> [race:france=[1701765384638-0 {rider=Castilla, speed=30.2, location_id=1, position=1}, 1701765384638-1 {rider=Norem, speed=28.8, location_id=1, position=3}]] String res19 = jedis.xgroupCreate("race:france","france_riders",StreamEntryID.LAST_ENTRY,false); System.out.println(res19); // >>> OK String res20 = jedis.xgroupCreate("race:italy","italy_riders",StreamEntryID.LAST_ENTRY,true); System.out.println(res20); // >>> OK StreamEntryID id1 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Castilaa");}},XAddParams.xAddParams()); StreamEntryID id2 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Royce");}},XAddParams.xAddParams()); StreamEntryID id3 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Sam-Bodden");}},XAddParams.xAddParams()); StreamEntryID id4 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Prickett");}},XAddParams.xAddParams()); StreamEntryID id5 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Norem");}},XAddParams.xAddParams()); List<Map.Entry<String, List<StreamEntry>>> res21 = jedis.xreadGroup("italy_riders","Alice", XReadGroupParams.xReadGroupParams().count(1),new HashMap<String,StreamEntryID>(){{put("race:italy",StreamEntryID.UNRECEIVED_ENTRY);}}); System.out.println(res21); // >>> [race:italy=[1701766299006-0 {rider=Castilaa}]] List<Map.Entry<String, List<StreamEntry>>> res22 = jedis.xreadGroup("italy_riders","Alice", XReadGroupParams.xReadGroupParams().count(1),new HashMap<String,StreamEntryID>(){{put("race:italy",new StreamEntryID());}}); System.out.println(res22); // >>> [race:italy=[1701766299006-0 {rider=Castilaa}]] long res23 = jedis.xack("race:italy","italy_riders",id1); System.out.println(res23); // >>> 1 List<Map.Entry<String, List<StreamEntry>>> res24 = jedis.xreadGroup("italy_riders","Alice", XReadGroupParams.xReadGroupParams().count(1),new HashMap<String,StreamEntryID>(){{put("race:italy",new StreamEntryID());}}); System.out.println(res24); // >>> [race:italy=[]] List<Map.Entry<String, List<StreamEntry>>> res25 = jedis.xreadGroup("italy_riders","Bob", XReadGroupParams.xReadGroupParams().count(2),new HashMap<String,StreamEntryID>(){{put("race:italy",StreamEntryID.UNRECEIVED_ENTRY);}}); System.out.println(res25); // >>> [race:italy=[1701767632261-1 {rider=Royce}, 1701767632262-0 {rider=Sam-Bodden}]] StreamPendingSummary res26 = jedis.xpending("race:italy","italy_riders"); System.out.println(res26.getConsumerMessageCount()); // >>> {Bob=2} List<StreamPendingEntry> res27 = jedis.xpending("race:italy","italy_riders",XPendingParams.xPendingParams().start(StreamEntryID.MINIMUM_ID).end(StreamEntryID.MAXIMUM_ID).count(10)); System.out.println(res27); // >>> [1701768567412-1 Bob idle:0 times:1, 1701768567412-2 Bob idle:0 times:1] List<StreamEntry> res28 = jedis.xrange("race:italy",id2.toString(),id2.toString()); System.out.println(res28); // >>> [1701768744819-1 {rider=Royce}] List<StreamEntry> res29 = jedis.xclaim("race:italy","italy_riders","Alice", 0L, XClaimParams.xClaimParams().time(60000),id2); System.out.println(res29); // >>> [1701769004195-1 {rider=Royce}] Map.Entry<StreamEntryID, List<StreamEntry>> res30 = jedis.xautoclaim("race:italy","italy_riders","Alice",1L,new StreamEntryID("0-0"),XAutoClaimParams.xAutoClaimParams().count(1)); System.out.println(res30); // >>> [1701769266831-2=[1701769266831-1 {rider=Royce}] Map.Entry<StreamEntryID, List<StreamEntry>> res31 = jedis.xautoclaim("race:italy","italy_riders","Alice",1L,new StreamEntryID(id2.toString()),XAutoClaimParams.xAutoClaimParams().count(1)); System.out.println(res31); // >>> [0-0=[1701769605847-2 {rider=Sam-Bodden}] StreamInfo res32 = jedis.xinfoStream("race:italy"); System.out.println( res32.getStreamInfo() ); // >>> {radix-tree-keys=1, radix-tree-nodes=2, entries-added=5, length=5, groups=1, max-deleted-entry-id=0-0, first-entry=1701769637612-0 {rider=Castilaa}, last-generated-id=1701769637612-4, last-entry=1701769637612-4 {rider=Norem}, recorded-first-entry-id=1701769637612-0} List<StreamGroupInfo> res33 = jedis.xinfoGroups("race:italy"); for (StreamGroupInfo a : res33){ System.out.println( a.getGroupInfo() ); // >>> {last-delivered-id=1701770253659-0, lag=2, pending=2, name=italy_riders, consumers=2, entries-read=3} } List<StreamConsumersInfo> res34 = jedis.xinfoConsumers("race:italy","italy_riders"); for (StreamConsumerInfo a : res34){ System.out.println( a.getConsumerInfo() ); // {inactive=1, idle=1, pending=1, name=Alice} , {inactive=3, idle=3, pending=1, name=Bob} } jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Jones");}},XAddParams.xAddParams().maxLen(10)); jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Wood");}},XAddParams.xAddParams().maxLen(10)); jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Henshaw");}},XAddParams.xAddParams().maxLen(10)); long res35 = jedis.xlen("race:italy"); System.out.println(res35); // >>> 8 List<StreamEntry> res36 = jedis.xrange("race:italy","-","+"); System.out.println(res36); // >>> [1701771219852-0 {rider=Castilaa}, 1701771219852-1 {rider=Royce}, 1701771219853-0 {rider=Sam-Bodden}, 1701771219853-1 {rider=Prickett}, 1701771219853-2 {rider=Norem}, 1701771219858-0 {rider=Jones}, 1701771219858-1 {rider=Wood}, 1701771219859-0 {rider=Henshaw}] StreamEntryID id6 = jedis.xadd("race:italy", new HashMap<String,String>(){{put("rider","Smith");}},XAddParams.xAddParams().maxLen(2)); List<StreamEntry> res37 = jedis.xrange("race:italy","-","+"); System.out.println(res37); // >>> [1701771067332-1 {rider=Henshaw}, 1701771067332-2 {rider=Smith}] long res38 = jedis.xtrim("race:italy",XTrimParams.xTrimParams().maxLen(10).exactTrimming()); System.out.println(res38); /// >>> 0 long res39 = jedis.xtrim("race:italy",XTrimParams.xTrimParams().maxLen(10)); System.out.println(res39); /// >>> 0 List<StreamEntry> res40 = jedis.xrange("race:italy","-","+"); System.out.println(res40); // >>> [1701771356428-2 {rider=Henshaw}, 1701771356429-0 {rider=Smith}] long res41 = jedis.xdel("race:italy",id6); System.out.println(res41); // >>> 1 List<StreamEntry> res42 = jedis.xrange("race:italy","-","+"); System.out.println(res42); // >>> [1701771517639-1 {rider=Henshaw}] } }
using System.Runtime.CompilerServices; using NRedisStack.Tests; using StackExchange.Redis; public class StreamTutorial { [SkipIfRedis(Is.OSSCluster)] public void run() { var muxer = ConnectionMultiplexer.Connect("localhost:6379"); var db = muxer.GetDatabase(); RedisValue res1 = db.StreamAdd( "race:france", new NameValueEntry[] { new NameValueEntry("rider", "Castilla"), new NameValueEntry("speed", 30.2), new NameValueEntry("position", 1), new NameValueEntry("location_id", 1) } ); Console.WriteLine(res1); // >>> 1712668482289-0 RedisValue res2 = db.StreamAdd( "race:france", new NameValueEntry[] { new NameValueEntry("rider", "Norem"), new NameValueEntry("speed", 28.8), new NameValueEntry("position", 3), new NameValueEntry("location_id", 1) } ); Console.WriteLine(res2); // >>> 1712668766534-1 RedisValue res3 = db.StreamAdd( "race:france", new NameValueEntry[]{ new NameValueEntry("rider", "Prickett"), new NameValueEntry("speed", 29.7), new NameValueEntry("position", 2), new NameValueEntry("location_id", 1) } ); Console.WriteLine(res3); // >>> 1712669055705-0 // Tests for 'xadd' step. StreamEntry[] res4 = db.StreamRange("race:france", "1712668482289-0", "+", 2); foreach (StreamEntry entry in res4) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // Tests for 'xrange' step. StreamEntry[] res5 = db.StreamRead("race:france", 0, 100); foreach (StreamEntry entry in res4) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1] // Tests for 'xread_block' step. RedisValue res6 = db.StreamAdd( "race:france", new NameValueEntry[]{ new NameValueEntry("rider", "Castilla"), new NameValueEntry("speed", 29.9), new NameValueEntry("position", 1), new NameValueEntry("location_id", 2) } ); Console.WriteLine(res6); // >>> 1712675674750-0 // Tests for 'xadd_2' step. long res7 = db.StreamLength("race:france"); Console.WriteLine(res7); // >>> 4 // Tests for 'xlen' step. RedisValue res8 = db.StreamAdd( "race:usa", new NameValueEntry[] { new NameValueEntry("racer", "Castilla") }, "0-1" ); Console.WriteLine(res8); // >>> 0-1 RedisValue res9 = db.StreamAdd( "race:usa", new NameValueEntry[]{ new NameValueEntry("racer", "Norem") }, "0-2" ); Console.WriteLine(res9); // >>> 0-2 // Tests for 'xadd_id' step. try { RedisValue res10 = db.StreamAdd( "race:usa", new NameValueEntry[]{ new NameValueEntry("racer", "Prickett") }, "0-1" ); } catch (RedisServerException ex) { Console.WriteLine(ex); // >>> ERR The ID specified in XADD is equal or smaller than the target stream top item } // Tests for 'xadd_bad_id' step. RedisValue res11 = ""; Version version = muxer.GetServer("localhost:6379").Version; if (version.Major >= 7) { res11 = db.StreamAdd( "race:usa", new NameValueEntry[]{ new NameValueEntry("rider", "Norem") }, "0-*" ); Console.WriteLine(res11); // >>> "0-3" } // Tests for 'xadd_7' step. StreamEntry[] res12 = db.StreamRange("race:france", "-", "+"); foreach (StreamEntry entry in res12) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // >>> 1712668766534-1: [rider: Norem, speed: 28.800000000000001, position: 3, location_id: 1] // >>> 1712669055705-0: [rider: Prickett, speed: 29.699999999999999, position: 2, location_id: 1] // >>> 1712675674750-0: [rider: Castilla, speed: 29.899999999999999, position: 1, location_id: 2] // Tests for 'xrange_all' step. StreamEntry[] res13 = db.StreamRange("race:france", 1712668482289, 1712668482291); foreach (StreamEntry entry in res13) { Console.WriteLine($"{entry.Id}: [{string.Join(", ", entry.Values.Select(b => $"{b.Name}: {b.Value}"))}]"); } // >>> 1712668482289-0: [rider: Castilla, speed: 30.199999999999999, position: 1, location_id: 1] // Tests for 'xrange_time' step. StreamEntry[] res14 = db.StreamRange("race:france", "-", "+", 2);