从不知道到了解—RabbitMQ 基础概念及 Spring 的配置和使用 – 热爱改变生活
我的GitHub GitHub |     登录
  • If you can't fly, then run; if you can't run, then walk; if you can't walk, then crawl
  • but whatever you do, you have to keep moving forward。
  • “你骗得了我有什么用,这是你自己的人生”
  • 曾有伤心之地,入梦如听 此歌

从不知道到了解—RabbitMQ 基础概念及 Spring 的配置和使用

后端开发 sinvader 4723℃ 0评论

序言

你在系统中是否写过这样的接口:客户端访问服务器,服务器进行了大量逻辑/耗时操作之后,才能将结果返回给客户端,而这时,客户端的连接或许已经因为超时而关闭了。
为了能够及时的给客户端返回数据, 在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

RabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。RabbitMQ 用 Erlang 语言编写,支持多种消息协议,消息队列,传送确认,可以将消息灵活的路由到队列,并支持多种交换类型。同时也可部署为负载均衡的集群,实现高可用性和吞吐量,联跨多个可用性区域和地区。RabbitMQ 支持 Java, .NET, PHP, Python, JavaScript, Ruby, Go 等多种语言,方便开发者使用。

基础介绍

在使用 RabbitMQ 之前,需要了解一些 RabbitMQ 的基础知识,这些基础知识最好还是要了解的,否则在面对 Spring 中配置文件的时候会有很多疑惑。

Queue

MQ 就是 Message Queuing,Queue 就是队列了。在 RabbitMQ 中,Queue 是最主要的地方,Message Queuing 就是存储 Message 的队列。
生产者 Producer(以下简写为 p)将消息投递到 Message Queuing(以下简写为 q)中,消费者(以下简写为 c)从 queue 中获取消息进行处理。
rabbitMq_queue
如上图中所画(图中缺少 exchange,下面会说到)的那样,多个 p 将消息交给了 q,然后就不关心系统怎么处理这些消息了。而 q 在收到这些消息后,会分发给在它这里注册了的 c,让他们去分别处理。每一条消息只会分发给一个 c,而不是每一个 c 接受到所有的消息然后全部去处理。

Producer

Producer 负责产生消息并将消息发送给 Queue,它不关心消息怎么处理,只把消息按照指定的规则(下面会说)发送出去。

Consumer

系统在启动的时候,会将所有的消费者 Consumer 注册到 RabbitMQ 中并对指定的 Queue(有规则,下面会说)进行监听,如果 Queue 中产生了数据,queue 会将消息平摊发送给所有注册到当前 quene 中的 consumer 中,由 consumer 进行处理。

Exchange

