日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

Spring整合JMS(四)——事務(wù)管理

系統(tǒng) 1924 0

?

Spring 提供了一個 JmsTransactionManager 用于對 JMS ConnectionFactory 做事務(wù)管理。這將允許 JMS 應(yīng)用利用 Spring 的事務(wù)管理特性。 JmsTransactionManager 在執(zhí)行本地資源事務(wù)管理時將從指定的 ConnectionFactory 綁定一個 ConnectionFactory/Session 這樣的配對到線程中。 JmsTemplate 會自動檢測這樣的事務(wù)資源,并對它們進(jìn)行相應(yīng)操作。

Java EE 環(huán)境中, ConnectionFactory 會池化 Connection Session ,這樣這些資源將會在整個事務(wù)中被有效地重復(fù)利用。在一個獨(dú)立的環(huán)境中,使用 Spring SingleConnectionFactory 時所有的事務(wù)將公用一個 Connection ,但是每個事務(wù)將保留自己獨(dú)立的 Session

JmsTemplate 可以利用 JtaTransactionManager 和能夠進(jìn)行分布式的 JMS ConnectionFactory 處理分布式事務(wù)。

?????? Spring 整合 JMS 的應(yīng)用中,如果我們要進(jìn)行本地的事務(wù)管理的話非常簡單,只需要在定義對應(yīng)的消息監(jiān)聽容器時指定其 sessionTransacted 屬性為 true ,如:

?

            <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
        <property name="sessionTransacted" value="true"/>
    </bean>

      

?

?????? 該屬性值默認(rèn)為 false ,這樣 JMS 在進(jìn)行消息監(jiān)聽的時候就會進(jìn)行事務(wù)控制,當(dāng)在接收消息時監(jiān)聽器執(zhí)行失敗時 JMS 就會對接收到的消息進(jìn)行回滾,對于 SessionAwareMessageListener 在接收到消息后發(fā)送一個返回消息時也處于同一事務(wù)下,但是對于其他操作如數(shù)據(jù)庫訪問等將不屬于該事務(wù)控制。

這里我們可以來做一個這樣的測試:我們?nèi)缟吓渲帽O(jiān)聽在 queueDestination 的消息監(jiān)聽容器的 sessionTransacted 屬性為 true ,然后把我們前面提到的消息監(jiān)聽器 ConsumerMessageListener 改成這樣:

?

        public class ConsumerMessageListener implements MessageListener {
 
    public void onMessage(Message message) {
            //這里我們知道生產(chǎn)者發(fā)送的就是一個純文本消息,所以這里可以直接進(jìn)行強(qiáng)制轉(zhuǎn)換,或者直接把onMessage方法的參數(shù)改成Message的子類TextMessage
            TextMessage textMsg = (TextMessage) message;
            System.out.println("接收到一個純文本消息。");
            try {
                System.out.println("消息內(nèi)容是:" + textMsg.getText());
                if (1 == 1) {
                    throw new RuntimeException("Error");
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
    }
 
}

      

????????

我們可以看到在上述代碼中我們的 ConsumerMessageListener 在進(jìn)行消息接收的時候拋出了一個 RuntimeException ,根據(jù)我們上面說的,因?yàn)槲覀円呀?jīng)在對應(yīng)的監(jiān)聽容器上定義了其 sessionTransacted 屬性為 true ,所以當(dāng)這里拋出異常的時候 JMS 將對接收到的消息進(jìn)行回滾,即下次進(jìn)行消息接收的時候該消息仍然能夠被接收到。為了驗(yàn)證這一點(diǎn),我們先執(zhí)行一遍測試代碼,往 queueDestination 發(fā)送一個文本消息,這個時候 ConsumerMessageListener 在進(jìn)行接收的時候?qū)伋鲆粋€ RuntimeException ,已經(jīng)接收到的純文本消息將進(jìn)行回滾;接著我們?nèi)サ羯厦娲a中拋出異常的語句,即 ConsumerMessageListener 能夠正常的進(jìn)行消息接收,這個時候我們再運(yùn)行一次測試代碼,往 ConsumerMessageListener 監(jiān)聽的 queueDestination 發(fā)送一條消息。如果之前在接手時拋出了異常的那條消息已經(jīng)回滾了的話,那么這個時候?qū)⒛軌蚪邮盏絻蓷l消息,控制臺將輸出接收到的兩條消息的內(nèi)容。具體結(jié)果有興趣的朋友可以自己驗(yàn)證一下。

