使用kafka的streams api处理错误消息

j2datikz  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(368)

我有一个基本的流处理流程

master topic -> my processing in a mapper/filter -> output topics

我想知道处理“坏消息”的最佳方法。这可能是像我无法正确反序列化的消息之类的事情,或者处理/过滤逻辑以某种意外的方式失败(我没有外部依赖关系,因此不应该有这种类型的暂时错误)。
我正在考虑将我所有的处理/过滤代码 Package 在一个try-catch中,如果引发了异常,则路由到一个“错误主题”。然后我可以研究消息并修改它,或者根据需要修改代码,然后将其重播给master。如果我让任何异常传播,流似乎被阻塞,没有更多的消息被拾取。
这种方法被认为是最佳做法吗?
有没有一个方便的Kafka方法来处理这个问题?我不认为有dlq的概念。。。
有什么方法可以阻止Kafka干扰“坏消息”?
有哪些其他错误处理方法?
为了完整起见,这里是我的代码(伪ish):

class Document {
    // Fields
}

class AnalysedDocument {

    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) {...}

    // Analysis failed
    AnalysedDocument(Document document, Exception exception) {...}

    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) {...}
}

KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() {
         @Override
         public AnalysedDocument apply(String rawValue) {
             Document document;
             try {
                 // Deserialise
                 document = ...
             } catch (Exception e) {
                 return new AnalysedDocument(rawValue, exception);
             }
             try {
                 // Perform analysis
                 Analysis analysis = ...
                 return new AnalysedDocument(document, analysis);
             } catch (Exception e) {
                 return new AnalysedDocument(document, exception);
             }
         }
    });

// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

非常感谢您的帮助。

relj7zay

relj7zay1#

对于处理逻辑,您可以采用以下方法:

someKStream 

    .mapValues(inputValue -> {
        // for each execution the below "return" could provide a different class than the previous run!
        // e.g. "return isFailedProcessing ? failValue : successValue;" 
        // where failValue and successValue have no related classes
        return someObject; // someObject class vary at runtime depending on your business
    }) // here you'll have KStream<whateverKeyClass, Object> -> yes, Object for the value!

    // you could have a different logic for choosing  
    // the target topic, below is just an example
    .to((k, v, recordContext) -> v instanceof failValueClass ?
            "dead-letter-topic" : "success-topic",
            // you could completelly ignore the "Produced" part 
            // and rely on spring-boot properties only, e.g. 
            // spring.kafka.streams.properties.default.key.serde=yourKeySerde
            // spring.kafka.streams.properties.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
            Produced.with(yourKeySerde, 
                            // JsonSerde could be an instance configured as you need 
                            // (with type mappings or headers setting disabled, etc)
                            new JsonSerde<>()));

你的类,虽然不同,并进入不同的主题,将按预期序列化。
不使用时 to() ,但如果希望继续进行其他处理,则可以使用 branch() 基于Kafka价值类的逻辑分裂;你的诡计 branch() 就是回来 KStream<keyClass, ?>[] 为了进一步允许将单个数组项强制转换到适当的类中。

rxztt3cl

rxztt3cl2#

