kafka服务器和客户端jar已移动到最新库:0.10.0.1
我的消费者和生产者代码使用上面提到的最新kafka jar,但仍然使用旧的消费者api(0.8.2)。
我在调用commit offset时遇到了消费者方面的问题。
2017-04-10 00:07:14,547 ERROR kafka.consumer.ZookeeperConsumerConnector [groupid1_x.x.x.x-1491594443834-b0e2bce5], Error while committing offsets. java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.ZookeeperConsumerConnector.liftedTree2$1(ZookeeperConsumerConnector.scala:354)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:351)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:331)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.commitOffset(KafkaHLConsumer.java:173)
at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.run(KafkaHLConsumer.java:271)
kafka服务器端配置:
listeners=PLAINTEXT://:9092
log.message.format.version=0.8.2.1
broker.id.generation.enable=false
unclean.leader.election.enable=false
Kafka消费者的以下配置:
auto.commit.enable is overridden to false
auto.offset.reset is overridden to smallest
consumer.timeout.ms is overridden to 100
dual.commit.enabled is overridden to true
fetch.message.max.bytes is overridden to 209715200
group.id is overridden to crm_topic1_hadoop_tables
offsets.storage is overridden to kafka
rebalance.backoff.ms is overridden to 6000
zookeeper.session.timeout.ms is overridden to 23000
zookeeper.sync.time.ms is overridden to 2000
要创建消费者,我将使用以下api:
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));
对于提交调用
consumer.commitOffsets();
在读取来自kafka的消息时,我们使用下面的方法来处理超时
private boolean hasNext(ConsumerIterator<byte[], byte[]> it)
{
try
{
it.hasNext();
return true;
}
catch (ConsumerTimeoutException e)
{
return false;
}
}
这是必需的,因为我们只希望在特定的时间间隔或从kafka接收的消息大小(字节)之后才开始处理。
同样的异常,即使在设置dual.commit.enabled=false consumer.timeout.ms=1000之后,其他设置仍保持为旧配置。
更多细节:
对于版本0.8.2.1,我从未遇到过这样的问题。在移动到0.10.0.1(客户端和服务器)之后,开始获取此异常。
在处理/推送到hadoop之前,我们正在读取多条消息。处理/写入hadoop部件需要时间(约5分钟)。在这个过程之后,当我们尝试推送时,我们就超越了例外。这个例外我每两次都会收到一次委托。有时(commitofset在前一次提交的10秒内调用),第二次提交也不例外。
供你参考。如果commit offset失败,则使用者只读取下一条消息,而不返回到上一个成功的commit offset位置。但若提交偏移失败并重新启动使用者进程,则它将从旧的提交位置进行读取。
1条答案
按热度按时间c9qzyr3d1#
正如我在“问题详细信息”中提到的,我使用的是最新的kafkajar,但使用的是旧的客户机:
我通过第二次给Commitofset打电话解决了这个问题。
实际上,问题与connections.max.idle.ms有关。这个属性是用最新的kafka(broker=10分钟,consumer=9分钟,producer=9分钟)引入的。
因此,每当我的老客户在10分钟后调用第二次提交补偿时,我都会遇到上述异常。
对于旧的使用者api,无法设置此属性。我不能更改代理配置(由其他团队处理,并为其他用户提供相同的代理)。。。
在这里,我认为旧的commitofset调用需要另一个连接(而不是iterator),当理想的连接持续10分钟以上时,这个连接就接近了。我不太确定。
如果在第一次commitofset调用中发生任何失败,那么第二次调用将确保成功。如果第一个本身获得成功,那么下一个执行将不会产生任何问题。无论如何,我们很少调用commit offset。
接下来,我将使用最新的kafka消费者和生产者javaapi移动我的代码。