StackDoc

StackDoc

當前位置: 主頁 > 服務器軟件 > Websphere >

使用WebSphere MQ Java 和JMS API 對消息進行分組

時間:2010-10-03 08:42來源:互聯網 作者:互聯網 點擊:
級別: 初級 David Currie ( david_currie@uk.ibm.com ), IT專家,IBM Software Services for WebSphere, IBM 2006

級別: 初級

David Currie ( david_currie@uk.ibm.com ), IT專家,IBM Software Services for WebSphere, IBM

2006 年2 月15 日

本文介紹了WebSphere MQ 中的消息組支持,以及如何利用該支持來提供邏輯消息排序和支持相關消息分組。並演示瞭如何使用WebSphere MQ Java 類進行分組操作,和如何使用JMS API 實現與此相同的功能。隨後,本文還給出了一個建議解決方案,並說明瞭如何在WebSphere Application Server 或其他J2EE 應用服務器中異步接收消息組時如何應用此解決方案。

消息組介紹

IBM? WebSphere? MQ 並不能始終保證發送和接收應用程序間的消息的正確順序。如果三條消息按照順序ABC發送,可能不會按照相同的順序到達(例如,如果中間網絡將消息分佈到集群中,然後再重新組合時)。但如果消息順序對應用程序的正常工作非常重要又該如何呢?假設有這樣的場景,消息B告知應用程序忽略前一個消息。如果消息以順序CBA送達,則序列的意義將完全不同了。

WebSphere MQ 通過消息分組來解決此問題。發送消息的應用程序可以指定其將消息ABC作為組的一部分發送。組中的每個消息都分配了一個序列號(從1 開始)。然後,接收應用程序可以指定希望按照此邏輯順序接收消息(與消息到達目的地的實際順序相對)。現在,即使消息BC首先到達,也不會將其立即傳遞給應用程序,因為它們的序列號不為1。

消息組還可用於另一個目的。有時候消息順序可能並不重要,但可能要求將一個消息集合一起處理(在空間上和時間上)。例如,假定有一個應用程序在每次向在線購物車添加了物品後都會發送一條消息。購物車中的物品可能需要一起處理,或許要將其聚合到單個訂單消息中。可以通過將消息放入到消息組中對此聚合進行管理。消息的接收者可以指定,在所有消息到達目的地之前,不希望接收組中的任何消息。在此場景中,在同一個位置接收所有消息也很重要。如果出於可伸縮性方面的原因,目的地有多個使用者,則務必將表示相同訂單中的物品的所有消息發送到相同的使用者,而消息組就可以確保滿足這一要求。

消息組的概念與消息段不同,後者表示大型消息發送時被拆分為較小的消息,應在接收時將其重新組裝為原始消息。消息組中的每個實體都是一個完整的消息。可以使用消息段對消息組內的消息進行拆分,但在本文中將不會考慮此選項。

使用WebSphere MQ Java API

異常處理 <br />為了使其更加清楚,從本文的代碼清單中刪除了異常處理邏輯。有關包含完整代碼的示例類,請參閱下載部分。

現在我們將討論使用WebSphere MQ Java? API 發送和接收消息組的實際操作。

發送消息組

下面的清單1給出了使用WebSphere MQ Java API將包含五條消息的組發送到隊列管理器QM_host上的隊列default所需的代碼:


清單1.使用WebSphere MQ Java API發送消息組

MQQueueManager queueManager = new MQQueueManager("QM_host");
MQQueue queue = queueManager.accessQueue("default", MQC.MQOO_OUTPUT);
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQC.MQPMO_LOGICAL_ORDER;
for (int i = 1; i <= 5; i++) {
    MQMessage message = new MQMessage();
    message.format = "MQSTR";
    message.writeString("Message " + i);
    if (i < 5) {
        message.messageFlags = MQC.MQMF_MSG_IN_GROUP;
    } else {
        message.messageFlags = MQC.MQMF_LAST_MSG_IN_GROUP;
    }
    queue.put(message, pmo);
}
queue.close();
queueManager.disconnect(); 

