I'm using Spark to read rows from Cassandra, where one of the columns is a JSON object. Here is the schema and a sample row from the dataset:
root
|-- attributes: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- count: integer (nullable = true)
| | |-- score: double (nullable = true)
| | |-- id: integer (nullable = true)
|-- id: string (nullable = true)
|-- col1: string (nullable = true)
+--------------------------------------------------------------------------------+--------+---------+
|attributes |id |col1 |
+--------------------------------------------------------------------------------+--------+---------+
|[[usage,2,5.0,12], [price,1,10.0,48], [hair,1,10.0,23737], [curls,1,10.0,30807]]|19400335|val_str_1|
+--------------------------------------------------------------------------------+--------+---------+
The data is loaded from the table using the following schema:
ArrayType schema =
    DataTypes.createArrayType(
        new StructType(
            new StructField[] {
              new StructField("name", DataTypes.StringType, true, Metadata.empty()),
              new StructField("count", DataTypes.IntegerType, true, Metadata.empty()),
              new StructField("score", DataTypes.DoubleType, true, Metadata.empty()),
              new StructField("id", DataTypes.IntegerType, true, Metadata.empty())
            }));
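As a minimal sketch of how that schema might be applied, assuming the Cassandra column arrives as a JSON string (the raw column name "attributes" and the DataFrame `raw` are assumptions for illustration), `from_json` parses it into the array-of-structs type:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.functions.*;

public class ParseAttributes {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]").appName("parse-attributes").getOrCreate();

    ArrayType schema =
        DataTypes.createArrayType(
            new StructType(
                new StructField[] {
                  new StructField("name", DataTypes.StringType, true, Metadata.empty()),
                  new StructField("count", DataTypes.IntegerType, true, Metadata.empty()),
                  new StructField("score", DataTypes.DoubleType, true, Metadata.empty()),
                  new StructField("id", DataTypes.IntegerType, true, Metadata.empty())
                }));

    // Stand-in for the raw Cassandra read: one row with a JSON string column.
    Dataset<Row> raw = spark.sql(
        "SELECT '[{\"name\":\"usage\",\"count\":2,\"score\":5.0,\"id\":12}]' AS attributes, "
            + "'19400335' AS id, 'val_str_1' AS col1");

    // Parse the JSON string into array<struct<name,count,score,id>>.
    Dataset<Row> df = raw.withColumn("attributes", from_json(col("attributes"), schema));
    df.printSchema();
    df.show(false);
  }
}
```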
Is it possible to inject new fields into each struct in the attributes column, taking one value from the id column and another from the col1 column?
1 Answer
On Spark 2.4+ you can try the transform function to update each struct element of the array. For Spark < 2.4, you can instead explode the array column, add the new fields to each struct, and then group by to collect the list again.