dot GenAI Workshop AWS & Redis, le 24 Septembre à Paris

Inscrivez-vous gratuitement ici !

Messagerie

Qu’est-ce qu’un message broker ?

Les applications modernes ne sont plus un bloc monolithique unique et se sont transformées pour constituer un ensemble de services sans lien fixe. Alors que cette nouvelle architecture apporte de nombreux bénéfices, ces services ont toujours besoin d’interagir entre eux et nécessitent des solutions de messaging solides et efficaces.

Redis Streams fait à la fois office de canal de communication pour établir des architectures de stream et de structure de données similaire à un protocole pour les données persistantes : la solution parfaite pour l’approvisionnement d’événement.

Redis Pub/Sub est un protocole de messaging ultra-léger conçu pour publier des notifications en direct au sein d’un système. Il est idéal pour propager des messages éphémères lorsque seuls la latence et un haut débit sont critiques.

Redis Lists et Redis Sorted Sets sont la base de la mise en œuvre des files d’attente de messages. Ils peuvent être utilisés directement pour créer des solutions sur mesure ou via un cadre rendant le traitement des messages plus adapté à votre langue de programmation choisie.

Défis liés à l’établissement de solutions de message broker

1. La communication entre les services doit être fiable

When one service wants to communicate with another service, it can’t always do so immediately. Failures happen and independent deployments might make a service unavailable for periods of time. For applications at scale, it’s not a matter of “if” or “when” a service becomes unavailable, but how often. To mitigate this problem, a best practice is to limit the amount of synchronous communication between services (i.e., directly calling the service’s APIs, for example by sending a HTTP(S) request) and instead prefer persisted channels whenever practical, so that services can then consume messages at their convenience. The two main paradigms for this type of asynchronous communication are event streams and message queues.

Files d’attente de message

  1. Les files d’attente de message reposent sur les listes modifiables et sont quelquefois consommées par des outils aidant la mise en œuvre de modèles communs. Il existe deux différences principales entre les files d’attente de message et les streams d’événements: les files d’attente de message utilisent un type de communication « push », un service pousse un nouveau message dans la boîte d’entrée d’un autre service dès que quelque chose doit être traité. Les streams fonctionnent de manière opposée.
  2. Les messages contiennent des états modifiables (par exemple, le nombre d’essais) et sont supprimés du système si leur traitement est réussi. Les Streams d’événements ne peuvent être modifiés et l’historique, lorsqu’il est ajusté, est souvent « congelé ».

Les listes et les sets triés de Redis sont les deux types de données mettant en œuvre ce type de comportement et les deux outils peuvent être utilisés pour construire des solutions sur mesure tout comme les éléments finaux de gros ordinateurs spécifiques à un écosystème comme Celery (Python), Bull (JavaScript), Sidekiq (Ruby), Machinery (Go), et bien d’autres encore.

Streams/Flux d’événements

Les Streams d’événements reposent sur le type de données de protocole, un type extrêmement efficace lors de la recherche de son historique et de l’ajout de nouveaux éléments en bout de processus. Ces deux propriétés font du protocole inchangeable un excellent élément de base de communication et une façon efficace de stocker les données.

La communication via un stream est différente de l’utilisation d’une file d’attente de messages. Comme décrit auparavant, les files d’attente de messages sont « push » alors que les Streams sont « pull ». En pratique, ceci signifie que chaque service écrit dans son propre stream et que les autres services peuvent observer, en option (c’est-à-dire « tirer (pull)) » de celui-ci. Ceci rend la communication entre un et plusieurs services bien plus efficaces qu’avec les files d’attente de message.

Les files d’attente de message fonctionnent au mieux lorsqu’un service veut qu’un autre exécute une opération. Dans cette situation, la file d’attente de messages du deuxième service fait office de « boîte d’entrée de requêtes ». Si un service a plutôt besoin de publier un événement (c’est-à-dire un message pouvant interessér plusieurs service) le service le publiant devra pousser un message dans la file d’attente de chacun des services intéressés par l’événement. En pratique, la plupart des outils (par exemple, Enterprise Service Buses), peuvent le faire de manière transparente, mais la génération et le stockage d’une copie séparée du message pour chacun des destinataires restent inefficaces.

Les flux d’événement sont plus efficaces que les files d’attente de messages dans les communication entre un et plusieurs interlocuteurs, car ils inversent le protocole : une seule copie de l’événement original existe et le service voulant y accéder peut faire des recherches dans le flux d’événement (c’est-à-dire le flux du service qui publie) à son propre rythme. Les flux d’événement ont un autre avantage pratique sur les files d’attente de message: Vous n’avez pas besoin de spécifier les souscripteurs aux événements à l’avance. Dans les files d’attente de message, le système doit savoir à quelles files d’attente il doit fournir une copie de l’événement. Si vous ajouter un nouveau service plus tard, il ne recevra donc que les nouveaux événements. Avec les flux d’événement, ce problème n’existe pas, un nouveau service peut même être conçu pour faire des recherches dans l’ensemble de l’historique d’événements, ce qui est parfait pour ajouter de nouvelles statistiques tout en continuant en pouvoir les prendre en compte de manière rétroactive. Ceci signifie que vous ne devez pas immédiatement fournir toutes les métriques dont vous pourriez avoir besoin à l’avenir. Vous pouvez juste suivre celles dont vous avez besoin maintenant et en ajouter d’autres au fur et à mesure, car vous savez que vous pourrez encore voir l’historique complet même pour ceux ajoutés a posteriori.

