Dump dictionary,其中字段包含已编码的JSON字符串

rryofs0p  于 12个月前  发布在  其他
关注(0)|答案(1)|浏览(117)

我正在使用avro模式对Dictionary进行JSON编码,使其成为字符串化JSON:

import json
from datetime import datetime

import fastavro

message = {
    "name": "any",
    "ingestion_ts": datetime.utcnow(),
    "values": {
        "amount": 5,
        "countries": ["se", "nl"],
        "source": {
            "name": "web",
            "url": "whatever"
        }
    }
}
avro_schema = "" # import avro schema
fo = StringIO()
fastavro.json_writer(fo, avro_schema, [message])
message_str = fo.getvalue()

字符串
这就像预期的那样工作,并返回一个符合所提供模式的JSON编码字符串,看起来像这样:

'{"name": "any", "ingestion_ts": 1703192665965373, "values": {"amount": 5, "countries": ["se", "nl"], "source": {"name": "web", "url": "whatever"}}}'


然而,现在我需要将这个str Package 在payload键上的字典中,并将其发布到期望此模式的消息队列中:

{
  "type": "record",
  "namespace": "CDCEvent",
  "name": "CDCEvent",
  "fields": [
    {
      "doc": "The system that generated the event",
      "type": "string",
      "name": "sys"
    },
    {
      "doc": "The operation performed on the event",
      "type": "string",
      "name": "op"
    },
    {
      "doc": "The content of the event",
      "type": "string",
      "name": "payload"
    }
  ]
}


所以我就这样做:

wrap = {
    "sys": "my_system",
    "op": "c",
    "payload": message_str
}
wrap_str = json.dumps(wrap)


问题是,由于message_str之前已经被编码过了,当我通过调用json.dumps再次编码wrap时,有效载荷再次被编码,其中的双引号被转义,消费者无法正确解码消息:

'{"sys": "my_system", "op": "c", "payload": "{\\"name\\": \\"any\\", \\"ingestion_ts\\": 1703192665965373, \\"values\\": {\\"amount\\": 5, \\"countries\\": [\\"se\\", \\"nl\\"], \\"source\\": {\\"name\\": \\"web\\", \\"url\\": \\"whatever\\"}}}"}'


如何避免这种双重编码?我一直在尝试重新设计解决方案,但也许我已经被混淆了,我错过了一个非常简单的方法。请记住,队列需要payload字段的str

vawmfj5a

vawmfj5a1#

当你说消费者无法解码消息时,他们是如何解码的?如果他们从你的例子中获取wrap_str并应用以下转换,他们将获得原始消息:

wrapped_object = json.loads(wrap_str)

for record in fastavro.json_reader(StringIO(wrapped_object["payload"]), avro_schema):
    print(record)

字符串

相关问题