我刚刚踏入数据世界,被要求创建一个自定义项目,我需要使用Notebook(PySpark)将CSV转换为parquet。到目前为止,我已经将其整合在一起,似乎运行没有错误,但ADLS中的Parquet文件夹中没有任何内容。
def convert_csv_to_parquet(input_file_path, output_file_path):
# Read CSV file into a Pandas DataFrame
df = pd.read_csv(input_file_path)
# Convert Pandas DataFrame to PyArrow Table
table = pa.Table.from_pandas(df)
# Write PyArrow Table to Parquet file
pq.write_table(table, output_file_path)
# Open the Parquet file
table = pq.read_table(output_file_path)
# Convert the table to a Pandas DataFrame
df = table.to_pandas()
# Print the DataFrame
print(df.head(100))
input_file_path = 'abfss://[email protected]/MySQL_Project-Table_Courses.csv'
output_file_path = 'abfss://[email protected]/Parquet'
convert_csv_to_parquet(input_file_path, output_file_path)
字符串
2条答案
按热度按时间qoefvg9y1#
Pandas和PySpark有很大的不同。虽然PySpark提供了与Pandas的互操作性,但Pandas DataFrame与PySpark/Spark DataFrame有很大的不同。
在编写任何代码之前,了解Pandas和PySpark之间的差异。
你的问题有两个部分。首先是了解如何读/写csv和parquet文件,它们驻留在你的笔记本电脑的硬盘上。第二是如何使用ADSL而不是本地硬盘。
第一部分:
参见PySpark docs附带的示例。
另请参阅PySpark SQL API文档(PySpark SQL API是一个python API,而不是SQL)。
第二部分:
当使用云存储作为底层存储(ADLS、S3等)时,您需要:
1.为所有路径添加适当的前缀,例如
s3a
,abfss
,.1.在你的pyspark环境中安装适当的hadoop扩展/库(对应/支持该方案)。PySpark将使用它来读/写云存储。
1.在Spark config中设置适当的配置参数或任何用于身份验证的方式。
根据您的使用情况,here is one和here is another有许多可用的指南。
uqxowvwt2#
你能使用pyspark reader/writer内置的方法吗?
这看起来很简单(我假设spark session在你的代码中声明了,或者你使用了Databricks noteboks):
字符串