我有一个aws物联网规则,它将传入的json发送到kinesis firehose。
我的物联网发布的json数据都在一行上-例如:
{"count":4950, "dateTime8601": "2017-03-09T17:15:28.314Z"}
管理ui中的iot“测试”部分允许您发布消息,默认为以下内容(注解格式的多行json):
{
"message": "Hello from AWS IoT console"
}
我将消防水龙带传输到s3,然后由emr将其转换为一种列格式,最终供雅典娜使用。
问题是,在转换为列格式的过程中,hive(特别是json serde)无法处理跨多行的json对象。它会破坏转换,而不会转换好的、单行的json记录。
我的问题是:
如何设置firehose以忽略多行json?
如果不可能,如何告诉配置单元在加载到表之前删除换行符,或者至少捕获异常并尝试继续?
在定义配置单元表时,我已经尝试忽略格式错误的json:
DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
count int,
dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
'ignore.malformed.json' = 'true',
"timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://...';
下面是我的完整hql,它进行转换:
--Example of converting to OEX/columnar formats
DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
count int,
dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
'ignore.malformed.json' = 'true',
"timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://bucket.me.com/raw/all-sites/';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='15') location 's3://bucket.me.com/raw/all-sites/2017/03/09/15';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='16') location 's3://bucket.me.com/raw/all-sites/2017/03/09/16';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='17') location 's3://bucket.me.com/raw/all-sites/2017/03/09/17';
DROP TABLE to_orc;
CREATE EXTERNAL TABLE to_orc (
count int,
dateTime8601 timestamp
)
STORED AS ORC
LOCATION 's3://bucket.me.com/orc'
TBLPROPERTIES ("orc.compress"="ZLIB");
INSERT OVERWRITE TABLE to_orc SELECT count,dateTime8601 FROM site_sensor_data_raw where year=2017 AND month=03 AND day=09 AND hour=15;
1条答案
按热度按时间bzzcjhmw1#
嗯,emr和athena上使用的默认json serde不能处理多行json记录。每个json记录应该在一行上。
在多行json上,我从hive/hadoop甚至presto(在athean中使用)的Angular 看到了两个问题
给定一个文件,hive/hadoop和json serde显然无法识别json记录的结束和开始来返回其对象表示。
给定多个文件,多行json文件不像普通的/n分隔的json文件那样是可拆分的。
为了从emr/athena端绕过这个问题,您需要根据您的数据结构编写自己的自定义serde,并捕获异常等。
如何设置firehose以忽略多行json?
firehose无法忽略特定格式。它将使用其api(putrecord或putrecordbatch)作为数据blob放入任何内容,并将其发送到目标。
http://docs.aws.amazon.com/firehose/latest/apireference/api_putrecordbatch.html
不管怎样,aws firehose提供了带有aws lambda的数据转换,您可以使用lambda函数转换数据firehose上的传入数据,并将转换后的数据放在目标位置。因此,您可以使用该特性在手之前识别并展平多行json。如果记录格式不正确,您也可能会删除这些记录等。您将需要了解iot如何将多行json数据发送到firehose(如逐行发送等)以编写您自己的函数。
https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/
如果不可能,如何告诉配置单元在加载到表之前删除换行符,或者至少捕获异常并尝试继续?
如果firehose目标中仍然有多行json,那么由于etl中有emr,因此可以使用它的compute而不是lambda来展平json。spark上的此功能也可以帮助您实现这一点。https://issues.apache.org/jira/browse/spark-18352
然后您可以接收这些数据,创建一个列格式,供雅典娜处理。