我编写了kafka producer,并在windows机器上用eclipse进行了分区。我的kafka集群运行在ec2linux中。我能够从eclipse中执行kafaka生产者代码,但是我没有看到ec2框中的主题。
Produce code :
package com.panda.kafka.training;
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class PandaKafkaProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "ec2-xx-yy-zzz-212.compute-1.amazonaws.com:9092");
//props.put("producer.type", "sync");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "com.panda.kafka.training.PandaKafkaPartitioner");
props.put("request.required.acks", "1");
props.put("producer.type","async");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String,
String>(config);
for (long nEvents = 0; nEvents < events; nEvents++)
{ System.out.println("creating event "+nEvents);
long runtime = new Date().getTime();
String ip = "192.168.2."+ rnd.nextInt(255);
String msg = runtime + ",www.vulab.com," + ip; KeyedMessage<String, String> data = new KeyedMessage<String, String>("vulab123", ip, msg);
producer.send(data);
}
producer.close();
}
}
Server Properties file:
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=ec2-xx-yy-zzz-212.compute-1.amazonaws.com
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=0000
Producer properties file:
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=52.2.202.212:0000
# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
Output from Producer in ec2:
[ec2-user@ip-xxxx bin]$ sh kafka-console-producer.sh --broker-list xxxx:yyy.zzzz:9092 --topic vulab123
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
任何详细的解释都会大有帮助。
1条答案
按热度按时间pdsfdshx1#
请找出详细答案。我有一台安装了eclipse的windows机器,并且为kafka创建了maven项目。我想从WindowsEclipse向kafaka集群(ec2)发送一些消息。我有一台机器在做Kafka的事。
注:java生产者代码。您必须确保这里提到了完整的dns名称。
第一步:
第二步:
ec2安全组中允许的ssh和所有入站规则。
在安装eclipse的windows机器的/etc/hosts/文件中添加公共ip(ec2)。
第三步:
第四步:
执行的命令
我可以看到所有的信息在消费者。