我正在运行多个单独的Kafka流。为此,我创建了一个保存这些流的流管理器。下面是sm类的本质。
public class StreamManager {
public Map<String, BaseStream> streamMap = new HashMap<String, BaseStream>();
public Map<String, ReadOnlyKeyValueStore<Long, BaseModel>> storeMap = new HashMap<String, ReadOnlyKeyValueStore<Long, BaseModel>>();
public StreamManager(String bootstrapServer){
initialize(bootstrapServer);
}
/**
* initialize streams. Right now hard coding creation of streams here.
* @param bootstrapServer
*/
public void initialize(String bootstrapServer){
Properties s1Props = this.GetStreamingProperties(bootstrapServer, "STREAM_1");
Properties s2Props = this.GetStreamingProperties(bootstrapServer, "STREAM_1");
BaseStream s1Stream = new CompositeInfoStream(new KStreamBuilder(), compositeInfoProps);
BaseStream s2Stream = new ImcInfoStream(new KStreamBuilder(), imcInfoProps);
streamMap.put(s1Stream.storeName, s1Stream);
streamMap.put(s2Stream.storeName, s2Stream);
/**
* Start all streams
*/
public void startStreams(){
for(BaseStream stream: streamMap.values()){
stream.start();
}
}
public static void main(String[] args) throws Exception {
StreamManager mgr = new StreamManager(StreamingConfig.instance().MESSAGE_SERVER);
StreamManager.startStreamServices(mgr);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
} finally {
mgr.closeStreams();
}
}
});
int i = 0;
while(true) {
if (i++ == 0)
mgr.logAllStreamStates();
Thread.sleep(60000);
if (i == 60) i = 0;
}
}
}
我初始化并启动流,然后让进程在循环中运行。现在我想要的是对单个流有更多的控制。如果需要的话,启动并杀死它们(出于一些奇怪的原因,我的流经常处于再平衡模式,不会回来。目前,如果其中一个流进入重新平衡,我必须杀死整个sm(所有流)并重新启动。我想做的只是重新启动单个流。
我想了解一下我的架构应该是怎样的。kafka流是否提供了一种机制来管理流集群?我可以使用多处理来完成这个任务吗?如果可以的话,请您给我一些参考资料,让我记住我们使用windows进行开发,使用linux进行部署
暂无答案!
目前还没有任何答案,快来回答吧!