我正在使用Spark 2.1(scala 2.11)。
我想从一个 Dataframe 加载一个定义了模式的json格式的字符串到另一个 Dataframe 。我尝试了一些解决方案,但最便宜的是来自_json的标准列函数。我尝试了一个例子(https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-collection.html#from_json),这个函数给了我意想不到的结果。
val df = spark.read.text("testFile.txt")
df.show(false)
+----------------+
|value |
+----------------+
|{"a": 1, "b": 2}|
|{bad-record |
+----------------+
df.select(from_json(col("value"),
StructType(List(
StructField("a",IntegerType),
StructField("b",IntegerType)
))
)).show(false)
+-------------------+
|jsontostruct(value)|
+-------------------+
|[1,2] |
|null |
+-------------------+
此行为类似于mode:PERMISSIVE,但不是默认值。默认情况下,它设置为FAILFAST模式,这意味着只要输入数据与强制模式不匹配,它就会抛出异常。
我尝试使用DataFrameReader(JSON数据源和FAILFAST模式)加载testFile.txt,并成功捕获了一个异常。
spark.read.option("mode","FAILFAST").json("test.txt").show(false)
---
Caused by: org.apache.spark.sql.catalyst.json.SparkSQLJsonProcessingException: Malformed line in FAILFAST mode: {bad-record
---
虽然解析模式在两种情况下是相同的,为什么各自的输出如此不同?
3条答案
按热度按时间sg2wtvxw1#
这是预期的行为。
from_json
是SQL函数,在此级别没有异常(故意的)的概念。如果操作失败,结果是未定义的NULL
。虽然
from_json
提供了options
参数,允许您设置JSON读取器选项,但是由于上面提到的原因,此行为不能被覆盖。另一方面,
DataFrameReader
的默认模式是允许的。wnvonmuf2#
请注意,您将文件作为文本文件读取并将其转换为json。默认情况下,换行符将作为文本文件的分隔符,如果您有一个有效的JSON字符串,则它将正确转换为您在from_json()方法中定义的模式。
如果有空行或无效的JSON文本,则会得到NULL。
看看这个:
当in/testFile.txt具有以下内容时,
它打印
当您的输入带有空行时
结果是
ljsrvy3e3#
要添加到@user11022201答案-看起来
options
参数可以实现所需的FAILFAST
行为。以下代码在pyspark中,并使用Spark 3.2.2进行了测试上述代码的结果是一个异常,这是所需的行为: