编写不带while循环的javakafka使用者应用程序

c86crjj0  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(238)

我们最近开始使用kafka,我正在使用kafkajava本机消费者api编写一个kafka消费者应用程序。
不过,我看到的大多数示例都使用 while 循环然后调用 poll 方法对循环中的使用者对象执行。如下所示:

while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(1000);

            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });

            consumer.commitAsync();
        }

我只是在寻找一种更好的方法来实现这一点,而无需使用本机java消费者api进行循环。我知道用SpringKafka你不需要写这些。使用本机api怎么样?有什么好方法或最佳实践吗?

kx5bkwkv

kx5bkwkv1#

持续的投票“只是”Kafka消费者的工作方式,Kafka协议下的工作方式。每个操作总是由客户机(拉模型)发起,而不是由代理(推模型)发起,在使用消息的情况下,代理将转换为轮询。使用javakafkaconsumerapi意味着拥有一个循环,使用一个调度器,或者使用java的任何技术来连续地执行代码,您必须处理它。其他框架,如spring或smallryeReact式消息传递,就是为您做的。他们将轮询循环隐藏到您的应用程序中,但最终总会有一个循环。。。Kafka就是这样工作的。

hs1rzwqc

hs1rzwqc2#

我试着使用一个调度程序和代码工作。

package com.kafka;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.TimerTask;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ScheduledTask extends TimerTask {
 Date now; // to display current time
    public void run( ) {
        now = new Date();
    System.out.println("Time is :" + now);
    String AlarmString=null;
    Properties props = new Properties();
     props.put("bootstrap.servers", "10.*.*.*:9092");
     props.put("group.id", "grp-1");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("RAW_MH_RAN_SAM"));

    ConsumerRecords<String, String> records = consumer.poll(1000);
        //ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records)
         {

             System.out.println("Consumer:=============== partition Id= " + record.partition() + "  offset = " + record.offset() + " value = " + record.value() + "=================");

             if (AlarmString==null && !(record.value().toString().contains("PR ALARM:")) ){

                 AlarmString=record.value(); 

                }
             else{
                    if( !(record.value().toString().contains("PR ALARM:"))   )

                    {
                        //System.out.println("record.value()   :::"+ record.value() );  
                        AlarmString=AlarmString+","+record.value();

                    }

                }

         }
     if (consumer != null) {
         System.out.println("Closing Connection");
         consumer.close();
      }

 }

}
//Simple consumer 
package com.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.google.gson.Gson;
import java.text.ParseException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
public class SimpleConsumer{
    public static void main(String[] args) 
{

    Timer time = new Timer();
    ScheduledTask st = new ScheduledTask(); // Instantiate SheduledTask class
    time.schedule(st, 0,60000); // Create Repetitively task for every 1 secs  
}

}

相关问题