Java音訊隊列--ActiveMq 實戰。本站提示廣大學習愛好者:(Java音訊隊列--ActiveMq 實戰)文章只能為提供參考,不一定能成為您想要的結果。以下是Java音訊隊列--ActiveMq 實戰正文
ActiveMQ官網下載地址:http://activemq.apache.org/download.html
ActiveMQ 提供了Windows 和Linux、Unix 等幾個版本,樓主這裡選擇了Linux 版本下停止開發。

下載完裝置包,解壓之後的目錄:

從它的目錄來說,還是很復雜的:
2、啟動ActiveMQ
進入到ActiveMQ 裝置目錄的Bin 目錄,linux 下輸出 ./activemq start 啟動activeMQ 服務。
輸出命令之後,會提示我們創立了一個進程IP 號,這時分闡明服務曾經成功啟動了。
ActiveMQ默許啟動時,啟動了內置的jetty服務器,提供一個用於監控ActiveMQ的admin使用。
admin:http://127.0.0.1:8161/admin/
我們在閱讀器翻開鏈接之後輸出賬號密碼(這裡和tomcat 服務器相似)
默許賬號:admin
密碼:admin

到這裡為止,ActiveMQ 服務端就啟動終了了。
ActiveMQ 在linux 下的終止命令是 ./activemq stop
3、創立一個ActiveMQ工程
項目目錄構造:

上述在官網下載ActiveMq 的時分,我們可以在目錄下看到一個jar包:

