Redis Data Integration configuration file

Redis Data Integration configuration file reference

Configuration file for Redis Data Integration (RDI) source collectors and target connections

Properties

Name Type Description Required
sources
(Source collectors)
object Defines source collectors and their configurations. Each key represents a unique source identifier, and its value contains specific configuration for that collector
processors
(Data processing configuration)
object, null Configuration settings that control how data is processed, including batch sizes, error handling, and performance tuning
targets
(Target connections)
object Configuration for target Redis databases where processed data will be written
secret-providers
(Secret providers)
object Configuration for secret management providers

Additional Properties: not allowed

sources: Source collectors

Defines source collectors and their configurations. Each key represents a unique source identifier, and its value contains specific configuration for that collector

Properties (key: .*)

Name Type Description Required
connection yes
type
(Collector type)
string Type of the source collector.
Default: "cdc"
Enum: "cdc", "flink"
yes
active
(Collector enabled)
boolean Flag to enable or disable the source collector
Default: true
no
logging
(Logging configuration)
object Logging configuration for the source collector
no
tables
(Tables to capture)
object Defines which tables to capture and how to handle their data
no
schemas
(Schema names)
string[] Schema names to capture from the source database (schema.include.list)
no
databases
(Database names)
string[] Database names to capture from the source database (database.include.list)
no
advanced
(Advanced configuration)
object Advanced configuration options for fine-tuning the collector
no

sources.logging: Logging configuration

Logging configuration for the source collector

Properties

Name Type Description Required
level
(Logging level)
string Logging level for the source collector
Default: "info"
Enum: "trace", "debug", "info", "warn", "error"

Additional Properties: not allowed
Example

level: info

sources.tables: Tables to capture

Defines which tables to capture and how to handle their data

Additional Properties

Name Type Description Required
Additional Properties object, null

Minimal Properties: 1

sources.tables.additionalProperties: object,null

Properties

Name Type Description Required
snapshot_sql string Custom SQL statement to use for the initial data snapshot, allowing fine-grained control over what data is captured
columns
(Columns to capture)
string[] List of specific columns to capture for changes. If not specified, all columns will be captured. Note: This property cannot be used for MongoDB connections
exclude_columns
(Columns to exclude)
string[] List of specific columns to exclude from capture. If not specified, no columns will be excluded. Note: This property can only be used for MongoDB connections
keys
(Message keys)
string[] Optional list of columns to use as a composite unique identifier. Only required when the table lacks a primary key or unique constraint. Must form a unique combination of fields

Additional Properties: not allowed

sources.tables.additionalProperties.columns[]: Columns to capture

List of specific columns to capture for changes. If not specified, all columns will be captured. Note: This property cannot be used for MongoDB connections

sources.tables.additionalProperties.exclude_columns[]: Columns to exclude

List of specific columns to exclude from capture. If not specified, no columns will be excluded. Note: This property can only be used for MongoDB connections

sources.tables.additionalProperties.keys[]: Message keys

Optional list of columns to use as a composite unique identifier. Only required when the table lacks a primary key or unique constraint. Must form a unique combination of fields

sources.schemas[]: Schema names

Schema names to capture from the source database (schema.include.list)

sources.databases[]: Database names

Database names to capture from the source database (database.include.list)

sources.advanced: Advanced configuration

Advanced configuration options for fine-tuning the collector

Properties

Name Type Description Required
sink
(RDI Collector stream writer configuration)
object Advanced configuration properties for RDI Collector stream writer connection and behaviour. When using collector type 'cdc', see the full list of properties at - https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_redis_stream . When using a property from that list, remove the debezium.sink. prefix. When using collector type 'flink', see the full list of properties at
source
(Advanced source settings)
object Advanced configuration properties for the source database connection and CDC behavior
quarkus
(Quarkus runtime settings)
object Advanced configuration properties for the Quarkus runtime environment
flink
(Advanced Flink settings)
object Advanced configuration properties for Flink
java_options
(Advanced Java options)
string These Java options will be passed to the command line command when launching the source collector

Additional Properties: not allowed
Minimal Properties: 1
Example

sink: {}
source: {}
quarkus: {}
flink: {}

sources.advanced.sink: RDI Collector stream writer configuration

