Pig UDF 运行时出现 Java 堆空间(java heap space)错误

ukxgm1gy  于 2021-06-21  发布在  Pig
关注(0)|答案(1)|浏览(411)

我在运行一个 Pig 脚本,该脚本调用了一个用 Java 编写的用户自定义函数(UDF)。运行时出现 Java 堆空间(java heap space)错误,作业失败。我试过用 -Xms1024m 选项运行该作业,较小的文件可以运行,较大的文件仍然失败。我的集群足够强大,不应该被这么小的文件拖垮,所以我想知道如何修复这个内存泄漏。有人能帮忙吗?

  import java.io.IOException;
  import java.lang.annotation.Annotation;
  import java.lang.reflect.Array;
  import java.lang.reflect.InvocationTargetException;
  import java.lang.reflect.Method;
  import java.text.*;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Locale;
  import java.util.Map;
  import java.util.Set;

  import org.apache.pig.EvalFunc;
  import org.apache.pig.data.*;

  import com.tictactec.ta.lib.CoreAnnotated;
  import com.tictactec.ta.lib.MAType;
  import com.tictactec.ta.lib.MInteger;
  import com.tictactec.ta.lib.RetCode;
  import com.tictactec.ta.lib.meta.annotation.InputParameterInfo;
  import com.tictactec.ta.lib.meta.annotation.InputParameterType;
  import com.tictactec.ta.lib.meta.annotation.OptInputParameterInfo;
  import com.tictactec.ta.lib.meta.annotation.OptInputParameterType;
  import com.tictactec.ta.lib.meta.annotation.OutputParameterInfo;
  import com.tictactec.ta.lib.meta.annotation.OutputParameterType;
  24. public class taLib extends EvalFunc<DataBag>
  25. {
  26. private static final int MIN_ARGS = 3;
  27. public static CoreAnnotated core = new CoreAnnotated();
  28. private static Method func_ref = null;
  29. public DecimalFormat df = new DecimalFormat("#.###");
  30. public DataBag exec(Tuple args) throws IOException
  31. {
  32. DataBag input=null;
  33. MInteger outStart = new MInteger();
  34. MInteger outLen = new MInteger();
  35. Map<String,Object>outputParams=new HashMap<String, Object>();
  36. String func_name;
  37. List<Integer> ip_colmns= new ArrayList<Integer>();
  38. List<double[]>ip_list=new ArrayList<double[]>();
  39. List<String>opt_type=new ArrayList<String>();
  40. List<Object>opt_params=new ArrayList<Object>();
  41. //////
  42. long m1=Runtime.getRuntime().freeMemory();
  43. System.out.println(m1);
  44. long m2=Runtime.getRuntime().totalMemory();
  45. System.out.println(m2);
  46. //////
  47. int ip_noofparams=0;
  48. int op_noofparams=0;
  49. int opt_noofparams=0;
  50. if (args == null || args.size() < MIN_ARGS)
  51. throw new IllegalArgumentException("talib: must have at least " +
  52. MIN_ARGS + " args");
  53. if(args.get(0) instanceof DataBag)
  54. {input = (DataBag)args.get(0);}
  55. else{throw new IllegalArgumentException("Only a valid bag name can be
  56. passed");}
  57. // get no of fields in bag
  58. Tuple t0=input.iterator().next();
  59. int fields_in_bag=t0.getAll().size();
  60. if(args.get(1) instanceof String)
  61. {func_name = (String)args.get(1);}
  62. else{throw new IllegalArgumentException("Only valid function name can be
  63. passed at arg 1");}
  64. func_ref=methodChk(func_name);
  65. if (func_ref == null) {
  66. throw new IllegalArgumentException("talib: function "
  67. + func_name + " was not found");
  68. }
  69. for (Annotation[] annotations : func_ref.getParameterAnnotations())
  70. {
  71. for (Annotation annotation : annotations)
  72. {
  73. if(annotation instanceof InputParameterInfo)
  74. {
  75. InputParameterInfo inputParameterInfo =
  76. (InputParameterInfo)annotation;
  77. if(inputParameterInfo.type().equals(InputParameterType.TA_Input_Price))
  78. {
  79. ip_noofparams=numberOfSetBits(inputParameterInfo.flags());
  80. }
  81. else
  82. {
  83. ip_noofparams++;
  84. }
  85. }
  86. if(annotation instanceof OptInputParameterInfo)
  87. {
  88. OptInputParameterInfo optinputParameterInfo=
  89. (OptInputParameterInfo)annotation;
  90. opt_noofparams++;
  91. if
  92. (optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerRange))
  93. {
  94. opt_type.add("Integer");
  95. }
  96. else
  97. if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_RealRange))
  98. {
  99. opt_type.add("Double");
  100. }
  101. else
  102. if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerList))
  103. {
  104. opt_type.add("String");
  105. }
  106. else{throw new IllegalArgumentException("whoopsie ...serious
  107. mess in opt_annotations");}
  108. }
  109. if (annotation instanceof OutputParameterInfo)
  110. {
  111. OutputParameterInfo outputParameterInfo =
  112. (OutputParameterInfo) annotation;
  113. op_noofparams++;
  114. if
  115. (outputParameterInfo.type().equals(OutputParameterType.TA_Output_Real))
  116. {
  117. outputParams.put(outputParameterInfo.paramName(), new
  118. double[(int) input.size()]);
  119. }
  120. else if
  121. (outputParameterInfo.type().equals(OutputParameterType.TA_Output_Integer))
  122. {
  123. outputParams.put(outputParameterInfo.paramName(), new
  124. int[(int)input.size()]);
  125. }
  126. }
  127. }
  128. }
  129. int total_params =ip_noofparams+opt_noofparams;
  130. if((args.size()-2)!=total_params){throw new IllegalArgumentException("Wrong
  131. no of argumets passed to UDF");}
  132. // get the ip colmns no's
  133. for(int i=2;i<(2+ip_noofparams);i++)
  134. {
  135. if(args.get(i) instanceof Integer )
  136. {
  137. if((Integer)args.get(i)>=0 && (Integer)args.get(i)<fields_in_bag)
  138. {
  139. ip_colmns.add((Integer) args.get(i));
  140. }
  141. else{throw new IllegalArgumentException("The input colmn specified
  142. is invalid..please enter a valid colmn no:0-"+(fields_in_bag-1));}
  143. }
  144. else{throw new IllegalArgumentException("Wrong arguments entered:
  145. Only"+ip_noofparams+"field no's of type(integer) allowed for fn"+func_name ); }
  146. }
  147. // create a list of ip arrays
  148. for(int i=0;i<ip_colmns.size();i++)
  149. {
  150. ip_list.add((double[]) Array.newInstance(double.class, (int)input.size()));
  151. }
  152. int z=0;
  153. int x=0;
  154. // fill up the arrays
  155. for(Tuple t1: input)
  156. {
  157. Iterator<double[]> itr=ip_list.iterator();
  158. z=0;
  159. while(itr.hasNext())
  160. {
  161. if((Double)t1.get(ip_colmns.get(z)) instanceof Double)
  162. {
  163. ((double[])itr.next())[x]=(Double) t1.get(ip_colmns.get(z++));
  164. }
  165. else{throw new IllegalArgumentException("Illegal argument while
  166. filling up array...only double typr allowed");}
  167. }
  168. x++;
  169. }
  170. //deal with opt params
  171. int s=0;
  172. for(int i=(2+ip_noofparams);i<(2+ip_noofparams+opt_noofparams);i++)
  173. {
  174. if(opt_type.get(s).equalsIgnoreCase(args.get(i).getClass().getSimpleName().toString()))
  175. {
  176. if(opt_type.get(s).equalsIgnoreCase("String"))
  177. {
  178. String m=args.get(i).toString().toLowerCase();
  179. String ma=m.substring(0, 1).toUpperCase();
  180. String mac=m.substring(1);
  181. String macd=ma+mac;
  182. MAType type =MAType.valueOf(macd);
  183. opt_params.add(type);
  184. s++;
  185. }
  186. else{
  187. opt_params.add(args.get(i));
  188. s++;
  189. }
  190. }
  191. else if(opt_type.get(s).equalsIgnoreCase("Double"))
  192. {
  193. if(args.get(i).getClass().getSimpleName().toString().equalsIgnoreCase("Integer"))
  194. {
  195. opt_params.add((Double)((Integer)args.get(i)+0.0));
  196. s++;
  197. }
  198. else{throw new IllegalArgumentException("Opt arguments do
  199. not match for fn:"+func_name+", pls enter opt arguments in right order"); }
  200. }
  201. else{throw new IllegalArgumentException("Opt arguments do not match
  202. for fn:"+func_name+", pls enter opt arguments in right order");}
  203. }
  204. List<Object> ta_argl = new ArrayList<Object>();
  205. ta_argl.add(new Integer(0));
  206. ta_argl.add(new Integer((int)input.size() - 1));
  207. for(double[]in: ip_list)
  208. {
  209. ta_argl.add(in);
  210. }
  211. if(opt_noofparams!=0)
  212. {ta_argl.addAll(opt_params);}
  213. ta_argl.add(outStart);
  214. ta_argl.add(outLen);
  215. for (Map.Entry<String, Object> entry : outputParams.entrySet())
  216. {
  217. ta_argl.add(entry.getValue());
  218. }
  219. RetCode rc = RetCode.Success;
  220. try {
  221. rc = (RetCode)func_ref.invoke(core, ta_argl.toArray());
  222. } catch (Exception e)
  223. {
  224. assert false : "I died in ta-lib, but Java made me a zombie...";
  225. }
  226. assert rc == RetCode.Success : "ret code from " + func_name;
  227. if (outLen.value == 0) return null;
  228. //////
  229. DataBag ret=null;
  230. ret =outTA(input,outputParams,outStart);
  231. outputParams.clear();
  232. ip_list.clear();
  233. opt_params.clear();
  234. opt_type.clear();
  235. ip_colmns.clear();
  236. Runtime.getRuntime().gc();
  237. return ret;
  238. }
  239. public DataBag outTA(DataBag bag,Map<String, Object> outputParams,MInteger outStart)
  240. {
  241. DataBag nbag=null;
  242. TupleFactory mTupleFactory=TupleFactory.getInstance();
  243. BagFactory mBagFactory=BagFactory.getInstance();
  244. nbag=mBagFactory.newDefaultBag();
  245. Tuple tw=bag.iterator().next();
  246. int fieldsintup=tw.getAll().size();
  247. for(Tuple t0: bag)
  248. {
  249. Tuple t1=mTupleFactory.newTuple();
  250. for(int z=0;z<fieldsintup;z++)
  251. {
  252. try {
  253. t1.append(t0.get(z));
  254. } catch (Exception e) {
  255. // TODO Auto-generated catch block
  256. System.out.println("Ouch");
  257. }
  258. }
  259. nbag.add(t1);
  260. }
  261. int i = 0;
  262. int j=0;
  263. for (Tuple t2: nbag)
  264. {
  265. if(i>=outStart.value)
  266. {
  267. for(Map.Entry<String,Object>entry: outputParams.entrySet())
  268. {
  269. t2.append(entry.getKey().substring(3).toString());
  270. if(entry.getValue() instanceof double[])
  271. {
  272. t2.append( new Double
  273. (df.format(((double[])entry.getValue())[j])));
  274. }
  275. else if(entry.getValue() instanceof int[])
  276. {
  277. t2.append( ((int[])entry.getValue())[j]);
  278. }
  279. else{throw new
  280. IllegalArgumentException(entry.getValue().getClass()+"not supported");}
  281. }
  282. i++;j++;
  283. }
  284. else
  285. {t2.append(0.0);
  286. i++;
  287. }
  288. }
  289. return nbag;
  290. }
  291. public Method methodChk(String fn)
  292. {
  293. String fn_name=fn;
  294. Method tmp_fn=null;
  295. for (Method meth: core.getClass().getDeclaredMethods())
  296. {
  297. if (meth.getName().equalsIgnoreCase(fn_name))
  298. {
  299. tmp_fn = meth;
  300. break;
  301. }
  302. }
  303. return tmp_fn;
  304. }
  305. public int numberOfSetBits(int i) {
  306. i = i - ((i >> 1) & 0x55555555);
  307. i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
  308. return ((i + (i >> 4) & 0xF0F0F0F) * 0x1010101) >> 24;
  309. }
  310. }
xqkwcwgp

xqkwcwgp1#

可能是 bzip2 编解码器的问题。它的 API 文档特别指出这个编解码器非常耗内存:
http://hadoop.apache.org/common/docs/r0.20.0/api/org/apache/hadoop/io/compress/bzip2/cbzip2outputstream.html
原文为 "Compression requires large amounts of memory"(压缩需要大量内存)。
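你使用 -Xms2048m 时,是为 Pig 的 Grunt shell 设置了该选项,还是为 Map/Reduce 作业设置的?注意,-Xms 只设置初始堆大小,最大堆大小由 -Xmx 决定。Map/Reduce 任务 JVM 的堆大小需要通过 mapred.child.java.opts 设置,例如: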

  1. set mapred.child.java.opts=-Xmx2048m

你可以在 JobTracker 中找到失败的作业,打开它的 job.xml,查看 mapred.child.java.opts 的实际取值,以确认设置是否生效。

相关问题