程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> ActiveMQ應用(1)-安裝及基本模式實例,activemq實例

ActiveMQ應用(1)-安裝及基本模式實例,activemq實例

編輯:JAVA綜合教程

ActiveMQ應用(1)-安裝及基本模式實例,activemq實例


0.下載地址

https://activemq.apache.org/download.html

 

1.解壓並啟動activemq服務(需根據系統的不同選擇不同的啟動文件)

/apache-activemq-5.13.1/bin/macosx/activemq start

 

2.登錄activemq服務器進行查看

地址:http://localhost:8161/

點擊[Manage ActiveMQ broker]登錄查看詳細數據,默認用戶名密碼admin/admin

 

 

3.創建eclipse項目

/apache-activemq-5.13.1/lib下倒入所需jar包

 

3.1 通用jms示例

public class Sender {
    private static final int SEND_NUMBER=5;
    
    public static void main(String[] args){
        ConnectionFactory connectionFactory;
        Connection connection =null;
        Session session;
        Destination destination;
        MessageProducer producer;
        
        connectionFactory=new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try{
            connection = connectionFactory.createConnection();
            connection.start();
            
            session=connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            
            destination=session.createQueue("JMeterQueue");
            
            producer=session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session,producer);
            session.commit();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            try{
                if(null!=connection){
                    connection.close();
                }
            }catch(Throwable ignore){
                
            }
        }
    }
    
    public static void sendMessage(Session session,MessageProducer producer) throws JMSException{
        for(int i=1;i<SEND_NUMBER;i++){
            TextMessage message=session.createTextMessage("ActiveMq send "+i);
            System.out.println("ActiveMq send "+i);
            producer.send(message);
        }
    }
}

 

public class Receiver {
    public static void main(String[] args){
        ConnectionFactory connectionFactory ;
        Connection connection=null;
        Session session;
        Destination destination;
        MessageConsumer consumer;
        
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try{
            connection = connectionFactory.createConnection();
            connection.start();
            session=connection.createSession(Boolean.TRUE, 
                    Session.AUTO_ACKNOWLEDGE);
            destination=session.createQueue("JMeterQueue");
            consumer=session.createConsumer(destination);
            while(true){
                TextMessage message=(TextMessage)consumer.receive(10000);
            
                if(null !=message){
                    System.out.println("Message receive "+ message.getText());
                }else{
                    break;
                }
            }
            session.commit();
            //session.commit 之後,Messages Enqueued 中的消息才會被被消費掉,Messages Dequeued 才會增加;
            //如果不commit,Messages Dequeued會一直為0,每次啟動receiver後都會受到所有未消費的消息
        }catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

 

3.2 p2p示例

public class QueueSender {
    
    // 發送次數
    public static final int SEND_NUM = 5;
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目標,在ActiveMQ管理員控制台創建 
    public static final String DESTINATION = "mq.p2p.queue";
    
    public static void run() throws Exception {
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 創建鏈接工廠
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通過工廠創建一個連接
            connection = factory.createQueueConnection();
            // 啟動連接
            connection.start();
            // 創建一個session會話
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 創建一個消息隊列
            Queue queue = session.createQueue(DESTINATION);
            // 創建消息發送者
            javax.jms.QueueSender sender = session.createSender(queue);
            // 設置持久化模式
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, sender);
            // 提交會話
            session.commit();
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    
    public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "發送消息第" + (i + 1) + "條";
            Message msg=session.createTextMessage(message);
            sender.send(msg);
        }
    }
    
    public static void main(String[] args) throws Exception {
        QueueSender.run();
    }
}

 

public class QueueReceiver {
     
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目標,在ActiveMQ管理員控制台創建 
    public static final String TARGET = "mq.p2p.queue";
    
    public static void run() throws Exception {
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 創建鏈接工廠
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通過工廠創建一個連接
            connection = factory.createQueueConnection();
            // 啟動連接
            connection.start();
            // 創建一個session會話
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 創建一個消息隊列
            Queue queue = session.createQueue(TARGET);
            // 創建消息制作者
            javax.jms.QueueReceiver receiver = session.createReceiver(queue);
            
            receiver.setMessageListener(new MessageListener() { 
                public void onMessage(Message msg) { 
                    if (msg != null) {
                        TextMessage map = (TextMessage) msg;
                        try {
                            System.out.println(map.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                } 
            }); 
            // 休眠100ms再關閉
            Thread.sleep(1000 * 20); 
            
            // 提交會話
            session.commit();
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        QueueReceiver.run();
    }
}

 

3.3 訂閱示例

public class TopicSender {
    
    // 發送次數
    public static final int SEND_NUM = 5;
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目標,在ActiveMQ管理員控制台創建
    public static final String DESTINATION = "mq.topic";
    
    public static void run() throws Exception {
        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 創建鏈接工廠
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通過工廠創建一個連接
            connection = factory.createTopicConnection();
            // 啟動連接
            connection.start();
            // 創建一個session會話
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 創建一個消息隊列
            Topic topic = session.createTopic(DESTINATION);
            // 創建消息發送者
            TopicPublisher publisher = session.createPublisher(topic);
            // 設置持久化模式
            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, publisher);
            // 提交會話
            session.commit();
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    
    public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "發送消息第" + (i + 1) + "條";
            TextMessage msg =session.createTextMessage(message);
            publisher.send(msg);
        }
    }
    
    public static void main(String[] args) throws Exception {
        TopicSender.run();
    }
}

 

public class TopicReceiver {
     
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目標,在ActiveMQ管理員控制台創建
    public static final String TARGET = "mq.topic";
    
    public static void run() throws Exception {     
        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 創建鏈接工廠
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通過工廠創建一個連接
            connection = factory.createTopicConnection();
            // 啟動連接
            connection.start();
            // 創建一個session會話
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 創建一個消息隊列
            Topic topic = session.createTopic(TARGET);
            // 創建消息制作者
            TopicSubscriber subscriber = session.createSubscriber(topic);

            subscriber.setMessageListener(new MessageListener() { 
                public void onMessage(Message msg) { 
                    System.out.println(msg);
                } 
            }); 
            // 休眠100ms再關閉
            Thread.sleep(1000 * 20); 
            
            // 提交會話
            session.commit();
            
        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        TopicReceiver.run();
    }
}

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved