java:当一个事件实际上丢失时,如何对收集事件msg的类设置时间限制

nsc4cvqm  于 2021-07-03  发布在  Java
关注(0)|答案(2)|浏览(400)

我想找到一种方法来防止“collector”类永远等待,因为它的内容是不完整的,而且它无法知道丢失的事件消息不存在,无法传递给它。
我有一个名为eventset的类,它具有从jms onmessage侦听器传递给它的相关事件msg(来自监视)。每个相关事件都有一个序列号,最后一个事件称为“txnend”,它的序列号最高。当事件msgs的总数等于txnend的序列号时,我知道所有事件都存在,并且我的代码指示回调类处理事件集。
当缺少一个事件时,eventset的示例会一直耐心地等待。
我解决这个问题的想法是,在创建类时,以某种方式对等待时间进行限制。当超过这个值时,我的代码应该指示回调处理不完整的事件集。
在过去的2/3个月里,我曾在密歇根州的两个场合,根据建议和想法,尝试这个和那个,但都没有成功。直到现在,这是可能的停车,因为它几乎从来没有发生过。现在,我确实需要解决它。
从本质上说,这似乎是一个相当普遍的情况,应该有一个直接的解决办法。我没找到一个。
如有任何建议或解决办法,我将不胜感激。

scyqe7ek

scyqe7ek1#

解决方案是使用计时器和timertask(作为内部类)。如果接收到所有相关事件,则构造函数将创建计时器并按所需的秒数调度timertask,然后调用回调并取消计时器。在少有的情况下,如果事件丢失,timertask会启动并向回调发送一组不完整的事件并记录错误。
仅供参考。
公共类事件集{

  1. private MonitorEventsConsumer eventsConsumer;
  2. // For timeout handling
  3. private Timer timer;
  4. private int waitTime; // seconds to wait
  5. protected boolean isComplete;
  6. private String localTxnId;
  7. String eventSourceAddress;
  8. private boolean txnEndReceived;
  9. private int maxCounter = 0;
  10. private List<String> monEvents = new ArrayList<>();
  11. public EventSet(String localTxnId) {
  12. this(localTxnId, 20); // Default the waiting time for all events to be received to 20 secs
  13. }
  14. public EventSet(String localTxnId, int waitTime) {
  15. this.localTxnId = localTxnId;
  16. timer = new Timer();
  17. this.waitTime = waitTime;
  18. timer.schedule(new TimeoutTask(), waitTime*1000);
  19. }
  20. public boolean isNew() {
  21. return monEvents.isEmpty();
  22. }
  23. public void addEvent(String eventAsXML) {
  24. monEvents.add(eventAsXML);
  25. String counter = StringUtils.substringBetween(eventAsXML, "wmb:counter=\"", "\"/>");
  26. eventSourceAddress = StringUtils.substringBetween(eventAsXML, " wmb:eventSourceAddress=\"", "\">");
  27. if (eventSourceAddress.endsWith("transaction.End")) {
  28. txnEndReceived = true;
  29. }
  30. int iCounter = Integer.parseInt(counter);
  31. if (iCounter > maxCounter) {
  32. maxCounter = iCounter;
  33. }
  34. if (txnEndReceived && maxCounter == monEvents.size()) {
  35. // all events should have been received unless maxCounter = 1
  36. // Call back to the onMessageHandler to process this event set.
  37. eventsConsumer.sendEventsToNjams(localTxnId, this);
  38. isComplete = true;
  39. timer.cancel();
  40. }
  41. }
  42. public List<String> getMonEvents() {
  43. return monEvents;
  44. }
  45. public void clear() {
  46. monEvents.clear();
  47. localTxnId = null;
  48. isComplete = false;
  49. maxCounter = 0;
  50. }
  51. public void registerHandler(final MonitorEventsConsumer eventsConsumer) {
  52. if (this.eventsConsumer == null) {
  53. this.eventsConsumer = eventsConsumer;
  54. }
  55. }
  56. class TimeoutTask extends TimerTask {
  57. public void run() {
  58. eventsConsumer.sendEventsToNjams(localTxnId, EventSet.this);
  59. isComplete = true;
  60. System.err.println(String.format("Message events for local txn id '%s' were incomplete after waiting for %d seconds. they were sent to nJAMS in this state", localTxnId, waitTime));
  61. timer.cancel(); //Terminate the timer thread
  62. }
  63. }

}

展开查看全部
eeq64g8w

eeq64g8w2#

我不知道你的库是否支持这种方法,但是一个普通的java尝试会像这样:

  1. public class MessageProcessor extends Thread {
  2. private static final long PROCESSOR_TIMEOUT = 3600 * 1000 /* 1 hour? */;
  3. private static final long ONE_MESSAGE_TIMEOUT = 1000 /* 1 sec? */;
  4. private long m_startTime = 0;
  5. public void startProcessing() {
  6. m_startTime = System.currentTimeMillis();
  7. start();
  8. }
  9. @Override
  10. public void run() {
  11. while(true) {
  12. // ---------------------------------
  13. // check abort condition
  14. // ---------------------------------
  15. if(System.currentTimeMillis() - m_startTime > PROCESSOR_TIMEOUT) {
  16. break;
  17. }
  18. // ---------------------------------
  19. // handle 1 message (with timeout)
  20. // ---------------------------------
  21. try {
  22. // pseudo code
  23. collectOneMessage(ONE_MESSAGE_TIMEOUT);
  24. }
  25. catch(InterruptedException e) {
  26. // ignoring timeout exception
  27. }
  28. }
  29. // ---------------------------------
  30. // finish up
  31. // ---------------------------------
  32. // ...
  33. }
  34. }
展开查看全部

相关问题