更新日期:2018年3月23日:Kafka1.0通过kip-161提供了比我下面描述的更好、更容易处理的错误消息(“毒丸”)。请参见kafka 1.0文档中的default.deserialization.exception.handler。
这可能是一些我无法正确反序列化的消息[…]
好的,我的答案集中在(反)序列化问题上,因为这可能是大多数用户要处理的最棘手的场景。
[…]或者处理/过滤逻辑以某种意外的方式失败(我没有外部依赖关系,因此不应该有这种类型的暂时错误)。
同样的思想(对于反序列化)也可以应用于处理逻辑中的失败。在这里,大多数人倾向于下面的选项2(减去反序列化部分),但是ymmv。
我正在考虑将我所有的处理/过滤代码 Package 在一个try-catch中,如果引发了异常,则路由到一个“错误主题”。然后我可以研究消息并修改它,或者根据需要修改代码,然后将其重播给master。如果我让任何异常传播,流似乎被阻塞,没有更多的消息被拾取。
这种方法被认为是最佳做法吗?
是的,目前是这样。基本上,两种最常见的模式是(1)跳过损坏的消息或(2)将损坏的记录发送到隔离主题(也称为死信队列)。
有没有一个方便的Kafka方法来处理这个问题?我不认为有dlq的概念。。。
是的,有一种方法可以处理这个问题,包括使用死信队列。然而,它(至少是imho)还没有那么方便。如果您对api应如何允许您处理此问题有任何反馈(例如,通过新的或更新的方法、配置设置(“如果序列化/反序列化失败,请将有问题的记录发送到此隔离主题”)——请告知我们。:-)
有什么方法可以阻止Kafka干扰“坏消息”?
有哪些其他错误处理方法?
请看下面的例子。
fwiw,kafka社区也在讨论添加一个新的cli工具,允许您跳过损坏的消息。但是,作为kafka streams api的用户,我认为理想情况下,您希望直接在代码中处理此类场景,并且仅作为最后的手段而回退到cli实用程序。
以下是kafka streams dsl处理损坏的记录/消息(也称为“毒丸”)的一些模式。这是从http://docs.confluent.io/current/streams/faq.html#handling-损坏的记录和反序列化错误
选项1:跳过损坏的记录 flatMap 这可以说是大多数用户想要做的。
我们使用 flatMap 因为它允许每个输入记录输出零个、一个或多个输出记录。对于损坏的记录,我们什么也不输出(零记录),因此忽略/跳过损坏的记录。
与这里列出的其他方法相比,这种方法的好处是:我们只需要手动反序列化一次记录!
这种方法的缺点: flatMap “标记”输入流以进行潜在的数据重新分区,即如果执行基于键的操作(如分组)( groupBy / groupByKey )或之后加入,您的数据将在幕后重新分区。由于这可能是一个昂贵的步骤,我们不希望发生不必要的。如果您知道记录键总是有效的,或者您不需要对这些键进行操作(因此将它们作为“原始”键保留在 byte[] 格式),您可以从 flatMapflatMapValues ,即使稍后加入/分组/聚合流,也不会导致数据重新分区。
代码示例:

Serde<byte[]> bytesSerde = Serdes.ByteArray();
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();

// Input topic, which might contain corrupted messages
KStream<byte[], byte[]> input = builder.stream(bytesSerde, bytesSerde, inputTopic);

// Note how the returned stream is of type KStream<String, Long>,
// rather than KStream<byte[], byte[]>.
KStream<String, Long> doubled = input.flatMap(
    (k, v) -> {
      try {
        // Attempt deserialization
        String key = stringSerde.deserializer().deserialize(inputTopic, k);
        long value = longSerde.deserializer().deserialize(inputTopic, v);

        // Ok, the record is valid (not corrupted).  Let's take the
        // opportunity to also process the record in some way so that
        // we haven't paid the deserialization cost just for "poison pill"
        // checking.
        return Collections.singletonList(KeyValue.pair(key, 2 * value));
      }
      catch (SerializationException e) {
        // log + ignore/skip the corrupted message
        System.err.println("Could not deserialize record: " + e.getMessage());
      }
      return Collections.emptyList();
    }
);

选项2:死信队列 branch 与选项1(忽略损坏的记录)相比,选项2通过将损坏的消息从“主”输入流中过滤出来并将其写入隔离主题(想想:死信队列)来保留损坏的消息。缺点是,对于有效记录,我们必须支付两次手动反序列化成本。

KStream<byte[], byte[]> input = ...;

KStream<byte[], byte[]>[] partitioned = input.branch(
    (k, v) -> {
      boolean isValidRecord = false;
      try {
        stringSerde.deserializer().deserialize(inputTopic, k);
        longSerde.deserializer().deserialize(inputTopic, v);
        isValidRecord = true;
      }
      catch (SerializationException ignored) {}
      return isValidRecord;
    },
    (k, v) -> true
);

