spring 如何在Sping Boot 中实现Oracle AQ队列?

zrfyljdw  于 2024-01-05  发布在  Spring
关注(0)|答案(5)|浏览(182)

我已经学会了如何用AQ(Streams?)包创建Oracle数据库。我还在Oracle中创建了一些队列(手工)。(使用PL/SQL和SQL)。
然而,我很难从Spring建立一个适当的连接。
下面的工作原理(使用oracle.AQ java包):

  1. private final String aqUrl = "jdbc:oracle:thin:@localhost:1521:orcl";
  2. private final String aqUser = "queue_mut";
  3. private final String aqPassword = "******";
  4. private final String aqSchema = "queue_mut";
  5. private final String aqTable = "aq_table1";
  6. private final String aqQueue = "aq_queue1";
  7. @Test
  8. public void testManualAQ() throws ClassNotFoundException, SQLException, AQException {
  9. Class.forName("oracle.jdbc.driver.OracleDriver");
  10. Connection connection = DriverManager.getConnection(aqUrl, aqUser, aqPassword);
  11. connection.setAutoCommit(false);
  12. Class.forName("oracle.AQ.AQOracleDriver");
  13. AQSession aqSession = AQDriverManager.createAQSession(connection);
  14. AQQueueTable q_table = aqSession.createQueueTable(aqSchema, aqTable, new AQQueueTableProperty("RAW"));
  15. aqSession.createQueue(q_table, aqQueue, new AQQueueProperty());
  16. }

字符串
(基于https://docs.oracle.com/cd/B10501_01/appdev.920/a96587/apexampl.htm
这表明我可以连接到Oracle并实现AQ功能。
现在,我正在尝试创建Java配置的bean,以便使用JmsTemplate

  1. @Resource
  2. private JmsTemplate jmsTemplate;
  3. @Test
  4. public void testJmsTemplate() {
  5. String xmlval = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
  6. "<product id=\"10\">\n" +
  7. " <description>Foo</description>\n" +
  8. " <price>2.05</price>\n" +
  9. "</product>";
  10. jmsTemplate.convertAndSend(aqSchema + ".jms_ws_incoming_queue", xmlval);
  11. }


(yes,队列已存在;-))
使用以下配置类:

  1. import oracle.jms.AQjmsFactory;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  5. import org.springframework.jms.core.JmsTemplate;
  6. import javax.jms.ConnectionFactory;
  7. import javax.jms.JMSException;
  8. import javax.sql.DataSource;
  9. @Configuration
  10. public class OracleAQConfiguration {
  11. @Bean
  12. public DataSourceTransactionManager transactionManager(DataSource dataSource) {
  13. DataSourceTransactionManager manager = new DataSourceTransactionManager();
  14. manager.setDataSource(dataSource);
  15. return manager;
  16. }
  17. @Bean
  18. public ConnectionFactory connectionFactory(DataSource dataSource) throws JMSException {
  19. return AQjmsFactory.getQueueConnectionFactory(dataSource);
  20. }
  21. @Bean
  22. public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
  23. JmsTemplate jmsTemplate = new JmsTemplate();
  24. jmsTemplate.setSessionTransacted(true);
  25. jmsTemplate.setConnectionFactory(connectionFactory);
  26. return jmsTemplate;
  27. }
  28. }


使用属性yml:

  1. spring:
  2. datasource:
  3. url: jdbc:oracle:thin:@localhost:1521:orcl
  4. username: queue_mut
  5. password: ******
  6. driverClassName: oracle.jdbc.driver.OracleDriver


