Data transformation pipeline
Learn how to transform data to Redis types
Write-behind's data transformation capabilities allow users to transform their data beyond the default translation of source types to Redis types. The transformation involves no coding. Instead, it is described in a set of human-readable YAML files, one per source table.
The ingested format and types differ from one source to another. Currently, the only supported source is Debezium. The first transformation, from Debezium types to native JSON with Redis types, is done automatically without any need for user instructions. This JSON is then passed on to the user-defined transformation pipeline.
Each job describes the transformation logic to perform on data from a single source. The source is typically a database table or collection and is specified as the full name of this table/collection. The job may include filtering logic to skip data that matches a condition. Other steps in the job transform the data into the desired output, which is stored in Redis as hashes or JSON documents.
Default job
When you need to apply a transformation to all ingested records without creating a dedicated job for each table, use the default job. The transformation associated with this job is applied to every table that lacks its own explicitly defined job. The default job must have a table name of `"*"`, and only one job of this type is permitted.
For example, the default job can streamline tasks such as adding a prefix or postfix to Redis keys, or adding fields to new hashes and JSON documents, without customizing each source table.
Currently, the default job is supported for ingest pipelines only.
Example
This example demonstrates how to add an `app_code` field with a value of `foo` using the `add_field` block to all tables that lack explicitly defined jobs. Additionally, it appends an `aws` prefix and a `gcp` postfix to every generated hash key.
default.yaml

```yaml
source:
  table: "*"
  row_format: full
transform:
  - uses: add_field
    with:
      fields:
        - field: after.app_code
          expression: "`foo`"
          language: jmespath
output:
  - uses: redis.write
    with:
      data_type: hash
      key:
        expression: concat(['aws', '#', table, '#', keys(key)[0], '#', values(key)[0], '#gcp'])
        language: jmespath
```
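To illustrate the key expression: a change record from a hypothetical table named `emp` whose key object is `{"id": 1}` would be written under the key `aws#emp#id#1#gcp`.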
Jobs
Each job is defined in a separate YAML file. All of these files are uploaded to Write-behind using the `deploy` command. For more information, see the deploy configuration section below. If you are using the `scaffold` command, place the job files in the `jobs` folder.
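For orientation, a `jobs` folder might look like the sketch below; the job file names are the ones used in the examples on this page, and the layout is only illustrative.

```bash
# Illustrative layout of the jobs folder (file names are examples only):
#
#   jobs/
#   ├── default.yaml              # optional default job (table: "*")
#   └── redislabs.dbo.emp.yaml    # job for a specific source table
#
# Upload all job files to Write-behind:
redis-di deploy
```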
Job YAML structure
Fields
- `source`: This section describes the table that the job operates on:
  - `server_name`: logical server name (optional). Corresponds to the `debezium.source.topic.prefix` property specified in Debezium Server's `application.properties` config file
  - `db`: database name (optional)
  - `schema`: database schema (optional)
  - `table`: database table name
  - `row_format`: format of the data to be transformed: `data_only` (default) - only the payload; `full` - complete change record

  Note: Any reference to the properties `server_name`, `db`, `schema`, and `table` is treated as case insensitive by default. This can be changed by setting `case_insensitive` to `false`.

  Cassandra only: In Cassandra, a `keyspace` is roughly the equivalent of a `schema` in other databases. Write-behind uses the `schema` property declared in a job file to match the `keyspace` attribute of the incoming change record.

  MongoDB only: In MongoDB, a `replica set` is a cluster of shards with data and can be regarded as roughly equivalent to a `schema` in a relational database. A MongoDB `collection` is similar to a `table` in other databases. Write-behind uses the `schema` and `table` properties declared in a job file to match the `replica set` and `collection` attributes of the incoming change record, respectively.
- `transform`: This section includes a series of blocks that define how the data will be transformed. For more information, see supported blocks and JMESPath custom functions.
- `output`: This section defines the output targets for processed data:
  - Cassandra:
    - `uses`: `cassandra.write`: write into a Cassandra data store
    - `with`:
      - `connection`: connection name
      - `keyspace`: keyspace
      - `table`: target table
      - `keys`: array of key columns
      - `mapping`: array of mapping columns
      - `opcode_field`: the name of the field in the payload that holds the operation (c - create, d - delete, u - update) for this record in the database
  - Redis:
    - `uses`: `redis.write`: write to a Redis data structure. Multiple blocks of this type are allowed in the same job
    - `with`:
      - `connection`: connection name as defined in `config.yaml` (by default, the connection named 'target' is used)
      - `data_type`: target data structure when writing data to Redis (`hash`, `json`, `set`, and `stream` are supported values)
      - `key`: this allows you to override the key of the record by applying custom logic:
        - `expression`: expression to execute
        - `language`: expression language, JMESPath or SQL
      - `expire`: positive integer value indicating a number of seconds for the key to expire. If not set, the key will never expire
  - SQL:
    - `uses`: `relational.write`: write into a SQL-compatible data store
    - `with`:
      - `connection`: connection name
      - `schema`: schema
      - `table`: target table name
      - `keys`: array of key columns
      - `mapping`: array of mapping columns
      - `opcode_field`: the name of the field in the payload that holds the operation (c - create, d - delete, u - update) for this record in the database

  A combined sketch showing how these sections fit together in a single job file appears after the notes below.
Notes

- `source` is required.
- Either `transform`, `key`, or both should be specified.
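Putting these sections together, a minimal job file might look like the following sketch. The table, column, and field names are illustrative, the connection and data type are just examples, and `expire` is optional.

```yaml
source:
  server_name: redislabs        # optional logical server name
  schema: dbo                   # optional schema
  table: invoice                # illustrative table name
  row_format: data_only         # default; use "full" to access key/before values
transform:
  - uses: add_field
    with:
      fields:
        - field: country_code   # hypothetical field added to every record
          expression: "`US`"
          language: jmespath
output:
  - uses: redis.write
    with:
      connection: target        # connection name as defined in config.yaml
      data_type: json           # hash, json, set, and stream are supported
      key:
        expression: concat(['invoice:', number])   # number is an illustrative string column
        language: jmespath
      expire: 3600              # expire the key after one hour; omit to keep it forever
```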
Using key in transformations
To access the Redis key (for example, in a write-behind job), you will need to take the following steps:

- Set `row_format: full` to allow access to the key that is part of the full data entry.
- Use the expression `key.key` to get the Redis key as a string.
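As an illustration of how these two pieces fit together, the following sketch copies the original Redis key into a new field using the `add_field` block shown earlier; the field name is hypothetical and the rest of the source definition is omitted.

```yaml
source:
  # ... remaining source properties for your job ...
  row_format: full                  # makes the key part of the record
transform:
  - uses: add_field
    with:
      fields:
        - field: after.original_key # hypothetical field that will hold the key
          expression: key.key       # the Redis key as a string
          language: jmespath
```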
Before and after values
Update events typically report `before` and `after` sections, providing access to the data state before and after the update.

To access the "before" values explicitly, you will need to:

- Set `row_format: full` to allow access to the key that is part of the full data entry.
- Use the `before.<FIELD_NAME>` pattern.
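As a minimal sketch (the table and field names are illustrative), a job could keep the pre-update value of a column next to the new one:

```yaml
source:
  table: emp
  row_format: full                    # required to access the before section
transform:
  - uses: add_field
    with:
      fields:
        - field: after.previous_fname # hypothetical field for the old value
          expression: before.fname    # value of fname before the update
          language: jmespath
```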
Example
This example shows how to rename the `fname` field to `first_name` in the table `emp` using the `rename_field` block. It also demonstrates how to set the key of this record instead of relying on the default logic.
redislabs.dbo.emp.yaml

```yaml
source:
  server_name: redislabs
  schema: dbo
  table: emp
transform:
  - uses: rename_field
    with:
      from_field: fname
      to_field: first_name
output:
  - uses: redis.write
    with:
      connection: target
      key:
        expression: concat(['emp:fname:',fname,':lname:',lname])
        language: jmespath
```
Deploy configuration
To deploy your jobs to the remote Write-behind database, run:

```bash
redis-di deploy
```
Deploy configuration on Kubernetes
If the Write-behind CLI is deployed as a pod in a Kubernetes cluster, perform these steps to deploy your jobs:
- Create a ConfigMap from the YAML files in your `jobs` folder:

  ```bash
  kubectl create configmap redis-di-jobs --from-file=jobs/
  ```

- Deploy your jobs:

  ```bash
  kubectl exec -it pod/redis-di-cli -- redis-di deploy
  ```

Note: A delay occurs between creating/modifying the ConfigMap and its availability in the `redis-di-cli` pod. Wait around 30 seconds before running the `redis-di deploy` command.
You have two options to update the ConfigMap:
- For smaller changes, you can edit the ConfigMap directly with this command:

  ```bash
  kubectl edit configmap redis-di-jobs
  ```

- For bigger changes, such as adding another job file, edit the files in your local `jobs` folder and then run this command:

  ```bash
  kubectl create configmap redis-di-jobs --from-file=jobs/ --dry-run=client -o yaml | kubectl apply -f -
  ```

Note: You need to run `kubectl exec -it pod/redis-di-cli -- redis-di deploy` after updating the ConfigMap with either option.