我有一套csv文件,需要通过spark结构化流读取。在创建 DataFrame
我要把东西放进Hive的table里。
在运行代码之前文件已经存在时 spark-submit
,数据已成功加载到配置单元中。但当我在运行时添加新的csv文件时,它根本不会将其插入配置单元中。
代码为:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
val spark = SparkSession.builder().appName("Spark SQL Example").config("hive.metastore.uris","thrift://hostname:port").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
spark.conf.set("spark.sql.streaming.schemaInference", true)
import spark.implicits._
val df = spark.readStream.option("header", true).csv("file:///folder path/")
val query = df.writeStream.queryName("tab").format("memory").outputMode(OutputMode.Append()).start()
spark.sql("insert into hivetab select * from tab").show()
query.awaitTermination()
我错过什么了吗?
任何建议都会有帮助。
谢谢
暂无答案!
目前还没有任何答案,快来回答吧!