我使用的是带有cassandra3.x的spark-sql2.11-2.3.1版本。我需要提供一个验证功能
column_family_name text,
oracle_count bigint,
cassandra_count bigint,
create_timestamp timestamp,
last_update_timestamp timestamp,
update_user text
同样,我需要计算成功插入的记录计数,即要填充的cassandra\u计数,为此我想使用spark acculator。但不幸的是,我找不到spark-sql2.11-2.3.1版本所需的api示例。
下面是我保存到Cassandra片段
o_model_df.write.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> columnFamilyName, "keyspace" -> keyspace ))
.mode(SaveMode.Append)
.save()
在这里如何实现累加器增量为每一行被成功地保存到Cassandra。。。
任何帮助都是万分感激的。
1条答案
按热度按时间uyto3xhc1#
spark的累加器通常用于您编写的转换中,不要指望spark cassandra连接器会为您提供类似的功能。
但总的来说,如果您的工作完成时没有出现错误,则意味着数据已正确写入数据库。
如果要检查数据库中实际有多少行,则需要对数据库中的数据进行计数—可以使用spark cassandra连接器的cassandracount方法。主要原因是,您的Dataframe中可能有多行可以Map到单个cassandra行(例如,如果您错误地定义了主键,那么多行都有主键)。