但是,这样我就犯了我无法理解的错误:

  1. 2017-04-19 12:11:17,151 INFO my.project.QueueTest: Started QueueTest in 5.305 seconds (JVM running for 6.588)
  2. org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is oracle.jms.AQjmsException: Error creating the db_connection; nested exception is java.lang.ClassCastException: com.sun.proxy.$Proxy102 cannot be cast to oracle.jdbc.internal.OracleConnection
  3. at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
  4. at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
  5. at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487)
  6. at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:570)
  7. at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:658)
  8. at my.project.QueueTest.testJmsTemplate(QueueTest.java:51)
  9. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  10. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  11. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  12. at java.lang.reflect.Method.invoke(Method.java:498)
  13. at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
  14. at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  15. at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
  16. at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  17. at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
  18. at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
  19. at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
  20. at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
  21. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
  22. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
  23. at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
  24. at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
  25. at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
  26. at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
  27. at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
  28. at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
  29. at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
  30. at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
  31. at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
  32. at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
  33. at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
  34. at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
  35. at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
  36. at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
  37. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  38. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  39. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  40. at java.lang.reflect.Method.invoke(Method.java:498)
  41. at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
  42. Caused by: oracle.jms.AQjmsException: Error creating the db_connection
  43. at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:625)
  44. at oracle.jms.AQjmsDBConnMgr.<init>(AQjmsDBConnMgr.java:399)
  45. at oracle.jms.AQjmsConnection.<init>(AQjmsConnection.java:249)
  46. at oracle.jms.AQjmsConnectionFactory.createConnection(AQjmsConnectionFactory.java:513)
  47. at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
  48. at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:474)
  49. ... 36 more
  50. Caused by: java.lang.ClassCastException: com.sun.proxy.$Proxy102 cannot be cast to oracle.jdbc.internal.OracleConnection
  51. at oracle.jms.AQjmsGeneralDBConnection.getProviderKey(AQjmsGeneralDBConnection.java:98)
  52. at oracle.jms.AQjmsGeneralDBConnection.<init>(AQjmsGeneralDBConnection.java:67)
  53. at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:566)
  54. ... 41 more


我相信Cast异常的发生是因为它是一个ProxyConnection[PooledConnection[oracle.jdbc.driver.T4CConnection@40016ce1]]。但我不知道如何修复这个问题。

toe95027

toe950271#

更改jdbc库,在我的情况下,这修复了它(如果不这样做,请尝试使用其他版本):

  1. <dependency>
  2. <groupId>com.oracle</groupId>
  3. <artifactId>ojdbc7</artifactId>
  4. <version>12.1.0.2.0</version>
  5. </dependency>

字符串

balp4ylt

balp4ylt2#

我们在尝试从Sping Boot 访问Oracle AQ时遇到了相同的异常。研究表明,此异常是由于数据库连接池库不允许访问Oracle AQ库所需的底层连接而引发的。(dbcp和tomcat连接池库都引发了异常,不相同但相似)
当我们从依赖项中删除数据库连接池库时,这个异常就消失了,这会导致整个应用程序没有数据库连接池的不良状态。
我们注意到,如果我们使用以下方法,则不会抛出AQjmsFactory.getQueueConnectionFactory(url, info);异常
也许解决方案中也缺少连接池,但这仅限于从AQ读取的组件。应用程序中的其他组件将具有连接池的好处。
下面是Bean定义的java配置:

  1. @Bean
  2. public QueueConnectionFactory connectionFactory() throws Exception {
  3. OracleServiceInfo serviceInfo = (OracleServiceInfo) this.cloud().getServiceInfo(NAME_PRIMARY_DS);
  4. Properties info = new Properties();
  5. String url = serviceInfo.getJdbcUrl();
  6. info.put("driver-name", "oracle.jdbc.OracleDriver");
  7. info.put("user", serviceInfo.getUserName());
  8. info.put("password", serviceInfo.getPassword());
  9. return oracle.jms.AQjmsFactory.getQueueConnectionFactory(url, info);
  10. }
  11. @Bean
  12. public JmsTemplate jmsTemplate() throws Exception {
  13. JmsTemplate jmsTemplate = new JmsTemplate();
  14. jmsTemplate.setConnectionFactory(connectionFactory());
  15. return jmsTemplate;
  16. }

