我的用例是调用一个查询,用不同的输入参数从数据库中获取记录。获取记录后,进行一些处理,最后将其写入文件。我的输入参数值取决于上一个查询的完整处理。我的问题是,我如何才能在一个喷口中知道上一个查询的处理已经完成,即记录已经成功写入文件。
我试着实现 ITridentSpout
但仍然没有得到任何解决办法。下面是我的密码 ITridentSpout
:
tridentcoordinator.java文件
package com.TransactionlTopology;
import java.util.concurrent.ConcurrentHashMap;
import storm.trident.spout.ITridentSpout;
public class TridentCoordinator implements ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>>{
ConcurrentHashMap<Long,String> prevMetadata=new ConcurrentHashMap<Long, String>();
boolean result=true;
@Override
public void success(long txid) {
System.out.println("inside success mehod with txid as "+txid);
if(prevMetadata.containsKey(txid)){
prevMetadata.replace(txid, "SUCCESS");
}
}
@Override
public boolean isReady(long txid) {
if(!prevMetadata.isEmpty()){
result=true;
for(Long txId:prevMetadata.keySet()){
System.out.println("txId:---- "+txId +" value"+prevMetadata.get(txId) );
if(prevMetadata.get(txId).equalsIgnoreCase("SUCESS")){
prevMetadata.put(txid, "STARTED");
result= true;
}
}
}
else{
prevMetadata.put(txid, "STARTED");
result= true;
}
System.out.println("inside isReady function with txid as:---- "+txid+"result value:-- "+result);
return result;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public ConcurrentHashMap<Long,String> initializeTransaction(long txid, ConcurrentHashMap<Long,String> prevMetadata, ConcurrentHashMap<Long,String> currMetadata) {
System.out.println("inside initialize transaction method with values as:----- "+txid+" "+prevMetadata+" "+currMetadata);
return prevMetadata;
}
}
tridentemitterimpl.java文件
package com.TransactionlTopology;
import java.util.concurrent.ConcurrentHashMap;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Values;
public class TridentEmitterImpl implements ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> {
@Override
public void emitBatch(TransactionAttempt tx, ConcurrentHashMap<Long,String> coordinatorMeta,TridentCollector collector) {
System.out.println("inside emitbatch of emitter class with values as:--- "+coordinatorMeta);
System.out.println("tx.getAttemptId() "+tx.getAttemptId()+"tx.getTransactionId() "+tx.getTransactionId()+"tx.getId() "+tx.getId().toString());
collector.emit(new Values("preeti"));
}
@Override
public void success(TransactionAttempt tx) {
System.out.println("inside success of emitter with tx id as "+tx.getTransactionId());
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
三叉戟.java
package com.TransactionlTopology;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import storm.trident.spout.ITridentSpout;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
public class TridentSpoutImpl implements ITridentSpout<ConcurrentHashMap<Long,String>> {
@Override
public storm.trident.spout.ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>> getCoordinator(String txStateId, Map conf, TopologyContext context) {
return new TridentCoordinator();
}
@Override
public storm.trident.spout.ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> getEmitter(String txStateId, Map conf, TopologyContext context) {
return new TridentEmitterImpl();
}
@Override
public Map getComponentConfiguration() {
Map<String,String> newMap=new HashMap<String, String>();
newMap.put("words","preeti");
return newMap;
}
@Override
public Fields getOutputFields() {
return new Fields("word");
}
}
也不能理解什么价值观会进来 initializeTransaction
作为 prevMetaData
以及 curMetada
. 请提供一些解决方案
1条答案
按热度按时间ohtdti5x1#
你有各种各样的选择。不过,也许最简单的方法是在拓扑结构中设置最后一个螺栓,在写入文件后,通知spout通过spout可以监视的消息队列启动下一个查询是好的。当喷口接收到这个通知时,它就可以处理下一个查询。
然而,更一般地说,这似乎是一个值得怀疑的storm用例。拓扑的许多资源可能会在很多时候处于空闲状态,因为一次只有一个事务在其中运行。显然,我不知道问题的所有细节,但是事务之间的这种依赖性限制了使用storm所增加的复杂性的价值。