我一直在学习fs2Kafka的例子。不过,对于一个消费者来说,我还是坚持使用这个例子。我遇到的问题是fs2.stream和cats.effect.io之间的类型不匹配(错误如下)
代码:nb:现在更新了@alexeynovakov的建议,提供了一个工作示例
package pb.streams
import cats.effect.{ContextShift, Timer}
import fs2.kafka._
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.duration._
import cats.implicits._
import cats.effect.IO
object Consumer {
implicit val ec: ExecutionContextExecutor =
ExecutionContext.fromExecutor(new ForkJoinPool(4))
implicit val contextShift: ContextShift[IO] = IO.contextShift(ec)
implicit val timer: Timer[IO] = IO.timer(ec)
def main(args: Array[String]): Unit = {
consumeFeed()
()
}
def processRecord(record: ConsumerRecord[String, String]): IO[Unit] = {
println(s"${record.key()} => ${record.value()}")
IO.unit
}
def consumeFeed()= {
val consumerSettings = (executionContext: ExecutionContext) ⇒
ConsumerSettings(
keyDeserializer = new StringDeserializer,
valueDeserializer = new StringDeserializer,
executionContext = executionContext
)
.withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("localhost:9092")
.withPollTimeout(250.milliseconds)
.withGroupId("group")
for {
executionContext ← consumerExecutionContextStream[IO]
consumer ← consumerStream[IO].using(consumerSettings(executionContext))
_ ← fs2.Stream.eval(consumer.subscribeTo("topic-inbox"))
_ ← consumer.stream
.mapAsync( 4) { message ⇒
processRecord(message.record)
.as(message.committableOffset)
}
.groupWithin(500, 15.seconds)
.map(_.foldLeft(CommittableOffsetBatch.empty[IO])(_ updated _))
.evalMap(_.commit)
} yield ()
}
}
我似乎在编译时遇到的错误是:
Error:(55, 29) type mismatch;
found : fs2.Stream[[x]cats.effect.IO[x],Unit]
required: cats.effect.IO[?]
_ ← consumer.stream
Error:(54, 29) type mismatch;
found : cats.effect.IO[Nothing]
required: fs2.Stream[?,?]
_ ← consumer.subscribeTo("topic-inbox")
Error:(55, 9) parameter value consumer in value $anonfun is never used
consumer ← consumerStream[IO].using(consumerSettings(executionContext))
有谁能提供一些见解来帮助我理解和修复这个神秘的错误吗?我试过各种方法来解决这个问题,但都没有用。任何帮助将不胜感激,因为我似乎无法谷歌类似的情况。
暂无答案!
目前还没有任何答案,快来回答吧!