如何为kafka 2.2实现flinkkafkaproducer序列化程序

xxslljrj  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(432)

我一直在更新一个flink处理器(flink版本1.9),它从kafka读取数据,然后写入kafka。我们已经编写了这个处理器来运行kafka 0.10.2集群,现在我们已经部署了一个运行版本2.2的新kafka集群。因此,我着手更新处理器以使用最新的flinkkafcummer和flinkkafcaproducer(正如flink文档所建议的那样)。但是我和Kafka制作人遇到了一些问题。我无法让它使用不推荐的构造函数序列化数据(这并不奇怪),而且我也无法在网上找到任何关于如何实现序列化程序的实现或示例(所有示例都使用较旧的kafka连接器)
当前的实现(对于kafka 0.10.2)如下

FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                (FlinkKafkaPartitioner) null
        );

在尝试实现以下flinkkafkaproducer时

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                null
        );

我得到以下错误:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
    at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)

我也不知道为什么。flinkkafkaproducer的构造函数也被弃用了,当我尝试实现非弃用的构造函数时,我不知道如何序列化数据。下面是它的外观:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                        return null;
                    }
                },
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );

但我不知道如何实现Kafka塞利zationschema,我在网上和flink文档中都没有找到这样的例子。
有没有人有过实现这一点的经验,或者关于为什么flinkproducer在这个步骤中得到nullpointerexception的一些技巧?

ryhaxcpt

ryhaxcpt1#

在flinkkafkaproducer.semantic.exactly\的情况下处理超时,您应该阅读一次https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-011及更新版本,尤其是本部分:
semantic.unce模式依赖于提交事务的能力,这些事务是在执行检查点之前、从所述检查点恢复之后启动的。如果flink应用程序崩溃和完成重启之间的时间大于kafka的事务超时时间,则会丢失数据(kafka将自动中止超过超时时间的事务)。考虑到这一点,请根据预期的停机时间适当配置事务超时。
默认情况下,kafka代理将transaction.max.timeout.ms设置为15分钟。此属性不允许为大于其值的生产者设置事务超时。flinkkafkaproducer011默认情况下将producer config中的transaction.timeout.ms属性设置为1小时,因此在使用semantic.u once模式之前,transaction.max.timeout.ms应该增加。

68bkxrlz

68bkxrlz2#

如果您只是向Kafka发送字符串:

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String>{

    private String topic;   

    public ProducerStringSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
    }

}

用于发送java对象:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

    public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo>{

        private String topic;   
        private ObjectMapper mapper;

        public ObjSerializationSchema(String topic) {
            super();
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) {
            byte[] b = null;
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
             try {
                b= mapper.writeValueAsBytes(obj);
            } catch (JsonProcessingException e) {
                // TODO 
            }
            return new ProducerRecord<byte[], byte[]>(topic, b);
        }

    }

在你的代码里

.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic), 
                        params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

相关问题