字符串
我不知道这是否一个好的解决办法,但这肯定是一个方法,以消除问题中讨论的例外情况。

展开查看全部
n1bvdmb6

n1bvdmb63#

嗨,我也花了相当长的时间来使连接工作,但最后它做到了,这是如何:
首先,确保Oracle AQ队列表的有效负载未设置为RAW,而最好设置为Text:RAW.AQ$_JMS_TEXT_MESSAGE
接下来使用类似于下面的OracleAQConfiguration:

  1. import oracle.jdbc.pool.OracleDataSource;
  2. import oracle.jms.AQjmsFactory;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import javax.jms.JMSException;
  7. import javax.jms.QueueConnectionFactory;
  8. import javax.sql.DataSource;
  9. import java.sql.SQLException;
  10. @Configuration
  11. public class OracleAQConfiguration {
  12. // Values are retrieved from custom added props in Spring application.properties
  13. @Value("${myapplication.datasource.user}")
  14. private String user;
  15. @Value("${myapplication.datasource.password}")
  16. private String password;
  17. @Value("${myapplication.datasource.connectionstring}")
  18. private String connectionstring;
  19. @Bean
  20. /**
  21. * Spring bean with the configuration details of where the Oracle database is containing the QUEUES
  22. */
  23. public DataSource dataSource() throws SQLException {
  24. OracleDataSource ds = new OracleDataSource();
  25. ds.setUser(user);
  26. ds.setPassword(password);
  27. ds.setURL(connectionstring);
  28. ds.setImplicitCachingEnabled(true);
  29. ds.setFastConnectionFailoverEnabled(true);
  30. return ds;
  31. }
  32. @Bean
  33. /**
  34. * The KEY component effectively connecting to the Oracle AQ system using the datasource input
  35. */
  36. public QueueConnectionFactory connectionFactory(DataSource dataSource) throws JMSException {
  37. return AQjmsFactory.getQueueConnectionFactory(dataSource);
  38. }
  39. }

字符串
接下来使用类似于下面的JMSConfiguration。在这里,我读取和写入相同的队列,这在真实的应用程序集成场景中是不可能的。但是可以用于测试

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  5. import org.springframework.jms.core.JmsTemplate;
  6. import org.springframework.jms.listener.DefaultMessageListenerContainer;
  7. import javax.jms.ConnectionFactory;
  8. import javax.sql.DataSource;
  9. @Configuration
  10. public class JMSConfiguration {
  11. private static final String QUEUENAME_WRITE = "MYQUEUE";
  12. private static final String QUEUENAME_READ = "MYQUEUE";
  13. @Autowired
  14. private JMSReceiver jmsReceiver;
  15. @Bean
  16. /**
  17. * Spring bean to WRITE/SEND/ENQUEUE messages on a queue with a certain name
  18. */
  19. public JmsTemplate jmsTemplate(ConnectionFactory conFactory) {
  20. JmsTemplate jmsTemplate = new JmsTemplate();
  21. jmsTemplate.setDefaultDestinationName(QUEUENAME_WRITE);
  22. jmsTemplate.setSessionTransacted(true);
  23. jmsTemplate.setConnectionFactory(conFactory);
  24. return jmsTemplate;
  25. }
  26. /**
  27. * Spring bean to READ/RECEIVE/DEQUEUE messages of a queue with a certain name
  28. * All of this happens under a code managed transaction
  29. * to commit the change on Oracle (remove of the message from the queue table)
  30. * Reference the application custom code handling the message here
  31. */
  32. @Bean
  33. public DefaultMessageListenerContainer messageListenerContainer(ConnectionFactory conFactory, DataSource dataSource) {
  34. DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
  35. dmlc.setDestinationName(QUEUENAME_READ);
  36. dmlc.setSessionTransacted(true);
  37. dmlc.setConnectionFactory(conFactory);
  38. DataSourceTransactionManager manager = new DataSourceTransactionManager();
  39. manager.setDataSource(dataSource);
  40. dmlc.setTransactionManager(manager);
  41. // Add here our self-written JMS Receiver
  42. dmlc.setMessageListener(jmsReceiver);
  43. return dmlc;
  44. }
  45. }


