?
Spring 提供了一個 JmsTransactionManager 用于對 JMS ConnectionFactory 做事務(wù)管理。這將允許 JMS 應(yīng)用利用 Spring 的事務(wù)管理特性。 JmsTransactionManager 在執(zhí)行本地資源事務(wù)管理時將從指定的 ConnectionFactory 綁定一個 ConnectionFactory/Session 這樣的配對到線程中。 JmsTemplate 會自動檢測這樣的事務(wù)資源,并對它們進(jìn)行相應(yīng)操作。
在 Java EE 環(huán)境中, ConnectionFactory 會池化 Connection 和 Session ,這樣這些資源將會在整個事務(wù)中被有效地重復(fù)利用。在一個獨(dú)立的環(huán)境中,使用 Spring 的 SingleConnectionFactory 時所有的事務(wù)將公用一個 Connection ,但是每個事務(wù)將保留自己獨(dú)立的 Session 。
JmsTemplate 可以利用 JtaTransactionManager 和能夠進(jìn)行分布式的 JMS ConnectionFactory 處理分布式事務(wù)。
?????? 在 Spring 整合 JMS 的應(yīng)用中,如果我們要進(jìn)行本地的事務(wù)管理的話非常簡單,只需要在定義對應(yīng)的消息監(jiān)聽容器時指定其 sessionTransacted 屬性為 true ,如:
?
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> <property name="sessionTransacted" value="true"/> </bean>
?
?????? 該屬性值默認(rèn)為 false ,這樣 JMS 在進(jìn)行消息監(jiān)聽的時候就會進(jìn)行事務(wù)控制,當(dāng)在接收消息時監(jiān)聽器執(zhí)行失敗時 JMS 就會對接收到的消息進(jìn)行回滾,對于 SessionAwareMessageListener 在接收到消息后發(fā)送一個返回消息時也處于同一事務(wù)下,但是對于其他操作如數(shù)據(jù)庫訪問等將不屬于該事務(wù)控制。
這里我們可以來做一個這樣的測試:我們?nèi)缟吓渲帽O(jiān)聽在 queueDestination 的消息監(jiān)聽容器的 sessionTransacted 屬性為 true ,然后把我們前面提到的消息監(jiān)聽器 ConsumerMessageListener 改成這樣:
?
public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { //這里我們知道生產(chǎn)者發(fā)送的就是一個純文本消息,所以這里可以直接進(jìn)行強(qiáng)制轉(zhuǎn)換,或者直接把onMessage方法的參數(shù)改成Message的子類TextMessage TextMessage textMsg = (TextMessage) message; System.out.println("接收到一個純文本消息。"); try { System.out.println("消息內(nèi)容是:" + textMsg.getText()); if (1 == 1) { throw new RuntimeException("Error"); } } catch (JMSException e) { e.printStackTrace(); } } }
????????
我們可以看到在上述代碼中我們的 ConsumerMessageListener 在進(jìn)行消息接收的時候拋出了一個 RuntimeException ,根據(jù)我們上面說的,因?yàn)槲覀円呀?jīng)在對應(yīng)的監(jiān)聽容器上定義了其 sessionTransacted 屬性為 true ,所以當(dāng)這里拋出異常的時候 JMS 將對接收到的消息進(jìn)行回滾,即下次進(jìn)行消息接收的時候該消息仍然能夠被接收到。為了驗(yàn)證這一點(diǎn),我們先執(zhí)行一遍測試代碼,往 queueDestination 發(fā)送一個文本消息,這個時候 ConsumerMessageListener 在進(jìn)行接收的時候?qū)伋鲆粋€ RuntimeException ,已經(jīng)接收到的純文本消息將進(jìn)行回滾;接著我們?nèi)サ羯厦娲a中拋出異常的語句,即 ConsumerMessageListener 能夠正常的進(jìn)行消息接收,這個時候我們再運(yùn)行一次測試代碼,往 ConsumerMessageListener 監(jiān)聽的 queueDestination 發(fā)送一條消息。如果之前在接手時拋出了異常的那條消息已經(jīng)回滾了的話,那么這個時候?qū)⒛軌蚪邮盏絻蓷l消息,控制臺將輸出接收到的兩條消息的內(nèi)容。具體結(jié)果有興趣的朋友可以自己驗(yàn)證一下。
?????? 如果想接收消息和數(shù)據(jù)庫訪問處于同一事務(wù)中,那么我們就可以配置一個外部的事務(wù)管理同時配置一個支持外部事務(wù)管理的消息監(jiān)聽容器(如 DefaultMessageListenerContainer )。要配置這樣一個參與分布式事務(wù)管理的消息監(jiān)聽容器,我們可以配置一個 JtaTransactionManager ,當(dāng)然底層的 JMS ConnectionFactory 需要能夠支持分布式事務(wù)管理,并正確地注冊我們的 JtaTransactionManager 。這樣消息監(jiān)聽器進(jìn)行消息接收和對應(yīng)的數(shù)據(jù)庫訪問就會處于同一數(shù)據(jù)庫控制下,當(dāng)消息接收失敗或數(shù)據(jù)庫訪問失敗都會進(jìn)行事務(wù)回滾操作。
?
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> <property name="transactionManager" ref="jtaTransactionManager"/> </bean> <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
????????
當(dāng)給消息監(jiān)聽容器指定了 transactionManager 時,消息監(jiān)聽容器將忽略 sessionTransacted 的值。 ?
?????? 關(guān)于使用 JtaTransactionManager 來管理上述分布式事務(wù),我們這里也可以來做一個試驗(yàn)。
?????? 首先:往 Spring 配置文件 applicationContext.xml 中添加如下配置:
???
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <property name="dataSource" ref="dataSource"/> </bean> <jee:jndi-lookup jndi-name="jdbc/mysql" id="dataSource"/> <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> <tx:annotation-driven transaction-manager="jtaTransactionManager"/>
??
?????? 我們可以看到,在這里我們引入了一個 jndi 數(shù)據(jù)源,定義了一個 JtaTransactionManager ,定義了 Spring 基于注解的聲明式事務(wù)管理,定義了一個 Spring 提供的進(jìn)行 Jdbc 操作的工具類 jdbcTemplate 。
?
?????? 接下來把我們的 ConsumerMessageListener 改為如下形式:
public class ConsumerMessageListener implements MessageListener { @Autowired private TestDao testDao; private int count = 0; public void onMessage(Message message) { //這里我們知道生產(chǎn)者發(fā)送的就是一個純文本消息,所以這里可以直接進(jìn)行強(qiáng)制轉(zhuǎn)換,或者直接把onMessage方法的參數(shù)改成Message的子類TextMessage TextMessage textMsg = (TextMessage) message; System.out.println(new Date().toLocaleString() + "接收到一個純文本消息。"); try { String text = textMsg.getText(); System.out.println("消息內(nèi)容是:" + text); System.out.println("當(dāng)前count的值是:" + count); testDao.insert(text + count); if (count == 0) { count ++; throw new RuntimeException("Error! 出錯啦!"); } } catch (JMSException e) { e.printStackTrace(); } } }
?
?????? 我們可以看到,在 ConsumerMessageListener 中我們定義了一個實(shí)例變量 count ,其初始值為 0 ;在 onMessage 里面,我們可以看到我們把接收到的消息內(nèi)容作為參數(shù)調(diào)用了 testDao 的 insert 方法;當(dāng) count 值為 0 ,也就是進(jìn)行第一次消息接收的時候會將 count 的值加 1 ,同時拋出一個運(yùn)行時異常。那么我們這里要測試的就是進(jìn)行第一次接收的時候 testDao 已經(jīng)把相關(guān)內(nèi)容插入數(shù)據(jù)庫了,接著在 onMessage 里面拋出了一個異常同時 count 加 1 ,我們預(yù)期的結(jié)果應(yīng)該是此時數(shù)據(jù)庫進(jìn)行回滾,同時 JMS 也回滾,這樣 JMS 將繼續(xù)嘗試接收該消息,此時同樣會調(diào)用 testDao 的 insert 方法將內(nèi)容插入數(shù)據(jù)庫,再接著 count 已經(jīng)不為 0 了,所以此時將不再拋出異常, JMS 成功進(jìn)行消息的接收, testDao 也成功的將消息內(nèi)容插入到了數(shù)據(jù)庫。要證明這個預(yù)期我們除了看數(shù)據(jù)庫中插入的數(shù)據(jù)外,還可以看控制臺的輸出,正常情況控制臺將輸出兩次消息接收的內(nèi)容,且第一次時 count 為 0 ,第二次 count 為 1 。
?????? TestDao 是一個接口,其 TestDaoImpl 對 insert 的方法實(shí)現(xiàn)如下: ?
?
@Transactional(readOnly=false) public void insert(final String name) { jdbcTemplate.update("insert into test(name) values(?)", name); }
???????
這里我們使用支持 JtaTransactionManager 的 Weblogic 來進(jìn)行測試,因?yàn)槭? Web 容器,所以我們這里定義了一個 Controller 來進(jìn)行消息的發(fā)送,具體代碼如下:
?
@Controller @RequestMapping("test") public class TestController { @Autowired @Qualifier("queueDestination") private Destination destination; @Autowired private ProducerService producerService; @RequestMapping("first") public String first() { producerService.sendMessage(destination, "你好,現(xiàn)在是:" + new Date().toLocaleString()); return "/test/first"; } }
??????
接下來就是啟用 Weblogic 服務(wù)器,進(jìn)入其控制臺,定義一個名叫“ jdbc/mysql ”的 JNDI 數(shù)據(jù)源,然后把該項(xiàng)目部署到 Weblogic 服務(wù)器上并進(jìn)行啟動。接下來我們就可以訪問 /test/first.do 訪問到上述 first 方法。之后控制臺會輸出如下信息:
???????
我們可以看到當(dāng)
count
為
0
時接收了一次,并隨后拋出了異常,之后
count
為
1
又接收了一次,這說明在
count
為
0
時拋出異常后我們的
JMS
進(jìn)行回滾了,那么我們的數(shù)據(jù)庫是否有進(jìn)行回滾呢?接著我們來看數(shù)據(jù)庫中的內(nèi)容:
???????
我們可以看到數(shù)據(jù)庫表中只有一條記錄,而且最后一位表示
count
的值的為
1
,這說明在
JMS
進(jìn)行消息接收拋出異常時我們的數(shù)據(jù)庫也回滾了。關(guān)于使用
JtaTransactionManager
進(jìn)行分布式事務(wù)管理的問題就說到這里了,有興趣的朋友可以自己試驗(yàn)一下。
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
