将bigquery tablerow写入cassandra

k5ifujac  于 2021-06-09  发布在  Cassandra
关注(0)|答案(1)|浏览(420)

我尝试从bigquery读取数据(使用tablerow)并将输出写入cassandra。怎么做?
这是我试过的。这样做有效:

/* Read BQ */
PCollection<CxCpmMapProfile> data =  p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, CxCpmMapProfile>() {
    public CxCpmMapProfile apply(SchemaAndRecord record) {
        GenericRecord r = record.getRecord();
        return new CxCpmMapProfile((String) r.get("channel_no").toString(), (String) r.get("channel_name").toString());
    }
}).fromQuery("SELECT channel_no, channel_name FROM `dataset_name.table_name`").usingStandardSql().withoutValidation());

/* Write to Cassandra */
data.apply(CassandraIO.<CxCpmMapProfile>write()
    .withHosts(Arrays.asList("<IP addr1>", "<IP addr2>"))
    .withPort(9042)
    .withUsername("cassandra_user").withPassword("cassandra_password").withKeyspace("cassandra_keyspace")
    .withEntity(CxCpmMapProfile.class));

但当我改为使用tablerow阅读bq部分时,如下所示:

/* Read from BQ using readTableRow */
PCollection<TableRow> data = p.apply(BigQueryIO.readTableRows()
    .fromQuery("SELECT channel_no, channel_name FROM `dataset_name.table_name`")
    .usingStandardSql().withoutValidation());

在写给Cassandra的信中,我发现了以下错误 The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (CassandraIO.Write<CxCpmMacProfile>)

lskq00tm

lskq00tm1#

错误是由于包含 TableRow 元素,而cassandraio read预期 CxCpmMacProfile 元素。您需要将bigquery中的元素读取为 CxCpmMacProfile 元素。bigqueryio文档提供了一个示例,通过 read(SerializableFunction) 方法。

相关问题