数据未通过avro序列化发送到kafka

kqlmhetl  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(382)

我正试图用avro向Kafka发送数据,但“消息”仍然是空的。我试图手动将模式添加到Kafka,但没有结果。我在应用程序的日志、模式注册表日志或kafka的日志中没有看到任何错误。你能告诉我我做错了什么吗?

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
import com.sksamuel.avro4s.{ AvroSchema, Record, RecordFormat, SchemaFor }    

case class User(name: String, age: Int)

val connectConfig: Map[String, Object] =
Map(
  "bootstrap.servers"   -> "localhost:9092",
  "client.id"           -> s"clientID${UUID.randomUUID().toString}",
  "key.serializer"      -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer"    -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
  "schema.registry.url" -> "localhost:8081"
)

implicit val producer = new KafkaProducer[String, Record](connectConfig.asJava)

def sendAvro(msg: DIL): Future[RecordMetadata] = {
    val newUser = User("Joe", 42)
    val schema  = AvroSchema[User]
    implicit val schemaFor = SchemaFor[User]
    val format = RecordFormat[User]

    val record = new ProducerRecord[String, Record]("kafkaLogTopic", format.to(newUser))
    producer.send(record)
}
x759pob2

x759pob21#

kafka批量发送数据,而您的单个记录小于默认的批量大小。
使用 .send(data).get() 或者 producer.flush() 立即等待一条记录被发送。
否则,应该在上定义一个线程 scala.sys.ShutdownHookThread 冲洗生产商

相关问题