下面是我的用例示例。
kxxlusnw1#
输入
990004916946605-1404157897784,S1,1404157898275990004916946605-1404157897784,S1,1404157898286990004916946605-1404157897784,S2,1404157898337990004947764274-1435162269418,S1,1435162274044990004947764274-1435162269418,S2,1435162274057990004947764274-1435162269418,S3,1435162274081990004947764274-1435162287965,S2,1435162690002990004947764274-1435162287965,S1,1435162690001990004947764274-1435162287965,S3,1435162690003990004947764274-1435162287965,S1,1435162690004990004947764274-1435162212345,S1,1435168768574990004947764274-1435162212345,S2,1435168768585990004947764274-1435162212345,S3,1435168768593register /home/cloudera/jar/ScreenFilter.jar;screen_records = LOAD '/user/cloudera/inputfiles/screen.txt' USING PigStorage(',') AS(session_id:chararray,screen_name:chararray,launch_time:long);screen_rec_order = ORDER screen_records by launch_time ASC;session_grped = GROUP screen_rec_order BY session_id;eached = FOREACH session_grped { ordered = ORDER screen_rec_order by launch_time; GENERATE group as session_id, REPLACE(BagToString(ordered.screen_name),'_','-->') as screen_str; };screen_each = FOREACH eached GENERATE session_id, GetOrderedScreen(screen_str) as screen_pattern;screen_grp = GROUP screen_each by screen_pattern;screen_final_each = FOREACH screen_grp GENERATE group as screen_pattern, COUNT(screen_each) as pattern_cnt;ranker = RANK screen_final_each BY pattern_cnt DESC DENSE;output_data = FOREACH ranker GENERATE screen_pattern, pattern_cnt, $0 as rank_value;dump output_data;
990004916946605-1404157897784,S1,1404157898275
990004916946605-1404157897784,S1,1404157898286
990004916946605-1404157897784,S2,1404157898337
990004947764274-1435162269418,S1,1435162274044
990004947764274-1435162269418,S2,1435162274057
990004947764274-1435162269418,S3,1435162274081
990004947764274-1435162287965,S2,1435162690002
990004947764274-1435162287965,S1,1435162690001
990004947764274-1435162287965,S3,1435162690003
990004947764274-1435162287965,S1,1435162690004
990004947764274-1435162212345,S1,1435168768574
990004947764274-1435162212345,S2,1435168768585
990004947764274-1435162212345,S3,1435168768593
-- Register the jar containing the GetOrderedScreen UDF.
register /home/cloudera/jar/ScreenFilter.jar;

-- Load raw screen events: session id, screen name, launch timestamp (ms).
screen_records = LOAD '/user/cloudera/inputfiles/screen.txt'
    USING PigStorage(',')
    AS (session_id:chararray, screen_name:chararray, launch_time:long);

-- Group directly by session.  The original global ORDER before the GROUP
-- was redundant: Pig does not guarantee bag order after grouping, so the
-- nested ORDER inside the FOREACH below is what actually fixes the order.
session_grped = GROUP screen_records BY session_id;

-- Within each session, sort by launch time and join the screen names into
-- one string.  BagToString's default '_' delimiter is rewritten to '-->'.
eached = FOREACH session_grped {
    ordered = ORDER screen_records BY launch_time;
    GENERATE group AS session_id,
             REPLACE(BagToString(ordered.screen_name), '_', '-->') AS screen_str;
};

-- Collapse adjacent duplicate screens via the Java UDF.
screen_each = FOREACH eached GENERATE session_id,
                                      GetOrderedScreen(screen_str) AS screen_pattern;

-- Count how many sessions produced each collapsed pattern.
screen_grp = GROUP screen_each BY screen_pattern;
screen_final_each = FOREACH screen_grp GENERATE group AS screen_pattern,
                                               COUNT(screen_each) AS pattern_cnt;

