目前elasticsearch hadoop正在将数据集/rdd转换为具有1对1Map的文档,即数据集中的一行转换为一个文档。在我们的场景中,我们是这样做的
为了“统一”
PUT spark/docs/1
{
"_k":"one",
"_k":"two",
"_k":"three" // large sets , we dont need to store much, we just want to map multiple keys to single value.
"_v" :"key:
}
GET spark/docs/_search
{
"query" : {
"constant_score" : {
"filter" : {
"terms" : {
"_k" : ["one"] // all values work.
}
}
}
}
}
有什么建议我们如何实施以上,如果有更好的策略,请建议。
下面的代码是不工作的,但我正在努力实现下面的东西在理论上
final Dataset<String> df = spark.read().csv("src/main/resources/star2000.csv").select("_c1").dropDuplicates().as(Encoders.STRING());
final Dataset<ArrayList> arrayListDataset = df.mapPartitions(new MapPartitionsFunction<String, ArrayList>() {
@Override
public Iterator<ArrayList> call(Iterator<String> iterator) throws Exception {
ArrayList<String> s = new ArrayList<>();
iterator.forEachRemaining(it -> s.add(it));
return Iterators.singletonIterator(s);
}
}, Encoders.javaSerialization(ArrayList.class));
JavaEsSparkSQL.saveToEs(arrayListDataset,"spark/docs");
我不想在一个列表中收集完整的数据集,因为它可能导致oom,所以计划是获取每个分区的列表,并根据分区键对其进行索引。
2条答案
按热度按时间dw1jzc5e1#
这将有助于张贴一些源代码,你正在使用的问题也不清楚你要实现什么。
我假设您想将一个数组发布到键字段(\u k)并将另一个值发布到值字段(\u v)?
因此,您可以创建一个javapairrdd并将其保存到elasticsearch,如下所示:
olqngx592#
使用pojo作为
和下面的代码片段
这将创建上面的数组索引。