camel:如何使用streamlist从sql组件流式处理

qij5mzcb  于 2021-07-12  发布在  Java
关注(0)|答案(1)|浏览(555)

我正在尝试使用camels sql组件从数据库中使用outputtype=streamlist进行流式处理。我从一个带有consumertemplate的java类中获取resultiterator:

  1. public Flux<String> CreateFlux () {
  2. ConsumerTemplate consumer = camelContext.createConsumerTemplate();
  3. ResultSetIterator resultSetIterator = consumer.receiveBody(
  4. "sql:SELECT DATA FROM TRANSAKSJON WHERE REQ_ID='" + recId + "'?outputType=StreamList", ResultSetIterator.class);
  5. ...
  6. while (result.hasNext()) {
  7. Map<String, String> map = (Map<String, String>) result.next();
  8. String data = map.get("DATA");
  9. }
  10. }

尝试迭代ResultSeterator时出现以下错误:
org.h2.jdbc.jdbcsqlexception:对象已关闭[90007-197]
经检查,我看到连接处已关闭。连接={hikariproxyconnection@16287} "hikariproxyconnection@1048081993 Package com.zaxxer.hikari.pool.proxyconnection.closedconnection“
如何使用camel sql组件来流式处理?我必须从一个不在 Camel 路线内的豆子上使用它。我发现只有在camel路由中使用sql组件时,流才能工作。 
camel版本是:2.24.1
更新1:在看了源代码之后,它是有意的。ondone关闭连接。我正在尝试在defaultexchange上设置unitofwork,以便通过将exchange标记为“未完成”来保持连接打开。
update2:通过设置unitof work:

  1. ProducerTemplate pTmp = camelContext.createProducerTemplate();
  2. DefaultExchange defaultExchange = new DefaultExchange(camelContext);
  3. UnitOfWork unitOfWork = new DefaultUnitOfWork(defaultExchange);
  4. defaultExchange.setUnitOfWork(unitOfWork);
  5. pTmp.send("direct:DbStream", defaultExchange);

route dbstream执行上述sql select

8nuwlpux

8nuwlpux1#

不要使用receivebody,而只使用receive来获取 Exchange 回来。然后,您可以从迭代器的消息体中获取迭代器,在使用之后,您可以完成交换(参见javadoc)

相关问题