Configure data pipelines

Learn how to configure ingest pipelines for data transformation

RDI implements change data capture (CDC) with pipelines. (See the architecture overview for an introduction to pipelines.)

Overview

An RDI pipeline captures change data records from the source database, and transforms them into Redis data structures. It writes each of these new structures to a Redis target database under its own key.

By default, RDI transforms the source data into hashes or JSON objects for the target with a standard data mapping and a standard format for the key. However, you can also provide your own custom transformation jobs for each source table, using your own data mapping and key pattern. You specify these jobs declaratively with YAML configuration files that require no coding.

The data transformation involves two separate stages. First, the data ingested during CDC is automatically transformed to a JSON format. Then, this JSON data is passed on to your custom transformation for further processing.

You can provide a job file for each source table you want to transform, but you can also add a default job for any tables that don't have their own. You must specify the full name of the source table in the job file (or the special name "*" in the default job) and you can also include filtering logic to skip data that matches a particular condition. As part of the transformation, you can specify whether you want to store the data in Redis as JSON objects, hashes, sets, streams, sorted sets, or strings.

Data flows through the pipeline from the source database, through the RDI collector and stream processor, and into the target Redis database.

Pipeline configuration

RDI uses a set of YAML files to configure each pipeline, arranged in the folder structure described below.

The main configuration for the pipeline is in the config.yaml file. This specifies the connection details for the source database (such as host, username, and password) and also the queries that RDI will use to extract the required data. You should place job configurations in the Jobs folder if you want to specify your own data transformations.
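
As a rough sketch of this layout (the folder name and the job file names shown here are illustrative):

<PIPELINE_FOLDER>/
├── config.yaml
└── jobs/
    ├── default-job.yaml
    ├── redislabscdc.account.yaml
    └── redislabscdc.emp.yaml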

The sections below describe the two types of configuration file in more detail.

The config.yaml file

Here is an example of a config.yaml file. Note that the values of the form "${name}" refer to environment variables that you should set with the redis-di set-secret command. In particular, you should normally use environment variables as shown to set the source and target username and password rather than storing them in plain text in this file (see Set secrets for more information).

sources:
  mysql:
    type: cdc
    logging:
      level: info
    connection:
      type: mysql
      host: <DB_HOST> # e.g. localhost
      port: 3306
      # User and password are injected from the secrets.
      user: ${SOURCE_DB_USERNAME}
      password: ${SOURCE_DB_PASSWORD}
    # Additional properties for the source collector:
    # List of databases to include (optional).
    # databases:
    #   - database1
    #   - database2

    # List of tables to be synced (optional).
    # tables:
    #   If only one database is specified in the databases property above,
    #   then tables can be defined without the database prefix.
    #   <DATABASE_NAME>.<TABLE_NAME>:
    #     List of columns to be synced (optional).
    #     columns:
    #       - <COLUMN_NAME>
    #       - <COLUMN_NAME>
    #     List of columns to be used as keys (optional).
    #     keys:
    #       - <COLUMN_NAME>

    # Example: Sync specific tables.
    # tables:
    #   Sync a specific table with all its columns:
    #   redislabscdc.account: {}
    #   Sync a specific table with selected columns:
    #   redislabscdc.emp:
    #     columns:
    #       - empno
    #       - fname
    #       - lname

    # Advanced collector properties (optional):
    # advanced:
    #   Sink collector properties - see the full list at
    #     https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_redis_stream
    #   sink:
    #     Optional hard limits on memory usage of RDI streams.
    #     redis.memory.limit.mb: 300
    #     redis.memory.threshold.percentage: 85

    #     Uncomment for production so RDI Collector will wait on replica
    #     when writing entries.
    #     redis.wait.enabled: true
    #     redis.wait.timeout.ms: 1000
    #     redis.wait.retry.enabled: true
    #     redis.wait.retry.delay.ms: 1000

    #   Source specific properties - see the full list at
    #     https://debezium.io/documentation/reference/stable/connectors/
    #   source:
    #     snapshot.mode: initial
    #     Uncomment if you want a snapshot to include only a subset of the rows
    #     in a table. This property affects snapshots only.
    #     snapshot.select.statement.overrides: <DATABASE_NAME>.<TABLE_NAME>
    #     The specified SELECT statement determines the subset of table rows to
    #     include in the snapshot.
    #     snapshot.select.statement.overrides.<DATABASE_NAME>.<TABLE_NAME>: <SELECT_STATEMENT>

    #     Example: Snapshot filtering by order status.
    #     To include only orders with non-pending status from customers.orders
    #     table:
    #     snapshot.select.statement.overrides: customer.orders
    #     snapshot.select.statement.overrides.customer.orders: SELECT * FROM customers.orders WHERE status != 'pending' ORDER BY order_id DESC

    #   Quarkus framework properties - see the full list at
    #     https://quarkus.io/guides/all-config
    #   quarkus:
    #     banner.enabled: "false"

targets:
  # Redis target database connections.
  # The default connection must be named 'target' and is used when no
  # connection is specified in a job or when no jobs are deployed.
  # However, multiple connections can be defined here and used
  # in the job definition output blocks:
  # (e.g. target1, my-cloud-redis-db2, etc.)
  target:
    connection:
      type: redis
      # Host of the Redis database to which RDI will
      # write the processed data.
      host: <REDIS_TARGET_DB_HOST> # e.g. localhost
      # Port for the Redis database to which RDI will
      # write the processed data.
      port: <REDIS_TARGET_DB_PORT> # e.g. 12000
      # User of the Redis database to which RDI will write the processed data.
      # Uncomment if you are not using the default user.
      # user: ${TARGET_DB_USERNAME}
      # Password for Redis target database.
      password: ${TARGET_DB_PASSWORD}
      # SSL/TLS configuration: Uncomment to enable secure connections.
      # key: ${TARGET_DB_KEY}
      # key_password: ${TARGET_DB_KEY_PASSWORD}
      # cert: ${TARGET_DB_CERT}
      # cacert: ${TARGET_DB_CACERT}
processors:
  # Interval (in seconds) on which to perform retry on failure.
  # on_failed_retry_interval: 5
  # The batch size for reading data from the source database.
  # read_batch_size: 2000
  # Time (in ms) after which data will be read from stream even if
  # read_batch_size was not reached.
  # duration: 100
  # The batch size for writing data to the target Redis database. Should be
  # less than or equal to the read_batch_size.
  # write_batch_size: 200
  # Enable deduplication mechanism (default: false).
  # dedup: <DEDUP_ENABLED>
  # Max size of the deduplication set (default: 1024).
  # dedup_max_size: <DEDUP_MAX_SIZE>
  # Error handling strategy: ignore - skip, dlq - store rejected messages
  # in a dead letter queue
  # error_handling: dlq

The main sections of the file configure sources and targets.

Sources

The sources section has a subsection for the source that you need to configure. The source section starts with a unique name to identify the source (in the example we have a source called mysql but you can choose any name you like). The example configuration contains the following data:

  • type: The type of collector to use for the pipeline. Currently, the only type we support is cdc.
  • connection: The connection details for the source database: hostname, port, schema/db name, database credentials, and TLS/mTLS secrets.
  • tables: The dataset you want to collect from the source (see the example after this list). This subsection specifies:
    • snapshot_sql: A query that selects the tables to include in the dataset (the default is to include all tables if you don't specify a query here).
    • columns: A list of the columns you are interested in (the default is to include all columns if you don't supply a list).
    • keys: A list of columns to create a composite key if your table doesn't already have a PRIMARY KEY or UNIQUE constraint.
  • advanced: These optional properties configure other Debezium-specific features. The available sub-sections are:
    • sink: All advanced properties for writing to RDI (TLS, memory threshold, etc). See the Debezium Redis stream properties page for the full set of available properties.
    • source: All advanced connector properties (for example, RAC nodes). See Database-specific connection properties below and also see the Debezium Connectors pages for more information about the properties available for each database type.
    • quarkus: All advanced properties for Debezium server, such as the log level. See the Quarkus Configuration options docs for the full set of available properties.
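
For example, a sources section that syncs only selected columns of the redislabscdc.emp table from the commented-out example above, and nominates the empno column as the key, might look like the following sketch (adjust the connection details for your own database):

sources:
  mysql:
    type: cdc
    connection:
      type: mysql
      host: <DB_HOST>
      port: 3306
      user: ${SOURCE_DB_USERNAME}
      password: ${SOURCE_DB_PASSWORD}
    tables:
      redislabscdc.emp:
        columns:
          - empno
          - fname
          - lname
        # keys is only needed if the table has no PRIMARY KEY
        # or UNIQUE constraint.
        keys:
          - empno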

Targets

Use this section to provide the connection details for the target Redis database(s). As with the sources, you should start each target section with a unique name that you are free to choose. The default connection must be named target, and RDI uses it whenever a job doesn't specify a connection (the example above defines only this default). In the connection section, you can supply the type of the target database, which will generally be redis, along with the host and port of the server. You can also supply connection credentials and TLS/mTLS secrets here if you use them.
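
For example, you could define a second connection alongside the default target connection and select it from a job's output block with the connection property. In the sketch below, the reports name, its placeholder values, and the SECOND_TARGET_DB_PASSWORD secret are hypothetical:

targets:
  # Default connection, used when a job doesn't name one explicitly.
  target:
    connection:
      type: redis
      host: <REDIS_TARGET_DB_HOST>
      port: <REDIS_TARGET_DB_PORT>
      password: ${TARGET_DB_PASSWORD}
  # Additional connection that a job can select with 'connection: reports'
  # in its output block.
  reports:
    connection:
      type: redis
      host: <SECOND_REDIS_DB_HOST>
      port: <SECOND_REDIS_DB_PORT>
      password: ${SECOND_TARGET_DB_PASSWORD}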

Note:

If you specify localhost as the address of either the source or target server during installation, the connection will fail if the actual IP address of the local VM changes. For this reason, we recommend that you don't use localhost for the address. However, if you do encounter this problem, you can fix it by running the following commands on the VM that is running RDI itself:

sudo k3s kubectl delete nodes --all
sudo service k3s restart

Job files

You can optionally supply one or more job files that specify how you want to transform the captured data before writing it to the target. Each job file contains a YAML configuration that controls the transformation for a particular table from the source database. You can also add a default-job.yaml file to provide a default transformation for tables that don't have a specific job file of their own.

The job files have a structure like the following example. This configures a default job that:

  • Writes the data to a Redis hash
  • Adds a field app_code to the hash with a value of foo
  • Adds a prefix of aws and a suffix of gcp to the key

source:
  table: "*"
  row_format: full
transform:
  - uses: add_field
    with:
      fields:
        - field: after.app_code
          expression: "`foo`"
          language: jmespath
output:
  - uses: redis.write
    with:
      data_type: hash
      key:
        expression: concat(['aws', '#', table, '#', keys(key)[0], '#', values(key)[0], '#gcp'])
        language: jmespath
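
Note that this default job sets row_format to full, which is what makes the change record's table name and key map available to the key expression (see the description of row_format in the source section below). With the default data_only format, only the payload fields would be available to it.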

The main sections of these files are:

  • source: This is a mandatory section that specifies the data items that you want to use. You can add the following properties here:

    • server_name: Logical server name (optional). This corresponds to the debezium.source.topic.prefix property specified in the Debezium Server's application.properties config file.
    • db: Database name (optional)
    • schema: Database schema (optional)
    • table: Database table name. This refers to a table name you supplied in config.yaml. The default job doesn't apply to a specific table, so use "*" in place of the table name for this job only.
    • row_format: Format of the data to be transformed. This can take the values data_only (default) to use only the payload data, or full to use the complete change record. See the transform section below for details of the extra data you can access when you use the full option.
    • case_insensitive: This applies to the server_name, db, schema, and table properties and is set to true by default. Set it to false if you need to use case-sensitive values for these properties.
  • transform: This is an optional section describing the transformation that the pipeline applies to the data before writing it to the target. The uses property specifies a transformation block that will use the parameters supplied in the with section. See the data transformation reference for more details about the supported transformation blocks, and also the JMESPath custom functions reference.

    Note:

    If you set row_format to full under the source settings, you can access extra data from the change record in the transformation (see the sketch after this list):

    • Use the expression key.key to get the generated Redis key as a string.
    • Use before.<FIELD_NAME> to get the value of a field before it was updated in the source database (the field name by itself gives you the value after the update).
  • output: This is a mandatory section to specify the data structure(s) that RDI will write to the target along with the text pattern for the key(s) that will access it. Note that you can map one record to more than one key in Redis or nest a record as a field of a JSON structure (see Data denormalization for more information about nesting). You can add the following properties in the output section:

    • uses: This must have the value redis.write to specify writing to a Redis data structure. You can add more than one block of this type in the same job.
    • with:
      • connection: Connection name as defined in config.yaml (by default, the connection named target is used).
      • data_type: Target data structure when writing data to Redis. The supported types are hash, json, set, sorted_set, stream and string.
      • key: This lets you override the default key for the data structure with custom logic:
        • expression: Expression to generate the key.
        • language: Expression language, which must be jmespath or sql.
      • expire: Positive integer value indicating a number of seconds for the key to expire. If you don't specify this property, the key will never expire.
Note:
In a job file, the transform section is optional, but if you don't specify a transform, you must specify custom key logic in output.with.key. You can include both of these sections if you want both a custom transform and a custom key.
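
To see several of these options working together, here is a sketch of a job for a hypothetical invoice table (the table, its total column, and the previous_total field name are assumptions rather than part of the examples above). It keeps the full change record so it can copy the pre-update value of a column, writes the result as JSON, and expires each key after one day:

source:
  table: invoice
  row_format: full
transform:
  - uses: add_field
    with:
      fields:
        # Keep the pre-update value of the total column alongside the new one
        # (the before section is typically only populated for updates and deletes).
        - field: after.previous_total
          expression: before.total
          language: jmespath
output:
  - uses: redis.write
    with:
      connection: target
      data_type: json
      key:
        expression: concat(['invoice:', values(key)[0]])
        language: jmespath
      # Remove the key automatically after one day (86400 seconds).
      expire: 86400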

Another example below shows how you can rename the fname field to first_name in the table emp using the rename_field block. It also demonstrates how you can set the key of this record instead of relying on the default logic. (See the Transformation examples section for more examples of job files.)

source:
  server_name: redislabs
  schema: dbo
  table: emp
transform:
  - uses: rename_field
    with:
      from_field: fname
      to_field: first_name
output:
  - uses: redis.write
    with:
      connection: target
      key:
        expression: concat(['emp:fname:',fname,':lname:',lname])
        language: jmespath

See the RDI configuration file reference for full details about the available source, transform, and target configuration options and see also the data transformation reference for details of all the available transformation blocks.

Source preparation

Before using the pipeline, you must first prepare your source database to use the Debezium connector for change data capture (CDC). See the architecture overview for more information about CDC. Each database type has a different set of preparation steps. You can find the preparation guides for the databases that RDI supports in the Prepare source databases section.

Deploy a pipeline

When your configuration is ready, you must deploy it to start using the pipeline. See Deploy a pipeline to learn how to do this.

Ingest pipeline lifecycle

Once you have created the configuration for a pipeline, it goes through the following phases:

  1. Deploy - When you deploy the pipeline, RDI first validates it before use. Then, the operator creates and configures the collector and stream processor that will run the pipeline.
  2. Snapshot - The collector starts the pipeline by creating a snapshot of the full dataset. This involves reading all the relevant source data, transforming it and then writing it into the Redis target. You should expect this phase to take minutes or hours to complete if you have a lot of data.
  3. CDC - Once the snapshot is complete, the collector starts listening for updates to the source data. Whenever a change is committed to the source, the collector captures it and adds it to the target through the pipeline. This phase continues indefinitely unless you change the pipeline configuration.
  4. Update - If you update the pipeline configuration, the operator starts applying it to the processor and the collector. Note that the changes only affect newly-captured data unless you reset the pipeline completely. Once RDI has accepted the updates, the pipeline returns to the CDC phase with the new configuration.
  5. Reset - There are circumstances where you might want to rebuild the dataset completely. For example, you might want to apply a new transformation to all the source data or refresh the dataset if RDI is disconnected from the source for a long time. In situations like these, you can reset the pipeline back to the snapshot phase. When this is complete, the pipeline continues with CDC as usual.