从不知道到了解—RabbitMQ基础概念及Spring的配置和使用

rabbitMq的学习

序言

你在系统中是否写过这样的接口:客户端访问服务器,服务器进行了大量逻辑/耗时操作之后,才能将结果返回给客户端,而这时,客户端的连接或许已经因为超时而关闭了。 为了能够及时的给客户端返回数据,在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。 RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。RabbitMQ用Erlang语言编写,支持多种消息协议,消息队列,传送确认,可以将消息灵活的路由到队列,并支持多种交换类型。同时也可部署为负载均衡的集群,实现高可用性和吞吐量,联跨多个可用性区域和地区。RabbitMQ支持Java, .NET, PHP, Python, JavaScript, Ruby, Go等多种语言,方便开发者使用。

基础介绍

在使用RabbitMQ之前,需要了解一些RabbitMQ的基础知识,这些基础知识最好还是要了解的,否则在面对Spring中配置文件的时候会有很多疑惑。

Queue

MQ就是Message Queuing,Queue就是队列了。在RabbitMQ中,Queue是最主要的地方,Message Queuing就是存储Message的队列。 生产者Producer(以下简写为p)将消息投递到Message Queuing(以下简写为q)中,消费者(以下简写为c)从queue中获取消息进行处理。 rabbitMq_queue 如上图中所画(图中缺少exchange,下面会说到)的那样,多个p将消息交给了q,然后就不关心系统怎么处理这些消息了。而q在收到这些消息后,会分发给在它这里注册了的c,让他们去分别处理。每一条消息只会分发给一个c,而不是每一个c接受到所有的消息然后全部去处理。

Producer

Producer负责产生消息并将消息发送给Queue,它不关心消息怎么处理,只把消息按照指定的规则(下面会说)发送出去。

Consumer

系统在启动的时候,会将所有的消费者Consumer注册到RabbitMQ中并对指定的Queue(有规则,下面会说)进行监听,如果Queue中产生了数据,queue会将消息平摊发送给所有注册到当前quene中的consumer中,由consumer进行处理。

Exchange

Exchange在RabbitMQ中是一个很重要的组件。 上面我们说Producer负责产生消息并将消息发送给Queue,实际上Producer并不直接连接Queue,而是连接了Exchange,由Exchange将消息路由到指定的Queue中。也就是在producer和queue中间插入了一个Exchange来分发消息给queue。 ExchangeType、binding和ExchangeName三者确定了一个Exchange,ExchangeName确定了这个Exchange的名字,ExchangeType决定了这是一个什么类型的Exchange,binding决定了它以什么规则连接一个queue。

  • ExchangeName标记了这个Exchange,在producer发送消息的时候需要指定,消息会发送到指定ExchangeName的Exchange中。比如说你去参加婚礼宴会,主人之前告诉你到了酒店去大厅找服务员A,她会带你到包厢。这里的“服务员A”这个名字就是ExchangeName
  • binding(其实是一个字符串或者说一种特别的正则)会将Exchange和Queue联系起来,而Exchange路由到Queue的规则就是这个binding/binding-key。比如说你去参加一个婚宴,请帖写着302包厢,那服务员就会领着你到302包厢。在这个例子中,服务员就是Exchange,包厢是queue,而服务员脑中这个“根据你的请柬领你到哪个包厢”的规则就是binding。而你的请柬是routing-key,这个后面说
  • ExchangeType是Exchange的类型,这决定了这个Exchange会执行哪种规则(binding)将消息路由到不同的queue。ExchangeType分为多种,其中最为常用的就是direct,fanout和topic。还是比如你去参加婚宴,婚宴上可能有很多服务员。
    • 服务员A是个耿直boy,你请柬上写着哪个包厢他就领你去哪个包厢,也就是你的请柬上的“302包厢”和他脑中“302包厢(302包厢在3楼第二个包厢)”是相等的,那他就领着你去了,这个服务员就叫“direct”。

    • 而服务员B他负责领路到4楼的401、402包厢,另外他是个呆子但是会一点魔法,不管是谁找到他,他都会念一句魔法把你复制两份(有几个包厢就复制几份),然后把你分别领到他负责的包厢去,401、402包厢每一个都有一个你,这个服务员是“fanout”,你在这个例子中就是消息(意即如果消息发送给type是fanout的exchange,exchange会把消息发送给所有和他绑定的queue中)。

    • 最后有一个服务员C特别聪明,老板交给他的任务是如果找你的是男人,领到501,如果是女人,领到502,如果是小孩子,领到503,即按照不同的规则(这里的规则更详细的解释可以解释成:*.男人.人路由到501,*.女人.人路由到502,*.小孩.人路由到503),这个服务员就叫“topic”,topic的详细规则是这样的:

      1
      2
      3
      routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
      binding key与routing key一样也是句点号“. ”分隔的字符串
      binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

      rabbitMQ_exchange_topic

      1
      2
      3
      4
      5
      以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,
      routingKey=”lazy.orange.fox”的消息会路由到Q1与Q2,
      routingKey=”lazy.brown.fox”的消息会路由到Q2,
      routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);
      routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。

      参考自:RabbitMQ基础概念详细介绍

