Kafka: accessing a Kafka server on a VM from Windows via its public IP

Posted by vhmi4jdf on 2021-06-04 in Kafka

I have a small problem. I want to connect from a Java producer running on Windows to a Kafka server running on a CentOS VM.
In config/server.properties I have these lines:

    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://<public ip>:9092

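(for example: advertised.listeners=PLAINTEXT://192.239.83.27:9092)
I still cannot produce messages from the producer on Windows. In VirtualBox, I set the "Allow All" option in the network settings.
The Kafka server does not come up properly; while it runs it keeps logging this warning: [Controller id=0, targetBrokerId=0] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Please help :p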

And here is my Java code on Windows:

    import com.google.common.collect.Lists;
    import com.twitter.hbc.ClientBuilder;
    import com.twitter.hbc.core.Client;
    import com.twitter.hbc.core.Constants;
    import com.twitter.hbc.core.Hosts;
    import com.twitter.hbc.core.HttpHosts;
    import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
    import com.twitter.hbc.core.processor.StringDelimitedProcessor;
    import com.twitter.hbc.httpclient.auth.Authentication;
    import com.twitter.hbc.httpclient.auth.OAuth1;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class TwitterProducer {

        Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());

        // Twitter API credentials (placeholders)
        String consumerKey = "xxx";
        String consumerSecret = "xxx";
        String token = "xxx";
        String secret = "xxx";

        public TwitterProducer() {}

        public static void main(String[] args) {
            new TwitterProducer().run();
        }

        public void run() {
            logger.info("Setup");

            /** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
            BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

            // create a twitter client
            Client client = createTwitterClient(msgQueue);
            // Attempts to establish a connection.
            client.connect();

            // create a kafka producer
            KafkaProducer<String, String> producer = createKafkaProducer();

            // poll tweets from the queue and forward each one to Kafka
            while (!client.isDone()) {
                String msg = null;
                try {
                    msg = msgQueue.poll(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    client.stop();
                }
                if (msg != null) {
                    logger.info(msg);
                    producer.send(new ProducerRecord<String, String>("twitter_tweets", null, msg), new Callback() {
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
            }

            // wait for pending sends to complete and release producer resources
            producer.close();
            logger.info("End of application");
        }

        public Client createTwitterClient(BlockingQueue<String> msgQueue) {
            /** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
            Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
            StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

            // Optional: set up some followings and track terms
            List<String> terms = Lists.newArrayList("bitcoin");
            hosebirdEndpoint.trackTerms(terms);

            // These secrets should be read from a config file
            Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

            ClientBuilder builder = new ClientBuilder()
                    .name("Hosebird-Client-01") // optional: mainly for the logs
                    .hosts(hosebirdHosts)
                    .authentication(hosebirdAuth)
                    .endpoint(hosebirdEndpoint)
                    .processor(new StringDelimitedProcessor(msgQueue));

            Client hosebirdClient = builder.build();
            return hosebirdClient;
        }

        public KafkaProducer<String, String> createKafkaProducer() {
            // public IP and port of the Kafka broker on the VM
            String bootstrapServers = "193.239.83.27:9092";

            // create the producer properties
            Properties properties = new Properties();
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // create the producer
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
            return producer;
        }
    }

Answer #1, by dpiehjr4

You say you have listeners=PLAINTEXT://0.0.0.0:9092, but judging by the screenshot, that line is commented out.
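One way to double-check which settings are really active is to parse the file the way a Java properties file is read, where any line starting with # is ignored. The sketch below is only an illustration: the class name ListenerConfigCheck, its helper methods, and the sample address 203.0.113.10 are made up for this example and are not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ListenerConfigCheck {

    // Parses properties-file text. Lines whose first non-blank character is '#'
    // are comments, so a commented-out key is simply absent from the result.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns a short, human-readable status for one key.
    static String describe(Properties props, String key) {
        String value = props.getProperty(key);
        return value == null
                ? key + " is NOT set (missing or commented out)"
                : key + " = " + value;
    }

    public static void main(String[] args) {
        // Hypothetical file content: listeners is commented out, and the
        // advertised address is a placeholder, not the poster's real IP.
        String sample = "#listeners=PLAINTEXT://0.0.0.0:9092\n"
                + "advertised.listeners=PLAINTEXT://203.0.113.10:9092\n";
        Properties props = parse(sample);
        System.out.println(describe(props, "listeners"));
        System.out.println(describe(props, "advertised.listeners"));
    }
}
```

To run it against the real file, read the content of config/server.properties into a string and pass it to parse instead of the sample text.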
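The log also says that Kafka is shutting down, so make sure broker.id is a valid non-negative number (0 is fine), and make sure ZooKeeper is running.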
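A quick way to see whether the broker and ZooKeeper are listening at all is a plain TCP connect test, run first on the VM and then from the Windows machine. This is a rough sketch, assuming the default ports 9092 (Kafka) and 2181 (ZooKeeper); the class name PortCheck and the 3-second timeout are arbitrary choices for this example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Tries to open a TCP connection to host:port. Returns true only if the
    // connection is established within timeoutMs milliseconds.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, unknown host, etc.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the VM's address as the first argument; defaults to localhost.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("Kafka 9092 reachable: " + isReachable(host, 9092, 3000));
        System.out.println("ZooKeeper 2181 reachable: " + isReachable(host, 2181, 3000));
    }
}
```

A successful connect only shows that the port is open. The producer still has to reach whatever address the broker returns from advertised.listeners, so that address must be reachable from Windows too.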
By the way, there is a Kafka Connect Twitter source connector that seems to do what you want.
