kafka连接自定义时间戳.extractor

bvjveswy  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(284)

在尝试从kafka读取到s3的消息时,向kafka connect类路径添加jar时遇到问题。
目标是基于时间戳在分区中编写消息,时间戳是kafka消息中的密钥的一部分。
为了使故事简短,我必须提供自定义的时间戳提取器。按照这里的文档创建了一个实现 TimestampExtractor 接口,并将jar位置添加到 plugin.path 财产。
问题是,当我启动connect时,找不到类。不知为什么jar不在类路径中

org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor

其他数据:
版本:confluent 4.0.0
连接:独立连接
启动命令: sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \ /home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \ /home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties 准备好任何帮助。

dtcbnfnu

dtcbnfnu1#

要使自定义时间戳提取器类可用于s3连接器,您需要以下内容:
添加带有自定义类的jar以及其他连接器的依赖项。例子:
保存在 ./share/java/kafka-connect-s3 如果您希望此功能仅在s3连接器中可用,或 ./share/java/kafka-connect-storage-common 使其可用于所有存储接收器连接器(当前为s3和hdfs连接器)。
确保自定义类实现 io.confluent.connect.storage.partitioner.TimestampExtractor 接口。
在设置 timestamp.extractor 属性,当然要确保它与您在jar中定义和打包的包匹配。例如: timestamp.extractor=me.connectors.MyTimestampExtractor 最后,您将遵循类似的过程来为连接器提供一个自定义分区器。

相关问题