我试图按组计算记录之间的差异,并按组计算行数。这可以通过在hive中使用lag和row number函数以及窗口函数来实现。正在尝试使用pig和python udf重新创建这个。
在下面的示例中,我需要为每个名称从1重新启动行号,并为新的月份(新记录)增加行号。另外,我需要的差额从上个月为每个名字。
输入数据
name month balance
A 1 10
A 2 5
A 3 15
B 2 20
B 3 10
B 4 45
B 5 50
输出数据
name month balance row_number balance_diff
A 1 10 1 0
A 2 5 1 -5
A 3 15 3 10
B 2 20 1 0
B 3 10 2 -10
B 4 45 3 35
B 5 50 4 5
如何使用pig和python自定义项来实现这一点?下面是我试过的。
Pig
output = foreach (group input by (name)) {
sorted = order input BY month asc;
row_details= myudf.rownum_and_diff(sorted.(month, balance));
generate flatten (sorted), flatten (row_details));
};
python自定义项
def row_num(mth):
return [x+1 for x,y in enumerate (mth)]
def diff(bal, n=1):
return [x-y if (x is not None and y is not None) else 0.0 \
for x,y in zip(bal, [:n] + bal)]
@outputSchema('udfbag:bag{udftuple:tuple(row_number: int, balance_diff: int)}')
def row_metrics(mthbal):
mth, bal = zip(*mthbal)
row_number = row_num(mth)
balance_diff = diff(bal)
return zip(row_number, balance_diff)
我的python函数可以工作。然而,一旦我把结果带到pig中,我就很难将这两个包(分类和行详细信息)组合起来。非常感谢您的帮助。
我还看到pig中的enumerate函数对行号做了我想做的事情。不过,作为学习pig的一部分,我正在寻找使用python udfs的解决方案。
2条答案
按热度按时间dldeef671#
试试这个。
python自定义项:
Pig脚本:
wi3ka0sx2#
使用piggybank的缝合功能在我的案例中是有效的。会有兴趣学习任何其他方法来做到这一点。