Redis Data Integration configuration file reference
Configuration file for Redis Data Integration (RDI) source collectors and target connections
Properties

| Name | Type | Description | Required |
|---|---|---|---|
| sources (Source collectors) | object | Defines source collectors and their configurations. Each key is a unique source identifier, and its value contains the specific configuration for that collector. | |
| processors (Data processing configuration) | object, null | Configuration settings that control how data is processed, including batch sizes, error handling, and performance tuning. | |
| targets (Target connections) | object | Configuration for the target Redis databases where processed data will be written. | |
| secret-providers (Secret providers) | object | Configuration for secret management providers. | |

Additional Properties: not allowed
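Taken together, the four top-level sections give the configuration file this overall shape (the `my-source` and `my-target` keys are placeholders, not part of the schema; see the sections below for the properties each section accepts):

```yaml
sources:
  my-source: {}         # one entry per source collector; the key is an arbitrary identifier
processors: {}          # data processing settings; may also be null
targets:
  my-target: {}         # one entry per target Redis database
secret-providers: {}    # optional secret management providers
```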
sources: Source collectors
Defines source collectors and their configurations. Each key represents a unique source identifier, and its value contains specific configuration for that collector
Properties (key: .*)

| Name | Type | Description | Required |
|---|---|---|---|
| connection | | | yes |
| type (Collector type) | string | Type of the source collector. Default: "cdc". Enum: "cdc", "flink". | yes |
| active (Collector enabled) | boolean | Flag to enable or disable the source collector. Default: true. | no |
| logging (Logging configuration) | object | Logging configuration for the source collector. | no |
| tables (Tables to capture) | object | Defines which tables to capture and how to handle their data. | no |
| schemas (Schema names) | string[] | Schema names to capture from the source database (schema.include.list). | no |
| databases (Database names) | string[] | Database names to capture from the source database (database.include.list). | no |
| advanced (Advanced configuration) | object | Advanced configuration options for fine-tuning the collector. | no |
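A minimal sketch of a single source entry using the properties above (the source name, schema name, and table name are invented for illustration, and the required `connection` block's own properties are not described in this table):

```yaml
sources:
  my-postgres:          # arbitrary unique source identifier
    type: cdc           # default; "flink" is the other allowed value
    active: true
    logging:
      level: info
    schemas:
      - public          # becomes schema.include.list
    tables:
      invoice: {}       # capture all columns of this table
    connection: {}      # required; its contents are not covered by this table
```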
sources.logging: Logging configuration
Logging configuration for the source collector
Properties

| Name | Type | Description | Required |
|---|---|---|---|
| level (Logging level) | string | Logging level for the source collector. Default: "info". Enum: "trace", "debug", "info", "warn", "error". | |

Additional Properties: not allowed

Example

```yaml
level: info
```
sources.tables: Tables to capture
Defines which tables to capture and how to handle their data
Additional Properties

| Name | Type | Description | Required |
|---|---|---|---|
| Additional Properties | object, null | | |
sources.tables.additionalProperties: object, null
Properties

| Name | Type | Description | Required |
|---|---|---|---|
| snapshot_sql | string | Custom SQL statement to use for the initial data snapshot, allowing fine-grained control over what data is captured. | |
| columns (Columns to capture) | string[] | List of specific columns to capture for changes. If not specified, all columns will be captured. Note: this property cannot be used for MongoDB connections. | |
| exclude_columns (Columns to exclude) | string[] | List of specific columns to exclude from capture. If not specified, no columns will be excluded. Note: this property can only be used for MongoDB connections. | |
| keys (Message keys) | string[] | Optional list of columns to use as a composite unique identifier. Only required when the table lacks a primary key or unique constraint; must form a unique combination of fields. | |

Additional Properties: not allowed
sources.tables.additionalProperties.columns[]: Columns to capture
List of specific columns to capture for changes. If not specified, all columns will be captured. Note: This property cannot be used for MongoDB connections
sources.tables.additionalProperties.exclude_columns[]: Columns to exclude
List of specific columns to exclude from capture. If not specified, no columns will be excluded. Note: This property can only be used for MongoDB connections
sources.tables.additionalProperties.keys[]: Message keys
Optional list of columns to use as a composite unique identifier. Only required when the table lacks a primary key or unique constraint. Must form a unique combination of fields
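Combining these per-table options, a hypothetical `tables` entry might look like this (the table name, column names, and SQL text are all invented for illustration):

```yaml
tables:
  invoice:
    snapshot_sql: "SELECT * FROM invoice WHERE status = 'ACTIVE'"
    columns:        # not valid for MongoDB connections
      - id
      - amount
      - status
    keys:           # only needed when the table has no primary key or unique constraint
      - id
```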
sources.schemas[]: Schema names
Schema names to capture from the source database (schema.include.list)
sources.databases[]: Database names
Database names to capture from the source database (database.include.list)
sources.advanced: Advanced configuration
Advanced configuration options for fine-tuning the collector
Properties

| Name | Type | Description | Required |
|---|---|---|---|
| sink (RDI Collector stream writer configuration) | object | Advanced configuration properties for the RDI Collector stream writer connection and behaviour. When using collector type 'cdc', see the full list of properties at https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_redis_stream and remove the debezium.sink. prefix when using a property from that list. When using collector type 'flink', see the Flink documentation for the full list of properties. | |
| source (Advanced source settings) | object | Advanced configuration properties for the source database connection and CDC behavior. | |
| quarkus (Quarkus runtime settings) | object | Advanced configuration properties for the Quarkus runtime environment. | |
| flink (Advanced Flink settings) | object | Advanced configuration properties for Flink. | |
| java_options (Advanced Java options) | string | Java options passed on the command line when launching the source collector. | |

Additional Properties: not allowed
Minimum Properties: 1

Example

```yaml
sink: {}
source: {}
quarkus: {}
flink: {}
```
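To illustrate the prefix rule for 'cdc' collectors: a Debezium Server Redis sink property such as `debezium.sink.redis.batch.size` would be written under `sink` without its `debezium.sink.` prefix (the value shown here is arbitrary):

```yaml
advanced:
  sink:
    redis.batch.size: 1000   # debezium.sink.redis.batch.size, prefix removed
```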
sources.advanced.sink: RDI Collector stream writer configuration
Advanced configuration properties for the RDI Collector stream writer connection and behaviour. When using collector type 'cdc', see the full list of properties at https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_redis_stream and remove the debezium.sink. prefix when using a property from that list. When using collector type 'flink', see the Flink documentation for the full list of properties.
Additional Properties

| Name | Type | Description | Required |
|---|---|---|---|
| Additional Properties | string, number, boolean | | |
sources.advanced.source: Advanced source settings
Advanced configuration properties for the source database connection and CDC behavior
Additional Properties

| Name | Type | Description | Required |
|---|---|---|---|
| Additional Properties | string, number, boolean | | |
sources.advanced.quarkus: Quarkus runtime settings
Advanced configuration properties for the Quarkus runtime environment
Additional Properties

| Name | Type | Description | Required |
|---|---|---|---|
| Additional Properties | string, number, boolean | | |
sources.advanced.flink: Advanced Flink settings
Advanced configuration properties for Flink
Additional Properties

| Name | Type | Description | Required |
|---|---|---|---|
| Additional Properties | string, number, boolean | | |
processors: Data processing configuration
Configuration settings that control how data is processed, including batch sizes, error handling, and performance tuning
Properties

| Name | Type | Description | Required |
|---|---|---|---|
| type (Processor type) | string | Processor type, either 'classic' or 'flink'. Default: "classic". Enum: "classic". | |
| on_failed_retry_interval (Retry interval on failure) | integer, string | Number of seconds to wait before retrying a failed operation. Default: 5. Pattern: `^\${.*}$`. Minimum: 1. | |
| read_batch_size | integer, string | Maximum number of records to process in a single batch. Default: 2000. Pattern: `^\${.*}$`. Minimum: 1. | |
| read_batch_timeout_ms (Read batch timeout) | integer | Maximum time in milliseconds to wait for a batch to fill before processing. Default: 100. Minimum: 1. | |
| enable_async_processing | boolean | Enable async processing to improve throughput. Default: true. | |
| batch_queue_size | integer | Maximum number of batches to queue for processing. Default: 3. Minimum: 1. | |
| ack_queue_size | integer | Maximum number of batches to queue for asynchronous acknowledgement. Default: 10. Minimum: 1. | |
| dedup (Enable deduplication) | boolean | Enable the deduplication mechanism to handle duplicate records. Default: false. | |
| dedup_max_size (Deduplication set size) | integer | Maximum number of entries to store in the deduplication set. Default: 1024. Minimum: 1. | |
| dedup_strategy (Deduplication strategy) | string | (DEPRECATED) The 'dedup_strategy' property is deprecated; the only supported strategy is 'ignore'. Please remove it from the configuration. Default: "ignore". Enum: "reject", "ignore". | |
| duration (Batch duration limit) | integer, string | Maximum time in milliseconds to wait for a batch to fill before processing. Default: 100. Pattern: `^\${.*}$`. Minimum: 1. | |
| write_batch_size | integer, string | Maximum number of records to write to the target Redis database in a single batch. Default: 200. Pattern: `^\${.*}$`. Minimum: 1. | |
| error_handling (Error handling strategy) | string | Strategy for handling errors: 'ignore' to skip errors, 'dlq' to store rejected messages in the dead letter queue. Default: "dlq". Pattern: `^\${.*}$\|ignore\|dlq`. | |
| dlq_max_messages (DLQ message limit) | integer, string | Maximum number of messages to store in the dead letter queue per stream. Default: 1000. Pattern: `^\${.*}$`. Minimum: 1. | |
| target_data_type (Target Redis data type) | string | Data type to use in Redis: 'hash' for Redis Hash, 'json' for RedisJSON (requires the RedisJSON module). Default: "hash". Pattern: `^\${.*}$\|hash\|json`. | |
| json_update_strategy | string | Strategy for updating JSON data in Redis: 'replace' to overwrite the entire JSON object, 'merge' to merge new data with the existing JSON object. Default: "replace". Pattern: `^\${.*}$\|replace\|merge`. | |
| initial_sync_processes | integer, string | Number of parallel processes for performing the initial data synchronization. Default: 4. Pattern: `^\${.*}$`. Minimum: 1. Maximum: 32. | |
| idle_sleep_time_ms (Idle sleep interval) | integer, string | Time in milliseconds to sleep between processing batches when idle. Default: 200. Pattern: `^\${.*}$`. Minimum: 1. Maximum: 999999. | |
| idle_streams_check_interval_ms (Idle streams check interval) | integer, string | Time in milliseconds between checks for new streams when the processor is idle. Default: 1000. Pattern: `^\${.*}$`. Minimum: 1. Maximum: 999999. | |
| busy_streams_check_interval_ms (Busy streams check interval) | integer, string | Time in milliseconds between checks for new streams when the processor is busy. Default: 5000. Pattern: `^\${.*}$`. Minimum: 1. Maximum: 999999. | |
| wait_enabled (Enable replica wait) | boolean | Enable verification that data has been written to the replica shards of the target database. Default: false. | |
| wait_timeout (Replica wait timeout) | integer, string | Maximum time in milliseconds to wait for replica write verification on the target database. Default: 1000. Pattern: `^\${.*}$`. Minimum: 1. | |
| retry_max_attempts (Maximum retry attempts) | integer, string | Maximum number of attempts for failed operations. Default: 5. Pattern: `^\${.*}$`. Minimum: 1. | |
| retry_initial_delay_ms (Initial retry delay) | integer, string | Initial delay in milliseconds before retrying a failed operation. Default: 1000. Pattern: `^\${.*}$`. Minimum: 1. Maximum: 999999. | |
| retry_max_delay_ms (Maximum retry delay) | integer, string | Maximum delay in milliseconds between retry attempts. Default: 10000. Pattern: `^\${.*}$`. Minimum: 1. Maximum: 999999. | |
| retry_on_replica_failure | boolean | Continue retrying writes until successful replication to replica shards is confirmed. Default: true. | |
| logging (Logging configuration) | object | Logging configuration for the processor. | |
| use_native_json_merge (Use native JSON merge from RedisJSON module) | boolean | Controls whether JSON merge operations use the native JSON.MERGE command (when true) or Lua scripts (when false). Introduced in RDI 1.15.0. The native command provides a 2x performance improvement but handles null values differently. Previous behavior (Lua merge): merging {"field1": "value1", "field2": "value2"} with {"field2": null, "field3": "value3"} produced {"field1": "value1", "field2": null, "field3": "value3"} (the null value is preserved). New behavior (JSON.MERGE): the same merge produces {"field1": "value1", "field3": "value3"} (a null value removes the field, following RFC 7396). Note: the native JSON.MERGE command requires RedisJSON 2.6.0 or higher; if the target database has an older version of RedisJSON, RDI automatically falls back to Lua-based merge operations regardless of this setting. Impact: if your application logic distinguishes between a field with a null value and a missing field, you may need to adjust your data handling. This follows the JSON Merge Patch RFC standard but differs from the previous Lua implementation. Set to false to revert to the previous Lua-based merge behavior if needed. Default: true. | |
| advanced (Advanced configuration) | object | Advanced configuration options for fine-tuning the processor. | |

Additional Properties: not allowed
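The `^\${.*}$` pattern accepted by many of these properties indicates that a `${...}` variable reference can be supplied in place of a literal value. A hedged sketch of a `processors` section (the variable name is a placeholder):

```yaml
processors:
  read_batch_size: ${READ_BATCH_SIZE}   # resolved from a variable rather than a literal
  target_data_type: json                # requires the RedisJSON module
  json_update_strategy: merge
  error_handling: dlq
  dlq_max_messages: 500
  logging:
    level: info
```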
processors.logging: Logging configuration
Logging configuration for the processor
Properties

| Name | Type | Description | Required |
|---|---|---|---|
| level (Logging level) | string | Logging level for the processor. Default: "info". Enum: "trace", "debug", "info", "warn", "error". | |

Additional Properties: not allowed

Example

```yaml
level: info
```
processors.advanced: Advanced configuration
Advanced configuration options for fine-tuning the processor
Properties

| Name | Type | Description | Required |
|---|---|---|---|
| source (Advanced source settings) | object | Advanced configuration properties for the source. | |
| sink (Advanced sink settings) | object | Advanced configuration properties for the sink. | |
| processor (Advanced processor settings) | object | Advanced configuration properties for the processor. | |

Additional Properties: not allowed
Minimum Properties: 1

Example

```yaml
source: {}
sink: {}
processor: {}
```
processors.advanced.source: Advanced source settings
Advanced configuration properties for the source
Additional Properties

| Name | Type | Description | Required |
|---|---|---|---|
| Additional Properties | string, number, boolean | | |
processors.advanced.sink: Advanced sink settings
Advanced configuration properties for the sink
Additional Properties

| Name | Type | Description | Required |
|---|---|---|---|
| Additional Properties | string, number, boolean | | |
processors.advanced.processor: Advanced processor settings
Advanced configuration properties for the processor
Additional Properties

| Name | Type | Description | Required |
|---|---|---|---|
| Additional Properties | string, number, boolean | | |
targets: Target connections
Configuration for target Redis databases where processed data will be written
Properties (key: .*)

| Name | Type | Description | Required |
|---|---|---|---|
| .* | | | |
secret-providers: Secret providers
Configuration for secret management providers
Properties (key: .*)

| Name | Type | Description | Required |
|---|---|---|---|
| type (Provider type) | string | Type of the secret provider service. Enum: "aws", "vault". | yes |
| parameters (Provider parameters) | object | Configuration parameters for the secret provider. | yes |
secret-providers.parameters: Provider parameters
Configuration parameters for the secret provider
Properties

| Name | Type | Description | Required |
|---|---|---|---|
| objects (Secrets objects array) | object[] | List of secret objects to fetch from the provider. | yes |

Example

```yaml
objects:
  - {}
```
secret-providers.parameters.objects[]: Secrets objects array
List of secret objects to fetch from the provider
Items: Secret object
No properties.

Example

```yaml
- {}
```
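Putting the provider schema together, a hypothetical `secret-providers` section might look like this (the provider name is arbitrary, and the contents of each secret object are not defined by this schema):

```yaml
secret-providers:
  my-vault:              # arbitrary provider name
    type: vault          # or "aws"
    parameters:
      objects:
        - {}             # secret object; no properties are defined here
```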