5 Answers

wwwo4jvm1#

You can define a UDF for this, then use df.withColumn to replace the existing column value with the new one.
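The code for this first answer was not captured in this page. As a rough sketch only (the name fillPlaceholder and the null guard are assumptions, not the original author's code), the UDF approach could look like this, given the description/states DataFrame used in the answers below:

```
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical UDF (not the original answer's code): substitute the {0}
// placeholder in the first column with the value of the second column,
// passing nulls through unchanged.
val fillPlaceholder = udf { (description: String, state: String) =>
  if (description == null || state == null) description
  else description.replace("{0}", state)
}

// df.withColumn replaces the existing description column with the new value
df.withColumn("description", fillPlaceholder(col("description"), col("states"))).show(false)
```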
b91juud32#

No UDF needed. Use

def regexp_replace(e: org.apache.spark.sql.Column, pattern: org.apache.spark.sql.Column, replacement: org.apache.spark.sql.Column)

```
scala> val df = Seq(("{0} is the 4th biggest state of India","Andhra Pradesh"),("The {0} remains the most beutiful state of India","Maharashtra"),("This state {0} often termed as 'Switzerland of India'","Manipur")).toDF("description","states")
df: org.apache.spark.sql.DataFrame = [description: string, states: string]

scala> df.show(false)
+-----------------------------------------------------+--------------+
|description                                          |states        |
+-----------------------------------------------------+--------------+
|{0} is the 4th biggest state of India                |Andhra Pradesh|
|The {0} remains the most beutiful state of India     |Maharashtra   |
|This state {0} often termed as 'Switzerland of India'|Manipur       |
+-----------------------------------------------------+--------------+
```

Pattern: `lit("\\{0\\}")` (the braces must be escaped, since the pattern is interpreted as a regex).
```
scala> df
  .withColumn("description",
    regexp_replace(
      $"description",
      lit("\\{0\\}"),
      $"states"
    )
  )
  .show(false)
+---------------------------------------------------------+--------------+
|description                                              |states        |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India         |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra   |
|This state Manipur often termed as 'Switzerland of India'|Manipur       |
+---------------------------------------------------------+--------------+
```
Handle null in the states column (the sample data here has an extra row whose states value is null):
```
scala> df.withColumn("description",when($"states".isNotNull,regexp_replace($"description",lit("\\{0\\}"),$"states")).otherwise($"description")).show(false)
+---------------------------------------------------------+--------------+
|description                                              |states        |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India         |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra   |
|This state Manipur often termed as 'Switzerland of India'|Manipur       |
|Sample Data with null                                    |null          |
+---------------------------------------------------------+--------------+
```
yqhsw0fo3#

You can follow this approach.

Input:

```
+-----------------------------------------------------+--------------+
|Description                                          |States        |
+-----------------------------------------------------+--------------+
|{0} is the 4th biggest state of India                |Andhra Pradesh|
|The {0} remains the most beutiful state of India     |Maharashtra   |
|This state {0} often termed as 'Switzerland of India'|Manipur       |
+-----------------------------------------------------+--------------+
```
```
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkColumnSubstring {

  // build the spark session
  val spark = SparkSession
    .builder()
    .appName("SparkColumnSubstring")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") // Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "SparkColumnSubstring") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext
  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    import spark.implicits._

    val data = List(
      ("{0} is the 4th biggest state of India", "Andhra Pradesh"),
      ("The {0} remains the most beutiful state of India", "Maharashtra"),
      ("This state {0} often termed as 'Switzerland of India'", "Manipur")
    )

    try {
      // input dataframe
      val dataDF = sc.parallelize(data).toDF("Description", "States")
      dataDF.show(truncate = false)

      // transforming the data: plain String.replace on each row
      val dataSub = dataDF.map(r => (r(0).toString.replace("{0}", r(1).toString), r(1).toString)).toDF("Description", "States")
      dataSub.show(truncate = false)

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}
```
Expected output:
```
+---------------------------------------------------------+--------------+
|Description                                              |States        |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India         |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra   |
|This state Manipur often termed as 'Switzerland of India'|Manipur       |
+---------------------------------------------------------+--------------+
```

Hope this helps.
omvjsjqw4#

Edit 1: since you are working with a DataFrame, you can use:

```
df.map(each => {
  val col1 = each.getString(0)
  val col2 = each.getString(1)
  val newCol = col1.replace("{0}", col2)
  // return the changed value
  (newCol, col2)
}).toDF("Description", "States").show
```

If you have a large dataset, it is better to use a udf. You can also use the RDD map method to apply a function to each value. See the code below:
```
scala> val input = List(("{0} is One","1"), ("{0} is Two", "2"))
input: List[(String, String)] = List(({0} is One,1), ({0} is Two,2))

scala> val rdd = sc.parallelize(input)
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.collect().foreach(println)
({0} is One,1)
({0} is Two,2)

// Replace each {0} with the value in the 2nd column
scala> rdd.map(each => each._1.replace("{0}",each._2)).collect().foreach(println)
1 is One
2 is Two
```
envsm3lx5#

Try this:

1. Load the test data.
2. Without a UDF.
3. With a UDF.
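The code for this answer was not captured in this page. Below is a minimal sketch of what its three steps could look like, assuming the same sample data as the other answers (the object and UDF names are illustrative, not the original author's code):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, regexp_replace, udf}

object PlaceholderDemo extends App {

  val spark = SparkSession.builder().appName("PlaceholderDemo").master("local[*]").getOrCreate()
  import spark.implicits._

  // 1. Load the test data
  val df = Seq(
    ("{0} is the 4th biggest state of India", "Andhra Pradesh"),
    ("The {0} remains the most beutiful state of India", "Maharashtra")
  ).toDF("description", "states")

  // 2. Without a UDF: regexp_replace accepts a column-valued replacement,
  //    so each row is patched with its own states value
  df.withColumn("description",
    regexp_replace(col("description"), lit("\\{0\\}"), col("states"))
  ).show(false)

  // 3. With a UDF: plain String.replace wrapped in a udf
  val replacePlaceholder = udf((d: String, s: String) => d.replace("{0}", s))
  df.withColumn("description", replacePlaceholder(col("description"), col("states"))).show(false)

  spark.stop()
}
```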