flink(1.2)窗口每个窗口产生超过1个输出

dgjrabp2  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(480)

问题是:问题是这个程序每一个窗口都要向kafka写不止一次(每个窗口创建2-3行或更多行,同时每个窗口应该创建1行,就像 reduce 函数(它只允许一个元素)。我有同样的代码写在Spark和它的作品完美。我一直试图找到有关此问题的信息,但没有找到任何信息:(。我也一直在尝试改变一些函数的并行性和一些更多的东西,但什么都不起作用,我不知道哪里会有问题。
我正在测试flink延迟。这里是我遇到问题的环境:
集群:我使用的是flink1.2.0和openjdk8。我有3台计算机:1台jobmanager,2台taskmanager(4核,2gb ram,每个taskmanager有4个任务槽)。
输入数据:由一个java生产者生成的kafka 24分区主题行,包含两个元素:增量值和创建时间戳:
1 1497790546981
2 1497790546982
3 1497790546983
4 1497790546984
............................
我的java类:
它读取具有24个分区的kafka主题(kafka与jobmanager位于同一台计算机中)。
这个 filter 函数和 union 因为我只是用它们来检查它们的延迟。
基本上,每行加一个“1”,就有一个 tumbling window 每2秒 reduce 函数将所有的1和所有的时间戳相加,最后的时间戳稍后在 map 函数介于1的和,它给出了平均值,最后在最后 map 函数,它将当前时刻的时间戳添加到每个缩减的行中,以及此时间戳与平均时间戳之间的差。
这一行是写给Kafka的(两个分区的主题)。

//FLINK CONFIGURATION
    final StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

    //KAFKA CONSUMER CONFIGURATION
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.0.155:9092");
    FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), properties);

    //KAFKA PRODUCER
    Properties producerConfig = new Properties();
    producerConfig.setProperty("bootstrap.servers", "192.168.0.155:9092");
    producerConfig.setProperty("acks", "0");
    producerConfig.setProperty("linger.ms", "0");

    //MAIN PROGRAM
    //Read from Kafka
    DataStream<String> line = env.addSource(myConsumer);

    //Add 1 to each line
    DataStream<Tuple2<String, Integer>> line_Num = line.map(new NumberAdder());

    //Filted Odd numbers
    DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());

    //Filter Even numbers
    DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());

    //Join Even and Odd
    DataStream<Tuple2<String, Integer>> line_Num_U = line_Num_Odd.union(line_Num_Even);

    //Tumbling windows every 2 seconds
    AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowedLine_Num_U = line_Num_U
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));

    //Reduce to one line with the sum
    DataStream<Tuple2<String, Integer>> wL_Num_U_Reduced = windowedLine_Num_U.reduce(new Reducer());

    //Calculate the average of the elements summed
    DataStream<String> wL_Average = wL_Num_U_Reduced.map(new AverageCalculator());

    //Add timestamp and calculate the difference with the average
    DataStream<String> averageTS = wL_Average.map(new TimestampAdder());

    //Send the result to Kafka
    FlinkKafkaProducer010Configuration<String> myProducerConfig = (FlinkKafkaProducer010Configuration<String>) FlinkKafkaProducer010
            .writeToKafkaWithTimestamps(averageTS, "testRes", new SimpleStringSchema(), producerConfig);

    myProducerConfig.setWriteTimestampToKafka(true);

    env.execute("TimestampLongKafka");
}

//Functions used in the program implementation:

public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isOdd = (Long.valueOf(line._1.split(" ")[0]) % 2) != 0;
        return isOdd;
    }
};

public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isEven = (Long.valueOf(line._1.split(" ")[0]) % 2) == 0;
        return isEven;
    }
};

public static class NumberAdder implements MapFunction<String, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    public Tuple2<String, Integer> map(String line) {
        Tuple2<String, Integer> newLine = new Tuple2<String, Integer>(line, 1);
        return newLine;
    }
};

public static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> line1, Tuple2<String, Integer> line2) throws Exception {
        Long sum = Long.valueOf(line1._1.split(" ")[0]) + Long.valueOf(line2._1.split(" ")[0]);
        Long sumTS = Long.valueOf(line1._1.split(" ")[1]) + Long.valueOf(line2._1.split(" ")[1]);
        Tuple2<String, Integer> newLine = new Tuple2<String, Integer>(String.valueOf(sum) + " " + String.valueOf(sumTS), 
                line1._2 + line2._2);
        return newLine;
    }
};

public static class AverageCalculator implements MapFunction<Tuple2<String, Integer>, String> {
    private static final long serialVersionUID = 1L;

    public String map(Tuple2<String, Integer> line) throws Exception {
        Long average = Long.valueOf(line._1.split(" ")[1]) / line._2;
        String result = String.valueOf(line._2) + " " + String.valueOf(average);
        return result;
    }
};

public static final class TimestampAdder implements MapFunction<String, String> {
    private static final long serialVersionUID = 1L;

    public String map(String line) throws Exception {
        Long currentTime = System.currentTimeMillis();
        String totalTime = String.valueOf(currentTime - Long.valueOf(line.split(" ")[1]));
        String newLine = line.concat(" " + String.valueOf(currentTime) + " " + totalTime);

        return newLine;
    }
};

一些输出数据:此输出已写入2个分区的主题,并且生成速率小于1000条记录/秒(**在本例中,它为每个窗口创建3条输出行):
1969 1497791240910 1497791241999 1089 1497791242001 1091
1973 1497791240971 1497791241999 1028 1497791242002 1031
1970 1497791240937 1497791242094 1157 1497791242198 1261
1917 1497791242912 1497791243999 1087 1497791244051 1139
1905 1497791242971 1497791243999 1028 1497791244051 1080
1916 1497791242939 1497791244096 1157 1497791244199 1260
1994 1497791244915 1497791245999 1084 1497791246002 1087
1993 1497791244966 1497791245999 1033 1497791246004 1038
1990 1497791244939 1497791246097 1158 1497791246201 1262
提前谢谢!

r7knjye2

r7knjye21#

我不知道确切的原因,但我可以解决停止flink群集并重新启动它的问题。在执行了一些任务之后,它开始产生更多的输出(leatsx3),而且问题可能会继续增长。我将打开一个关于jira的问题,并尽快更新。

相关问题