Flink: print and write to file not working

8hhllhi2 · posted 2022-12-09 · in Apache

I have this pipeline: KafkaProducer -> Topic1 -> FlinkConsumer -> Topic2 -> KafkaConsumer
I'm trying to extract the timing of the record at each stage of the pipeline.
In the Flink Java application I did something like this:

inputstream
        // To calculate Flink input time
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.printf("source time : %d\n", System.nanoTime());
                writeDataLineByLine("flinkinput_data.csv", -1, System.nanoTime());
                return s;
            }
        })
        // Process (simulated work: two 1 ms sleeps per record)
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String record) throws InterruptedException {
                for (int i = 0; i < 2; i++)
                    Thread.sleep(1);
                return record + " mapped";
            }
        })
        // To calculate Flink output time
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.printf("sink time : %d\n", System.nanoTime());
                writeDataLineByLine("flinkoutput_data.csv", -1, System.nanoTime());
                return s;
            }
        })
        .addSink(producer);
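
The writeDataLineByLine helper is not shown in the question; a hypothetical reconstruction, assuming it simply appends one "id,timestamp" row per call, might look like the sketch below. Note that the relative file name resolves against each task manager's working directory, which matters once the job runs on a cluster:

import java.io.FileWriter;
import java.io.IOException;

// Hypothetical helper (not shown in the question): appends one CSV row per call.
// The relative path resolves against the working directory of whichever
// task manager runs the subtask, not the machine that submitted the job.
static synchronized void writeDataLineByLine(String fileName, long id, long nanos) throws IOException {
    try (FileWriter writer = new FileWriter(fileName, true)) { // append mode
        writer.write(id + "," + nanos + "\n");
    }
}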

While this works in the mini-cluster in IntelliJ, it doesn't work on a standalone cluster. Can someone please explain why the print and write-to-CSV lines are ignored?

t3irkdon1#

Whatever the task managers write to stdout goes to a file in Flink's log directory on each task manager node.
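
On a standalone cluster that typically means a flink-<user>-taskexecutor-<id>-<host>.out file under the log directory of each task manager, not the console of the machine where the job was submitted. A minimal sketch of the more idiomatic alternative, routing the timestamps through Flink's SLF4J logging so they land in the task manager log files (the class name and message format here are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Logs on whichever task manager runs the subtask; the output goes to that
// node's Flink log files rather than the submitting client's console.
public class InputTimeLogger extends RichMapFunction<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(InputTimeLogger.class);

    @Override
    public String map(String s) {
        LOG.info("source time : {}", System.nanoTime());
        return s;
    }
}

The same reasoning applies to the CSV files: they are written on the task manager nodes, so look for flinkinput_data.csv and flinkoutput_data.csv in the working directory of each task manager process.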
