pyspark 将CSV转换为笔记本中的Parquet

zzwlnbp8  于 8个月前  发布在  Spark
关注(0)|答案(2)|浏览(114)

我刚刚踏入数据世界,被要求创建一个自定义项目,我需要使用Notebook(PySpark)将CSV转换为parquet。到目前为止,我已经将其整合在一起,似乎运行没有错误,但ADLS中的Parquet文件夹中没有任何内容。

def convert_csv_to_parquet(input_file_path, output_file_path):
    # Read CSV file into a Pandas DataFrame
    df = pd.read_csv(input_file_path)

    # Convert Pandas DataFrame to PyArrow Table
    table = pa.Table.from_pandas(df)

    # Write PyArrow Table to Parquet file
    pq.write_table(table, output_file_path)

    # Open the Parquet file
    table = pq.read_table(output_file_path)

    # Convert the table to a Pandas DataFrame
    df = table.to_pandas()

    # Print the DataFrame
    print(df.head(100))

input_file_path = 'abfss://[email protected]/MySQL_Project-Table_Courses.csv'
output_file_path = 'abfss://[email protected]/Parquet'

convert_csv_to_parquet(input_file_path, output_file_path)

字符串

qoefvg9y

qoefvg9y1#

Pandas和PySpark有很大的不同。虽然PySpark提供了与Pandas的互操作性,但Pandas DataFrame与PySpark/Spark DataFrame有很大的不同。
在编写任何代码之前,了解Pandas和PySpark之间的差异。
你的问题有两个部分。首先是了解如何读/写csv和parquet文件,它们驻留在你的笔记本电脑的硬盘上。第二是如何使用ADSL而不是本地硬盘。

第一部分:

参见PySpark docs附带的示例。

  • 如何从csv读取 Dataframe
  • 如何将DataFrame写入parquet。

另请参阅PySpark SQL API文档(PySpark SQL API是一个python API,而不是SQL)。

第二部分:

当使用云存储作为底层存储(ADLS、S3等)时,您需要:
1.为所有路径添加适当的前缀,例如s3aabfss,.
1.在你的pyspark环境中安装适当的hadoop扩展/库(对应/支持该方案)。PySpark将使用它来读/写云存储。
1.在Spark config中设置适当的配置参数或任何用于身份验证的方式。
根据您的使用情况,here is onehere is another有许多可用的指南。

uqxowvwt

uqxowvwt2#

你能使用pyspark reader/writer内置的方法吗?
这看起来很简单(我假设spark session在你的代码中声明了,或者你使用了Databricks noteboks):

def convert_csv_to_parquet(
    input_file_path: str,
    output_file_path: str
    ):

    df = spark.read.format('csv').load(input_file_path)
    df.write.format('parquet').save(input_file_path)
    return 1

字符串

相关问题