spark java - adding a new date-based column to an Oracle dataset

iugsix8n · posted 2021-05-27 in Spark

I'm trying to write a Spark Java program that adds a date-based column to a dataset. I'm using an Oracle database.
I need to add a new column (yearquarter) derived from posteddate using Spark Java. For example, if posteddate falls between 2020-01-01 and 2020-03-31, the yearquarter value should be Q12020. You can see the current and expected datasets below.
Could someone show me a snippet of Spark Java code that adds this new column?
Snippet used to read the table into the input dataset:

Dataset<Row> inputDataset = sparksession.read().jdbc(jdbcUrl, table_name, connectionProperties);
inputDataset.show();
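For reference, the connectionProperties object passed to jdbc() is a plain java.util.Properties. A minimal sketch for Oracle follows; the user, password, and driver values are illustrative placeholders, not taken from the question:

```java
import java.util.Properties;

public class ConnectionProps {
    // Builds the Properties object passed to sparksession.read().jdbc(...).
    // All values below are placeholders; substitute your own credentials.
    public static Properties oracleProperties() {
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "db_user");
        connectionProperties.put("password", "db_password");
        connectionProperties.put("driver", "oracle.jdbc.driver.OracleDriver");
        return connectionProperties;
    }
}
```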

Datasets

Current Dataset(inputDataset):-
                   +------+--------+---------------------+
                   | ID   |location| posteddate          |
                   +------+--------+---------------------+
                   |137570|chennai |2020-06-22 13:49:... |
                   |137571| kerala |2020-02-22 14:49:... |
                   |137572|chennai |2018-10-26 13:19:... |
                   |137573|chennai |2019-09-29 14:49:... |
                   +------+--------+---------------------+

           Expected DataSet:-
                   +------+--------+---------------------+--------------+
                   |   id |location| posteddate          |  yearquarter |
                   +------+--------+---------------------+--------------+
                   |137570|chennai |2020-06-22 13:49:... |        Q22020|
                   |137571| kerala |2020-02-22 14:49:... |        Q12020|
                   |137572|chennai |2018-10-26 13:19:... |        Q42018|
                   |137573|chennai |2019-09-29 14:49:... |        Q32019|
                   +------+--------+---------------------+--------------+

Thanks in advance


mnemlml8 1#

Try this -
Use quarter + year (the snippet below also needs import static org.apache.spark.sql.functions.expr;):
dataset.show(false);
dataset.printSchema();
/**
 * +------+--------+-------------------+
 * |ID    |location|posteddate         |
 * +------+--------+-------------------+
 * |137570|chennai |2020-06-22 13:49:00|
 * |137571|kerala  |2020-02-22 14:49:00|
 * |137572|chennai |2018-10-26 13:19:00|
 * |137573|chennai |2019-09-29 14:49:00|
 * +------+--------+-------------------+
 *
 * root
 *  |-- ID: integer (nullable = true)
 *  |-- location: string (nullable = true)
 *  |-- posteddate: timestamp (nullable = true)
 */

dataset.withColumn("yearquarter", expr("concat('Q', quarter(posteddate), year(posteddate))"))
        .show(false);
/**
 * +------+--------+-------------------+-----------+
 * |ID    |location|posteddate         |yearquarter|
 * +------+--------+-------------------+-----------+
 * |137570|chennai |2020-06-22 13:49:00|Q22020     |
 * |137571|kerala  |2020-02-22 14:49:00|Q12020     |
 * |137572|chennai |2018-10-26 13:19:00|Q42018     |
 * |137573|chennai |2019-09-29 14:49:00|Q32019     |
 * +------+--------+-------------------+-----------+
 */
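Spark's quarter() uses calendar quarters (Jan-Mar = 1, Apr-Jun = 2, and so on). You can sanity-check the expected yearquarter values with plain java.time; this is a verification sketch only, not Spark code, and the class/method names are illustrative:

```java
import java.time.LocalDateTime;
import java.time.temporal.IsoFields;

public class QuarterCheck {
    // Computes the same "Q<quarter><year>" label that the Spark expression builds.
    public static String yearQuarter(LocalDateTime posted) {
        int quarter = posted.get(IsoFields.QUARTER_OF_YEAR); // 1..4, calendar quarters
        return "Q" + quarter + posted.getYear();
    }
}
```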
fjnneemd

fjnneemd 2#

Imports:

import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;

You can use this as a starting point:

inputDataset = inputDataset
    .withColumn("yearquarter",                            // adding column
        when(                                             // conditional operator
            col("posteddate").gt(lit("start_range"))      // condition #1 ("start_range" is a placeholder date)
            .and(col("posteddate").lt(lit("end_range"))), // condition #2 ("end_range" is a placeholder date)
            lit("Q12020"))                                // column value if condition evaluates to true
        .otherwise(lit("Q22020")));                       // column value if condition evaluates to false
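To cover all four quarters in this when/otherwise style, you would chain one condition per quarter. The per-row logic such a chain implements can be sketched in plain Java (month boundaries only; the class and method names are illustrative, not Spark API):

```java
import java.time.LocalDateTime;

public class YearQuarterChain {
    // Mirrors a chained when(...).when(...).otherwise(...) over month ranges.
    public static String yearQuarter(LocalDateTime posted) {
        int month = posted.getMonthValue();
        String quarter;
        if (month <= 3) {          // Jan-Mar
            quarter = "Q1";
        } else if (month <= 6) {   // Apr-Jun
            quarter = "Q2";
        } else if (month <= 9) {   // Jul-Sep
            quarter = "Q3";
        } else {                   // Oct-Dec
            quarter = "Q4";
        }
        return quarter + posted.getYear();
    }
}
```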