該示例首先連接到隊列管理器,並打開用於進行輸出的隊列句柄。從消息組的角度而言,要注意的第一個重要方面是向put消息選項添加了約束MQPMO_LOGICAL_ORDER 。此值告知隊列管理器,應用程序將把組中的每個消息按照序列順序放入隊列中,客戶機在處理任何後續消息前會將一個組中的所有消息放置到其上。

此代碼隨後循環五次,每次放置一個新消息。 (消息格式設置為MQSTR ,因此我們可能稍後接收到JMS文本消息類型的消息。)對於前四條消息,設置了消息標誌MQMF_MSG_IN_GROUP ,以指示相應的消息應屬於當前組。第五條消息設置了消息標誌MQMF_LAST_MSG_IN_GROUP ,以指示該消息是組中的最後一條消息。下一次放置具有MQMF_MSG_IN_GROUP標誌的消息時,將自動開始一個新的組。

示例最後關閉隊列句柄並從隊列管理器斷開,從而結束。運行此代碼時,通過使用WebSphere MQ Explorer 瀏覽組消息,可得到圖1 所示的結果:


圖1.在WebSphere MQ Explorer中瀏覽消息組
圖1. 在WebSphere MQ Explorer 中瀏覽消息組

為每條消息分配了相同的24 位組標識符,且分別具有從1 到5 的邏輯序列號。

使用MQPMO_LOGICAL_ORDER put消息選項純粹為了方便起見。應用程序可能不會使用此標誌,而採用顯式設置組標識符和序列號的方式。如果消息不按順序發出,或和其他消息組穿插著發送,則有必要採取後一種方式。仍然應設置消息標誌來指示消息屬於某個組以及是否為組中的最後一條消息。此機制的另一個可用場景為組中的消息分散在很長的時間內進行傳遞。可能出現這樣的情況,應用程序使用邏輯消息排序發送組中的開頭的若干消息,然後系統出現故障。當應用程序重新啟動時,可以繼續處理該消息組,能在不進行邏輯排序的情況下發送後面的消息,只要顯式地將組標識符設置為前面的消息所使用的組標識符並使用後續序列標識符即可。此時,可以仍然對後續消息使用邏輯消息排序。隊列管理器將隨後繼續使用相同的組標識符,並在每次遞增序列號。

可以將消息組與事務結合使用。如果第一個消息放置在事務下,則必須將所有使用相同隊列句柄的所有其他消息都放置於事務下。不過,每個消息並不一定要在相同的事務中。

接收消息組

我們已經以組的形式發送了消息,接下來我們希望採用相同的順序接收這些消息。下面的清單2 給出瞭如何使用WebSphere MQ Java API 完成此任務的示例:


清單2.使用WebSphere MQ Java API接收消息組

MQQueueManager queueManager = new MQQueueManager("QM_host");
MQQueue queue = queueManager.accessQueue("default", MQC.MQOO_INPUT_AS_Q_DEF);
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = MQC.MQGMO_LOGICAL_ORDER | MQC.MQGMO_ALL_MSGS_AVAILABLE;
gmo.matchOptions = MQC.MQMO_NONE;
MQMessage message = new MQMessage();
do {
    queue.get(message, gmo);
    int dataLength = retrievedMessage.getDataLength();
    System.out.println(message.readStringOfCharLength(dataLength));
    gmo.matchOptions = MQC.MQMO_MATCH_GROUP_ID;
} while (gmo.groupStatus != MQC.MQGS_LAST_MSG_IN_GROUP);
queue.close();
queueManager.disconnect(); 

和前面一樣,該代碼首先連接到隊列管理器,並打開隊列句柄,但這次是為了使用缺省隊列定義接收消息。我們指定兩個get消息選擇: MQGMO_LOGICAL_ORDER指示我們希望按照邏輯順序接收消息,即,應該首先接收序列號為1的消息,然後是序列號為2的消息,依此類推。第二個選項MQGMO_ALL_MSGS_AVAILABLE指示在組中的所有消息可用前,我們不希望接收其中的任何消息。此選項可防止在開始處理組中的消息時卻發現後續消息尚未發送或尚未到達的情況。

