I am trying to use Spark SQL to load a CSV file as a partitioned Hive table and start a Thrift server. Here is what I tried:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .set("hive.server2.thrift.port", "10000")
    .set("spark.sql.hive.thriftServer.singleSession", "true")
    .set("spark.sql.warehouse.dir", "hdfs://sql/metadata/hive")
    .set("spark.sql.catalogImplementation", "hive")
    .setMaster("local[*]")
    .setAppName("ThriftServer")
  val spark = SparkSession.builder()
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()
  spark.sql(
    "CREATE TABLE IF NOT EXISTS freq (" +
      "time_stamp bigint, " +
      "time_quality string) " +
      "PARTITIONED BY (id int) " +
      "ROW FORMAT DELIMITED " +
      "FIELDS TERMINATED BY ',' " +
      "STORED AS TEXTFILE " +
      "LOCATION 'Path_to_CSV_file' " +
      "TBLPROPERTIES('skip.header.line.count' = '1')"
  )
}
The code above creates the freq table and loads data into it, but the table is not partitioned by the id column. I also tried altering the table and inserting the data based on the partition key, but neither worked.
Altering the table:
spark.sql("ALTER TABLE freq ADD PARTITION (id) " +
"LOCATION 'PATH_TO_CSV_FILE' ")
ERROR: Found an empty partition key 'id'.(line 1, pos 33)
== SQL ==
ALTER TABLE freq ADD PARTITION (id) LOCATION 'Path_To_CSV_File'
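For reference, `ALTER TABLE ... ADD PARTITION` requires a concrete value for every partition key, which is what the error is pointing at. A minimal sketch (the value `id = 1` and the path suffix are hypothetical placeholders):

```sql
-- Sketch: ADD PARTITION needs an explicit value for each partition key.
-- The partition value (id = 1) and the location are placeholders.
ALTER TABLE freq ADD PARTITION (id = 1)
LOCATION 'Path_to_CSV_file/id=1';
```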
Inserting into the table:
spark.sql(
"INSERT OVERWRITE TABLE freq PARTITION (id) " +
"SELECT * " +
"FROM freq"
)
ERROR: Exception in thread "main" org.apache.spark.sql.AnalysisException: id is not a valid partition column in table `database`.`freq`.;
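A dynamic-partition insert only works when the target table was created with `id` in its `PARTITIONED BY` clause (not as a regular column), dynamic partitioning is enabled, and the partition column comes last in the SELECT list. A sketch under those assumptions, where `freq_src` is a hypothetical staging table holding the raw CSV rows:

```sql
-- Sketch, assuming `freq_src` is a (hypothetical) staging table over the CSV
-- data and `freq` was created with PARTITIONED BY (id int).
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

INSERT OVERWRITE TABLE freq PARTITION (id)
SELECT time_stamp, time_quality, id   -- partition column must come last
FROM freq_src;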
My first question is: what is the correct way to create a partitioned table with Spark SQL and Hive?
Also, since the data already exists in CSV files, I don't want to create a copy of it (because of storage constraints). I only want to keep a single copy, either as the CSV files themselves or as Hive partition subdirectories. Is that possible?
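One pattern that keeps a single copy of the data is an EXTERNAL table whose partitions point at the existing directories, assuming the CSV files can be laid out as one subdirectory per id value. A sketch (all paths and partition values are placeholders):

```sql
-- Sketch: an EXTERNAL table leaves the files where they are; dropping the
-- table does not delete the data. Paths and partition values are placeholders.
CREATE EXTERNAL TABLE IF NOT EXISTS freq (
  time_stamp bigint,
  time_quality string
)
PARTITIONED BY (id int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'Path_to_CSV_dir'
TBLPROPERTIES ('skip.header.line.count' = '1');

-- Register each existing directory as a partition:
ALTER TABLE freq ADD PARTITION (id = 1) LOCATION 'Path_to_CSV_dir/id=1';
-- Or, if the directories already follow the id=<value> naming convention:
MSCK REPAIR TABLE freq;
```

This avoids duplicating the CSV data, at the cost of having to organize (or register) the files per partition value.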