Pig: computing the difference between records by group with a Python UDF (window function)

kwvwclae posted on 2021-06-21 in Pig

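I am trying to compute the difference between consecutive records within each group and to number the rows within each group. In Hive this can be done with the lag and row_number window functions. I am trying to recreate it with Pig and a Python UDF.
In the example below, the row number must restart at 1 for each name and increase by one for each new month (new record). I also need the difference in balance from the previous month for each name.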
Input data

name    month   balance
A   1   10
A   2   5
A   3   15
B   2   20
B   3   10
B   4   45
B   5   50

Output data

name    month   balance row_number  balance_diff
A   1   10  1   0
A   2   5   2   -5
A   3   15  3   10
B   2   20  1   0
B   3   10  2   -10
B   4   45  3   35
B   5   50  4   5
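To pin down the expected semantics before involving Pig, here is a plain-Python sketch (neither Pig nor Hive) that turns the input table into the output table: the row number restarts at 1 per name, and the first row of each name gets a difference of 0. It assumes the data fits in memory; `window_metrics` and `ROWS` are hypothetical names used only for illustration.

```python
from itertools import groupby
from operator import itemgetter

# Sample rows from the question: (name, month, balance)
ROWS = [
    ("A", 1, 10), ("A", 2, 5), ("A", 3, 15),
    ("B", 2, 20), ("B", 3, 10), ("B", 4, 45), ("B", 5, 50),
]


def window_metrics(rows):
    """Per-name row number and difference from the previous month's balance.

    Mirrors a row number and a lag-based difference partitioned by name and
    ordered by month; the first row of each name gets a difference of 0.
    """
    out = []
    # groupby needs input sorted by the grouping key; sorting by (name, month)
    # also gives the order within each group.
    ordered = sorted(rows, key=itemgetter(0, 1))
    for name, group in groupby(ordered, key=itemgetter(0)):
        prev = None
        for row_number, (_, month, balance) in enumerate(group, start=1):
            diff = 0 if prev is None else balance - prev
            out.append((name, month, balance, row_number, diff))
            prev = balance
    return out


for row in window_metrics(ROWS):
    print(*row, sep="\t")
```

The nested ORDER BY in the Pig attempts plays the same role as `sorted` here: the lag difference is only meaningful once each group is in month order.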

How can I do this with Pig and a Python UDF? Here is what I tried.
Pig:

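register 'myudf.py' using jython as myudf;
inpdat = load 'input.dat' using PigStorage(',') as (name:chararray, month:int, balance:int);

outdat = foreach (group inpdat by name) {
    sorted = order inpdat by month asc;
    row_details = myudf.row_metrics(sorted.(month, balance));
    generate flatten(sorted), flatten(row_details);
    };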

Python UDF:

def row_num(mth):
    # 1-based position of each month within the (already sorted) group
    return [i + 1 for i, _ in enumerate(mth)]

def diff(bal, n=1):
    # Difference from the value n rows earlier; the first n rows get 0
    prev = [None] * n + list(bal)
    return [x - y if (x is not None and y is not None) else 0
            for x, y in zip(bal, prev)]

@outputSchema('udfbag:bag{udftuple:tuple(row_number:int, balance_diff:int)}')
def row_metrics(mthbal):
    mth, bal = zip(*mthbal)
    row_number = row_num(mth)
    balance_diff = diff(bal)
    return list(zip(row_number, balance_diff))

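My Python functions work on their own. However, once I bring the results into Pig, I can't combine the two bags (sorted and row_details) row by row. Any help is greatly appreciated.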
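I also noticed that the Enumerate function available for Pig does what I want for the row numbers. However, since I am learning Pig, I am looking for a solution that uses Python UDFs.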
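One way to see why sorted and row_details don't line up: when two bags are flattened in the same GENERATE, Pig produces their cross product rather than matching tuples by position. The plain-Python model below uses hypothetical bag contents for name A (the UDF results are assumed, not taken from a Pig run) to show what that does to three input rows.

```python
from itertools import product

# Hypothetical bag contents for name 'A' inside the nested FOREACH.
sorted_bag = [("A", 1, 10), ("A", 2, 5), ("A", 3, 15)]
row_details = [(1, 0), (2, -5), (3, 10)]

# Flattening both bags in one GENERATE pairs every tuple of one bag with
# every tuple of the other, so 3 input rows become 3 * 3 = 9 output rows.
crossed = [left + right for left, right in product(sorted_bag, row_details)]

for row in crossed[:4]:
    print(row)
```

Only the three rows where both sides come from the same position are wanted, which is why the answers either return full rows from the UDF or stitch the bags by position.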

Answer 1 (dldeef67)

Try this: pass the whole sorted bag to the UDF so that name and month travel with the computed fields.
Python UDF:

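def row_num(mth):
    # 1-based row number within the sorted group
    return [i + 1 for i, _ in enumerate(mth)]

def diff(bal, n=1):
    # 0 for the first n rows, then the difference from the value n rows earlier
    return [0] * min(n, len(bal)) + [x - y for x, y in zip(bal[n:], bal[:-n])]

@outputSchema('udfbag:bag{udftuple:tuple(name:chararray, month:int, balance:int, row_number:int, balance_diff:int)}')
def row_metrics(rows):
    # rows is the whole sorted bag of (name, month, balance) tuples
    name, mth, bal = zip(*rows)
    row_number = row_num(mth)
    balance_diff = diff(bal)
    return list(zip(name, mth, bal, row_number, balance_diff))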
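The same idea can also be written as a single pass that appends the two metrics to each incoming tuple, which keeps balance in the output without rebuilding columns. This is a sketch, not the answerer's code: `append_metrics` is a hypothetical name, it assumes rows arrive already sorted by month, and it assumes Pig predefines `outputSchema` when registering the script with Jython (as the question's code relies on). The fallback stub only lets the file run outside Pig for testing.

```python
# Fallback so the file can be exercised outside Pig; inside Pig the
# outputSchema decorator is assumed to be predefined.
try:
    outputSchema
except NameError:
    def outputSchema(schema):
        def wrap(func):
            return func
        return wrap


@outputSchema('rows:bag{t:tuple(name:chararray, month:int, balance:int, '
              'row_number:int, balance_diff:int)}')
def append_metrics(rows):
    """Append (row_number, balance_diff) to each (name, month, balance) tuple.

    Assumes rows are already sorted by month, as done by the nested ORDER BY.
    A missing previous balance (first row, or a null) yields a difference of 0.
    """
    out = []
    prev = None
    for row_number, (name, month, balance) in enumerate(rows, 1):
        if prev is None or balance is None:
            balance_diff = 0
        else:
            balance_diff = balance - prev
        out.append((name, month, balance, row_number, balance_diff))
        prev = balance
    return out
```

In the Pig script below it would be called the same way, e.g. row_details = myudf.append_metrics(sorted); followed by generate flatten(row_details);.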

Pig script:

register 'myudf.py' using jython as myudf;
inpdat = load 'input.dat' using PigStorage(',') as (name:chararray, month:int, balance:int);

outdat = foreach (group inpdat by name) {
    sorted = order inpdat BY month asc;
    row_details = myudf.row_metrics(sorted);
    generate flatten (row_details);
    };

dump outdat;

Answer 2 (wi3ka0sx)

Using piggybank's Stitch function worked in my case. It joins the two bags tuple by tuple. I'd be interested in learning other ways to do this.

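REGISTER /mypath/piggybank.jar;
REGISTER 'myudf.py' USING jython AS myudf;
define Stitch org.apache.pig.piggybank.evaluation.Stitch;

inpdat = load 'input.dat' using PigStorage(',') as (name:chararray, month:int, balance:int);

-- myudf.row_metrics is the question's UDF: a bag of (row_number, balance_diff)
outdat = FOREACH (group inpdat by name) {
    sorted = ORDER inpdat by month asc;
    udf_fields = myudf.row_metrics(sorted.(month, balance));
    -- Stitch pairs the bags by position instead of taking their cross product
    generate flatten(Stitch(sorted, udf_fields)) as (name, month, balance, row_number, balance_diff);
};

dump outdat;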
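Stitch fits this problem because it pairs the bags by position, which relies on row_metrics returning exactly one tuple per input row, in the same order. The plain-Python model below illustrates that positional pairing on hypothetical rows for name A. `stitch` is a stand-in written for illustration, not piggybank's implementation; it stops at the shortest bag and makes no claim about how the real Stitch handles bags of unequal length.

```python
from itertools import chain


def stitch(*bags):
    """Model of stitching bags: the i-th output tuple concatenates the
    i-th tuple from each input bag (zip stops at the shortest bag)."""
    return [tuple(chain.from_iterable(parts)) for parts in zip(*bags)]


sorted_bag = [("A", 1, 10), ("A", 2, 5), ("A", 3, 15)]
udf_fields = [(1, 0), (2, -5), (3, 10)]

for row in stitch(sorted_bag, udf_fields):
    print(row)
```

Both inputs come from the same ordered bag inside the nested FOREACH, so position i in one corresponds to position i in the other.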
