java在trident中实现事务拓扑的问题

epggiuax  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(350)

我的用例是调用一个查询,用不同的输入参数从数据库中获取记录。获取记录后,进行一些处理,最后将其写入文件。我的输入参数值取决于上一个查询的完整处理。我的问题是,我如何才能在一个喷口中知道上一个查询的处理已经完成,即记录已经成功写入文件。
我试着实现 ITridentSpout 但仍然没有得到任何解决办法。下面是我的密码 ITridentSpout :
tridentcoordinator.java文件

package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;

public class TridentCoordinator implements ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>>{

    ConcurrentHashMap<Long,String> prevMetadata=new ConcurrentHashMap<Long, String>();
    boolean result=true;

    @Override
    public void success(long txid) {
        System.out.println("inside success mehod with txid as  "+txid);
        if(prevMetadata.containsKey(txid)){
            prevMetadata.replace(txid, "SUCCESS");
        }
    }

    @Override
    public boolean isReady(long txid) {
        if(!prevMetadata.isEmpty()){
            result=true;
        for(Long txId:prevMetadata.keySet()){
            System.out.println("txId:---- "+txId +"    value"+prevMetadata.get(txId) );
            if(prevMetadata.get(txId).equalsIgnoreCase("SUCESS")){
                prevMetadata.put(txid, "STARTED");
                result= true;
            }
        }
        }
        else{
            prevMetadata.put(txid, "STARTED");
            result= true;
        }

        System.out.println("inside isReady function with txid as:---- "+txid+"result value:--  "+result);

        return result;
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

    @Override
    public ConcurrentHashMap<Long,String> initializeTransaction(long txid, ConcurrentHashMap<Long,String> prevMetadata, ConcurrentHashMap<Long,String> currMetadata) {
        System.out.println("inside initialize transaction method with values as:----- "+txid+"   "+prevMetadata+"   "+currMetadata);

        return prevMetadata;
    }
}

tridentemitterimpl.java文件

package com.TransactionlTopology;

import java.util.concurrent.ConcurrentHashMap;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Values;

public class TridentEmitterImpl implements ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> {

    @Override
    public void emitBatch(TransactionAttempt tx, ConcurrentHashMap<Long,String> coordinatorMeta,TridentCollector collector) {
        System.out.println("inside emitbatch of emitter class with values as:--- "+coordinatorMeta);
        System.out.println("tx.getAttemptId()   "+tx.getAttemptId()+"tx.getTransactionId()  "+tx.getTransactionId()+"tx.getId()  "+tx.getId().toString());
        collector.emit(new Values("preeti"));
    }

    @Override
    public void success(TransactionAttempt tx) {
        System.out.println("inside success of emitter with tx id as   "+tx.getTransactionId());

    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }
}

三叉戟.java

package com.TransactionlTopology;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import storm.trident.spout.ITridentSpout;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;

public class TridentSpoutImpl implements ITridentSpout<ConcurrentHashMap<Long,String>> {

    @Override
    public storm.trident.spout.ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>> getCoordinator(String txStateId, Map conf, TopologyContext context) {

        return new TridentCoordinator();
    }

    @Override
    public storm.trident.spout.ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> getEmitter(String txStateId, Map conf, TopologyContext context) {

        return new TridentEmitterImpl();
    }

    @Override
    public Map getComponentConfiguration() {

        Map<String,String> newMap=new HashMap<String, String>();
        newMap.put("words","preeti");
        return newMap;
    }

    @Override
    public Fields getOutputFields() {

        return new Fields("word");
    }

}

也不能理解什么价值观会进来 initializeTransaction 作为 prevMetaData 以及 curMetada . 请提供一些解决方案

ohtdti5x

ohtdti5x1#

你有各种各样的选择。不过,也许最简单的方法是在拓扑结构中设置最后一个螺栓,在写入文件后,通知spout通过spout可以监视的消息队列启动下一个查询是好的。当喷口接收到这个通知时,它就可以处理下一个查询。
然而,更一般地说,这似乎是一个值得怀疑的storm用例。拓扑的许多资源可能会在很多时候处于空闲状态,因为一次只有一个事务在其中运行。显然,我不知道问题的所有细节,但是事务之间的这种依赖性限制了使用storm所增加的复杂性的价值。

相关问题