我必须实现一个kafka消费者,它从一个主题中读取数据,并根据有效负载中的帐户id(将接近百万)将其写入一个文件。假设每秒大约有3k个事件。是否可以为每次读取的邮件打开和关闭文件?或者我应该考虑另一种方法?
ldxq2e6h1#
如果您的帐户id重复,那么最好打开窗口。您可以通过窗口聚合1分钟内的所有事件,然后可以按键对事件进行分组,并一次处理所有accountid。这样,您就不必多次打开一个文件。
wj8zmpe12#
我假设如下:每个帐户id都是唯一的,并且有自己的唯一文件。文件中的数据有点滞后是可以的,即文件中的数据将接近实时。每个事件读取的数据并不庞大。解决方案:kafka使用者读取数据并写入数据库,最好是nosql db。一个单独的线程定期读取数据库中插入的新记录,并按accountid对它们进行分组。然后对accoundid进行迭代,并为每个accountid打开文件,一次写入数据,关闭文件并移动到下一个accountid。优势:您的使用者不会因为文件处理而被阻止,因为这两个操作是解耦的。即使文件处理失败,数据也始终存在于数据库中以重新处理。
yzxexxkh3#
为每一条消息打开一个文件是不好的,你应该缓冲一个固定数量的消息,然后在每个消息都有限制时写入一个文件。您可以使用confluent提供的hdfs-kafka连接器来管理此功能。如果配置了 FieldPartitioner 向给定的本地文件系统写入 store.url=file:///tmp ,例如,将为主题中的每个唯一accountid字段创建一个目录。然后 flush.size 配置决定了一个文件中会有多少条消息结束hadoop不需要安装,因为hdfs库包含在kafka connect类路径中,并且它们支持本地文件系统在创建两个属性文件之后,您可以这样启动它
FieldPartitioner
store.url=file:///tmp
flush.size
bin/connect-standalone worker.properties hdfs-local-connect.properties
3条答案
按热度按时间ldxq2e6h1#
如果您的帐户id重复,那么最好打开窗口。您可以通过窗口聚合1分钟内的所有事件,然后可以按键对事件进行分组,并一次处理所有accountid。
这样,您就不必多次打开一个文件。
wj8zmpe12#
我假设如下:
每个帐户id都是唯一的,并且有自己的唯一文件。
文件中的数据有点滞后是可以的,即文件中的数据将接近实时。
每个事件读取的数据并不庞大。
解决方案:
kafka使用者读取数据并写入数据库,最好是nosql db。
一个单独的线程定期读取数据库中插入的新记录,并按accountid对它们进行分组。
然后对accoundid进行迭代,并为每个accountid打开文件,一次写入数据,关闭文件并移动到下一个accountid。
优势:
您的使用者不会因为文件处理而被阻止,因为这两个操作是解耦的。
即使文件处理失败,数据也始终存在于数据库中以重新处理。
yzxexxkh3#
为每一条消息打开一个文件是不好的,你应该缓冲一个固定数量的消息,然后在每个消息都有限制时写入一个文件。
您可以使用confluent提供的hdfs-kafka连接器来管理此功能。
如果配置了
FieldPartitioner
向给定的本地文件系统写入store.url=file:///tmp
,例如,将为主题中的每个唯一accountid字段创建一个目录。然后flush.size
配置决定了一个文件中会有多少条消息结束hadoop不需要安装,因为hdfs库包含在kafka connect类路径中,并且它们支持本地文件系统
在创建两个属性文件之后,您可以这样启动它