我们已经编写了一个spark批处理应用程序(spark版本:2.3.0)。代码如下。
转型: Dataset<CollectionFlattenedData> collectionDataDS = flatMap
(解析一些文件并返回数据集的函数);此数据集将有三种类型的数据,按列记录类型区分:1、2、3。
加载到临时表: collectionDataDS.createOrReplaceTempView(TEMP_TABLE);
正在创建数据集的临时视图。
行动1: sparkSession.sql("INSERT INTO TABLE1 SELECT COL1,COL2,COL3 FROM TEMP_TABLE WHERE recordtype='1'");
用于从临时表加载table1表的配置单元查询。
行动2: sparkSession.sql("INSERT INTO TABLE2 SELECT COL4,COL5,COL6 FROM TEMP_TABLE WHERE recordtype='2'");
用于从临时表加载表2的配置单元查询。
行动3: sparkSession.sql("INSERT INTO TABLE2 SELECT COL7,COL8,COL9 FROM TEMP_TABLE WHERE recordtype='3'");
用于加载错误表的配置单元查询
发生了什么:因为我们运行的是3个查询,它们只是单独的操作,所以flatmap转换被调用了三次(一个操作一次)。但我们的要求是只能调用一次flatmap操作。
collectionflatteddatapojo代码是这样的
public class CollectionFlattenedData implements Serializable {
private String recordtype;
private String COL1;
private String COL2;
private String COL3;
private String COL4;
private String COL5;
private String COL6;
private String COL7;
private String COL8;
private String COL9;
//getters and setters of all the columns
}
不管怎样我们都能做到。我们非常感谢您早日作出回应。
1条答案
按热度按时间axr492tv1#
我们可以用两种方法处理这个问题,但首先要确定“temp\u表”的大小。
如果大小按ram的顺序排列,也就是说,如果它能够缓存大量的临时表,那么您可以缓存它并在进一步的计算中使用它。(您可以从ui获取数据量)
另一种更好的方法是将数据保存到一个永久表中。你可以照常参考下一步。
当你使用
.createOrReplaceTempView()
,您将提供一个名称,以便在类似spark sql的查询中进一步使用它。它不会对结果Dataframe创建任何操作。