Exchange 在 RabbitMQ 中是一个很重要的组件。
上面我们说 Producer 负责产生消息并将消息发送给 Queue,实际上 Producer 并不直接连接 Queue,而是连接了 Exchange,由 Exchange 将消息路由到指定的 Queue 中。也就是在 producer 和 queue 中间插入了一个 Exchange 来分发消息给 queue。
ExchangeType、binding 和 ExchangeName 三者确定了一个 Exchange,ExchangeName 确定了这个 Exchange 的名字,ExchangeType 决定了这是一个什么类型的 Exchange,binding 决定了它以什么规则连接一个 queue。

  • ExchangeName 标记了这个 Exchange,在 producer 发送消息的时候需要指定,消息会发送到指定 ExchangeName 的 Exchange 中。比如说你去参加婚礼宴会,主人之前告诉你到了酒店去大厅找服务员 A,她会带你到包厢。这里的“服务员 A”这个名字就是 ExchangeName
  • binding(其实是一个字符串或者说一种特别的正则)会将 Exchange 和 Queue 联系起来,而 Exchange 路由到 Queue 的规则就是这个 binding/binding-key。比如说你去参加一个婚宴,请帖写着 302 包厢,那服务员就会领着你到 302 包厢。在这个例子中,服务员就是 Exchange,包厢是 queue,而服务员脑中这个“根据你的请柬领你到哪个包厢”的规则就是 binding。而你的请柬是 routing-key,这个后面说
  • ExchangeType 是 Exchange 的类型,这决定了这个 Exchange 会执行哪种规则 (binding) 将消息路由到不同的 queue。ExchangeType 分为多种,其中最为常用的就是 direct,fanout 和 topic。还是比如你去参加婚宴,婚宴上可能有很多服务员。
    • 服务员 A 是个耿直 boy,你请柬上写着哪个包厢他就领你去哪个包厢,也就是你的请柬上的“302 包厢”和他脑中“302 包厢(302 包厢在 3 楼第二个包厢)”是相等的,那他就领着你去了,这个服务员就叫“direct”。
    • 而服务员 B 他负责领路到 4 楼的 401、402 包厢,另外他是个呆子但是会一点魔法,不管是谁找到他,他都会念一句魔法把你复制两份(有几个包厢就复制几份),然后把你分别领到他负责的包厢去,401、402 包厢每一个都有一个你,这个服务员是“fanout”,你在这个例子中就是消息(意即如果消息发送给 type 是 fanout 的 exchange,exchange 会把消息发送给所有和他绑定的 queue 中)。
    • 最后有一个服务员 C 特别聪明,老板交给他的任务是如果找你的是男人,领到 501,如果是女人,领到 502,如果是小孩子,领到 503,即按照不同的规则(这里的规则更详细的解释可以解释成:*. 男人. 人路由到 501,*. 女人. 人路由到 502,*. 小孩. 人路由到 503),这个服务员就叫“topic”,topic 的详细规则是这样的:
      routing key 为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
      binding key 与 routing key 一样也是句点号“. ”分隔的字符串
      binding key 中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
      rabbitMQ_exchange_topic
      以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到 Q1 与 Q2,
      routingKey=”lazy.orange.fox”的消息会路由到 Q1 与 Q2,
      routingKey=”lazy.brown.fox”的消息会路由到 Q2,
      routingKey=”lazy.pink.rabbit”的消息会路由到 Q2(只会投递给 Q2 一次,虽然这个 routingKey 与 Q2 的两个 bindingKey 都匹配);
      routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何 bindingKey。
      
      参考自:RabbitMQ 基础概念详细介绍 
      

介绍 Producer 和 Consumer 的规则

上面在介绍 Producer 的时候都把规则给略过了,现在介绍完了 Exchange,可以再回过头来介绍 Producer 的规则。

当时说 Producer 会将消息发送给 queue,现在我们知道 Producer 是将消息发送给了 Exchange。Producer 在发送消息的时候需要指定 Exchange,也就是需要声明 ExchangeName,同时根据指定的这个 Exchange 的类型,来设置一个 routing-key。我们知道 Exchange 和 queue 是通过一个叫 binding 的 key 来完成的,在 Exchange 中会记录所有它连接的 queue 所对应的 binding(暂且叫 bs 吧, 也就是多个 binding 的集合),消息通过设置的 ExchangeName 发送到指定的这个 Exchange,同时携带过去的 routing-key 会与 bs 中的 binding 进行匹配,匹配条件满足就会将消息发送过去(匹配到几个就发送几个)。

Consumer 连接 queue 的规则相对来说就简单很多了,指定 queue 的名字,设置几个连接参数就好了(比如说:exclusive、Auto-delete、Durable)。

exclusive:排他,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
Auto-delete: 自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
Durable: 持久化

另外对于 Producer、Exchange、Queue,和 Consumer 都还有一些可设置的参数用来个性化配置,这个自己去搜索一下吧。

一张图来解释整个过程

rabbitMq

图中 Producer 发送消息给 message,routing-key 未指定是指没有固定一个 routing-key,这个 key 是动态可变的。

而根据 message 这个 Exchange 的 type 可知,这个 message 是那么聪明的服务员,会根据规则将发送来的消息“聪明”的交给相应的 Queue 处理。

