How to read from Cassandra with Apache Beam only when a precondition is met

Posted by nvbavucw on 2023-03-29 in Cassandra

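I am using Apache Beam's CassandraIO to read a Cassandra table. Is there a way to apply a condition or filter before the read happens? The table will grow over time, and reading the whole table will take a long time.
I have the PCollection below, which holds the element count of another PCollection. I would like to use it as a precondition: read from Cassandra only if the count is greater than 0.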

PCollection<Long> countRecords = dataPCollection.apply(
        "Count", Count.globally());

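This is how I currently read from Cassandra. According to the CassandraIO documentation, read() must always sit at the root of the pipeline. Is there a workaround?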

PCollection<CassandraEntity> cassandraEntityPCollection = pipeline
        .apply("Fetching from Cassandra",
            CassandraIO.<CassandraEntity>read()
                .withCassandraConfig(cassandraConfigSpec)
                .withTable("data")
                .withEntity(CassandraEntity.class)
                .withCoder(SerializableCoder.of(CassandraEntity.class)));

Answer #1 (kwvwclae)

You could try creating the CassandraIO.Read conditionally and feeding it to CassandraIO.readAll() as input, along these lines:

PCollection<Long> countRecords = dataPCollection.apply(
        "Count", Count.globally());

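// The result type matches the entity being read (CassandraEntity).
PCollection<CassandraEntity> output =
    countRecords.apply(ParDo.of(new DoFn<Long, CassandraIO.Read<CassandraEntity>>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            long numRecords = context.element();
            // Emit a read specification only when there is something to process;
            // with no output, readAll() receives nothing and performs no read.
            if (numRecords > 0) {
              context.output(CassandraIO.<CassandraEntity>read()
                  .withCassandraConfig(cassandraConfigSpec)
                  .withTable("data")
                  .withEntity(CassandraEntity.class)
                  .withCoder(SerializableCoder.of(CassandraEntity.class)));
            }
          }
        }))
        // readAll() executes each incoming Read, so it does not need to be at the pipeline root.
        .apply(
            CassandraIO.<CassandraEntity>readAll()
                .withCoder(SerializableCoder.of(CassandraEntity.class)));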
