New error after changing FlinkKafkaConsumer09 to FlinkKafkaConsumer in my Flink code:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

@SuppressWarnings("deprecation")
public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test-consumer-group");

        // read from the "test4" topic using the universal Kafka connector
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<String>("test4", new SimpleStringSchema(), properties));

        // prefix each record before printing it
        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Stream Value: " + value;
            }
        }).print();

        env.execute();
    }
}
Error:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
    at ReadFromKafka.main(ReadFromKafka.java:33)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.dataartisans</groupId>
    <artifactId>kafka-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafkaex</name>
    <description>This is a Flink Kafka example.</description>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>
    </dependencies>
</project>
1 Answer
flink-connector-kafka_2.12 is not compatible with FlinkKafkaConsumer09. flink-connector-kafka_2.12 is the "universal" Kafka connector, compiled for Scala 2.12; it can be used with any version of Kafka from 0.11.0 onwards. FlinkKafkaConsumer09, by contrast, is for use with Kafka 0.9.x only. If your Kafka broker is running Kafka 0.9.x, then you need flink-connector-kafka-0.9_2.11 or flink-connector-kafka-0.9_2.12, depending on the desired Scala version, as sketched below.
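For instance, a minimal sketch of that dependency block, assuming Flink 1.9.1 and Scala 2.12 to match the pom.xml above:

<!-- use instead of flink-connector-kafka_2.12, but only if the broker runs Kafka 0.9.x -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
    <version>1.9.1</version>
</dependency>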
On the other hand, if your Kafka broker is running a recent version of Kafka (0.11.0 or newer), then stick with flink-connector-kafka_2.12 and use FlinkKafkaConsumer instead of FlinkKafkaConsumer09. See the documentation for more information.
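For the universal-connector route, here is a minimal sketch of the consumer setup, reusing the broker address, group id, and topic name from the question. One side note: in Flink 1.9 SimpleStringSchema also lives in org.apache.flink.api.common.serialization, so importing it from there avoids the deprecated org.apache.flink.streaming.util.serialization class and the @SuppressWarnings annotation.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class ReadFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // same broker and consumer group as in the question
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test-consumer-group");

        // FlinkKafkaConsumer (universal connector, Kafka 0.11.0+) replaces FlinkKafkaConsumer09
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("test4", new SimpleStringSchema(), properties));

        stream.map(value -> "Stream Value: " + value).print();
        env.execute();
    }
}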