flink-createbroadcaststream来自数据库属性

pinkon5k  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(206)

我必须创建一个broadcaststream来更改数据库上的属性,并在应用程序上实时查看这些属性的应用程序。我有两个问题:
1) 当我读取数据库时,我需要通过resultset、hashmap或任何可以包含类型key-value结构的东西同时拥有所有行,因为某些属性依赖于其他属性,所以我不能单独处理它们。mapstatedescriptor的结构将是:

//String = topic name
//TopicProperties = object containing all the topic properties
MapStateDescriptor<String, TopicProperties> propertiesStateDescriptor = new MapStateDescriptor<String, TopicProperties>("properties", 
                                                                                                                        BasicTypeInfo.STRING_TYPE_INFO,
                                                                                                                        BasicTypeInfo.of(new TypeHint<TopicProperties>() {}));

BroadcastStream<Row> propertiesBroadcastStream = env.createInput(JDBCInputFormat)
                                                    .map(new TopicPropertiesDbMapper()
                                                    .broadcast(propertiesStateDescriptor);

topicpropertiesdbmapper将jdbcinputformat返回的内容转换为字符串结构topicproperties。问题是一次只处理一行,但我需要将它们一起处理,如上所述。
2) 重复读取属性并每小时更新一次broadcaststream。
我指定我已经制作了上述版本,但是通过以下方式从文件中读取属性:

readFile (FileInputFormat, path file, FileProcessingMode, milliseconds of interval for re-reading)

它正在工作,我解决了上面列出的数据库案例的两个问题:
1) 将fileinputformat类的“unsplitable”标志设置为“true”;
2) fileprocessingmode.process\u。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题