Error Management
KafkaStreams
execution ?During the execution of KafkaStreams
instance different types of errors may happens.
7.1 The DeadLetterTopicExceptionHandler
class
Azkarra provides the DeadLetterTopicExceptionHandler
, a DeserializationExceptionHandler
allowing to send corrupted records into a dedicated topic.
By default, the DeadLetterTopicExceptionHandler
will send corrupted records to an sink topic named based on the source topic - i.e: rejected-<source_topic_name>
.
In addition, the DeadLetterTopicExceptionHandler
will enrich corrupted records with Kafka headers to help investigate the cause of the exception.
Header | Type | Description |
---|---|---|
errors.exception.stacktrace |
string | The exception stacktrace. |
errors.exception.message |
string | The exception message. |
errors.timestamp |
string | The current epoch time in millisecond when exception ocurred. |
errors.exception.class.name |
string | The exception class name. |
errors.record.topic |
string | The source topic of the corrupted message. |
errors.record.partition |
string | The source partition of the corrupted message. |
errors.record.offset |
string | The source offset of the corrupted message. |
7.1.1 Configuring DeadLetterTopicExceptionHandler
Property | Type | Description |
---|---|---|
exception.handler.dead.letter.topic |
String | The name of the dead letter topic to be used to write rejected records. |
exception.handler.dead.letter.fatal.errors |
List | List of exception classes on which the handler must fail. |
7.1.2 Configuring KafkaProducer
By default, the DeadLetterTopicExceptionHandler
uses the KafkaProducer
attached to the internal StreamThread
.
A dedicated KafkaProducer
can be created by configuring handler producer using the property prefix exception.handler.dead.letter.
.
7.1.3 Adding custom headers
You can configure additional header to be added to corrupted message using the prefix exception.handler.dead.letter.headers.
.
7.2 The SafeDeserializer
class
Azkarra provides a SafeDeserializer
that can be used to wrap an existing Deserializer
and catch any exception thrown during deserialization for returning a record called a sentinel-object that you filter later in the Topology
(e.g null, “N/A”, -1, etc).
7.2.1 Creating a SafeDeserializer
SafeDeserializer deserializer = new SafeDeserializer<>(
new GenericAvroSerde().deserializer(), // the delegating deserializer
(GenericRecord)null // the sentinel-object to return when an exception is catch
);
7.2.2 Configuring a SafeDeserializer
The sentinel-object to return can also be configured.
SafeDeserializer<Double> deserializer = new SafeDeserializer<>(
Serdes.Double().deserializer(), // the delegating deserializer
Double.class // the value type
);
Map<String, Object> configs = new HashMap<>();
configs.put(SafeDeserializerConfig.SAFE_DESERIALIZER_DEFAULT_VALUE_CONFIG, 0.0);
deserializer.configure(configs, false);
7.2.3 The SafeSerde
class
The SafeSerde
is an utility class allowing you to wrap existing Serde
or Deserializer
.
Behing the scene, SafeSerde
uses the SafeDeserializer
for wrapping existing Deserializer
.
Serde<String> stringSerde = SafeSerdes.Double();
or
SafeSerdes.serdeFrom(Serdes.String(), 0.0);