合并到delta表中不使用java foreachbatch

uurv41yg  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(441)

我已经创建了一个delta表,现在我正在尝试使用foreachbatch()将数据插入到该表中。我遵循了这个例子。唯一的区别是我使用的是java而不是笔记本,但我想这不会有什么区别吧?
我的代码如下所示:

spark.sql("CREATE TABLE IF NOT EXISTS example_src_table(id int, load_date timestamp) USING DELTA LOCATION '/mnt/delta/events/example_src_table'");

Dataset<Row> exampleDF = spark.sql("SELECT e.id as id, e.load_date as load_date FROM example e");

        try {
            exampleDF
                    .writeStream()
                    .format("delta")
                    .foreachBatch((dataset, batchId) -> {
                        dataset.persist();
                        // Set the dataframe to view name
                        dataset.createOrReplaceTempView("updates");
                        // Use the view name to apply MERGE
                        // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
                        dataset.sparkSession().sql("MERGE INTO example_src_table e" +
                                " USING updates u" +
                                " ON e.id = u.id" +
                                " WHEN NOT MATCHED THEN INSERT (e.id, e.load_date) VALUES (u.id, u.load_date)");
                    })
                    .outputMode("update")
                    .option("checkpointLocation", "/mnt/delta/events/_checkpoints/example_src_table")
                    .start();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

这段代码运行起来没有任何问题,但是没有任何数据是用url'/mnt/delta/events/example\u src\u table'写入delta表的。有人知道我做错了什么吗?
我使用的是spark 3.0和Java8。
编辑
使用scala在databricks笔记本上进行了测试,然后效果很好。

a0x5cqrl

a0x5cqrl1#

如果要用新数据更新数据,请尝试遵循如下语法

WHEN NOT MATCHED THEN 
    UPDATE SET e.load_date = u.load_date AND  e.id = u.id

如果您只想添加它占用的数据

WHEN NOT MATCHED THEN INSERT *

相关问题