cassandra 如何使用Spark DataFrames查询JSON数据列?

eeq64g8w  于 2022-11-05  发布在  Cassandra
关注(0)|答案(5)|浏览(149)

我有一张 cassandra 表,简单来说看起来像这样:

key: text
jsonData: text
blobData: blob

我可以使用spark和spark-cassandra-connector创建一个基本的数据框架:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

但是我很难将JSON数据扩展到它的底层结构中。我最终希望能够基于json字符串中的属性进行过滤,并返回blob数据。类似于jsonData.foo =“bar”并返回blobData。目前这是可能的吗?

mf98qq94

mf98qq941#

我使用以下
(从2.2.0开始可用,我假设您的json字符串列位于列索引0)

def parse(df: DataFrame, spark: SparkSession): DataFrame = {
    val stringDf = df.map((value: Row) => value.getString(0), Encoders.STRING)
    spark.read.json(stringDf)
}

它会自动推断JSON中的模式。https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html

7kjnsjlb

7kjnsjlb2#

Spark〉= 2.4

如果需要,可以使用schema_of_json函数确定模式(请注意,这假定任意一行是模式的有效代表)。

import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._

val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))

Spark〉= 2.1

可以使用from_json函数:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("k", StringType, true), StructField("v", DoubleType, true)
))

df.withColumn("jsonData", from_json($"jsonData", schema))

Spark〉= 1.6

您可以使用get_json_object,它包含一个列和一个路径:

import org.apache.spark.sql.functions.get_json_object

val exprs = Seq("k", "v").map(
  c => get_json_object($"jsonData", s"$$.$c").alias(c))

df.select($"*" +: exprs: _*)

并将字段提取到可进一步转换为预期类型的各个字符串。
path参数使用点语法表示,前导$.表示文档根(由于上面的代码使用字符串插值,因此$必须转义,因此$$.)。

Spark〈= 1.5

这在目前是否可行?
据我所知,这是不可能的。你可以尝试类似的做法:

val df = sc.parallelize(Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")

我假设blob字段不能用JSON表示,否则可以省略拆分和连接:

import org.apache.spark.sql.Row

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
  case Row(key: String, json: String) =>
    s"""{"key": "$key", "jsonData": $json}"""
}) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema

// root
//  |-- jsonData: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: double (nullable = true)
//  |-- key: long (nullable = true)
//  |-- blobData: string (nullable = true)

另一种方法(虽然更复杂,但成本更低)是使用UDF解析JSON并输出structmap列。

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show

// +---+--------------------+------------------+----------+
// |key|            jsonData|          blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
// |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
// +---+--------------------+------------------+----------+

parsed.printSchema

// root
//  |-- key: string (nullable = true)
//  |-- jsonData: string (nullable = true)
//  |-- blobData: string (nullable = true)
//  |-- parsedJSON: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: integer (nullable = false)
x6492ojm

x6492ojm3#

zero323's answer是全面的,但缺少Spark 2.1+中可用的一种方法,该方法比使用schema_of_json()更简单、更健壮:

import org.apache.spark.sql.functions.from_json

val json_schema = spark.read.json(df.select("jsonData").as[String]).schema
df.withColumn("jsonData", from_json($"jsonData", json_schema))

下面是Python的等价物:

from pyspark.sql.functions import from_json

json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
df.withColumn("jsonData", from_json("jsonData", json_schema))

正如zero 323所指出的,schema_of_json()的问题在于它只检查一个字符串,并从中派生出一个模式。如果JSON数据具有不同的模式,那么从schema_of_json()返回的模式将不会反映出在DataFrame中合并所有JSON数据的模式时所得到的结果。使用from_json()解析该数据将产生大量null或空值,其中schema_of_json()返回的模式与数据不匹配。
通过使用Spark从JSON字符串的RDD中导出全面的JSON模式的能力,我们可以保证所有JSON数据都可以被解析。

示例:schema_of_json()spark.read.json()

这里有一个例子(在Python中,Scala的代码非常类似),用来说明使用schema_of_json()从单个元素派生模式和使用spark.read.json()从所有数据派生模式之间的区别。

>>> df = spark.createDataFrame(
...     [
...         (1, '{"a": true}'),
...         (2, '{"a": "hello"}'),
...         (3, '{"b": 22}'),
...     ],
...     schema=['id', 'jsonData'],
... )

a在一个数据列中有布林值,而在另一个数据列中有字串值。a的合并纲要会将其类型设定为字串。b会是整数。
让我们来比较一下不同的方法。首先,schema_of_json()方法:

>>> json_schema = schema_of_json(df.select("jsonData").take(1)[0][0])
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: boolean (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1|  [true]|
|  2|    null|
|  3|      []|
+---+--------+

正如您所看到的,我们派生的JSON模式非常有限。"a": "hello"不能被解析为布尔值并返回null,而"b": 22只是被丢弃,因为它不在我们的模式中。
现在使用spark.read.json()

>>> json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: string (nullable = true)
 |    |-- b: long (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1| [true,]|
|  2|[hello,]|
|  3|  [, 22]|
+---+--------+

在这里,我们保留了所有数据,并有一个全面的模式来解释所有数据。"a": true被转换为一个字符串,以匹配"a": "hello"的模式。
使用spark.read.json()的主要缺点是Spark将扫描所有数据以导出模式。根据数据量的不同,开销可能会很大。如果您 * 知道 * 所有JSON数据都具有一致的模式,则可以继续对单个元素使用schema_of_json()。如果您具有模式可变性,但不想扫描所有数据,您可以在调用spark.read.json()时将samplingRatio设置为小于1.0的值以查看数据的子集。
以下是spark.read.json()的文档:Scala API / Python API

gstyhher

gstyhher4#

from_json函数正是您要查找的函数。您的代码将如下所示:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

//You can define whatever struct type that your json states
val schema = StructType(Seq(
  StructField("key", StringType, true), 
  StructField("value", DoubleType, true)
))

df.withColumn("jsonData", from_json(col("jsonData"), schema))
oprakyz7

oprakyz75#

基础JSON字符串为

"{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}";

下面是过滤JSON并将所需数据加载到Cassandra中的脚本。

sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2")
            .write.format("org.apache.spark.sql.cassandra")
            .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name"))
            .mode(SaveMode.Append)
            .save()

相关问题