I'm trying to produce tombstone messages to a compacted Kafka topic with an Avro schema, using Scala (v2.13.10) and the FS2 Kafka library (v3.0.0-M8) with the Vulcan module.
The application consumes from topic A and, for values matching a certain condition, produces a tombstone back to the same topic A.
Sample snippet:
```scala
val producerSettings =
  ProducerSettings(
    keySerializer = keySerializer,
    valueSerializer = Serializer.unit[IO]
  ).withBootstrapServers("localhost:9092")

def processRecord(
  committableRecord: CommittableConsumerRecord[IO, KeySchema, ValueSchema],
  producer: KafkaProducer.Metrics[IO, KeySchema, Unit]
): IO[CommittableOffset[IO]] = {
  val key   = committableRecord.record.key
  val value = committableRecord.record.value
  if (value.filterColumn.field1 == "<removable>") {
    val tombstone = ProducerRecord(committableRecord.record.topic, key, ())
    val producerRecords: ProducerRecords[CommittableOffset[IO], KeySchema, Unit] =
      ProducerRecords.one(tombstone, committableRecord.offset)
    producer.produce(producerRecords).flatten.map(_.passthrough)
  } else
    IO(committableRecord.offset)
}
```
The snippet above works fine when I produce a valid key/value message. However, when I try to produce a null/empty value, I get the following error:
```
java.lang.IllegalArgumentException: Invalid Avro record: bytes is null or empty
  at fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$4(AvroDeserializer.scala:32)
  at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
  at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
  at mapN @ fs2.kafka.KafkaProducerConnection$$anon$1.withSerializersFrom(KafkaProducerConnection.scala:141)
  at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:184)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:265)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$26(KafkaConsumer.scala:267)
  at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
  at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
  at mapN @ fs2.kafka.KafkaProducerConnection$$anon$1.withSerializersFrom(KafkaProducerConnection.scala:141)
```
Sample Avro schema:
```json
{
  "type": "record",
  "name": "SampleOrder",
  "namespace": "com.myschema.global",
  "fields": [
    { "name": "cust_id", "type": "int" },
    { "name": "month", "type": "int" },
    { "name": "expenses", "type": "double" },
    {
      "name": "filterColumn",
      "type": {
        "type": "record",
        "name": "filterColumn",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "field1", "type": "string" }
        ]
      }
    }
  ]
}
```
Thanks in advance.
I have tried different serializers for the producer, but they all result in the same exception above.
1 Answer
First, a producer uses a Serializer, yet your stack trace shows a Deserializer. Unless your key is Avro, you don't need an Avro schema at all to send a null value to a topic: use a ByteArraySerializer and simply send null...
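For context, the tombstone semantics the question relies on can be sketched in plain Scala (a toy simulation of log compaction, not the Kafka or fs2-kafka API; all names here are made up for illustration):

```scala
// Toy model of a compacted topic: a record with a None value is a
// tombstone. Compaction keeps only the latest record per key and drops
// any key whose latest record is a tombstone.
object CompactionSketch {
  final case class Record[K, V](key: K, value: Option[V])

  def compact[K, V](log: List[Record[K, V]]): Map[K, V] =
    log.foldLeft(Map.empty[K, V]) {
      case (state, Record(k, Some(v))) => state.updated(k, v) // upsert
      case (state, Record(k, None))    => state - k           // tombstone deletes the key
    }
}
```

A null value sent with a ByteArraySerializer plays the role of `None` here: once compaction runs, the key disappears from the topic.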
If the incoming record key/value is null, it should return null rather than explicitly throw an error: https://github.com/fd4s/fs2-kafka/blob/series/2.x/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala#L29
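The suggested behaviour can be sketched as a null-tolerant wrapper around any decoding function (plain Scala, not the fs2-kafka API; `decode` is a hypothetical underlying decoder):

```scala
// Instead of throwing on null/empty bytes (as the linked AvroDeserializer
// does), surface a tombstone as None so consumers can treat it as a delete.
def deserializeNullable[A](decode: Array[Byte] => A)(bytes: Array[Byte]): Option[A] =
  if (bytes == null || bytes.isEmpty) None // tombstone / empty payload
  else Some(decode(bytes))
```

With this shape, a consumer of a compacted topic would model the value as `Option[ValueSchema]` and interpret `None` as a deletion.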
Compare with the Confluent implementation.