?????? 如果想接收消息和數(shù)據(jù)庫訪問處于同一事務(wù)中,那么我們就可以配置一個外部的事務(wù)管理同時配置一個支持外部事務(wù)管理的消息監(jiān)聽容器(如 DefaultMessageListenerContainer )。要配置這樣一個參與分布式事務(wù)管理的消息監(jiān)聽容器,我們可以配置一個 JtaTransactionManager ,當(dāng)然底層的 JMS ConnectionFactory 需要能夠支持分布式事務(wù)管理,并正確地注冊我們的 JtaTransactionManager 。這樣消息監(jiān)聽器進(jìn)行消息接收和對應(yīng)的數(shù)據(jù)庫訪問就會處于同一數(shù)據(jù)庫控制下,當(dāng)消息接收失敗或數(shù)據(jù)庫訪問失敗都會進(jìn)行事務(wù)回滾操作。

?

            <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
        <property name="transactionManager" ref="jtaTransactionManager"/>
    </bean>
    
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

      

????????

當(dāng)給消息監(jiān)聽容器指定了 transactionManager 時,消息監(jiān)聽容器將忽略 sessionTransacted 的值。 ?

?????? 關(guān)于使用 JtaTransactionManager 來管理上述分布式事務(wù),我們這里也可以來做一個試驗(yàn)。

?????? 首先:往 Spring 配置文件 applicationContext.xml 中添加如下配置:

???

            <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSource"/>
    </bean>
 
    <jee:jndi-lookup jndi-name="jdbc/mysql" id="dataSource"/>
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
 
    <tx:annotation-driven transaction-manager="jtaTransactionManager"/>

      

??

?????? 我們可以看到,在這里我們引入了一個 jndi 數(shù)據(jù)源,定義了一個 JtaTransactionManager ,定義了 Spring 基于注解的聲明式事務(wù)管理,定義了一個 Spring 提供的進(jìn)行 Jdbc 操作的工具類 jdbcTemplate

?

