How to produce a tombstone for a Kafka Avro topic

irlmq6kh  posted on 2022-11-21 in Apache

I'm trying to produce tombstone messages to a compacted Kafka topic that uses an Avro schema, with Scala (v2.13.10) and the FS2 Kafka library (v3.0.0-M8) plus its Vulcan module.
The application consumes from topic A and, for values matching a certain condition, produces tombstones back to the same topic A.
Sample snippet:

val producerSettings =
  ProducerSettings(
    keySerializer = keySerializer,
    valueSerializer = Serializer.unit[IO]
  ).withBootstrapServers("localhost:9092")

def processRecord(
  committableRecord: CommittableConsumerRecord[IO, KeySchema, ValueSchema],
  producer: KafkaProducer.Metrics[IO, KeySchema, Unit]
): IO[CommittableOffset[IO]] = {
  val key = committableRecord.record.key
  val value = committableRecord.record.value

  if (value.filterColumn.field1 == "<removable>") {
    // emit a tombstone (null value) for this key back to the same topic
    val tombstone = ProducerRecord(committableRecord.record.topic, key, ())
    val producerRecords: ProducerRecords[CommittableOffset[IO], KeySchema, Unit] =
      ProducerRecords.one(tombstone, committableRecord.offset)
    producer.produce(producerRecords).flatten.map(_.passthrough)
  } else
    IO(committableRecord.offset)
}

The snippet above works fine when I produce a valid key/value message. However, when I try to produce a null/empty message, I get the error below:

java.lang.IllegalArgumentException: Invalid Avro record: bytes is null or empty
    at fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$4(AvroDeserializer.scala:32)
    at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
    at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
    at mapN @ fs2.kafka.KafkaProducerConnection$$anon$1.withSerializersFrom(KafkaProducerConnection.scala:141)
    at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:184)
    at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:265)
    at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$26(KafkaConsumer.scala:267)
    at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
    at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
    at mapN @ fs2.kafka.KafkaProducerConnection$$anon$1.withSerializersFrom(KafkaProducerConnection.scala:141)

Sample Avro schema:

{
    "type": "record",
    "name": "SampleOrder",
    "namespace": "com.myschema.global",
    "fields": [
        {
            "name": "cust_id",
            "type": "int"
        },
        {
            "name": "month",
            "type": "int"
        },
        {
            "name": "expenses",
            "type": "double"
        },
        {
            "name": "filterColumn",
            "type": {
                "type": "record",
                "name": "filterColumn",
                "fields": [
                    {
                        "name": "id",
                        "type": "string"
                    },
                    {
                        "name": "field1",
                        "type": "string"
                    }
                ]
            }
        }
    ]
}

Thanks in advance.
I have tried different serializers for the producer, but they all result in the same exception shown above.


8ehkhllq1#

First of all, a producer uses a Serializer, yet your stack trace shows a Deserializer. Unless your key is Avro, you don't need an Avro schema at all to send null values into a topic: use a ByteArraySerializer and simply send null...
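A minimal sketch of that suggestion (my own illustration, not code from the answer): keep the Avro key serializer the question already has, treat the value as raw bytes, and send a literal null as the tombstone payload. Serializer.identity[IO] is assumed here to be fs2-kafka's pass-through byte-array serializer; keySerializer comes from the question, and the topic name is a placeholder.

import cats.effect.IO
import fs2.kafka.{ProducerRecord, ProducerSettings, Serializer}

// value is plain bytes; no Avro schema is involved for the tombstone payload
val tombstoneProducerSettings =
  ProducerSettings(
    keySerializer = keySerializer,             // Avro key serializer from the question
    valueSerializer = Serializer.identity[IO]  // Serializer[IO, Array[Byte]]
  ).withBootstrapServers("localhost:9092")

// key: KeySchema is the key of the record to delete (assumed to be in scope);
// a null byte array is written as a null payload, i.e. a tombstone for that key
val tombstone = ProducerRecord("topic-a", key, null: Array[Byte])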
It should return null when the incoming record key/value is null, rather than explicitly throwing an error:
https://github.com/fd4s/fs2-kafka/blob/series/2.x/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala#L29
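One way to cope with that behavior on the consumer side, sketched under the assumption that fs2-kafka's Deserializer offers an option combinator that maps null bytes to None, and that keyDeserializer and valueDeserializer for KeySchema and ValueSchema already exist (e.g. built via the Vulcan module): tombstones then arrive as None instead of reaching the Avro decoder. The group id and broker address are placeholders.

import cats.effect.IO
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}

// valueDeserializer: Deserializer[IO, ValueSchema], keyDeserializer: Deserializer[IO, KeySchema]
val consumerSettings =
  ConsumerSettings(
    keyDeserializer = keyDeserializer,
    valueDeserializer = valueDeserializer.option  // Deserializer[IO, Option[ValueSchema]]
  )
    .withBootstrapServers("localhost:9092")
    .withGroupId("tombstone-app")
    .withAutoOffsetReset(AutoOffsetReset.Earliest)

// downstream records are CommittableConsumerRecord[IO, KeySchema, Option[ValueSchema]];
// a tombstone shows up as value = None and can simply be skipped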
Compare with the Confluent implementation.
