La recopilación, el almacenamiento y el procesamiento de grandes volúmenes de datos de gran variedad y velocidad presentan varios retos de diseño complejos, especialmente en campos como el Internet de las cosas (IoT), el comercio electrónico, la seguridad, las comunicaciones, el entretenimiento, las finanzas y el comercio minorista. Por eso, dado que la toma de decisiones basada en datos sensibles, oportunos y precisos es fundamental para estas empresas, la recopilación y el análisis de datos en tiempo real son fundamentales.
Un primer paso muy importante para realizar análisis de datos en tiempo real es garantizar la disponibilidad de recursos adecuados para capturar eficazmente flujos de datos rápidos. La infraestructura física (que incluye una red de alta velocidad, computación, almacenamiento y memoria) desempeña un papel importante en este caso, pero la pila de software debe estar a la altura del rendimiento de su capa física o las organizaciones pueden acabar con una acumulación masiva de datos, con datos que faltan o con datos incompletos y engañosos.
La ingestión de datos a alta velocidad suele implicar diferentes tipos de complejidades:
Alto rendimiento con el menor número de servidores
Toma nota, en lo que respecta a rendimiento, Redis Enterprise ha sido evaluada para manejar más de 200 millones de operaciones de lectura/escritura por segundo, con latencias de menos de un milisegundo con solo un clúster de 40 nodos en AWS. Razón por la cual, Redis Enterprise es la base de datos NoSQL más eficiente en cuanto a recursos del mercado.
Estructuras de datos y módulos flexibles para el análisis en tiempo real: Redis Streams, Pub/Sub, Lists, Sorted Sets, RedisTimeSeries
Redis te lo ofrece todo: una variedad de estructuras de datos como Streams, Listas, Conjuntos, Conjuntos Ordenados y Hashes que proporcionan un procesamiento de datos simple y versátil con el fin de combinar eficientemente la ingestión de datos de alta velocidad y el análisis en tiempo real.
Las capacidades Pub/Sub de Redis le permiten actuar como un eficiente intermediario de mensajes entre nodos de anidación de datos distribuidos geográficamente. Las aplicaciones productoras de datos publican los datos en streaming en los canales en el formato o formatos requeridos, y las aplicaciones consumidoras se suscriben a los canales que les interesan, recibiendo los mensajes de forma asíncrona a medida que se publican y así en círculo.
Las listas y los conjuntos ordenados pueden utilizarse como canales de datos que conectan a productores y consumidores. También puedes utilizar estas estructuras de datos para transmitir datos de forma asíncrona. A diferencia de Pub/Sub, pero las listas y los conjuntos ordenados ofrecen persistencia.
Los flujos pueden hacer todavía más, ofreciendo un canal de ingestión de datos persistente entre productores y consumidores. Con Streams, puedes ampliar el número de consumidores utilizando grupos de consumidores. Los grupos de consumidores también implementan la seguridad de los datos de tipo transaccional si los consumidores fallan en medio del consumo y el procesamiento de los datos.
Y, por último, pero no menos importante RedisTimeSeries proporciona un conjunto mejorado de funciones de ingestión rápida de datos que incluye el muestreo descendente, operaciones de contador especiales en el último valor ingerido y la compresión delta doble, combinadas con capacidades de análisis en tiempo real como para el etiquetado de datos con búsqueda incorporada, agregación, consultas de rango y un conector incorporado a las principales herramientas de supervisión y análisis como Grafana y Prometheus.
Despliegue de geodistribución de activo a activo
Latecnología Activo-Activobasada en CRDTs de Redis Enterprise permite realizar operaciones complejas de almacenamiento de datos y mensajería a través de ubicaciones geográficas y permite que la aplicación se despliegue de forma completamente distribuida así mejora significativamente la disponibilidad y el tiempo de respuesta de la aplicación.
Amplía la DRAM de Redis con SSD y memoria persistente
La tecnología Redison Flashde Redis Enterprise te permite ampliar la DRAM con SSD y memoria persistente, y así almacenar conjuntos de datos muy grandes de varios terabytes utilizando los mismos costes de infraestructura de las bases de datos basadas en disco y manteniendo las latencias de las bases de datos a niveles inferiores al milisegundo, incluso cuando se ingieren más de 1 millón de elementos/segundo en cada nodo del clúster de Redis Enterprise.
A continuación podrás ver algunos fragmentos de código escritos en Java, no te lo pierdas. Todos utilizan la biblioteca de Jedis. Antes de todo, sigue las instrucciones de la página de inicio de Jedis para descargar la última versión de Jedis.
import java.util.HashMap;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;public class StreamPublish {
static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {
try {
Map<String, String> kv = new HashMap<String, String>();
kv.put(“a”, “100”); // key -> a; value -> 100
jedis.xadd(“MyStream”, StreamEntryID.NEW_ENTRY, kv);
}finally {
jedis.close();
}
}
}
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;public class StreamConsumeAsync{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{// Start from 0. Para las consultas posteriores, lee desde el último id + 1
String lastStreamDataId = “0-0”;
int count = 1000;
long waitTimeInMillis = 5000;try {
// Read new data asynchronously in a loop
while(true) {
List next = getNext(“MyStream”, lastStreamDataId,
count, waitTimeInMillis);
if(next != null) {
List<StreamEntry> stList = getStreamEntries(next);
if(stList != null) {
// Process data here
for(int j=0; j<stList.size(); j++) {
StreamEntry streamData = (StreamEntry)stList.get(j); // Lee los campos (pareja key-value) del flujo de datos
Map<String, String> fields = streamData.getFields(); // Lee los datos posteriores de la última id + 1
lastStreamDataId = streamData.getID().getTime()
+”-”
+(streamData.getID().getSequence()+1); System.out.println(stList.get(j));
System.out.println(lastStreamDataId);
}
}else{
System.out.println(“No new data in the stream”);
}
}
} }finally {
jedis.close();
}
} // Read the next set of data from the stream
private static List getNext(String streamId, String lastId, int count, long waitTimeInMillis) throws Exception{
HashMap<String, StreamEntryID> map = new HashMap();
String readFrom = lastId;
map.put(streamId, new StreamEntryID(readFrom));
List list = jedis.xread(count, waitTimeInMillis,
(Entry<String, StreamEntryID>)
map.entrySet().toArray()[0]);
return list;
} // Read stream entries
// Assumes streamList has only one stream
private static List<StreamEntry> getStreamEntries(List streamList) throws Exception{
if(streamList.size()>0) {
SimpleEntry stEntry = (SimpleEntry)streamList.get(0);
return (List<StreamEntry>) stEntry.getValue();
} return null;
}
}
import java.util.List;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;public class StreamQuery{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{String streamID = “MyStream”;
StreamEntryID start = new StreamEntryID(0,0);
StreamEntryID end = null; // null -> until the last item in the stream
int count = 2;try {
List<StreamEntry> stList = jedis.xrange(streamID, start, end, count);
if(stList != null) {
// Process data here
for(int j=0; j<stList.size(); j++) {
StreamEntry streamData = (StreamEntry)stList.get(j); System.out.println(streamData); // Lee los campos (pareja key-value) del flujo de datos
Map<String, String> fields = streamData.getFields(); // Lee los datos posteriores de la última id + 1
StreamEntryID nextStart =
new StreamEntryID(streamData.getID().getTime(),
(streamData.getID().getSequence()+1));
}
}else {
System.out.println(“No new data in the stream”);
}}finally {
jedis.close();
}
}
}
import redis.clients.jedis.Jedis;
public class PubSubPublish {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {try {
String channel = “MyChannel”;
String message = “Hello there!”;
jedis.publish(channel, message);
}finally {
jedis.close();
}
}
}
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class PubSubSubscribe extends JedisPubSub{
static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {try {
PubSubSubscribe mySubscriber = new PubSubSubscribe();
String channel = “MyChannel”;
jedis.subscribe(mySubscriber, channel);
}finally {
jedis.close();
}
}// Receive messages
@Override
public void onMessage(String channel, String message) {
System.out.println(message);
}
}
import redis.clients.jedis.Jedis;
public class ListPush {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {try {
String list = “MyList”;
String message = “Hello there!”;
jedis.lpush(list, message);
}finally {
jedis.close();
}
}
}
import redis.clients.jedis.Jedis;
public class ListPop {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {try {
String list = “MyList”;
String message = jedis.rpop(list);
System.out.println(message);
}finally {
jedis.close();
}
}
}