我已经写了一个Kafka生产者尾部的内容,日志文件(format:csv)kafka consumer是一个流应用程序,它创建一个javadstream。使用foreachrdd方法,我在分隔符“”上拆分文件的每一行,并创建row对象。然后我使用javardd和schema创建dataframe。但这里的问题是,日志文件中的所有行的列数并不相同。因此,有没有办法过滤掉不满足模式的行,或者根据行内容动态地创建模式?以下是代码的一部分:
JavaDStream<String> msgDataStream =directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) {
JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
@Override
public Row call(String msg) {
String[] splitMsg=msg.split(",");
Object[] vals = new Object[splitMsg.length];
for(int i=0;i<splitMsg.length;i++)
{
vals[i]=splitMsg[i].replace("\"","").trim();
}
Row row = RowFactory.create(vals);
return row;
}
});
//Create Schema
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("timeIpReq", DataTypes.StringType, true),DataTypes.createStructField("SrcMac", DataTypes.StringType, true),
DataTypes.createStructField("Proto", DataTypes.StringType, true),DataTypes.createStructField("ACK", DataTypes.StringType, true),
DataTypes.createStructField("srcDst", DataTypes.StringType, true),DataTypes.createStructField("NATSrcDst", DataTypes.StringType, true),
DataTypes.createStructField("len", DataTypes.StringType, true)});
//Get Spark 2.0 session
Dataset<Row> msgDataFrame = session.createDataFrame(rowRDD, schema);
1条答案
按热度按时间kiz8lqtg1#
删除与预期模式不匹配的行的一种简单方法是使用
flatMap
用一个Option
同样,如果您的目标是构建Dataframe,那么我们使用相同的flatMap
步骤将架构应用于数据。在scala中,通过使用case class
锿。关于:
日志文件中的所有行的列数不同。
假设它们都表示相同类型的数据,但可能缺少一些列,正确的策略是要么过滤掉不完整的数据(如这里所示),要么在定义的模式中使用可选值(如果有确定的方法知道缺少哪些字段)。这个要求应该向生成数据的上游应用程序提出。在csv中用空逗号序列表示缺少的值是很常见的(例如。
field0,,field2,,,field5
)处理每行差异的动态模式是没有意义的,因为无法应用它来完成
DataFrame
由具有不同模式的行组成。