我正在使用flink(1.7)kafka客户端和avro4s(2.0.4),我想序列化到字节数组:
class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] extends SerializationSchema[IN] {
override def serialize(element: IN): Array[Byte] = {
val str = AvroSchema[IN]
val schema: Schema = new Parser().parse(str.toString)
val out = new ByteArrayOutputStream()
val os = AvroOutputStream.data[IN].to(out).build(schema)
os.write(element)
out.close()
out.flush()
os.flush()
os.close()
out.toByteArray
}
}
然而,我不断得到这个例外:
Error:(15, 35) could not find implicit value for evidence parameter of type com.sksamuel.avro4s.Encoder[IN]
val os = AvroOutputStream.data[IN].to(out).build(schema)
以及
Error:(15, 35) not enough arguments for method data: (implicit evidence$3: com.sksamuel.avro4s.Encoder[IN])com.sksamuel.avro4s.AvroOutputStreamBuilder[IN].
Unspecified value parameter evidence$3.
val os = AvroOutputStream.data[IN].to(out).build(schema)
2条答案
按热度按时间lmvvr0a81#
根据规范
IN
必须是Encoder
类型:所以应该是这样的:
zpgglvta2#
你不需要使用
FromRecord
写入输出流时。这是为那些想拥有GenericRecord
供自己使用。你需要使用Encoder
.