程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 利用JMS建立消息傳遞系統

利用JMS建立消息傳遞系統

編輯:關於JAVA

在開始之前需要說明,為了理解本文,我們希望讀者擁有Java編程的相當堅實的基礎。我們將試圖盡可能簡單地解釋什麼是JMS,但是在你能夠建立自己的程序並正確地理解它們之前,你需要有一些使用JNDI開發實際應用程序的經驗。

JMS(Java消息服務)是一個消息交換標准,它允許使用J2EE應用程序組件建立、發送、接收和讀取消息。它假設分布式通訊擁有自由(free)的連接、是可靠的(reliable)和異步的(asynchronous)。

Exchange(交換)系統

消息交換反映了程序組件或應用程序之間的一種交互作用。消息交換系統是一種類似於下的系統:一個相似系統的客戶端可以發送和接收任何其它客戶端的消息。每個客戶端都並入系統的代理中,它提供了建立、發送、接收和讀取消息的可能。

交換系統使得分布式的交互操作成為可能。組件在目的地(Destination)發送消息,收件人也可以在相同的目的地中得到這個消息。發送者和收件人不一定是互相熟悉的。換句話說,它並沒有強迫發送者知道一些收件人的信息,也沒有強迫收件人知道某些發送者的信息。發送者和收件人只需要知道消息的格式以及要到達的目的地。在這種情形下,上述的系統不同於與它緊密相連的一些技術,例如遠程方法調用(RMI),它只要求開發人員了解RMI中的一些方法。

消息傳遞系統

消息傳遞系統是一種分布式的系統,是基於系統組件之間的異步消息交換。面向消息的中間件(Message-Oriented Middleware,MOM)就是這種產品,消息傳遞系統是在它的原理上建立的。

消息傳遞系統應用軟件不會直接地通訊(這與傳統的系統(基於RMI的)形成鮮明的對照),而需要依賴MOM的幫助。如果系統的某個組件希望給另一個組件發送消息,它將把給定的消息發送給MOM,接著MOM把該消息發送給收件人。

與傳統的基於RMI構建的系統相比,它有以下優點:

·發送消息的應用程序不需要期待回應,可以繼續執行。

· 沒有強迫發送消息的應用程序和特定消息的收件人在某個特定的時刻是激活的。如果消息的收件人不是激活的,MOM保證收件人一旦激活就立即收到該消息。

· 系統組件沒有直接地彼此相連。它們被分離開了,這就是在運行時刻能把組件從一個主機傳輸到另一個、卻不會中斷系統可用性的原因。

消息交換模型:點對點模型和發表-預訂模型

目前有兩種“基本的”消息交換模型:點對點模型和發表-預訂(pub-sub)模型。點對點模型應用於一個或多個組件(發送者)僅僅給一個組件收件人(接收者)發送消息的情形。這種模型是基於消息隊列概念的:發送者把消息發送到隊列中,接收者從該隊列中讀取消息。在點對點模型中,相同的隊列上可能存在多個接收者,但是MOM只給其中一個傳遞消息。給哪一個傳遞消息依賴於MOM的實現(implementation)。

發表-預訂模型應用於一個或多個組件(發表者)給一個或多個組件收件人(預訂者)發送消息的情形。這種特定的模型是基於消息主題(message topic)概念的:發表者把消息發送到某個主題中,而該特定主題的預訂者接收這些消息。

發表-預訂模型看起來更加“優雅”,但是很多發表-預訂模型不能保證消息按照發送的次序傳遞(它與點對點模型相反,點對點隊列實現了FIFO(先進先出)原理)。因此,消息的次序很重要(或者為了同步需要使用消息的頭和屬性部分)的時候,就應該避免采用發表-預訂模型。

Java消息服務(JMS)是使用面向消息中間件的一套Java API,它允許你的應用程序建立、發送、接收和讀取消息。這組程序集位於J2EE程序包結構樹上的javax.jms程序包中。JMS在很多MOM產品中得到了實現,其中iPlanet Message Queue、 IBM MQSeries、Progress Software SonicMQ、BEA WebLogic Server、Prism Technologies OpenFusion等最有名氣,也存在一些免費的實現。

