I have this pipeline: KafkaProducer -> Topic1 -> FlinkConsumer -> Topic2 -> KafkaConsumer
I'm trying to capture a timestamp for each record at each stage of the pipeline.
In the Flink Java application I did something like this:
inputstream
        // To capture the Flink input time
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.printf("source time : %d\n", System.nanoTime());
                writeDataLineByLine("flinkinput_data.csv", -1, System.nanoTime());
                return s;
            }
        })
        // Process
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String record) throws InterruptedException {
                for (int i = 0; i < 2; i++) {
                    Thread.sleep(1);
                }
                return record + " mapped";
            }
        })
        // To capture the Flink output time
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.printf("sink time : %d\n", System.nanoTime());
                writeDataLineByLine("flinkoutput_data.csv", -1, System.nanoTime());
                return s;
            }
        })
        .addSink(producer);
While this works in the mini-cluster in IntelliJ, it doesn't work on a standalone cluster. Can someone please explain why the print and write-to-CSV lines are ignored?
1 Answer
Whatever a task manager writes to stdout is written to a file in Flink's log directory on each task manager node (typically a flink-*-taskexecutor-*.out file), not to the console of the machine where you submitted the job.
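If you would rather have the timestamps land in the task managers' regular log files (which are easier to locate and collect than stdout captures), one option is to log through SLF4J instead of printing. Below is a minimal sketch of that idea; the TimestampLogger class and its stage label are hypothetical names, not from the original code:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimestampLogger extends RichMapFunction<String, String> {
    // static logger avoids serialization issues when the function is shipped to task managers
    private static final Logger LOG = LoggerFactory.getLogger(TimestampLogger.class);

    private final String stage; // hypothetical label, e.g. "source" or "sink"

    public TimestampLogger(String stage) {
        this.stage = stage;
    }

    @Override
    public String map(String record) {
        // Note: System.nanoTime() is only comparable within a single JVM, so these
        // values are useful for per-node deltas, not cross-machine latency.
        LOG.info("{} time : {}", stage, System.nanoTime());
        return record;
    }
}

You could then replace the timing maps with e.g. inputstream.map(new TimestampLogger("source")); the lines would appear in each task manager's log file rather than its stdout capture.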