Flink,Kafka和Zookeeper

qacovj5a  于 2021-05-29  发布在  Hadoop
关注(0)|答案(3)|浏览(394)

我正在尝试从本地计算机连接到Kafka:

kafkaParams.setProperty("bootstrap.servers", Defaults.BROKER_URL)
kafkaParams.setProperty("metadata.broker.list", Defaults.BROKER_URL)
kafkaParams.setProperty("group.id", "group_id")
kafkaParams.setProperty("auto.offset.reset", "earliest")

很好,但是我的 BROKER_URI 定义如下 my-server.com:1234/my/subdirectory .
我发现这种现象叫做chroot路径。
它抛出以下错误: Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: my-server.com:1234/my/subdirectory 我该怎么解决这个问题?
这些是我的依赖项:

val flinkVersion = "1.0.3"

"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion,
rqqzpn5f

rqqzpn5f1#

试试看 host:port 不带路径上下文和斜杠的格式。如果您有多个服务器,它将是一个列表 host1:port1,host2:port2 参考文献:http://kafka.apache.org/documentation.html

ljo96ir5

ljo96ir52#

愚蠢的。Zookeeper!=Kafka。正如您在代码中看到的,我使用了相同的url两次,但结果发现它们应该是不同的。
我正在尝试从本地计算机连接到Kafka:

kafkaParams.setProperty("bootstrap.servers", Defaults.KAFKA_URL)
kafkaParams.setProperty("metadata.broker.list", Defaults.ZOOKEEPER_URL)
kafkaParams.setProperty("group.id", "group_id")
kafkaParams.setProperty("auto.offset.reset", "earliest")
k7fdbhmy

k7fdbhmy3#

bootstrap.servers 应该是逗号分隔的列表,如下所示: address1:port1,address2:port2,...,addressn:portn . 如果你只有一个Kafka经纪人,你应该输入 localhost:9092 (除非您将kafka配置为在另一个端口上运行)。
你可以参考dataartisans的这篇文章,了解更多关于如何让flink和kafka一起工作的细节。

相关问题