Hadoop Pig script - union with condition

btqmn9zl  posted on 2021-06-24 in Pig

I am completely new to Pig. I want to merge two files, a and b, on the iid field, but I do not want to output rows from b whose iid does not exist in a. This seems simple, but I can't figure out how to do it correctly.
Here is my sample code, which so far only does the union:

  a = load '$input' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
  b = load '$data' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
  out = union onschema a,b;
  singled = distinct out;
  ordered = order singled by iid;
  store ordered into '$output';

The sample data below uses only 3 columns, just to illustrate what I expect. Note that the fields are actually tab-delimited.
Sample data a:

  1 Name Tom Linkon
  1 Title Professor
  2 Name Whatever
  2 Title Worker

Sample data b:

  1 City New York
  2 City Columbus
  3 City Fake fake
  4 City Blah Bla

Sample output:

  1 Name Tom Linkon
  1 Title Professor
  1 City New York
  2 Name Whatever
  2 Title Worker
  2 City Columbus

Thank you very much for your help!

zlwx9yxi  #1

Use COGROUP to organize the records that share a key while avoiding the unwanted cross product a JOIN would produce. Then FILTER out the groups whose bag of records from a is empty (those iids exist only in b), split back into two relations, and perform a UNION:

  a = load '$input' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
  b = load '$data' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
  c = COGROUP a BY iid, b BY iid;
  c_filt = FILTER c BY NOT IsEmpty(a);  -- keep only groups whose iid exists in a
  a_new = FOREACH c_filt GENERATE FLATTEN(a) AS (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
  b_new = FOREACH c_filt GENERATE FLATTEN(b) AS (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
  out = UNION ONSCHEMA a_new, b_new;
  singled = DISTINCT out;
  STORE (ORDER singled BY iid) INTO '$output';
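For the question's 3-column sample data, the cogrouped relation c would look roughly like the sketch below (the real schema has five fields per tuple). It shows why the FILTER drops the iid 3 and 4 groups: their bag of a records is empty.

  (1, {(1,Name,Tom Linkon),(1,Title,Professor)}, {(1,City,New York)})
  (2, {(2,Name,Whatever),(2,Title,Worker)},      {(2,City,Columbus)})
  (3, {},                                        {(3,City,Fake fake)})
  (4, {},                                        {(4,City,Blah Bla)})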

However, I don't like this solution: too many lines and new relations for such a simple operation. What is really needed is a way to merge two bags into one. Pig apparently doesn't provide this out of the box (although if it does, please answer this question). You can, however, write a simple UDF:

  import java.io.IOException;

  import org.apache.pig.EvalFunc;
  import org.apache.pig.data.DataBag;
  import org.apache.pig.data.DefaultDataBag;
  import org.apache.pig.data.Tuple;

  // Concatenates all bags passed as arguments into a single bag.
  public class MERGE extends EvalFunc<DataBag> {
      public DataBag exec(Tuple input) throws IOException {
          DataBag b = new DefaultDataBag();
          try {
              if (input != null)
                  for (int i = 0; i < input.size(); i++)
                      b.addAll((DataBag) input.get(i));
          } catch (Exception e) { return null; }
          return b;
      }
  }
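Before the script below can call MERGE, the compiled UDF has to be registered in the Pig script. A minimal sketch, assuming the class above is packaged into a jar; the jar name and path are placeholders:

  REGISTER /path/to/merge-udf.jar;

If you put the class in a Java package, also add a DEFINE with the fully qualified class name (for example, DEFINE MERGE com.example.MERGE();).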

With this UDF, the solution becomes:

  a = load '$input' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
  b = load '$data' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
  c = COGROUP a BY iid, b BY iid;
  c_filt = FILTER c BY NOT IsEmpty(a);  -- keep only iids that exist in a
  merged = FOREACH c_filt GENERATE group AS iid, MERGE(a, b) AS combined;
  out = FOREACH merged {
      uniq = DISTINCT combined;
      GENERATE iid, FLATTEN(uniq);  -- iid is kept so the ORDER below has a named sort key
  };
  STORE (ORDER out BY iid) INTO '$output';

Another advantage of this approach: if you have more than two inputs, you don't need a separate FOREACH for each one after the COGROUP. Just add more arguments to MERGE:

  c = FOREACH (COGROUP a BY iid, b BY iid, ..., z BY iid)
      GENERATE group AS iid, MERGE(a, b, ..., z) AS combined;
js5cn81o  #2

This should solve your problem:

  f1 = LOAD '/user/hadoop/f1' USING PigStorage('\t') AS (id_f1:int, key_f1:chararray, value_f1:chararray);
  f2 = LOAD '/user/hadoop/f2' USING PigStorage('\t') AS (id_f2:int, key_f2:chararray, value_f2:chararray);
  f3 = JOIN f1 BY id_f1 LEFT OUTER, f2 BY id_f2;
  f4 = FOREACH f3 GENERATE id_f1, key_f1, value_f1;
  f5 = FOREACH f3 GENERATE id_f2, key_f2, value_f2;
  f6 = UNION f4, f5;
  f7 = DISTINCT f6;
  f8 = ORDER f7 BY $0;
  DUMP f8;
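If you want the result written back to HDFS in the same tab-delimited format rather than dumped to the console, a minimal sketch (the output path is a placeholder):

  STORE f8 INTO '/user/hadoop/out' USING PigStorage('\t');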