介绍Producer和Consumer的规则

上面在介绍Producer的时候都把规则给略过了,现在介绍完了Exchange,可以再回过头来介绍Producer的规则。 当时说Producer会将消息发送给queue,现在我们知道Producer是将消息发送给了Exchange。Producer在发送消息的时候需要指定Exchange,也就是需要声明ExchangeName,同时根据指定的这个Exchange的类型,来设置一个routing-key。我们知道Exchange和queue是通过一个叫binding的key来完成的,在Exchange中会记录所有它连接的queue所对应的binding(暂且叫bs吧,也就是多个binding的集合),消息通过设置的ExchangeName发送到指定的这个Exchange,同时携带过去的routing-key会与bs中的binding进行匹配,匹配条件满足就会将消息发送过去(匹配到几个就发送几个)。

1
2
3
4
5
Consumer连接queue的规则相对来说就简单很多了,指定queue的名字,设置几个连接参数就好了(比如说:exclusive、Auto-delete、Durable)。

exclusive:排他,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
Durable:持久化

另外对于Producer、Exchange、Queue,和Consumer都还有一些可设置的参数用来个性化配置,这个自己去搜索一下吧。

一张图来解释整个过程

rabbitMq 图中Producer发送消息给message,routing-key未指定是指没有固定一个routing-key,这个key是动态可变的。 而根据message这个Exchange的type可知,这个message是那么聪明的服务员,会根据规则将发送来的消息“聪明”的交给相应的Queue处理。 假如这个Producer发送的Message所指定的routing-key是order.log.phbj,那么根据Exchange与Queue间对应的橙色线,这条消息会被发送给queue2,并由Consumer接收处理。 如果这个Producer发送的Message所指定的routing-key是weixin.order.phbj,那么这个消息会被发送queue,并由对应注册的Consumer2接收(注意这里的Consumer可能不止一个,但是只会有一个Consumer来接收) 过程大概就是这样了,接下来配合Spring来使用。

Spring中使用RabbitMQ

