pyspark 合并.dat文件中的列名并将其转换为spark Dataframe

gab6jxml  于 2023-08-02  发布在  Spark
关注(0)|答案(1)|浏览(81)

我给sample.dat数据和.dml
.dat文件中的内容:

101|US|Ram
102|INDIA|Shyam
103|JAPAN|Geeta

字符串
示例.dml文件:

record
string("|") ID;
string("|") Country;
string("\n") Name;
end;


在我的.dat文件中没有列名,我想提供列名(.dml文件),并将该列名附加到我的.dat文件中,并转换为spark dataframe
我尝试了下面的代码:

dataset = "my_file_name.dat"
dataFile = spark.read.format("csv").option("delimiter",'|').option("header","false").load(dataset)
        
dataFile = dataFile.toDF("COL1","COL2","COL3")
dataFile.show()


这是工作,但在这里我手动给予列名,但我想从上面提到的.dml文件提供列名。
现在我在python3.6中尝试了下面的代码

with open ("src.dml") as fp:
    next (fp)
    cols = []
    row = fp.readline().strip()
    while not row.startswith('end;'):
        row = fp.readline().strip()
        col = row.split(maxsplit=1).strip(';')
        cols.append(col)

ryoqjall

ryoqjall1#

要从dml文件中提取列,可以用途:

# walrus operator, Python>=3.8
with open('data.dml') as fp:
    next(fp)  # skip first row
    cols = []
    while not (row := fp.readline().strip()).startswith('end;'):
        col = row.split(maxsplit=1)[1].strip(';')
        cols.append(col)

字符串
输出量:

>>> cols
['ID', 'Country', 'Name']

更新:对于Python 3.8以下版本,可以用途:

with open('data.dml') as fp:
    cols = []
    for row in fp:
        row = row.strip()
        if row.startswith('record'):
            is_record = True
        elif row.startswith('end'):
            is_record = False
        elif is_record:
            col = row.split(maxsplit=1)[-1].strip(';')
            cols.append(col)


之后,您可以执行以下操作:

dataFile = (spark.read.format("csv")
                 .option("delimiter",'|').option("header","false")
                 .load('data.dat').toDF(*cols))


输出量:

>>> dataFile.show()
+---+-------+-----+
| ID|Country| Name|
+---+-------+-----+
|101|     US|  Ram|
|102|  INDIA|Shyam|
|103|  JAPAN|Geeta|
+---+-------+-----+


要写入文件,请执行以下操作:

dataFile.toPandas().to_csv('data.csv', index=False, sep='|')


data.csv的内容现在是:

ID|Country|Name
101|US|Ram
102|INDIA|Shyam
103|JAPAN|Geeta

相关问题