程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> rabbitMQ第五篇:Spring集成RabbitMQ,rabbitmq第五篇

rabbitMQ第五篇:Spring集成RabbitMQ,rabbitmq第五篇

編輯:JAVA綜合教程

rabbitMQ第五篇:Spring集成RabbitMQ,rabbitmq第五篇


   前面幾篇講解了如何使用rabbitMq,這一篇主要講解spring集成rabbitmq。

   首先引入配置文件org.springframework.amqp,如下

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.6.0.RELEASE</version>
        </dependency>

一:配置消費者和生成者公共部分

<rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}"
                               port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" virtual-host="${rabbit.virtualHost}"
                               channel-cache-size="50"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--定義消息隊列-->
    <rabbit:queue name="spittle.alert.queue.1" durable="true" auto-delete="false"/>
    <rabbit:queue name="spittle.alert.queue.2" durable="true" auto-delete="false"/>
    <rabbit:queue name="spittle.alert.queue.3" durable="true" auto-delete="false"/>
    <!--綁定隊列-->
    <rabbit:fanout-exchange id="spittle.fanout" name="spittle.fanout" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="spittle.alert.queue.1"></rabbit:binding>
            <rabbit:binding queue="spittle.alert.queue.2"></rabbit:binding>
            <rabbit:binding queue="spittle.alert.queue.3"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

二:配置生成者

<import resource="amqp-share.xml"/>
    <!--創建消息隊列模板-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
                     exchange="spittle.fanout" message-converter="jsonMessageConverter">
    </rabbit:template>
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>

三:生產者程序

public class Spittle implements Serializable {
    private Long id;
    private Spitter spitter;
    private String message;
    private Date postedTime;

    public Spittle(Long id, Spitter spitter, String message, Date postedTime) {
        this.id = id;
        this.spitter = spitter;
        this.message = message;
        this.postedTime = postedTime;
    }

    public Long getId() {
        return this.id;
    }

    public String getMessage() {
        return this.message;
    }

    public Date getPostedTime() {
        return this.postedTime;
    }

    public Spitter getSpitter() {
        return this.spitter;
    }
}
public class ProducerMain {
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-producer.xml");
        AmqpTemplate template = (AmqpTemplate) context.getBean("rabbitTemplate");
        for (int i = 0; i < 20; i++) {
            System.out.println("Sending message #" + i);
            Spittle spittle = new Spittle((long) i, null, "Hello world (" + i + ")", new Date());
            template.convertAndSend(spittle);
            Thread.sleep(5000);
        }
        System.out.println("Done!");
    }
}

其中convertAndSend方法默認第一個參數是交換機名稱,第二個參數是路由名稱,第三個才是我們發送的數據,現在我們啟動程序,效果如下

第四個:消費者程序

首先編寫一個用於監聽生產者發送信息的代碼

/**
 * Created by Administrator on 2016/11/18.
 */
public class SpittleAlertHandler implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            String body=new String(message.getBody(),"UTF-8");
            System.out.println(body);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

一定要注意實現MessageListener,我們只需要獲取message的body即可,通過json來轉換我們需要的程序(比如我們可以發送一個map,map存放方法和實體,這樣我們可以通過反射來調用不同的程序來運行)。

下面我們配置消費者

 

<import resource="amqp-share.xml"/>
    <rabbit:listener-container connection-factory="connectionFactory">
      <rabbit:listener ref="spittleListener" method="onMessage" queues="spittle.alert.queue.1,spittle.alert.queue.3,spittle.alert.queue.2"/>
    </rabbit:listener-container>
    <bean id="spittleListener" class="com.lp.summary.rabbitmq.impl.SpittleAlertHandler"/>

其中spittleListener是監聽的程序,method是執行的方法,queues是我們監聽的隊列,多個隊列可以逗號隔開(因為我們采用的是分發,所以三個隊列獲取的消息是相同的,這裡為了簡便我放在一個監聽程序中了,其實我們可以寫三個消費者,每個消費者監聽一個隊列)

現在只需要啟動程序即可運行

public class ConsumerMain {
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-consumer.xml");
    }
}

 當然direct跟上面的情況差不多,只不過這個是根據路由匹配,先把數據發送到交換機,然後綁定路由和隊列,通過交換機id和路由來找到隊列,下面是一些主要的配置

 <rabbit:queue id="spring-test-queue1" durable="true" auto-delete="false" exclusive="false" name="spring-test-queue1"></rabbit:queue>
    <rabbit:queue name="spring-test-queue2" durable="true" auto-delete="false" exclusive="false"></rabbit:queue>
    <!--交換機定義-->
    <!--rabbit:direct-exchange:定義exchange模式為direct,
        意思就是消息與一個特定的路由鍵完全匹配,才會轉發。
        rabbit:binding:設置消息queue匹配的key-->
    <rabbit:direct-exchange name="${rabbit.exchange.direct}" durable="true" auto-delete="false" id="${rabbit.exchange.direct}">
        <rabbit:bindings>
            <rabbit:binding queue="spring-test-queue1" key="spring.test.queueKey1"/>
            <rabbit:binding queue="spring-test-queue2" key="spring.test.queueKey2"/>
      </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--spring template聲明-->
    <rabbit:template exchange="${rabbit.exchange.direct}" id="rabbitTemplate" connection-factory="connectionFactory"
    message-converter="jsonMessageConverter"></rabbit:template>
    <!--消息對象轉成成json-->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>

下面是消費者監聽配置

 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="spring-test-queue1" method="onMessage" ref="queueListenter"></rabbit:listener>
    </rabbit:listener-container>
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="spring-test-queue2" method="onMessage" ref="queueListenter"></rabbit:listener>
    </rabbit:listener-container>

下面是程序

 public static void main(String[] args) {
        ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-rabbitmq-producer.xml");
        MQProducer mqProducer=(MQProducer) context.getBean("mqProducer");
        mqProducer.sendDateToQueue("spring.test.queueKey1","Hello World spring.test.queueKey1");
        mqProducer.sendDateToQueue("spring.test.queueKey2","Hello World spring.test.queueKey2");
    }

實際情況可能需要我們去分離消費者和生成者的程序。當然spring還有負載均衡的配置,這裡就不多介紹了。

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved