我有两个来源,一个是Kafka源和一个是自定义源,我需要使睡眠自定义源一个小时,但我得到以下中断。
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.hulu.hiveIngestion.HiveAddPartitionThread.run(HiveAddPartitionThread.java:48)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
代码:
<kafka_Source>.union(<custom_source>)
public class custom_source implements SourceFunction<String> {
public void run(SourceContext<String> ctx) {
while(true)
{
Thread.sleep(1000);
ctx.collect("string");
}
}
}
如何使睡眠自定义源,而Kafka源将继续其流。为什么我得到线程中断异常?
1条答案
按热度按时间lh80um4z1#
这是一个java问题,而不是flink问题。简而言之,您永远不能依赖thread.sleep(x)来睡眠x毫秒。正确地支持中断也很重要,否则您就不能优雅地关闭您的工作。