我遇到了这样一种情况:所有事件数据都存储在一个s3存储桶中,我需要将其从S3提取到ec2上的Kafka主题。我使用CamelAWSS3Connector,遇到了连接器无法工作的问题。以下是我遇到的错误
[2023-01-06 10:11:21,048] ERROR Failed to create job for config/s3_connect.properties (org.apache.kafka.connect.cli.ConnectStandalone:107)
[2023-01-06 10:11:21,053] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:117)
java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: org/jctools/queues/MessagePassingQueue$Supplier
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:99)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:114)
Caused by: java.lang.NoClassDefFoundError: org/jctools/queues/MessagePassingQueue$Supplier
我原以为出版商会把msg推到主题上从s3到Kafka
以下是我的属性文件
name=CamelAwss3SourceConnector
connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
camel.source.maxPollDuration=10000
topics=mytopic
camel.component.aws-s3.access-key=XXXXXXXX
camel.component.aws-s3.region=ap-south-1
camel.source.path.bucketNameOrArn=poc-s3-kafkatopic
camel.source.endpoint.autocloseBody=true
camel.source.endpoint.deleteAfterRead=true
1条答案
按热度按时间sshcrbum1#
确保已将
plugin.path=/path/to/extracted-camel-connector
添加到connect-standalone.properties
文件中。如果这样做不起作用,您需要导出
CLASSPATH
环境变量,以便将jar文件包含在该路径中。