使用单独 Dataframe 中的信息读取文本文件

41zrol4v  于 2022-10-07  发布在  Spark
关注(0)|答案(1)|浏览(134)

我有固定宽度的文件,如下

00120181120xyz12341
00220180203abc56792
00320181203pqr25483

以及指定模式的相应 Dataframe (表示列名(_Name)和列宽(_Long):

如何使用PySpark获取文本文件DataFrame,如下所示:


# +---+----+--+

# |C1|  C2 |C3|

# +--+-----+--+

# | 0|02018|11|

# | 0|02018|02|

# | 0|02018|12|

# +--+-----+--+
e4yzc0pl

e4yzc0pl1#

您可以:

  • collect您的列名和长度;
  • 创建一个子串索引列表,用于提取您需要的字符串部分;
  • 使用子字符串索引列表来提取每行的字符串部分。

输入:

rdd_data = spark.sparkContext.textFile(r'C:Tempsample.txt')
df_lengths = spark.createDataFrame([("1", "C1"), ("5", "C2"), ("2", "C3")], ["_Length", "_Name"])

脚本:

from pyspark.sql import Row

lengths = df_lengths.collect()

ranges = [[0, 0]]
for x in lengths:
    ranges.append([ranges[-1][-1], ranges[-1][-1] + int(x["_Length"])])

Cols = Row(*[r["_Name"] for r in lengths])
df = rdd_data.map(lambda x: Cols(*[x[r[0]:r[1]] for r in ranges[1:]])).toDF()

df.show()

# +---+-----+---+

# | C1|   C2| C3|

# +---+-----+---+

# |  0|01201| 81|

# |  0|02201| 80|

# |  0|03201| 81|

# +---+-----+---+

如果您有一个列可以在orderBy中用于窗口函数,那么只使用DataFrame API就可以实现这样的操作。

from pyspark.sql import functions as F, Window as W
df_data = spark.read.csv(r"C:Tempsample.txt")
df_lengths = spark.createDataFrame([("1", "C1"), ("5", "C2"), ("2", "C3")], ["_Length", "_Name"])

sum_col = F.sum("_Length").over(W.orderBy("_Name")) + 1
df_lengths = (df_lengths
    .withColumn("_Len", F.array((sum_col - F.col("_Length")).cast('int'), "_Length"))
    .groupBy().pivot("_Name").agg(F.first("_Len"))
)
df_data = df_data.select(
    [F.substring("_c0", int(c[0]), int(c[1])) for c in df_lengths.head()]
).toDF(*df_lengths.columns)

df_data.show()

# +---+-----+---+

# | C1|   C2| C3|

# +---+-----+---+

# |  0|01201| 81|

# |  0|02201| 80|

# |  0|03201| 81|

# +---+-----+---+

相关问题