-- Dense-rank patterns by frequency (ties share a rank, no gaps).
ranker = RANK screen_final_each BY pattern_cnt DESC DENSE;
output_data = FOREACH ranker GENERATE screen_pattern, pattern_cnt, $0 AS rank_value;
dump output_data;
我找不到用 Pig 内置函数删除同一会话 id 下相邻重复屏幕的方法,因此使用了 Java UDF 来删除相邻的重复屏幕名称。我创建了一个名为 GetOrderedScreen 的 Java UDF,将它打包进 jar(命名为 ScreenFilter.jar),并在上面的 Pig 脚本中注册了该 jar。下面是 GetOrderedScreen Java UDF 的代码:
public class GetOrderedScreen extends EvalFunc<String> {@Overridepublic String exec(Tuple input) throws IOException { String incoming_screen_str= (String)input.get(0); String outgoing_screen_str =""; String screen_array[] =incoming_screen_str.split("-->"); String full_screen=screen_array[0]; for (int i=0; i<screen_array.length;i++) { String prefix_screen= screen_array[i]; String suffix_screen=""; int j=i+1; if(j< screen_array.length) { suffix_screen = screen_array[j]; } if (!prefix_screen.equalsIgnoreCase(suffix_screen)) { full_screen = full_screen+ "-->" +suffix_screen; } } outgoing_screen_str =full_screen.substring(0, full_screen.lastIndexOf("-->")); return outgoing_screen_str;}
public class GetOrderedScreen extends EvalFunc<String> {
@Override
public String exec(Tuple input) throws IOException {
String incoming_screen_str= (String)input.get(0);
String outgoing_screen_str ="";
String screen_array[] =incoming_screen_str.split("-->");
String full_screen=screen_array[0];
for (int i=0; i<screen_array.length;i++)
String prefix_screen= screen_array[i];
String suffix_screen="";
int j=i+1;
if(j< screen_array.length)
suffix_screen = screen_array[j];
}
if (!prefix_screen.equalsIgnoreCase(suffix_screen))
full_screen = full_screen+ "-->" +suffix_screen;
outgoing_screen_str =full_screen.substring(0, full_screen.lastIndexOf("-->"));
return outgoing_screen_str;
}输出
(S1-->S2-->S3,2,1)(S1-->S2,1,2)(S1-->S2-->S3-->S1,1,2)
(S1-->S2-->S3,2,1)
(S1-->S2,1,2)
(S1-->S2-->S3-->S1,1,2)
希望这对你有帮助!另外,不妨再等一段时间,也许看到这个问题的高手能给出更简洁的解法(不使用 Java UDF)。
6jygbczu2#
你可以参考这个问题,有人问过类似的问题。如果我正确地理解了你的问题,那么你希望从路径中删除重复项,但仅当它们彼此相邻时。例如 1 -> 1 -> 2 -> 1 会变成 1 -> 2 -> 1 。如果这是正确的,那么你不能只是分组再 distinct (我相信你已经注意到了),因为那会删除所有的重复项。一个简单的解决方案是编写一个 UDF,在保留用户不同路径的同时只删除相邻的重复项。UDF 如下:
1 -> 1 -> 2 -> 1
1 -> 2 -> 1
distinct
package something;import java.util.ArrayList;import org.apache.hadoop.hive.ql.exec.UDF;import org.apache.hadoop.io.Text;public class RemoveSequentialDuplicatesUDF extends UDF { public ArrayList<Text> evaluate(ArrayList<Text> arr) { ArrayList<Text> newList = new ArrayList<Text>(); newList.add(arr.get(0)); for (int i = 1; i < arr.size(); i++) { String front = arr.get(i).toString(); String back = arr.get(i-1).toString(); if (!back.equals(front)) { newList.add(arr.get(i)); } } return newList; }}
package something;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class RemoveSequentialDuplicatesUDF extends UDF {

    /**
     * Removes adjacent duplicates from an ordered list of screen names
     * while keeping non-adjacent repeats, e.g. [1, 1, 2, 1] -> [1, 2, 1].
     *
     * @param arr ordered screen names for one session
     * @return the list with consecutive duplicates collapsed, or null when
     *         the input is null or empty (Hive UDF convention for NULL)
     */
    public ArrayList<Text> evaluate(ArrayList<Text> arr) {
        // Guard: the original arr.get(0) threw NPE / IndexOutOfBounds when
        // Hive handed in a NULL or empty array.
        if (arr == null || arr.isEmpty()) {
            return null;
        }
        ArrayList<Text> deduped = new ArrayList<Text>();
        deduped.add(arr.get(0));
        for (int i = 1; i < arr.size(); i++) {
            // Keep an element only when it differs from its predecessor.
            if (!arr.get(i).toString().equals(arr.get(i - 1).toString())) {
                deduped.add(arr.get(i));
            }
        }
        return deduped;
    }
}
要构建这个 jar,你需要 hive-core.jar 和 hadoop-core.jar,这些都可以在 Maven 仓库中找到。请确保所用版本与你环境中的 Hive 和 Hadoop 版本一致。另外,如果你计划在生产环境中运行,建议在 UDF 中加入一些异常处理。构建好 jar 之后,将其导入并运行以下查询:
hive-core.jar
hadoop-core.jar
add jar /path/to/jars/brickhouse-0.7.1.jar;add jar /path/to/jars/hive_common-SNAPSHOT.jar;create temporary function collect as "brickhouse.udf.collect.CollectUDAF";create temporary function remove_dups as "something.RemoveSequentialDuplicatesUDF";select screen_flow, count , dense_rank() over (order by count desc) rankfrom ( select screen_flow , count(*) count from ( select session_id , concat_ws("->", remove_dups(screen_array)) screen_flow from ( select session_id , collect(screen_name) screen_array from ( select * from database.table order by screen_launch_time ) a group by session_id ) b ) c group by screen_flow ) d
add jar /path/to/jars/brickhouse-0.7.1.jar;
add jar /path/to/jars/hive_common-SNAPSHOT.jar;

-- Brickhouse collect() preserves incoming row order, unlike the built-in
-- collect_set; remove_dups() is the custom adjacent-duplicate filter UDF.
create temporary function collect as "brickhouse.udf.collect.CollectUDAF";
create temporary function remove_dups as "something.RemoveSequentialDuplicatesUDF";

select
    screen_flow,
    count,
    dense_rank() over (order by count desc) as rank
from (
    -- count sessions per collapsed screen path
    select
        screen_flow,
        count(*) as count
    from (
        -- collapse adjacent duplicates and render the path as a string
        select
            session_id,
            concat_ws("->", remove_dups(screen_array)) as screen_flow
        from (
            -- gather each session's screens in launch-time order
            select
                session_id,
                collect(screen_name) as screen_array
            from (
                select *
                from database.table
                order by screen_launch_time
            ) a
            group by session_id
        ) b
    ) c
    group by screen_flow
) d;
输出:
s1->s2->s3 2 1s1->s2 1 2s1->s2->s3->s1 1 2
s1->s2->s3 2 1
s1->s2 1 2
s1->s2->s3->s1 1 2
希望这有帮助。
2条答案
按热度按时间kxxlusnw1#
输入
我找不到使用pig内置函数删除同一会话id的相邻屏幕的方法,因此我使用了javaudf来删除相邻的屏幕名称。
我创建了一个名为getorderedscreen的JavaUDF,将该udf合并到jar中,并将该jar命名为screenfilter.jar,并在这个pig脚本中注册了该jar
下面是getOrderedScreenJavaUDF的代码
}
输出
希望这对你有帮助!另外,不妨再等一段时间,也许看到这个问题的高手能给出更简洁的解法(不使用 Java UDF)。
6jygbczu2#
你可以参考这个问题,有人问过类似的问题。如果我正确地理解了你的问题,那么你希望从路径中删除重复项,但仅当它们彼此相邻时。例如
1 -> 1 -> 2 -> 1
会变成1 -> 2 -> 1
. 如果这是正确的,那么你不能只是分组和distinct
(我相信你已经注意到了)因为它会删除所有的副本。一个简单的解决方案是编写一个udf来删除那些重复项,同时保留用户的不同路径。自定义项:
要构建这个 jar,你需要
hive-core.jar
以及hadoop-core.jar
,您可以在maven存储库中找到这些。确保您获得了在您的环境中使用的hive和hadoop版本。另外,如果您计划在生产环境中运行这个,我建议您向udf添加一些异常处理。构建jar之后,导入它并运行以下查询:查询:
输出:
希望这有帮助。