How does Pig instantiate UDF objects?

tkclm6bt · posted 2021-05-29 in Hadoop

Can someone tell me how Pig instantiates UDF objects? I built a pipeline with Pig to process some data and deployed it on a multi-node Hadoop cluster, and I want to save all the intermediate results produced after each step of the pipeline. So I wrote a UDF in Java that opens an HTTP connection when it is initialized, streams data through it in exec, and closes the connection in the object's finalize.
My script can be simplified as follows:

    REGISTER MyPackage.jar;
    DEFINE InterStore test.InterStore('localhost', '58888');
    DEFINE Clean test.Clean();
    raw = LOAD 'mydata';
    cleaned = FILTER (FOREACH raw GENERATE FLATTEN(Clean(*))) BY NOT ($0 MATCHES '');
    cleaned = FOREACH cleaned GENERATE FLATTEN(InterStore(*));
    named = FOREACH cleaned GENERATE $1 AS LocationID, $2 AS AccessCount;
    named = FOREACH named GENERATE FLATTEN(InterStore(*)) AS (LocationID, AccessCount);
    grp = GROUP named BY LocationID;
    grp = FOREACH grp GENERATE FLATTEN(InterStore(*)) AS (group, named:{(LocationID, AccessCount)});
    sum = FOREACH grp GENERATE group AS LocationID, SUM(named.AccessCount) AS TotalAccesses;
    sum = FOREACH sum GENERATE FLATTEN(InterStore(*)) AS (LocationID, TotalAccesses);
    ordered = ORDER sum BY TotalAccesses DESC;
    STORE ordered INTO 'result';

The code for InterStore can be simplified as follows:

    import java.io.*;
    import java.net.*;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;

    class InterStore extends EvalFunc<Tuple> {
        HttpURLConnection con; // avoid redundant connection establishment in exec

        public InterStore(String ip, String port) throws IOException {
            URL url = new URL("http://" + ip + ':' + port);
            con = (HttpURLConnection) url.openConnection();
            con.setRequestMethod("PUT");
            con.setDoOutput(true);
            con.setDoInput(true);
        }

        // Called once per input tuple; writes the tuple out as a CSV line.
        public Tuple exec(Tuple input) throws IOException {
            con.getOutputStream().write((input.toDelimitedString(",") + '\n').getBytes());
            return input;
        }

        // Note: finalize is only invoked if/when the GC collects this object.
        @Override
        protected void finalize() throws Throwable {
            con.getOutputStream().close();
            int respcode = con.getResponseCode();
            BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
            System.out.printf("Resp Code:%d, %s\n", respcode, in.readLine());
            in.close();
        }
    }

However, I found that the HTTP connection does not transfer the data successfully the way it does in local mode. How can I deal with this?
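One behaviour of java.net.HttpURLConnection worth knowing here: by default it buffers the entire request body in memory and only transmits the PUT once the output stream is closed (or the response is requested). So writing to getOutputStream() in every exec call does not actually stream anything over the wire until finalize runs, if it runs at all. A minimal, self-contained sketch of that behaviour against a throwaway local server (the JDK's built-in com.sun.net.httpserver stands in for the receiving service; names and the payload are illustrative):

```java
import com.sun.net.httpserver.HttpServer;
import java.io.*;
import java.net.*;

public class PutDemo {
    // Starts a throwaway local HTTP server, sends one PUT, returns the status code.
    static int demo() throws IOException {
        HttpServer srv = HttpServer.create(new InetSocketAddress(0), 0);
        srv.createContext("/", ex -> {
            byte[] body = ex.getRequestBody().readAllBytes(); // echo handler
            ex.sendResponseHeaders(200, body.length);
            ex.getResponseBody().write(body);
            ex.close();
        });
        srv.start();
        try {
            URL url = new URL("http://127.0.0.1:" + srv.getAddress().getPort() + "/");
            HttpURLConnection con = (HttpURLConnection) url.openConnection();
            con.setRequestMethod("PUT");
            con.setDoOutput(true);
            try (OutputStream out = con.getOutputStream()) {
                out.write("a,b,c\n".getBytes()); // buffered in memory, not yet sent
            } // request is only committed once the stream is closed
            return con.getResponseCode();
        } finally {
            srv.stop(0);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Resp Code:" + demo());
    }
}
```

If the UDF really needs to stream tuples as they arrive, calling con.setChunkedStreamingMode(0) before connecting avoids the buffering, though the response still only becomes available after the output stream is closed.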

w9apscun


Is there a service listening on 'localhost', '58888'?
Note that localhost is different on every execution node; you may need to do something like:

    %default LHOST `localhost`

and use this variable as the parameter:

    DEFINE InterStore test.InterStore('$LHOST', '58888');

In general, I would add some printouts in the UDF to double-check the arguments passed to it, and then test the connectivity (e.g. ping the host and check that the port is reachable from the Hadoop nodes).
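That connectivity test can be scripted in plain Java; a minimal sketch (the host name 'datanode1' and port 58888 below are placeholders for wherever the receiving service actually runs):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // True if a TCP connection to host:port succeeds within timeoutMs.
    static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder host/port; run this from each Hadoop node.
        System.out.println(reachable("datanode1", 58888, 2000) ? "open" : "unreachable");
    }
}
```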
