kafka-java消息的并行处理

lpwwtiir  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(339)
JavaPairReceiverInputDStream<String, byte[]> messages = KafkaUtils.createStream(...);
JavaPairDStream<String, byte[]> filteredMessages = filterValidMessages(messages);

JavaDStream<String> useCase1 = calculateUseCase1(filteredMessages);
JavaDStream<String> useCase2 = calculateUseCase2(filteredMessages);
JavaDStream<String> useCase3 = calculateUseCase3(filteredMessages);
JavaDStream<String> useCase4 = calculateUseCase4(filteredMessages);

...
我从kafka检索消息,过滤这些消息,并对多个用例使用相同的消息。这里用例1到4相互独立,可以并行计算。但是,当我查看日志时,我看到计算是按顺序进行的。我怎样才能使它们平行运行。任何建议都会有帮助。

c3frrgcw

c3frrgcw1#

尝试为4个用例中的每一个创建Kafka主题。然后尝试创建4个不同的Kafka数据流。

u0sqgete

u0sqgete2#

我将所有代码移到for循环中,并按kafka主题中的分区数进行迭代,我看到了一个改进。

for(i=0;i<numOfPartitions;i++)
{
JavaPairReceiverInputDStream<String, byte[]> messages =
KafkaUtils.createStream(...);
JavaPairDStream<String, byte[]> filteredMessages =
filterValidMessages(messages);

JavaDStream<String> useCase1 = calculateUseCase1(filteredMessages);
JavaDStream<String> useCase2 = calculateUseCase2(filteredMessages);
JavaDStream<String> useCase3 = calculateUseCase3(filteredMessages);
JavaDStream<String> useCase4 = calculateUseCase4(filteredMessages);
}

参考文献:http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/

相关问题