AWS Athena: export an array of structs to JSON

Asked by bybem2ql on 2023-01-14

I have an Athena table in which some fields have a fairly complex nested structure. The backing records in S3 are JSON, along the following lines (but with more levels of nesting):

CREATE EXTERNAL TABLE IF NOT EXISTS test (
  timestamp double,
  stats array<struct<time:double, mean:double, var:double>>,
  dets array<struct<coords: array<double>, header:struct<frame:int, 
    seq:int, name:string>>>,
  pos struct<x:double, y:double, theta:double>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('ignore.malformed.json'='true')
LOCATION 's3://test-bucket/test-folder/'

Now we need to be able to query this data and pull the results into Python for analysis. Because of security restrictions I can't connect to Athena directly: I need to be able to hand a query to someone else, who will run it and send me back the CSV results.
If we simply select *, the struct/array columns come back in a format that isn't quite JSON. Here is an example input file:

{"timestamp":1520640777.666096,"stats":[{"time":15,"mean":45.23,"var":0.31},{"time":19,"mean":17.315,"var":2.612}],"dets":[{"coords":[2.4,1.7,0.3], "header":{"frame":1,"seq":1,"name":"hello"}}],"pos": {"x":5,"y":1.4,"theta":0.04}}

Example output:

select * from test

"timestamp","stats","dets","pos"
"1.520640777666096E9","[{time=15.0, mean=45.23, var=0.31}, {time=19.0, mean=17.315, var=2.612}]","[{coords=[2.4, 1.7, 0.3], header={frame=1, seq=1, name=hello}}]","{x=5.0, y=1.4, theta=0.04}"

I'd like to export these nested fields in a more convenient format; getting them as JSON would be great.
Unfortunately, casting to JSON only seems to work for maps, not structs: it just flattens everything into arrays:

SELECT timestamp, cast(stats as JSON) as stats, cast(dets as JSON) as dets, cast(pos as JSON) as pos FROM "sampledb"."test"

"timestamp","stats","dets","pos"
"1.520640777666096E9","[[15.0,45.23,0.31],[19.0,17.315,2.612]]","[[[2.4,1.7,0.3],[1,1,""hello""]]]","[5.0,1.4,0.04]"

Is there a good way to convert to JSON (or some other easy-to-import format), or should I just go ahead and write a custom parsing function?


ohfgkhjo1#

I've gone through all the documentation and, unfortunately, there doesn't seem to be a way to do this as of now. The only possible workaround when querying Athena is to select the struct fields individually:

SELECT
  my_field,
  my_field.a,
  my_field.b,
  my_field.c.d,
  my_field.c.e
FROM 
  my_table
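
Another partial workaround builds on the fact that the JSON cast does keep keys for maps: rebuild a struct column as a map before casting it. This is only a sketch, reusing the pos column and table from the question (the pos_json alias is made up), and the key/value arrays have to be written out by hand for each struct:

-- Sketch: rebuild the struct as a map so the keys survive the JSON cast
SELECT
  timestamp,
  cast(map(array['x', 'y', 'theta'],
           array[pos.x, pos.y, pos.theta]) as JSON) as pos_json
FROM "sampledb"."test"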

Or I would use post-processing to convert the data to JSON:

#!/usr/bin/env python
import io
import re

# turn a struct key like time= into "time": (keys follow "{" or a space in Athena's output)
pattern1 = re.compile(r'(?<=[{ ])([a-z]+)=', re.I)
# quote bare string values, e.g. :hello -> :"hello"
pattern2 = re.compile(r':([a-z][^,{}. [\]]+)', re.I)
pattern3 = re.compile(r'\\"', re.I)

with io.open("test.csv") as f:
    # the first CSV line holds the quoted column names
    headers = list(map(lambda h: h.strip(), f.readline().split(",")))
    for line in f.readlines():
        orig_line = line
        data = []
        # re-attach each quoted header to its column value
        for i, l in enumerate(line.split('","')):
            data.append(headers[i] + ":" + re.sub('^"|"$', "", l))

        line = "{" + ','.join(data) + "}"
        line = pattern1.sub(r'"\1":', line)
        line = pattern2.sub(r':"\1"', line)
        print(line)

For the input data above, the output is:

{"timestamp":1.520640777666096E9,"stats":[{"time":15.0, "mean":45.23, "var":0.31}, {"time":19.0, "mean":17.315, "var":2.612}],"dets":[{"coords":[2.4, 1.7, 0.3], "header":{"frame":1, "seq":1, "name":"hello"}}],"pos":{"x":5.0, "y":1.4, "theta":0.04}
}

which is valid JSON.


8ehkhllq2#

@tarun's Python code almost got me there, but I had to modify it in a few ways because of my data, which has:

  • JSON structures that are stored as strings in Athena
  • Strings made up of several words, which therefore need to be wrapped in double quotes; some of these strings also contain "[]" and "{}" symbols.

Here is the code that worked for me; hopefully it will be useful to someone else:

#!/usr/bin/env python
import io
import re, sys

pattern1 = re.compile(r'(?<={)([a-z]+)=', re.I)
pattern2 = re.compile(r':([a-z][^,{}. [\]]+)', re.I)
pattern3 = re.compile(r'\\"', re.I)

with io.open(sys.argv[1]) as f:
    headers = list(map(lambda f: f.strip(), f.readline().split(",")))
    print(headers)
    for line in f.readlines():

        orig_line = line
        #save the double quote cases, which mean there is a string with quotes inside
        line = re.sub('""', "#", orig_line)
        data = []
        for i, l in enumerate(line.split('","')):
            item = re.sub('^"|"$', "", l.rstrip())
            if (item[0] == "{" and item[-1] == "}") or (item[0] == "[" and item[-1] == "]"):
                data.append(headers[i] + ":" + item)
            else: #we have a string
                data.append(headers[i] + ": \"" + item + "\"")

        line = "{" + ','.join(data) + "}"
        line = pattern1.sub(r'"\1":', line)
        line = pattern2.sub(r':"\1"', line)

        #restate the double quotes to single ones, once inside the json
        line = re.sub("#", '"', line)

        print(line)

rggaifut3#

This approach works without modifying the query.
For JavaScript/Node.js, you can use the npm package athena-struct-parser.

Detailed answer with an example:

https://stackoverflow.com/a/67899845/6662952
Reference: https://www.npmjs.com/package/athena-struct-parser


ki1q1bka4#

I used a simple approach to work around the struct -> JSON limitation in Athena: I created a second table in which the JSON column is kept as a raw string. Using Presto's JSON and array functions, I was able to query the data and get valid JSON strings back into my program:

--Array transform functions too
select 
  json_extract_scalar(dd, '$.timestamp') as timestamp,
  transform(cast(json_extract(json_parse(dd), '$.stats') as ARRAY<JSON>), x -> json_extract_scalar(x, '$.time')) as arr_stats_time,
  transform(cast(json_extract(json_parse(dd), '$.stats') as ARRAY<JSON>), x -> json_extract_scalar(x, '$.mean')) as arr_stats_mean,
  transform(cast(json_extract(json_parse(dd), '$.stats') as ARRAY<JSON>), x -> json_extract_scalar(x, '$.var')) as arr_stats_var
from 
(select '{"timestamp":1520640777.666096,"stats":[{"time":15,"mean":45.23,"var":0.31},{"time":19,"mean":17.315,"var":2.612}],"dets":[{"coords":[2.4,1.7,0.3], "header":{"frame":1,"seq":1,"name":"hello"}}],"pos": {"x":5,"y":1.4,"theta":0.04}}' as dd);

I know the query takes longer to execute this way, but there are ways to optimize it.
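
If you want each nested field back as a single valid JSON string rather than per-field arrays, the same approach can use json_format on top of json_extract; this is just a sketch against the same inline sample record (the *_json aliases are made up):

-- Sketch: json_format returns whole sub-documents as valid JSON strings
select
  json_extract_scalar(dd, '$.timestamp') as timestamp,
  json_format(json_extract(dd, '$.stats')) as stats_json,
  json_format(json_extract(dd, '$.dets')) as dets_json,
  json_format(json_extract(dd, '$.pos')) as pos_json
from
(select '{"timestamp":1520640777.666096,"stats":[{"time":15,"mean":45.23,"var":0.31},{"time":19,"mean":17.315,"var":2.612}],"dets":[{"coords":[2.4,1.7,0.3], "header":{"frame":1,"seq":1,"name":"hello"}}],"pos": {"x":5,"y":1.4,"theta":0.04}}' as dd);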


kmb7vmvb5#

I got around this by creating a second table over the same S3 location, but with the data types of those fields changed to string. The resulting CSV then contains the strings that Athena extracts from the objects in the JSON files, and I was able to parse the results.
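
For reference, a sketch of what such a table definition might look like, assuming the same SerDe and S3 location as in the question (the table name test_strings is made up); with the nested columns declared as string, Athena returns the raw JSON text for them:

-- Sketch: same data, but the nested columns are declared as plain strings
CREATE EXTERNAL TABLE IF NOT EXISTS test_strings (
  timestamp double,
  stats string,
  dets string,
  pos string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('ignore.malformed.json'='true')
LOCATION 's3://test-bucket/test-folder/'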


u91tlkcl6#

I also had to tweak @tarun's code, because my data and nesting are more complex. Here is my solution, in case it helps someone else:

import re
import json
import numpy as np
import pandas as pd

pattern1 = re.compile(r'(?<=[{,\[])\s*([^{}\[\],"=]+)=')
pattern2 = re.compile(r':([^{}\[\],"]+|()(?![{\[]))')
pattern3 = re.compile(r'"null"')

def convert_metadata_to_json(value):
    if type(value) is str:
        value = pattern1.sub('"\\1":', value)
        value = pattern2.sub(': "\\1"', value)
        value = pattern3.sub('null', value)
    elif np.isnan(value):
        return None
    
    return json.loads(value)

df = pd.read_csv('test.csv')

df['metadata_json'] = df.metadata.apply(convert_metadata_to_json)
