从spout访问数据库-连接池

r6l8ljro  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(285)

有一个喷口,每个滴答上的喷口通向postgre数据库并读取另外一行。喷口代码如下所示:

class RawDataLevelSpout extends BaseRichSpout implements Serializable {

private int counter;

SpoutOutputCollector collector;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("col1", "col2"));
}

@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
    collector = spoutOutputCollector;
}

private Connection initializeDatabaseConnection() {

    try {
        Class.forName("org.postgresql.Driver");
        Connection connection = null;
        connection = DriverManager.getConnection(
                DATABASE_URI,"root", "root");
        return connection;
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return null;
}

@Override
public void close() {

}

@Override
public void nextTuple() {
    List<String> values = new ArrayList<>();

    PreparedStatement statement = null;
    try {
        Connection connection = initializeDatabaseConnection();
        statement = connection.prepareStatement("SELECT * FROM table1 ORDER BY col1 LIMIT 1 OFFSET ?");
        statement.setInt(1, counter++);
        ResultSet resultSet = statement.executeQuery();
        resultSet.next();
        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
        int totalColumns = resultSetMetaData.getColumnCount();
        for (int i = 1; i <= totalColumns; i++) {
            String value = resultSet.getString(i);
            values.add(value);
        }

        connection.close();
    } catch (SQLException e) {
        e.printStackTrace();
    }
    collector.emit(new Values(values.stream().toArray(String[]::new)));
}

}
在ApacheStorm中,在喷口中实现连接池的标准方法是什么?此外,是否可以在集群拓扑中的多个运行示例之间以某种方式同步coutner变量accross?

yhived7q

yhived7q1#

关于连接池,如果需要的话,可以通过静态变量来池连接,但是由于不能保证所有的spout示例都在同一个jvm中运行,因此我认为没有任何意义。
不,无法同步计数器。喷口示例可能在不同的jvm上运行,您不希望它们都阻塞,而喷口同意计数器值是多少。我认为你的喷口实现是没有意义的。如果您想一次只读取一行,为什么不运行一个spout示例而不尝试同步多个spout呢?
您似乎试图将关系数据库用作队列系统,这可能不适合。比如说Kafka。我想你应该能用这两种方法中的任何一种https://www.confluent.io/product/connectors/ 或者http://debezium.io/ 从你的博士后到Kafka的数据流。

相关问题