spark cassandra写的dataframe,如何在插入过程中发现数据库中已经存在哪些键

x6yk4ghg  于 2021-06-10  发布在  Cassandra
关注(0)|答案(1)|浏览(426)

我编写了以下java方法,通过apachespark将多个pojo的数据持久化到apachecassandra数据库中。
这似乎工作正常,但是spark没有提供任何关于记录是被插入(密钥在cassandra中不存在)还是被更新(密钥在db中已经存在)的信息。
是否有一种成本最低的方法(我希望避免在Dataframe中加载表的内容并检查重复键)在插入时找出数据库中已经存在哪些记录(具有重复键)?
具体代码如下:

@Service
public class WriteDB {

    @Autowired
    private SparkSession sparkSession;

    Logger LOG = LoggerFactory.getLogger(WriteDB.class);

    public <T> void uploadData(List<T> objects, Class<T> clazz, String keyspaceName, String tableName) {

        LOG.info("Number of records to be committed to database: " + objects.size());

        //Create dataset from entity object
        Dataset<Row> df = sparkSession.createDataFrame(objects, clazz);

        //Write data from spark dataframe to cassandra schema
        df.write().mode(SaveMode.Append).format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {{
            put("keyspace", keyspaceName);
            put("table", tableName);
        }}).save();

        LOG.info("Records Commited");
    }
}
wr98u20j

wr98u20j1#

在cassandra中,一切都是向上插入的-插入和更新之间没有区别。cassandra在插入或更新时不检查数据是否存在(lwts除外),它只是添加数据,在压缩过程中删除以前的副本。
实现任务的唯一方法是从表中加载数据—使用DataFrameAPI,它将在spark级别上完成,方法是将整个表读入dataframe,然后加入,或者使用RDDAPI joinWithCassandra 或者 leftJoinWithCassandra (见文件)。

相关问题