如何理解apache flink中的窗口机制

xytpbqjk  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(377)

我正在学习如何使用flink处理流数据。
据我所知,我可以使用这个函数 map 多次做各种变换。
说数据源一直在向flink发送字符串。所有字符串都是json格式的数据,如下所示:

{"name":"titi","age":18}
{"name":"toto","age":20}
...

这是我的密码:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>()))
    .build();

// Convert String to Json Object
// MyJson is a POJO class, defined by me
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
    .map(new MapFunction<String, MyJson>() {
    @Override
    public MyJson map(String s) throws Exception {
        MyJson myJson = JSON.parseObject(s, MyJson.class);
        return myJson;
        }
    });
// Convert MyJson Object to String and extract what I need
DataStream<String> valueInJson = jsonStream
    .map(new MapFunction<MyJson, String>() {
        @Override
        public String map(MyJson myJson) throws Exception {
            return myJson.getName().toString();
        }
    });
valueInJson.print();
env.execute("StreamingJob");

如您所见,我的示例非常简单:获取并反序列化数据--->将字符串转换为json对象--->将json对象转换为字符串并获取所需内容(我只需要 name 这里)。
目前看来,一切正常。我确实从日志文件中得到了预期的输出。
然而,我知道flink为我们提供了一个强大的功能:窗口。
我想知道如何在我的例子中使用这个机制。
例如,如果我想用2秒钟的时间来分割数据流,如何编写代码?
我试过这样:

DataStream<String> valueInJson = jsonStream
    .timeWindow(Time.seconds(2))
    .map(new MapFunction<MyJson, String>() {
        @Override
        public String map(MyJson myJson) throws Exception {
            return myJson.toString();
        }
    });
valueInJson.print();

但是,我有一个错误:
找不到符号
符号:方法
时间窗口(org.apache.flink.streaming.api.windowing.time.time)
位置:类型为org.apache.flink.streaming.api.datastream.datastream的变量jsonstream
但是,我导入了:

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.time.Time;

为什么我会犯这个错误?我用错Windows了吗?我不了解Flink吗?

gcxthw6b

gcxthw6b1#

你有错误是因为 timeWindow() 函数在 KeyedStream 不是在公园里 DataStream 因为它是基于密钥的操作。在你的情况下,改变就足够了 timeWindow() 进入 timeWindowAll() .

相关问题