pom文件放在文章末尾,这里先来说RabbitMQ的配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" username="admin" password="123456"
host="192.168.1.198"
port="5672"
virtual-host="/"/>
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="192.168.1.198"/>
<property name="username" value="admin"/>
<property name="password" value="123456"/>
<property name="port" value="5672"/>
<property name="virtualHost" value="/"/>
<property name="channelCacheSize" value="5"/>
<property name="publisherConfirms" value="true"/>
</bean>
<!--
这里是两个queue的定义
exclusive:排他,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
Durable:持久化
-->
<rabbit:queue id="queue" name="queue" durable="true" auto-delete="false" exclusive="false"/>
<rabbit:queue id="queue2" name="queue2" durable="true" auto-delete="false" exclusive="false"/>
<!--RabbitMQ的Consumer-->
<bean id="messageReceiver" class="net.sumile.consumer.MessageConsumer"></bean>
<bean id="messageReceiver2" class="net.sumile.consumer.MessageConsumerQueue2"></bean>

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency="1"
prefetch="5">
<rabbit:listener queues="queue" ref="messageReceiver"/>
<rabbit:listener queues="queue2" ref="messageReceiver2"/>
</rabbit:listener-container>
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 两种Exchange的定义 -->
<rabbit:direct-exchange name="order" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queue" key="order"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queue2" key="exchange"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:topic-exchange name="message" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queue2" pattern="\*.log.phbj"/>
<rabbit:binding queue="queue" pattern="\*.order.phbj"/>
<rabbit:binding queue="queue" pattern="\*.pay.phbj"/>
</rabbit:bindings>
</rabbit:topic-exchange>

<!--定义是否使用序列化传输-->
<!--<bean id="jsonMessageConverter"-->
<!--class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"></bean>-->
<!--message-converter="jsonMessageConverter"-->

<!--定义rabbit template用于数据的发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchange" routing-key="exchange"/>
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory"
exchange="order" routing-key="order"/>
<bean id="amqpTemplate3"
class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="confirmCallback" ref="confirmCallBackListener"/>
<property name="returnCallback" ref="returnCallBackListener"/>
<property name="mandatory" value="true"/>
<!--只有在关闭事务的情况下mandatory才起作用-->
<property name="channelTransacted" value="false"/>
<property name="exchange" value="message"/>
</bean>
<!--发送确认监听-->
<bean id="confirmCallBackListener" class="net.sumile.producer.ConfirmCallBackListener"/>
<bean id="returnCallBackListener" class="net.sumile.producer.ReturnCallBackListener"/>
</beans>

在程序启动之前,首先来确认下RabbitMQ的状态 RabbitMQInit 图中可以看出,现在没有连接链接到RabbitMQ,Exchange和Queue也都是空的。 然后启动程序 RabbitMQStart 这是启动之后立刻就截的图,可以看到,有一个连接链接上了RabbitMQ,同时创建了三个Exchange以及两个queue。 然后打开每一个Exchange来看看它的对应规则是什么,着重看红色框中的 RabbitMQExchangeBinding 可以看到,红色框中的设置正是在xml中配置的那部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<rabbit:direct-exchange name="order" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queue" key="order"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queue2" key="exchange"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:topic-exchange name="message" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queue2" pattern="*.log.phbj"/>
<rabbit:binding queue="queue" pattern="*.order.phbj"/>
<rabbit:binding queue="queue" pattern="*.pay.phbj"/>
</rabbit:bindings>
</rabbit:topic-exchange>

其中配置文件中的“<rabbit:” 后面的topic和direct指该Exchange的类型。而绑定的queue则在bindings中,规则是通过binding配置的key或者pattern。这些在图中都有显示。 RabbitMQQueue 图中Bindings与上面的Exchange的配置有对应,而Consumers则对应了xml中的以下配置:

1
2
3
4
5
6
7
8
9
<!--RabbitMQ 的 Consumer-->
<bean id="messageReceiver" class="net.sumile.consumer.MessageConsumer"></bean>
<bean id="messageReceiver2" class="net.sumile.consumer.MessageConsumerQueue2"></bean>

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency="1"
prefetch="5">
<rabbit:listener queues="queue" ref="messageReceiver"/>
<rabbit:listener queues="queue2" ref="messageReceiver2"/>
</rabbit:listener-container>

Ack required通过配置的acknowledge来配置,表示需要消息确认 Prefetch count通过配置中的prefetch标签配置,来限制Queue每次发送给每个消费者的消息数。

如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。 参考:RabbitMQ基础概念详细介绍

