我有一张 cassandra 表,简单来说看起来像这样:
key: text
jsonData: text
blobData: blob
我可以使用spark和spark-cassandra-connector创建一个基本的数据框架:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
但是我很难将JSON数据扩展到它的底层结构中。我最终希望能够基于json字符串中的属性进行过滤,并返回blob数据。类似于jsonData.foo =“bar”并返回blobData。目前这是可能的吗?
5条答案
按热度按时间mf98qq941#
我使用以下
(从2.2.0开始可用,我假设您的json字符串列位于列索引0)
它会自动推断JSON中的模式。https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html
7kjnsjlb2#
Spark〉= 2.4
如果需要,可以使用
schema_of_json
函数确定模式(请注意,这假定任意一行是模式的有效代表)。Spark〉= 2.1
可以使用
from_json
函数:Spark〉= 1.6
您可以使用
get_json_object
,它包含一个列和一个路径:并将字段提取到可进一步转换为预期类型的各个字符串。
path
参数使用点语法表示,前导$.
表示文档根(由于上面的代码使用字符串插值,因此$
必须转义,因此$$.
)。Spark〈= 1.5:
这在目前是否可行?
据我所知,这是不可能的。你可以尝试类似的做法:
我假设
blob
字段不能用JSON表示,否则可以省略拆分和连接:另一种方法(虽然更复杂,但成本更低)是使用UDF解析JSON并输出
struct
或map
列。x6492ojm3#
zero323's answer是全面的,但缺少Spark 2.1+中可用的一种方法,该方法比使用
schema_of_json()
更简单、更健壮:下面是Python的等价物:
正如zero 323所指出的,
schema_of_json()
的问题在于它只检查一个字符串,并从中派生出一个模式。如果JSON数据具有不同的模式,那么从schema_of_json()
返回的模式将不会反映出在DataFrame中合并所有JSON数据的模式时所得到的结果。使用from_json()
解析该数据将产生大量null
或空值,其中schema_of_json()
返回的模式与数据不匹配。通过使用Spark从JSON字符串的RDD中导出全面的JSON模式的能力,我们可以保证所有JSON数据都可以被解析。
示例:
schema_of_json()
与spark.read.json()
这里有一个例子(在Python中,Scala的代码非常类似),用来说明使用
schema_of_json()
从单个元素派生模式和使用spark.read.json()
从所有数据派生模式之间的区别。a
在一个数据列中有布林值,而在另一个数据列中有字串值。a
的合并纲要会将其类型设定为字串。b
会是整数。让我们来比较一下不同的方法。首先,
schema_of_json()
方法:正如您所看到的,我们派生的JSON模式非常有限。
"a": "hello"
不能被解析为布尔值并返回null
,而"b": 22
只是被丢弃,因为它不在我们的模式中。现在使用
spark.read.json()
:在这里,我们保留了所有数据,并有一个全面的模式来解释所有数据。
"a": true
被转换为一个字符串,以匹配"a": "hello"
的模式。使用
spark.read.json()
的主要缺点是Spark将扫描所有数据以导出模式。根据数据量的不同,开销可能会很大。如果您 * 知道 * 所有JSON数据都具有一致的模式,则可以继续对单个元素使用schema_of_json()
。如果您具有模式可变性,但不想扫描所有数据,您可以在调用spark.read.json()
时将samplingRatio
设置为小于1.0
的值以查看数据的子集。以下是
spark.read.json()
的文档:Scala API / Python APIgstyhher4#
from_json
函数正是您要查找的函数。您的代码将如下所示:oprakyz75#
基础JSON字符串为
下面是过滤JSON并将所需数据加载到Cassandra中的脚本。