Flume连接拒绝Kafka经纪人

xdnvmnnf  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(426)

我正在尝试将消息从apacheflume发送到我的apachekafka示例。当通过localhost在本地完成这些操作时,我没有任何问题。当我在我的机器上从不同的vm尝试这个方法时,我在flume的调试日志中得到拒绝的连接。就演示而言,我只是通过telnet向我的flume示例发送消息。
172.16.26.1是vms看到的mac的ip
172.16.26.138是运行kafka/zookeeper的vm的ip
172.16.26.139是运行flume的vm的ip
Flume配置


# Name the components of this agent

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    #Describe/configure the source
    a1.sources.r1.type = netcat 
    a1.sources.r1.port = 44444
    a1.sources.r1.bind = 0.0.0.0 
    a1.sources.r1.host = 172.16.26.1 

    #Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = macdemo
    a1.sinks.k1.kafka.bootstrap.servers = 172.16.26.138:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type = snappy

    #Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    #Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

Kafka配置(关键部件,其他都是默认配置)

broker.id=0
    listeners=PLAINTEXT://localhost:9092
    zookeeper.connect=localhost:2181

这是代理启动时从flume打印出来的配置

compression.type = snappy
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [172.16.26.138:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = 
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLS
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    acks = 1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 1

有效的方案
flume/kafka/zookeeper都在mac->works本地运行
flume在vm中(在mac上),kafka/zookeeper从mac->works本地运行
虚拟机中的flume(在mac上),虚拟机中的kafka/zookeeper(在mac上)->连接被拒绝
这是我得到的输出错误

2018-02-26 14:23:47,009 (kafka-producer-network-thread | producer-1) [DEBUG - org.apache.kafka.common.network.Selector.poll(Selector.java:307)]            Connection with /172.16.26.138 disconnected
    java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:748)
    2018-02-26 14:23:47,010 (kafka-producer-network-thread | producer-1) [DEBUG - org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:454)] Node -1 disconnected.
    2018-02-26 14:23:47,011 (kafka-producer-network-thread | producer-1) [DEBUG - org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:608)] Give up sending metadata request since no node is available

我确信我忽略了一些非常简单的事情,但我已经花了好几天没有从谷歌博士那里得到任何运气。关于虚拟机,它们在centos 7上运行,我已经禁用了firewalld.service和selinux。

vi4fp9gy

vi4fp9gy1#

我发现问题出在听众身上=plaintext://localhost:9092设置。将localhost更改为vm的实际ip地址(kafka的ip)有效。

相关问题