Kafka producer messages are not available in the topic on EC2 Linux

iqxoj9l9  posted on 2021-06-08  in Kafka

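I wrote a Kafka producer with a custom partitioner in Eclipse on a Windows machine. My Kafka cluster runs on EC2 Linux. I can run the Kafka producer code from Eclipse, but I do not see the messages in the topic on the EC2 box.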

Producer code:
package com.panda.kafka.training;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class PandaKafkaProducer {
    public static void main(String[] args) {
        // Number of events to send, passed as the first command-line argument.
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "ec2-xx-yy-zzz-212.compute-1.amazonaws.com:9092");
        //props.put("producer.type", "sync");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.panda.kafka.training.PandaKafkaPartitioner");
        props.put("request.required.acks", "1");
        props.put("producer.type", "async");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            System.out.println("creating event " + nEvents);
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.vulab.com," + ip;
            // The IP is the message key; the producer hands it to the partitioner class.
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("vulab123", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}

Server properties file:

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=ec2-xx-yy-zzz-212.compute-1.amazonaws.com

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=0000

Producer properties file:

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=52.2.202.212:0000

# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

Output from the console producer on EC2:
[ec2-user@ip-xxxx bin]$ sh kafka-console-producer.sh --broker-list xxxx:yyy.zzzz:9092 --topic vulab123
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Any detailed explanation would be very helpful.

pdsfdshx  #1

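Please find the detailed answer below. I have a Windows machine with Eclipse installed, and I created a Maven project for Kafka. I wanted to send some messages from Eclipse on Windows to the Kafka cluster on EC2. I have a single machine running Kafka.
Note on the Java producer code: make sure the full public DNS name is given here.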

props.put("metadata.broker.list", "ec2-52-xx-yyy-216.compute-1.amazonaws.com:9092"); 

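advertised.host.name is a broker setting, so it belongs in the server properties file shown further down rather than in the producer properties, and it takes the host name without a port:

advertised.host.name=ec2-52-xx-yyy-216.compute-1.amazonaws.com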
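
A malformed broker list is easy to miss, because the producer is only handed a string. As a rough sketch (the class `ProducerProps` and the method `forBrokers` are hypothetical names, not part of Kafka), the producer properties could be built through a small helper that rejects entries without a usable host:port pair, such as the `52.2.202.212:0000` value in the question. The property names are the ones already used in the question's producer code.

```java
import java.util.Properties;

/** Hypothetical helper: builds old-style producer properties after checking the broker list. */
final class ProducerProps {

    /** Validates "host1:port1,host2:port2" and returns properties for the producer. */
    static Properties forBrokers(String brokerList) {
        for (String entry : brokerList.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2 || parts[0].isEmpty()) {
                throw new IllegalArgumentException("expected host:port, got: " + entry);
            }
            // NumberFormatException is a subclass of IllegalArgumentException.
            int port = Integer.parseInt(parts[1]);
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range in: " + entry);
            }
        }
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", brokerList);
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.setProperty("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        // Pass the broker list as the first argument, e.g. <public-dns-name>:9092
        Properties props = forBrokers(args[0]);
        System.out.println(props.getProperty("metadata.broker.list"));
    }
}
```

The returned `Properties` object can be passed to `new ProducerConfig(props)` exactly as in the question's code. The check is purely syntactic; it does not prove the broker is reachable.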

Step 1:

I started a new EC2 Linux machine and installed Kafka.
wget http://mirror.sdunix.com/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

tar -xzf kafka_2.9.2-0.8.1.1.tgz

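Step 2:
Allowed SSH and all inbound rules in the EC2 security group.
Added the EC2 public IP to the hosts file (on Windows, C:\Windows\System32\drivers\etc\hosts) of the Windows machine where Eclipse is installed.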
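
Before running the producer, it can help to confirm from the Windows machine that the security group and hosts entry actually let a TCP connection through to the broker port. The following is a minimal sketch using only the JDK; `BrokerReachability`, `parseBroker`, and `canConnect` are hypothetical names chosen for illustration. A successful connection only shows that the port is open, not that the broker advertises a usable address.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Checks whether a broker's host:port accepts TCP connections from this machine. */
final class BrokerReachability {

    /** Splits "host:port" into an unresolved socket address. */
    static InetSocketAddress parseBroker(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got: " + hostPort);
        }
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    /** Returns true if a TCP connection succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // The host name is resolved here; an unresolvable name counts as unreachable.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the broker as the first argument, e.g. <public-dns-name>:9092
        InetSocketAddress broker = parseBroker(args[0]);
        boolean ok = canConnect(broker.getHostString(), broker.getPort(), 3000);
        System.out.println(broker.getHostString() + ":" + broker.getPort()
                + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

Running the same check against port 2181 would cover ZooKeeper, which the console consumer in the later step connects to.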
Step 3:

Modified the Kafka server.properties file. I put in the exact public DNS name.

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=ec2-52-xx-yyy-216.compute-1.amazonaws.com

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=ec2-52-xx-yyy-216.compute-1.amazonaws.com
zookeeper.connect=ec2-52-xx-yyy-216.compute-1.amazonaws.com:2181
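
The difference from the question's broker settings is that `host.name` no longer points at localhost and no mismatched `advertised.port` is set. A small sanity check along these lines could catch both cases before the broker is started. This is only a sketch under assumptions: `BrokerConfigCheck` and `findProblems` are hypothetical names, and the two rules are heuristics for a single broker reached from outside EC2, not a complete validation.

```java
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Flags broker settings that commonly stop remote clients from connecting. */
final class BrokerConfigCheck {

    private static boolean isLoopback(String host) {
        return host != null && (host.equals("localhost") || host.startsWith("127."));
    }

    /** Returns a human-readable list of suspicious settings (empty if none found). */
    static List<String> findProblems(Properties props) {
        List<String> problems = new ArrayList<>();
        for (String key : new String[] {"host.name", "advertised.host.name"}) {
            if (isLoopback(props.getProperty(key))) {
                problems.add(key + " is a loopback address; remote clients cannot use it");
            }
        }
        String port = props.getProperty("port");
        String advertisedPort = props.getProperty("advertised.port");
        if (port != null && advertisedPort != null) {
            try {
                if (Integer.parseInt(port.trim()) != Integer.parseInt(advertisedPort.trim())) {
                    problems.add("advertised.port differs from port; clients connect to the advertised one");
                }
            } catch (NumberFormatException e) {
                problems.add("port or advertised.port is not a number");
            }
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // Pass the path to server.properties as the first argument.
        Properties props = new Properties();
        try (FileReader reader = new FileReader(args[0])) {
            props.load(reader);
        }
        for (String problem : findProblems(props)) {
            System.out.println("WARNING: " + problem);
        }
    }
}
```

Applied to the server properties in the question, this reports two warnings (`host.name=localhost` and `advertised.port=0000` versus `port=9092`); applied to the file in this step, it reports none.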

Step 4:
Commands executed:

1. sh zookeeper-server-start.sh /home/ec2-user/kafka/kafka/config/zookeeper.properties

2. sh kafka-server-start.sh /home/ec2-user/kafka/kafka/config/server.properties

3. sh kafka-topics.sh --create --zookeeper ec2-52-xx-yyy-216.compute-1.amazonaws.com:2181 --replication-factor 1 --partitions 1 --topic spanda20

4. sh kafka-console-producer.sh --broker-list ec2-52-xx-yyy-216.compute-1.amazonaws.com:9092 --topic spanda20 --sync

5. sh kafka-console-consumer.sh --zookeeper ec2-52-xx-yyy-216.compute-1.amazonaws.com:2181 --topic spanda20 --from-beginning

I could see all the messages in the consumer.
