Serialization code (Go)
1. Producer
func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) {
    producerConfig := getKafkaProducerConfig(config.EnvConfig)

    producer, err := confluent_kafka.NewProducer(producerConfig)
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer")
        log.Panicf("Unable to create Kafka Producer")
    }

    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl))
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client")
        log.Panicf("Unable to create Kafka Client")
    }

    serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig())
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer")
        log.Panicf("Unable to create Kafka Serializer")
    }

    KafkaProducerInstance = &KafkaProducer{
        producer:   producer,
        serializer: serializer,
    }

    log.Info("Created Kafka Producer and Serializer")
}
2. Sending a Kafka message
func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) {
    deliveryChan := make(chan confluent_kafka.Event)

    payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message)
    if err != nil {
        log.Errorf("Failed to serialize payload: %v\n", err)
        close(deliveryChan)
        return
    }

    err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{
        TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny},
        Value:          payload,
    }, deliveryChan)
    if err != nil {
        log.Errorf("Failed to Produce: %v\n", err)
        close(deliveryChan)
        return
    }

    e := <-deliveryChan
    m := e.(*confluent_kafka.Message)
    if m.TopicPartition.Error != nil {
        log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
        close(deliveryChan)
        return
    } else {
        log.Infof("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }
    close(deliveryChan)
}
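For reference, a rough Python equivalent of steps 1 and 2 using the confluent-kafka Schema Registry serializer; the registry URL, broker address, and message type here are assumptions based on the rest of this post, not the actual config:

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import StringSerializer

import KafkaDiagnoseResult_pb2  # assumed to be the same generated module as in the consumer below

# Registry client + Protobuf serializer (mirrors NewProducer above; the URL is an assumption)
schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
protobuf_serializer = ProtobufSerializer(
    KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest,
    schema_registry_client,
    {'use.deprecated.format': False})

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': protobuf_serializer,
})

def delivery_report(err, msg):
    # Mirrors the deliveryChan handling in producerHelper above
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

message = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest()  # populate fields as needed
producer.produce(topic='diagnosis', value=message, on_delivery=delivery_report)
producer.flush()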
Trying to consume the message (a different application, in Python)
from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2  # replace with your generated module name
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",  # replace with your Kafka server address
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
consumer.subscribe(['diagnosis'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break
        # Deserialize the message
        try:
            data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest()
            data.ParseFromString(msg.value())
        except DecodeError as e:
            print(f"Error parsing message: {e}")
            print(f"Raw message data: {msg.value()}")
        print("Received message: ", data)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
Error: the message fails to parse, and the DecodeError branch above is hit.
I have tried to debug it, but could not.
1. The proto files in both applications are identical.
2. I use protoc to generate the pb2 files.
Appreciate your help. Thank you.
I can get the message in raw format:
Raw message data: b'\x00\x00\x00\x00\x02\x02\x08\n$1775100a-1a47-48b2-93b7-b7a331be59b4\x12\tcompleted'
- I tried decoding it as UTF-8, but that fails because not all of the fields are read.
print(" Decode 1: ", dict_str)
print("Decode 2: ", ast.literal_eval(dict_str))
Output of the above code:
Unparsed Message: b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted'
Decode 1:
$ccb0ad7e-abb2-4af6-90d1-187381f9d47e completed
Inner Exception Here source code string cannot contain null bytes
2 Answers
6pp0gazn1#
From the error message re: source code string cannot contain null bytes, I assume the problem is with the file your Python code lives in, rather than the code itself.
Try stripping the null bytes out of each source file.
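A minimal sketch of that cleanup in Python, with your_script.py as a placeholder filename:

with open('your_script.py', 'rb') as f:   # your_script.py is a placeholder
    data = f.read()
with open('your_script.py', 'wb') as f:   # write the file back without NUL bytes
    f.write(data.replace(b'\x00', b''))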
Replace the filename with your source file's name, and repeat this for every Python file.
This will remove all the null bytes from the source files.
woobm2wo2#
Your Go client is serializing with the Schema Registry, which means your Python code must do the same. The records are not plain Protobuf: a schema ID is also encoded in the bytes, so a regular Protobuf parser will fail.
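You can see this in the raw bytes from the question: byte 0 is the magic byte, bytes 1-4 are the big-endian schema ID, and the Protobuf payload only starts after the message-index varints that follow. A quick sketch decoding the fixed part of that header:

import struct

raw = b'\x00\x00\x00\x00\x02\x02\x08\n$1775100a-1a47-48b2-93b7-b7a331be59b4\x12\tcompleted'
magic = raw[0]                                # Confluent wire-format magic byte, always 0
schema_id = struct.unpack('>I', raw[1:5])[0]  # big-endian schema ID; here 2
print(magic, schema_id)
# The Protobuf payload begins only after the message-index varints that follow
# the schema ID, which is why ParseFromString on the whole value fails.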
There is example code in that repo for using Protobuf with the Registry:
https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/protobuf_consumer.py
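Adapted to this question, a registry-aware consumer would look roughly like the sketch below; the topic name and pb2 module are taken from the question, everything else is an assumption:

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer

import KafkaDiagnoseResult_pb2

# The deserializer strips the magic byte, schema ID, and message indexes for you
protobuf_deserializer = ProtobufDeserializer(
    KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest,
    {'use.deprecated.format': False})

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'myGroup',
    'auto.offset.reset': 'earliest',
    'value.deserializer': protobuf_deserializer,
})
consumer.subscribe(['diagnosis'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        # msg.value() is already a parsed KafkaDiagnoseRequest
        print("Received message: ", msg.value())
except KeyboardInterrupt:
    pass
finally:
    consumer.close()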