How to integrate Logstash 5.x with Kafka 0.10.1.1? Unable to output messages?

exdqitrt · posted 2021-06-07 in Kafka

For days I have been trying to integrate Logstash 5.x with Kafka 0.10.1.1. After setting up the environment and updating the settings, everything seemed fine: checking Kafka with the console shell scripts, I could both produce and consume messages. But when I start the Logstash agent to collect messages, it fails to start. Its full output is as follows:
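For context, a console check like the one described above is normally done with Kafka's bundled shell scripts, roughly as follows (the broker address and topic name are taken from the config further down and are assumptions, not from the original post):

    # produce a test message (the 0.10.x console producer still uses --broker-list)
    bin/kafka-console-producer.sh --broker-list test-broker1.sao.so:9093 --topic logstash-test

    # read it back with the new consumer API
    bin/kafka-console-consumer.sh --bootstrap-server test-broker1.sao.so:9093 --topic logstash-test --from-beginning

If both directions work here, the brokers themselves are healthy and the problem is on the Logstash side.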

[2017-02-04T14:16:51,575][INFO ][org.apache.kafka.clients.producer.ProducerConfig] ProducerConfig values: 
        acks = 1
        batch.size = 16384
        block.on.buffer.full = false
        bootstrap.servers = [xxxxx:9093, xxxxxxx:90902]
        buffer.memory = 33554432
        client.id = producer-1
        compression.type = snappy
        connections.max.idle.ms = 540000
        interceptor.classes = null
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.fetch.timeout.ms = 60000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.ms = 10
        request.timeout.ms = 30000
        retries = 5
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        timeout.ms = 30000
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[2017-02-04T14:16:51,820][INFO ][org.apache.kafka.clients.producer.KafkaProducer] Closing the Kafka producer with timeoutMillis = 0 ms.
[2017-02-04T14:16:51,830][ERROR][logstash.outputs.kafka   ] Unable to create Kafka producer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer}
[2017-02-04T14:16:51,859][ERROR][logstash.agent           ] Pipeline aborted due to error {:exception=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer, :backtrace=>["org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:338)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:188)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)", "RUBY.create_producer(/usr/share/logstash/plugin/logstash-output-kafka/lib/logstash/outputs/kafka.rb:242)", "RUBY.register(/usr/share/logstash/plugin/logstash-output-kafka/lib/logstash/outputs/kafka.rb:178)", "RUBY.register(/usr/share/logstash/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:8)", "RUBY.register(/usr/share/logstash/logstash-core/lib/logstash/output_delegator.rb:37)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:229)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:229)", "RUBY.run(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:183)", "RUBY.start_pipeline(/usr/share/logstash/logstash-core/lib/logstash/agent.rb:292)", "java.lang.Thread.run(java/lang/Thread.java:745)"]}
[2017-02-04T14:16:51,953][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-02-04T14:16:54,919][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}

Here is some related information:
Versions: Logstash 5.2, Kafka 0.10.1.1, logstash-output-kafka 6.1.3. Logstash config file:
input {
  file {
    path => "/data/tmp/*.log"
    start_position => "beginning"
    codec => "json"
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  kafka {
    bootstrap_servers => "test-broker1.sao.so:9093,test-broker2.sao.so:90902"
    topic_id => "logstash test"
    compression_type => "snappy"
    retries => 5
    message_key => "logstash test"
    codec => plain { format => "%{message}" }
  }
}
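Reading the config against the log, one detail stands out: the second broker port, 90902, is above the maximum TCP port of 65535. The Kafka client validates bootstrap.servers while constructing the producer, and an out-of-range port is rejected at construction time, which matches the "Failed to construct kafka producer" error above. Below is a minimal sketch of the output block with that port corrected, assuming the broker actually listens on the standard 9092; that port, and the hyphenated topic name (Kafka topic names may not contain spaces), are guesses rather than values from the original post:

    output {
      kafka {
        # 90902 exceeds the valid TCP port range; 9092 is an assumed correction
        bootstrap_servers => "test-broker1.sao.so:9093,test-broker2.sao.so:9092"
        # spaces are not legal in Kafka topic names; "logstash-test" is assumed
        topic_id => "logstash-test"
        compression_type => "snappy"
        retries => 5
        message_key => "logstash-test"
        codec => plain { format => "%{message}" }
      }
    }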
Is there any configuration error? What can I do now?

No answers yet!

