I am trying to integrate Kafka and Samza. More precisely, I am trying to create a Samza task that reads from one Kafka topic and simply writes the messages to another topic.
Kafka and Samza run on different servers within the same LAN.
Connecting to Kafka as a consumer works fine, but an error occurs when the task tries to connect as a producer.
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class testTask implements StreamTask
{
    @SuppressWarnings("unchecked")
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        String message = (String) envelope.getMessage();
        // Forward each incoming message to the "test" topic on the "kafka" system.
        collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "test"), message));
    }
}
The configuration file:
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=testTask
# Task
task.class=testTask
task.inputs=kafka.testTask
# Metrics
metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=kafka_system_ip:2181
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.bootstrap.servers=kafka_system_ip:9092
systems.kafka.producer.producer.type=sync
# Normally, we'd set this much higher, but we want things to look snappy in the demo.
systems.kafka.producer.batch.num.messages=1
systems.kafka.streams.metrics.samza.msg.serde=metrics
# Declare that we want our job's checkpoints to be written to Kafka
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# By default, a checkpoint is written every 60 seconds. You can change this if you like.
task.commit.ms=60000
# Job Coordinator
job.coordinator.system=kafka
# Normally, this would be 3, but we have only one broker.
job.coordinator.replication.factor=1
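A side note on the producer settings above: Samza passes every systems.kafka.producer.* key straight through to the Kafka producer client. Assuming this job runs the new (Java) producer, as the ProducerConfig dump in the log suggests, producer.type and batch.num.messages are old Scala-producer settings that it does not recognize, which is what the two WARN lines in the log are reporting. Roughly equivalent new-producer keys (an assumption, shown here only as a sketch) would look like:

```properties
# Assumed new-producer equivalents of the old keys; the new Java producer
# ignores producer.type and batch.num.messages entirely.
systems.kafka.producer.acks=1
systems.kafka.producer.batch.size=16384
# linger.ms=0 sends each record immediately, mimicking batch.num.messages=1
systems.kafka.producer.linger.ms=0
```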
The task log contains the following:
2015-12-28 19:57:29 CoordinatorStreamSystemProducer [INFO] Starting coordinator stream producer.
2015-12-28 19:57:29 KafkaSystemProducer [INFO] Creating a new producer for system kafka.
2015-12-28 19:57:29 ProducerConfig [INFO] ProducerConfig values:
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
block.on.buffer.full = true
retry.backoff.ms = 100
buffer.memory = 33554432
batch.size = 16384
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
receive.buffer.bytes = 32768
timeout.ms = 30000
max.in.flight.requests.per.connection = 1
bootstrap.servers = [kafka_system_ip:9092]
metric.reporters = []
client.id = samza_producer-testTask-1-1451325449607-4
compression.type = none
retries = 2147483647
max.request.size = 1048576
send.buffer.bytes = 131072
acks = 1
reconnect.backoff.ms = 10
linger.ms = 0
metrics.num.samples = 2
metadata.fetch.timeout.ms = 60000
2015-12-28 19:57:29 ProducerConfig [WARN] The configuration batch.num.messages = null was supplied but isn't a known config.
2015-12-28 19:57:29 ProducerConfig [WARN] The configuration producer.type = null was supplied but isn't a known config.
2015-12-28 19:57:29 Selector [WARN] Error in I/O with localhost/127.0.0.1
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
I really don't understand why the task tries to connect to localhost when acting as a producer, or where that configuration comes from. Am I missing some configuration on the Kafka side?
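For what it's worth, one thing worth checking on the Kafka side (an assumption about the cause, not something the logs above confirm): with Kafka 0.8.x, the producer contacts the bootstrap server only to fetch metadata, then reconnects to whatever host each broker advertises. If the broker's server.properties leaves advertised.host.name unset and the broker machine's hostname resolves to localhost, a remote producer is told to connect to localhost/127.0.0.1, which matches the stack trace. A sketch of the broker-side fix (kafka_system_ip is a placeholder for the broker's LAN address):

```properties
# server.properties on the Kafka broker
advertised.host.name=kafka_system_ip
advertised.port=9092
```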