Advanced configuration properties for RDI Collector stream writer connection and behaviour. When using collector type 'cdc', see the full list of properties at - https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_redis_stream . When using a property from that list, remove the debezium.sink. prefix. When using collector type 'flink', see the full list of properties at

Additional Properties

Name Type Description Required
Additional Properties string, number, boolean

Minimal Properties: 1

sources.advanced.source: Advanced source settings

Advanced configuration properties for the source database connection and CDC behavior

Additional Properties

Name Type Description Required
Additional Properties string, number, boolean

Minimal Properties: 1

sources.advanced.quarkus: Quarkus runtime settings

Advanced configuration properties for the Quarkus runtime environment

Additional Properties

Name Type Description Required
Additional Properties string, number, boolean

Minimal Properties: 1

Advanced configuration properties for Flink

Additional Properties

Name Type Description Required
Additional Properties string, number, boolean

Minimal Properties: 1

processors: Data processing configuration

Configuration settings that control how data is processed, including batch sizes, error handling, and performance tuning

Properties

Name Type Description Required
type
(Processor type)
string Processor type, either 'classic' or 'flink'
Default: "classic"
Enum: "classic"
on_failed_retry_interval
(Retry interval on failure)
integer, string Number of seconds to wait before retrying a failed operation
Default: 5
Pattern: ^\${.*}$
Minimum: 1
read_batch_size integer, string Maximum number of records to process in a single batch
Default: 2000
Pattern: ^\${.*}$
Minimum: 1
read_batch_timeout_ms
(Read batch timeout)
integer Maximum time in milliseconds to wait for a batch to fill before processing
Default: 100
Minimum: 1
enable_async_processing boolean Enable async processing to improve throughput
Default: true
batch_queue_size integer Maximum number of batches to queue for processing
Default: 3
Minimum: 1
ack_queue_size integer Maximum number of batches to queue for asynchronous acknowledgement
Default: 10
Minimum: 1
dedup
(Enable deduplication)
boolean Enable the deduplication mechanism to handle duplicate records
Default: false
dedup_max_size
(Deduplication set size)
integer Maximum number of entries to store in the deduplication set
Default: 1024
Minimum: 1
dedup_strategy
(Deduplication strategy)
string (DEPRECATED)
Property 'dedup_strategy' is now deprecated. The only supported strategy is 'ignore'. Please remove from the configuration.
Default: "ignore"
Enum: "reject", "ignore"
duration
(Batch duration limit)
integer, string Maximum time in milliseconds to wait for a batch to fill before processing
Default: 100
Pattern: ^\${.*}$
Minimum: 1
write_batch_size integer, string Maximum number of records to write to target Redis database in a single batch
Default: 200
Pattern: ^\${.*}$
Minimum: 1
error_handling
(Error handling strategy)
string Strategy for handling errors: ignore to skip errors, dlq to store rejected messages in dead letter queue
Default: "dlq"
Pattern: ^\${.*}$|ignore|dlq
dlq_max_messages
(DLQ message limit)
integer, string Maximum number of messages to store in dead letter queue per stream
Default: 1000
Pattern: ^\${.*}$
Minimum: 1
target_data_type
(Target Redis data type)
string Data type to use in Redis: hash for Redis Hash, json for RedisJSON (requires RedisJSON module)
Default: "hash"
Pattern: ^\${.*}$|hash|json
json_update_strategy string Strategy for updating JSON data in Redis: replace to overwrite the entire JSON object, merge to merge new data with existing JSON object
Default: "replace"
Pattern: ^\${.*}$|replace|merge
initial_sync_processes integer, string Number of parallel processes for performing initial data synchronization
Default: 4
Pattern: ^\${.*}$
Minimum: 1
Maximum: 32
idle_sleep_time_ms
(Idle sleep interval)
integer, string Time in milliseconds to sleep between processing batches when idle
Default: 200
Pattern: ^\${.*}$
Minimum: 1
Maximum: 999999
idle_streams_check_interval_ms
(Idle streams check interval)
integer, string Time in milliseconds between checking for new streams when processor is idle
Default: 1000
Pattern: ^\${.*}$
Minimum: 1
Maximum: 999999
busy_streams_check_interval_ms
(Busy streams check interval)
integer, string Time in milliseconds between checking for new streams when processor is busy
Default: 5000
Pattern: ^\${.*}$
Minimum: 1
Maximum: 999999
wait_enabled
(Enable replica wait)
boolean Enable verification that data has been written to replica shards of the target database
Default: false
wait_timeout
(Replica wait timeout)
integer, string Maximum time in milliseconds to wait for replica write verification of the target database
Default: 1000
Pattern: ^\${.*}$
Minimum: 1
retry_max_attempts
(Maximum retry attempts)
integer, string Maximum number of attempts for failed operations
Default: 5
Pattern: ^\${.*}$
Minimum: 1
retry_initial_delay_ms
(Initial retry delay)
integer, string Initial delay in milliseconds before retrying a failed operation
Default: 1000
Pattern: ^\${.*}$
Minimum: 1
Maximum: 999999
retry_max_delay_ms
(Maximum retry delay)
integer, string Maximum delay in milliseconds between retry attempts
Default: 10000
Pattern: ^\${.*}$
Minimum: 1
Maximum: 999999
retry_on_replica_failure boolean Continue retrying writes until successful replication to replica shards is confirmed
Default: true
logging
(Logging configuration)
object Logging configuration for the processor
use_native_json_merge
(Use native JSON merge from RedisJSON module)
boolean Controls whether to use the native JSON.MERGE command (when true) or Lua scripts (when false) for JSON merge operations. Introduced in RDI 1.15.0. The native command provides 2x performance improvement but handles null values differently:

