用Akka Stream流式传输巨大的json

oknwwptz  于 2022-11-06  发布在  其他
关注(0)|答案(3)|浏览(160)

我遇到了一个问题,一个json slab的http响应很大,只有一部分是兴趣点。

{
  "searchString": "search",
  "redirectUrl": "",
  "0": {
    "numRecords": 123,
    "refinementViewModelCollector": {},
//    Lots of data here
    "results": [
      {
        "productCode": "123",
        "productShortDescription": "Desc",
        "brand": "Brand",
        "productReview": {
          "reviewScore": 0
        },
        "priceView": {
          "salePriceDisplayable": false,
        },
        "productImageUrl": "url",
        "alternateImageUrls": [
          "url1"
        ],
        "largeProductImageUrl": "url4",
        "videoUrl": ""
      },
      {
        "productCode": "124",
        "productShortDescription": "Desc",
        "brand": "Brand",
        "productReview": {
          "reviewScore": 0
         },
        "priceView": {
          "salePriceDisplayable": false,
        },
        "preOrder": false,
        "productImageUrl": "url",
        "alternateImageUrls": [
          "url1"
        ],
        "largeProductImageUrl": "url4",
        "videoUrl": ""
      }
    ]
    //lots of data here
  }
}

我感兴趣的是results Jason Array中的条目,但这些条目位于json的中间
我创建了一个小型的Play WS客户端,如下所示:

val wsClient: WSClient = ???
val ret = wsClient.url("url").stream()
ret.flatMap { response =>
  response.body.via(JsonFraming.objectScanner(1024))
    .map(_.utf8String)
    .runWith(Sink.foreach(println))
}

这是行不通的,因为它会把整个json板当作Json对象。我需要跳过一些数据,直到"results":条目出现在流中,然后开始解析条目并跳过所有其余的。有什么想法如何做到这一点?

ltqd579y

ltqd579y1#

看看Alpakka的JSON模块,它可以流传输嵌套JSON结构的特定部分:

response
  .body
  .via(JsonReader.select("$.0.results[*]"))
  .map(_.utf8String)
  .runWith(Sink.foreach(println)) // or runForeach(println)
qvk1mo1f

qvk1mo1f2#

有一些解析器支持以流的形式解析。

kyxcudwk

kyxcudwk3#

对于这个问题,我希望有一个更好的、特定于Scala的答案,但是请查看Google GSON库文档中的“混合读取示例”:
https://sites.google.com/site/gson/streaming
Gson还支持混合的流和对象模型访问,这让你的应用程序同时拥有两个世界的优点:对象模型访问的生产率和流的效率......这段代码读取一个包含消息数组的JSON文档。它以流的形式遍历数组元素,以避免将整个文档加载到内存中。它之所以简洁,是因为它使用Gson的对象模型来解析单个消息
这应该具有很好的内存性能(代码从Java InputStream读取,因此整个结构永远不会在内存中),但可能需要一些工作才能将结果放入Scala case类中。

相关问题