Java音訊隊列-Spring整合ActiveMq。本站提示廣大學習愛好者:(Java音訊隊列-Spring整合ActiveMq)文章只能為提供參考,不一定能成為您想要的結果。以下是Java音訊隊列-Spring整合ActiveMq正文
首先和大家一同回憶一下Java 音訊服務,在我之前的博客《Java音訊隊列-JMS概述》中,我為大家剖析了:
然後在另一篇博客《Java音訊隊列-ActiveMq實戰》中,和大家一同從0到1的開啟了一個ActiveMq 的項目,在項目開發的進程中,我們對ActiveMq有了一定的理解:
在接上去的這篇博客中,我會和大家一同來整合Spring 和ActiveMq,這篇博文,我們基於Spring+JMS+ActiveMQ+Tomcat,完成了Point-To-Point的異步隊列音訊和PUB/SUB(發布/訂閱)模型,復雜實例,不包括任何業務。
2、目錄構造
IDE選擇了IDEA(建議大家運用),為了防止下載jar 的各種費事,底層運用maven搭建了一個項目,整合了Spring 和ActiveMq

2.2 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Crawl-Page</groupId>
<artifactId>Crawl-Page</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>Crawl-Page Maven Webapp</name>
<url>http://maven.apache.org</url>
<!-- 版本管理 -->
<properties>
<springframework>4.1.8.RELEASE</springframework>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<!-- JSP相關 -->
<dependency>
<groupId>jstl</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<scope>provided</scope>
<version>2.5</version>
</dependency>
<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${springframework}</version>
</dependency>
<!-- xbean 如<amq:connectionFactory /> -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<!-- activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.12.1</version>
</dependency>
<!-- 自用jar包,可以疏忽-->
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
<build>
<finalName>Crawl-Page</finalName>
<plugins>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<configuration>
<port>8080</port>
<path>/</path>
</configuration>
</plugin>
</plugins>
</build>
</project>
View Code
由於這裡pom.xml 文件有點長,就不展開了。
我們可以看到其實依賴也就幾個,1、Spring 中心依賴 2、ActiveMq core和pool(這裡假如同窗們選擇導入jar,可以直接導入我們上一篇博客中說道的那個activemq-all 這個jar包)3、java servlet 相關依賴
這外面我們選擇的ActiveMq pool 的依賴版本會和之後的dtd 有關系,需求版本對應,所以同窗們等下配置activemq 文件的時分,需求留意dtd 版本選擇
2.3 web.xml
web.xml 也迥然不同,指定Spring 配置文件,springMvc 命名,編碼格式
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<display-name>Archetype Created Web Application</display-name>
<!-- 加載spring的配置文件,例如hibernate、jms等集成 -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>
classpath:applicationContext*.xml;
</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<servlet>
<servlet-name>springMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-mvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>springMVC</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<!-- 處置編碼格式 -->
<filter>
<filter-name>characterEncodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>characterEncodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
</web-app>
2.4 SpringMvc 和applicationContext.xml
這外面的SpringMVC沒什麼特別,有需求的同窗可以參考一下:

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">
<!-- 啟用MVC注解 -->
<mvc:annotation-driven />
<!-- 指定Sping組件掃描的根本包途徑 -->
<context:component-scan base-package="com.Jayce" >
<!-- 這裡只掃描Controller,不可反復加載Service -->
<context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
<!-- JSP視圖解析器-->
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="prefix" value="/WEB-INF/views/" />
<property name="suffix" value=".jsp" />
<!-- 定義其解析視圖的order順序為1 -->
<property name="order" value="1" />
</bean>
</beans>
View Code
applicationContext.xml 次要運用來裝載Bean,我們項目中並沒有什麼特別的Java Bean,因而只用來指出包掃描途徑:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.14.3.xsd">
<bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
<!-- 配置掃描途徑 -->
<context:component-scan base-package="com.Jayce">
<!-- 只掃描Service,也可以添加Repostory,但是要把Controller掃除在外,Controller由spring-mvc.xml去加載 -->
<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
</beans>
View Code
2.5 applicationContext-ActiveMQ.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
>
<context:component-scan base-package="com.Jayce" />
<mvc:annotation-driven />
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://192.168.148.128:61616"
userName="admin"
password="admin" />
<!-- 配置JMS銜接工長 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 定義音訊隊列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 設置音訊隊列的名字 -->
<constructor-arg>
<value>Jaycekon</value>
</constructor-arg>
</bean>
<!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發送、接納音訊。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="demoQueueDestination" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默許是false,此處顯示寫出false -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- 配置音訊隊列監聽者(Queue) -->
<bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />
<!-- 顯示注入音訊監聽容器(Queue),配置銜接工廠,監聽的目的是demoQueueDestination,監聽器是下面定義的監聽器 -->
<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="demoQueueDestination" />
<property name="messageListener" ref="queueMessageListener" />
</bean>
</beans>
這裡和大家解說一下這個配置文件,假如大家可以從上述配置文件中看懂,可以跳過。同窗們也可以在ActiveMQ官網中的檢查。
1、ActiveMq 中的DTD,我們在聲明相關配置之前,我們需求先導入ActiveMq 中的DTD,不然Spring 並不了解我們的標簽是什麼意思。
http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd
我們在pom.xml 文件中有配置了activemq 的版本依賴我們這裡的版本,需求和依賴的版本一樣,不然是找不到相關的dtd
2、amq:connectionFactory:很直白的一個配置項,用於配置我們鏈接工廠的地址和用戶名密碼,這裡需求留意的是選擇tcp銜接而不是http銜接
3、jmsTemplate:比擬重要的一個配置,這裡指定了銜接工廠,默許音訊發送目的地,還有銜接時長,發布音訊的方式
3、項目構造
package com.Jayce.Service;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
/**
* Created by Administrator on 2017/1/5.
*/
@Service
public class ProducerService {
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;
public void sendMessage(Destination destination,final String msg){
System.out.println(Thread.currentThread().getName()+" 向隊列"+destination.toString()+"發送音訊---------------------->"+msg);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
public void sendMessage(final String msg){
String destination = jmsTemplate.getDefaultDestinationName();
System.out.println(Thread.currentThread().getName()+" 向隊列"+destination+"發送音訊---------------------->"+msg);
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
}
將音訊消費者做成一個服務,當我們需求發送音訊的時分,只需求調用ProducerService實例中的sendMessage 辦法就可以向默許目的發送一個音訊。
這裡提供了兩個發送方式,一個是發送到默許的目的地,一個是依據目的地發送音訊。
有興味的同窗可以和我上一篇文章《ActiveMq實戰》中ActiveMq 發送音訊的方式比照一下,可以發現一些不同。
3.2 ConsumerService
package com.Jayce.Service;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
* Created by Administrator on 2017/1/5.
*/
@Service
public class ConsumerService {
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;
public TextMessage receive(Destination destination){
TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
try{
System.out.println("從隊列" + destination.toString() + "收到了音訊:\t"
+ textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
return textMessage;
}
}
由於我們項目中並沒有什麼業務,所以的話抵消息的處置也就是打印輸入。我們只需求調用jmsTemplate中的 receive 辦法,就可以從外面獲取到一條音訊。
再和我們上一篇博客比照一下,上一篇博客中,我們承受到信息之後需求手動確認事務,這樣ActiveMQ中才會確定這條音訊曾經被正確讀取了。而整合了Spring之後,事務將由Spring 來管理。
3.3 MessageController
package com.Jayce.Controller;
import com.Jayce.Service.ConsumerService;
import com.Jayce.Service.ProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.TextMessage;
/**
* Created by Administrator on 2017/1/5.
*/
@Controller
public class MessageController {
private Logger logger = LoggerFactory.getLogger(MessageController.class);
@Resource(name = "demoQueueDestination")
private Destination destination;
//隊列音訊消費者
@Resource(name = "producerService")
private ProducerService producer;
//隊列音訊消費者
@Resource(name = "consumerService")
private ConsumerService consumer;
@RequestMapping(value = "/SendMessage", method = RequestMethod.POST)
@ResponseBody
public void send(String msg) {
logger.info(Thread.currentThread().getName()+"------------send to jms Start");
producer.sendMessage(msg);
logger.info(Thread.currentThread().getName()+"------------send to jms End");
}
@RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET)
@ResponseBody
public Object receive(){
logger.info(Thread.currentThread().getName()+"------------receive from jms Start");
TextMessage tm = consumer.receive(destination);
logger.info(Thread.currentThread().getName()+"------------receive from jms End");
return tm;
}
}
控制層外面需求注入我們的消費者和消費者(實踐開發中,消費者和消費者一定不會在同一個項目中的,不然就音訊服務這個東西就沒有意義了)。
如今服務層和控制層都好了,接上去我們就停止一個復雜的測試
4、項目測試
先確定你的ActiveMQ服務曾經開啟。

4.2 啟動項目
項目運用了Tomcat 插件,防止了本地再下載Tomcat的費事,有需求的同窗可以運用一下。
<plugins>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<configuration>
<port>8080</port>
<path>/</path>
</configuration>
</plugin>
</plugins>
4.3 發送音訊
這裡用了Chrome 的一個插件PostMan 有興味的同窗可以理解一下,在Chrome 拓展順序中可以找到,防止了後端的同窗去弄頁面!

我們發送了一個post 懇求之後,看一下服務器的效果:

我們可以看到,曾經向隊列發送了一條音訊。我們看一下ActiveMq如今的形態:

我們可以看到,一條音訊曾經成功發送到了ActiveMq中。
4.4 接納音訊
運用get懇求訪問服務器後台:

服務的輸入:

ActiveMq服務器形態:

我們可以看到,消費者曾經消費了一條信息,並且沒有斷開與ActiveMq之間的鏈接。
4.5 監聽器
在實踐項目中,我們很少會自己手動去獲取音訊,假如需求手動去獲取音訊,那就沒有必要運用到ActiveMq了,可以用一個Redis 就足夠了。
不能手動去獲取音訊,那麼我們就可以選擇運用一個監聽器來監聽能否有音訊抵達,這樣子可以很快的完成抵消息的處置。
4.5.1 applicationContext-ActiveMQ.xml 配置在下面的配置文件中,我們曾經默許的添加了這段監聽器的配置文件,假如同窗們不想運用這個監聽器,可以直接正文掉。
<!-- 配置音訊隊列監聽者(Queue) -->
<bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />
<!-- 顯示注入音訊監聽容器(Queue),配置銜接工廠,監聽的目的是demoQueueDestination,監聽器是下面定義的監聽器 -->
<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="demoQueueDestination" />
<property name="messageListener" ref="queueMessageListener" />
</bean>
4.5.2 MessageListener
我們需求創立一個類完成MessageListener 接口:
package com.Jayce.Filter;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* Created by Administrator on 2017/1/5.
*/
public class QueueMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("QueueMessageListener監聽到了文本音訊:\t"
+ tm.getText());
//do something ...
} catch (JMSException e) {
e.printStackTrace();
}
}
}
完成接口的onMessage 辦法,我們將需求的業務操作在外面處理,這樣子,就完成了我們消費者-兩頭件-消費者,這樣一個解耦的操作了。
4.5.3 測試
和下面一樣,運用postMan 發送post懇求,我們可以看到控制台外面,音訊馬上就能打印出來:
再看看ActiveMQ服務器的形態:

