sparkDataframe验证parquet写入的列名

htrmnn0y  于 2021-07-12  发布在  Spark
关注(0)|答案(5)|浏览(453)

我使用从json事件流转换而来的Dataframe来处理事件,json事件流最终被写成Parquet格式。
但是,一些json事件在键中包含空格,我希望在将其转换为parquet之前记录并过滤/删除Dataframe中的此类事件,因为 ;{}()\n\t= 在下面[1]中列出的Parquet模式(catalystschemaconverter)中被视为特殊字符,因此不应在列名中使用。
我怎样才能在dataframe中对列名进行这样的验证,并完全删除这样的事件,而不会导致spark流作业出错。
[1] Spark催化转换器

def checkFieldName(name: String): Unit = {
  // ,;{}()\n\t= and space are special characters in Parquet schema
  checkConversionRequirement(
    !name.matches(".*[ ,;{}()\n\t=].*"),
    s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
             |Please use alias to rename it.
           """.stripMargin.split("\n").mkString(" ").trim
  )
}
wnavrhmk

wnavrhmk1#

对于在pyspark中遇到这种情况的每个人:在重命名列之后,这种情况甚至发生在我身上。我可以在一些迭代之后让它工作的一种方法是:

file = "/opt/myfile.parquet"
df = spark.read.parquet(file)
for c in df.columns:
    df = df.withColumnRenamed(c, c.replace(" ", ""))

df = spark.read.schema(df.schema).parquet(file)
0s0u357o

0s0u357o2#

在写入parquet之前,可以使用正则表达式用下划线替换所有无效字符。此外,还可以去掉列名中的重音符号。
这里有一个函数 normalize 对scala和python都这样做:

斯卡拉

/**
  * Normalize column name by replacing invalid characters with underscore
  * and strips accents
  *
  * @param columns dataframe column names list
  * @return the list of normalized column names
  */
def normalize(columns: Seq[String]): Seq[String] = {
  columns.map { c =>
    org.apache.commons.lang3.StringUtils.stripAccents(c.replaceAll("[ ,;{}()\n\t=]+", "_"))
  }
}

// using the function
val df2 = df.toDF(normalize(df.columns):_*)

Python

import unicodedata
import re

def normalize(column: str) -> str:
    """
    Normalize column name by replacing invalid characters with underscore
    strips accents and make lowercase
    :param column: column name
    :return: normalized column name
    """
    n = re.sub(r"[ ,;{}()\n\t=]+", '_', column.lower())
    return unicodedata.normalize('NFKD', n).encode('ASCII', 'ignore').decode()

# using the function

df = df.toDF(*map(normalize, df.columns))
nxowjjhe

nxowjjhe3#

这是我的解决方案,使用regex按照parquet约定重命名所有dataframe的列:

df.columns.foldLeft(df){
  case (currentDf,  oldColumnName) => currentDf.withColumnRenamed(oldColumnName, oldColumnName.replaceAll("[ ,;{}()\n\t=]", ""))
}

希望对你有帮助,

jaql4c8m

jaql4c8m4#

我对包含空格的列名也有同样的问题。
解决方案的第一部分是把名字放在反引号里。
解决方案的第二部分是用下划线替换空格。
抱歉,我只准备了pyspark代码:

from pyspark.sql import functions as F

df_tmp.select(*(F.col("`" + c+ "`").alias(c.replace(' ', '_')) for c in df_tmp.columns)
gdrx4gfi

gdrx4gfi5#

使用 alias 更改没有这些特殊字符的字段名。

相关问题