我想在flink on yarn模式下提交作业,只使用记录和打印。代码在本地计算机上运行良好,但在提交到yarn时失败。我尝试更改依赖关系或将jar放入lib dir,但不起作用。对此有何想法?谢谢。
代码和环境如下。
Flink1.10,Kafka0.9。
代码:
FlinkKafkaConsumer09<String> kafkaSource = new FlinkKafkaConsumer09(
"test",
new SimpleStringSchema(),
properties);
kafkaSource.setStartFromLatest();
DataStream<String> stream = env.addSource(kafkaSource);
stream.print().setParallelism(1);
pom的一部分
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
<scope>compile</scope>
</dependency>
错误信息
2020-05-14 20:22:22,925 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source:
Custom Source -> Sink: Print to Std. Out (1/1) (dd3a239f2854c735c23ab307ff277e62) switched from RUNNING to FAILED.
java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Lorg/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge09;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/api/common/io/ratelimiting/FlinkConnectorRateLimiter;)V
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:113)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.createFetcher(FlinkKafkaConsumer09.java:270)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:695)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
暂无答案!
目前还没有任何答案,快来回答吧!