How do I create a DataFrame from an Array[String]?

vddsk6oq · asked 2021-05-17 · Spark

I created an array with rdd.collect(), and now I want to build a DataFrame from this Array[String]. My test file has the following format (pipe "|" delimited):

  TimeStamp
  IdC
  Name
  FileName
  Start-0f-fields
  column01
  column02
  column03
  column04
  column05
  column06
  column07
  column08
  column010
  column11
  End-of-fields
  Start-of-data
  G0002B|0|13|IS|LS|Xys|Xyz|12|23|48|
  G0002A|0|13|IS|LS|Xys|Xyz|12|23|45|
  G0002x|0|13|IS|LS|Xys|Xyz|12|23|48|
  G0002C|0|13|IS|LS|Xys|Xyz|12|23|48|
  End-of-data
  document

The column names sit between Start-of-fields and End-of-fields. I want to split each data line on the pipe "|" and store the fields in separate DataFrame columns, like the example below (see the marker-lookup sketch after it):

  column01 column02 column03 column04 column05 column06 column07 column08 column010 column11
  G0002C 0 13 IS LS Xys Xyz 12 23 48
  G0002x 0 13 LS MS Xys Xyz 14 300 400
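
(An aside, not part of the original question: since the column names sit between the field markers, they can also be located by searching for the marker lines rather than by hard-coded indices. A minimal sketch, assuming the sample layout above, including its "Start-0f-fields" spelling; the variable names are illustrative.)

  // Sketch only: find the marker lines and slice out what lies between them.
  val lines       = sc.textFile("the above text file").collect()
  val fieldsStart = lines.indexWhere(_.trim == "Start-0f-fields") // spelled with a zero in the sample
  val fieldsEnd   = lines.indexWhere(_.trim == "End-of-fields")
  val columnNames = lines.slice(fieldsStart + 1, fieldsEnd).map(_.trim)
  val dataStart   = lines.indexWhere(_.trim == "Start-of-data")
  val dataEnd     = lines.indexWhere(_.trim == "End-of-data")
  val dataLines   = lines.slice(dataStart + 1, dataEnd)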

My code:

  val rdd = sc.textFile("the above text file")
  val columns = rdd.collect.slice(5,16).mkString(",") // it will hold columnnames
  val data = rdd.collect.slice(5,16)
  val rdd1 = sc.parallelize(rdd.collect())
  val df = rdd1.toDf(columns)

But this does not give me the desired DataFrame shown above.
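
(A side note, not in the original post: toDF is spelled with a capital F and takes the column names as varargs, so the comma-joined string above ends up as a single column name. A minimal toy sketch of the intended call shape, using made-up data:)

  import spark.implicits._ // brings .toDF into scope for RDDs and Seqs

  // Toy data only, to show the varargs form of toDF.
  val names = Seq("column01", "column02", "column03")
  val toy   = sc.parallelize(Seq(("G0002B", "0", "13"), ("G0002A", "0", "13")))
  val toyDf = toy.toDF(names: _*) // `names: _*` expands the Seq into varargs
  toyDf.show(false)               // three separate columns, one per name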


xqnpmsa81#

If the number of columns and the column names are fixed, you can do it as follows:

  import org.apache.spark.sql.functions._
  import spark.implicits._ // needed for $"..." and .toDF

  val columns = rdd.collect.slice(5, 15)  // the 10 column names (indices 5..14)
  val data    = rdd.collect.slice(17, 21) // the 4 data lines (indices 17..20)
  val d = data.toSeq.toDF("value")        // one row per line, in a single "value" column
  // split each line on "|" and add one output column per field name
  val dd = columns.zipWithIndex.foldLeft(d) { case (df, (name, i)) =>
    df.withColumn(name, split($"value", "\\|").getItem(i))
  }.drop("value")
  display(dd) // or dd.show(false) outside Databricks

The output then shows each field in its own column, as in the expected result above.


hsvhsicv2#

Could you try this?

  import spark.implicits._ // Add to use `toDS()` and `toDF()`

  val rdd = sc.textFile("the above text file")
  val columns = rdd.collect.slice(5, 15)    // the 10 column names; `.mkString(",")` is not needed
  val dataDS = rdd.collect.slice(17, 21)    // only the data lines between Start-of-data and End-of-data
    .map(_.trim())                          // remove surrounding whitespace
    .map(s => s.substring(0, s.length - 1)) // drop the trailing pipe '|'
    .toSeq
    .toDS
  val df = spark.read
    .option("header", false)
    .option("delimiter", "|")
    .csv(dataDS)
    .toDF(columns: _*)
  df.show(false)
  +--------+--------+--------+--------+--------+--------+--------+--------+---------+--------+
  |column01|column02|column03|column04|column05|column06|column07|column08|column010|column11|
  +--------+--------+--------+--------+--------+--------+--------+--------+---------+--------+
  |G0002B  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |48      |
  |G0002A  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |45      |
  |G0002x  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |48      |
  |G0002C  |0       |13      |IS      |LS      |Xys     |Xyz     |12      |23       |48      |
  +--------+--------+--------+--------+--------+--------+--------+--------+---------+--------+

Calling spark.read...csv() without a schema can take a long time on large inputs, because the schema has to be inferred (i.e., the data is read an extra time).
In that case, you can specify the schema as shown below.

  /*
  column01 STRING,
  column02 STRING,
  column03 STRING,
  ...
  */
  val schema = columns
    .map(c => s"$c STRING")
    .mkString(",\n")

  val df = spark.read
    .option("header", false)
    .option("delimiter", "|")
    .schema(schema) // no schema inference happens
    .csv(dataDS)
    // .toDF(columns: _*) is unnecessary when the schema is specified
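
Alternatively (a sketch, not from the original answer), the schema can be built programmatically with StructType instead of the DDL string; every column is read as a plain string here:

  import org.apache.spark.sql.types.{StringType, StructField, StructType}

  // Programmatic equivalent of the DDL string above: one string field per column name.
  val structSchema = StructType(columns.map(c => StructField(c, StringType, nullable = true)))

  val df = spark.read
    .option("header", false)
    .option("delimiter", "|")
    .schema(structSchema) // again, no schema inference
    .csv(dataDS)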