发送者的相关配置不会在上面的图中体现,他是通过代码来调用运行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchange" routing-key="exchange"/>
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory"
exchange="order" routing-key="order"/>
<bean id="amqpTemplate3"
class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="confirmCallback" ref="confirmCallBackListener"/>
<property name="returnCallback" ref="returnCallBackListener"/>
<property name="mandatory" value="true"/>
<!--只有在关闭事务的情况下mandatory才起作用-->
<property name="channelTransacted" value="false"/>
<property name="exchange" value="message"/>
</bean>
<!--发送确认监听-->
<bean id="confirmCallBackListener" class="net.sumile.producer.ConfirmCallBackListener"/>
<bean id="returnCallBackListener" class="net.sumile.producer.ReturnCallBackListener"/>

我们在配置文件中配置了三个发送消息的模板,其中amqpTemplate和amqpTemplate2是两个普通的发送模板,它定义了exchange和routing-key。 而amqpTemplate3比较特殊,它定义了mandatory,这个用来标识这个模板发送的消息是否需要回执(当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。) 这就是为什么需要使用bean标签而不使用<rabbit:template标签的原因。同时我们看到amqpTemplateamqpTemplate2使用的connection-factoryconnectionFactory,而amqpTemplate3使用的是rabbitConnectionFactory,这是因为如果需要回执的话,需要在Connection-Factory中指定一个参数publisherConfirms,从下面两个connection-factory的配置就可以看出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<rabbit:connection-factory id="connectionFactory" username="admin" password="123456"
host="192.168.1.198"
port="5672"
virtual-host="/"/>
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="192.168.1.198"/>
<property name="username" value="admin"/>
<property name="password" value="123456"/>
<property name="port" value="5672"/>
<property name="virtualHost" value="/"/>
<property name="channelCacheSize" value="5"/>
<property name="publisherConfirms" value="true"/>
</bean>

至此,配置文件与RabbitMQ的关系算是稍微的讲完了,解下来,我们实测一下。

实测运行

先放Controler文件以及其他一些文件 Controler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package net.sumile.controler;

import net.sumile.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;

/**
* Created by Administrator on 2017/6/14 0014.
*/
@Controller
public class ControlerM {
@Autowired
MessageProducer messageProducer;

@RequestMapping(value = "/home")
public void send(@RequestParam(value = "type", defaultValue = "1") String type,
@RequestParam(value = "routing\_key", defaultValue = "default.order.phbj") String routing\_key,
@RequestParam(value = "message", defaultValue = "defaultMessage") String message) {
if ("1".equals(type)) {
messageProducer.sendMessage(message);
} else if ("2".equals(type)) {
messageProducer.sendMessage2(message);
} else if ("3".equals(type)) {
messageProducer.sendMessage3(routing\_key, message);
}
}
}

以及MessageProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package net.sumile.producer;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
* 功能概要:消息产生,提交到队列中去
*/
@Service
public class MessageProducer {
private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
@Resource
private RabbitTemplate amqpTemplate;
@Resource
private RabbitTemplate amqpTemplate2;
@Resource
private RabbitTemplate amqpTemplate3;

public void sendMessage(Object message) {
System.out.println("1发送:" + message);
amqpTemplate.convertAndSend(message);
}

public void sendMessage2(Object message) {
System.out.println("2发送:" + message);
amqpTemplate2.convertAndSend(message);
}

public void sendMessage3(final String routingKey, final Object message) {
System.out.println("3发送:\[" + message + "\] routing-key = \[" + routingKey + "\]");
amqpTemplate3.convertAndSend(routingKey, message);
// for (int i = 1; i < 3000000; i++) {
// System.out.println("3发送:[" + message + "] routing-key = [" + routingKey + "]" + i);
// amqpTemplate3.convertAndSend(routingKey, message);
// }
}
}
}

以及两个接收者中的一个,另外一个类似,只是输出的文字由Queue替换为了Queue2用来区分接收的是哪个queue中的message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package net.sumile.consumer;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import static java.lang.Thread.sleep;

