Rust: how to build a stateful streaming parser with serde_json?

Posted by c9qzyr3d on 2023-04-06 in Other

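I'm trying to do some stateful JSON parsing with serde and serde_json. I started by looking at How to pass options to Rust's serde that can be accessed in Deserialize::deserialize()?, and although that got me almost all the way to what I need, I seem to be missing something crucial.
What I want to do is twofold:
1. My JSON is very large, too large to simply read the whole input into memory, so I need to process it as a stream. (Incidentally, it is also deeply nested, so I need to use disable_recursion_limit.)
2. I need some stateful processing: I want to pass some data to the deserializer that determines which data from the input JSON is kept and how that data is transformed during deserialization.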
For example, my input might look like this:

{ "documents": [
    { "foo": 1 },
    { "baz": true },
    { "bar": null }
    ],
    "journal": { "timestamp": "2023-04-04T08:28:00" }
}

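Here, each object in the documents array is very large, and I only need a subset of each one. Unfortunately, I first need to find the "documents" key-value pair, and then I need to visit each element of that array. For now I don't care about the other key-value pairs (e.g. "journal"), but that may change.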
My current approach is as follows:

use serde::de::DeserializeSeed;
use serde_json::Value;

/// A simplified state passed to and returned from the deserialization.
#[derive(Debug, Default)]
struct Stats {
    records_skipped: usize,
}

/// Models the input data; `documents` is just a vector of JSON values,
/// but it gets its own deserializer (`DocumentDeserializer`) to allow custom deserialization
#[derive(Debug)]
struct MyData {
    documents: Vec<Value>,
    journal: Value,
}

struct MyDataDeserializer<'a> {
    state: &'a mut Stats,
}

/// Top-level seeded deserializer only so I can plumb the state through
impl<'de> DeserializeSeed<'de> for MyDataDeserializer<'_> {
    type Value = MyData;

    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let visitor = MyDataVisitor(self.state);
        let docs = deserializer.deserialize_map(visitor)?;
        Ok(docs)
    }
}

struct MyDataVisitor<'a>(&'a mut Stats);

impl<'de> serde::de::Visitor<'de> for MyDataVisitor<'_> {
    type Value = MyData;

    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(formatter, "a map")
    }

    fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
    where
        A: serde::de::MapAccess<'de>,
    {
        let mut documents = Vec::new();
        let mut journal = Value::Null;

        while let Some(key) = map.next_key::<String>()? {
            println!("Got key = {key}");
            match &key[..] {
                "documents" => {
                    // Not sure how to handle the next value in a streaming manner
                    documents = map.next_value()?;
                }

                "journal" => journal = map.next_value()?,
                _ => panic!("Unexpected key '{key}'"),
            }
        }

        Ok(MyData { documents, journal })
    }
}

struct DocumentDeserializer<'a> {
    state: &'a mut Stats,
}

impl<'de> DeserializeSeed<'de> for DocumentDeserializer<'_> {
    type Value = Vec<Value>;

    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let visitor = DocumentVisitor(self.state);
        let documents = deserializer.deserialize_seq(visitor)?;
        Ok(documents)
    }
}

struct DocumentVisitor<'a>(&'a mut Stats);

impl<'de> serde::de::Visitor<'de> for DocumentVisitor<'_> {
    type Value = Vec<Value>;

    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(formatter, "a list")
    }

    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
        A: serde::de::SeqAccess<'de>,
    {
        let mut agg_map = serde_json::Map::new();

        while let Some(item) = seq.next_element()? {
            // If `item` isn't a JSON object, we'll skip it:
            let Value::Object(map) = item else { continue };

            // Get the first element, assuming we have some
            let (k, v) = match map.into_iter().next() {
                Some(kv) => kv,
                None => continue,
            };

            // Ignore any null values; aggregate everything into a single map
            if v.is_null() {
                self.0.records_skipped += 1;
            } else {
                println!("Keeping {k}={v}");
                agg_map.insert(k, v);
            }
        }
        let values = Value::Object(agg_map);
        println!("Final value is {values}");

        Ok(vec![values])
    }
}

fn main() {
    let fh = std::fs::File::open("input.json").unwrap();
    let buf = std::io::BufReader::new(fh);
    let read = serde_json::de::IoRead::new(buf);

    let mut state = Stats::default();
    let mut deserializer = serde_json::Deserializer::new(read);

    let mydata = MyDataDeserializer { state: &mut state }
        .deserialize(&mut deserializer)
        .unwrap();

    println!("{mydata:?}");
}

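This code runs successfully and deserializes my input data correctly. The problem is that I don't know how to stream the 'documents' array one element at a time. I don't know how to turn documents = map.next_value()?; into something that passes the state to DocumentDeserializer. It should *maybe* use something like: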

let d = DocumentDeserializer { state: self.0 }
    .deserialize(&mut map)
    .unwrap();

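But .deserialize requires a serde::Deserializer<'de>, whereas map is a serde::de::MapAccess<'de>.
In any case, this whole thing seems overly verbose, so if this isn't the generally accepted or idiomatic approach, I'm open to alternatives.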

Answer #1 (mctunoxg)

Your question is well researched, so I'm a bit suspicious that the solution can be this simple, but don't you just want

"documents" => {
    documents = map.next_value_seed(DocumentDeserializer { state: self.0 })?;
}

(Personally, I wouldn't name these things …Deserializer; maybe …Seed?)
