如何在spark 2.3.1 api中使用蓄能器

ktecyv1j  于 2021-06-15  发布在  Cassandra
关注(0)|答案(1)|浏览(385)

我使用的是带有cassandra3.x的spark-sql2.11-2.3.1版本。我需要提供一个验证功能

column_family_name text,
    oracle_count bigint,
    cassandra_count bigint,
    create_timestamp timestamp,
    last_update_timestamp timestamp,
    update_user text

同样,我需要计算成功插入的记录计数,即要填充的cassandra\u计数,为此我想使用spark acculator。但不幸的是,我找不到spark-sql2.11-2.3.1版本所需的api示例。
下面是我保存到Cassandra片段

o_model_df.write.format("org.apache.spark.sql.cassandra")
    .options(Map( "table" -> columnFamilyName, "keyspace" -> keyspace ))
    .mode(SaveMode.Append)
    .save()

在这里如何实现累加器增量为每一行被成功地保存到Cassandra。。。
任何帮助都是万分感激的。

uyto3xhc

uyto3xhc1#

spark的累加器通常用于您编写的转换中,不要指望spark cassandra连接器会为您提供类似的功能。
但总的来说,如果您的工作完成时没有出现错误,则意味着数据已正确写入数据库。
如果要检查数据库中实际有多少行,则需要对数据库中的数据进行计数—可以使用spark cassandra连接器的cassandracount方法。主要原因是,您的Dataframe中可能有多行可以Map到单个cassandra行(例如,如果您错误地定义了主键,那么多行都有主键)。

相关问题