我正在使用avro模式对Dictionary进行JSON编码,使其成为字符串化JSON:
import json
from datetime import datetime
import fastavro
message = {
"name": "any",
"ingestion_ts": datetime.utcnow(),
"values": {
"amount": 5,
"countries": ["se", "nl"],
"source": {
"name": "web",
"url": "whatever"
}
}
}
avro_schema = "" # import avro schema
fo = StringIO()
fastavro.json_writer(fo, avro_schema, [message])
message_str = fo.getvalue()
字符串
这就像预期的那样工作,并返回一个符合所提供模式的JSON编码字符串,看起来像这样:
'{"name": "any", "ingestion_ts": 1703192665965373, "values": {"amount": 5, "countries": ["se", "nl"], "source": {"name": "web", "url": "whatever"}}}'
型
然而,现在我需要将这个str Package 在payload
键上的字典中,并将其发布到期望此模式的消息队列中:
{
"type": "record",
"namespace": "CDCEvent",
"name": "CDCEvent",
"fields": [
{
"doc": "The system that generated the event",
"type": "string",
"name": "sys"
},
{
"doc": "The operation performed on the event",
"type": "string",
"name": "op"
},
{
"doc": "The content of the event",
"type": "string",
"name": "payload"
}
]
}
型
所以我就这样做:
wrap = {
"sys": "my_system",
"op": "c",
"payload": message_str
}
wrap_str = json.dumps(wrap)
型
问题是,由于message_str
之前已经被编码过了,当我通过调用json.dumps
再次编码wrap时,有效载荷再次被编码,其中的双引号被转义,消费者无法正确解码消息:
'{"sys": "my_system", "op": "c", "payload": "{\\"name\\": \\"any\\", \\"ingestion_ts\\": 1703192665965373, \\"values\\": {\\"amount\\": 5, \\"countries\\": [\\"se\\", \\"nl\\"], \\"source\\": {\\"name\\": \\"web\\", \\"url\\": \\"whatever\\"}}}"}'
型
如何避免这种双重编码?我一直在尝试重新设计解决方案,但也许我已经被混淆了,我错过了一个非常简单的方法。请记住,队列需要payload
字段的str
1条答案
按热度按时间vawmfj5a1#
当你说消费者无法解码消息时,他们是如何解码的?如果他们从你的例子中获取
wrap_str
并应用以下转换,他们将获得原始消息:字符串