程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> apache ActiveMQ之初體驗

apache ActiveMQ之初體驗

編輯:C++入門知識

一. 開篇語

繼上一篇weblogic中使用jms發送和接受消息的文章後, 本文使用apache的一個開源組件ActiveMQ接著探討JMS的話題, 本篇只是ActiveMQ的一個入門的例子, 希望對您有所幫助.


二. ActiveMQ

1. ActiveMQ簡介:

ActiveMQ是Apache的一個能力強勁的開源消息總線, 它完全支持JMS1.1和JavaEE1.4規范的JMS Provider實現.


2. ActiveMQ特性:

1.) 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2.) 完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
3.) 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支持Spring2.0的特性
4.) 通過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resourceadaptors的配置,

可以讓ActiveMQ可以自動的部署到任何兼容J2EE1.4商業服務器上
5.) 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6.) 支持通過JDBC和journal提供高速的消息持久化
7.) 從設計上保證了高性能的集群,客戶端-服務器,點對點
8.) 支持Ajax
9.) 支持與Axis的整合
10.) 可以很容易得調用內嵌JMS provider,進行測試


3. 環境准備:

1.) 下載ActiveMQ:

http://activemq.apache.org/download.html, 我下載的是apache-activemq-5.2.0

2.) 運行ActiveMQ server

解壓縮下載好的文件, 雙擊bin/activemq.bat 啟動server, ActiveMQ內置了jetty服務器, 默認使用TCP連接端口為61616.

ActiveMQ提供一個用於監控ActiveMQ的admin應用: http://127.0.0.1:8161/admin

3.) 在Eclipse中建立Java工程, 並導入activemq-all-5.2.0.jar包

4.) 新建兩個Java類: 消息生產者MsgSender和消息消費者MsgReceiver


4. 代碼測試(P2P):

1.) 消息生產者: MsgSender

/**
 * Message Provider
 */
public class MsgSender {
	
	// ConnectionFactory: use to create JMS connection
	private static ConnectionFactory connectionFactory;

	// Connection: connect message provider and JMS server
	private static Connection connection;

	// Session: a message send or receive thread
	private static Session session;

	// Destination: use to sign the message type
	private static Destination destination;

	// MessageProducer:sender
	private static MessageProducer messageProducer;

	/**
	 * init the JMS object
	 */
	public static void init() throws Exception {
		// use ActiveMQ to to create connection factory.
		connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER, 
				ActiveMQConnection.DEFAULT_PASSWORD, 
				"tcp://localhost:61616");

		// get the connection from connection factory
		connection = connectionFactory.createConnection();
		session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		destination = session.createQueue("myQueue");
		messageProducer = session.createProducer(destination);
		messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

		connection.start();
	}

	/**
	 * send activeMq message
	 */
	public static void sendMessage() throws Exception {
		for (int i = 1; i <= 5; i++) {
			TextMessage message = session.createTextMessage("ActiveMq message " + i);
			System.out.println("send:" + "ActiveMq message " + i);
			messageProducer.send(message);
		}
		session.commit();
	}
	
	/**
	 * release resource
	 */
	public static void release() throws Exception {
		messageProducer.close();
		session.close();
		connection.close();
	}

	/**
	 * main method
	 */
	public static void main(String[] args) throws Exception {
		init();
		sendMessage();
		release();
	}
}

2.) 消息消費者: MsgReceiver

/**
 * Message Consumer
 */
public class MsgReceiver {
	// ConnectionFactory: use to create JMS connection
	private static ConnectionFactory connectionFactory;

	// Connection: connect message provider and JMS server
	private static Connection connection;

	// Session: a message send or receive thread
	private static Session session;

	// use to sign the message type
	private static Destination destination;

	// MessageConsumer: receiver
	private static MessageConsumer messageConsumer;

	/**
	 * init the JMS object
	 */
	public static void init() throws Exception {
		// use ActiveMQ to to create connection factory.
		connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER, 
				ActiveMQConnection.DEFAULT_PASSWORD, 
				"tcp://localhost:61616");

		// get the connection from connection factory
		connection = connectionFactory.createConnection();
		session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		destination = session.createQueue("myQueue");
		messageConsumer = session.createConsumer(destination);
		
		connection.start();
	}

	/**
	 * receive activeMq message
	 */
	public static void receiveMessage() throws Exception {
		while (true) {
			TextMessage message = (TextMessage) messageConsumer.receive();
			if (message != null) {
				System.out.println("receive: " + message.getText());
			} else {
				break;
			}
		}
	}

	/**
	 * release resource
	 */
	public static void release() throws Exception {
		messageConsumer.close();
		session.close();
		connection.close();
	}
	
	/**
	 * main method
	 */
	public static void main(String[] args) throws Exception {
		init();
		receiveMessage();
		release();
	}
}


  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved