0.下載地址
https://activemq.apache.org/download.html
1.解壓並啟動activemq服務(需根據系統的不同選擇不同的啟動文件)
/apache-activemq-5.13.1/bin/macosx/activemq start
2.登錄activemq服務器進行查看
地址:http://localhost:8161/
點擊[Manage ActiveMQ broker]登錄查看詳細數據,默認用戶名密碼admin/admin

3.創建eclipse項目
/apache-activemq-5.13.1/lib下倒入所需jar包

3.1 通用jms示例
public class Sender {
private static final int SEND_NUMBER=5;
public static void main(String[] args){
ConnectionFactory connectionFactory;
Connection connection =null;
Session session;
Destination destination;
MessageProducer producer;
connectionFactory=new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try{
connection = connectionFactory.createConnection();
connection.start();
session=connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
destination=session.createQueue("JMeterQueue");
producer=session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session,producer);
session.commit();
}catch(Exception e){
e.printStackTrace();
}finally{
try{
if(null!=connection){
connection.close();
}
}catch(Throwable ignore){
}
}
}
public static void sendMessage(Session session,MessageProducer producer) throws JMSException{
for(int i=1;i<SEND_NUMBER;i++){
TextMessage message=session.createTextMessage("ActiveMq send "+i);
System.out.println("ActiveMq send "+i);
producer.send(message);
}
}
}
public class Receiver {
public static void main(String[] args){
ConnectionFactory connectionFactory ;
Connection connection=null;
Session session;
Destination destination;
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try{
connection = connectionFactory.createConnection();
connection.start();
session=connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
destination=session.createQueue("JMeterQueue");
consumer=session.createConsumer(destination);
while(true){
TextMessage message=(TextMessage)consumer.receive(10000);
if(null !=message){
System.out.println("Message receive "+ message.getText());
}else{
break;
}
}
session.commit();
//session.commit 之後,Messages Enqueued 中的消息才會被被消費掉,Messages Dequeued 才會增加;
//如果不commit,Messages Dequeued會一直為0,每次啟動receiver後都會受到所有未消費的消息
}catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
3.2 p2p示例
public class QueueSender {
// 發送次數
public static final int SEND_NUM = 5;
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標,在ActiveMQ管理員控制台創建
public static final String DESTINATION = "mq.p2p.queue";
public static void run() throws Exception {
QueueConnection connection = null;
QueueSession session = null;
try {
// 創建鏈接工廠
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創建一個連接
connection = factory.createQueueConnection();
// 啟動連接
connection.start();
// 創建一個session會話
session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創建一個消息隊列
Queue queue = session.createQueue(DESTINATION);
// 創建消息發送者
javax.jms.QueueSender sender = session.createSender(queue);
// 設置持久化模式
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, sender);
// 提交會話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
String message = "發送消息第" + (i + 1) + "條";
Message msg=session.createTextMessage(message);
sender.send(msg);
}
}
public static void main(String[] args) throws Exception {
QueueSender.run();
}
}
public class QueueReceiver {
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標,在ActiveMQ管理員控制台創建
public static final String TARGET = "mq.p2p.queue";
public static void run() throws Exception {
QueueConnection connection = null;
QueueSession session = null;
try {
// 創建鏈接工廠
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創建一個連接
connection = factory.createQueueConnection();
// 啟動連接
connection.start();
// 創建一個session會話
session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創建一個消息隊列
Queue queue = session.createQueue(TARGET);
// 創建消息制作者
javax.jms.QueueReceiver receiver = session.createReceiver(queue);
receiver.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg != null) {
TextMessage map = (TextMessage) msg;
try {
System.out.println(map.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 休眠100ms再關閉
Thread.sleep(1000 * 20);
// 提交會話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
QueueReceiver.run();
}
}
3.3 訂閱示例
public class TopicSender {
// 發送次數
public static final int SEND_NUM = 5;
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標,在ActiveMQ管理員控制台創建
public static final String DESTINATION = "mq.topic";
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 創建鏈接工廠
TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創建一個連接
connection = factory.createTopicConnection();
// 啟動連接
connection.start();
// 創建一個session會話
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創建一個消息隊列
Topic topic = session.createTopic(DESTINATION);
// 創建消息發送者
TopicPublisher publisher = session.createPublisher(topic);
// 設置持久化模式
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, publisher);
// 提交會話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
String message = "發送消息第" + (i + 1) + "條";
TextMessage msg =session.createTextMessage(message);
publisher.send(msg);
}
}
public static void main(String[] args) throws Exception {
TopicSender.run();
}
}
public class TopicReceiver {
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標,在ActiveMQ管理員控制台創建
public static final String TARGET = "mq.topic";
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 創建鏈接工廠
TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創建一個連接
connection = factory.createTopicConnection();
// 啟動連接
connection.start();
// 創建一個session會話
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創建一個消息隊列
Topic topic = session.createTopic(TARGET);
// 創建消息制作者
TopicSubscriber subscriber = session.createSubscriber(topic);
subscriber.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
System.out.println(msg);
}
});
// 休眠100ms再關閉
Thread.sleep(1000 * 20);
// 提交會話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicReceiver.run();
}
}