我有一个storm拓扑,可以将kafka队列中的一些数据写入cassandra数据库。这个程序是多线程的。为了方便插入cassandra db,我将此作为我的dbutils:
public DBUtils() {
if(session == null) {
session = CassandraUtil.getInstance().getSession();
LOG.info("Started a new session for dbUtils-Monitoring.....");
}
synchronized(session) {
testMapper = new MappingManager(session).mapper(TestVO.class);
}
}
因此,我使用synchronized在所有运行的线程中创建了一个dbutils示例。但是当我查看日志时,似乎会话正在初始化多次。storm拓扑中的dbutils仅在prepare方法中初始化,并且已在prepare/execute/clean-up方法中使用。因此,如果要在所有线程中使用的变量在多个位置使用,则不确定如何使用synchronized block。我的问题是如何在所有线程中只初始化session/dbutils变量一次。
1条答案
按热度按时间c7rzv4ha1#
由于storm是一个分布式系统,您不能在所有并行运行的螺栓上使用一个共享变量。您只能在单个工作jvm中的执行器上共享一个变量。
为此,您需要创建一个
static
变量并使用共享/静态对象示例来同步其初始化。