對於第一個get ,我們指定不需要任何匹配選項——準備接收任何組中的第一條消息。對於後續迭代,我們均指定MQMO_MATCH_GROUP_ID選項,以指示我們只希望接收具有匹配組標識符的消息。我們將為每個迭代使用相同的消息對象,因此,對於第二個get,將包含所接收到的第一條消息的組標識符。每個get 操作都將更新get 消息選項的組狀態字段。當設置為MQGS_LAST_MSG_IN_GROUP時,我們就知道已經接收到了組中的所有消息。

和前面的清單中一樣,請確保在完成時進行清理工作,即關閉隊列句柄並從隊列管理器斷開。

使用WebSphere MQ JMS API

規範版本 <br />本文中的JMS示例使用來自JMS 1.1的統一域接口。不過,可以對其進行重新編寫,以使用早期WebSphere MQ JMS 版本中提供的點到點或發布/訂閱接口。類似地,可以使用EJB 2.0部署描述符和JMS 1.0.2b接口來在J2EE 1.3應用服務器中使用下載部分提供的MDB示例。

此時,您可能會問,為什麼這些示例都使用WebSphere MQ Java API 在這個標準盛行的時代,我們是不是應該使用Java Message Service (JMS) API 不過,對於大部分標準規範,JMS 代表了消息傳遞系統所支持的功能的最低要求。因此,並非WebSphere MQ 所支持的所有行為都可通過此API 進行表示,而消息組正是其中之一。 JMS規範確實定義了兩個分別名為JMSXGroupIDJMSXSeqNum的屬性,並指定這兩個屬性分別表示消息所屬的組的標識符和在該組中的序列號。不過,JMS 規範未提供任何使用這些屬性的支持。不過,這並非十分絕對——通過採用一些補救方法,仍然可以通過使用這些屬性來復現現有行為。

發送消息組

首先,讓我們看看發送應用程序。正如上面提到的,put消息選項MQPMO_LOGICAL_ORDER僅是一個隊列管理器指令,用於自動分配消息組標識符和序列號。下面的清單3 演示瞭如何在JMS API 缺少此選項的情況下顯式地設置這些屬性。


清單3.使用WebSphere MQ JMS API發送消息組

MQConnectionFactory factory = new MQConnectionFactory();
factory.setQueueManager("QM_host")
MQQueue destination = new MQQueue("default");
destination.setTargetClient(JMSC.MQJMS_CLIENT_NONJMS_MQ);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
String groupId = "ID:" + new BigInteger(24 * 8, new Random()).toString(16);
for (int i = 1; i <= 5; i++) {
    TextMessage message = session.createTextMessage();
    message.setStringProperty("JMSXGroupID", groupId);
    message.setIntProperty("JMSXGroupSeq", i);
    if (i == 5) {
        message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true);
    }
    message.setText("Message " + i);
    producer.send(message);
}
connection.close(); 

該示例首先採用編程的方式構造一個連接工廠和目的地。這些被管理的對像也可以從存儲庫(如Java Naming and Directory Interface——JNDI)獲得。接下來,我們創建用於發送消息的常見JMS構件,然後採用24字節的BigInteger隨機值,並將其轉換為十六進製字符串,從而生成組標識符。對於消息標識符,WebSphere MQ JMS API要求組標識符帶有字符串ID:作為前綴

然後,該代碼進行迭代,以發送上述五條消息。組標識符設置為JMSXGroupId字符串屬性,序列號設置為JMSXGroupSeq整數屬性。 API 將假定設置了組標識符,且消息屬於組的一部分。因此,所剩下的部分就是如何指示組中的最後一條消息,我們通過將Boolean型屬性JMS_IBM_Last_Msg_In_Group設置為True來進行指示。

如果運行此代碼,然後使用WebSphere MQ Explorer 瀏覽結果,將會看到消息部署描述符屬性已像前面一樣進行了設置。我們現在應該可以運行最初的WebSphere MQ Java 接收程序來拾取此消息組。為此,我們需和上面的清單3中一樣,將目標客戶機指定為MQJMS_CLIENT_NONJMS_MQ ,從而確保不會向消息正文添加RFH2 Header(此Header可能會給非JMS客戶機造成混淆)。

