我编写了以下java方法,通过apachespark将多个pojo的数据持久化到apachecassandra数据库中。
这似乎工作正常,但是spark没有提供任何关于记录是被插入(密钥在cassandra中不存在)还是被更新(密钥在db中已经存在)的信息。
是否有一种成本最低的方法(我希望避免在Dataframe中加载表的内容并检查重复键)在插入时找出数据库中已经存在哪些记录(具有重复键)?
具体代码如下:
@Service
public class WriteDB {
@Autowired
private SparkSession sparkSession;
Logger LOG = LoggerFactory.getLogger(WriteDB.class);
public <T> void uploadData(List<T> objects, Class<T> clazz, String keyspaceName, String tableName) {
LOG.info("Number of records to be committed to database: " + objects.size());
//Create dataset from entity object
Dataset<Row> df = sparkSession.createDataFrame(objects, clazz);
//Write data from spark dataframe to cassandra schema
df.write().mode(SaveMode.Append).format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {{
put("keyspace", keyspaceName);
put("table", tableName);
}}).save();
LOG.info("Records Commited");
}
}
1条答案
按热度按时间wr98u20j1#
在cassandra中,一切都是向上插入的-插入和更新之间没有区别。cassandra在插入或更新时不检查数据是否存在(lwts除外),它只是添加数据,在压缩过程中删除以前的副本。
实现任务的唯一方法是从表中加载数据—使用DataFrameAPI,它将在spark级别上完成,方法是将整个表读入dataframe,然后加入,或者使用RDDAPI
joinWithCassandra
或者leftJoinWithCassandra
(见文件)。