我使用spring+mq+websphereapplicationserver。
我希望异步地使用来自mq的消息,并将这些消息组合起来,以便在每次提交时将n个实体持久化到数据库(而不会给我的目标oracle数据库带来太多的压力)
我使用defaultmessagelistenercontainer,使onmessage方法同步以添加消息(直到批大小),并创建线程以等待条件满足(时间/大小),并将消息推送到另一个执行业务逻辑和db持久化的线程。
线程开始的条件:
一旦onmessage方法中的第一条消息到达,线程就必须等待1000毫秒内收到25条消息,如果1000毫秒内没有收到25条消息,它会将可用数量的消息推送到另一个线程。
问题:
我看到线程只在服务器启动期间启动,而不是在第一次调用onmessage方法时启动。
有什么建议/其他方法可以实现从队列中收集消息吗?
应用程序上下文.xml
<bean id="myMessageListener" class="org.mypackage.MyMessageListener">
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" ref="queue"/>
<property name="messageListener" ref="myMessageListener"/>
<property name ="concurrentConsumers" value ="10"/>
<property name ="maxConcurrentConsumers" value ="50"/>
</bean>
侦听器:
package org.mypackage.MyMessageListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.mypackage.service.MyService;
public class MyMessageListener implements MessageListener {
private volatile long startTime = 0;
private volatile int messageCount;
private volatile List<String> messagesFromQueue = null;
private int batchSize = 25;
private long maximumBatchWaitTime = 1000;
@Autowired
private MyService myService;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
boolean threadRun = true;
while (threadRun) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println("InterruptedException is caught inside run method");
}
if ((messageCount >0 && messageCount == batchSize)) {
System.out.println("----Batch size Reached----");
threadRun = false;
processMsgsFromQueue(messagesFromQueue);
} else {
if (maximumBatchWaitTime > (System.currentTimeMillis() - startTime)) {
System.out.println("----Time limit is not reached----");
threadRun = true;
} else {
threadRun = false;
System.out.println("----Time limit is reached----");
processMsgsFromQueue(messagesFromQueue);
}
}
}
}
});
{
thread.start();
}
@Override
public synchronized void onMessage(Message message) {
if (messageCount == 0) {
startTime = System.currentTimeMillis();
messagesFromQueue = new ArrayList<String>();
System.out.println("----First Message Arrived at----"+startTime);
}
try {
messageCount++;
TextMessage tm = (TextMessage) message;
String msg = tm.getText();
messagesFromQueue.add(msg);
if (messageCount == 0) {
thread.start();
}
} catch (JMSException e1) {
e1.printStackTrace();
}
}
private void processMsgsFromQueue(List<String> messageFromQueue) {
System.out.println("Inside processMsgsFromQueue");
messageCount = 0;
messagesFromQueue = null;
if(!messageFromQueue.isEmpty()) {
this.myService.insertMsgsFromQueueToDB(messageFromQueue);
}
}
}
1条答案
按热度按时间ss2ws0br1#
您还需要同步对来自队列的消息的访问。
https://docs.oracle.com/javase/7/docs/api/java/util/collections.html#synchronizedlist(java.util.list)
每次调用ProcessMsgFromQueue时,您都会有一个nullpointerexception!!
最好持久化消息,当commit正常时,清除列表并重置计数器。