我是flink的新手,想了解如何使用flink运行我的用例:应用程序有三个输入数据源a)历史数据b)从kafka获取所有实时事件c)获取具有触发条件的控制事件
由于应用程序处理的是历史数据,所以我认为我将合并历史数据和实时数据,并在该流上创建一个表。
为了触发事件,我们必须借助控制事件来编写sql查询,控制事件是输入源,并且包含where子句。
我的问题是在数据流中构建sql查询,当我执行类似的操作时
DataStream<ControlEvent> controlEvent
controlEvent.map(new FlatMapFunction(String, String)
{
@override
public String flatMap(String s, Collector<String> coll)
{
tableEnv.execute("select * from tableName"); /// throw serialization exception
}
});
它不会引发序列化异常localexecutionenvironment
1条答案
按热度按时间oxosxuxt1#
flinksql还不支持这种动态查询注入。
更新:
考虑到您所说的需求(查询中的变化将是有限的),您可以改为使用datastream api而不是sql来实现这一点。这可能是一个
KeyedBroadcastProcessFunction
它将保持一些键控状态,并且您可以广播对查询的更新。以欺诈检测演示为例,介绍如何使用flink构建这类东西。