kafka节点关闭时发出警报

pbwdgjma  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(356)

我已经暴露了一个Kafka节点和一个主题名。我的web服务器接收到大量http请求数据,我需要对它们进行处理,然后将它们推送到kafka。有时,如果kafka节点关闭了,那么我的服务器仍然不断地传输数据,这会导致我的内存崩溃,服务器也会关闭。
如果Kafka倒下了,我想停止发布数据。我的java示例代码如下:

static Producer producer;

  Produce() {
    Properties properties = new Properties();
    properties.put("request.required.acks","1");
    properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
    properties.put("enabled","true");
    properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    properties.put("kafka-topic","pixel-server");
    properties.put("batch.size","1000");
    properties.put("producer.type","async");
    properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<String, String>(properties);
  }

  public static void main(String[] args) {
    Produce produce = new Produce();

    produce.send(producer, "pixel-server", "Some time");

  }

  //This method is called lot of times
  public void send(Producer<String, String> producer, String topic, String data) {
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, data);
    Future<RecordMetadata> response = producer.send(producerRecord, (metadata, exception) -> {
      if (null != exception) {
        exception.printStackTrace();
      } else {
        System.out.println("Done");
      }
    });

我刚刚抽象出一些示例代码。send方法被多次调用。我只是不想在Kafka倒下的时候发任何信息。解决这种情况的有效方法是什么。

wlp8pajw

wlp8pajw1#

如果我是你,我就试着装一个断路器。当您在发送记录时遇到合理数量的故障时,电路中断并提供一些回退行为。一旦满足某个条件(例如:经过一段时间),电路将关闭,您将再次发送记录。也 vertx.io 它有自己的解决方案。

相关问题