我正在尝试将一个csv文件Map到tuple4中,这个文件已经被flink使用,由kafka生成。我的csv文件有4列,我想把每一行Map成一个tuple4。问题是我不知道如何实现map()和csv2tuple函数。
我被困在这里:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(ARGS);
DataStreamSource<String> myConsumer = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"),
new SimpleStringSchema(), parameterTool.getProperties()));
DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new csv2Tuple());
public static class csv2Tuple implements MapFunction<...> {public void map(){...}}
我还要将元组中的项从字符串解析为整数。
1条答案
按热度按时间ilmyapht1#
假设你生产每一行
csv
文件为kafka消息并使用flink-kafka连接器使用它,您只需使用,
(因为它是一个csv
文件)。