Spark: transforming an RDD

yr9zkbsy · posted 2021-05-27 in Spark

I have a CSV file as input, like this:

time,col1,col2,col3  
0,5,8,9 
1,6,65,3 
2,5,8,465,4 
3,85,45,8

The number of columns is not known in advance, and I want the resulting RDD to have the format:

(constant,column,time,value)

That is: ((car1,col1,0,5), (car1,col2,0,8), …)
I already have RDDs for the time column, the rows, and the header:

class SimpleCSVHeader(header: Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array: Array[String], key: String): String = array(index(key))
}

val constant = "car1"

val csv = sc.textFile("C:\\file.csv")

val data = csv.map(line => line.split(",").map(elem => elem.trim))

val header = new SimpleCSVHeader(data.take(1)(0)) // build the header from the first line
val rows = data.filter(line => header(line, "time") != "time") // filter the header row out
val time = rows.map(row => header(row, "time"))

But I don't know how to build the resulting RDD from these.

hrysbysz 1#

My suggestion would be to use DataFrames instead of RDDs for your scenario (a minimal sketch of that route is at the end). Still, I have tried to give you a working RDD solution below; note that it hardcodes the column layout, so for a truly unknown number of columns see the generic variant after the result.

val lines = Array("time,col1,col2,col3", "0,5,8,9", "1,6,65,3", "2,5,8,465,4")

val sc = prepareConfig() // helper that returns a SparkContext
val baseRDD = sc.parallelize(lines)
val columnList = baseRDD.take(1)

// Prepare the index -> column-name map. This step can be avoided if you use DataFrames.
val map = scala.collection.mutable.Map[Int, String]()
columnList.foreach { header =>
  var index: Int = 0
  header.split(",").foreach { col =>
    index += 1
    map += (index -> col)
  }
}

val mapRDD = baseRDD
  .filter(line => !line.startsWith("time")) // skip the header line before emitting tuples
  .flatMap { line =>
    val splits = line.split(",")
    // Replace the Tuples with your own case classes if you need named fields
    Array(
      ("car1", map(2), splits(0), splits(1)),
      ("car1", map(3), splits(0), splits(2)),
      ("car1", map(4), splits(0), splits(3)))
  }

mapRDD.collect().foreach(f => println(f))

Result:

(car1,col1,0,5)
(car1,col2,0,8)
(car1,col3,0,9)
(car1,col1,1,6)
(car1,col2,1,65)
(car1,col3,1,3)
(car1,col1,2,5)
(car1,col2,2,8)
(car1,col3,2,465)
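Since the question says the number of columns is unknown, here is a variant that derives the tuples from the header instead of hardcoding three columns. This is a sketch of mine, not part of the original answer; it reuses the answerer's prepareConfig() helper and in-memory sample lines.

val lines = Array("time,col1,col2,col3", "0,5,8,9", "1,6,65,3", "2,5,8,465,4")

val sc = prepareConfig() // same SparkContext helper as above
val baseRDD = sc.parallelize(lines)

// Column names come from the header row, so any number of columns works.
val columns = baseRDD.first().split(",").map(_.trim)

val genericRDD = baseRDD
  .filter(line => !line.startsWith("time")) // drop the header row
  .flatMap { line =>
    val splits = line.split(",").map(_.trim)
    val time = splits(0)
    // zip pairs each value with its column name; surplus values beyond the
    // header (e.g. the stray fifth field in "2,5,8,465,4") are dropped
    columns.drop(1).zip(splits.drop(1)).map { case (col, value) =>
      ("car1", col, time, value)
    }
  }

genericRDD.collect().foreach(println)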

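For completeness, a minimal sketch of the DataFrame route recommended above, assuming Spark 2.x and a local SparkSession; the session setup and file path are assumptions, not the answerer's code. It unpivots every non-time column into the (constant, column, time, value) shape:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

val spark = SparkSession.builder().appName("csv-unpivot").master("local[*]").getOrCreate()

// Read the CSV with its header row; assumes rows match the header,
// so the stray fifth field in the sample data would need cleaning first.
val df = spark.read.option("header", "true").csv("C:\\file.csv")

// One small select per value column, unioned into the long format.
val valueCols = df.columns.filter(_ != "time")
val stacked = valueCols
  .map(c => df.select(lit("car1").as("constant"), lit(c).as("column"), col("time"), col(c).as("value")))
  .reduce(_ union _)

stacked.show(false)
// stacked.rdd yields an RDD[Row] if an RDD is strictly required

With DataFrames the column list comes straight from df.columns, which is why this approach handles an unknown column count with no hardcoding.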