我们说过了kafka 的Consumer 是单线程设计的,也就是说它是线程不安全的,如果你在多线程中使用就会抛出ConcurrentModificationException
异常
下面我们从源码角度来看一下为什么会抛出这个异常,开始之前我们先演示一下报错,代码如下:
public class ModificationExceptionDemo {
private static KafkaConsumer consumer;
/** * 初始化配置 */
private static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", false);
props.put("session.timeout.ms", 30000);
props.put("max.poll.records", 1000);
props.put("max.poll.interval.ms", 5000);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
return props;
}
public static void main(String[] args) {
Properties pros = initConfig();
consumer = new KafkaConsumer<String, String>(pros);
consumer.subscribe(Arrays.asList("flink_json_source_4"));
new Thread(new Runnable() {
@Override
public void run() {
consumer.poll(100);
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
consumer.poll(100);
}
}).start();
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
我们看到这个报错信息已经出来了
Exception in thread "Thread-2" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2244)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2228)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1180)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)
at com.kingcall.clients.consumer.thread.ModificationExceptionDemo$2.run(ModificationExceptionDemo.java:44)
at java.lang.Thread.run(Thread.java:748)
其实这里我们调用了poll方法,我么就跟着这个方法进来,看到这个方法里面调用了acquireAndEnsureOpen
这个方法
private void acquireAndEnsureOpen() {
acquire();
if (this.closed) {
release();
throw new IllegalStateException("This consumer has already been closed.");
}
}
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
其实我们看到acquire
就是我们的逻辑判断,判断是不是同一个线程,如果不是的则抛出异常,其实consumer 端的代码,几乎全部都调用了acquireAndEnsureOpen
方法
优势
不足
优势
下面我们的演示topicflink_json_source_4
是两个分区
public class MultiConsumerMode1 {
// 在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现多线程架构,需要注意的是启的太多没有意义
@Test
public void test() {
int partitionCount = 2;
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < partitionCount; i++) {
executor.submit(new KafkaConsumerRunner());
}
try {
executor.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private KafkaConsumer consumer;
/** * 初始化配置 */
private Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", false);
props.put("session.timeout.ms", 30000);
props.put("max.poll.records", 1000);
props.put("max.poll.interval.ms", 5000);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
return props;
}
public void run() {
Properties pros = initConfig();
consumer = new KafkaConsumer<String, String>(pros);
try {
consumer.subscribe(Arrays.asList("flink_json_source_4"));
while (!closed.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
// 执行消息处理逻辑
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n", Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
} catch (WakeupException e) {
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
输出信息:我们已经看到多个线程在消费了
这里我们思考一个问题,那就是我们并没指定那个那个线程去消费哪一个partition,按照我们上面的设计,我们应该是一个线程区去消费一个partition,从单线程的消费或者是单个consumer 的消费实例的角度,这里不应该是每个线程区消费区部分区吗?那为什么这里可以做到一个线程恰好只消费了一个分区呢。
这个方案本质上和上面有点相似,不同的是在每个consumer 获取到数据之后,处理数据的时候是多线程的。至于获取数据的过程可以设计成单线程的也可以是多线程的。
Standalone Consumer 的优势和使用场景
多线程消费者的实现思路
其实我们自己实现的多线程消费其实就是一种Standalone Consume的模式
所以到这里我们看到kafka 总共有这么几种消费模式
consumer group
Standalone
自定义多线程
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/king14bhhb/article/details/114702895
内容来源于网络,如有侵权,请联系作者删除!