最后,对于处理传入的JMS消息,使用类似于:

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.jms.listener.SessionAwareMessageListener;
  4. import org.springframework.stereotype.Component;
  5. import javax.jms.JMSException;
  6. import javax.jms.Message;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. @Component
  10. public class JMSReceiver implements SessionAwareMessageListener {
  11. private static final Logger logger = LoggerFactory.getLogger(JMSReceiver.class);
  12. @Override
  13. public void onMessage(Message message, Session session) throws JMSException {
  14. // We know/assume the Queue Payload type was set to 'TextMessage'
  15. TextMessage txtMessage = (TextMessage) message;
  16. logger.info("JMS Text Message received: " + txtMessage.getText());
  17. // ... further implementation
  18. }
  19. }

展开查看全部
n1bvdmb6

n1bvdmb64#

问题是AQ代码需要OracleConnection,但在池化时连接被 Package ,因此失败

ljsrvy3e

ljsrvy3e5#

  1. @Bean
  2. public AQjmsConnectionFactory connectionFactory(@Autowired DataSource dataSource) throws Exception{
  3. AQjmsConnectionFactory connectionFactory=new AQjmsConnectionFactory();
  4. connectionFactory.setDatasource(dataSource);
  5. return connectionFactory;
  6. }
  7. @Bean
  8. public JmsTemplate jmsTemplate(@Autowired AQjmsConnectionFactory connectionFactory) throws Exception{
  9. JmsTemplate jmsTemplate=new JmsTemplate();
  10. jmsTemplate.setConnectionFactory(connectionFactory);
  11. return jmsTemplate;
  12. }
  13. POM File:
  14. <dependency>
  15. <groupId>com.oracle.database.jdbc</groupId>
  16. <artifactId>ojdbc8</artifactId>
  17. </dependency>
  18. <dependency>
  19. <groupId>com.oracle.database.messaging</groupId>
  20. <artifactId>aqapi-jakarta</artifactId>
  21. <version>23.2.1.0</version>
  22. </dependency>
  23. <dependency>
  24. <groupId>jakarta.jms</groupId>
  25. <artifactId>jakarta.jms-api</artifactId>
  26. </dependency>
  27. <!-- https://mvnrepository.com/artifact/jakarta.transaction/jakarta.transaction-api -->
  28. <dependency>
  29. <groupId>jakarta.transaction</groupId>
  30. <artifactId>jakarta.transaction-api</artifactId>
  31. </dependency>

字符串
//使用Spring JMS模板向Oracle AQ JMS队列发送消息的代码

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.jms.core.JmsTemplate;
  3. import org.springframework.jms.core.MessageCreator;
  4. import org.springframework.stereotype.Service;
  5. import com.fasterxml.jackson.databind.ObjectMapper;
  6. import jakarta.jms.JMSException;
  7. import jakarta.jms.Message;
  8. import jakarta.jms.Session;
  9. @Service
  10. public class JmsUtil {
  11. @Autowired
  12. JmsTemplate jmsTemplate;
  13. //Send JMS message to Oracle AQ JMS queue
  14. public void sendMessageToJmsQueue(YourPojoClass yourPojoClass) {
  15. try {
  16. ObjectMapper mapper=new ObjectMapper();
  17. String pojoAsJson=mapper.writeValueAsString(yourPojoClass);
  18. jmsTemplate.send("YourQueueName", new MessageCreator() {
  19. public Message createMessage(Session session) throws JMSException{
  20. return session.createTextMessage(pojoAsJson);
  21. }
  22. });
  23. }catch(Exception e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }

展开查看全部

相关问题