flink将csv文件流Map成元组

1l5u6lss  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(449)

我正在尝试将一个csv文件Map到tuple4中,这个文件已经被flink使用,由kafka生成。我的csv文件有4列,我想把每一行Map成一个tuple4。问题是我不知道如何实现map()和csv2tuple函数。
我被困在这里:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ParameterTool parameterTool = ParameterTool.fromArgs(ARGS);

DataStreamSource<String> myConsumer = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"),
            new SimpleStringSchema(), parameterTool.getProperties()));

DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new csv2Tuple());
public static class csv2Tuple implements MapFunction<...> {public void map(){...}}

我还要将元组中的项从字符串解析为整数。

ilmyapht

ilmyapht1#

假设你生产每一行 csv 文件为kafka消息并使用flink-kafka连接器使用它,您只需使用 , (因为它是一个 csv 文件)。

DataStream<Tuple4<Integer, Integer, Integer, Integer,>> streamTuple = myConsumer.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, Integer, Integer, Integer> map(String str) throws Exception {
                String[] temp = str.split(",");
                return new Tuple4<>(
                        Integer.parseInt(temp[0]),
                        Integer.parseInt(temp[1]),
                        Integer.parseInt(temp[2]),
                        Integer.parseInt(temp[3])
                );

            }
        });

相关问题