连接问题

tcomlyy6  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(260)

我想在Kafka和萨姆扎之间创造一种融合。更准确地说,我试图创建一个samza任务,它从一个kafka主题中读取内容,然后简单地写入另一个主题。
kafka-na-samza位于同一局域网的不同服务器上。
作为消费者连接到kafka时没有问题,但是作为生产者尝试连接时发生错误。

public class testTask implements StreamTask
    {

    @SuppressWarnings("unchecked")
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        String message = (String) envelope.getMessage();
        collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "test"), message));
    }
}

配置文件:


# Job

job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=testTask

# Task

task.class=testTask 
task.inputs=kafka.testTask

# Metrics

metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory

# Serializers

serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory

# Systems

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=kafka_system_ip:2181
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.bootstrap.servers=kafka_system_ip:9092
systems.kafka.producer.producer.type=sync

# Normally, we'd set this much higher, but we want things to look snappy in the demo.

systems.kafka.producer.batch.num.messages=1
systems.kafka.streams.metrics.samza.msg.serde=metrics

# Declare that we want our job's checkpoints to be written to Kafka

task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka

# By default, a checkpoint is written every 60 seconds. You can change this if you like.

task.commit.ms=60000

# Job Coordinator

job.coordinator.system=kafka

# Normally, this would be 3, but we have only one broker.

job.coordinator.replication.factor=1

任务日志中包含以下信息:

2015-12-28 19:57:29 CoordinatorStreamSystemProducer [INFO] Starting coordinator stream producer.
2015-12-28 19:57:29 KafkaSystemProducer [INFO] Creating a new producer for system kafka.
2015-12-28 19:57:29 ProducerConfig [INFO] ProducerConfig values:
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        block.on.buffer.full = true
        retry.backoff.ms = 100
        buffer.memory = 33554432
        batch.size = 16384
        metrics.sample.window.ms = 30000
        metadata.max.age.ms = 300000
        receive.buffer.bytes = 32768
        timeout.ms = 30000
        max.in.flight.requests.per.connection = 1
        bootstrap.servers = [kafka_system_ip:9092]
        metric.reporters = []
        client.id = samza_producer-testTask-1-1451325449607-4
        compression.type = none
        retries = 2147483647
        max.request.size = 1048576
        send.buffer.bytes = 131072
        acks = 1
        reconnect.backoff.ms = 10
        linger.ms = 0
        metrics.num.samples = 2
        metadata.fetch.timeout.ms = 60000

2015-12-28 19:57:29 ProducerConfig [WARN] The configuration batch.num.messages = null was supplied but isn't a known config.
2015-12-28 19:57:29 ProducerConfig [WARN] The configuration producer.type = null was supplied but isn't a known config.
2015-12-28 19:57:29 Selector [WARN] Error in I/O with localhost/127.0.0.1
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
        at java.lang.Thread.run(Thread.java:745)

我真的不明白为什么task试图连接到localhost以充当生产者,以及这个配置来自何处。是不是我错过了Kafka那边的一些配置?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题