JMS同時支持消息交換的兩種“基本的”模型。但是,其說明(specification)並沒有要求廠商同時實現兩種模型,盡管大多數JMS產品實現了點對點和發表-預訂模型。

JMS應用程序

JMS應用程序的主要部分是:

· 產生連接的部分和目的地

· 連接

· 對話

· 產生消息的部分

· 使用消息的部分

· 消息

產生連接的部分(ConnectionFactory)是負責建立JMS連接的對象。每個ConnectionFactory都是QueueConnectionFactory或TopicConnectionFactory的一個副本(copy)。MOM管理器建立特定的對象,並把它與JNDI樹關聯起來,這樣JMS客戶端就能夠使用標准的JNDI查找表得到ConnectionFactory的入口。在點對點的模型中,它使用了javax.jms.QueueConnectionFactory;在發表-預訂模型中,它使用的是javax.jms.TopicConnectionFactory。

目的地(Destination)——它是隊列或主題,這依賴於我們使用了下面哪種模型:javax.jms.Queue或javax.jms.Topic。

連接(Connection)——它可能是客戶端和服務應用之間的開放的TCP/IP。它可以被用於建立一個或少量的對話。在你的應用程序能夠接收消息前,你必須調用start()方法。為了暫停發送消息,你需要調用stop()。

對話(Session)——在JMS連接的幫助下建立的對象,被客戶端用作發送和接收消息。

產生消息的部分(MessageProducer)——對話建立的對象,被用於在目的地中發送消息。

使用消息的部分(MessageConsumer)——對話建立的對象,用於接收消息。為了同步接收消息,需要使用receive()方法。對於異步的情形,使用MessageListener和唯一的方法——onMessage()。在該方法中,在定義的消息到達後應該執行一定的操作。

消息(Message)——消息本身。JMS消息由三個部分組成:

· 消息頭

· 屬性(不是必要的)

· 消息體(不是必要的)

本文沒有解釋更多的細節信息,你可以在官方文檔中找到具體的細節。

什麼時候使用EJB 2.0

請注意下述各項內容:

在新的EJB 2.0規范中,與JMS的集成是通過建立新的EJB類型——消息驅動Bean(MDB)來實現的。MDB的特性是客戶端不會使用遠程接口(remote interface)與它通訊。其交互操作的唯一途徑是通過消息發送。MDB僅僅是消息監聽程序,是一個實現了javax.ejb.MessageDrivenBean和javax.jms.MessageListener接口的類,沒有任何其它的功能。其中的第一個接口只有兩個方法:setMessageDrivenContext() 和ejbRemove()。第二個接口只有一個方法:onMessage()。這個規范還需要一個不帶參數的ejbCreate()建立方法。客戶端不會直接與MDB通訊;它不會建立MDB。容器(container)自身決定什麼時候和需要多少個MDB來處理來自特定目的地的消息。MDB的主要缺陷是它只能從一個目的地接收到消息。

代碼示例

