我正在使用spark 1.6.1
我有一个Dataframe,我需要遍历它并将每一行写入kafka。现在我在做这样的事情:
Producer<String><String> message;
for(Row x: my_df.collect()){
kafka_message = new Producer<String><String>(topic, String.valueOf(x))
my_kafka_producer.send(kafka_message);
}
这里的问题是collect将数据发送给驱动程序,然后推送到kafka。考虑到我有大约250个执行者,我的1驱动程序不能有效地处理工作负载。所以,我想知道如何迭代执行器上的Dataframe。这将需要避免执行collect()。我发现了一篇文章,大致解释了如何做到这一点,但不幸的是,他们的github链接实际上已经过期,所以我找不到如何实现它。
参考条款:https://pythagoreanscript.wordpress.com/2015/05/28/iterate-through-a-spark-dataframe-using-its-partitions-in-java/comment-page-1/
1条答案
按热度按时间nhn9ugyo1#
在java中,您可以尝试以下操作。扩展
AbstractFunction1
```import scala.runtime.AbstractFunction1;
abstract class MyFunction1<T,R> extends AbstractFunction1<T, R> implements Serializable {
}
import scala.collection.Iterator;
import scala.runtime.BoxedUnit;
df.foreachPartition(new MyFunction1<Iterator,BoxedUnit>(){
@Override
public BoxedUnit apply(Iterator rows) {
while(rows.hasNext()){
//get the Row
Row row = rows.next();
}
return BoxedUnit.UNIT;
}
});