假如这个 Producer 发送的 Message 所指定的 routing-key 是 order.log.phbj,那么根据 Exchange 与 Queue 间对应的橙色线,这条消息会被发送给 queue2, 并由 Consumer 接收处理。

如果这个 Producer 发送的 Message 所指定的 routing-key 是 weixin.order.phbj,那么这个消息会被发送 queue,并由对应注册的 Consumer2 接收(注意这里的 Consumer 可能不止一个,但是只会有一个 Consumer 来接收)

过程大概就是这样了,接下来配合 Spring 来使用。

Spring 中使用 RabbitMQ

pom 文件放在文章末尾,这里先来说 RabbitMQ 的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    <!--配置 connection-factory,指定连接 rabbit server 参数 -->
    <rabbit:connection-factory id="connectionFactory" username="admin" password="123456"
                               host="192.168.1.198"
                               port="5672"
                               virtual-host="/"/>
    <bean id="rabbitConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="host" value="192.168.1.198"/>
        <property name="username" value="admin"/>
        <property name="password" value="123456"/>
        <property name="port" value="5672"/>
        <property name="virtualHost" value="/"/>
        <property name="channelCacheSize" value="5"/>
        <property name="publisherConfirms" value="true"/>
    </bean>
    <!--
        这里是两个 queue 的定义
        exclusive:排他,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
        Auto-delete: 自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
        Durable: 持久化
    -->
    <rabbit:queue id="queue" name="queue" durable="true" auto-delete="false" exclusive="false"/>
    <rabbit:queue id="queue2" name="queue2" durable="true" auto-delete="false" exclusive="false"/>

    <!--RabbitMQ 的 Consumer-->
    <bean id="messageReceiver" class="net.sumile.consumer.MessageConsumer"></bean>
    <bean id="messageReceiver2" class="net.sumile.consumer.MessageConsumerQueue2"></bean>

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency="1"
                               prefetch="5">
        <rabbit:listener queues="queue" ref="messageReceiver"/>
        <rabbit:listener queues="queue2" ref="messageReceiver2"/>
    </rabbit:listener-container>


    <!--通过指定下面的 admin 信息,当前 producer 中的 exchange 和 queue 会在 rabbitmq 服务器上自动生成 -->
    <rabbit:admin connection-factory="connectionFactory"/>


    <!-- 两种 Exchange 的定义 -->
    <rabbit:direct-exchange name="order" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queue" key="order"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:direct-exchange name="exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queue2" key="exchange"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:topic-exchange name="message" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queue2" pattern="*.log.phbj"/>
            <rabbit:binding queue="queue" pattern="*.order.phbj"/>
            <rabbit:binding queue="queue" pattern="*.pay.phbj"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--定义是否使用序列化传输-->
    <!--<bean id="jsonMessageConverter"-->
    <!--class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"></bean>-->
    <!--message-converter="jsonMessageConverter"-->

    <!--定义 rabbit template 用于数据的发送 -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     exchange="exchange" routing-key="exchange"/>
    <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory"
                     exchange="order" routing-key="order"/>
    <bean id="amqpTemplate3"
          class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <property name="connectionFactory" ref="rabbitConnectionFactory"/>
        <property name="confirmCallback" ref="confirmCallBackListener"/>
        <property name="returnCallback" ref="returnCallBackListener"/>
        <property name="mandatory" value="true"/>
        <!--只有在关闭事务的情况下 mandatory 才起作用-->
        <property name="channelTransacted" value="false"/>
        <property name="exchange" value="message"/>
    </bean>
    <!--发送确认监听-->
    <bean id="confirmCallBackListener" class="net.sumile.producer.ConfirmCallBackListener"/>
    <bean id="returnCallBackListener" class="net.sumile.producer.ReturnCallBackListener"/>
</beans>

在程序启动之前,首先来确认下 RabbitMQ 的状态

RabbitMQInit

