我正在使用 Spark 2.4
与 Hive Warehouse Connector
以及 Scala 2.11
. hortonworks提供的当前Hive仓库连接器与spark 2.4不兼容。所以我从https://github.com/abh1sh2k/spark-llap/pull/1/files 与spark 2.4兼容。
我的spark应用程序从kafka输入流中读取数据,并使用hivewarehouse连接器提供的配置单元输出流写入配置单元表(orc格式)。
这是我的spark代码(scala):
package example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.read.confluent.SchemaManager
import za.co.absa.abris.avro.functions.from_confluent_avro
object NormalizedEventsToHive extends Logging {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("NormalizedEventsToHive")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val schema_registry_config = Map(
"schema.registry.url" -> "http://schema-registry:8081",
"value.schema.naming.strategy" -> "topic.name",
"schema.registry.topic" -> "events-v1",
"value.schema.id" -> "latest"
)
val input_stream_df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("startingOffsets", "earliest")
.option("subscribe", "events-v1")
.load()
val data = input_stream_df
.select(from_confluent_avro(col("value"), schema_registry_config) as 'data)
.select("data.*")
val output_stream_df = data.writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource")
.option("database", "default")
.option("table", "events")
.option("checkpointLocation", "file:///checkpoint2")
.option("metastoreUri", "thrift://hive-metastore:9083")
.start()
output_stream_df.awaitTermination()
}
}
输入 Kafka messages
是 AVRO encoded
以及 Confluent Schema Registry
用于架构版本控制。 za.co.absa.abris.avro.functions.from_confluent_avro
用于解码avro编码的Kafka消息。
以下是avro模式:
{
"type": "record",
"name": "events",
"fields": [
{ "name": "id", "type": ["null", "string"], "default": null },
.....
{ "name": "field_map", "type": ["null", { "type": "map", "values": ["null", "string"] }], "default": null },
{ "name": "field_array", "type": ["null", { "type": "array", "items": "string" }], "default": null },
{ "name": "field_array_of_map", "type": ["null", { "type": "array", "items": { "type": "map", "values": ["null", "string"] }}], "default": null }
]
}
这个 events
配置单元表(orc格式)创建为:
CREATE TABLE `events`(
`id` string,
......
`field_map` map<string,string>,
`field_array` array<string>,
`field_array_of_map` array<map<string,string>>
)
CLUSTERED BY(id) INTO 9 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
田野里 array<string>, map<string, string>, array<map<string, string>>
类型,它们错误地保存在配置单元表中。
当选择查询以直线形式发出时,它们显示:
field_map {"org.apache.spark.sql.catalyst.expressions.UnsafeMapData@101c5674":null}
field_array ["org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@6b5730c2"]
field_array_of_map [{"org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@ca82f1a4":null}]
从https://github.com/hortonworks-spark/spark-llap,它提到 Array
类型受支持,但 Map
不是。你知道怎么省钱吗 Array
正确地?有什么解决办法吗 Map
类型?
1条答案
按热度按时间lf3rwulv1#
在hwc github存储库pull请求上发现的更改在结构化流中对我有效。
我所做的:
克隆@massoudm分支
在我运行的项目根目录中
sbt assembly
我使用了新创建的hwcjar我的代码:
最重要的是:
这是pull请求的链接。