Kafka/Flink connection error shows NoSuchMethodError

cuxqih21 · published 2021-06-24 in Flink

A new error appeared after changing the Flink code from FlinkKafkaConsumer09 to FlinkKafkaConsumer:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

@SuppressWarnings("deprecation")
public class ReadFromKafka {
    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test-consumer-group");

        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<String>("test4", new SimpleStringSchema(), properties));

        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Stream Value: " + value;
            }
        }).print();

        env.execute();
    }
}
```

Error:

```
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
	at ReadFromKafka.main(ReadFromKafka.java:33)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
```
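The `log4j:WARN` lines at the top are unrelated to the job failure; they only mean that no log4j 1.2 configuration was found on the classpath. A minimal `log4j.properties` in `src/main/resources` silences them, for example (a sketch; appender name and pattern are just conventional defaults):

```properties
# Route everything at INFO and above to the console
log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
```

With logging configured, the connector's own log output can also help diagnose the `TimeoutException` when fetching topic metadata.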
The pom.xml file:

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.dataartisans</groupId>
    <artifactId>kafka-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafkaex</name>
    <description>this is flink kafka example</description>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>
    </dependencies>
</project>
```

Answer 1 (0pizxfdo):

flink-connector-kafka_2.12 is not compatible with FlinkKafkaConsumer09. flink-connector-kafka_2.12 is the "universal" Kafka connector, compiled for Scala 2.12. This universal connector works with any Kafka version from 0.11.0 onward. FlinkKafkaConsumer09 is meant for Kafka 0.9.x. If your Kafka brokers are running Kafka 0.9.x, then you need flink-connector-kafka-0.9_2.11 or flink-connector-kafka-0.9_2.12, depending on which Scala version you want.
On the other hand, if your Kafka brokers are running a recent version of Kafka (0.11.0 or newer), then stick with flink-connector-kafka_2.12 and use FlinkKafkaConsumer rather than FlinkKafkaConsumer09.
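For example, if the brokers did turn out to be on Kafka 0.9.x, the connector dependency in the pom.xml above would change along these lines (a sketch; the 1.9.1 version is taken from the existing pom):

```xml
<!-- Only if the brokers run Kafka 0.9.x: replaces flink-connector-kafka_2.12 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
    <version>1.9.1</version>
</dependency>
```

Whichever connector artifact is chosen, its Scala suffix (`_2.12` here) must match the other Flink dependencies in the pom.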
See the documentation for more information.
