如何从apachenifi生成kafka主题中的avro消息,然后使用kafka流读取它?

eiee3dmh  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(584)

我想使用apachenifi将一些通用数据生成kafka主题,并且我希望这些数据是avro格式的。我为此所做的:
在架构注册表中创建新架构:
{“type”:“record”,“name”:“my\u schema”,“namespace”:“my\u namespace”,“doc”:“,”fields”:[{“name”:“key”,“type”:“int”},{“name”:“value”,“type”:[“null”,“int”]},{“name”:“event\u time”,“type”:“long”}}
创建简单的nifi管道:

convertavroschema设置:

发布Kafka录制设置:

avroreader设置:

avrorecordsetwriter设置:

然后我试着用Kafka流来读:
public class test{private final static logger logger=logger.getlogger(kafkafilterusingcacheavro.class);

public static void main(String[] args) {
    Properties properties = new Properties();

    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "registry:8081");

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, GenericRecord> source = builder.stream("topic");
    source.foreach((k, v) -> logger.info(String.format("[%s]: %s", k, v.toString())));

    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, properties);
    streams.start();
}

}
一般顺序-https://github.com/johnreedlol/kafka-streams/blob/master/src/main/java/io/confluent/examples/streams/utils/genericavroserde.java
结果我得到了错误:
原因:org.apache.kafka.common.errors.serializationexception:反序列化id-1的avro消息时出错原因:org.apache.kafka.common.errors.serializationexception:未知的魔法字节!
我还尝试在avroreader\writer中显式设置avro模式,但没有帮助。另外,如果我尝试简单地从主题中读取字节并将其转换为字符串表示形式,我会得到如下结果:
objavro.schema{“type”:“record”,“name”:“my\u schema”,“namespace”:“my\u namespace”,“doc”:“,”fields“:[{“name”:“key”,“type”:“int”},{“name”:“value”,“type”:[“null”,“int”]},{“name”:“event\u time”,“type”:“long”}avro.codesnappyû4ý米[©qãàg0级ê¸ä»/}½{û4ý米[©qãàg0级
我该怎么修?

jvlzgdj9

jvlzgdj91#

在publishkafka处理器中,您的avro编写器配置了“嵌入式avro模式”的“模式写入策略”。这意味着写入kafka的消息是嵌入了完整模式的标准avro消息。
在用户端(kafka streams),它似乎希望使用合流模式注册表,在这种情况下,它不希望使用嵌入的avro模式,而是希望使用一个指定模式id的特殊字节序列,后跟裸avro消息。
假设您希望保持您的消费者的原样,那么在nifi方面,您将希望将avro编写器的“模式写入策略”更改为“合流模式注册表引用”。我认为这可能还需要您更改avro读取器,以便使用合流模式注册服务访问模式。
或者,也许有一种方法可以让kafka流读取嵌入的模式,而不使用合流模式注册表,但是我以前没有使用过kafka流,所以我不能说这是否可行。

相关问题