我的storm拓扑既不工作(不生成输出)也不失败(不生成错误或异常)

dzhpxtsq  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(280)

我有一个拓扑结构,我试图统计simulatorspout(不是真正的流)生成的单词出现次数,在写入mysql数据库表之后,表方案非常简单:

  1. Field | Type | ...
  2. ID | int(11) | Auto_icr
  3. word | varchar(50) |
  4. count | int(11) |

但是我遇到了一个奇怪的问题(如前所述),我成功地将拓扑提交给了我的storm集群,它由4个管理者组成,我可以在storm web ui中看到拓扑的流程(没有例外),但是当我检查mysql表时,出乎我的意料,表是空的。。。
欢迎任何意见、建议。。。
下面是喷口和螺栓:

  1. public class MySQLConnection {
  2. private static Connection conn = null;
  3. private static String dbUrl = "jdbc:mysql://192.168.0.2:3306/test?";
  4. private static String dbClass = "com.mysql.jdbc.Driver";
  5. public static Connection getConnection() throws SQLException, ClassNotFoundException {
  6. Class.forName(dbClass);
  7. conn = DriverManager.getConnection(dbUrl, "root", "qwe123");
  8. return conn;
  9. }
  10. }

======================================句子喷口===============================

  1. public class SentenceSpout extends BaseRichSpout{
  2. private static final long serialVersionUID = 1L;
  3. private boolean _completed = false;
  4. private SpoutOutputCollector _collector;
  5. private String [] sentences = {
  6. "Obama delivered a powerfull speech against USA",
  7. "I like cold beverages",
  8. "RT http://www.turkeyairline.com Turkish Airlines has delayed some flights",
  9. "don't have a cow man...",
  10. "i don't think i like fleas"
  11. };
  12. private int index = 0;
  13. public void open (Map config, TopologyContext context, SpoutOutputCollector collector) {
  14. _collector = collector;
  15. }
  16. public void nextTuple () {
  17. _collector.emit(new Values(sentences[index]));
  18. index++;
  19. if (index >= sentences.length) {
  20. index = 0;
  21. Utils.waitForSeconds(1);
  22. }
  23. }
  24. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  25. declarer.declare(new Fields("sentence"));
  26. }
  27. public void ack(Object msgId) {
  28. System.out.println("OK: " + msgId);
  29. }
  30. public void close() {}
  31. public void fail(Object msgId) {
  32. System.out.println("FAIL: " + msgId);
  33. }
  34. }

=====================================分离螺栓==============================

  1. public class SplitSentenceBolt extends BaseRichBolt {
  2. private static final long serialVersionUID = 1L;
  3. private OutputCollector _collector;
  4. public void prepare (Map config, TopologyContext context, OutputCollector collector) {
  5. _collector = collector;
  6. }
  7. public void execute (Tuple tuple) {
  8. String sentence = tuple.getStringByField("sentence");
  9. String httpRegex = "((https?|ftp|telnet|gopher|file)):((//)|(\\\\))+[\\w\\d:#@%/;$()~_?\\+-=\\\\\\.&]*";
  10. sentence = sentence.replaceAll(httpRegex, "").replaceAll("RT", "").replaceAll("[.|,]", "");
  11. String[] words = sentence.split(" ");
  12. for (String word : words) {
  13. if (!word.isEmpty())
  14. _collector.emit(new Values(word.trim()));
  15. }
  16. _collector.ack(tuple);
  17. }
  18. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  19. declarer.declare(new Fields("word"));
  20. }
  21. }

