Streaming features in Featureform
Build stream-backed features with Kafka, streaming transformations, and Redis serving.
Featureform supports stream-backed feature workflows for use cases that need continuously updated online values while preserving historical correctness for training.
This page focuses on the documented SDK surface: register Kafka, define a streaming transformation, define features with from_stream(...), optionally backfill from a batch source, materialize a feature view, and serve it from Redis.
Prerequisites
Before you build streaming features, you need:
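- a running Featureform deployment and a client connected to it (`client` in the examples below)
- a registered Redis online store (`redis` in the examples)
- a registered stream-capable compute provider such as Spark or Databricks (`spark` in the examples)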
- a Kafka topic that contains the events you want to transform into features
Register Kafka
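```python
import featureform as ff
from featureform.config.file_stores import KafkaConfig

# Register a Kafka cluster as a Featureform provider.
kafka = ff.register_kafka(
    name="transactions-kafka",
    kafka_config=KafkaConfig(
        bootstrap_servers=["kafka-1:9092", "kafka-2:9092"],
        use_msk_iam_auth=False,  # MSK IAM auth off; this example uses SASL/SCRAM instead
        # Additional Kafka client options (credentials are not shown here).
        options={
            "kafka.security.protocol": "SASL_SSL",
            "kafka.sasl.mechanism": "SCRAM-SHA-512",
        },
    ),
)
```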
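The options above select SASL_SSL with SCRAM-SHA-512 but include no credentials. A Kafka client that authenticates with SCRAM also needs a JAAS login configuration (`sasl.jaas.config`, written `kafka.sasl.jaas.config` when passed through Spark's Kafka source). The sketch below checks for that gap before registration. It assumes the `options` dictionary reaches Spark's Kafka source unchanged, which this page does not state; `missing_sasl_options`, `scram_jaas_config`, and the `svc-featureform` username are hypothetical.

```python
from __future__ import annotations

# Hypothetical preflight helper, not part of the Featureform SDK.
# Assumes KafkaConfig options are handed to Spark's Kafka source, which
# forwards every option prefixed with "kafka." to the Kafka client.
SASL_MECHANISMS_NEEDING_JAAS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}


def missing_sasl_options(options: dict[str, str]) -> list[str]:
    """Return the Spark-style Kafka options a SASL setup is still missing."""
    protocol = options.get("kafka.security.protocol", "PLAINTEXT").upper()
    mechanism = options.get("kafka.sasl.mechanism", "GSSAPI").upper()  # Kafka's default mechanism
    if (
        protocol.startswith("SASL")
        and mechanism in SASL_MECHANISMS_NEEDING_JAAS
        and "kafka.sasl.jaas.config" not in options
    ):
        return ["kafka.sasl.jaas.config"]
    return []


def scram_jaas_config(username: str, password: str) -> str:
    """Build the JAAS login line that Kafka clients use for SASL/SCRAM."""
    return (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )


options = {
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "SCRAM-SHA-512",
}
print(missing_sasl_options(options))  # ['kafka.sasl.jaas.config']
options["kafka.sasl.jaas.config"] = scram_jaas_config("svc-featureform", "<password>")
print(missing_sasl_options(options))  # []
```

In a real deployment, load the password from a secret store rather than writing it into code.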
Register a Kafka topic
```python
transactions_stream = kafka.register_kafka_topic(
    name="transactions-stream",
    topic="transactions",
    description="Raw transaction events from Kafka",
)
```
Define a streaming transformation
```python
@spark.streaming_sql_transformation(
    name="parsed_transactions",
    inputs=[transactions_stream],
)
def parsed_transactions(source):
    # Kafka record values are binary: cast to STRING, extract JSON fields,
    # and cast each field to the type the features expect.
    return """
    SELECT
        get_json_object(CAST(value AS STRING), '$.user_id') AS user_id,
        CAST(get_json_object(CAST(value AS STRING), '$.amount') AS DOUBLE) AS amount,
        CAST(get_json_object(CAST(value AS STRING), '$.event_time') AS TIMESTAMP) AS event_time
    FROM {{ source }}
    """
```
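Spark's Kafka source exposes each record's payload as a binary `value` column (alongside `key`, `topic`, `partition`, `offset`, and `timestamp`), which is why the SQL casts `value` to STRING before extracting fields. To unit-test payload assumptions without a cluster, the parsing can be mirrored in plain Python. This is an approximation, not Featureform or Spark code: it assumes JSON payloads with ISO-8601 `event_time` strings and reproduces the non-ANSI Spark behaviour in which missing fields and failed casts become NULL (`None`). Time-zone handling differs: Spark interprets timestamps without an offset in the session time zone, while this sketch returns naive datetimes for them. `parse_transaction` is a hypothetical helper.

```python
from __future__ import annotations

import json
from datetime import datetime
from typing import Any, Optional


def _json_field(payload: Optional[dict], key: str) -> Optional[str]:
    """Approximate get_json_object(value, '$.key'): the field as a string, or None."""
    if payload is None or payload.get(key) is None:
        return None
    field = payload[key]
    # Strings come back unquoted; other JSON values come back as JSON text.
    return field if isinstance(field, str) else json.dumps(field)


def parse_transaction(value: bytes) -> dict[str, Any]:
    """Hypothetical Python mirror of the parsed_transactions SELECT for one record."""
    try:
        payload = json.loads(value.decode("utf-8", errors="replace"))
    except json.JSONDecodeError:
        payload = None  # invalid JSON: every extracted field is NULL
    if not isinstance(payload, dict):
        payload = None

    amount_text = _json_field(payload, "amount")
    try:
        amount = float(amount_text) if amount_text is not None else None
    except ValueError:
        amount = None  # a failed CAST yields NULL when ANSI mode is off

    time_text = _json_field(payload, "event_time")
    event_time = None
    if time_text is not None:
        if time_text.endswith("Z"):
            time_text = time_text[:-1] + "+00:00"  # fromisoformat rejects "Z" before Python 3.11
        try:
            event_time = datetime.fromisoformat(time_text)
        except ValueError:
            event_time = None

    return {"user_id": _json_field(payload, "user_id"), "amount": amount, "event_time": event_time}


record_value = b'{"user_id": "user_1", "amount": "12.50", "event_time": "2024-05-01T12:00:00Z"}'
print(parse_transaction(record_value))
print(parse_transaction(b"not json"))  # {'user_id': None, 'amount': None, 'event_time': None}
```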
Define features from a stream
```python
from datetime import timedelta

window = timedelta(days=7)

@ff.entity
class User:
    rolling_amount = (
        ff.Feature()
        .from_stream(
            parsed_transactions,
            entity="user_id",
            values="amount",
            timestamp="event_time",
        )
        .aggregate(
            function=ff.AggregateFunction.SUM,
            windows=[window],
        )
    )
```
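The aggregation above produces, for each user, the sum of `amount` over the seven days before a given point in time. The sketch below computes that value in plain Python, which can help when checking expected feature values in tests. It assumes the window runs from `as_of - window` (exclusive) to `as_of` (inclusive); this page does not document Featureform's exact boundary handling. `rolling_sum` is a hypothetical helper.

```python
from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timedelta


def rolling_sum(events, as_of: datetime, window: timedelta) -> dict[str, float]:
    """Hypothetical helper: per-user SUM(amount) with as_of - window < event_time <= as_of.

    events is an iterable of (user_id, amount, event_time) tuples. Rows with a
    missing field are skipped, a simplification of how SQL SUM ignores NULLs.
    """
    start = as_of - window
    totals = defaultdict(float)
    for user_id, amount, event_time in events:
        if user_id is None or amount is None or event_time is None:
            continue
        if start < event_time <= as_of:
            totals[user_id] += amount
    return dict(totals)


window = timedelta(days=7)
events = [
    ("user_1", 10.0, datetime(2024, 5, 1)),
    ("user_1", 5.0, datetime(2024, 5, 6)),
    ("user_2", 3.0, datetime(2024, 5, 7)),
]
print(rolling_sum(events, as_of=datetime(2024, 5, 7), window=window))  # {'user_1': 15.0, 'user_2': 3.0}
print(rolling_sum(events, as_of=datetime(2024, 5, 9), window=window))  # {'user_1': 5.0, 'user_2': 3.0}
```

Evaluating the same events at different `as_of` times shows why historical correctness matters for training: each training example should use the value as of its own timestamp, not the latest value.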
Backfill from batch history
If the stream is the online source of truth but older events exist only in batch storage (for example, events that predate the topic's retention period), add `.backfill_from(...)` so the windowed aggregate also covers that history:
```python
historical_transactions = spark.register_delta_table(
    name="historical_transactions",
    database="ml_catalog.featureform",
    table="historical_transactions",
)

# Use this definition in place of the stream-only User definition above.
@ff.entity
class User:
    rolling_amount = (
        ff.Feature()
        .from_stream(
            parsed_transactions,
            entity="user_id",
            values="amount",
            timestamp="event_time",
        )
        # Batch history uses the same entity, value, and timestamp columns as the stream.
        .backfill_from(
            source=historical_transactions,
            entity="user_id",
            values="amount",
            timestamp="event_time",
        )
        .aggregate(
            function=ff.AggregateFunction.SUM,
            windows=[window],
        )
    )
```
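When a backfill and a stream overlap in time, the same event can appear in both sources. One common way to avoid double counting is a cutoff: take batch rows strictly before it and stream rows at or after it. This page does not say how Featureform reconciles the two sources, so treat the sketch as an illustration of the idea only; `merge_backfill` and `cutoff` are hypothetical.

```python
from __future__ import annotations

from datetime import datetime
from typing import Iterable, List, Tuple

# (user_id, amount, event_time)
Event = Tuple[str, float, datetime]


def merge_backfill(batch: Iterable[Event], stream: Iterable[Event], cutoff: datetime) -> List[Event]:
    """Hypothetical cutoff-based merge of batch history and stream events.

    Batch rows are kept only if event_time < cutoff and stream rows only if
    event_time >= cutoff, so an event present in both sources is counted once.
    """
    history = [event for event in batch if event[2] < cutoff]
    live = [event for event in stream if event[2] >= cutoff]
    return sorted(history + live, key=lambda event: event[2])


cutoff = datetime(2024, 5, 5)
batch = [("user_1", 10.0, datetime(2024, 5, 1)), ("user_1", 4.0, datetime(2024, 5, 6))]
stream = [("user_1", 4.0, datetime(2024, 5, 6)), ("user_1", 2.0, datetime(2024, 5, 7))]

merged = merge_backfill(batch, stream, cutoff)
print(sum(amount for _, amount, _ in merged))  # 16.0
```

The event on May 6 appears in both inputs but is counted once, because only the stream is trusted at or after the cutoff.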
Register metadata and materialize
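```python
# Register all of the definitions above with Featureform.
client.apply()

# Materialize the 7-day window of rolling_amount into the Redis online store.
client.materialize_feature_view(
    view_name="user_streaming_features",
    inference_store=redis,
    features=[User.rolling_amount[window]],
)
```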
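Once a stream-backed view is materialized, its online values are meant to stay current as new events arrive. Conceptually, a windowed SUM can be maintained incrementally: add each new amount, subtract amounts that slide out of the window, and write the total to the online store. The sketch below models this with a dictionary standing in for Redis. The `OnlineRollingSum` class and the `view:entity` key format are hypothetical and do not describe Featureform's internal implementation. It assumes each user's events arrive in event-time order and only evicts old events when a new event arrives for that user.

```python
from __future__ import annotations

from collections import deque
from datetime import datetime, timedelta
from typing import Optional


class OnlineRollingSum:
    """Hypothetical incremental windowed SUM per entity, written to a dict standing in for Redis."""

    def __init__(self, view_name: str, window: timedelta):
        self.view_name = view_name
        self.window = window
        self.events: dict[str, deque] = {}  # entity -> deque of (event_time, amount)
        self.totals: dict[str, float] = {}  # entity -> sum of amounts in its deque
        self.store: dict[str, float] = {}   # stand-in for the online store

    def _key(self, entity: str) -> str:
        return f"{self.view_name}:{entity}"  # illustrative key format only

    def add(self, entity: str, amount: float, event_time: datetime) -> None:
        """Apply one event, assuming per-entity events arrive in event-time order."""
        queue = self.events.setdefault(entity, deque())
        queue.append((event_time, amount))
        self.totals[entity] = self.totals.get(entity, 0.0) + amount
        # Evict events outside (event_time - window, event_time].
        while queue and queue[0][0] <= event_time - self.window:
            _, old_amount = queue.popleft()
            self.totals[entity] -= old_amount
        self.store[self._key(entity)] = self.totals[entity]

    def get(self, entity: str) -> Optional[float]:
        return self.store.get(self._key(entity))


view = OnlineRollingSum("user_streaming_features", timedelta(days=7))
view.add("user_1", 10.0, datetime(2024, 5, 1))
view.add("user_1", 5.0, datetime(2024, 5, 6))
print(view.get("user_1"))  # 15.0
view.add("user_1", 1.0, datetime(2024, 5, 9))
print(view.get("user_1"))  # 6.0
print(view.get("user_2"))  # None
```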
Serve streaming feature values
```python
result = client.serve_feature_view(
    view="user_streaming_features",
    entity_ids=["user_1", "user_2"],
)
```
Guidance
- Parse and cast stream payloads in the transformation layer.
- Reuse one transformed stream for multiple features.
- Keep backfill semantics aligned with the stream definition so training and serving stay consistent.
- Use only documented parameters and status names in production code and examples.
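The alignment guideline can be checked mechanically before `client.apply()`: the entity, value, and timestamp columns that `from_stream(...)` and `backfill_from(...)` reference should exist in both sources with the same types. A minimal sketch, assuming you have obtained each schema as a name-to-type mapping (for example from Spark's `DESCRIBE TABLE`); `schema_mismatches` and the decimal `amount` column in the example backfill schema are hypothetical.

```python
from __future__ import annotations

# Hypothetical consistency check, not part of the Featureform SDK.
# Schemas map column name -> Spark SQL type name.


def schema_mismatches(
    stream_schema: dict[str, str],
    backfill_schema: dict[str, str],
    columns: list[str],
) -> list[str]:
    """List feature columns that are missing or typed differently across sources."""
    problems = []
    for column in columns:
        stream_type = stream_schema.get(column)
        backfill_type = backfill_schema.get(column)
        if stream_type is None and backfill_type is None:
            problems.append(f"{column}: missing from both sources")
        elif stream_type is None:
            problems.append(f"{column}: missing from stream")
        elif backfill_type is None:
            problems.append(f"{column}: missing from backfill")
        elif stream_type.lower() != backfill_type.lower():
            problems.append(f"{column}: stream is {stream_type}, backfill is {backfill_type}")
    return problems


# Output columns of parsed_transactions, per its SELECT list.
stream_schema = {"user_id": "string", "amount": "double", "event_time": "timestamp"}
# Illustrative backfill schema in which amount was stored as a decimal.
backfill_schema = {"user_id": "string", "amount": "decimal(10,2)", "event_time": "timestamp"}

for problem in schema_mismatches(stream_schema, backfill_schema, ["user_id", "amount", "event_time"]):
    print(problem)  # amount: stream is double, backfill is decimal(10,2)
```

Matching types do not prove matching semantics (units, currency, time zone), so review those separately.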