java—如果消息是由生产者生成的,如何从kafka代理获得确认?

0sgqnhkj  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(476)

我希望在生成消息时得到代理的一些响应。我尝试过回调机制(通过实现回调)在 KafkaProducer.send 但它不起作用,也不打电话 onCompletion 方法。
当我关闭kafka服务器并尝试生成消息时,它确实调用了回调方法。
有没有其他方法得到确认?

@Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        System.out.println("Called Callback method");
        if (metadata != null) {
            System.out.println("message(" + key + ", " + message
                    + ") sent to partition(" + metadata.partition() + "), "
                    + "offset(" + metadata.offset() + ") in " + elapsedTime
                    + " ms");
        } else {
            exception.printStackTrace();
        }

    }

props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);

KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props);
long runtime = new Date().getTime(); 
String ip = "192.168.2."+ rnd.nextInt(255); 
String msg = runtime + ".www.ppop.com," + ip;
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));`

我正在使用kafka客户端api 0.9.1和broker版本0.8.2。

gab6jxml

gab6jxml1#

发布完所有消息后,关闭producer,如下所示:

producer.close();

这样可以确保始终调用回调中的以下方法:

onCompletion(RecordMetadata metadata, Exception exception)

注:我已经测试了这一点,通过添加该行到这个示例生产者类,它的作品。

omjgkv6w

omjgkv6w2#

在kafkaproducer使用kafka0.9版本发布消息之后,有一种从代理获取信息的简单方法。您可以调用get()方法,该方法将返回一个recordmetadata对象,您可以在代码段下面获取诸如offset、topicpartition等信息,例如:

RecordMetadata m = kafkaProducer.send(new ProducerRecord<byte[], byte[]>(
                        topic, key.getBytes("UTF-8"), message
                                .getBytes("UTF-8"))).get();
System.out.println("Message produced, offset: " + m.offset());
System.out.println("Message produced, partition : " + m.partition());
System.out.println("Message produced, topic: " + m.topic());
blpfk2vs

blpfk2vs3#

所以我不能百分之百确定Kafka中的哪个版本适用于哪个版本。目前我使用的是0.8.2,我知道0.9引入了一些突破性的变化,但我不能肯定地告诉您现在什么是有效的/无效的。
一个非常强烈的建议是,我将使用与您的代理版本相对应的kafka客户机版本。如果您使用的是broker0.8.2,那么我也会使用kakfa客户端0.8.2。
你从来没有提出过你是如何使用它的任何代码,所以我只是有点猜在黑暗中。但是我已经在kafka0.8.2中通过在producer中使用这个方法实现了回调特性。下面是方法签名。

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)

我将在哪里调用该方法,我实际上用重写的方法传入类。

KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> record = //data to send to kafka
prod.send(record, new Callback() {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null) {
      e.printStackTrace();
    } else {
      //implement logic here, or call another method to process metadata
      System.out.println("Callback");
    }
  }
});

我想有一种方法也可以像你那样做。但是你必须提供代码来显示你是如何将记录发送给Kafka的。除此之外我只是猜测。

相关问题