我试着从Flink那里读Kafka的资料,由于我对Kafka和Flink还不熟悉,我不知道如何把它们联系起来。
kb5ga3dv1#
Flink提供Kafka连接器。为了从Kafka主题中读取数据,首先需要添加flink-kafka连接器依赖项。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.8_2.10</artifactId> <version>1.1.3</version> </dependency>
接下来,您只需调用流执行环境并添加kafka源代码。这是一个样本
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); DataStream<String> stream = env .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(),properties)) .print();
就这样。你们都准备好使用Kafka主题中的数据了。完整的代码可以通过这个链接下载
1条答案
按热度按时间kb5ga3dv1#
Flink提供Kafka连接器。为了从Kafka主题中读取数据,首先需要添加flink-kafka连接器依赖项。
接下来,您只需调用流执行环境并添加kafka源代码。这是一个样本
就这样。你们都准备好使用Kafka主题中的数据了。
完整的代码可以通过这个链接下载