This article addresses the common problem of schema proliferation in event-driven architectures utilizing Apache Kafka and Flink, where a one-to-one event-to-schema mapping leads to significant maintenance overhead and query complexity. It proposes a solution using consolidated schemas with discriminator fields and nullable attribute blocks, enhancing schema evolution and simplifying consumer querying. The approach emphasizes consumer-centric schema design over producer-centric, making event processing pipelines more manageable at scale.
Read original on InfoQ Architecture

In event-driven systems, particularly those built with Apache Kafka and Flink, a common anti-pattern emerges as the system scales: schema proliferation. This occurs when each specific event variant (e.g., `RideAcceptedStandardEvent`, `RideAcceptedSharedEvent`) is assigned its own unique schema. While seemingly straightforward initially, this one-to-one mapping leads to a combinatorial explosion of schemas as event types and subtypes grow.
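The primary symptoms of schema proliferation include significant maintenance overhead for the teams that own the schemas and growing query complexity for consumers.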
The proposed solution is a consolidated schema design pattern. Instead of one schema per variant, a single, unified schema is used for a logical domain (e.g., `DriverRideActivityRecord` for all ride activity events). This consolidated schema incorporates several key elements, notably discriminator fields that identify the event variant and nullable attribute blocks that hold variant-specific data.
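As a rough illustration of the pattern, the sketch below models a consolidated record in plain Java. Only `DriverRideActivityRecord`, `EventType.ACCEPTED`, and the standard/shared ride variants come from the text. The other enum values, the `rideId` field, and the `SharedRideAttributes` block with its `coRiderCount` are hypothetical names chosen for the example. A real pipeline would more likely define this in a schema language such as Avro or Protobuf.

```java
import java.util.Optional;

public class ConsolidatedSchemaSketch {

    // Discriminator enums. ACCEPTED, STANDARD and SHARED come from the event
    // names in the text; REQUESTED and COMPLETED are assumed for illustration.
    enum EventType { REQUESTED, ACCEPTED, COMPLETED }
    enum RideType { STANDARD, SHARED }

    // Hypothetical attribute block that only applies to shared rides.
    static final class SharedRideAttributes {
        final int coRiderCount;
        SharedRideAttributes(int coRiderCount) { this.coRiderCount = coRiderCount; }
    }

    // One record type for every ride activity event in the domain.
    static final class DriverRideActivityRecord {
        final String rideId;                    // common attribute (assumed)
        final EventType eventType;              // discriminator
        final RideType rideType;                // discriminator
        final SharedRideAttributes sharedRide;  // nullable block, null unless rideType == SHARED

        DriverRideActivityRecord(String rideId, EventType eventType, RideType rideType,
                                 SharedRideAttributes sharedRide) {
            this.rideId = rideId;
            this.eventType = eventType;
            this.rideType = rideType;
            this.sharedRide = sharedRide;
        }

        // Wraps the nullable block so callers handle its absence explicitly.
        Optional<SharedRideAttributes> sharedRide() {
            return Optional.ofNullable(sharedRide);
        }
    }

    public static void main(String[] args) {
        DriverRideActivityRecord standard =
                new DriverRideActivityRecord("ride-1", EventType.ACCEPTED, RideType.STANDARD, null);
        DriverRideActivityRecord shared =
                new DriverRideActivityRecord("ride-2", EventType.ACCEPTED, RideType.SHARED,
                        new SharedRideAttributes(2));

        System.out.println(standard.sharedRide().isPresent());                       // false
        System.out.println(shared.sharedRide().map(a -> a.coRiderCount).orElse(0));  // 2
    }
}
```

Adding a new ride variant under this layout means adding an enum value and, if needed, one more nullable block, rather than introducing a new schema for every event type that the variant can take part in.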
Benefits of Enum Discriminators
Using enums for discriminator fields (instead of free-form strings) provides compile-time safety, enables efficient predicate filtering in downstream query engines, and explicitly documents valid values. This prevents silent data quality issues caused by inconsistent string values from producers.
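The contrast with free-form strings can be sketched as follows. The `parseRideType` and `describe` helpers, the description strings, and the misspelled input are all hypothetical. The point is only that an undeclared value is rejected at the boundary instead of flowing downstream unnoticed.

```java
import java.util.Locale;

public class EnumDiscriminatorSketch {

    enum RideType { STANDARD, SHARED }

    // Converts a producer-supplied string into the enum at the pipeline boundary.
    // Enum.valueOf throws IllegalArgumentException for any undeclared name.
    static RideType parseRideType(String raw) {
        return RideType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    // A switch over the enum makes the set of handled values explicit.
    static String describe(RideType rideType) {
        switch (rideType) {
            case STANDARD: return "single-party ride";
            case SHARED:   return "multi-party ride";
            default:       throw new IllegalStateException("unhandled: " + rideType);
        }
    }

    public static void main(String[] args) {
        // Free-form strings compare unequal without any error being raised.
        System.out.println("Shared".equals("SHARED")); // false

        System.out.println(describe(parseRideType(" shared "))); // multi-party ride

        try {
            parseRideType("shraed"); // typo from a hypothetical producer
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```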
In a Kafka-Flink pipeline, schema consolidation is implemented in the Flink processing layer. This typically involves a layered adapter design:
public class SharedRideAcceptedAdapter implements RecordAdapter<DriverRideAcceptedSharedEvent, DriverRideActivityRecord> {
    @Override
    public DriverRideActivityRecord adapt(String orgId, DriverRideAcceptedSharedEvent event) {
        DriverRideActivityRecord record = new DriverRideActivityRecord();
        // Discriminators identify which variant this consolidated record represents.
        record.setEventType(EventType.ACCEPTED);
        record.setRideType(RideType.SHARED);
        // ... populate shared and specific attributes ...
        return record;
    }
}
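To show how adapters like the one above might fit together, here is a minimal, self-contained sketch in plain Java with no Flink dependency. The `RecordAdapter` interface shape is inferred from the snippet rather than taken from a published API. `DriverRideAcceptedStandardEvent`, the `rideId` fields, and the `sharedRideIds` helper are hypothetical. In an actual job the `adapt` calls would presumably sit inside Flink map operators, one per source topic, feeding a single consolidated stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class AdapterPipelineSketch {

    enum EventType { ACCEPTED }
    enum RideType { STANDARD, SHARED }

    // Stand-ins for variant-specific source events; the rideId field is assumed.
    static final class DriverRideAcceptedStandardEvent {
        final String rideId;
        DriverRideAcceptedStandardEvent(String rideId) { this.rideId = rideId; }
    }

    static final class DriverRideAcceptedSharedEvent {
        final String rideId;
        DriverRideAcceptedSharedEvent(String rideId) { this.rideId = rideId; }
    }

    // Trimmed-down consolidated record carrying only what this sketch needs.
    static final class DriverRideActivityRecord {
        final String orgId;
        final String rideId;
        final EventType eventType;
        final RideType rideType;

        DriverRideActivityRecord(String orgId, String rideId, EventType eventType, RideType rideType) {
            this.orgId = orgId;
            this.rideId = rideId;
            this.eventType = eventType;
            this.rideType = rideType;
        }
    }

    // Interface shape inferred from the adapter shown in the text.
    interface RecordAdapter<E, R> {
        R adapt(String orgId, E event);
    }

    // One adapter per source variant; each stamps its own discriminator values.
    static final RecordAdapter<DriverRideAcceptedStandardEvent, DriverRideActivityRecord> STANDARD_ADAPTER =
            (orgId, e) -> new DriverRideActivityRecord(orgId, e.rideId, EventType.ACCEPTED, RideType.STANDARD);

    static final RecordAdapter<DriverRideAcceptedSharedEvent, DriverRideActivityRecord> SHARED_ADAPTER =
            (orgId, e) -> new DriverRideActivityRecord(orgId, e.rideId, EventType.ACCEPTED, RideType.SHARED);

    // Consumer-side query: one predicate on the discriminator instead of a per-schema read.
    static List<String> sharedRideIds(List<DriverRideActivityRecord> records) {
        return records.stream()
                .filter(r -> r.rideType == RideType.SHARED)
                .map(r -> r.rideId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<DriverRideActivityRecord> consolidated = Arrays.asList(
                STANDARD_ADAPTER.adapt("org-1", new DriverRideAcceptedStandardEvent("ride-1")),
                SHARED_ADAPTER.adapt("org-1", new DriverRideAcceptedSharedEvent("ride-2")));

        System.out.println(sharedRideIds(consolidated)); // [ride-2]
    }
}
```

Keeping each adapter small and variant-specific confines producer-side differences to one place, so consumers only ever see the consolidated record and filter on its discriminators.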