图中可以看出,现在没有连接链接到 RabbitMQ,Exchange 和 Queue 也都是空的。

然后启动程序

RabbitMQStart

这是启动之后立刻就截的图,可以看到,有一个连接链接上了 RabbitMQ,同时创建了三个 Exchange 以及两个 queue。

然后打开每一个 Exchange 来看看它的对应规则是什么,着重看红色框中的

RabbitMQExchangeBinding

可以看到,红色框中的设置正是在 xml 中配置的那部分:

<rabbit:direct-exchange name="order" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="queue" key="order"></rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="exchange" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="queue2" key="exchange"></rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:topic-exchange name="message" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="queue2" pattern="*.log.phbj"/>
        <rabbit:binding queue="queue" pattern="*.order.phbj"/>
        <rabbit:binding queue="queue" pattern="*.pay.phbj"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

其中配置文件中的“<rabbit:” 后面的 topic 和 direct 指该 Exchange 的类型。而绑定的 queue 则在 bindings 中,规则是通过 binding 配置的 key 或者 pattern。这些在图中都有显示。

RabbitMQQueue

图中 Bindings 与上面的 Exchange 的配置有对应,而 Consumers 则对应了 xml 中的以下配置:

<!--RabbitMQ 的 Consumer-->
<bean id="messageReceiver" class="net.sumile.consumer.MessageConsumer"></bean>
<bean id="messageReceiver2" class="net.sumile.consumer.MessageConsumerQueue2"></bean>

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency="1"
                           prefetch="5">
    <rabbit:listener queues="queue" ref="messageReceiver"/>
    <rabbit:listener queues="queue2" ref="messageReceiver2"/>
</rabbit:listener-container>

Ack required 通过配置的 acknowledge 来配置,表示需要消息确认

Prefetch count 通过配置中的 prefetch 标签配置,来限制 Queue 每次发送给每个消费者的消息数。

如果有多个消费者同时订阅同一个 Queue 中的消息,Queue 中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置 prefetchCount 来限制 Queue 每次发送给每个消费者的消息数,比如我们设置 prefetchCount=1,则 Queue 每次给每个消费者发送一条消息;消费者处理完这条消息后 Queue 会再给该消费者发送一条消息。
参考:RabbitMQ 基础概念详细介绍

发送者的相关配置不会在上面的图中体现,他是通过代码来调用运行的。

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                 exchange="exchange" routing-key="exchange"/>
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory"
                 exchange="order" routing-key="order"/>
<bean id="amqpTemplate3"
      class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="confirmCallback" ref="confirmCallBackListener"/>
    <property name="returnCallback" ref="returnCallBackListener"/>
    <property name="mandatory" value="true"/>
    <!--只有在关闭事务的情况下 mandatory 才起作用-->
    <property name="channelTransacted" value="false"/>
    <property name="exchange" value="message"/>
</bean>
<!--发送确认监听-->
<bean id="confirmCallBackListener" class="net.sumile.producer.ConfirmCallBackListener"/>
<bean id="returnCallBackListener" class="net.sumile.producer.ReturnCallBackListener"/>

我们在配置文件中配置了三个发送消息的模板,其中 amqpTemplate 和 amqpTemplate2 是两个普通的发送模板,它定义了 exchange 和 routing-key。

