我试图阅读Kafka主题的数据,在Flink流媒体。我正在尝试运行以下示例代码,作为apache flink 1.1.3文档页上的示例:apache kafka连接器,
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class stock_streaming_kafka {
public static void main(String[] args) throws Exception
{
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("nsestocks4k", new SimpleStringSchema(), properties);
DataStream<String> stream = env
.addSource(myConsumer)
.print();
}
}
我有以下错误:
Exception in thread "main" java.lang.Error: Unresolved compilation problems:
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer09<String>)
at stock_streaming_kafka.main(stock_streaming_kafka.java:25)
你能指导我把这个修好吗?Kafka连接器是否存在依赖性问题。我的版本是:
Flink1.1.3
Kafka2.10
flink-connector-kafka-0.9_2.11-1.0.0.jar
3条答案
按热度按时间xyhw6mcr1#
请使用以下版本。它将与你的Kafka版本。
我在代码中看到一个编译问题。
更改此项:
收件人:
如果它仍然不为你工作,那么请让我知道,我会分享工作代码。
3vpjnl9f2#
因为答案还没有被接受,下面是一个完整的maven代码示例,它使用flink从kafka读取数据。
您可能需要调整pom.xml以匹配kafka和scala版本的设置。
希望这有帮助。
ubbxdtey3#
flink和flink连接器的版本必须匹配。更新
flink-connector
依赖于1.1.3。