在java中迭代sparkDataframe而不使用collect

plicqrtu  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(436)

我正在使用spark 1.6.1
我有一个Dataframe,我需要遍历它并将每一行写入kafka。现在我在做这样的事情:

Producer<String><String> message;
for(Row x: my_df.collect()){
    kafka_message = new Producer<String><String>(topic, String.valueOf(x))
    my_kafka_producer.send(kafka_message);
}

这里的问题是collect将数据发送给驱动程序,然后推送到kafka。考虑到我有大约250个执行者,我的1驱动程序不能有效地处理工作负载。所以,我想知道如何迭代执行器上的Dataframe。这将需要避免执行collect()。我发现了一篇文章,大致解释了如何做到这一点,但不幸的是,他们的github链接实际上已经过期,所以我找不到如何实现它。
参考条款:https://pythagoreanscript.wordpress.com/2015/05/28/iterate-through-a-spark-dataframe-using-its-partitions-in-java/comment-page-1/

nhn9ugyo

nhn9ugyo1#

在java中,您可以尝试以下操作。扩展 AbstractFunction1 ```
import scala.runtime.AbstractFunction1;

abstract class MyFunction1<T,R> extends AbstractFunction1<T, R> implements Serializable {
}

现在打电话 `foreachPartition` 对于您的Dataframe,如下所示。

import scala.collection.Iterator;
import scala.runtime.BoxedUnit;

df.foreachPartition(new MyFunction1<Iterator,BoxedUnit>(){
@Override
public BoxedUnit apply(Iterator rows) {
while(rows.hasNext()){
//get the Row
Row row = rows.next();
}
return BoxedUnit.UNIT;
}
});

相关问题