在尝试从kafka读取到s3的消息时,向kafka connect类路径添加jar时遇到问题。
目标是基于时间戳在分区中编写消息,时间戳是kafka消息中的密钥的一部分。
为了使故事简短,我必须提供自定义的时间戳提取器。按照这里的文档创建了一个实现 TimestampExtractor
接口,并将jar位置添加到 plugin.path
财产。
问题是,当我启动connect时,找不到类。不知为什么jar不在类路径中
org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor
其他数据:
版本:confluent 4.0.0
连接:独立连接
启动命令: sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \ /home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \ /home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties
准备好任何帮助。
1条答案
按热度按时间dtcbnfnu1#
要使自定义时间戳提取器类可用于s3连接器,您需要以下内容:
添加带有自定义类的jar以及其他连接器的依赖项。例子:
保存在
./share/java/kafka-connect-s3
如果您希望此功能仅在s3连接器中可用,或./share/java/kafka-connect-storage-common
使其可用于所有存储接收器连接器(当前为s3和hdfs连接器)。确保自定义类实现
io.confluent.connect.storage.partitioner.TimestampExtractor
接口。在设置
timestamp.extractor
属性,当然要确保它与您在jar中定义和打包的包匹配。例如:timestamp.extractor=me.connectors.MyTimestampExtractor
最后,您将遵循类似的过程来为连接器提供一个自定义分区器。