2. Le stockage doit être économe en espace

L’efficacité de l’espace est une propriété appréciable pour tous les canaux de communication qui pérennisent les messages. Pour les flux d’événements, ceci est fondamental, car ils sont souvent utilisés pour un stockage d’informations à long terme. (Nous avons mentionné ci-dessus que les protocoles inchangeables sont rapides lorsqu’il s’agit d’ajouter de nouvelles entrées et de faire une recherche dans l’historique.)

Redis Streams est une mise en œuvre du protocole inchangeable utilisant les arborescences radix comme la structure de données sous-jacente. Chacun des débits est identifié par une estampille temporelle et peut contenir un jeu arbitraire de couples de valeur de terrain. Des entrées du même flux peuvent avoir des champs différents, mais Redis est capable de compresser plusieurs événements dans une ligne partageant le même schéma. Ceci signifie que si vos événements ont un jeu de champs stable, vous ne paierez pas de frais de stockage pour chaque nom de champ, ce qui vous permet d’utiliser des noms de clés plus longs et plus descriptifs sans inconvénient.

Comme nous l’avons vu, les flux ne peuvent pas être ajustés pour supprimer les entrées plus vieilles, et les historiques supprimés sont souvent préservés au format d’archive. Une autre caractéristique de Redis Streams est sa capacité à marquer une entrée comme « supprimée » pour se conformer aux règlements comme le RGPD.

SMise à l’échelle du débit de traitement

Les flux d’événement et les files d’attente de messages aide à gérer les pics de communication. Mais un autre problème de l’appel direct de l’API est que les services peuvent être dépassés par les pics de trafic. Les canaux de communication asynchrones peuvent faire office de tampon, ce qui permet de lisser les pics, mais le débit de traitement doit être assez solide pour prendre en charge le trafic normal. Dans le cas contraire, le système s’effondrera et le tampon grandira sans limite.

Dans Redis Streams, il est possible d’augmenter le débit de traitement en lisant un Stream via un groupe de consommateurs. Les lecteurs faisant partie du même groupe de consommateurs voient les messages d’une façon mutuellement exclusive. Bien évidemment, un seul Stream peut avoir plusieurs groupes de consommateurs. Dans les faits, nous vous recommandons de créer un groupe de consommateurs distincts pour chaque service afin que ceux-ci puissent établir plusieurs instances de lecteur afin d’augmenter le parallélisme au besoin.

3. Les sémantiques des messages doivent être claires

Lors d’une communication asynchrone, il est fondamentale de prendre en compte les scénarii d’échec possibles. Une instance de service peut par exemple subir un plantage ou perdre sa connectivité lors du traitement d’un message. Les échecs de communication étant inévitable, les systèmes de messaging se divisent en deux catégories : délivrer au plus une fois et délivrer au moins une fois. (Certains systèmes de messaging annoncent qu’ils proposent une remise exactement une seule fois, mais ceci ne représente pas toute la vérité. Dans tout système de messaging fiable, les messages doivent occasionnellement être livrés plus d’une fois afin de surmonter les échecs. Ceci est une caractéristique inévitable de la communication sur plusieurs réseaux non fiables.)

Pour traiter correctement les échecs, tous les services participant au système doivent être capables d’exécuter un traitement de message ayant la même puissance. Cette « égalité de puissance » signifie que l’état du message ne change pas dans le cas d’une livraison de messages multiples. L’égalité de puissance est habituellement atteinte en appliquant tout changement d’état nécessaire et en sauvegardant le dernier message traité de manière atomique (par exemple, dans une transaction). De cette façon, l’échec ne sera jamais maintenu dans un état incohérent et le lecteur pourra dire si un certain message a déjà été traité ou pas en vérifiant si l’identificateur du nouveau message est antérieur au nouveau message traité.

Comme Redis Streams est un canal de communication par streaming fiable, il constitue un système forcément disponible une fois . Lors de la lecture d’un Stream via un groupe de consommateurs, Redis se souvient de quel événement a été envoyé à quel consommateur. C’est au consommateur de valider correctement le fait que le message a été traité correctement. Si un consommateur meurt, un événement peut rester bloqué. Pour résoudre ce problème, les groupes de consommateurs proposent une façon d’inspecter l’état des messages en attente et, si nécessaire, de réaffecter un événement à un autre consommateur.

