java—kafka消费者(0.8.2.2)能否批量读取消息

bjp0bcyl  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(565)

据我所知,kafka消费者从指定的分区顺序读取消息。。。
我们计划有多个kafka consumer(java),它具有相同的组i,所以如果它从一个指定的分区顺序读取,那么我们如何实现高吞吐量。例如,producer每秒发布40条消息。。。消费者每秒处理消息1。虽然我们可以有多个消费者,但不能有40 rt???如果我错了,请纠正我。。。
在我们的例子中,消费者只有在消息处理成功后才能提交偏移量,否则消息将被重新处理。。。有没有更好的解决办法???

2sbarzqh

2sbarzqh1#

您可以成批使用消息并以成批方式处理它们。batch.max.wait.ms(属性)消费者将等待此时间量并轮询新消息

h22fl7wq

h22fl7wq2#

基于你的问题澄清。
Kafka消费者一次可以阅读多条信息。但是Kafka消费者并不是真的阅读信息,更正确的说法是,消费者阅读一定数量的字节,然后根据单个信息的大小,决定将要阅读多少信息。阅读kafka consumer配置,不允许指定要获取多少消息,而是指定consumer可以获取的最大/最小数据大小。不管有多少信息适合在这个范围内,你会得到多少。正如你所指出的,你总是会按顺序收到消息。
相关使用者配置(适用于0.9.0.0及更高版本)
fetch.min.字节
最大分区获取字节数
更新
使用您在注解中的示例,“我的理解是,如果我在config中指定读取10个字节,并且如果每条消息为2个字节,则使用者一次读取5条消息。”这是正确的。您的下一个语句“这意味着这5条消息的偏移量是随机的,在分区中”是错误的。顺序阅读并不意味着一个接一个,它只意味着它们保持有序。您可以对项目进行批处理,并使它们保持连续/有序。举下面的例子。
在kafka日志中,如果有10条消息(每个2字节)具有以下偏移量,[0,1,2,3,4,5,6,7,8,9]。
如果读取10个字节,将得到一个批处理,其中包含偏移量为[0,1,2,3,4]的消息。
如果读取6个字节,将得到一个批处理,其中包含偏移量为[0,1,2]的消息。
如果读取6个字节,然后再读取6个字节,则会得到两个批,其中包含消息[0,1,2]和[3,4,5]。
如果读取8个字节,然后读取4个字节,则会得到两个批,其中包含消息[0,1,2,3]和[4,5]。
更新:澄清
我不是100%确定如何承诺工作,我主要是与Kafka从风暴的环境。提供的kafkaspout自动提交kafka消息。
但是看看0.9.0.1消费者API,我建议您这样做。似乎特别有三种方法与这一讨论有关。
轮询(长超时)
commitsync()
commitsync(java.util.map偏移量)
poll方法检索消息,可以是1,也可以是20,例如,假设返回了3条消息[0,1,2]。你现在有了这三条信息。现在由您决定如何处理它们。你可以处理它们0=>1=>2,1=>0=>2,2=>0=>1,这要看情况而定。不管您如何处理它们,在处理之后您都会希望提交它,它告诉kafka服务器您已经处理完这些消息。
使用commitsync()提交上次轮询时返回的所有内容,在本例中,它将提交偏移量[0,1,2]。
另一方面,如果选择使用commitsync(java.util.map offsets),则可以手动指定要提交的偏移量。如果您是按顺序处理它们,那么可以处理偏移量0,然后提交,处理偏移量1,然后提交,最后处理偏移量2,然后提交。
总之,Kafka给了你自由处理信息的愿望,你可以选择按顺序或完全随机处理他们在你的选择。

fumotvh3

fumotvh33#

为了实现并行性,这似乎是您所要求的,您使用了主题分区(您将主题拆分为n个称为分区的部分)。然后,在使用者中,生成多个线程来使用这些分区。
在producer端,您可以将消息发布到随机分区(默认),或者为kafka提供一些消息属性来计算散列(如果需要排序的话),这样可以确保具有相同散列的所有msg都进入相同的分区。
编辑(偏移提交请求示例):
我就是这样做的。所有未提供的方法都不是必需的。

/**
   * Commits the provided offset for the current client (i.e. unique topic/partition/clientName combination)
   * 
   * @param offset
   * @return {@code true} or {@code false}, depending on whether commit succeeded
   * @throws Exception
   */
  public static boolean commitOffset(String topic, int partition, String clientName, SimpleConsumer consumer,
      long offset) throws Exception {
    try {
      TopicAndPartition tap = new TopicAndPartition(topic, partition);
      OffsetAndMetadata offsetMetaAndErr = new OffsetAndMetadata(offset, OffsetAndMetadata.NoMetadata(), -1L);
      Map<TopicAndPartition, OffsetAndMetadata> mapForCommitOffset = new HashMap<>(1);
      mapForCommitOffset.put(tap, offsetMetaAndErr);

      kafka.javaapi.OffsetCommitRequest offsetCommitReq = new kafka.javaapi.OffsetCommitRequest(
          ConsumerContext.getMainIndexingConsumerGroupId(), mapForCommitOffset, 1, clientName,
          ConsumerContext.getOffsetStorageType());

      OffsetCommitResponse offsetCommitResp = consumer.commitOffsets(offsetCommitReq);
      Short errCode = (Short) offsetCommitResp.errors().get(tap);
      if (errCode != 0) {
        processKafkaOffsetCommitError(tap, offsetCommitResp, BrokerInfo.of(consumer.host()));
        ErrorMapping.maybeThrowException(errCode);
      }
      LOG.debug("Successfully committed offset [{}].", offset);
    } catch (Exception e) {
      LOG.error("Error while committing offset [" + offset + "].", e);
      throw e;
    }
    return true;
  }

相关问题