而 amqpTemplate3 比较特殊,它定义了 mandatory,这个用来标识这个模板发送的消息是否需要回执(当 mandatory 标志位设置为 true 时,如果 exchange 根据自身类型和消息 routeKey 无法找到一个符合条件的 queue,那么会调用 basic.return 方法将消息返还给生产者;当 mandatory 设为 false 时,出现上述情形 broker 会直接将消息扔掉。

这就是为什么需要使用 bean 标签而不使用<rabbit:template 标签的原因。同时我们看到 amqpTemplate 和 amqpTemplate2 使用的 connection-factory 是 connectionFactory,而 amqpTemplate3 使用的是 rabbitConnectionFactory,这是因为如果需要回执的话,需要在 Connection-Factory 中指定一个参数 publisherConfirms,从下面两个 connection-factory 的配置就可以看出:

<rabbit:connection-factory id="connectionFactory" username="admin" password="123456"
                           host="192.168.1.198"
                           port="5672"
                           virtual-host="/"/>
<bean id="rabbitConnectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="host" value="192.168.1.198"/>
    <property name="username" value="admin"/>
    <property name="password" value="123456"/>
    <property name="port" value="5672"/>
    <property name="virtualHost" value="/"/>
    <property name="channelCacheSize" value="5"/>
    <property name="publisherConfirms" value="true"/>
</bean>

至此,配置文件与 RabbitMQ 的关系算是稍微的讲完了,解下来,我们实测一下。

实测运行

先放 Controler 文件以及其他一些文件
Controler

package net.sumile.controler;

import net.sumile.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;


/**
 * Created by Administrator on 2017/6/14 0014.
 */
@Controller
public class ControlerM {
    @Autowired
    MessageProducer messageProducer;

    @RequestMapping(value = "/home")
    public void send(@RequestParam(value = "type", defaultValue = "1") String type,
                     @RequestParam(value = "routing_key", defaultValue = "default.order.phbj") String routing_key,
                     @RequestParam(value = "message", defaultValue = "defaultMessage") String message) {
        if ("1".equals(type)) {
            messageProducer.sendMessage(message);
        } else if ("2".equals(type)) {
            messageProducer.sendMessage2(message);
        } else if ("3".equals(type)) {
            messageProducer.sendMessage3(routing_key, message);
        }
    }
}

以及 MessageProducer

package net.sumile.producer;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * 功能概要:消息产生, 提交到队列中去
 */
@Service
public class MessageProducer {

    private Logger logger = LoggerFactory.getLogger(MessageProducer.class);

    @Resource
    private RabbitTemplate amqpTemplate;
    @Resource
    private RabbitTemplate amqpTemplate2;
    @Resource
    private RabbitTemplate amqpTemplate3;

    public void sendMessage(Object message) {
        System.out.println("1 发送:" + message);
        amqpTemplate.convertAndSend(message);
    }

    public void sendMessage2(Object message) {
        System.out.println("2 发送:" + message);
        amqpTemplate2.convertAndSend(message);
    }

    public void sendMessage3(final String routingKey, final Object message) {
        System.out.println("3 发送:[" + message + "] routing-key = [" + routingKey + "]");
        amqpTemplate3.convertAndSend(routingKey, message);
//        for (int i = 1; i < 3000000; i++) {
//            System.out.println("3 发送:[" + message + "] routing-key = [" + routingKey + "]" + i);
//            amqpTemplate3.convertAndSend(routingKey, message);
//        }
    }
}

以及两个接收者中的一个,另外一个类似,只是输出的文字由 Queue 替换为了 Queue2 用来区分接收的是哪个 queue 中的 message

package net.sumile.consumer;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import static java.lang.Thread.sleep;

/**
 * 功能概要:消费接收
 *
 * @author sumile
 * @since 2017 年 6 月 23 日 15:38:07
 */
public class MessageConsumer implements ChannelAwareMessageListener {

    private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("接收到 Queue:" + new String(message.getBody()));
        try {
            sleep(3000);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (InterruptedException e) {

        }
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

最重要的就是这几个文件,来捋一遍过程:
首先通过浏览器访问一个我们的服务器地址,进入 controler,根据传过来的 type 去调用不同的模板发送不同的消息,然后由对应的 Consumer 接收并打印出来。

  • 请求地址:http://localhost:8080/home?type=1&routing_key=myO.pay.phbj&message=k
    打印:

    1 发送:k
    接收到 Queue2:k
    

    因为 template 对应的 Exchange 是 exchange,而 exchange 对应的 queue 是 queue2,所以打印出来的接受者是 queue2

  • 请求地址:http://localhost:8080/home?type=2&routing_key=myO.pay.phbj&message=65
    打印:

    2 发送:65
    接收到 Queue:65
    

    原理同上

  • 请求地址:http://localhost:8080/home?type=3&routing_key=myO.pay.phbj&message=65
    打印:

    2 发送:65
    接收到 Queue:65
    

    原理同上

  • 请求地址:http://localhost:8080/home?type=3&routing_key=myO.pay.phbj&message=65
    打印:

    3 发送:[65] routing-key = [myO.pay.phbj]
    接收到 Queue:65
    

    这里 routing_key 匹配到了*.pay.phbj,所以发送到 queue 中并由 queue 的 Consumer 接收

  • 请求地址:http://localhost:8080/home?type=3&routing_key=myO.log.phbj&message=65
    打印:

    3 发送:[65] routing-key = [myO.log.phbj]
    接收到 Queue2:65
    

    这里 routing_key 匹配到了*.log.phbj,所以发送到 queue2 中并由 queue2 的 Consumer 接收

confirmCallback 和 returnCallback

接下来我们来看一组请求:
请求地址:http://localhost:8080/home?type=3&touting_key=myO.l2og.phbj&message=65
看这组请求,我们知道是调用 amqpTemplate3 来发送的,但是并没有 binding-key 与之对应,所以这个 Message 发送到 Exchange 之后 Exchange 不知道该交给哪个 Queue。但是由于我们设置了

<property name="returnCallback" ref="returnCallBackListener"/>

所以如果 Exchange 没有找到匹配的 queue 的时候,就会进入到这个类的方法中,由我们来处理这条迷路的 Message,不至于将这条消息丢失了。

那这个配置是做什么的呢?

<property name="confirmCallback" ref="confirmCallBackListener"/>

简单来说,如果发送出去的消息找不到 Exchange,到 confirmCallback 中,如果找到了 Exchange 找不到 Queue,到 returnCallback 中。其余的可以参考这个 RabbitMQ(四) 消息确认 (发送确认, 接收确认),如果要测试的话我们可以不停的发送消息,然后手动的将要路由到的 queue 删掉,就会出现这种情况了。

Consumer 的回复

我们在 Consumer 的配置中设置了 acknowledge 参数,表示需要手动回复“已经接收到该消息”然后 queue 才会删除该 Message。
下面来实测下如果不回复会怎么样

我们将 MessageConsumer 中的回复代码注释掉,并请求地址:http://localhost:8080/home?type=2&routing_key=myO.pay.phbj&message=65。看看 RabbitMQ 中 queue 里面 Message 是怎么变化的

public class MessageConsumer implements ChannelAwareMessageListener {

    private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("接收到 Queue:" + new String(message.getBody()));
//        try {
//            sleep(3000);
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//        } catch (InterruptedException e) {
//
//        }
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

我们请求三遍上面的那个地址,然后刦看控制台,会看到回复:

2 发送:65
接收到 Queue:65
2 发送:65
接收到 Queue:65
2 发送:65
接收到 Queue:65

接收到了。然后再去看 RabbitMQ 的网页控制端:http://192.168.1.198:15672

RabbitMQ_consumer_noAck

queue 中累计了三条消息,而这三条消息已经是处理过的,如果有消息不停的进入,结果就是堆满内存

这是最需要注意的一点

 

都是自己在实际了解学习过程中遇到的一些问题以及感悟,看了很多博客,感谢各位大牛。
有错误请指出,望不吝赐教。

RabbitMQ-Demo
¥ 有帮助么?打赏一下~

转载请注明:热爱改变生活.cn » 从不知道到了解—RabbitMQ 基础概念及 Spring 的配置和使用


本博客只要没有注明“转”,那么均为原创。 转载请注明链接:sumile.cn » 从不知道到了解—RabbitMQ 基础概念及 Spring 的配置和使用

喜欢 (3)
发表我的评论
取消评论
表情

如需邮件形式接收回复,请注册登录

Hi,你需要填写昵称和邮箱~

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址