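The Druid cluster and the Hive/Hadoop cluster each run fine on their own. We are creating a table in Hive that reads data from Druid (for ETL), but in initial testing we found that we cannot run a simple SELECT *.
It fails with the following error: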
hive> select * from druid_hive_table;
OK
druid_hive_table.__time druid_hive_table.op_ts druid_hive_table.op_type druid_hive_table.pos druid_hive_table.table
Failed with exception java.io.IOException:org.apache.hive.druid.com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.util.ArrayList out of START_OBJECT token
at [Source: org.apache.hive.druid.com.metamx.http.client.io.AppendableByteArrayInputStream@656c5818; line: -1, column: 4]
Time taken: 0.449 seconds
A SELECT COUNT(*), however, works fine:
hive> select count(*) from druid_hive_table;
OK
$f0
21409
Time taken: 0.199 seconds, Fetched: 1 row(s)
Specs:
Druid external table:
SET hive.druid.broker.address.default=<host>:8082;
CREATE EXTERNAL TABLE druid_hive_table
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "druid_datasource_name");
hive> DESCRIBE FORMATTED druid_hive_table;
OK
col_name                data_type               comment
# col_name              data_type               comment
__time                  timestamp               from deserializer
op_ts                   string                  from deserializer
op_type                 string                  from deserializer
pos                     string                  from deserializer
table                   string                  from deserializer
# Detailed Table Information
Database:               tests
Owner:                  OWNER
CreateTime:             Mon Feb 10 13:52:13 UTC 2020
LastAccessTime:         UNKNOWN
Retention:              0
Location:               <LOCATION>
Table Type:             EXTERNAL_TABLE
Table Parameters:
    COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\"}
    EXTERNAL                TRUE
    druid.datasource        druid_datasource_name
    numFiles                0
    numRows                 0
    rawDataSize             0
    storage_handler         org.apache.hadoop.hive.druid.DruidStorageHandler
    totalSize               0
    transient_lastDdlTime   1581342733
# Storage Information
SerDe Library:          org.apache.hadoop.hive.druid.serde.DruidSerDe
InputFormat:            null
OutputFormat:           null
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
    serialization.format    1
Time taken: 0.144 seconds, Fetched: 37 row(s)
For reference, the Druid supervisor spec:
{
  "dataSchema": {
    "dataSource": "druid_datasource_name",
    "timestampSpec": {
      "column": "current_ts",
      "format": "iso",
      "missingValue": null
    },
    "dimensionsSpec": {
      "dimensions": [],
      "dimensionExclusions": [
        "current_ts"
      ]
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": {
        "type": "none"
      },
      "rollup": false,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "ioConfig": {
    "topic": "<kafka_topic>",
    "inputFormat": {
      "type": "json",
      "flattenSpec": {
        "useFieldDiscovery": true,
        "fields": []
      },
      "featureSpec": {}
    },
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT3600S",
    "consumerProperties": {
      "bootstrap.servers": "<bootstrap_servers>",
      "group.id": "<group_name>",
      "security.protocol": "SASL_SSL",
      "ssl.truststore.location": "<location>",
      "ssl.truststore.password": "<pass>",
      "sasl.jaas.config": "<config>",
      "sasl.mechanism": "SCRAM-SHA-512"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": true,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "lateMessageRejectionStartDateTime": null,
    "stream": "<kafka_topic>",
    "useEarliestSequenceNumber": true,
    "type": "kafka"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/opt/apache-druid-0.17.0/var/tmp/druid-realtime-persist7801461398656096281",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "indexSpecForIntermediatePersists": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": false,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false,
    "repartitionTransitionDuration": "PT120S"
  },
  "type": "kafka"
}
Thanks for any help resolving this.
1 Answer
I managed to solve this. Upgrading Hive and Hadoop to version 3+ fixed the problem.
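For anyone who wants to check what the broker returns before upgrading, here is a minimal Python sketch. The error above says the Hive client expected a JSON array but received a JSON object, so the sketch posts a native scan query to the broker and reports the top-level shape of the reply. Assumptions, none of them confirmed in this thread: the broker accepts native queries at /druid/v2/ on the host and port set in hive.druid.broker.address.default, the helper names are made up for illustration, and the wide interval is arbitrary. One plausible cause, also unconfirmed here, is that the older Hive storage handler still sends the Select query type, which Druid removed in 0.17.0, so the broker replies with an error object instead of a result array.

```python
import json
import urllib.error
import urllib.request


def build_scan_query(datasource, interval="1000-01-01/3000-01-01", limit=5):
    """Build a small native Druid scan query body (hypothetical helper)."""
    return {
        "queryType": "scan",
        "dataSource": datasource,
        "intervals": [interval],  # arbitrary wide interval, an assumption
        "resultFormat": "list",
        "limit": limit,
    }


def describe_response(body):
    """Report the top-level JSON shape of a broker response body."""
    parsed = json.loads(body)
    if isinstance(parsed, list):
        # A result set: what a client expecting an ArrayList can deserialize.
        return "array with %d element(s)" % len(parsed)
    if isinstance(parsed, dict):
        # A single object (for example an error payload) is what would
        # trigger "out of START_OBJECT token" in such a client.
        return "object with keys: " + ", ".join(sorted(parsed))
    return "scalar: %r" % (parsed,)


def post_query(broker, query):
    """POST the query to http://<broker>/druid/v2/ and return the raw body."""
    request = urllib.request.Request(
        "http://%s/druid/v2/" % broker,
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request) as response:
            return response.read().decode("utf-8")
    except urllib.error.HTTPError as err:
        # Error statuses still carry a body worth inspecting.
        return err.read().decode("utf-8")
```

To try it against a live broker, call describe_response(post_query("<host>:8082", build_scan_query("druid_datasource_name"))) with your own broker address. If the scan query returns an array while Hive's SELECT * still fails, that points at the query type Hive sends rather than at the datasource.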