sparkDataframe来自动态模式或过滤掉不满足该模式的行

f1tvaqid  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(520)

我已经写了一个Kafka生产者尾部的内容,日志文件(format:csv)kafka consumer是一个流应用程序,它创建一个javadstream。使用foreachrdd方法,我在分隔符“”上拆分文件的每一行,并创建row对象。然后我使用javardd和schema创建dataframe。但这里的问题是,日志文件中的所有行的列数并不相同。因此,有没有办法过滤掉不满足模式的行,或者根据行内容动态地创建模式?以下是代码的一部分:

JavaDStream<String> msgDataStream =directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) {
                JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
                    @Override
                    public Row call(String msg) {
                        String[] splitMsg=msg.split(",");
                        Object[] vals = new Object[splitMsg.length];
                        for(int i=0;i<splitMsg.length;i++)
                        {
                            vals[i]=splitMsg[i].replace("\"","").trim();
                        }

                        Row row = RowFactory.create(vals);
                        return row;
                    }
                });

                //Create Schema
                StructType schema = DataTypes.createStructType(new StructField[] {
                        DataTypes.createStructField("timeIpReq", DataTypes.StringType, true),DataTypes.createStructField("SrcMac", DataTypes.StringType, true),
                        DataTypes.createStructField("Proto", DataTypes.StringType, true),DataTypes.createStructField("ACK", DataTypes.StringType, true),
                        DataTypes.createStructField("srcDst", DataTypes.StringType, true),DataTypes.createStructField("NATSrcDst", DataTypes.StringType, true),
                        DataTypes.createStructField("len", DataTypes.StringType, true)});

                //Get Spark 2.0 session

                Dataset<Row> msgDataFrame = session.createDataFrame(rowRDD, schema);
kiz8lqtg

kiz8lqtg1#

删除与预期模式不匹配的行的一种简单方法是使用 flatMap 用一个 Option 同样,如果您的目标是构建Dataframe,那么我们使用相同的 flatMap 步骤将架构应用于数据。在scala中,通过使用 case class 锿。

// Create Schema
case class NetInfo(timeIpReq: String, srcMac: String, proto: String, ack: String, srcDst: String, natSrcDst: String, len: String)

val netInfoStream = msgDataStream.flatMap{msg => 
  val parts = msg.split(",")
  if (parts.size == 7) {  //filter out messages with unmatching set of fields
    val Array(time, src, proto, ack, srcDst, natSrcDst, len) = parts // use a extractor to get the different parts in variables
    Some(NetInfo(time, src, proto, ack, srcDst, natSrcDst, len)) // return a valid record
  } else {
    None  // We don't have a valid. Return None
  }
}

netInfoStream.foreachRDD{rdd =>
    import sparkSession.implicits._ 
    val df = rdd.toDF() // DataFrame transformation is possible on RDDs with a schema (based on a case class)
    // do stuff with the dataframe
}

关于:
日志文件中的所有行的列数不同。
假设它们都表示相同类型的数据,但可能缺少一些列,正确的策略是要么过滤掉不完整的数据(如这里所示),要么在定义的模式中使用可选值(如果有确定的方法知道缺少哪些字段)。这个要求应该向生成数据的上游应用程序提出。在csv中用空逗号序列表示缺少的值是很常见的(例如。 field0,,field2,,,field5 )
处理每行差异的动态模式是没有意义的,因为无法应用它来完成 DataFrame 由具有不同模式的行组成。

相关问题