How do I append a row and extend an incremental index column in PySpark?

nom7f22z · asked 2021-07-13 in Spark · 2 answers

I want the 'idx' column to be filled in automatically and incrementally. Below I show how I can do it manually.

from pyspark.sql import functions as F, Window
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType)

schema = StructType([
    StructField('vin', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('var', IntegerType(), True),
    StructField('rim', IntegerType(), True),
    StructField('cap', IntegerType(), True),
    StructField('cur', IntegerType(), True),
])

data = [['tom', 10, 54, 87, 23, 90], ['nick', 15, 63, 23, 11, 65], ['juli', 14, 87, 9, 43, 21]]
data_1 = ['sam', 60, 45, 34, 12, 67]
df = spark.createDataFrame(data, schema)

# This attempt does not work: monotonically_increasing_id() is not
# consecutive, and the unioned row has no 'idx' column.
df = df.withColumn('idx', monotonically_increasing_id()).union(spark.createDataFrame([data_1]))

# --------------------------------------------------------------------

# I could do it in this way below , but I want it to be automated!

df=df.withColumn('idx',F.row_number().over(Window.orderBy('age')))\
     .union(spark.createDataFrame([data_1 + [4]]))

# ---------------------------------------------------------------------

df.show()

# Expected outcome:

+----+---+---+---+---+---+---+
| vin|age|var|rim|cap|cur|idx|
+----+---+---+---+---+---+---+
| tom| 10| 54| 87| 23| 90|  1|
|juli| 14| 87|  9| 43| 21|  2|
|nick| 15| 63| 23| 11| 65|  3|
| sam| 60| 45| 34| 12| 67|  4|
+----+---+---+---+---+---+---+

Answer 1 (2izufjch)

You can take the max idx from the original df and add it as an offset to the idx of the new rows.

from pyspark.sql import functions as F, Window

df = df.withColumn('idx',F.row_number().over(Window.orderBy('age')))
df.show()
+----+---+---+---+---+---+---+
| vin|age|var|rim|cap|cur|idx|
+----+---+---+---+---+---+---+
| tom| 10| 54| 87| 23| 90|  1|
|juli| 14| 87|  9| 43| 21|  2|
|nick| 15| 63| 23| 11| 65|  3|
+----+---+---+---+---+---+---+

df2 = df.union(
    spark.createDataFrame([data_1], schema).withColumn(
       'idx',
       F.row_number().over(Window.orderBy('age')) + F.lit(df.select(F.max('idx')).head()[0])
    )
)

df2.show()
+----+---+---+---+---+---+---+
| vin|age|var|rim|cap|cur|idx|
+----+---+---+---+---+---+---+
| tom| 10| 54| 87| 23| 90|  1|
|juli| 14| 87|  9| 43| 21|  2|
|nick| 15| 63| 23| 11| 65|  3|
| sam| 60| 45| 34| 12| 67|  4|
+----+---+---+---+---+---+---+

Answer 2 (vom3gejh)

You can run row_number again after the union, updating idx only where it is null:

from pyspark.sql import functions as F, Window

data_1 = ['sam', 60, 45, 34, 12, 67]

df = df.withColumn('idx', F.row_number().over(Window.orderBy('age'))) \
    .union(spark.createDataFrame([data_1]).withColumn("idx", F.lit(None))) \
    .withColumn('idx', F.coalesce('idx', F.row_number().over(Window.orderBy('age'))))

df.show()
# +----+---+---+---+---+---+---+
# | vin|age|var|rim|cap|cur|idx|
# +----+---+---+---+---+---+---+
# | tom| 10| 54| 87| 23| 90|  1|
# |juli| 14| 87|  9| 43| 21|  2|
# |nick| 15| 63| 23| 11| 65|  3|
# | sam| 60| 45| 34| 12| 67|  4|
# +----+---+---+---+---+---+---+

However, if you add a row whose age is smaller than the existing ages, you need to regenerate the row numbers ordered by the age column; otherwise the idx column will no longer be correct.
