Scala type mismatch: required Monad[?]?

Posted by wkftcu5l on 2021-06-07 in Kafka

I have the following function, which calls itself recursively:

@tailrec
  private def pool[F[_]: Monad, A]
  : Consumer[String, String] => (Vector[KkConsumerRecord] => F[A]) => IO[Unit]
  = consumer => cb => {
    val records: ConsumerRecords[String, String] = consumer.poll(Long.MaxValue)
    val converted = records.iterator().asScala.map(rec => {
      KkConsumerRecord(rec.key(), rec.value(), rec.offset(), rec.partition(), rec.topic())
    })

    val vec = converted.foldLeft(Vector.empty[KkConsumerRecord]) { (b, a) =>
      a +: b
    }
    cb(vec)
    pool(consumer)(cb)
  }

The compiler complains:

[error] /home/developer/Desktop/microservices/bary/kafka-api/src/main/scala/io/khinkali/Consumer/KkConsumer.scala:57:10: type mismatch;
[error]  found   : org.apache.kafka.clients.consumer.Consumer[String,String]
[error]  required: cats.Monad[?]
[error]     pool(consumer)(cb)
[error]          ^
[error] two errors found

What am I doing wrong?

7kjnsjlb (answer #1)

The following code compiles:

import cats.Monad
import cats.effect.IO
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords}
import scala.collection.JavaConverters._
import scala.annotation.tailrec

object App {
  case class KkConsumerRecord(key: String, value: String, offset: Long, partition: Int, topic: String)

//  @tailrec
  private def pool[F[_]: Monad, A]
  : Consumer[String, String] => (Vector[KkConsumerRecord] => F[A]) => IO[Unit]
  = consumer => cb => {
    val records: ConsumerRecords[String, String] = consumer.poll(Long.MaxValue)
    val converted = records.iterator().asScala.map(rec => {
      KkConsumerRecord(rec.key(), rec.value(), rec.offset(), rec.partition(), rec.topic())
    })

    val vec = converted.foldLeft(Vector.empty[KkConsumerRecord]) { (b, a) =>
      a +: b
    }
    cb(vec)
    pool.apply(consumer)(cb)
  }
}
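`def pool[F[_]: Monad, A]` means `def pool[F[_], A](implicit monad: Monad[F])`, so in `pool(consumer)(cb)` the compiler treated `consumer` as the argument for the implicit `Monad[F]` parameter. Writing `pool.apply(consumer)(cb)` lets the implicit be resolved first, and `consumer` is then passed to the returned function. The `@tailrec` annotation is removed because `pool` is not tail-recursive: the last operation of the method is constructing a lambda, and the recursive call sits inside that lambda (I guess this is called tail recursion modulo cons).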
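To make the desugaring concrete, here is a minimal sketch that needs no external library. `Wrap`, `lift`, `liftDesugared` and `optionWrap` are hypothetical names invented for illustration (with `Wrap` standing in for `cats.Monad`), and the sketch assumes Scala 2, as in the question.

```scala
// Hypothetical stand-in for cats.Monad, so the sketch has no dependencies.
trait Wrap[F[_]] {
  def pure[A](a: A): F[A]
}

object ContextBoundDemo {
  implicit val optionWrap: Wrap[Option] = new Wrap[Option] {
    def pure[A](a: A): Option[A] = Some(a)
  }

  // Context-bound form that returns a function value, shaped like `pool`.
  def lift[F[_]: Wrap]: Int => F[Int] =
    n => implicitly[Wrap[F]].pure(n)

  // Roughly what the compiler generates: an implicit parameter list.
  def liftDesugared[F[_]](implicit w: Wrap[F]): Int => F[Int] =
    n => w.pure(n)

  // lift[Option](1) would not compile: `1` would be taken as the
  // argument for the implicit Wrap[Option] parameter.

  // Calling .apply lets the implicit resolve before the function is applied.
  val viaApply: Option[Int] = lift[Option].apply(1)

  // Alternatively, pass the implicit argument explicitly, then apply.
  val viaExplicit: Option[Int] = lift[Option](optionWrap)(1)
}
```

Both `viaApply` and `viaExplicit` go through the same instance; the only difference is whether the compiler or the caller fills in the implicit parameter list.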
If you want it to be tail-recursive, you can rewrite it as:

@tailrec
private def pool[F[_]: Monad, A](consumer: Consumer[String, String])(cb: Vector[KkConsumerRecord] => F[A]): IO[Unit] = {
  val records: ConsumerRecords[String, String] = consumer.poll(Long.MaxValue)
  val converted = records.iterator().asScala.map(rec => {
    KkConsumerRecord(rec.key(), rec.value(), rec.offset(), rec.partition(), rec.topic())
  })

  val vec = converted.foldLeft(Vector.empty[KkConsumerRecord]) { (b, a) =>
    a +: b
  }
  cb(vec)
  pool(consumer)(cb)
}
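As a smaller illustration of why taking method parameters satisfies `@tailrec`, here is a self-contained sketch. `drain` and its in-memory list of batches are hypothetical and only stand in for the polling loop; unlike `pool`, this version terminates when the input runs out.

```scala
import scala.annotation.tailrec

object TailrecDemo {
  // The self-call is the last operation of the method body itself
  // (not wrapped in a lambda), so the compiler accepts @tailrec.
  @tailrec
  def drain(batches: List[Vector[Int]], seen: Int)(cb: Vector[Int] => Unit): Int =
    batches match {
      case Nil => seen
      case head :: tail =>
        cb(head) // hand the current batch to the callback
        drain(tail, seen + head.size)(cb)
    }
}
```

If the recursive call were moved inside a returned function value, as in the original curried `pool`, the compiler would reject the `@tailrec` annotation.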
