java在trident中实现事务拓扑的问题

epggiuax  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(416)

我的用例是调用一个查询,用不同的输入参数从数据库中获取记录。获取记录后,进行一些处理,最后将其写入文件。我的输入参数值取决于上一个查询的完整处理。我的问题是,我如何才能在一个喷口中知道上一个查询的处理已经完成,即记录已经成功写入文件。
我试着实现 ITridentSpout 但仍然没有得到任何解决办法。下面是我的密码 ITridentSpout :
tridentcoordinator.java文件

  1. package com.TransactionlTopology;
  2. import java.util.concurrent.ConcurrentHashMap;
  3. import storm.trident.spout.ITridentSpout;
  4. public class TridentCoordinator implements ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>>{
  5. ConcurrentHashMap<Long,String> prevMetadata=new ConcurrentHashMap<Long, String>();
  6. boolean result=true;
  7. @Override
  8. public void success(long txid) {
  9. System.out.println("inside success mehod with txid as "+txid);
  10. if(prevMetadata.containsKey(txid)){
  11. prevMetadata.replace(txid, "SUCCESS");
  12. }
  13. }
  14. @Override
  15. public boolean isReady(long txid) {
  16. if(!prevMetadata.isEmpty()){
  17. result=true;
  18. for(Long txId:prevMetadata.keySet()){
  19. System.out.println("txId:---- "+txId +" value"+prevMetadata.get(txId) );
  20. if(prevMetadata.get(txId).equalsIgnoreCase("SUCESS")){
  21. prevMetadata.put(txid, "STARTED");
  22. result= true;
  23. }
  24. }
  25. }
  26. else{
  27. prevMetadata.put(txid, "STARTED");
  28. result= true;
  29. }
  30. System.out.println("inside isReady function with txid as:---- "+txid+"result value:-- "+result);
  31. return result;
  32. }
  33. @Override
  34. public void close() {
  35. // TODO Auto-generated method stub
  36. }
  37. @Override
  38. public ConcurrentHashMap<Long,String> initializeTransaction(long txid, ConcurrentHashMap<Long,String> prevMetadata, ConcurrentHashMap<Long,String> currMetadata) {
  39. System.out.println("inside initialize transaction method with values as:----- "+txid+" "+prevMetadata+" "+currMetadata);
  40. return prevMetadata;
  41. }
  42. }

tridentemitterimpl.java文件

  1. package com.TransactionlTopology;
  2. import java.util.concurrent.ConcurrentHashMap;
  3. import storm.trident.operation.TridentCollector;
  4. import storm.trident.spout.ITridentSpout;
  5. import storm.trident.topology.TransactionAttempt;
  6. import backtype.storm.tuple.Values;
  7. public class TridentEmitterImpl implements ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> {
  8. @Override
  9. public void emitBatch(TransactionAttempt tx, ConcurrentHashMap<Long,String> coordinatorMeta,TridentCollector collector) {
  10. System.out.println("inside emitbatch of emitter class with values as:--- "+coordinatorMeta);
  11. System.out.println("tx.getAttemptId() "+tx.getAttemptId()+"tx.getTransactionId() "+tx.getTransactionId()+"tx.getId() "+tx.getId().toString());
  12. collector.emit(new Values("preeti"));
  13. }
  14. @Override
  15. public void success(TransactionAttempt tx) {
  16. System.out.println("inside success of emitter with tx id as "+tx.getTransactionId());
  17. }
  18. @Override
  19. public void close() {
  20. // TODO Auto-generated method stub
  21. }
  22. }

三叉戟.java

  1. package com.TransactionlTopology;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.concurrent.ConcurrentHashMap;
  5. import storm.trident.spout.ITridentSpout;
  6. import backtype.storm.task.TopologyContext;
  7. import backtype.storm.tuple.Fields;
  8. public class TridentSpoutImpl implements ITridentSpout<ConcurrentHashMap<Long,String>> {
  9. @Override
  10. public storm.trident.spout.ITridentSpout.BatchCoordinator<ConcurrentHashMap<Long,String>> getCoordinator(String txStateId, Map conf, TopologyContext context) {
  11. return new TridentCoordinator();
  12. }
  13. @Override
  14. public storm.trident.spout.ITridentSpout.Emitter<ConcurrentHashMap<Long,String>> getEmitter(String txStateId, Map conf, TopologyContext context) {
  15. return new TridentEmitterImpl();
  16. }
  17. @Override
  18. public Map getComponentConfiguration() {
  19. Map<String,String> newMap=new HashMap<String, String>();
  20. newMap.put("words","preeti");
  21. return newMap;
  22. }
  23. @Override
  24. public Fields getOutputFields() {
  25. return new Fields("word");
  26. }
  27. }

也不能理解什么价值观会进来 initializeTransaction 作为 prevMetaData 以及 curMetada . 请提供一些解决方案

ohtdti5x

ohtdti5x1#

你有各种各样的选择。不过,也许最简单的方法是在拓扑结构中设置最后一个螺栓,在写入文件后,通知spout通过spout可以监视的消息队列启动下一个查询是好的。当喷口接收到这个通知时,它就可以处理下一个查询。
然而,更一般地说,这似乎是一个值得怀疑的storm用例。拓扑的许多资源可能会在很多时候处于空闲状态,因为一次只有一个事务在其中运行。显然,我不知道问题的所有细节,但是事务之间的这种依赖性限制了使用storm所增加的复杂性的价值。

相关问题