如何使用fs2Kafka读取嵌入式Kafka

pgccezyw  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(625)

我用fs2Kafka来阅读嵌入Kafka。
我使用 withRunningKafkaOnFoundPort ,创建主题并发布一些消息。然而,当我试图用fs2Kafka读回它时,我得到了一个nullpointerexception。我已经隔离了一个测试用例,代码如下。
这是我的密码:

  1. import cats.effect._
  2. import cats.implicits._
  3. import cats.effect.implicits._
  4. import fs2.Stream
  5. import fs2.kafka.{AutoOffsetReset, ConsumerSettings, KafkaConsumer, consumerStream}
  6. import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
  7. import org.scalatest.{BeforeAndAfterAll, FunSuite}
  8. import scala.concurrent.ExecutionContext
  9. class KafkaSuite extends FunSuite with EmbeddedKafka {
  10. val singleThreadExecutor = ExecutionContext.fromExecutor((task: Runnable) => task.run())
  11. implicit val contextShift = IO.contextShift(singleThreadExecutor)
  12. implicit val timer = IO.timer(singleThreadExecutor)
  13. val topic = "example"
  14. val partition = 0
  15. val clientId = "client"
  16. test("works") {
  17. val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
  18. withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
  19. createCustomTopic(topic)
  20. publishStringMessageToKafka(topic, "example-message1")
  21. publishStringMessageToKafka(topic, "example-message2")
  22. publishStringMessageToKafka(topic, "example-message3")
  23. publishStringMessageToKafka(topic, "example-message4")
  24. val broker = s"localhost:${actualConfig.kafkaPort}"
  25. val consumerSettings = ConsumerSettings[IO, String, String]
  26. .withAutoOffsetReset(AutoOffsetReset.Earliest)
  27. .withBootstrapServers(broker)
  28. .withGroupId("group")
  29. .withClientId(clientId)
  30. val r = consumerStream[IO].using(consumerSettings)
  31. .evalTap(_.subscribeTo(topic))
  32. .evalTap(_.seekToBeginning)
  33. .flatMap { consumer =>
  34. consumer.stream.take(1)
  35. }
  36. .compile
  37. .toList
  38. val res = r.unsafeRunSync()
  39. Console.println(res)
  40. assert(res.size == 1)
  41. }
  42. }
  43. }
  44. ``` `build.sbt` :

name := "test"

version := "0.1"

scalaVersion := "2.12.6"

libraryDependencies ++= Seq(
"org.scalatest" % "scalatest_2.12" % "3.1.2" % "test",
"org.slf4j" % "slf4j-simple" % "1.7.25",
"com.github.fd4s" %% "fs2-kafka" % "1.0.0",
"io.github.embeddedkafka" %% "embedded-kafka" % "2.4.1.1" % Test
)

  1. 这里是stacktrace

java.lang.NullPointerException was thrown.
java.lang.NullPointerException
at java.lang.String.(String.java:515)
at fs2.kafka.Deserializer$.$anonfun$string$1(Deserializer.scala:208)
at fs2.kafka.Deserializer$.$anonfun$lift$1(Deserializer.scala:184)
at fs2.kafka.Deserializer$$anon$1.deserialize(Deserializer.scala:133)
at fs2.kafka.ConsumerRecord$.deserializeFromBytes(ConsumerRecord.scala:166)
at fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:177)
at fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:378)
at cats.data.NonEmptyVectorInstances$$anon$1.traverse(NonEmptyVector.scala:300)
at cats.data.NonEmptyVectorInstances$$anon$1.traverse(NonEmptyVector.scala:245)
at cats.Traverse$Ops.traverse(Traverse.scala:19)
at cats.Traverse$Ops.traverse$(Traverse.scala:19)
at cats.Traverse$ToTraverseOps$$anon$2.traverse(Traverse.scala:19)
at fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:376)
at cats.instances.VectorInstances$$anon$1.$anonfun$traverse$2(vector.scala:80)
at cats.instances.VectorInstances$$anon$1.loop$2(vector.scala:43)
at cats.instances.VectorInstances$$anon$1.$anonfun$foldRight$2(vector.scala:44)
at cats.Eval$.advance(Eval.scala:271)
at cats.Eval$.loop$1(Eval.scala:350)
at cats.Eval$.cats$Eval$$evaluate(Eval.scala:368)
at cats.Eval$Defer.value(Eval.scala:257)
at cats.instances.VectorInstances$$anon$1.traverse(vector.scala:79)
at cats.instances.VectorInstances$$anon$1.traverse(vector.scala:15)
at cats.Traverse$Ops.traverse(Traverse.scala:19)
at cats.Traverse$Ops.traverse$(Traverse.scala:19)
at cats.Traverse$ToTraverseOps$$anon$2.traverse(Traverse.scala:19)
at fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:373)
at fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$2(KafkaConsumerActor.scala:405)
at cats.effect.internals.IORunLoop$.liftedTree1$1(IORunLoop.scala:95)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:95)
at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:86)
at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70)
at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36)
at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:93)
at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:93)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:93)
at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:44)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:72)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:52)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:136)
at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

vbopmzt1

vbopmzt11#

问题是 ConsumerSettings[IO, String, String]String 但Kafka写道 Null 作为一个键,所以反序列化键时它会失败,并出现nullpointerexception。将键类型设置为 Unit 例外地解决问题。
另一个问题是 withRunningKafkaOnFoundPort 在io评估开始前完成。要让它运行,需要做一个 Resource 从嵌入式Kafka和 Package io到。

  1. val embeddedKafka = Resource.make(IO(EmbeddedKafka.start()))((kafka) => IO(kafka.stop(true)))

下一个问题是 fs2-kafka 无法使用单线程执行器,因此必须为其提供执行器池(例如 ExecutionContext.global ).
下面是一个完整的工作示例:

  1. import cats.effect._
  2. import fs2.Stream
  3. import fs2.kafka.{AutoOffsetReset, ConsumerSettings, consumerStream}
  4. import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
  5. import org.scalatest.FunSuite
  6. import scala.concurrent.ExecutionContext
  7. class KafkaSuite extends FunSuite with EmbeddedKafka {
  8. implicit val ec = ExecutionContext.global
  9. implicit val contextShift = IO.contextShift(ec)
  10. implicit val timer = IO.timer(ec)
  11. val topic = "example"
  12. val partition = 0
  13. val clientId = "client"
  14. val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
  15. def broker(port: Long) = s"localhost:${port}"
  16. val consumerSettings = ConsumerSettings[IO, Unit, String]
  17. .withAutoOffsetReset(AutoOffsetReset.Earliest)
  18. .withEnableAutoCommit(true)
  19. .withGroupId("group")
  20. .withClientId(clientId)
  21. val embeddedKafka = Resource.make(IO(EmbeddedKafka.start()))((kafka) => IO(kafka.stop(true)))
  22. test("works") {
  23. val r = Stream.resource(embeddedKafka).flatMap { kafka =>
  24. implicit val actualConfig: EmbeddedKafkaConfig = kafka.config
  25. createCustomTopic(topic)
  26. publishStringMessageToKafka(topic, "example-message1")
  27. publishStringMessageToKafka(topic, "example-message2")
  28. publishStringMessageToKafka(topic, "example-message3")
  29. publishStringMessageToKafka(topic, "example-message4")
  30. consumerStream(consumerSettings.withBootstrapServers(broker(actualConfig.kafkaPort)))
  31. .evalTap(_.subscribeTo(topic))
  32. .evalTap(_.seekToBeginning)
  33. .flatMap(_.stream)
  34. .map(_.record.value)
  35. .take(1)
  36. }
  37. val res = r.compile.toList.unsafeRunSync()
  38. assert(res.contains("example-message1"))
  39. }
  40. }
展开查看全部

相关问题