Previous behavior (Lua merge): When merging {"field1": "value1", "field2": "value2"} with {"field2": null, "field3": "value3"}, the result was {"field1": "value1", "field2": null, "field3": "value3"} (null value is preserved)

New behavior (JSON.MERGE): The same merge produces {"field1": "value1", "field3": "value3"} (null value removes the field, following RFC 7396)

Note: The native JSON.MERGE command requires RedisJSON 2.6.0 or higher. If the target database has an older version of RedisJSON, RDI will automatically fall back to using Lua-based merge operations regardless of this setting.

Impact: If your application logic distinguishes between a field with a null value and a missing field, you may need to adjust your data handling. This follows the JSON Merge Patch RFC standard but differs from the previous Lua implementation. Set to false to revert to the previous Lua-based merge behavior if needed.
Default: true
advanced
(Advanced configuration)
object Advanced configuration options for fine-tuning the processor

Additional Properties: not allowed

processors.logging: Logging configuration

Logging configuration for the processor

Properties

Name Type Description Required
level
(Logging level)
string Logging level for the processor
Default: "info"
Enum: "trace", "debug", "info", "warn", "error"

Additional Properties: not allowed
Example

level: info

processors.advanced: Advanced configuration

Advanced configuration options for fine-tuning the processor

Properties

Name Type Description Required
source
(Advanced source settings)
object Advanced configuration properties for the source
sink
(Advanced sink settings)
object Advanced configuration properties for the sink
processor
(Advanced processor settings)
object Advanced configuration properties for the processor

Additional Properties: not allowed
Minimal Properties: 1
Example

source: {}
sink: {}
processor: {}

processors.advanced.source: Advanced source settings

Advanced configuration properties for the source

Additional Properties

Name Type Description Required
Additional Properties string, number, boolean

Minimal Properties: 1

processors.advanced.sink: Advanced sink settings

Advanced configuration properties for the sink

Additional Properties

Name Type Description Required
Additional Properties string, number, boolean

Minimal Properties: 1

processors.advanced.processor: Advanced processor settings

Advanced configuration properties for the processor

Additional Properties

Name Type Description Required
Additional Properties string, number, boolean

Minimal Properties: 1

targets: Target connections

Configuration for target Redis databases where processed data will be written

Properties (Pattern)

Name Type Description Required
.*

secret-providers: Secret providers

Configuration for secret management providers

Properties (key: .*)

Name Type Description Required
type
(Provider type)
string Type of secret provider service
Enum: "aws", "vault"
yes
parameters
(Provider parameters)
object Configuration parameters for the secret provider
yes

secret-providers.parameters: Provider parameters

Configuration parameters for the secret provider

Properties

Name Type Description Required
objects
(Secrets objects array)
object[] List of secret objects to fetch from the provider
yes

Example

objects:
  - {}

secret-providers.parameters.objects[]: Secrets objects array

List of secret objects to fetch from the provider

Items: Secret object

No properties.

Example

- {}
RATE THIS PAGE
Back to top ↑