Nous avons noté ci-dessus que les transactions (et opérations atomiques) constituent la façon principale d’atteindre l’égalité de puissance. Comme moyen d’aide, le scriptage Redis Transactions et Lua permet d’allier plusieurs commandes avec une sémantique tout-ou-rien.

Redis Pub/Sub est un système de messaging disponible au maximum une fois qui permet aux publications d’émettre des messages vers un canal ou vers plusieurs canaux. Plus précisément, Redis Pub/Sub est conçu pour la communication en temps réel entre les instances où la faible latence est extrêmement importante et, en temps que telle, n’intègre aucune forme de persistence ou de validation. Le système de messaging en temps réel qui en résulte est le plus allégé possible, parfait pour les applications financières et de jeux où chaque millième de seconde compte.

Pourquoi Redis Enterprise pour le messaging ?

Redis Enterprise a pour base une architecture symétrique sans partage permettant la croissance linéaire et sans heurt des jeux de données sans avoir besoin de modifier le code de l’application.

Redis Enterprise offre plusieurs modèles de haute disponibilité et de distribution géographique, permettant les latences locales pour vos utilisateurs lorsque vous en avez besoin.

Plusieurs options persistantes (AOF -Append Only Files, Fichiers seulement ajoutés, par écriture ou par seconde et captures d’écrans) n’entravant pas les performances vous assurent que vous n’avez pas besoin de reconstruire vos serveurs de base de données après les pannes.

Prise en charge de jeux de données extrêmement volumineux avec l’utilisation d’une mémoire intelligente avec accès à niveau (RAM, mémoire persistante ou flash) pour vous assurer d’échelonner vos ensembles de données pour répondre aux exigences de vos utilisateurs sans trop affecter la performance.

Comment utiliser un Pub/Sub avec Redis Enterprise

Redis Streams et Pub/Sub ont des API stables englobant plusieurs langages de programmation différents. Les exemples de Python ci-après peuvent donc être facilement traduits dans le langage de votre choix.

Connexion à Redis :

import redis
# Se connecter à une instance locale de redis instance
r = redis.Redis(host='localhost', port=6379, db=0)

Écriture sur un stream :

event = {"eventType": "achat", "montant": 5, "id-article": "XXX"}
r.xadd("stream_key", '*', event)
# l’ « * » signifie que Redis génère automatiquement une id d’événement

Lecture directe d’un événement :

last_id = « $ » # « $ » signifie seulement les nouveaux messages
while True:
events = r.xread({"stream_key": last_id}, block=0, count=10)
for _, e in events:
print(f"«Nouvel événement, montant : {e['amount']}")
last_id = e['id']

Lecture d’un Stream par un groupe de consommateurs :

# Commencer par lire les événements pouvant être en attente
# qui n’ont précédemment pas été pris en compte (par exemple,
# à cause d’un plantage informatique). « 0 » indique les événements en attente.
En attente = r.xreadgroup("service-1", "consumer-A", {"stream_key": "0"})
pending_ids = []
for _, e in pending:
print(f"« Ancien événement trouvé, montant : {e['amount']}")
pending_ids.append(e['id'])
# mark pending events as processed
r.xack("stream_key", "service-1", *pending_ids)

# Now that we handled all previous events,
# Maintenant que nous avons traité tous les événements précédents,
# commencer à demander les nouveaux. « &t; » Indique « uniquement les nouveaux événements ».
Pendant que True:
events = r.xreadgroup(“service-1”, “consumer-A”, {“stream_key”: “>”}, count=10)
event_ids = []
for _, e in events:
print(f« Nouvel événement, montant : {e[‘amount’]}”)
event_ids.append(e[‘id’])
r.xack(“stream_key”, “service-1”, *event_ids)
# If we crash before `r.xack`, on reload,
# nous réessaierons ce lot de messages.

Traitement de certains événements, prise en compte et application automatique des changements :

Tant que True:
events = r.xreadgroup("service-1", "consumer-A", {"stream_key": ">"}, count=10)
event_ids = []

# initiate a redis transaction
# lancer une transaction redis
transaction = r.multi()
for _, e in events:
transaction.incrby(f”item:{e[‘item_id’}:total”, e[‘amount’])
event_ids.append(e[‘id’])
transaction.xack(“stream_key”, “service-1”, *event_ids)
transaction.exec()
# Si un plantage a lieu avant d’affecter la transaction, aucune
# autre opération n’aura lieu, afin de garantir la cohérence.

Publication sur Pub/Sub :

# Publie un message sur le canal « redis »
r.Publish( "redis", "hello world")

Souscription à un canal sur Pub/Sub :

sub = r.pubsub()
sub.subscribe("redis")
while True:
msg = sub.get_message()
print(f"new message: {msg['data']}")

Souscription à une structure sur Pub/Sub :

sub = r.pubsub()
# cette souscription renverra des messages
# de tous les canaux commençant par `red`.
sub.psubscribe("red*")
while True:
msg = sub.get_message()
print(f« Nouveau message dans le canal {msg['channel']}: {msg['data']}")


Explorer plus avant


Les prochaines étapes