這個jar 包就是我們需求在項目中停止開發中運用到的相關依賴。
3.1 創立消費者
public class Producter {
//ActiveMq 的默許用戶名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//ActiveMq 的默許登錄密碼
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//ActiveMQ 的鏈接地址
private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
AtomicInteger count = new AtomicInteger(0);
//鏈接工廠
ConnectionFactory connectionFactory;
//鏈接對象
Connection connection;
//事務管理
Session session;
ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
public void init(){
try {
//創立一個鏈接工廠
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
//從工廠中創立一個鏈接
connection = connectionFactory.createConnection();
//開啟鏈接
connection.start();
//創立一個事務(這裡經過參數可以設置事務的級別)
session = connection.createSession(true,Session.SESSION_TRANSACTED);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void sendMessage(String disname){
try {
//創立一個音訊隊列
Queue queue = session.createQueue(disname);
//音訊消費者
MessageProducer messageProducer = null;
if(threadLocal.get()!=null){
messageProducer = threadLocal.get();
}else{
messageProducer = session.createProducer(queue);
threadLocal.set(messageProducer);
}
while(true){
Thread.sleep(1000);
int num = count.getAndIncrement();
//創立一條音訊
TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
"productor:我是大帥哥,我如今正在消費東西!,count:"+num);
System.out.println(Thread.currentThread().getName()+
"productor:我是大帥哥,我如今正在消費東西!,count:"+num);
//發送音訊
messageProducer.send(msg);
//提交事務
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.2 創立消費者
public class Comsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory;
Connection connection;
Session session;
ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
AtomicInteger count = new AtomicInteger();
public void init(){
try {
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void getMessage(String disname){
try {
Queue queue = session.createQueue(disname);
MessageConsumer consumer = null;
if(threadLocal.get()!=null){
consumer = threadLocal.get();
}else{
consumer = session.createConsumer(queue);
threadLocal.set(consumer);
}
while(true){
Thread.sleep(1000);
TextMessage msg = (TextMessage) consumer.receive();
if(msg!=null) {
msg.acknowledge();
System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+msg.getText()+"--->"+count.getAndIncrement());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4、運轉ActiveMQ項目
4.1 消費者開端消費音訊
public class TestMq {
public static void main(String[] args){
Producter producter = new Producter();
producter.init();
TestMq testMq = new TestMq();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//Thread 1
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 2
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 3
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 4
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 5
new Thread(testMq.new ProductorMq(producter)).start();
}
private class ProductorMq implements Runnable{
Producter producter;
public ProductorMq(Producter producter){
this.producter = producter;
}
@Override
public void run() {
while(true){
try {
producter.sendMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
運轉後果:
INFO | Successfully connected to tcp://localhost:61616 Thread-6productor:我是大帥哥,我如今正在消費東西!,count:0 Thread-4productor:我是大帥哥,我如今正在消費東西!,count:1 Thread-2productor:我是大帥哥,我如今正在消費東西!,count:3 Thread-5productor:我是大帥哥,我如今正在消費東西!,count:2 Thread-3productor:我是大帥哥,我如今正在消費東西!,count:4 Thread-6productor:我是大帥哥,我如今正在消費東西!,count:5 Thread-3productor:我是大帥哥,我如今正在消費東西!,count:6 Thread-5productor:我是大帥哥,我如今正在消費東西!,count:7 Thread-2productor:我是大帥哥,我如今正在消費東西!,count:8 Thread-4productor:我是大帥哥,我如今正在消費東西!,count:9 Thread-6productor:我是大帥哥,我如今正在消費東西!,count:10 Thread-3productor:我是大帥哥,我如今正在消費東西!,count:11 Thread-5productor:我是大帥哥,我如今正在消費東西!,count:12 Thread-2productor:我是大帥哥,我如今正在消費東西!,count:13 Thread-4productor:我是大帥哥,我如今正在消費東西!,count:14 Thread-6productor:我是大帥哥,我如今正在消費東西!,count:15 Thread-3productor:我是大帥哥,我如今正在消費東西!,count:16 Thread-5productor:我是大帥哥,我如今正在消費東西!,count:17 Thread-2productor:我是大帥哥,我如今正在消費東西!,count:18 Thread-4productor:我是大帥哥,我如今正在消費東西!,count:19

4.2 消費者開端消費音訊
public class TestConsumer {
public static void main(String[] args){
Comsumer comsumer = new Comsumer();
comsumer.init();
TestConsumer testConsumer = new TestConsumer();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
}
private class ConsumerMq implements Runnable{
Comsumer comsumer;
public ConsumerMq(Comsumer comsumer){
this.comsumer = comsumer;
}
@Override
public void run() {
while(true){
try {
comsumer.getMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
運轉後果:
INFO | Successfully connected to tcp://localhost:61616 Thread-2: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我如今正在消費東西!,count:4--->0 Thread-3: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我如今正在消費東西!,count:36--->1 Thread-4: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我如今正在消費東西!,count:38--->2 Thread-5: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我如今正在消費東西!,count:37--->3 Thread-2: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我如今正在消費東西!,count:2--->4 Thread-3: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我如今正在消費東西!,count:40--->5 Thread-4: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我如今正在消費東西!,count:42--->6 Thread-5: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我如今正在消費東西!,count:41--->7 Thread-2: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我如今正在消費東西!,count:1--->8 Thread-3: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我如今正在消費東西!,count:44--->9 Thread-4: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我如今正在消費東西!,count:46--->10 Thread-5: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我如今正在消費東西!,count:45--->11 Thread-2: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我如今正在消費東西!,count:3--->12 Thread-3: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我如今正在消費東西!,count:48--->13 Thread-4: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我如今正在消費東西!,count:50--->14 Thread-5: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我如今正在消費東西!,count:49--->15 Thread-4: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我如今正在消費東西!,count:54--->16 Thread-2: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我如今正在消費東西!,count:6--->17 Thread-3: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我如今正在消費東西!,count:52--->18 Thread-5: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我如今正在消費東西!,count:53--->19 Thread-4: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我如今正在消費東西!,count:58--->20
檢查運轉後果,我們可以做ActiveMQ 服務端:http://127.0.0.1:8161/admin/ 外面的Queues 中檢查我們消費的音訊。

5、ActiveMQ的特性
關於JMS(Java 音訊服務) 的一些概述可以參考我的上一篇博客:http://www.cnblogs.com/jaycekon/p/6220200.html