从spark java向cassandra map列追加值

im9ewurl  于 2021-06-10  发布在  Cassandra
关注(0)|答案(1)|浏览(342)

我有三列的Cassandra表。

id text,
value text,
mappings map<text,text>

假设样本数据如下:

id        | value       | mappings
-----------------------------------------------
1ABC      | xyz         | {"a":"abc","b":"bcd"}

在spark作业中,我为id计算了一个新值 1ABCb Map为hashmap Ex: "b":"xyz" (可以将Map转换为javardd)
如何使用cassandrajavaspark连接器将这个值附加(覆盖)到表中?我在看这个关于如何处理cql集合追加的示例,但我似乎不知道如何在java中实现这一点。任何提示都将不胜感激。

xcitsw88

xcitsw881#

解决方法如下。
通过传递新参数或使用spark会话中的相同参数来创建cassandra连接器。

import com.datastax.spark.connector.cql.CassandraConnector;

CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf()); // or pass different values for spark.cassandra.connection.host, username and password

rdd.foreach(new VoidFunction<TestBean>() {
        @Override
        public void call(TestBean t) throws Exception {
            final String id = t.getId();
            final Map<String, String> mappings = t.getMappings();
            boolean isUpdated = connector.withSessionDo(new AbstractFunction1<Session, Boolean>() {
                @Override
                public Boolean apply(Session v1) {
                    ResultSet updateResultSet = v1.execute(v1.prepare("update test set mappings = mappings + ? where id = ?")
                            .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
                            .bind(mappings, id));
                    return updateResultSet.wasApplied();
                }
            });
        }
    });

相关问题