How to create a new timestamp (minute) column in a CSV file with Scala/Python/Spark

x759pob2 posted in Spark on 2021-05-29

I have a CSV file and I want to add a new timestamp column in minutes, as shown below.
Current data:

Col1, Col2
1.19185711131486, 0.26615071205963  
-1.3598071336738, -0.0727811733098497   
-0.966271711572087, -0.185226008082898
-0.966271711572087, -0.185226008082898
-1.15823309349523, 0.877736754848451
-0.425965884412454, 0.960523044882985

Expected:

Col1, Col2, ts
1.19185711131486, 0.26615071205963, 00:00:00
-1.3598071336738, -0.0727811733098497, 00:01:00
-0.966271711572087, -0.185226008082898, 00:02:00
-0.966271711572087, -0.185226008082898, 00:03:00
-1.15823309349523, 0.877736754848451, 00:04:00
-0.425965884412454, 0.960523044882985, 00:05:00
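In the expected output, each ts value is the zero-based row position read as minutes after 00:00:00. A minimal plain-Scala sketch of that mapping, without Spark; `MinuteLabels` and `minuteLabel` are hypothetical names, and the hour part is assumed to keep counting past 23 rather than wrap around:

```scala
object MinuteLabels {
  // Row position i (0-based) -> i minutes after 00:00:00, formatted as HH:mm:ss.
  // The hour part is not wrapped at 24 (assumption: ts is an elapsed-time label).
  def minuteLabel(i: Int): String = f"${i / 60}%02d:${i % 60}%02d:00"

  def main(args: Array[String]): Unit =
    // Prints 00:00:00 through 00:05:00, one label per row of the sample data
    (0 until 6).map(minuteLabel).foreach(println)
}
```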

Thanks in advance!

af7jpaap

Perhaps this is useful:

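    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    // assumes an existing SparkSession named `spark`, as in spark-shell
    import spark.implicits._ // provides .toDS()

    // sample input inlined as a string; for a real file, pass its path to spark.read.csv instead
    val data =
      """
        |Col1, Col2
        |1.19185711131486, 0.26615071205963
        |-1.3598071336738, -0.0727811733098497
        |-0.966271711572087, -0.185226008082898
        |-0.966271711572087, -0.185226008082898
        |-1.15823309349523, 0.877736754848451
        |-0.425965884412454, 0.960523044882985
      """.stripMargin
    // trim the whitespace around every field (header included) and rebuild each line;
    // blank lines become empty strings, which the CSV reader skips
    val stringDS = data.split("\\r?\\n")
      .map(_.split(",").map(_.trim).mkString(","))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)
    df.printSchema()
    df.show(false)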
    /**
      * root
      * |-- Col1: double (nullable = true)
      * |-- Col2: double (nullable = true)
      *
      * +------------------+-------------------+
      * |Col1              |Col2               |
      * +------------------+-------------------+
      * |1.19185711131486  |0.26615071205963   |
      * |-1.3598071336738  |-0.0727811733098497|
      * |-0.966271711572087|-0.185226008082898 |
      * |-0.966271711572087|-0.185226008082898 |
      * |-1.15823309349523 |0.877736754848451  |
      * |-0.425965884412454|0.960523044882985  |
      * +------------------+-------------------+
      */
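    // Number the rows 0, 1, 2, ... and read that number as a minute value.
    // Note: ordering by the data columns sorts the rows by value (not by file order),
    // and a Window without partitionBy moves all rows into a single partition.
    df.withColumn("ts",
      date_format(
        to_timestamp(
          // pad to two digits: Spark 3's default parser may reject single-digit "mm" input
          lpad((row_number().over(Window.orderBy(df.columns.map(col): _*)) - 1).cast("string"), 2, "0"),
          "mm"),
        "HH:mm:ss")) // minutes above 59 cannot be parsed with "mm", so this covers at most 60 rows
      .show(false)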

    /**
      * +------------------+-------------------+--------+
      * |Col1              |Col2               |ts      |
      * +------------------+-------------------+--------+
      * |-1.3598071336738  |-0.0727811733098497|00:00:00|
      * |-1.15823309349523 |0.877736754848451  |00:01:00|
      * |-0.966271711572087|-0.185226008082898 |00:02:00|
      * |-0.966271711572087|-0.185226008082898 |00:03:00|
      * |-0.425965884412454|0.960523044882985  |00:04:00|
      * |1.19185711131486  |0.26615071205963   |00:05:00|
      * +------------------+-------------------+--------+
      */
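The output above numbers the rows in sorted order of Col1, Col2, not in the file order the question expects (compare it with the expected table), and the `mm` parsing trick stops at 60 rows. A minimal sketch that addresses both, assuming the rows should be numbered in the order they appear in the DataFrame: it swaps the window function for `RDD.zipWithIndex` and builds the string with `format_string`. `AddMinuteColumn` and `withMinuteTs` are hypothetical names, and the ordering is an assumption that holds only when no shuffle has happened before the call.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, format_string}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object AddMinuteColumn {
  // Appends "ts" (HH:mm:ss): the row at position i gets i minutes after 00:00:00.
  def withMinuteTs(df: DataFrame): DataFrame = {
    // zipWithIndex numbers rows by partition order, then by position inside each
    // partition; for a DataFrame read straight from one file this follows file order.
    val indexed = df.rdd.zipWithIndex.map { case (row, idx) =>
      Row.fromSeq(row.toSeq :+ idx)
    }
    val schema = StructType(df.schema.fields :+ StructField("idx", LongType, nullable = false))
    df.sparkSession.createDataFrame(indexed, schema)
      // idx / 60 is a double in Spark SQL, so cast it back to long for %d
      .withColumn("ts", format_string("%02d:%02d:00",
        (col("idx") / 60).cast("long"), col("idx") % 60))
      .drop("idx")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("minute-ts").getOrCreate()
    import spark.implicits._
    // The question's data; a real CSV would come from spark.read.csv(path)
    val df = Seq(
      (1.19185711131486, 0.26615071205963),
      (-1.3598071336738, -0.0727811733098497),
      (-0.966271711572087, -0.185226008082898),
      (-0.966271711572087, -0.185226008082898),
      (-1.15823309349523, 0.877736754848451),
      (-0.425965884412454, 0.960523044882985)
    ).toDF("Col1", "Col2")
    withMinuteTs(df).show(false)
    spark.stop()
  }
}
```

Because `format_string` works on the index directly, there is no timestamp round trip: the result does not depend on the session time zone or the datetime parser, and the hour part keeps growing past 59 minutes.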
