如何在apachespark/hive中合并Dataframe,然后增加版本

oxiaedzo  于 2021-06-24  发布在  Hive
关注(0)|答案(13)|浏览(231)

我们接收来自外部系统的每日文件并将其存储到hive中。希望对数据启用版本控制。
col1,col2是复合键,所以如果我们从文件中接收到相同的数据组合,那么它应该以新版本存储到配置单元中。来自文件的最新数据应该获得最大的版本号。我们怎么能在spark做到这一点
文件df
+----+----+-----+

6yoyoihd

6yoyoihd1#

-+
|列1 |列2 |值| ts                    |新的\u版本|
+----+----+-----+

wb1gzix0

wb1gzix02#

---------+-------+
合并后
+----+----+-----+

ukqbszuj

ukqbszuj3#

---------+-------+
|a | b | 777 | 2019-01-01 00:00:00 | 1|
|k | d | 228 | 2019-01-01 00:00:00 | 1|
|g | g | 241 | 2019-01-01 00:00:00 | 1|
+----+----+-----+

jei2mxaa

jei2mxaa5#

-+
|b | c | 133 | 2018-01-03 00:00:00 | 1|
|k | d | 228 | 2019-01-01 00:00:00 | 1|
|a | b | 999 | 2018-01-01 00:00:00 | 1|
|a | b | 888 | 2018-01-02 00:00:00 | 2|
|a | b | 777 | 2019-01-01 00:00:00 | 3|
|g | g | 231 | 2018-01-01 00:00:00 | 1|
|g | g | 241 | 2019-01-01 00:00:00 | 2|
+----+----+-----+

kq0g1dla

kq0g1dla7#

---------+-------+
||列1 |列2 |值| ts            |版本|
+----+----+-----+

vlf7wbxs

vlf7wbxs8#

---------+-------+
||列1 |列2 |值| ts            |版本|
+----+----+-----+

b1uwtaje

b1uwtaje11#

---------+-------+
|a | b | 999 | 2018-01-01 00:00:00 | 1|
|a | b | 888 | 2018-01-02 00:00:00 | 2|
|b | c | 133 | 2018-01-03 00:00:00 | 1|
|g | g | 231 | 2018-01-01 00:00:00 | 1|
+----+----+-----+

rfbsl7qr

rfbsl7qr12#

现有主配置单元表:

INSERT INTO TABLE test_dev_db.test_1 VALUES
    ('A','B',124,1),
    ('A','B',123,2),
    ('B','C',133,1),
    ('G','G',231,1);

假设您已经从文件中加载了以下数据

INSERT INTO TABLE test_dev_db.test_2 VALUES
('A','B',222,1),
('K','D',228,1),
('G','G',241,1);

以下是您的问题:

WITH CTE AS (
    SELECT col1,col2,value,version FROM test_dev_db.test_1
    UNION
    SELECT col1,col2,value,version FROM test_dev_db.test_2
)
insert overwrite table test_dev_db.test_1
SELECT a.col1,a.col2,a.value, row_number() over(partition by a.col1,a.col2 order by a.col1,a.col1) as new_version
FROM CTE a;

hive> select * from test_dev_db.test_1;
OK
A       B       123     1
A       B       124     2
A       B       222     3
B       C       133     1
G       G       231     1
G       G       241     2
K       D       228     1

对于spark:
创建从文件和配置单元表读取的Dataframe并合并它们

uniondf=df1.unionAll(df2)

from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
w = Window().partitionBy('col1','col2').orderBy(lit('A'))
newdf= uniondf.withColumn("new_version", row_number().over(w)).drop('version')

>>> newdf.show();
+----+----+-----+-----------+
|col1|col2|value|new_version|
+----+----+-----+-----------+
|   B|   C|  133|          1|
|   K|   D|  228|          1|
|   A|   B|  124|          1|
|   A|   B|  123|          2|
|   A|   B|  222|          3|
|   G|   G|  231|          1|
|   G|   G|  241|          2|
+----+----+-----+-----------+

保存到Hive

newdf.write.format("orc").option("header", "true").mode("overwrite").saveAsTable('test_dev_db.new_test_1')
fhg3lkii

fhg3lkii13#

---------+-------+
不接收来自外部系统的版本,但如果我们需要它进行比较,那么它将始终是1
Hive测向
+----+----+-----+

相关问题