akka流kafka:找不到密钥“kafka客户端”的配置设置

yvt65v4c  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(471)

我试图用alpakkafka连接器(akkastreamkafka)创建一个简单的原型。
运行应用程序时,出现以下错误:

com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'

我有以下代码 ./src/main/scala/App.scala :

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Future

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from producer")

    implicit val system = ActorSystem("fakeProducer")
    implicit val materializer: Materializer = ActorMaterializer()

    val config = system.settings.config // ConfigFactory.load()

    val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

    val done: Future[Done] =
      Source(1 to 100)
        .map(_.toString)
        .map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
        .runWith(Producer.plainSink(producerSettings))

    println("Done")
  }
}

以下 build.sbt :

name := "test-akka-stream"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

我使用 sbt run . 我没有配置任何uber/程序集jar。
我可能错过了一些明显的东西,但我看不到。。。我怀疑akka依赖性有问题。

更新

正如@terminally chill calling所建议的 ProducerSettings(system, new StringSerializer, new StringSerializer) (通过 ActorSystem 而不是配置)解决问题。我只是不明白这是设计的还是bug。

更新2

我已经创建了一个github问题,这个问题已经解决了。现在文档更加准确,并解释了创建 ProducerSettings / ConsumerSettings .

val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

或者你可以通过考试 ActorSystem 如上所述。

n3h0vuf2

n3h0vuf21#

感谢@terminally chill和@murray todd williams的回答。我做了一些进一步的研究,并试图总结如下:
两者 ConsumerSettings 以及 ProducerSettingsapply 需要 Config (见此处)或 ActorSystem (参见此处)。
问题是当使用 ActorSystem 代码是:

val config = system.settings.config.getConfig("akka.kafka.consumer")
apply(config, keyDeserializer, valueDeserializer) // call the other overload

使用时 Config 代码是:

val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))

因此,当直接传递配置时,代码将搜索 kafka-clients 属性,而不是在传递 ActorSystem 代码检查 akka.kafka.consumer/akka.kafka.producer .
最后,在创建 ActorSystem 示例默认情况下,大多数设置都是从嵌入的 reference.conf 文件并与您的 application.conf 文件(如果存在)。更多信息请点击此处。所以基本上唯一需要设置的属性通常是 bootstrap.servers .
所以你现在可以理解为什么使用 system.settings.config 代码不起作用。此配置示例已加载 reference.conf (使用所有默认值,请参见此处)和自定义 application.conf . 这个 kafka-clients 财产在里面 akka.kafka.consumer/akka.kafka.producer ,但代码直接检查 kafka-clients .
一些可能的解决方案:
直接通过 ActorSystem 使用另一个过载
通过正确的部分使用 system.settings.config.getConfig("akka.kafka.consumer") 手动构造 Config 示例与 kafka-clients 部分
对我来说,问题是这里提供的官方文件没有提到这些差异,所提供的例子也不完整和/或不准确。可能对于一个akkaMaven来说这是很清楚的,但是对于新开发人员来说这可能是非常混乱的。
我在这个要点中创建了一个更“易于使用”的示例,并打开了一个问题。

odopli94

odopli942#

感谢您注意到并提交了一个在阿尔帕卡Kafka连接器项目的问题。文档现已更新:https://doc.akka.io/docs/akka-stream-kafka/current/producer.html

nnsrf1az

nnsrf1az3#

通常我将所有配置都保存在akkasystem示例中。我不把alpakka用于kafka,但是我的很多实现都是基于源代码的。
加载typesafe配置对象 val config = ConfigFactory.load() 然后通过 config 加入 val system = ActorSystem("fakeProducer", config) .
最后,通过 system.settings.configProducerSettings .
您当前的代码没有通过任何设置,因为您还没有将配置加载到akka系统中。你的 val config = system.settings.config 正在引用一个空配置,该配置没有kafka clients部分(最佳猜测)。

m0rkklqb

m0rkklqb4#

我想我遇到了和你一样的问题(几乎在同一时间),尽管我试图创建一个基本的“你好世界”Kafka消费者而不是生产者。我猜您只是在浏览alpakka-kafka连接器文档中的文档,并遵循它们最初定义的示例

val config = system.settings.config

然后将其传递到新的consumersettings对象中。我猜在线文档有一个缺陷,但我对akka streams还不够了解(这是我第一次尝试通过示例学习),我没有资格准确判断什么是对的什么是错的。
我尝试创建和application.conf文件,然后执行configfactory.load(),然后在创建时手动将其传递给actorsystem,然后将该系统传递给consumersettings构造函数,丢失的“kafka clients”错误消失了,但显然我甚至不必这样做。正如您所说,只需传递'system'变量而不是'config'变量就可以了。
希望这能帮助那些在黑暗中摸索的人。我要说的是,尽管阿克卡河周围到处都是嗡嗡声,但似乎真的缺少文档。我可能要写一篇博客文章,一旦我弄明白了这些东西!

相关问题