?????? 接下來把我們的 ConsumerMessageListener 改為如下形式:

        public class ConsumerMessageListener implements MessageListener {
 
    @Autowired
    private TestDao testDao;
    
    private int count = 0;
    
    public void onMessage(Message message) {
            //這里我們知道生產(chǎn)者發(fā)送的就是一個純文本消息,所以這里可以直接進(jìn)行強(qiáng)制轉(zhuǎn)換,或者直接把onMessage方法的參數(shù)改成Message的子類TextMessage
            TextMessage textMsg = (TextMessage) message;
            System.out.println(new Date().toLocaleString() + "接收到一個純文本消息。");
            try {
                String text = textMsg.getText();
                System.out.println("消息內(nèi)容是:" + text);
                System.out.println("當(dāng)前count的值是:" + count);
                testDao.insert(text + count);
                if (count == 0) {
                    count ++;
                    throw new RuntimeException("Error! 出錯啦!");
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
    }
 
}

      

?

?????? 我們可以看到,在 ConsumerMessageListener 中我們定義了一個實(shí)例變量 count ,其初始值為 0 ;在 onMessage 里面,我們可以看到我們把接收到的消息內(nèi)容作為參數(shù)調(diào)用了 testDao insert 方法;當(dāng) count 值為 0 ,也就是進(jìn)行第一次消息接收的時候會將 count 的值加 1 ,同時拋出一個運(yùn)行時異常。那么我們這里要測試的就是進(jìn)行第一次接收的時候 testDao 已經(jīng)把相關(guān)內(nèi)容插入數(shù)據(jù)庫了,接著在 onMessage 里面拋出了一個異常同時 count 1 ,我們預(yù)期的結(jié)果應(yīng)該是此時數(shù)據(jù)庫進(jìn)行回滾,同時 JMS 也回滾,這樣 JMS 將繼續(xù)嘗試接收該消息,此時同樣會調(diào)用 testDao insert 方法將內(nèi)容插入數(shù)據(jù)庫,再接著 count 已經(jīng)不為 0 了,所以此時將不再拋出異常, JMS 成功進(jìn)行消息的接收, testDao 也成功的將消息內(nèi)容插入到了數(shù)據(jù)庫。要證明這個預(yù)期我們除了看數(shù)據(jù)庫中插入的數(shù)據(jù)外,還可以看控制臺的輸出,正常情況控制臺將輸出兩次消息接收的內(nèi)容,且第一次時 count 0 ,第二次 count 1

?????? TestDao 是一個接口,其 TestDaoImpl insert 的方法實(shí)現(xiàn)如下: ?

?

            @Transactional(readOnly=false)
    public void insert(final String name) {
        
        jdbcTemplate.update("insert into test(name) values(?)", name);
 
    }

      

???????

這里我們使用支持 JtaTransactionManager Weblogic 來進(jìn)行測試,因?yàn)槭? Web 容器,所以我們這里定義了一個 Controller 來進(jìn)行消息的發(fā)送,具體代碼如下:

?

        @Controller
@RequestMapping("test")
public class TestController {
 
    @Autowired
    @Qualifier("queueDestination")
    private Destination destination;
    
    @Autowired
    private ProducerService producerService;
    
    @RequestMapping("first")
    public String first() {
        producerService.sendMessage(destination, "你好,現(xiàn)在是:" + new Date().toLocaleString());
        return "/test/first";
    }
    
}

      

??????

接下來就是啟用 Weblogic 服務(wù)器,進(jìn)入其控制臺,定義一個名叫“ jdbc/mysql ”的 JNDI 數(shù)據(jù)源,然后把該項(xiàng)目部署到 Weblogic 服務(wù)器上并進(jìn)行啟動。接下來我們就可以訪問 /test/first.do 訪問到上述 first 方法。之后控制臺會輸出如下信息:


Spring整合JMS(四)——事務(wù)管理
???????
我們可以看到當(dāng) count 0 時接收了一次,并隨后拋出了異常,之后 count 1 又接收了一次,這說明在 count 0 時拋出異常后我們的 JMS 進(jìn)行回滾了,那么我們的數(shù)據(jù)庫是否有進(jìn)行回滾呢?接著我們來看數(shù)據(jù)庫中的內(nèi)容:



???????
我們可以看到數(shù)據(jù)庫表中只有一條記錄,而且最后一位表示 count 的值的為 1 ,這說明在 JMS 進(jìn)行消息接收拋出異常時我們的數(shù)據(jù)庫也回滾了。關(guān)于使用 JtaTransactionManager 進(jìn)行分布式事務(wù)管理的問題就說到這里了,有興趣的朋友可以自己試驗(yàn)一下。

?

Spring整合JMS(四)——事務(wù)管理


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 黄陵县| 磐石市| 黄骅市| 泊头市| 榆社县| 揭阳市| 九龙县| 定结县| 顺昌县| 汤原县| 德安县| 古田县| 文水县| 丰台区| 广南县| 昭觉县| 会昌县| 格尔木市| 崇阳县| 固阳县| 吉林市| 淄博市| 安陆市| 阿克| 平阳县| 济阳县| 光泽县| 淮南市| 镇平县| 六枝特区| 宁国市| 海盐县| 青河县| 白河县| 银川市| 辉县市| 沭阳县| 博野县| 于田县| 湛江市| 嘉兴市|