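I have a JavaPairRDD that I want to iterate over, perform some operations on, and store the output in Hive. Currently I am trying to create a DataFrame inside foreach, which throws an exception because a DataFrame cannot be created inside foreach. What are the alternatives?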
JavaPairRDD<Long, Iterable<EmployeeDetail>> employeeDetailPairList = fetchEmployeeDetailData();
List<EmployeeZone> employeeZoneList = fetchEmployeeZoneData();
employeeDetailPairList.foreach(employeeDetailPair -> {
    Iterable<EmployeeDetail> employeeDetailList = employeeDetailPair._2();
    // Collect the distinct zip codes of the employees in this group
    Set<String> zipCodeSet = StreamSupport.stream(employeeDetailList.spliterator(), false)
            .map(e -> e.getZipCode())
            .collect(Collectors.toSet());
    // Keep only the zones whose location matches one of those zip codes
    List<EmployeeZone> employeeZoneFilteredList = employeeZoneList.stream()
            .filter(e -> zipCodeSet.contains(String.valueOf(e.getLoc())))
            .collect(Collectors.toList());
    List<Output> outputList = processEmployeeData(employeeZoneFilteredList);
    outputList = addWeekStartDay(outputList, weekStartDay);
    if (outputList != null && this.getSession() != null) {
        // This is what fails: the SparkSession only exists on the driver,
        // while the body of foreach runs on the executors
        Dataset<Row> recordsDF = this.getSession().sqlContext().createDataFrame(outputList, Output.class);
        recordsDF.write().insertInto(SHIPCODE_PREFERRED_FC_HIVE_TABLE);
    }
});
1 answer
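You cannot create a DataFrame inside a transformation (or inside the function passed to foreach): that code runs on the executors, where the SparkSession is not available. The way to achieve this is to join the Hive table with the RDD. This avoids the per-record lookups and lets you perform the required operations in Spark. Hope this answers your question.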
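A minimal sketch of the join suggested above, using the Spark SQL Java API and assuming Spark 2.x or later. `EmployeeDetail` here is a hypothetical two-field stand-in for the question's bean, and `employee_zone` (columns `loc` and `zoneName`) is a hypothetical zone table, registered as a temp view in the demo in place of the real Hive table. The join condition mirrors the question's `String.valueOf(e.getLoc())` check. The question's `processEmployeeData` step is not shown; it is assumed it can be rewritten as DataFrame operations on the joined result.

```java
import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class ZoneJoinSketch {

    // Hypothetical minimal stand-in for the question's EmployeeDetail bean
    public static class EmployeeDetail implements Serializable {
        private long id;
        private String zipCode;

        public EmployeeDetail() {}
        public EmployeeDetail(long id, String zipCode) { this.id = id; this.zipCode = zipCode; }

        public long getId() { return id; }
        public void setId(long id) { this.id = id; }
        public String getZipCode() { return zipCode; }
        public void setZipCode(String zipCode) { this.zipCode = zipCode; }
    }

    // Called on the driver: flattens the grouped RDD into one DataFrame and joins it
    // with the zone table, replacing the per-group list filtering done inside foreach
    static Dataset<Row> joinWithZones(SparkSession spark,
                                      JavaPairRDD<Long, Iterable<EmployeeDetail>> pairs,
                                      String zoneTable) {
        Dataset<Row> employees = spark.createDataFrame(
                pairs.values().flatMap(Iterable::iterator), EmployeeDetail.class);
        Dataset<Row> zones = spark.table(zoneTable);
        // Same matching rule as the question: String.valueOf(loc) equals zipCode
        return employees.join(zones,
                employees.col("zipCode").equalTo(zones.col("loc").cast("string")));
    }

    // Builds hypothetical sample data and returns the joined result
    static Dataset<Row> runDemo(SparkSession spark) {
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaPairRDD<Long, Iterable<EmployeeDetail>> pairs = jsc.parallelizePairs(Arrays.asList(
                new Tuple2<>(1L, (Iterable<EmployeeDetail>) Arrays.asList(new EmployeeDetail(1L, "10001"))),
                new Tuple2<>(2L, (Iterable<EmployeeDetail>) Arrays.asList(new EmployeeDetail(2L, "94105")))));
        // A temp view stands in for the Hive zone table in this demo
        spark.sql("SELECT 10001 AS loc, 'EAST' AS zoneName").createOrReplaceTempView("employee_zone");
        return joinWithZones(spark, pairs, "employee_zone");
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]").appName("zone-join-sketch").getOrCreate();
        Dataset<Row> joined = runDemo(spark);
        joined.show();
        // In the real job, write once from the driver, e.g.:
        // joined.write().insertInto(SHIPCODE_PREFERRED_FC_HIVE_TABLE);
        spark.stop();
    }
}
```

Because the DataFrame is created and written from the driver, the Hive insert happens once for the whole result rather than once per key, and the SparkSession is never touched from executor code.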