拓扑中发生异常时跳过记录

vom3gejh  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(358)

我们正在编写一个kafka流拓扑,它聚合数据并实时显示它们。我们希望使显示尽可能健壮—理想情况下记录记录并继续处理任何异常。
根据文件,我们和
处理kafka流中的异常
使用kafka的streams api处理错误消息
https://groups.google.com/g/confluent-platform/c/p75clej9yu0
kafka流非常支持处理在生产者中或反序列化期间发生的异常。提供的 LogAndContinueExceptionHandler 给我们想要的行为。然而,我们的主要问题是在处理过程中发生的异常(例如 .mapValues() 或者 .leftJoin() 我们的想法基本上是验证先决条件
在反序列化过程中,如果未实现,则引发反序列化异常(并记录并继续)。
如果无法执行计算,as将检查处理函数以返回默认值( / by zero error 等)
但是,如果数据中有不可预见的东西,异常仍然可能出现,拓扑将关闭。
Kafka流提供了一个 UncaughtExceptionHandler 但它是在线程已经死亡之后调用的,因此不能用于防止拓扑关闭。
是否有某种方法可以编写跳过记录的uncaughtexceptionhandler?或者是一种跳过当前记录的机制 try-catch 处理函数内部的块?

ecbunoof

ecbunoof1#

我认为最好的解决方案是以从不抛出任何异常的方式编写处理操作(例如:mapper、filter等)。为此,可以使用一个 Package 器对象,它可以是成功的,也可以是错误的(例如 Either 键入scala)。之后,你可以使用 branch() 方法获取两个流:一个用于成功记录,另一个用于错误。
下面的代码显示了基本思想:

public static void main(String[] args) {
        var builder = new StreamsBuilder();
        KStream<Object, Result<Object>> stream = builder.stream("my-topic")
            .map((k, v) -> {
                try {
                    // unsafe operation, i.e that may throw an exception
                    return KeyValue.pair(k, new Success<>(v));
                } catch (Exception e) {
                    return KeyValue.pair(k, new Error<>(e));
                }
            });
        KStream<Object, Result<Object>>[] branch = stream.branch((k, v) -> !v.hasError(), (k, v) -> v.hasError());

        // Handle the success steam
        KStream<Object, Result<Object>> successStream = branch[0];

        // Handle the error steam, e.g:  log errors, write errors to a Dead Letter Queue
        KStream<Object, Result<Object>> errorStream = branch[1];

    }

    public interface Result<T> {
        T get() throws Exception;
        Exception exception();
        boolean hasError();
    }

    public static class Success<T> implements Result<T> {

        private final T value;

        public Success(T value) {
            this.value = value;
        }

        @Override
        public T get() throws Exception {
            return value;
        }

        @Override
        public Exception exception() {
            return null;
        }

        @Override
        public boolean hasError() {
            return false;
        }
    }

    public static class Error<T> implements Result<T> {

        private final Exception error;

        public Error(Exception error) {  this.error = error; }

        @Override
        public T get() throws Exception{
            throw error;
        }

        @Override
        public Exception exception() {
            return error;
        }

        @Override
        public boolean hasError() {
            return true;
        }
    }

此外,对于您提到的反序列化异常,azkarra streams项目提供了一些方便的java类(例如safeserdes、deadletteropicexceptionhandler):github

相关问题