La collecte, le stockage et le traitement de grand volumes de données diverses à grande vitesse s’accompagne de défis de conception complexes, spécialement dans des domaines comme l’Internet des objets (IoT), l’e-commerce, la sécurité, les communications, les loisirs, la finance et la distribution de détail. La prise de décision réactive, à temps et précise sur la base des données est un élément essentiel de ces secteurs d’activité, ce qui fait de la collecte de données et de leur analyse en temps réel un aspect critique.
Une première étape dans la fourniture d’analyse de données en temps réel consiste à s’assurer que des ressources adéquates sont disponibles pour capturer de manière efficace des flux de données rapides. Alors que l’infrastructure physique (y compris un réseau à haute vitesse, le calcul, le stockage et la mémoire) joue un rôle important à ce niveau, la pile logicielle doit être en accord avec la performance de sa couche physique. Sinon, les organisations peuvent se trouver confrontées à une quantité importante de tâches en retard, des données manquantes, incomplètes ou fausses.
L’ingestion de données à haute vitesse implique souvent différents types de complexités:
Une performance élevée avec le plus petit nombre de serveurs possible:
Quand il s’agit de performance, Redis Enterprise a été soumis à un banc d’essai de traitement de plus de 200 millions d’opérations de lecture/écriture par seconde avec des latences inférieures au millième de seconde, avec seulement une grappe de 40 nœuds sur AWS. Ceci fait de Redis Enterprise la base de données NoSQL la plus efficace en matière de ressources du marché.
Structures et modules de données polyvalents pour des analyses en temps réel: Redis Streams, Pub/Sub, listes, Sorted Sets (ensembles triés), RedisTimeSeries
Redis propose une variété de structures tels que Streams, Lists, Sets, Sorted Sets et Hashes qui assurent un traitement de données simple et flexible afin de combiner efficacement l’ingestion de données à haute vitesse et les analyses en temps réel.
Les capacités Pub/Sub de Redis lui permettent d’agir en tant que « message broker » efficace entre les nœuds d’ingestion de données géographiquement distribués. Les applications produisant des données publient des données en flux vers les canaux au(x) format(s) requis alors que les applications consommatrices souscrivent aux canaux dont elles ont besoin et reçoivent des messages de manière asynchrones dès qu’elles sont publiées.
Les listes et les ensembles triés peuvent être utilisés comme canaux de données connectant les producteurs et les consommateurs. Vous pouvez aussi utiliser ces structures de données pour transmettre les données de mode asynchrone. Contrairement à Pub/Sub, les listes et les ensembles triés sont persistants.
Les streams peuvent faire encore plus et offrent un canal d’ingestion de données continu entre les producteurs et les consommateurs. Avec les streams, vous pouvez diminuer le nombre de consommateurs à l’aide des groupes de consommateurs. Les groupes de consommateurs mettent également en œuvre la sécurité de données similaire à celle des transactions lorsque les consommateurs ne répondent pas pendant la consommation et le traitement des données.
Pour terminer, RedisTimeSeries fournit un jeu de fonctions rapide et avancé d’ingestion de données incluant le sous-échantillonnage, les contre-opérations spéciales sur la dernière valeur ingérée et la double compression delta combinée aux capacités d’analyse en temps réel similaire à celle de l’étiquetage des données avec recherche intégrée, agrégation, requête sur plage de données ainsi qu’un connecteur intégré pour les outils leaders de surveillance et d’analyse comme Grafana et Prometheus.
Déploiement avec géodistribution active-active
La technologie active-active basée sur les CRDT de Redis Enterprise permet l’ingestion complexe de données et les opérations de messagerie sur des sites géographiquement distants et rend possible le déploiement des applications d’une façon totalement distribuée pour améliorer de manière significative la disponibilité et le temps de réponse de l’application.
Extension de la DRAM de Redis avec du SSD et une mémoire permanente
La technologie de Redis Enterprise Redis on Flash permet l’extension de la DRAM avec du SSD et une mémoire permanente, permet de stocker de très grands ensembles de données de plusieurs téraoctets avec les mêmes coûts d’infrastructure que les bases de données à bases de disques, avec des latences de base de données inférieures au millième de seconde même lors de l’ingestion de plus de 1M d’éléments/sec sur chacun des nœuds de la grappe de Redis Enterprise.
Voici quelques bouts de codes écrits en Java. Ils utilisent tous la bibliothèque Jedis. Tout d’abord, suivez les instructions sur la page Commencer de Jedis pour télécharger la dernière version de Jedis.
import java.util.HashMap;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;public class StreamPublish {
static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {
try {
Map<String, String> kv = new HashMap<String, String>();
kv.put(“a”, “100”); // key -> a; value -> 100
jedis.xadd(“MyStream”, StreamEntryID.NEW_ENTRY, kv);
}finally {
jedis.close();
}
}
}
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;public class StreamConsumeAsync{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{// Start from 0. For subsequent queries, read from last id + 1
String lastStreamDataId = “0-0”;
int count = 1000;
long waitTimeInMillis = 5000;try {
// Read new data asynchronously in a loop
while(true) {
List next = getNext(“MyStream”, lastStreamDataId,
count, waitTimeInMillis);
if(next != null) {
List<StreamEntry> stList = getStreamEntries(next);
if(stList != null) {
// Process data here
for(int j=0; j<stList.size(); j++) {
StreamEntry streamData = (StreamEntry)stList.get(j); // Read the fields (key-value pairs) of data stream
Map<String, String> fields = streamData.getFields(); // Read subsequent data from last id + 1
lastStreamDataId = streamData.getID().getTime()
+”-”
+(streamData.getID().getSequence()+1); System.out.println(stList.get(j));
System.out.println(lastStreamDataId);
}
}else{
System.out.println(“No new data in the stream”);
}
}
} }finally {
jedis.close();
}
} // Read the next set of data from the stream
private static List getNext(String streamId, String lastId, int count, long waitTimeInMillis) throws Exception{
HashMap<String, StreamEntryID> map = new HashMap();
String readFrom = lastId;
map.put(streamId, new StreamEntryID(readFrom));
List list = jedis.xread(count, waitTimeInMillis,
(Entry<String, StreamEntryID>)
map.entrySet().toArray()[0]);
return list;
} // Read stream entries
// Assumes streamList has only one stream
private static List<StreamEntry> getStreamEntries(List streamList) throws Exception{
if(streamList.size()>0) {
SimpleEntry stEntry = (SimpleEntry)streamList.get(0);
return (List<StreamEntry>) stEntry.getValue();
} return null;
}
}
import java.util.List;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;public class StreamQuery{static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception{String streamID = “MyStream”;
StreamEntryID start = new StreamEntryID(0,0);
StreamEntryID end = null; // null -> until the last item in the stream
int count = 2;try {
List<StreamEntry> stList = jedis.xrange(streamID, start, end, count);
if(stList != null) {
// Process data here
for(int j=0; j<stList.size(); j++) {
StreamEntry streamData = (StreamEntry)stList.get(j); System.out.println(streamData); // Read the fields (key-value pairs) of data stream
Map<String, String> fields = streamData.getFields(); // Read subsequent data from last id + 1
StreamEntryID nextStart =
new StreamEntryID(streamData.getID().getTime(),
(streamData.getID().getSequence()+1));
}
}else {
System.out.println(“No new data in the stream”);
}
}finally {
jedis.close();
}
}
}
import redis.clients.jedis.Jedis;
public class PubSubPublish {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {try {
String channel = “MyChannel”;
String message = “Hello there!”;
jedis.publish(channel, message);
}finally {
jedis.close();
}
}
}
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class PubSubSubscribe extends JedisPubSub{
static Jedis jedis = new Jedis(“localhost”, 6379);public static void main(String[] args) throws Exception {try {
PubSubSubscribe mySubscriber = new PubSubSubscribe();
String channel = “MyChannel”;
jedis.subscribe(mySubscriber, channel);
}finally {
jedis.close();
}
}// Receive messages
@Override
public void onMessage(String channel, String message) {
System.out.println(message);
}
}
import redis.clients.jedis.Jedis;
public class ListPush {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {try {
String list = “MyList”;
String message = “Hello there!”;
jedis.lpush(list, message);
}finally {
jedis.close();
}
}
}
import redis.clients.jedis.Jedis;
public class ListPop {
static Jedis jedis = new Jedis(“localhost”, 6379);
public static void main(String[] args) throws Exception {try {
String list = “MyList”;
String message = jedis.rpop(list);
System.out.println(message);
}finally {
jedis.close();
}
}
}