/**
* 功能概要:消费接收
*
* @author sumile
* @since 2017年6月23日15:38:07
*/
public class MessageConsumer implements ChannelAwareMessageListener {

private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("接收到Queue:" + new String(message.getBody()));
try {
sleep(3000);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (InterruptedException e) {

}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

最重要的就是这几个文件,来捋一遍过程: 首先通过浏览器访问一个我们的服务器地址,进入controler,根据传过来的type去调用不同的模板发送不同的消息,然后由对应的Consumer接收并打印出来。

  • 请求地址:http://localhost:8080/home?type=1&routing_key=myO.pay.phbj&message=k 打印:

    1发送:k
    接收到Queue2:k

    因为template对应的Exchange是exchange,而exchange对应的queue是queue2,所以打印出来的接受者是queue2

  • 请求地址:http://localhost:8080/home?type=2&routing_key=myO.pay.phbj&message=65 打印:

    2发送:65
    接收到Queue:65

    原理同上

  • 请求地址:http://localhost:8080/home?type=3&routing_key=myO.pay.phbj&message=65 打印:

    2发送:65
    接收到Queue:65

    原理同上

  • 请求地址:http://localhost:8080/home?type=3&routing_key=myO.pay.phbj&message=65 打印:

    3发送:[65] routing-key = [myO.pay.phbj]
    接收到Queue:65

    这里routing_key匹配到了*.pay.phbj,所以发送到queue中并由queue的Consumer接收

  • 请求地址:http://localhost:8080/home?type=3&routing_key=myO.log.phbj&message=65 打印:

    3发送:[65] routing-key = [myO.log.phbj]
    接收到Queue2:65

    这里routing_key匹配到了*.log.phbj,所以发送到queue2中并由queue2的Consumer接收

confirmCallback和returnCallback

接下来我们来看一组请求: 请求地址:http://localhost:8080/home?type=3&touting_key=myO.l2og.phbj&message=65 看这组请求,我们知道是调用amqpTemplate3来发送的,但是并没有binding-key与之对应,所以这个Message发送到Exchange之后Exchange不知道该交给哪个Queue。但是由于我们设置了

1
<property name="returnCallback" ref="returnCallBackListener"/>

所以如果Exchange没有找到匹配的queue的时候,就会进入到这个类的方法中,由我们来处理这条迷路的Message,不至于将这条消息丢失了。 那这个配置是做什么的呢?

1
<property name="confirmCallback" ref="confirmCallBackListener"/>

简单来说,如果发送出去的消息找不到Exchange,到confirmCallback中,如果找到了Exchange找不到Queue,到returnCallback中。其余的可以参考这个RabbitMQ(四)消息确认(发送确认,接收确认),如果要测试的话我们可以不停的发送消息,然后手动的将要路由到的queue删掉,就会出现这种情况了。

Consumer的回复

我们在Consumer的配置中设置了acknowledge参数,表示需要手动回复“已经接收到该消息”然后queue才会删除该Message。 下面来实测下如果不回复会怎么样 我们将MessageConsumer 中的回复代码注释掉,并请求地址:http://localhost:8080/home?type=2&routing_key=myO.pay.phbj&message=65。看看RabbitMQ中queue里面Message是怎么变化的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MessageConsumer implements ChannelAwareMessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("接收到Queue:" + new String(message.getBody()));
// try {
// sleep(3000);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// } catch (InterruptedException e) {
//
// }
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

我们请求三遍上面的那个地址,然后去看控制台,会看到回复:

2发送:65
接收到Queue:65
2发送:65
接收到Queue:65
2发送:65
接收到Queue:65

接收到了。然后再去看RabbitMQ的网页控制端:http://192.168.1.198:15672 RabbitMQ_consumer_noAck queue中累计了三条消息,而这三条消息已经是处理过的,如果有消息不停的进入,结果就是堆满内存 这是最需要注意的一点   都是自己在实际了解学习过程中遇到的一些问题以及感悟,看了很多博客,感谢各位大牛。 有错误请指出,望不吝赐教。

RabbitMQ-Demo

-------------本文结束  感谢您的阅读-------------
下次一定