我正在创建一个Dataframe,如下所示:
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import StructType, StructField, DataType, IntegerType, StringType
schma = StructType([
StructField("id", IntegerType(), True),
StructField("name",StringType(), True),
]
)
empdf=spark.read.format("csv").csv("/home/hdfs/sparkwork/hiveproj/Datasets/empinfo/emp.csv",schema=schma);
empdf.show();
我正在将Dataframe保存为Parquet文件。
empdf.write.parquet(path="/home/hdfs/sparkwork/hiveproj/Data/empinfo/empl_par/")
如果我在loaddatainpath命令中使用特定的文件名,那么它可以正常工作。
spark.sql("LOAD DATA INPATH '/home/hdfs/sparkwork/hiveproj/Data/empinfo/empl_par/part-00000-6cdfcba5-49ab-499c-8d7f-831c9ec314de-c000.snappy.parquet' INTO TABLE EMPINFO.EMPLOYEE")
但是,如果我使用通配符而不是文件名(或.parquet),这是给我的错误。
spark.sql("LOAD DATA INPATH '/home/hdfs/sparkwork/hiveproj/Data/empinfo/empl_par/*.parquet' INTO TABLE EMPINFO.EMPLOYEE")
有没有办法使用spark的wildcard-in-hive命令推送文件夹的所有内容?请帮忙做同样的事。
1条答案
按热度按时间qgelzfjb1#
而不是
spark.sql("LOAD DATA INPATH '/home/hdfs/sparkwork/hiveproj/Data/empinfo/empl_par/*.parquet' INTO TABLE EMPINFO.EMPLOYEE")
试着用这个empdf.write.partitionBy("year","month","day").insertInto("EMPINFO.EMPLOYEE")
注意,我使用分区列作为year
,month
&day
. 您可能需要根据您的要求进行更改。