接收消息組

不過,使用JMS 復現接收消息組的行為並不像這樣簡單。使用消息選擇器進行邏輯排序相當簡單。首先,我們使用選擇器來匹配任何序列號為1 的消息,獲得此消息後,我們將確定其所屬的組。然後,我們將設置第二個選擇器,以執行序列號2 和相應的組標識符。我們將繼續遞增序列號,直到收到一條設置了JMS_IBM_Last_Msg_In_Group屬性的消息為止。其中難點在於如何復現MQGMO_ALL_MSGS_AVAILABLE選項的行為。清單4 給出了一個可能的解決方案:


清單4.使用WebSphere MQ JMS API接收消息組

MQConnectionFactory factory = new MQConnectionFactory(); factory.setQueueManager("QM_host") MQQueue destination = new MQQueue("default"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(true , Session.AUTO_ACKNOWLEDGE); MessageConsumer lastMessageConsumer = session.createConsumer(destination, "JMS_IBM_Last_Msg_In_Group=TRUE"); TextMessage lastMessage = (TextMessage) lastMessageConsumer.receiveNoWait(); lastMessageConsumer.close(); if (lastMessage != null) { int groupSize = lastMessage.getIntProperty("JMSXGroupSeq"); String groupId = lastMessage.getStringProperty("JMSXGroupID"); boolean failed = false; for (int i = 1; (i < groupSize) && !failed; i++) { MessageConsumer consumer = session .createConsumer(destination, "JMSXGroupID='" + groupId + "'AND JMSXGroupSeq=" + i); TextMessage message = (TextMessage)consumer.receiveNoWait(); if (message != null) { System.out.println(message .getText()); } else { failed = true; } consumer.close(); } if (failed) { session.rollback(); } else { System.out.println(lastMessage.getText()); session. commit(); } } connection.close(); 

和前面一樣,我們首先創建使用消息所必需的所有JMS 資源。此時,我們必須啟動連接;否則就完全不能接收到任何消息。此解決方案的關鍵是,我們將嘗試首先接收組中的最後一條消息。為此,我們要創建一個具有JMS_IBM_Last_Msg_In_Group=TRUE消息選擇器的使用者。如果消息傳遞拓撲無法保證順序,則無法無法百分之百地確定組中所有其他消息已經達到。稍後我們將討論如何處理消息未全部達到的情況。

如果接收到了組中的最後一條消息,則可以獲取組標識符,並使用其序列號來確定組的大小。有了此信息後,我們可以進行迭代,嘗試從第一個序列號開始接收組中的所有其他消息。為此,我們將在每個迭代上設置一個新使用者,以根據組標識符和所需的序列號進行選擇。現在,如果由於某種原因而導致消息尚未達到,我們將從receiveNoWait方法得到null 。此時,我們將設置failed標誌,該標誌將導致退出循環。

回頭看一下我們創建JMS 會話的代碼行。您將看到,與前一個示例代碼不同,第一個參數設置為true ,以指示此會話上執行的發送和接收操作應作為本地事務的一部分執行。這意味著,如果由於尚未接收到某條消息而已經設置了failed標誌,則可以回滾事務,並將組中的所有其他消息返回給隊列。如果成功接收到了所有消息,則必須記得提交事務。否則,當關閉連接時,事務將會被回滾。

不過,在我們重複地回滾某個不完整的組時,目的地上可能有其他完整的組可用(但尚未達到進行處理的位置)。這個問題可以通過將不完整的組臨時從消息選擇器中排除得到解決,也可以將該組複製到內存中或持久性存儲中,以便稍後完成。下一部分將討論如何在一個特定的環境——應用服務器——中處理此問題。

在J2EE應用服務器中接收消息組

上面的清單4演示瞭如何使用WebSphere MQ JMS API接收消息組。但在應用服務器環境中,消息接收通常是通過消息驅動Bean(Message-driven Bean,MDB)執行的。如何對我們的方法進行調整,才能使其在此情況下也能正常工作呢?仍然可以為MDB 配置消息選擇器,但將由管理員採用靜態方式定義。因此,我們將使用JMS_IBM_Last_Msg_In_Group=TRUE配置此選擇器,以便使MDB始終為組中的最後一條消息。我們需要首先作為事務的一部分接收此消息,因此MDB也應使用事務屬性RequiresNew配置為使用容器管理事務(Container Managed Transactions,CMT)。以下是我們的MDB的onMessage方法的代碼:


清單5.用於接收消息組的消息驅動Bean

public void onMessage(Message lastMessage) {
    InitialContext context = new InitialContext();
    ConnectionFactory factory = 
        (ConnectionFactory) context.lookup("java:comp/env/jms/factory");
    Destination destination = 
        (Destination) context.lookup("java:comp/env/jms/destination");
    Connection connection = factory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    int groupSize = lastMessage.getIntProperty("JMSXGroupSeq");
    String groupId = lastMessage.getStringProperty("JMSXGroupID");
    
    boolean failed = false;
    
    for (int i = 1; (i < groupSize) && !failed; i++) {
    
        MessageConsumer consumer = session.createConsumer(destination,
            "JMSXGroupID='" + groupId + "'AND JMSXGroupSeq=" + i);
        TextMessage message = (TextMessage) consumer.receiveNoWait();
    
        if (message != null) {
            System.out.println(message.getText());
        } else {
            failed = true;
        }
    
        consumer.close();
    
    }
    
    if (failed) {
        _context.setRollbackOnly();
        Thread.sleep(5000);
    } else {
        System.out.println(((TextMessage) lastMessage).getText());
    }
    
    connection.close();
} 

現在,示例採用適合J2EE 環境的方式從JNDI 獲取連接工廠和目的地。創建JMS 會話時創建的事務參數此時將被忽略——所有消息都將作為容器所啟動的全局事務的一部分進行接收。如果接收其中的某個消息失敗,我們會標記全局事務進行回滾,然後等候5 秒。由於在方法退出前,事務不會實際進行回滾,已接收的消息將在目的地上處於鎖定狀態,從而防止其他MDB 實例嘗試接收下這個不完整的組。這些其他實例可以轉而嘗試從目的地接收其他組。當該方法最終退出時,事務將回滾,且可再次對這些消息進行接收(此時可能整個組已經全部達到)。

在此環境中,請嘗試對試圖接收組的次數進行限制,因為每次回滾事務,被接收的消息的消息交付計數已遞增過一次。如果重複調用上述代碼,此計數可能會達到目的地上配置的擱置閾值,此時這些消息就將重新進行排隊,暫時不再能進行接收。

如果成功接收了整個消息組,則方法將完成,容器將提交事務,從而從目的地中刪除這些消息。

結束語

本文描述了消息組的概念以及為何需要使用消息組的原因。我們給出的示例代碼演示瞭如何使用WebSphere MQ Java 和JMS API 發送和接收消息組,以及如何使用MDB 來在J2EE 應用服務器中檢索消息組。雖然這些示例進行了簡化,但也應該能幫助您了解如何在這些環境中處理消息組。






回頁首


下載

描述名字大小下載方法
Code samples in zip format msggroups.zip 17 KB FTP | HTTP
關於下載方法的信息


參考資料



關於作者

David Currie 的照片

David Currie是位於英國的IBM Hursley Software Lab的IBM Software Services for WebSphere團隊的成員。在加入該團隊前,他在WebSphere Application Server Development 工作,工作重點是WebSphere MQ 之類的消息傳遞提供程序和應用服務器的集成。您可以通過david_currie@uk.ibm.com與David聯繫。




頂一下
(0)
0%
踩一下
(0)
0%
------分隔線----------------------------
發表評論
請自覺遵守互聯網相關的政策法規,嚴禁發布色情、暴力、反動的言論。
評價:
表情:
驗證碼:點擊我更換圖片
欄目列表
推薦內容