我們可以看到,運用監聽器的效果,和手動接納音訊的效果是一樣的。
這樣子一整個項目上去,我們曾經成功的整合了Spring和ActiveMQ。
4.6 壓力測試
這裡其實也算不上什麼壓力測試,在配置pom.xml文件的時分,大家有看到一個 commons-httpclient 的依賴,接上去我們運用httpClient 不停的想服務器發送音訊,看一下服務器處理音訊的速度如何:
package com.Jaycekon.test;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by Administrator on 2017/1/5.
*/
public class Client {
@Test
public void test() {
HttpClient httpClient = new HttpClient();
new Thread(new Sender(httpClient)).start();
}
}
class Sender implements Runnable {
public static AtomicInteger count = new AtomicInteger(0);
HttpClient httpClient;
public Sender(HttpClient client) {
httpClient = client;
}
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"---Send message-"+count.getAndIncrement());
PostMethod post = new PostMethod("http://127.0.0.1:8080/SendMessage");
post.addParameter("msg", "Hello world!");
httpClient.executeMethod(post);
System.out.println(Thread.currentThread().getName()+"---Send message Success-"+count.getAndIncrement());
} catch (IOException e) {
e.printStackTrace();
}
}
}
這外面用了HttpClient 來向服務器發送Post 懇求,然後計數輸入,有興味的同窗可以自己測試一下,可以多開幾個線程,這裡只開了一個線程。
5、項目源碼
github:https://github.com/jaycekon/Crawl-Page
有興味的同窗可以下載上去,該項目後續會持續整合Redis以及其他一些架構相關的技術