我們假設你已經安裝了J2SE(可以在http://java.sun.com/j2se/找到它),並且已經安裝並運行了JBoss應用程序服務器(可以在http://www.jboss.org/找到它)。

為了編譯下面的示例,你需要輸入:

javac -classpath .;C:\jboss-3.2.3\client\jbossall-client.jar SimpleSender.java

為了運行它,你需要輸入:

java -classpath .;C:\jboss-3.2.3\client\jbossall-client.jar SimpleSender

(在輸入時,請用你自己的JBoss目錄代替C:\jboss-3.2.3。同時還要記住,在你能夠運行這些示例前,JBoss服務器必須處於運行狀態。)

好了,現在我們開始建立示例發送程序和接收程序了:

// SimpleSender.java
import java.util.Properties;
import javax.jms.*;
import javax.naming.*;
public class SimpleSender {
 public static void main(String argv[]) {
  new SimpleSender();
 }
 public SimpleSender() {
  try {
   QueueConnectionFactory myQConnFactory;
   Queue myQueue;
   Properties properties = new Properties();
   properties.put(Context.INITIAL_CONTEXT_FACTORY,
           "org.jnp.interfaces.NamingContextFactory");
   properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
   properties.put(Context.PROVIDER_URL, "localhost");
   Context ctx = new InitialContext(properties);
   myQConnFactory = (QueueConnectionFactory)ctx.lookup
            ("UIL2ConnectionFactory");
   myQueue = (Queue) ctx.lookup("queue/testQueue");
   ctx.bind ("SimpleSender", myQueue);
   QueueConnection con = myQConnFactory.createQueueConnection();
   QueueSession session = con.createQueueSession(false,
              Session.AUTO_ACKNOWLEDGE);
   TextMessage textMessage = session.createTextMessage();
   QueueSender sender = session.createSender(myQueue);
   con.start();
   for (int i=0; i<10; i++) {
    textMessage.setText("Hello World #" + i);
    sender.send(textMessage);
   }
   con.close();
   ctx.close();
  } catch(Exception e) {
   e.printStackTrace();
  }
 }
}

我們有兩種接收消息的途徑。第一種是使用javax.jms.QueueReceiver的receive()方法向隊列同步請求消息。這可能阻塞接收程序,直到它不接收消息為止,或者如何某個消息沒有在特定的時間間隔內到達而返回超時操作。第二種是一旦可以訪問消息了就異步接收消息,使用javax.jms.MessageListener調用onMessage()方法,它會處理消息的內容。

建立接收程序的很多步驟與建立發送程序的步驟類似:

// SyncReceiver.java
import java.util.Properties;
import javax.jms.*;
import javax.naming.*;
public class SyncReceiver {
 public static void main(String argv[])
 {
  new SyncReceiver();
 }
 public SyncReceiver() {
  try {
   QueueConnectionFactory myQConnFactory;
   Queue myQueue;
   Properties properties = new Properties();
   properties.put(Context.INITIAL_CONTEXT_FACTORY,
          "org.jnp.interfaces.NamingContextFactory");
   properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
   properties.put(Context.PROVIDER_URL, "localhost");
   Context ctx = new InitialContext(properties);
   myQConnFactory = (QueueConnectionFactory)ctx.lookup("UIL2ConnectionFactory");
   myQueue = (Queue) ctx.lookup("queue/testQueue");
   ctx.bind("SyncReceiver", myQueue);
   QueueConnection con = myQConnFactory.createQueueConnection();
   QueueSession session = con.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
   QueueReceiver receiver = session.createReceiver(myQueue);
   con.start();
   for (int i=0; i<10; i++) {
    TextMessage textMessage = (TextMessage) receiver.receive();
    System.out.println("Got: " + textMessage.getText());
   }
   con.close();
   ctx.close();
  } catch(Exception e) {
   e.printStackTrace();
  }
 }
}

後記

數據庫是長時間數據存儲的理想途徑,但是用戶變更的臨時數據和用戶通知的存儲不是它們的強項。盡管被認為是效率低下的,但是數據庫查詢還是頻繁地用於現實中。所有請求都需要大量額外的“隱藏的”工作,如果大量的對象頻繁地訪問一個數據庫,可能會導致數據庫服務器和網絡的嚴重的負載。在大多數時候,請求不會返回任何數據,更糟的情形是,將已知的信息返回到處理過程中。

簡單地說,數據庫不是計劃用於頻繁的查詢或事件的。如果一旦數據或任何事件發生了改變就要立即作出反應,那麼更簡單和效率更高的途徑將是使用異步消息。

技術處理的應用程序(例如公文流通、認領的處理等)大多數使用了MQ(消息隊列),因為MQ模型與技術處理模型的統計特征類似,它承認“辦公室”的形式,在其中每個人都有自己接收和發送的郵件箱。

這類應用程序的典型特征是使用了大量的代理(代理可能是人、自動處理的操作、甚至於物理設備,例如打印機或設備),其中每個代理都會遇到一些小的性能難題,並按照業務邏輯把它傳遞到下一個代理。在建立這類應用程序的時候,開發的主要事務是對快速的性能的把握,同時要把握開發時失敗的缺乏。使用數據庫的MQ服務器簡化了處理應用程序中的技術處理的過程;這樣做更加靈活、容易擴展。

同樣,對於聚焦於事務的應用程序,使用MQ技術也是非常方便的。這涉及到處理財政金融和新服務領域的一些應用程序。在財政金融市場,操作必須很快地完成;用戶對於即時發生的改變感興趣。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved