更新if块中的常量,并将其提取为同一generate语句中的字段

uxh89sit  于 2021-06-25  发布在  Pig
关注(0)|答案(1)|浏览(375)

我有一个关系,如下加载到“调用”。

(Header India)
(Call1)
(Call2)
(END)
(Header NZ)
(Call1)
(Call2)
(END)

我正在尝试更新关系,使其变为如下所示,并且我可以按第2个字段分组,以获得国家/地区的呼叫计数。

(Header India, Header India)
(Call1, Header India)
(Call2, Header India)
(END, Header India)
(Header NZ, Header NZ)
(Call1, Header NZ )
(Call2, Header NZ)
(END, Header NZ)

第一个元组总是(header)。我使用下面的代码,我想更新常量,然后提取常量作为第二个字段。但它不起作用。有什么建议吗?

%declare HeaderText 'Header '
calls = LOAD 'Data File';
extrctd = FOREACH calls GENERATE $0 as (country:chararray), (SUBSTRING($0,1,7)=='Header '?'$HeaderText'=$0:'$HeaderText') as (txt:chararray);
blpfk2vs

blpfk2vs1#

一种选择是您可以编写自己的自定义项来解决这个问题。下面的示例代码
输入文件

Header India
Call1
Call2
END
Header NZ
Call1
Call2
END

Pig手稿:

REGISTER mycountry.jar;

calls = LOAD 'input.txt' AS (line:chararray);
extrctd = FOREACH calls GENERATE $0 AS country, mypackage.COUNTRY(line,'Header') as txt;
DUMP extrctd;

输出:

(Header India,Header India)
(Call1,Header India)
(Call2,Header India)
(END,Header India)
(Header NZ,Header NZ)
(Call1,Header NZ)
(Call2,Header NZ)
(END,Header NZ)

示例udf代码:下面的java类(country和myglobal)编译并生成为 mycountry.jar 国家.java

package mypackage;
    import java.io.IOException;
    import org.apache.commons.lang.StringUtils;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;

    class MyGlobal {
        public static String myCountry;
    }

    public class COUNTRY extends EvalFunc<String> {
    @Override
    public String exec(Tuple arg0) throws IOException {
            try
            {
                    String input = ((String) arg0.get(0));
                    String header = ((String) arg0.get(1));
                    String output;

                   if(input.startsWith(header))
                    {
                            output = input;
                            MyGlobal.myCountry = output;
                    }
                    else
                    {
                            output = MyGlobal.myCountry;
                    }
                    return output;
            }
            catch(Exception e)
            {
                throw new IOException("Caught exception while processing the input row ", e);
            }
        }
    }

相关问题