我将数值作为字符串存储在Spark SQL数据库的一列中,因为它们可能会溢出所有数值数据类型(〉128位数字)。到目前为止,我可以使用普通的SUM()函数来对值求和。我很好奇这样做是否总是安全的,以及如何处理它不起作用的情况。我的思绪:我认为在求和过程中,内部的数字字符串值被转换为真实的数据类型。如果这种内部转换失败,整个求和将失败。
SUM()
jdzmm42g1#
十进制类型最多有38位。实际上Spark docs中没有提到它,但是在尝试创建更大精度的类型时会出现错误。Databricks docs明确说明了这一点。尽管如此,38位数仍允许您存储和安全地添加126位(log2(10^38))数字。
log2(10^38)
val df = Seq( "123456789012345678901234567890", "1", "100000100000100000100000100001" ).toDF("x") df.withColumn("x", $"x".cast(new DecimalType(38))).agg(sum($"x")).show(false) +------------------------------+ |sum(x) | +------------------------------+ |223456889012445679001234667892| +------------------------------+
如果你需要更多,你可以创建一个custom aggregate function操作字符串。
import org.apache.spark.sql.{Encoder, Encoders} import org.apache.spark.sql.expressions.Aggregator object BigSum extends Aggregator[String, String, String] { def zero: String = "0" def reduce(buffer: String, x: String): String = (BigInt(buffer) + BigInt(x)).toString def merge(b1: String, b2: String): String = (BigInt(b1) + BigInt(b2)).toString def finish(b: String): String = b def bufferEncoder: Encoder[String] = Encoders.STRING def outputEncoder: Encoder[String] = Encoders.STRING } val big_sum = udaf(BigSum) df.agg(big_sum($"x")).show(false) +------------------------------+ |bigsum$(x) | +------------------------------+ |223456889012445679001234667892| +------------------------------+
PS.我的BigSum不是超级有效的,由于来回转换,它会更好地有缓冲区作为BigInt -我只是不知道如何写编码器[BigInt]...
1条答案
按热度按时间jdzmm42g1#
十进制类型最多有38位。实际上Spark docs中没有提到它,但是在尝试创建更大精度的类型时会出现错误。Databricks docs明确说明了这一点。
尽管如此,38位数仍允许您存储和安全地添加126位(
log2(10^38)
)数字。如果你需要更多,你可以创建一个custom aggregate function操作字符串。
PS.我的BigSum不是超级有效的,由于来回转换,它会更好地有缓冲区作为BigInt -我只是不知道如何写编码器[BigInt]...