Understanding Kafka poll(), flush() and commit()

7uhlpewt · asked 2021-06-04 · in Flume
Answers (1) | Views (552)

I'm fairly new to Kafka and I'm trying out some small use cases for my new application. The use case is basically: Kafka producer -> Kafka consumer -> Flume Kafka source -> Flume HDFS sink.
While consuming (step 2), the sequence of steps is as follows:
1. consumer.poll(1.0)
 1.a. Produce to multiple topics (multiple Flume agents are listening)
 1.b. producer.poll()
2. flush() every 25 messages
3. commit() for every message (asynchcommit=false)
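To make the failure mode in this sequence concrete, here is a toy model of the loop above. It is not the real Kafka client API; ToyProducer and ToyConsumer are invented stand-ins that only track which records are buffered, flushed, and committed, so you can see what a crash between a commit and the next flush would leave behind.

```python
# Toy model of the pipeline described above -- NOT the real Kafka client.
# It only illustrates why committing every message while flushing every
# 25 can lose data: a crash can land after a commit but before a flush.

class ToyProducer:
    def __init__(self):
        self.buffer = []      # records accepted by send() but not yet flushed
        self.delivered = []   # records durably handed to the broker

    def send(self, record):
        self.buffer.append(record)

    def flush(self):
        self.delivered.extend(self.buffer)
        self.buffer.clear()

class ToyConsumer:
    def __init__(self, records):
        self.records = records
        self.committed = 0    # next offset the group would restart from

    def poll(self):
        return list(enumerate(self.records))

    def commit(self, offset):
        self.committed = offset + 1

consumer = ToyConsumer([f"msg-{i}" for i in range(30)])
producer = ToyProducer()

for offset, record in consumer.poll():
    producer.send(record)
    if (offset + 1) % 25 == 0:
        producer.flush()          # flush every 25 messages
    consumer.commit(offset)       # commit every message (sync)

# Imagine a crash right here: offsets 25-29 were committed upstream,
# but they still sit in producer.buffer and were never flushed.
print(consumer.committed)              # 30 -- the group will not re-read anything
print(len(producer.delivered))         # 25 -- only the first flush made it out
print(producer.buffer)                 # the 5 records that would be lost
```

Running this shows the gap directly: the consumer group believes everything up to offset 30 is done, while five records never left the producer's buffer.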
Question 1: Is this sequence of actions correct?
Question 2: Could this cause data loss, since the flush happens every 25 messages but the commit happens for every message?
Question 3: What is the difference between the producer's poll() and the consumer's poll()?
Question 4: What happens when a message has been committed but not yet flushed?
I would really appreciate it if someone could help me understand the offset semantics between producer and consumer with respect to poll, flush and commit.
Thanks in advance!!


xjreopfe1#

Let's first take a quick look at how Kafka works:
What is a Kafka producer:

t.turner@devs:~/developers/softwares/kafka_2.12-2.2.0$ bin/kafka-console-producer.sh --broker-list 100.102.1.40:9092,100.102.1.41:9092 --topic company_wallet_db_v3-V3_0_0-transactions
>{"created_at":1563415200000,"payload":{"action":"insert","entity":{"amount":40.0,"channel":"INTERNAL","cost_rate":1.0,"created_at":"2019-07-18T02:00:00Z","currency_id":1,"direction":"debit","effective_rate":1.0,"explanation":"Voucher expired","exchange_rate":null,"id":1563415200,"instrument":null,"instrument_id":null,"latitude":null,"longitude":null,"other_party":null,"primary_account_id":2,"receiver_phone":null,"secondary_account_id":362,"sequence":1,"settlement_id":null,"status":"success","type":"voucher_expiration","updated_at":"2019-07-18T02:00:00Z","primary_account_previous_balance":0.0,"secondary_account_previous_balance":0.0}},"track_id":"a011ad33-2cdd-48a5-9597-5c27c8193033"}
[2019-07-21 11:53:37,907] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 7 : {company_wallet_db_v3-V3_0_0-transactions=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

You can ignore this warning. Kafka could not find the topic, so it created it automatically.
Let's see how Kafka stores this message:
The producer writes the message on the broker under /kafka-logs (for Apache Kafka) or /kafka-cf-data (for the Confluent distribution):

drwxr-xr-x   2 root root  4096 Jul 21 08:53 company_wallet_db_v3-V3_0_0-transactions-0

cd into this directory and list the files. You will see the .log file that stores the actual data:

-rw-r--r--   1 root root 10485756 Jul 21 08:53 00000000000000000000.timeindex
-rw-r--r--   1 root root 10485760 Jul 21 08:53 00000000000000000000.index
-rw-r--r--   1 root root        8 Jul 21 08:53 leader-epoch-checkpoint
drwxr-xr-x   2 root root     4096 Jul 21 08:53 .
-rw-r--r--   1 root root      762 Jul 21 08:53 00000000000000000000.log

If you open the log file, you will see:

^@^@^@^@^@^@^@^@^@^@^Bî^@^@^@^@^B<96>T<88>ò^@^@^@^@^@^@^@^@^Al^S<85><98>k^@^@^Al^S<85><98>kÿÿÿÿÿÿÿÿÿÿÿÿÿÿ^@^@^@^Aö
^@^@^@^Aè
{"created_at":1563415200000,"payload":{"action":"insert","entity":{"amount":40.0,"channel":"INTERNAL","cost_rate":1.0,"created_at":"2019-07-18T02:00:00Z","currency_id":1,"direction":"debit","effective_rate":1.0,"explanation":"Voucher expired","exchange_rate":null,"id":1563415200,"instrument":null,"instrument_id":null,"latitude":null,"longitude":null,"other_party":null,"primary_account_id":2,"receiver_phone":null,"secondary_account_id":362,"sequence":1,"settlement_id":null,"status":"success","type":"voucher_expiration","updated_at":"2019-07-18T02:00:00Z","primary_account_previous_balance":0.0,"secondary_account_previous_balance":0.0}},"track_id":"a011ad33-2cdd-48a5-9597-5c27c8193033"}^@

Let's understand how the consumer will poll and read records:
What is a Kafka poll:
Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There are actually two notions of position relevant to the user of the consumer: the position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It advances automatically every time the consumer receives messages in a call to poll(long).
So, poll takes a duration as input, reads the 00000000000000000000.log file, and returns the records to the consumer.
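The offset/position rule quoted above can be sketched in a few lines. This is a minimal, invented model (PartitionReader is not a real Kafka class): poll() hands out the next batch and moves the position to one past the highest offset it returned.

```python
# Minimal model of the offset/position concept -- not the real client API.
# poll() returns the next batch of (offset, record) pairs and advances
# the consumer's position to one past the highest offset it handed out.

class PartitionReader:
    def __init__(self, log):
        self.log = log        # the records of one partition, indexed by offset
        self.position = 0     # offset of the next record to hand out

    def poll(self, max_records=2):
        batch = self.log[self.position:self.position + max_records]
        records = list(enumerate(batch, start=self.position))
        if records:
            self.position = records[-1][0] + 1
        return records

reader = PartitionReader(["a", "b", "c", "d", "e"])
print(reader.poll())      # [(0, 'a'), (1, 'b')]
print(reader.position)    # 2 -- one larger than the highest offset seen
print(reader.poll())      # [(2, 'c'), (3, 'd')]
```

Note that the position advances purely because records were handed out, independently of any commit; committing is a separate step covered below.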
When are messages deleted:
Kafka takes care of deleting messages. There are two ways:
- Time-based: the default is 7 days. It can be changed with log.retention.ms=1680000
- Size-based: it can be set with log.retention.bytes=10487500
Now let's look at the consumer:
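For reference, both retention settings can be placed in the broker's server.properties file; the numbers below are just the illustrative values from above, not recommendations.

```properties
# Time-based retention: delete log segments older than this many milliseconds
log.retention.ms=1680000
# Size-based retention: delete oldest segments once a partition log exceeds this many bytes
log.retention.bytes=10487500
```

When both are set, whichever limit is hit first triggers deletion of the oldest segments.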

t.turner@devs:~/developers/softwares/kafka_2.12-2.2.0$ bin/kafka-console-consumer.sh --bootstrap-server 100.102.1.40:9092 --topic company_wallet_db_v3-V3_0_0-transactions --from-beginning
{"created_at":1563415200000,"payload":{"action":"insert","entity":{"amount":40.0,"channel":"INTERNAL","cost_rate":1.0,"created_at":"2019-07-18T02:00:00Z","currency_id":1,"direction":"debit","effective_rate":1.0,"explanation":"Voucher expired","exchange_rate":null,"id":1563415200,"instrument":null,"instrument_id":null,"latitude":null,"longitude":null,"other_party":null,"primary_account_id":2,"receiver_phone":null,"secondary_account_id":362,"sequence":1,"settlement_id":null,"status":"success","type":"voucher_expiration","updated_at":"2019-07-18T02:00:00Z","primary_account_previous_balance":0.0,"secondary_account_previous_balance":0.0}},"track_id":"a011ad33-2cdd-48a5-9597-5c27c8193033"}
^CProcessed a total of 1 messages

The above command tells the consumer to read from offset = 0. Kafka assigns a group_id to this console consumer and keeps track of the latest offset that this group_id has read. So it can push newer messages to this consumer-group.
What is a Kafka commit:
A commit is a way to tell Kafka which messages the consumer has successfully processed. This can be thought of as updating group-id : current_offset + 1. It can be managed using the commitAsync() or commitSync() methods of the consumer object.
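That "group-id : current_offset + 1" bookkeeping can be sketched as follows. This is a toy model, not the real broker-side group coordinator; the class and group name are invented for illustration. Committing offset N stores N + 1, so a restarted consumer in the same group resumes at N + 1 and never re-reads what was already processed.

```python
# Toy sketch of commit semantics -- not the real Kafka API.
# Committing offset N stores N + 1 as the group's resume position.

class GroupCoordinator:
    def __init__(self):
        self.committed = {}   # (group_id, partition) -> next offset to read

    def commit_sync(self, group_id, partition, last_processed_offset):
        # the "update group-id : current_offset + 1" step from the text above
        self.committed[(group_id, partition)] = last_processed_offset + 1

    def resume_position(self, group_id, partition):
        # a group with no committed offset starts from the beginning here
        return self.committed.get((group_id, partition), 0)

coordinator = GroupCoordinator()
print(coordinator.resume_position("flume-group", 0))   # 0: nothing committed yet
coordinator.commit_sync("flume-group", 0, 41)          # processed up to offset 41
print(coordinator.resume_position("flume-group", 0))   # 42
```

This also shows why the question's commit-per-message plus flush-per-25 ordering matters: the committed position only records what the consumer processed, and says nothing about whether downstream producer buffers were flushed.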
Reference: https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
