spring批处理kafka使用者作业:跨多个jvm进程处理消息组

plicqrtu  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(419)

我有一个简单的spring批处理kafka使用者作业,它从kafka主题读取数据并将数据写入文件。
我想到了产生我的Kafka消费者工作的5个示例,使工作可以更快地完成。也就是说,我启动了我的程序5次,使得5个使用者作业在它们自己的jvm进程中启动。
这种方法的直接问题是5个进程将写入同一个文件。我通过在文件名后面附加一个唯一的进程id来解决这个问题。我更新的writer bean如下:

private static final String UNIQUE_PROCESS_IDENTIFIER = System.currentTimeMillis();    

@Bean
public FlatFileItemWriter<String> testFileWriter() {
    FlatFileItemWriter<String> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(
            "I:/CK/data/output_from_consumer_"+UNIQUE_PROCESS_IDENTIFIER+".dat"));
    writer.setAppendAllowed(false);
    writer.setShouldDeleteIfExists(true);
    DelimitedLineAggregator<String> lineAggregator = new DelimitedLineAggregator<>();
    lineAggregator.setDelimiter(",");
    writer.setLineAggregator(lineAggregator);
    return writer;
}

通过将时间戳附加到输出文件名,我确保每个使用者jvm进程写入自己的文件。
当我最终启动同一个程序的5个示例(jvm进程)时,我的期望是,如果在它自己的jvm进程中运行的一个使用者作业从一个分区读取一条消息,在自己的jvm进程中运行的其他使用者作业将不会再次从同一分区读取相同的消息(因为所有5个java进程将使用相同的使用者组,即mygroup)
但是,我可以看到每个使用者作业进程(jvm)最终读取所有消息。结果,我现在有5个文件,每个文件包含相同的内容。示例输出文件名,其中包含每个文件中的记录数,以便更好地解释:

output_from_consumer_1600530320385.dat -> 1 million records
output_from_consumer_1600530335555.dat -> 1 million reocrds
output_from_consumer_1900530335555.dat -> 1 million records
output_from_consumer_1900530335556.dat -> 1 million records
output_from_consumer_1900730334556.dat -> 1 million records

Total records: 5 million

问:如何配置spring批处理作业,以便即使使用consumer作业启动了多个java进程,java进程也只读取作为单独java进程启动的同一组中的consumer尚未读取的数据?
这是我的预期输出(仅代表性):

output_from_consumer_1600530320385.dat -> 100,000 records
output_from_consumer_1600530335555.dat -> 200,000 records
output_from_consumer_1900530335555.dat -> 200,000 records
output_from_consumer_1900530335556.dat -> 400,000 records
output_from_consumer_1900730334556.dat -> 100,000 records 

Total records : 1 million
mbzjlibv

mbzjlibv1#

当您创建 KafkaItemReader ,可以指定要从中读取的分区:

KafkaItemReader reader = new KafkaItemReader(myConsumerProperties, "topic1", 0)

上面的读取器将从分区读取消息 0topic1 . 因此,您可以并行运行作业,并将每个作业配置为从不同的主题读取消息(例如,将主题/分区作为作业参数传递)。

wn9m85ua

wn9m85ua2#

在同一个组id中使用同一个consumer id运行多个kafka consumer示例无助于实现并行性。
kafka消费者的并行性可以通过使用多个消费者来实现,每个消费者具有不同的消费者id和相同的消费者组id。数据在一个组的所有消费者中平均分配,一个组中没有两个消费者接收到相同的数据。
在将分区分配给使用者之前,kafka将首先检查是否有任何具有给定group-id的现有使用者。当没有具有给定group-id的现有使用者时,它将该主题的所有分区分配给这个新使用者。当有两个消费者已经具有给定的组id,而第三个消费者想要使用相同的组id时,它将在所有三个消费者之间平均分配分区。相同组id的两个使用者不会被分配到同一分区。
例如,假设有一个主题有4个分区和两个使用者, consumer-A 以及 consumer-B 想要使用组id从中消费 my-consumer-group 然后,kafka将为每个使用者分配相等数量的分区,即2到4个分区 consumer-A 以及 2 to the consumer-B .
对于您的用例,因为kafka主题包含4个分区,所以您可以使用4个消费者,每个消费者具有不同的消费者id,并且具有相同的组id。

相关问题