scala - how to extract the db name and table name from a comma-separated string in a Dataframe column into two columns

q5lcpyga  asked on 2021-07-13  in Spark

I have a Dataframe column "table_name" whose value is a string like the one below:

tradingpartner.parent_supplier,lookup.store,lab_promo_invoice.tl_cc_mbr_prc_wkly_inv,lab_promo_invoice.mpp_club_card_promotion_funding_view,lab_promo_invoice.supplier_sale_apportionment_cc,tradingpartner.supplier,stores.rpm_zone_location_mapping,lookup.calendar

How can I extract the db name and the table name from the string above, storing the db names in one column and the table names in another?

I would like the output as below (the expected output was shown as an image in the original post).

vojdkbi0 1#

You can use a regular expression to extract the dbname and the table name:

import org.apache.spark.sql.functions.{col, regexp_replace}

val result = df.select(
    col("table_name"),
    // drop ".table" after each db name, keeping the comma separators
    regexp_replace(col("table_name"), "\\.[^,]+(,|$)", "$1").as("DBName"),
    // drop the "db." prefix before each table name
    regexp_replace(col("table_name"), "(^|,)[^,]+\\.", "$1").as("Table")
)

result.show(false)
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
|table_name                                                                                                                                                                                                                                                           |DBName                                                                                                  |Table                                                                                                                                                       |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tradingpartner.parent_supplier,lookup.store,lab_promo_invoice.tl_cc_mbr_prc_wkly_inv,lab_promo_invoice.mpp_club_card_promotion_funding_view,lab_promo_invoice.supplier_sale_apportionment_cc,tradingpartner.supplier,stores.rpm_zone_location_mapping,lookup.calendar|tradingpartner,lookup,lab_promo_invoice,lab_promo_invoice,lab_promo_invoice,tradingpartner,stores,lookup|parent_supplier,store,tl_cc_mbr_prc_wkly_inv,mpp_club_card_promotion_funding_view,supplier_sale_apportionment_cc,supplier,rpm_zone_location_mapping,calendar|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+
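To see what the two patterns do outside Spark, here is a minimal plain-Java sketch: `regexp_replace` uses Java regex, so `String.replaceAll` behaves the same way on a single value. The class and method names here are just for illustration.

```java
// Sketch of the two regexp_replace patterns from the answer above,
// applied with plain java.lang.String.replaceAll.
public class ExtractNames {
    // Drop ".tablename" after each db name, keeping the comma separators.
    public static String dbNames(String s) {
        return s.replaceAll("\\.[^,]+(,|$)", "$1");
    }

    // Drop the "dbname." prefix before each table name.
    public static String tableNames(String s) {
        return s.replaceAll("(^|,)[^,]+\\.", "$1");
    }

    public static void main(String[] args) {
        String in = "tradingpartner.parent_supplier,lookup.store";
        System.out.println(dbNames(in));    // tradingpartner,lookup
        System.out.println(tableNames(in)); // parent_supplier,store
    }
}
```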
qncylg1j 2#

Sorry, I wrote the UDF for your requirement in Java, but I think it is easy to convert to Scala.

// imports used by the snippets below
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.*;
import java.util.*;

//Spark >= 2.3
UserDefinedFunction splitTableNameUDF = udf(transformTables(), getTableName());
df.withColumn("table_name_new", splitTableNameUDF.apply(col("table_name")))
    .select("table_name", "table_name_new.DBName", "table_name_new.Table");

//Spark < 2.3

sqlContext.udf().register("splitTableNameUDF", transformTables(), getTableName());
df.withColumn("table_name_new", callUDF("splitTableNameUDF", col("table_name")))
    .select("table_name", "table_name_new.DBName", "table_name_new.Table");

//schema

public static StructType getTableName() {
    List<StructField> inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("DBName", DataTypes.StringType, true));
    inputFields.add(DataTypes.createStructField("Table", DataTypes.StringType, true));
    return DataTypes.createStructType(inputFields);
}

//UDF

public static UDF1<String, Row> transformTables() {
    return (row) -> {
        String[] dbtableNames = row.split(",");
        List<String> dbNames = new ArrayList<>();
        List<String> tableNames = new ArrayList<>();
        for (String dbtableName : dbtableNames) {
            String[] split = dbtableName.split(".");
            dbNames.add(split[0]);
            tableNames.add(split[1]);
        }
        return RowFactory.create(String.join(",", dbNames), String.join(",", tableNames));
    };
}
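The core of the UDF is plain string handling, so it can be exercised without Spark. A minimal standalone sketch of that logic (class and method names are illustrative), using `split("\\.", 2)` so a table name that itself contains a dot is kept intact:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone version of the UDF's core logic: split the input on commas,
// then split each "db.table" entry on the first literal dot.
public class SplitTableNames {
    // Returns {comma-joined db names, comma-joined table names}.
    public static String[] splitNames(String input) {
        List<String> dbNames = new ArrayList<>();
        List<String> tableNames = new ArrayList<>();
        for (String dbtableName : input.split(",")) {
            String[] parts = dbtableName.split("\\.", 2);
            dbNames.add(parts[0]);
            tableNames.add(parts[1]);
        }
        return new String[]{String.join(",", dbNames), String.join(",", tableNames)};
    }

    public static void main(String[] args) {
        String[] out = splitNames("tradingpartner.parent_supplier,lookup.store");
        System.out.println(out[0]); // tradingpartner,lookup
        System.out.println(out[1]); // parent_supplier,store
    }
}
```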
