为什么两个spark流媒体作业从同一个kafka主题中提取具有相同组id的消息而不是平衡负载,而是获取相同的消息?

4dbbbstv  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(247)

Kafka0.8官方文件对Kafka消费者的描述如下:
使用者用使用者组名称标记自己,发布到某个主题的每条消息都会传递到每个订阅使用者组中的一个使用者示例。使用者示例可以在不同的进程中,也可以在不同的机器上。如果所有使用者示例都具有相同的使用者组,那么这就像传统的队列平衡使用者的负载一样。”
我用kafka 0.8.1.1设置了一个kafka集群,并使用spark streaming job(spark 1.3)从其主题中提取数据。spark流代码如下:

... ...

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokerList);
    kafkaParams.put("group.id", groupId);

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );

    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {

        @Override
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
            long msgNum = strJavaRDD.count();
            System.out.println("There are " + msgNum + " messages read from Kafka.");

        ... ...

        return null;}});

然后我提交了两个spark流作业来访问具有相同组id的同一主题,我假设当我向该主题发送100条消息时,这两个作业总共得到100条消息(例如job1得到50,job2得到50;或者job1得到100,job2得到0)。然而,他们分别得到100分。这样的结果似乎与Kafka博士所说的有所不同。
我的密码有问题吗?我是否正确设置了组id配置?这是一个bug还是createdirectstream()的设计?
测试环境:kafka 0.8.1.1+spark 1.3.1

uidvcgyl

uidvcgyl1#

组是kafka在版本0.9之前的高级消费api的一个特性,它在简单的消费api中不可用。 createDirectStream 使用简单的使用者api。
一些提示:
使用simpleconsumer实现的主要原因是您希望比使用者组能够更好地控制分区使用(把一条信息读几遍)
createdirectstream:这种方法不使用接收器来接收数据,而是周期性地向kafka查询每个topic+分区中的最新偏移量,并相应地定义每个批处理中要处理的偏移量范围。
参考:
spark流媒体+Kafka集成指南
0.8.0简单消费者示例
kafka0.9.0版本添加了一个新的java使用者,以取代现有的基于zookeeper的高级使用者和低级使用者api。然后您可以使用group并同时提交偏移量手册。

yzuktlbb

yzuktlbb2#

创建两个不同的spark应用程序来对相同的消息执行相同的操作是没有意义的。将一个应用程序与多个执行者一起使用。

相关问题