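I'm trying to run a simple Apache Flink script with Kafka, but I keep running into execution problems. The script should read messages coming from a Kafka producer, process them, and then send the processed result back to another topic. I got the example from here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/simple-flink-kafka-test-td4828.html
The error I get is: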
Exception in thread "main" java.lang.NoSuchFieldError: ALL
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:86)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:429)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:46)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:33)
This is my code:
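// Imports assume the Flink 1.3.x package layout
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class App {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        //properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "javaflink");
        // Read strings from the Kafka topic "test"
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), properties));
        System.out.println("Step D");
        // Prefix each message and write it to the Kafka topic "demo2"
        messageStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return "Blablabla " + value;
            }
        }).addSink(new FlinkKafkaProducer010<String>("localhost:9092", "demo2", new SimpleStringSchema()));
        env.execute();
    }
}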
These are the pom.xml dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java_2.11</artifactId>
    <version>0.10.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
What could be causing this kind of error?
Thanks, Luca
1 Answer
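The problem is most likely caused by the mix of different Flink versions defined in your pom.xml (for example, flink-java_2.11 at 0.10.2 and flink-streaming-core at 0.9.1 alongside the 1.3.1 artifacts). To run this program, it should be enough to include the following dependencies: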