java 如何获取Spark中跨多个文件的每一行的行号?

muk1a3rh  于 2023-04-28  发布在  Java
关注(0)|答案(1)|浏览(131)

我有一个包含多个JSONL文件的S3存储桶。每个文件中的每一行都是JSON字符串。
目前,我阅读的文件如下:

Dataset<Row> dataset = spark.read().option("recursiveFileLookup", "true").json(path);

这给了我一个包含JSON对象所有字段的数据集。
对于每一行,我都需要源文件中相应的行号。
例如:

File1.jsonl:
-------------
{"key1": "a"}
{"key1": "b"}
{"key1": "c"}

File2.jsonl:
-------------
{"key1": "x"}
{"key1": "y"}
{"key1": "z"}

我需要的数据集是这样的:
数据集
| 键1|文件名|行号|
| --------------|--------------|--------------|
| 一种|文件1|1|
| B|文件1|二|
| c|文件1|三|
| x|文件2|1|
| y|文件2|二|
| z|文件2|三|
这可能吗?

nuypyhwy

nuypyhwy1#

在这种情况下尝试使用monotonically_increasing_id()内置函数。

df.withColumn("mid",monotonically_increasing_id()))

如果这个mid没有给予预期的结果,那么使用这个mid列作为orderby列,在数据框上生成row_number()

from pyspark.sql import *
from pyspark.sql.functions import *
df1 = df1.withColumn("mid", monotonically_increasing_id())
windowSpec = W.orderBy("mid")
df1 = df1.withColumn("line_num", row_number().over(windowSpec)).show()

相关问题