将案例类列表连接到kafka制作人?

snvhrwxg  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(396)

我有下面的案例课:

  1. case class Alpakka(id:Int,name:String,animal_type:String)

我正在尝试使用以下代码将这些案例类的列表连接到kafka中的生产者:

  1. def connectEntriesToProducer(seq: Seq[Alpakka]) = {
  2. val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  3. .withBootstrapServers("localhost:9092")
  4. seq.map(alpakka => new ProducerRecord[String, String]("alpakkas", alpakka.asJson.noSpaces))
  5. .runWith(Producer.plainSink(producerSettings))
  6. }

我使用circe将case类转换为json。但是,我不断收到一个编译器错误:

  1. Error:(87, 34) type mismatch;
  2. found : akka.stream.scaladsl.Sink[org.apache.kafka.clients.producer.ProducerRecord[String,String],scala.concurrent.Future[akka.Done]]
  3. required: org.apache.kafka.clients.producer.ProducerRecord[String,String] => ?
  4. .runWith(Producer.plainSink(producerSettings))

我不知道发生了什么事!

zwghvu4y

zwghvu4y1#

你想建立一个 GraphSeq 而不是 Source .
你的方法 connectEntriesToProducer 应该看起来像

  1. def connectEntriesToProducer(seq: Source[Alpakka]) = {

注意, Source 而不是 Seq .
或者,您可以从 Seq ,但你必须使用 immutable.SeqSource.apply 只需要一个不变的iterable。

  1. def connectEntriesToProducer(seq: scala.collection.immutable.Seq[Alpakka]) = {
  2. val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  3. .withBootstrapServers("localhost:9092")
  4. Source(seq).
  5. map(alpakka => new ProducerRecord[String, String]("alpakkas", alpakka.asJson.noSpaces))
  6. .runWith(Producer.plainSink(producerSettings))
  7. }

相关问题