python - Add a row number to a concatenated column in a PySpark DataFrame

muk1a3rh  posted on 2021-07-12  in Spark

I have a PySpark data frame like the one below:

```
df = sqlContext.createDataFrame(
[(1,'Y','Y',0,0,0,2,'Y','N','Y','Y'),
(2,'N','Y',2,1,2,3,'N','Y','Y','N'),
(3,'Y','N',3,1,0,0,'N','N','N','N'),
(4,'N','Y',5,0,1,0,'N','N','N','Y'),
(5,'Y','N',2,2,0,1,'Y','N','N','Y'),
(6,'Y','Y',0,0,3,6,'Y','N','Y','N'),
(7,'N','N',1,1,3,4,'N','Y','N','Y'),
(8,'Y','Y',1,1,2,0,'Y','Y','N','N')
],
('id', 'compatible', 'product', 'ios', 'pc', 'other', 'devices', 'customer', 'subscriber', 'circle', 'smb')
)
```

Now I want to create a new column `bt_string` in the data frame by concatenating some strings. I did the following:

```
import pyspark.sql.functions as f
from datetime import datetime
from time import strftime
from pyspark.sql import Window

# the values below will change as per requirement
job_id = '123'
sess_id = '99'
batch_id = '1'
time_now = datetime.now().strftime('%Y%m%d%H%M%S')

con_string = job_id + sess_id + batch_id + time_now + '000000000000000'

df1 = df.withColumn('bt_string', f.lit(con_string))
```

Now I want to assign a unique number to each row of the data frame. I applied the `row_number` function as follows:

```
df2 = df1.withColumn("row_id",f.row_number().over(Window.partitionBy()))
```

The output is below:

```
df2.show()

+---+----------+-------+---+---+-----+-------+--------+----------+------+---+--------------------+------+
| id|compatible|product|ios| pc|other|devices|customer|subscriber|circle|smb|           bt_string|row_id|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+--------------------+------+
|  1|         Y|      Y|  0|  0|    0|      2|       Y|         N|     Y|  Y|12399120210301120...|     1|
|  2|         N|      Y|  2|  1|    2|      3|       N|         Y|     Y|  N|12399120210301120...|     2|
|  3|         Y|      N|  3|  1|    0|      0|       N|         N|     N|  N|12399120210301120...|     3|
|  4|         N|      Y|  5|  0|    1|      0|       N|         N|     N|  Y|12399120210301120...|     4|
|  5|         Y|      N|  2|  2|    0|      1|       Y|         N|     N|  Y|12399120210301120...|     5|
|  6|         Y|      Y|  0|  0|    3|      6|       Y|         N|     Y|  N|12399120210301120...|     6|
|  7|         N|      N|  1|  1|    3|      4|       N|         Y|     N|  Y|12399120210301120...|     7|
|  8|         Y|      Y|  1|  1|    2|      0|       Y|         Y|     N|  N|12399120210301120...|     8|
+---+----------+-------+---+---+-----+-------+--------+----------+------+---+--------------------+------+
```

Now I want to add the `row_id` column to the `bt_string` column. For example, if the `bt_string` of the first row is `1239912021030112091500000000000000`, the corresponding `row_id` value should be added to it, so for the first row the value becomes `1239912021030112091500000000000001`.

The new column should have values like the following:

1239912021030112091500000000000001
1239912021030112091500000000000002
1239912021030112091500000000000003
1239912021030112091500000000000004
1239912021030112091500000000000005
1239912021030112091500000000000006
1239912021030112091500000000000007
1239912021030112091500000000000008
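
To make the expected values concrete, here is a plain-Python sketch (not Spark) of the arithmetic being asked for. It uses the sample base value quoted above; the names `base` and `expected_ids` are illustrative only.

```python
# Sample base value copied from the question (the zero-padded bt_string).
base = "1239912021030112091500000000000000"

# Python integers have arbitrary precision, so the addition is exact
# even though the value is far larger than a 64-bit integer.
expected_ids = [str(int(base) + row_id) for row_id in range(1, 9)]

for value in expected_ids:
    print(value)
```

Because the base does not start with a zero, converting back to a string keeps the original length.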

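I also need to make sure the column is always `35` characters long.
The string below must never exceed `35` characters under any circumstances:

```
con_string = job_id + sess_id + batch_id + time_now + '000000000000000'
```

If it exceeds `35`, the number of zeros added in the statement above needs to be trimmed.
How can I achieve what I want?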
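
The length rule can be checked before any data frame work. The sketch below assumes the row number occupies whatever room the prefix leaves inside the 35 characters, and it takes the timestamp from the sample value in the question. `zero_pad_width` and `max_row_id` are hypothetical helper names, not part of PySpark.

```python
TOTAL_LEN = 35  # required length of the final value, from the question


def zero_pad_width(prefix, total_len=TOTAL_LEN):
    """Return how many digits remain for the zero-padded row number."""
    width = total_len - len(prefix)
    if width <= 0:
        raise ValueError(
            f"prefix of {len(prefix)} characters leaves no room within {total_len}"
        )
    return width


def max_row_id(prefix, total_len=TOTAL_LEN):
    """Largest row number that still fits without exceeding total_len."""
    return 10 ** zero_pad_width(prefix, total_len) - 1


# job_id + sess_id + batch_id + time_now (timestamp assumed from the sample)
prefix = "123" + "99" + "1" + "20210301120915"
print(zero_pad_width(prefix))  # 15
print(max_row_id(prefix))      # 999999999999999
```

If any of the ID parts grows, the pad width shrinks by the same amount, which is the "trim the zeros" behaviour described above.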

jk9hmnmh1#

Follow the steps below to achieve your result:

```
# Import the necessary functions
import pyspark.sql.functions as f
from datetime import datetime
from pyspark.sql import Window

# Assign variables as per requirement
job_id = '123'
sess_id = '99'
batch_id = '1'
time_now = datetime.now().strftime('%Y%m%d%H%M%S')

# Join the variables to get the base string in the desired format
con_string = job_id + sess_id + batch_id + time_now

# Subtract the length of the base string from the maximum column length (35)
zero_to_add = 35 - len(con_string)

# Append that many zeros to the base string
new_bt_string = con_string + zero_to_add * '0'

# Add the new column, cast it to decimal, and then apply row_number.
# row_number() needs an ordered window on current Spark versions,
# so the window is ordered by id here.
df1 = df.withColumn('bt_string', f.lit(new_bt_string).cast('decimal(35,0)'))\
    .withColumn("row_id", f.row_number().over(Window.orderBy('id')))

# Add a new column holding the sum of the two columns added above
df2 = df1.withColumn('bt_id', f.expr('bt_string + row_id'))
```
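
If the ID should stay a string rather than a decimal, one alternative is to left-pad the row number and concatenate it to the prefix. This is only a sketch under assumptions: `format_bt_id` and `with_bt_id` are hypothetical helper names, ordering by `id` is assumed, and `prefix` is the unpadded `con_string` from the steps above. `lpad`, `concat`, `lit` and `row_number` are existing `pyspark.sql.functions` calls.

```python
def format_bt_id(prefix, row_id, total_len=35):
    """Plain-Python mirror of the column expression built in with_bt_id."""
    width = total_len - len(prefix)
    digits = str(row_id)
    if width < len(digits):
        raise ValueError("row number does not fit in the remaining width")
    return prefix + digits.zfill(width)


def with_bt_id(df, prefix, total_len=35, order_col="id"):
    """Return df with a string column bt_id of exactly total_len characters."""
    # pyspark is imported here so format_bt_id can be used without Spark.
    import pyspark.sql.functions as f
    from pyspark.sql import Window

    width = total_len - len(prefix)
    if width <= 0:
        raise ValueError("prefix leaves no room for the row number")

    # An ordered window makes the numbering deterministic.
    row_id = f.row_number().over(Window.orderBy(order_col))
    # lpad truncates values longer than width, so the row count must fit.
    padded = f.lpad(row_id.cast("string"), width, "0")
    return df.withColumn("bt_id", f.concat(f.lit(prefix), padded))


print(format_bt_id("12399120210301120915", 1))
```

With the sample data, `with_bt_id(df, con_string).select('id', 'bt_id').show(truncate=False)` should list one 35-character string per row. A window with no `partitionBy` moves all rows into a single partition, so this suits small or moderately sized data frames better than very large ones.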
