我正在使用apache beam CassandraIO读取cassandra表。我想知道是否有一种方法可以帮助我在从cassandra读取之前放置一些条件/过滤器。由于表体积将在未来增加,读取整个表将花费大量时间。
我有下面的PCollection,它保存了pCollection的计数,我希望它用作从cassandra读取的前提条件,如果计数〉0
PCollection<Long> countRecords = dataPCollection.apply(
"Count", Count.globally());
我正在从cassandra阅读数据。根据CassandraIO。读取应该总是在管道的根处。有什么解决方法吗?
PCollection<CassandraEntity> cassandraEntityPCollection = pipeline
.apply("Fetching from Cassandra",
CassandraIO.<CassandraEntity>read()
.withCassandraConfig(cassandraConfigSpec)
.withTable("data")
.withEntity(CassandraEntity.class)
.withCoder(SerializableCoder.of(CassandraEntity.class)));
1条答案
按热度按时间kwvwclae1#
您可能想尝试使用
CassandraIO.Read
的条件创建作为CassandraIO.readAll()
的输入来执行类似的操作: