有一个喷口,每个滴答上的喷口通向postgre数据库并读取另外一行。喷口代码如下所示:
class RawDataLevelSpout extends BaseRichSpout implements Serializable {
private int counter;
SpoutOutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("col1", "col2"));
}
@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
collector = spoutOutputCollector;
}
private Connection initializeDatabaseConnection() {
try {
Class.forName("org.postgresql.Driver");
Connection connection = null;
connection = DriverManager.getConnection(
DATABASE_URI,"root", "root");
return connection;
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
@Override
public void close() {
}
@Override
public void nextTuple() {
List<String> values = new ArrayList<>();
PreparedStatement statement = null;
try {
Connection connection = initializeDatabaseConnection();
statement = connection.prepareStatement("SELECT * FROM table1 ORDER BY col1 LIMIT 1 OFFSET ?");
statement.setInt(1, counter++);
ResultSet resultSet = statement.executeQuery();
resultSet.next();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int totalColumns = resultSetMetaData.getColumnCount();
for (int i = 1; i <= totalColumns; i++) {
String value = resultSet.getString(i);
values.add(value);
}
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
collector.emit(new Values(values.stream().toArray(String[]::new)));
}
}
在ApacheStorm中,在喷口中实现连接池的标准方法是什么?此外,是否可以在集群拓扑中的多个运行示例之间以某种方式同步coutner变量accross?
1条答案
按热度按时间yhived7q1#
关于连接池,如果需要的话,可以通过静态变量来池连接,但是由于不能保证所有的spout示例都在同一个jvm中运行,因此我认为没有任何意义。
不,无法同步计数器。喷口示例可能在不同的jvm上运行,您不希望它们都阻塞,而喷口同意计数器值是多少。我认为你的喷口实现是没有意义的。如果您想一次只读取一行,为什么不运行一个spout示例而不尝试同步多个spout呢?
您似乎试图将关系数据库用作队列系统,这可能不适合。比如说Kafka。我想你应该能用这两种方法中的任何一种https://www.confluent.io/product/connectors/ 或者http://debezium.io/ 从你的博士后到Kafka的数据流。