==================================wordcountbolt=================================

  1. public class WordCountBolt extends BaseRichBolt {
  2. private static final long serialVersionUID = 1L;
  3. private HashMap<String , Integer> counts = null;
  4. private OutputCollector _collector;
  5. private ResultSet resSet = null;
  6. private Statement stmt = null;
  7. private Connection _conn = null;
  8. private String path = "/home/hduser/logOfStormTops/logger.txt";
  9. String rLine = null;
  10. public void prepare (Map config, TopologyContext context, OutputCollector collector) {
  11. counts = new HashMap<String, Integer>();
  12. _collector = collector;
  13. }
  14. public void execute (Tuple tuple) {
  15. int insertResult = 0;
  16. int updateResult = 0;
  17. String word = tuple.getStringByField("word");
  18. //----------------------------------------------------
  19. if (!counts.containsKey(word)) {
  20. counts.put(word, 1);
  21. try {
  22. insertResult = wordInsertIfNoExist(word);
  23. if (insertResult == 1) {
  24. _collector.ack(tuple);
  25. } else {
  26. _collector.fail(tuple);
  27. }
  28. } catch (ClassNotFoundException e) {
  29. e.printStackTrace();
  30. } catch (SQLException e) {
  31. e.printStackTrace();
  32. }
  33. } else {
  34. //-----------------------------------------------
  35. counts.put(word, counts.get(word) + 1);
  36. try {
  37. // writing to db
  38. updateResult = updateCountOfExistingWord(word);
  39. if (updateResult == 1) {
  40. _collector.ack(tuple);
  41. } else {
  42. _collector.fail(tuple);
  43. }
  44. // Writing to file
  45. BufferedWriter buffer = new BufferedWriter(new FileWriter(path));
  46. buffer.write("[ " + word + " : " + counts.get("word") + " ]");
  47. buffer.newLine();
  48. buffer.flush();
  49. buffer.close();
  50. } catch (ClassNotFoundException e) {
  51. e.printStackTrace();
  52. } catch (SQLException e) {
  53. e.printStackTrace();
  54. } catch (IOException e) {
  55. e.printStackTrace();
  56. }
  57. System.out.println("{word-" + word + " : count-" + counts.get(word) + "}");
  58. }
  59. }
  60. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  61. }
  62. //*****************************************************
  63. public int wordInsertIfNoExist(String word) throws ClassNotFoundException, SQLException {
  64. String query = "SELECT word FROM wordcount WHERE word=\"" + word + "\"";
  65. String insert = "INSERT INTO wordcount (word, count) VALUES (\"" + word + "\", 1)";
  66. _conn = MySQLConnection.getConnection();
  67. stmt = _conn.createStatement();
  68. resSet = stmt.executeQuery(query);
  69. int res = 0;
  70. if (!resSet.next()) {
  71. res = stmt.executeUpdate(insert);
  72. } else {
  73. System.out.println("Yangi qiymatni kirityotrganda nimadir sodir bo'ldi");
  74. }
  75. resSet.close();
  76. stmt.close();
  77. _conn.close();
  78. return res;
  79. }
  80. public int updateCountOfExistingWord(String word) throws ClassNotFoundException, SQLException {
  81. String update = "UPDATE wordcount SET count=count+1 WHERE word=\"" + word + "\"";
  82. _conn = MySQLConnection.getConnection();
  83. stmt = _conn.createStatement();
  84. int result = stmt.executeUpdate(update);
  85. //System.out.println(word + "'s count has been updated (incremented)");
  86. resSet.close();
  87. stmt.close();
  88. _conn.close();
  89. return result;
  90. }
  91. }

=============================字计数拓扑==============================

  1. public class WordCountTopology {
  2. private static final String SENTENCE_SPOUT_ID = "sentence-spout";
  3. private static final String SPLIT_BOLT_ID = "split-bolt";
  4. private static final String COUNT_BOLT_ID = "count-bolt";
  5. private static final String TOPOLOGY_NAME = "NewWordCountTopology";
  6. @SuppressWarnings("static-access")
  7. public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
  8. SentenceSpout spout = new SentenceSpout();
  9. SplitSentenceBolt splitBolt = new SplitSentenceBolt();
  10. WordCountBolt countBolt = new WordCountBolt();
  11. TopologyBuilder builder = new TopologyBuilder();
  12. builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
  13. builder.setBolt(SPLIT_BOLT_ID, splitBolt, 4).shuffleGrouping(SENTENCE_SPOUT_ID);
  14. builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
  15. Config config = new Config();
  16. config.setMaxSpoutPending(100);
  17. config.setDebug(true);
  18. StormSubmitter submitter = new StormSubmitter();
  19. submitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
  20. }
  21. }
vom3gejh

vom3gejh1#

这是因为当抛出异常时,没有调用\u collector.ack(tuple)。当挂起的元组太多时,spout将停止发送新元组。尝试抛出runtimeexception而不是printstacktrace。

相关问题