.你好,使用ApacheFlink1.8。我有一个来自kafka的记录流作为json,并对它们进行过滤,一切正常。现在,我想用数据库表中的查找值来丰富kafka的数据。这只是创建两个流,在第二个流中加载表,然后连接数据的情况吗?数据库表确实会更新,但并不频繁,我希望避免在流中的每个记录上查找数据库。
vc9ivgsu1#
Flink有一个州,你可以利用这个州。我也做过类似的事情,每天从我的查找表(在我的例子中是一个批量webservice调用)中进行一次查询,并通过查询结果生成一个kafka主题。这个kafka主题正被相同的服务flink作业使用,因为它需要用于查找的数据。两个主题的键控值相同,但我使用查找主题将数据存储到键控状态,在处理另一个主题时,我会将数据从状态中拉回来。我有一些额外的逻辑来检查给定密钥是否还没有状态。如果是这样的话,我会向webservice发出一个异步请求。但是,您可能不需要这样做。这里需要注意的是,我有用于状态管理的内存,而我的查找表只有大约3000万条记录,大约100 Gig分布在15个节点上的45个插槽中。[在评论中回答问题]对不起,我的回答太长了,所以不得不编辑我的帖子:我有一个python作业,它通过一个大容量rest调用加载数据(你的只需进行数据查找)。然后,它将数据转换成正确的格式,并将其转储到Kafka中。然后我的flink流有两个来源,一个是“真实数据”主题,另一个是“查找数据”主题。来自lookup data主题的数据存储在state中(我使用了valuestate,因为每个键都Map到一个可能的值,但是还有其他状态类型。我也有一个24小时的到期时间为每个条目,但这是我的用例。诀窍在于,将查找主题中的值存储为状态的操作必须是将值从“真实”主题中拉回到状态的操作。这是因为flink状态(甚至键控状态)与创建它们的操作符相关联。
1条答案
按热度按时间vc9ivgsu1#
Flink有一个州,你可以利用这个州。我也做过类似的事情,每天从我的查找表(在我的例子中是一个批量webservice调用)中进行一次查询,并通过查询结果生成一个kafka主题。这个kafka主题正被相同的服务flink作业使用,因为它需要用于查找的数据。两个主题的键控值相同,但我使用查找主题将数据存储到键控状态,在处理另一个主题时,我会将数据从状态中拉回来。
我有一些额外的逻辑来检查给定密钥是否还没有状态。如果是这样的话,我会向webservice发出一个异步请求。但是,您可能不需要这样做。
这里需要注意的是,我有用于状态管理的内存,而我的查找表只有大约3000万条记录,大约100 Gig分布在15个节点上的45个插槽中。
[在评论中回答问题]对不起,我的回答太长了,所以不得不编辑我的帖子:
我有一个python作业,它通过一个大容量rest调用加载数据(你的只需进行数据查找)。然后,它将数据转换成正确的格式,并将其转储到Kafka中。然后我的flink流有两个来源,一个是“真实数据”主题,另一个是“查找数据”主题。来自lookup data主题的数据存储在state中(我使用了valuestate,因为每个键都Map到一个可能的值,但是还有其他状态类型。我也有一个24小时的到期时间为每个条目,但这是我的用例。
诀窍在于,将查找主题中的值存储为状态的操作必须是将值从“真实”主题中拉回到状态的操作。这是因为flink状态(甚至键控状态)与创建它们的操作符相关联。