我对Kafka还不太熟悉,我正在为我的新应用程序尝试一些小用例。用例基本上是,kafka producer->kafka consumer->flume kafka source->flume hdfs sink。
消费时(步骤2),以下是步骤顺序。。1消费者投票(1.0)1.a。制作多个主题(多个flume代理正在收听)1.b。生产。轮询()2。每25 msgs 3冲洗一次。提交()每个消息(asynchcommit=false)
问题1:这一系列的行动对吗!?!
问题2:这是否会导致数据丢失,因为刷新是每25 msgs一次,提交是每msg一次?!?
问题3:生产者的poll()和消费者的poll()之间的区别?
问题4:当消息已提交但未刷新时会发生什么!?!
如果有人能帮我理解生产者/消费者之间关于投票、刷新和提交的抵消示例,我将不胜感激。
提前谢谢!!
1条答案
按热度按时间xjreopfe1#
让我们先简单地了解一下Kafka:
什么是Kafka制作人:
你可以忽略这个警告。Kafka似乎找不到主题并自动创建主题。
让我们看看Kafka是如何储存这条信息的:
生产者在代理服务器中的
/kafka-logs
(针对Apache·Kafka)或/kafka-cf-data
(合流版)将cd放入此目录,然后列出文件。你会看到
.log
存储实际数据的文件:如果打开日志文件,您将看到:
让我们了解消费者将如何投票和读取记录:
什么是Kafka民意测验:
kafka为分区中的每条记录维护一个数字偏移量。这个偏移量充当该分区内记录的唯一标识符,还表示使用者在分区中的位置。例如,位于位置5的使用者已使用偏移量为0到4的记录,并且接下来将接收偏移量为5的记录。实际上,有两个与消费者的用户相关的位置概念:消费者的位置给出了下一条记录的偏移量。它将比使用者在该分区中看到的最高偏移量大一个。每次消费者在call to poll(long)中收到消息时,它都会自动前进。
因此,poll以持续时间作为输入,读取
00000000000000000000.log
文件,并将其返回给使用者。何时删除邮件:
Kafka负责信息的传递。有两种方法:
基于时间:默认值为7天。可以使用
log.retention.ms=1680000
基于大小:可以设置为log.retention.bytes=10487500
现在让我们看看消费者:上面的命令指示使用者从
offset = 0
. Kafka给这个控制台分配了一个消费者group_id
并保持这个group_id
已经读过了。所以,它可以把更新的消息推送到这个consumer-group
什么是Kafka承诺:提交是告诉kafka消费者已成功处理的消息的一种方式。这可以看作是更新
group-id : current_offset + 1
. 您可以使用consumer对象的commitasync()或commitsync()方法来管理它。参考文献:https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html