我试图通过SparkShell内部的SparkStructuredStreaming来阅读Kafka的主题,但似乎我没有从Kafka那里得到任何线索。
Kafka单独工作良好(与控制台消费者和控制台生产者测试):
~/opt/bd/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testtopic --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
first
thrid
fifth
seventh
eight
bla
blal2
das ist
testmaschine
hallo
kleiner
blsllslsd
这是我在spark shell中运行的代码:
ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:2181")
.option("subscribe", "testtopic")
.option("startingOffsets" , "earliest")
.load()
ds1.writeStream.format("console").start
我希望我得到的消息已经存储在Kafka这个主题,所有的消息将打印在Spark壳。但没有印刷品。我的错在哪里?我用的是spark 2.0.2和kafka 010.2。
1条答案
按热度按时间rryofs0p1#
您需要更改kafka引导服务器的端口。像这样-
然后您将能够从
readStream
.希望有帮助!