Insert data from a list into a Hive table using Spark SQL

vohkndzv · posted 2021-06-25 in Hive

I have lists of file names, file paths, and file sizes, and I want to insert these details into a Hive table using Spark SQL.

```
import org.apache.hadoop.fs.{FileSystem, Path}

var fs1 = FileSystem.get(spark.sparkContext.hadoopConfiguration)
var file_path = fs1.listStatus(new Path("path")).filter(_.isFile).map(_.getPath).toList
var new_files = fs1.listStatus(new Path("path")).filter(_.isFile).map(_.getPath.getName).toList
var file_size = fs1.listStatus(new Path("path")).filter(_.isFile).map(_.getLen).toList
var file_data = file_path zip new_files zip file_size

for ((filedetail, size) <- file_data) {
  var filepath = filedetail._1
  var filesize: Long = size
  var filename = filedetail._2
  var df = spark.sql(s"insert into mytable(file_path,filename,file_size) select '${file_path}' as file_path,'${new_files}' as filename,'${file_size}' as file_size")
  df.write.insertInto("dbname.tablename")
}
```

It is generating this insert query:

```
insert into mytable(file_path,filename,file_size) select 'List(path/filename.txt,path/filename4.txt,path/filename5.txt)' as file_path,'List(filename.txt, filename4.txt, filename5.txt)' as filename,'List(19, 19, 19)' as file_size;
```

and I am getting this error:

```
mismatched input 'file_path' expecting {'(', 'select', 'from', 'values', 'table', 'insert', 'map', 'reduce'}(line 1, pos 34)
```
What I actually want is to insert the data in this tabular form:

```
file_path            filename       file_size
path/filename.txt    filename.txt   19
path/filename4.txt   filename4.txt  19
path/filename5.txt   filename5.txt  19
```

Can someone suggest how to insert this data?
Alternatively, is there a way to split the query into three separate Hive insert statements like these?

```
insert into mytable(file_path,filename,file_size) select 'path/filename.txt' as file_path,'filename.txt' as filename,'19' as file_size;
insert into mytable(file_path,filename,file_size) select 'path/filename3.txt' as file_path,'filename3.txt' as filename,'19' as file_size;
insert into mytable(file_path,filename,file_size) select 'path/filename4.txt' as file_path,'filename4.txt' as filename,'19' as file_size;
```

ldxq2e6h · Answer 1

You just need to build a single list containing all the rows, where each row has 3 elements. Create a DataFrame from that list, then use the DataFrame to persist the data.
Refer to this post.
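A minimal sketch of that approach, assuming the `spark` session from the question and that `dbname.tablename` has the columns `file_path`, `filename`, `file_size` in that order (the listing path and table name are the question's placeholders):

```
import org.apache.hadoop.fs.{FileSystem, Path}
import spark.implicits._

// one pass over the directory listing: each row is (path, name, size)
val fs1 = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val rows = fs1.listStatus(new Path("path"))
  .filter(_.isFile)
  .map(s => (s.getPath.toString, s.getPath.getName, s.getLen))
  .toList

// build a DataFrame whose column names and order match the Hive table
val df = rows.toDF("file_path", "filename", "file_size")

// insertInto resolves columns by position against dbname.tablename
df.write.insertInto("dbname.tablename")
```

Because the whole list becomes one DataFrame, this writes the three-row result shown in the question in a single operation instead of one insert statement per file.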


bf1o4zei · Answer 2

You can do this in several ways.
First, you can convert the list to an RDD:

```
val rdd1 = sc.parallelize(fs1.listStatus(new Path("path")).filter(_.isFile).map(_.getPath).toList)

// then you can convert the rdd into a dataframe

import spark.implicits._

val df1 = rdd1.map((value1, value2, ....) => (value1, value2,....)).toDF("col1", "col2", ....)

// from the dataframe you can create a temporary view

df1.createOrReplaceTempView("my_table")

// then you can load the temporary view in your table

sqlContext.sql("""
INSERT [INTO | OVERWRITE] my_hive_table SELECT * FROM my_table
""")
