ApacheKafka—流处理时如何在cassandra中精确实现一次?

7vux5j2d  于 2021-07-15  发布在  Flink
关注(0)|答案(1)|浏览(338)

我有一张像这样的Cassandratable

CREATE TABLE tmp.inventory (
    t_id text,
    is_available boolean,
    modified_at bigint,
    price double,
    available_units bigint,
    PRIMARY KEY(t_id, modified_at)
) WITH CLUSTERING ORDER BY (modified_at);

我有一个流媒体管道更新Cassandra的项目。流式管道每隔一段时间检查一次。因此,当管道失败时,它将重新处理自上次成功检查点以来的源数据。当它在失败后重新处理时,它将尝试覆盖cassandra中已经成功写入的数据(即在最后一个成功的检查点之后但在失败之前)。我在考虑利用 modified_at 实现这一点。像这样的

UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? AND modified_at < ?

只有当cassandra中的修改的\u at小于管道中的修改的\u at时,我才尝试进行更新。但是,这会引发 InvalidQueryException: Slice restrictions are not supported on the clustering columns in UPDATE statements 我想如果这种情况有帮助的话。

UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? IF modified_at < ?

但这件事 InvalidQueryException: PRIMARY KEY column 'modified_at' cannot have IF conditions 那么,处理这个问题的理想方法是什么呢?
编辑如果我只在这个表中有这些字段,那么重新处理事件可能没有那么大的问题,因为当管道赶上实时流时,它最终会变得一致,但是说有另一个流作业用当前价格、可用单位等更新同一个表。在这种情况下,如果其中一个作业失败并重新启动,则表可能处于一致状态。

f1tvaqid

f1tvaqid1#

为了避免一个线程可以在另一个线程已经插入较新数据之后写入较旧数据的情况,可以使用 USING TIMESTAMP 在执行insert或update时(在cassandra中,任何东西都是向上插入的,所以从语法的Angular 来看,使用insert可能更容易,imho)。其思想是显式指定记录的时间戳,因此当另一个线程插入比前一个线程晚的旧数据时,数据将被插入,但它们不会成功,因为cassandra使用时间戳(显式指定)来检测最新版本。像这样:

INSERT INTO tmp.inventory (t_id, is_available, modified_at)
  VALUES (?, ?,?)
  USING TIMESTAMP <modified_at*1000>

唯一要记住的是 USING TIMESTAMP 使用微秒而不是毫秒,您需要计算 <modified_at*1000> -这里不能使用表达式(这里只是举个例子)。

相关问题