基於上篇文章"基於Tomcat + JNDI + ActiveMQ實現JMS的點對點消息傳送"很容易就可以編寫一個發布/訂閱消息傳送例子,相關環境准備與該篇文章基本類似,主要的區別如下。
配置連接工廠和話題
<Resource name="topic/connectionFactory" auth="Container"
type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5"
brokerName="LocalActiveMQBroker" useEmbeddedBroker="false" />
<Resource name="topic/topic0"
auth="Container"
type="org.apache.activemq.command.ActiveMQTopic" description="My Topic" factory="org.apache.activemq.jndi.JNDIReferenceFactory"
physicalName="TestTopic" />
新建一個發布者Servlet
package pubSub;
import java.io.IOException;
import java.io.PrintWriter;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.jms.Topic;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicPublisher;
import javax.jms.DeliveryMode;
import javax.jms.TopicSession;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
/**
* Servlet implementation class JMSTest
*/
@WebServlet("/Publish")
public class Publisher extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#HttpServlet()
*/
public Publisher() {
super();
// TODO Auto-generated constructor stub
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
PrintWriter out = response.getWriter();
try {
// get the initial context
InitialContext ctx = new InitialContext();
// lookup the topic object
Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0");
// lookup the topic connection factory
TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx
.lookup("java:comp/env/topic/connectionFactory");
// create a topic connection
TopicConnection topicConn = connFactory.createTopicConnection();
// create a topic session
TopicSession topicSession = topicConn.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
// create a topic publisher
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// create the "Hello World" message
TextMessage message = topicSession.createTextMessage();
message.setText("Hello World");
// publish the messages
topicPublisher.publish(message);
// print what we did
out.write("Message published: " + message.getText());
// close the topic connection
topicConn.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
// TODO Auto-generated method stub
}
}
新建一個訂閱者Servlet
package pubSub;
import java.io.IOException;
import java.io.PrintWriter;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Servlet implementation class Receive
*/
@WebServlet("/Subscribe")
public class Subscriber extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#HttpServlet()
*/
public Subscriber() {
super();
// TODO Auto-generated constructor stub
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
PrintWriter out = response.getWriter();
try {
// get the initial context
InitialContext ctx = new InitialContext();
// lookup the topic object
Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0");
// lookup the topic connection factory
TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx
.lookup("java:comp/env/topic/connectionFactory");
// create a topic connection
TopicConnection topicConn = connFactory.createTopicConnection();
// create a topic session
TopicSession topicSession = topicConn.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
// create a topic subscriber
TopicSubscriber topicSubscriber = topicSession
.createSubscriber(topic);
// start the connection
topicConn.start();
// receive the message
TextMessage message = (TextMessage) topicSubscriber.receive();
// print the message
out.write("Message received: " + message.getText());
// close the topic connection
topicConn.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
// TODO Auto-generated method stub
}
}
運行Web工程,分別打開多個標簽訪問訂閱servlet,然後訪問發布servlet,結果如下:



在訂閱者訂閱消息的時候,一開始沒接收到消息,一旦發布者發布消息後,訂閱者馬上收到消息。
代碼參考自:http://howtodoinjava.com/jms/jms-publish-subscribe-message-example/