如何在pig脚本中根据列获取序列号?

643ylb08  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(402)

目前我的数据是这样来的,但我想我的数据显示排名相对于pid字段的变化顺序。我的脚本是这样的。我尝试了排名运算符和密集排名运算符,但仍然没有理想的输出。

trans_c1 = LOAD '/mypath/data_file.csv' using PigStorage(',') as (date,Product_id);  

    (DATE,Product id)
    (2015-01-13T18:00:40.622+05:30,B00XT)
    (2015-01-13T18:00:40.622+05:30,B00XT)
    (2015-01-13T18:00:40.622+05:30,B00XT)
    (2015-01-13T18:00:40.622+05:30,B00XT)
    (2015-01-13T18:00:40.622+05:30,B00OZ)
    (2015-01-13T18:00:40.622+05:30,B00OZ)
    (2015-01-13T18:00:40.622+05:30,B00OZ)
    (2015-01-13T18:00:40.622+05:30,B00VB)
    (2015-01-13T18:00:40.622+05:30,B00VB)
    (2015-01-13T18:00:40.622+05:30,B00VB)
    (2015-01-13T18:00:40.622+05:30,B00VB)

最后的输出应该是这样的,秩序列随着(产品id)的变化而变化,并重置为1。在pig中有可能这样做吗?

(1,2015-01-13T18:00:40.622+05:30,B00XT)
    (2,2015-01-13T18:00:40.622+05:30,B00XT)
    (3,2015-01-13T18:00:40.622+05:30,B00XT)
    (4,2015-01-13T18:00:40.622+05:30,B00XT)
    (1,2015-01-13T18:00:40.622+05:30,B00OZ)
    (2,2015-01-13T18:00:40.622+05:30,B00OZ)
    (3,2015-01-13T18:00:40.622+05:30,B00OZ)
    (1,2015-01-13T18:00:40.622+05:30,B00VB)
    (2,2015-01-13T18:00:40.622+05:30,B00VB)
    (3,2015-01-13T18:00:40.622+05:30,B00VB)
    (4,2015-01-13T18:00:40.622+05:30,B00VB)
hzbexzde

hzbexzde1#

这个问题可以通过使用piggybank函数来解决 Stitch 以及 Over . 它也可以通过使用datafu的 Enumerate 功能。
使用piggybank函数编写脚本:

REGISTER <path to piggybank folder>/piggybank.jar;
DEFINE Stitch org.apache.pig.piggybank.evaluation.Stitch;
DEFINE Over org.apache.pig.piggybank.evaluation.Over('int');
input_data = LOAD 'data_file.csv' USING PigStorage(',') AS (date:chararray, pid:chararray);
group_data = GROUP input_data BY pid;
rank_grouped_data = FOREACH group_data GENERATE FLATTEN(Stitch(input_data, Over(input_data, 'row_number')));
display_data = FOREACH rank_grouped_data GENERATE stitched::result AS rank_number, stitched::date AS date, stitched::pid AS pid;
DUMP display_data;

使用datafu的枚举函数编写脚本:

REGISTER <path to pig libraries>/datafu-1.2.0.jar;
DEFINE Enumerate datafu.pig.bags.Enumerate('1');
input_data = LOAD 'data_file.csv' USING PigStorage(',') AS (date:chararray, pid:chararray);
group_data = GROUP input_data BY pid;
data = FOREACH group_data GENERATE FLATTEN(Enumerate(input_data));
display_data = FOREACH data GENERATE $2, $0, $1;
DUMP display_data;

datafu jar文件可以从maven存储库下载:http://search.maven.org/#search%7cga%7c1%7cg%3a%22com.linkedin.datafu%22
输出:

(1,2015-01-13T18:00:40.622+05:30,B00OZ)
(2,2015-01-13T18:00:40.622+05:30,B00OZ)
(3,2015-01-13T18:00:40.622+05:30,B00OZ)
(1,2015-01-13T18:00:40.622+05:30,B00VB)
(2,2015-01-13T18:00:40.622+05:30,B00VB)
(3,2015-01-13T18:00:40.622+05:30,B00VB)
(4,2015-01-13T18:00:40.622+05:30,B00VB)
(1,2015-01-13T18:00:40.622+05:30,B00XT)
(2,2015-01-13T18:00:40.622+05:30,B00XT)
(3,2015-01-13T18:00:40.622+05:30,B00XT)
(4,2015-01-13T18:00:40.622+05:30,B00XT)

裁判:
apache中行号函数的实现
apache pig rank函数的用法

相关问题