使用 Apache Storm 时的 Java 同步问题

zu0ti5jz  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(391)

我正在尝试用 Apache Storm 处理 geohash 编码流,使用的是这个库和 Apache Storm 0.9.3。Python 版 geohash 的详细信息可以在这里找到(原文中的链接在转载时已丢失)。
目前,我在一个 bolt 类的 execute 方法中遇到了同步问题。只用一个 bolt 时输出是正确的,但当我把 bolt 的线程数从一个增加到两个或更多时,输出就出错了。
其中一个 bolt 的代码如下(只有它有问题):

  1. public static int PRECISION=6;
  2. private OutputCollector collector;
  3. BufferedReader br;
  4. String lastGeoHash="NONE";
  5. HashMap<String,Integer> map;
  6. HashMap<String,String[]> zcd;
  7. TreeMap<Integer,String> counts=new TreeMap<Integer,String>();
  8. public void prepare( Map conf, TopologyContext context, OutputCollector collector )
  9. {
  10. String line="";
  11. this.collector = collector;
  12. map=new HashMap<String,Integer>();
  13. zcd=new HashMap<String,String[]>();
  14. try {
  15. br = new BufferedReader(new FileReader("/tmp/zip_code_database.csv"));
  16. int i=0;
  17. while ((line = br.readLine()) != null) {
  18. if(i==0){
  19. String columns[]=line.split(",");
  20. for(int j=0;j<columns.length;j++){
  21. map.put(columns[j],j);
  22. }
  23. }else{
  24. String []split=line.split(",");
  25. zcd.put(split[map.get("\"zip\"")],new String[]{split[map.get("\"state\"")],split[map.get("\"primary_city\"")]});
  26. }
  27. i++;
  28. }
  29. br.close();
  30. // System.out.println(zcd);
  31. } catch (FileNotFoundException e) {
  32. e.printStackTrace();
  33. } catch (IOException e) {
  34. e.printStackTrace();
  35. }
  36. System.out.println("Initialize");
  37. initializeTreeMapAsPerOurRequirement(counts);
  38. }
  39. public void execute( Tuple tuple )
  40. {
  41. String completeFile = tuple.getStringByField("string");//So, this data is generated by Spout and it contains the complete shape file where each line is separated by a new line character i.e. "\n"
  42. String lines[]=completeFile.split("\t");
  43. String geohash=lines[0];
  44. int count=Integer.parseInt(lines[1]);
  45. String zip=lines[2];
  46. String best="";
  47. String city="";
  48. String state="";
  49. if(!(geohash.equals(lastGeoHash)) && !(lastGeoHash.equals("NONE"))){
  50. //if(counts.size()!=0){
  51. //System.out.println(counts.firstKey());
  52. best=counts.get(counts.lastKey());
  53. //System.out.println(geohash);
  54. if(zcd.containsKey("\""+best+"\"")){
  55. city = zcd.get("\""+best+"\"")[0];
  56. state = zcd.get("\""+best+"\"")[1];
  57. System.out.println(lastGeoHash+","+best+","+state+","+city+","+"US");
  58. }else if(!best.equals("NONE")){
  59. System.out.println(lastGeoHash);
  60. city="MISSING";
  61. state="MISSING";
  62. }
  63. // initializeTreeMapAsPerOurRequirement(counts);
  64. //}else{
  65. //System.out.println("else"+geohash);
  66. //}
  67. //}
  68. }
  69. lastGeoHash=geohash;
  70. counts.put(count, zip);
  71. collector.ack( tuple );
  72. }
  73. private void initializeTreeMapAsPerOurRequirement(TreeMap<Integer,String> counts){
  74. counts.clear();
  75. counts.put(-1,"NONE");
  76. }
  77. public void declareOutputFields( OutputFieldsDeclarer declarer )
  78. {
  79. System.out.println("here");
  80. declarer.declare( new Fields( "number" ) );
  81. }

拓扑代码为:

  1. public static void main(String[] args)
  2. {
  3. TopologyBuilder builder = new TopologyBuilder();
  4. builder.setSpout( "spout", new SendWholeFileDataSpout(),2);
  5. builder.setBolt( "map", new GeoHashBolt(),2).shuffleGrouping("spout");
  6. builder.setBolt("reduce",new GeoHashReduceBolt(),2).fieldsGrouping("map", new Fields("value"));
  7. Config conf = new Config();
  8. LocalCluster cluster = new LocalCluster();
  9. cluster.submitTopology("test", conf, builder.createTopology());
  10. Utils.sleep(10000);
  11. cluster.killTopology("test");
  12. cluster.shutdown();
  13. }

有人能帮我看看这段代码,给我一些指导吗?

0g0grzrc

0g0grzrc1#

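你把 spout 和两个 bolt 的 parallelism_hint 都设成了 2,这意味着每个组件都会运行两个 executor,这可能会打乱你的输出:该 bolt 通过 lastGeoHash 的变化来判断一个 geohash 分组是否结束,因此要求同一 geohash 的 tuple 连续到达同一个 task,而多个 executor 并行时 tuple 的到达顺序会交错。
把 parallelism_hint 设为 1,就能得到你想要的输出。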

相关问题