我已经创建了一个delta表,现在我正在尝试使用foreachbatch()将数据插入到该表中。我遵循了这个例子。唯一的区别是我使用的是java而不是笔记本,但我想这不会有什么区别吧?
我的代码如下所示:
spark.sql("CREATE TABLE IF NOT EXISTS example_src_table(id int, load_date timestamp) USING DELTA LOCATION '/mnt/delta/events/example_src_table'");
Dataset<Row> exampleDF = spark.sql("SELECT e.id as id, e.load_date as load_date FROM example e");
try {
exampleDF
.writeStream()
.format("delta")
.foreachBatch((dataset, batchId) -> {
dataset.persist();
// Set the dataframe to view name
dataset.createOrReplaceTempView("updates");
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
dataset.sparkSession().sql("MERGE INTO example_src_table e" +
" USING updates u" +
" ON e.id = u.id" +
" WHEN NOT MATCHED THEN INSERT (e.id, e.load_date) VALUES (u.id, u.load_date)");
})
.outputMode("update")
.option("checkpointLocation", "/mnt/delta/events/_checkpoints/example_src_table")
.start();
} catch (TimeoutException e) {
e.printStackTrace();
}
这段代码运行起来没有任何问题,但是没有任何数据是用url'/mnt/delta/events/example\u src\u table'写入delta表的。有人知道我做错了什么吗?
我使用的是spark 3.0和Java8。
编辑
使用scala在databricks笔记本上进行了测试,然后效果很好。
1条答案
按热度按时间a0x5cqrl1#
如果要用新数据更新数据,请尝试遵循如下语法
如果您只想添加它占用的数据