如何使用连接处理spark结构化流中的备份场景?

lnvxswe2  于 2021-06-14  发布在  Cassandra
关注(0)|答案(0)|浏览(283)

我使用的是sparksql2.4.1、spark-cassandra-connector_2.11-2.4.1.jar和java8。我有一个场景,需要将流数据与c*/cassandra表数据连接起来。
我有两张表,分别是“主表”和“备份表”

table kspace.master_table(
    statement_id int,
    statement_flag text,
    statement_date date,
    x_val double,
    y_val double,
    z_val double,
    PRIMARY KEY (( statement_id ), statement_date)
) WITH CLUSTERING ORDER BY ( statement_date DESC );

table kspace.backup_table(
    statement_id int,
    statement_flag text,
    statement_date date,
    x_val double,
    y_val double,
    z_val double,
    backup_timestamp timestamp,
    PRIMARY KEY ((statement_id ), statement_date, backup_timestamp )
) WITH CLUSTERING ORDER BY ( statement_date DESC,   backup_timestamp DESC);

每个流式记录都有“statement\u flag”,可以是“i”或“u”。
如果带有“i”的记录出现,则直接插入“master\u表”。
如果出现带有“u”的记录,我们需要检查“主表”中是否有给定(语句id)、语句日期的记录。
如果“主表”中有记录,则用当前时间戳(即备份时间戳)将该记录复制到“备份表”中。
用最新记录更新“主表”中的记录。
下面的代码应该可以做到这一点,但事实并非如此。

Dataset<Row> baseDs = //streaming data from topic
Dataset<Row> i_records = baseDs.filter(col("statement_flag").equalTo("I"));
Dataset<Row> u_records = baseDs.filter(col("statement_flag").equalTo("U"));

String keyspace="kspace";
String master_table = "master_table";
String backup_table = "backup_table";

Dataset<Row> cassandraMasterTableDs = getCassandraTableData(sparkSession, keyspace , master_table);

writeDfToCassandra(baseDs.toDF(), keyspace, master_table);

u_records.createOrReplaceTempView("u_records");
cassandraMasterTableDs.createOrReplaceTempView("persisted_records");

Dataset<Row> joinUpdatedRecordsDs =  sparkSession.sql(
            " select p.statement_id, p.statement_flag, p.statement_date,"
            + "p.x_val,p.y_val,p.z_val "
            + " from persisted_records as p "
            + "join u_records as u "
            + "on p.statement_id = u.statement_id  and p.statement_date = u.statement_date");

Dataset<Row> updated_records = joinUpdatedRecordsDs
  .withColumn("backup_timestamp", current_timestamp());

writeDfToCassandra(updated_records.toDF(), keyspace, backup_table);

“主表”中的所有内容都被复制到备份表中。事实上,它假设只复制以前版本的记录,而不是最新版本的记录。
假设只从master\u表复制以前版本的记录,但所有内容都复制到backup\u表
如何仅从中生成以前版本的记录 master_table 仅复制到 backup_table 不是最新的吗?

样本数据

对于第一个带“i”标志的记录
主表

备份表

对于带有“u”标志的第二条记录,即与前面的记录相同,但“y\u val”列数据除外。
主表

备份表
预期

但实际的表数据是

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题