在pig拉丁语中提取文件之间的不匹配记录

eni9jsuy  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(324)

我是初学者,学Pig拉丁语。需要从文件中提取记录。已经创建了两个文件t1和t2,一些元组对于这两个文件都是公共的,所以需要提取只存在于t1中的元组,并且需要省略t1和t2之间的公共元组。谁能帮帮我。。。
谢谢

7kqas0il

7kqas0il1#

首先你要看看 this Venn Diagram . 你想要的只是中间那一点。所以首先你需要做一个 full outer JOIN 在数据上。那么,自从 nulls 如果在外部联接中创建,则当键不是公共键时,您将希望过滤联接的结果,使其仅包含具有一个空值的行(维恩图的非相交部分)。
在Pig的剧本里是这样的:

-- T1 and T2 are the two sets of tuples you are using, their schemas are:
-- T1: {t: (num1: int, num2: int)}
-- T2: {t: (num1: int, num2: int)}
-- Yours will be different, but the principle is the same

B = JOIN T1 BY t FULL, T2 BY t ;
C = FILTER B BY T1::t is null OR T2::t is null ;
D = FOREACH C GENERATE (T1::t is not null? T1::t : A2::t) ;

使用此示例输入完成以下步骤:

T1:      T2:
(1,2)    (4,5)
(3,4)    (1,2)
``` `B` 完全外部连接是否会导致:

B: {T1::t: (num1: int,num2: int),T2::t: (num1: int,num2: int)}
((1,2),(1,2))
(,(4,5))
((3.4),)
``` T1 是左元组,并且 T2 是正确的元组。我们必须使用 :: 以确定 t ,因为他们有相同的名字。
现在, C 过滤器 B 这样就只保留带有null的行。导致:

C: {T1::t: (num1: int,num2: int),T2::t: (num1: int,num2: int)}
(,(4,5))
((3.4),)

这是您想要的输出,但使用起来有点混乱。 D 使用 bincond (the?:)删除空值。因此,最终输出将是:

D: {T1::t: (num1: int,num2: int)}
((4,5))
((3.4))

更新:
如果您只想保留联接的左侧(t1)(或者如果您切换了对象,则保留右侧(t2))。您可以这样做:

-- B is the same

-- We only want to keep tuples where the T2 tuple is null
C = FILTER B BY T2::t is null ;
-- Generate T1::t to get rid of the null T2::t
D = FOREACH C GENERATE T1::t ;

然而,回顾原始的维恩图,使用一个完整的 JOIN 是不必要的。如果你看这个 different Venn Diagram ,您可以看到它覆盖了您想要的集合,而不需要任何额外的操作。因此,你应该改变 B 收件人:

B = JOIN T1 BY t LEFT, T2 BY t ;
monwx1rj

monwx1rj2#

我相信有一个更有效的方法来完成它,特别是如果t1和t2非常大的话。我正在处理一个每个文件有几十亿行的数据集,我只对t2中t1中不存在的行感兴趣。两个文件具有相同的架构和相似的大小。

T1 = load '/path/to/file1' using PigStorage() as (
  f1,
  f2,
  f3);

T1 = foreach T1 generate
  $0.., --all fields
  1 as day1,
  0 as day2);

T2 = load '/path/to/file2' using PigStorage() as (
  f1,
  f2,
  f3);

T2 = foreach T2 generate
  $0.., --all fields
  0 as day1,
  1 as day2);

T3 = union T1, T2;
-- assuming f1 is your join field
T3grouped = foreach (group T3 by f1) generate 
   flatten(T3),
   sum(T3.day1) as day1,
   sum(T3.day2) as day2;

T3result = filter T3grouped by day1==0;

这将返回在第1天没有出现的具有f1的行。相当于

T3 = T2 by f1 LEFT OUTER, T1 by f1;
T3result = filter T3 by T1::f1 is null

但要快得多。联合版本在10分钟内运行,联合版本已经运行了超过2小时(仍然没有完成)。从计数器上看,union版本生成了更多的i/o(特别是在Map器周围),但只使用了50%的cpu。

相关问题