kafka apache flink执行log4j错误

k5ifujac  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(371)

我试图运行一个简单的apache-flink脚本和kafka不协调,但我一直有执行问题。脚本应该读取来自Kafka制作人的消息,对其进行详细说明,然后将处理结果再次发送回另一个主题。我从这里得到了这个例子:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/simple-flink-kafka-test-td4828.html
我的错误是:

Exception in thread "main" java.lang.NoSuchFieldError:ALL 
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenera‌tor.createJobGraph(S‌​treamingJobGraphGene‌​rator.java:86) 
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph‌​(StreamGraph.java:42‌​9) 
at org.apache.flink.streaming.api.environment.LocalStreamEnviro‌nment.execute(LocalS‌​treamEnvironment.jav‌​a:46)

网址:org.apache.flink.streaming.api.environment.localstreamenviro‌执行(本地)‌​treamenvironment.jav文件‌​答:33)
这是我的密码:

public class App {
      public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
            Properties properties = new Properties(); 
            properties.setProperty("bootstrap.servers", "localhost:9092"); 

            //properties.setProperty("zookeeper.connect", "localhost:2181"); 
            properties.setProperty("group.id", "javaflink"); 

            DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), properties));
            System.out.println("Step D"); 
            messageStream.map(new MapFunction<String, String>(){ 

                    public String map(String value) throws Exception { 
                            // TODO Auto-generated method stub 
                            return "Blablabla " +  value; 
                    } 
            }).addSink(new FlinkKafkaProducer010("localhost:9092", "demo2", new SimpleStringSchema())); 
            env.execute(); 
      }
}

这些是 pom.xml 依赖项:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java_2.11</artifactId>
    <version>0.10.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.1</version>
</dependency>

什么会导致这种错误?
谢谢,卢卡

6vl6ewon

6vl6ewon1#

这个问题很可能是由您在应用程序中定义的不同flink版本的混合造成的 pom.xml . 要运行此程序,应包含以下依赖项:

<!-- Streaming API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.1</version>
</dependency>

<!-- In order to execute the program from within your IDE -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.1</version>
</dependency>

<!-- Kafka connector dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.1</version>
</dependency>

相关问题