scala—使用sparksql从配置单元表读取并写回它

arknldoa  于 2021-05-29  发布在  Hadoop
关注(0)|答案(7)|浏览(1087)

我正在使用sparksql读取一个配置单元表,并将其分配给scala val

val x = sqlContext.sql("select * from some_table")

然后我对Dataframex做了一些处理,最后得到了一个Dataframey,它的模式与表some\ u table完全相同。
最后,我尝试将覆盖yDataframe插入到同一个配置单元表的某个\u表中

y.write.mode(SaveMode.Overwrite).saveAsTable().insertInto("some_table")

那我就错了
org.apache.spark.sql.analysisexception:无法将覆盖插入到同时从中读取的表中
我尝试创建一个insert sql语句并使用sqlcontext.sql()触发它,但它也给了我同样的错误。
有什么方法可以绕过这个错误吗?我需要把记录插回同一张表。
嗨,我试着按建议做,但还是得到同样的错误。

val x = sqlContext.sql("select * from incremental.test2")
val y = x.limit(5)
y.registerTempTable("temp_table")
val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("incremental.test2")

scala> dy.write.mode("overwrite").insertInto("incremental.test2")
             org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from.;
g52tjvyc

g52tjvyc1#

你应该先保存你的Dataframe y 在临时桌上

y.write.mode("overwrite").saveAsTable("temp_table")

然后可以覆盖目标表中的行

val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("some_table")
mi7gmzs6

mi7gmzs62#

你应该先把钱存起来 DataFrame y 比如Parquet锉刀:

y.write.parquet("temp_table")

加载后,如下所示:

val parquetFile = sqlContext.read.parquet("temp_table")

然后在表中插入数据

parquetFile.write.insertInto("some_table")
shstlldc

shstlldc3#

在执行下面的操作之前,需要记住的是,要覆盖的配置单元表应该是由hiveddl创建的,而不是由

spark(df.write.saveAsTable("<table_name>"))

如果以上不是真的,这是行不通的。我在spark 2.3.0中测试过这个

val tableReadDf=spark.sql("select * from <dbName>.<tableName>")
val updatedDf=tableReadDf.<transformation> //any update/delete/addition 
updatedDf.createOrReplaceTempView("myUpdatedTable")
spark.sql("""with tempView as(select * from myUpdatedTable) insert overwrite table 
<dbName>.<tableName> <partition><partition_columns> select * from tempView""")
kulphzqa

kulphzqa4#

如果您正在执行以下操作,则还会出现错误:“cannot overwrite a path that also being reading from”:
您将从视图“v”(执行您的逻辑)中“insert overwrite”到配置单元表“a”
这个视图也引用了同一个表“a”。我发现这很困难,因为视图是嵌套很深的代码,同时也在查询“a”。真倒霉。
这就像是把你坐在上面的树枝砍下来:-(

guykilcj

guykilcj5#

实际上,您也可以使用检查点来实现这一点。由于它破坏了数据沿袭,spark无法检测到您正在读取和覆盖同一个表:

sqlContext.sparkContext.setCheckpointDir(checkpointDir)
 val ds = sqlContext.sql("select * from some_table").checkpoint()
 ds.write.mode("overwrite").saveAsTable("some_table")
h6my8fg2

h6my8fg26#

从spark中的配置单元表读取数据:

val hconfig = new org.apache.hadoop.conf.Configuration()
org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(hconfig , "dbname", "tablename")

val inputFormat = (new HCatInputFormat).asInstanceOf[InputFormat[WritableComparable[_],HCatRecord]].getClass

val data = sc.newAPIHadoopRDD(hconfig,inputFormat,classOf[WritableComparable[_]],classOf[HCatRecord])
4xrmg8kj

4xrmg8kj7#

结合spark 2.2
这个错误意味着我们的进程是从同一个表读取数据,然后写入同一个表。
通常情况下,这应该作为进程写入目录.hivestaging。。。
saveastable方法会发生此错误,因为它会覆盖整个表而不是单个分区。
insertinto方法不应出现此错误,因为它会覆盖分区而不是表。
之所以会发生这种情况,是因为配置单元表在其定义中具有以下spark tblproperty。如果删除以下spark tblproperty,对于insertinto方法,此问题将得到解决-
'spark.sql.partitionprovider''spark.sql.sources.provider''spark.sql.sources.schema.numparts''spark.sql.sources.schema.part.0''spark.sql.sources.schema.part.1''spark.sql.sources.schema.part.2''spark.sql.sources.schema.partcol.0''spark.sql.sources.schema.partcol.1'
https://querydb.blogspot.com/2019/07/read-from-hive-table-and-write-back-to.html
当我们将hdp升级到2.6.3时,spark从2.2升级到了2.3,这导致了下面的错误-

Caused by: org.apache.spark.sql.AnalysisException: Cannot overwrite a path that is also being read from.;

at org.apache.spark.sql.execution.command.DDLUtils$.verifyNotReadPath(ddl.scala:906)

此错误发生在我们正在读写同一路径的作业中。像scd逻辑的作业
解决方案-
set--conf“spark.sql.hive.convertmetastoreorc=false”
或者,更新作业,使其将数据写入临时表。然后从临时表中读取并将其插入到最终表中。
https://querydb.blogspot.com/2020/09/orgapachesparksqlanalysisexception.html

相关问题