// partitioned[0] is the KStream<byte[], byte[]> that contains
// only valid records.  partitioned[1] contains only corrupted
// records and thus acts as a "dead letter queue".
KStream<String, Long> doubled = partitioned[0].map(
    (key, value) -> KeyValue.pair(
        // Must deserialize a second time unfortunately.
        stringSerde.deserializer().deserialize(inputTopic, key),
        2 * longSerde.deserializer().deserialize(inputTopic, value)));

// Don't forget to actually write the dead letter queue back to Kafka!
partitioned[1].to(Serdes.ByteArray(), Serdes.ByteArray(), "quarantine-topic");

选项3:跳过损坏的记录 filter 我提这个只是为了完整。此选项看起来像选项1和2的组合,但比其中任何一个都差。与选项1相比,您必须为有效记录支付两次手动反序列化成本(糟糕!)。与选项2相比,您将无法在死信队列中保留损坏的记录。

KStream<byte[], byte[]> validRecordsOnly = input.filter(
    (k, v) -> {
      boolean isValidRecord = false;
      try {
        bytesSerde.deserializer().deserialize(inputTopic, k);
        longSerde.deserializer().deserialize(inputTopic, v);
        isValidRecord = true;
      }
      catch (SerializationException e) {
        // log + ignore/skip the corrupted message
        System.err.println("Could not deserialize record: " + e.getMessage());
      }
      return isValidRecord;
    }
);
KStream<String, Long> doubled = validRecordsOnly.map(
    (key, value) -> KeyValue.pair(
        // Must deserialize a second time unfortunately.
        stringSerde.deserializer().deserialize(inputTopic, key),
        2 * longSerde.deserializer().deserialize(inputTopic, value)));

非常感谢您的帮助。
我希望我能帮上忙。如果是的话,我将非常感谢您对我们如何改进kafka streams api的反馈,以便以比现在更好/更方便的方式处理失败/异常。:-)

wb1gzix0

wb1gzix03#

目前,kafka streams只提供有限的错误处理功能。简化这一点的工作正在进行中。目前来看,你的整体方法似乎是一个不错的选择。
关于处理反序列化错误的一个注解:手动处理这些错误,需要“手动”进行反序列化。这意味着您需要配置 ByteArraySerde 为您输入/输出流应用程序主题的键和值,并添加 map() 进行反序列化(即, KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType> --或者反过来,如果您还想捕获序列化异常)。否则,你不能 try-catch 反序列化异常。
使用当前的方法,“只”验证给定的字符串是否表示有效的文档,但可能是消息本身已损坏,无法转换为文档 String 首先在源操作符中。因此,在代码中实际上并不包含反序列化异常。但是,如果您确信反序列化异常永远不会发生,那么您的方法也就足够了。
更新
这个问题通过kip-161解决,并将包含在下一个版本1.0.0中。它允许您通过参数注册回调 default.deserialization.exception.handler . 每次反序列化过程中发生异常时都会调用处理程序,并允许您返回 DeserializationResponse ( CONTINUE ->把唱片丢下去,或者 FAIL 这是默认设置)。
更新2
使用kip-210(将是kafka1.1的一部分),也可以通过注册 ProductionExceptionHandler 通过配置 default.production.exception.handler 那会回来的 CONTINUE .

ibps3vxo

ibps3vxo4#

我不相信这些例子在与avro一起工作时能起到任何作用。
当模式无法解析时(例如,存在损坏主题的坏消息/非avro消息),没有 key 或者 value 首先要反序列化,因为当dsl .branch() 代码被调用时,异常已经被抛出(或处理)。
有人能证实我是否真的这样做了吗?你在这里提到的非常流畅的方法在使用avro时是不可能的?
kip-161确实解释了如何使用处理程序,但是,将其视为拓扑的一部分要流畅得多。

相关问题