我正在尝试使用camels sql组件从数据库中使用outputtype=streamlist进行流式处理。我从一个带有consumertemplate的java类中获取resultiterator:
public Flux<String> CreateFlux () {
ConsumerTemplate consumer = camelContext.createConsumerTemplate();
ResultSetIterator resultSetIterator = consumer.receiveBody(
"sql:SELECT DATA FROM TRANSAKSJON WHERE REQ_ID='" + recId + "'?outputType=StreamList", ResultSetIterator.class);
...
while (result.hasNext()) {
Map<String, String> map = (Map<String, String>) result.next();
String data = map.get("DATA");
}
}
尝试迭代ResultSeterator时出现以下错误:
org.h2.jdbc.jdbcsqlexception:对象已关闭[90007-197]
经检查,我看到连接处已关闭。连接={hikariproxyconnection@16287} "hikariproxyconnection@1048081993 Package com.zaxxer.hikari.pool.proxyconnection.closedconnection“
如何使用camel sql组件来流式处理?我必须从一个不在 Camel 路线内的豆子上使用它。我发现只有在camel路由中使用sql组件时,流才能工作。
camel版本是:2.24.1
更新1:在看了源代码之后,它是有意的。ondone关闭连接。我正在尝试在defaultexchange上设置unitofwork,以便通过将exchange标记为“未完成”来保持连接打开。
update2:通过设置unitof work:
ProducerTemplate pTmp = camelContext.createProducerTemplate();
DefaultExchange defaultExchange = new DefaultExchange(camelContext);
UnitOfWork unitOfWork = new DefaultUnitOfWork(defaultExchange);
defaultExchange.setUnitOfWork(unitOfWork);
pTmp.send("direct:DbStream", defaultExchange);
route dbstream执行上述sql select
1条答案
按热度按时间8nuwlpux1#
不要使用receivebody,而只使用receive来获取
Exchange
回来。然后,您可以从迭代器的消息体中获取迭代器,在使用之后,您可以完成交换(参见javadoc)