【RabbitMQ】02-RabbitMQ与SpringBoot

cover

RabbitMQ与SpringBoot

RabbitMQ在SpringBoot项目中的应用:

  1. 连接
  2. 声明Queue、Exchange、Binding
  3. 五种工作模式的应用
  4. 消息可靠性

看了几篇博客主要是这么多内容,我自己把这些试了一下

这个博客讲的是真的细RabbitMQ由浅入深入门全总结(一)RabbitMQ由浅入深入门全总结(二)

代码传到码云了https://gitee.com/zsqbigbig/springboot-2.3.9.git

RabbitMQ与SpringBoot连接

  1. 项目结构

    img-45

  2. pom.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>compile</scope>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>
    <dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.5.8</version>
    </dependency>
  3. application.yml

    1
    2
    3
    4
    5
    6
    7
    8
    spring:
    rabbitmq:
    host: api.rabbitmq.com
    port: 5672
    username: admin
    password: admin
    # 开启生产者的消息确认
    publisher-confirm-type: correlated
  4. model/Article.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Data
    @Builder
    public class Article {

    private Long id;
    private String author;
    private String title;
    private String subtitle;
    private String description;
    private String cover;
    private String url;

    }
  5. config/RabbitmqConstant

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    /**
    * 声明消息队列的queue, channel, exchange, routing_key变量
    * @author zsq
    * @date 2021年11月18日 11:44:06
    */
    public class RabbitConstant {

    private static final String PRE = "zsq";

    /**
    * 简单模式
    */
    public static final String SIMPLE_QUEUE_01 = PRE + "simple_queue_01";

    /**
    * 路由模式
    */
    public static final String DIRECT_QUEUE_01 = PRE + "direct_queue_01";
    public static final String DIRECT_EXCHANGE_01 = PRE + "direct_exchange_01";
    public static final String DIRECT_ROUTING_KEY_01 = PRE + "direct_routing_key_01";

    /**
    * 广播模式
    */
    public static final String FANOUT_QUEUE_01 = PRE + "fanout_queue_01";
    public static final String FANOUT_QUEUE_02 = PRE + "fanout_queue_02";
    public static final String FANOUT_QUEUE_03 = PRE + "fanout_queue_03";
    public static final String FANOUT_EXCHANGE_01 = PRE + "fanout_exchange_01";

    /**
    * 主题模式
    * * 匹配一个标识符
    * # 匹配零个或多个标识符
    */
    public static final String TOPIC_QUEUE_01 = PRE + "topic_queue_01";
    public static final String TOPIC_QUEUE_02 = PRE + "topic_queue_02";
    public static final String TOPIC_QUEUE_03 = PRE + "topic_queue_03";
    public static final String TOPIC_EXCHANGE_01 = PRE + "topic_exchange_01";
    public static final String TOPIC_ROUTING_KEY_01 = PRE + "topic.routing.key.student";
    public static final String TOPIC_ROUTING_KEY_02 = PRE + "topic.routing.key.teacher";
    public static final String TOPIC_ROUTING_KEY_03 = PRE + "topic.routing.key.*";

    /**
    * 过期时间设置
    */
    public static final String EXPIRE_QUEUE_01 = PRE + "expire.queue.01";
    public static final String EXPIRE_EXCHANGE_01 = PRE + "expire_exchange_01";
    public static final String EXPIRE_ROUTING_KEY_01 = PRE + "expire.queue.*";

    /**
    * 死信队列
    * DELAY_:延迟队列
    * DEAD_:正常队列
    */
    public static final String DELAY_QUEUE_01 = PRE + "delay_queue_01";
    public static final String DELAY_EXCHANGE_01 = PRE + "delay_exchange_01";
    public static final String DELAY_ROUTING_KEY_01 = PRE + "delay_routing_key_01";
    public static final String DEAD_LETTER_QUEUE_01 = PRE + "dead_letter_queue_01";
    public static final String DEAD_LETTER_EXCHANGE_01 = PRE + "dead_letter_exchange_01";
    public static final String DEAD_LETTER_ROUTING_KEY_01 = PRE + "dead_letter_routing_key_01";

    /**
    * 消息确认机制-生产者
    */
    public static final String PUBLISHER_CONFIRM_QUEUE_01 = PRE + "publisher_confirm_queue_01";
    public static final String PUBLISHER_CONFIRM_EXCHANGE_01 = PRE + "publisher_confirm_exchange_01";
    public static final String PUBLISHER_CONFIRM_ROUTING_KEY_01 = PRE + "publisher_confirm_routing_key_01";

    /**
    * 消息确认机制-消费者
    */
    public static final String ACKNOWLEDGE_QUEUE_01 = PRE + "acknowledge_queue_01";
    public static final String ACKNOWLEDGE_EXCHANGE_01 = PRE + "acknowledge_exchange_01";
    public static final String ACKNOWLEDGE_ROUTING_KEY_01 = PRE + "acknowledge_routing_key_01";
    public static final String ACKNOWLEDGE_DEAD_QUEUE_01 = PRE + "acknowledge_dead_queue_01";
    public static final String ACKNOWLEDGE_DEAD_EXCHANGE_01 = PRE + "acknowledge_dead_exchange_01";
    public static final String ACKNOWLEDGE_DEAD_ROUTING_KEY_01 = PRE + "acknowledge_dead_routing_key_01";

    }
  6. RabbitmqApplication.java

    1
    2
    3
    4
    5
    6
    7
    8
    @SpringBootApplication
    public class RabbitmqApplication {

    public static void main(String[] args) {
    SpringApplication.run(RabbitmqApplication.class);
    }

    }

RabbitMQ工作模式使用(五种常用)

RabbitMQ在官网上提供了六种工作模式官方文档

  1. 简单模式(Hello World):类似于点对点传输消息。一个生产者将消息放入队列,一个消费者收到该消息
  2. 工作模式(Work queues):一个生产者将消息放入队列,多个消费者中的一个收到该消息
  3. 订阅模式(publish/Subscribe):一个生产者将消息放入队列,多个消费者均收到该消息
  4. 路由模式(Routing):一个生产者将消息放入队列并传入routing_key,绑定了routing_key的消费者中的一个收到该消息
  5. 主题模式(Topics):一个生产者将消息放入队列并传入routing_key,绑定了符合routing_key规则的消费者均收到该消息

简单模式

The simplest thing that does something

生产者发送消息到队列,消费者从队列拿到消息消费。

img-01

如果队列没有绑定任何交换机,那么队列绑定的一定是默认的交换机。AMQP协议规定,队列必须绑定交换机。

代码

  1. SimpleConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    /**
    * 简单模式、工作模式
    * 该方式创建的队列均与默认交换机绑定。
    * 该队列的消费者争抢同一条消息
    * @author zsq
    * @date 2021年11月18日 16:25:33
    */
    @Configuration
    public class SimpleConfig {

    /**
    * 创建队列
    * 当未绑定exchange, routing_key时,默认绑定'DEFAULT_EXCHANGE', 'DEFAULT_ROUTING_KEY'
    * @return Queue
    */
    @Bean(RabbitConstant.SIMPLE_QUEUE_01)
    public Queue getQueue() {
    return new Queue(RabbitConstant.SIMPLE_QUEUE_01, true);
    }

    }
  2. SimpleProducer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Component
    public class SimpleProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendSimple(String msg) {
    rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_01, msg);
    }

    }
  3. SimpleConsumer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Slf4j
    @Component
    public class SimpleConsumer {

    @RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_01)
    public void simpleConsumer01(String msg) {
    log.info("【消费者】 - 【Simple】 - 01\n{}", msg);
    }

    }
  4. controller/RabbitmqController.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @Slf4j
    @RestController
    public class RabbitmqController {

    private Long getId() {
    int min = 0, max = Integer.MAX_VALUE;
    return RandomUtil.randomLong(min, max);
    }

    private Article getArticle() {
    return Article.builder().id(getId()).author("张三").title("RabbitMQ").subtitle("RabbitMQ应用").description("消息中间件-RabbitMQ")
    .cover("https://mmbiz.qpic.cn/sz_mmbiz_jpg/62VyqMwzt7fibI209vwGxtYSMo9mkNomYckXSf6GhHk2hS1oE8rs6KqBKTn1iaMCxQabfx5ep41ELEno2uO9OxGg/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1")
    .url("https://mp.weixin.qq.com/s/OKCqRSqbYQKN1kJYkU_ZeA")
    .build();
    }

    @Resource
    private SimpleProducer simpleProducer;

    @GetMapping("/simple01")
    public String simple01() {
    for (int i = 0; i < 10; i++) {
    JSONObject jsonObject = JSONUtil.parseObj(getArticle());
    simpleProducer.sendSimple(JSONUtil.toJsonStr(jsonObject));
    }
    return "simple01";
    }

    }

使用

调用http://localhost:12031/simple01

查看日志,生产的十条消息均被唯一的消费者该消费

img-46

工作模式

Distributing tasks among workers (the competing consumers pattern)

轮询模式:一个消费者一次只消费一条消息

公平分发:按消费者服务的性能消费消息,性能越高的机器处理的消息越多。

工作模式,就是在简单模式的基础上增加消费者。多个消费者中只有一个消费者能拿到该消息

img-02

代码

SimpleConfig.java, SimpleProducer.java, RabbitmqController.java不需要更改。进修改SimpleConsumer.java即可

SimpleConsumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
@Component
public class SimpleConsumer {

@RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_01)
public void simpleConsumer01(String msg) {
log.info("【消费者】 - 【Simple】 - 01\n{}", msg);
}

@RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_01)
public void simpleConsumer02(String msg) {
log.info("【消费者】 - 【Simple】 - 02\n{}", msg);
}

@RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_01)
public void simpleConsumer03(String msg) {
log.info("【消费者】 - 【Simple】 - 03\n{}", msg);
}

}

使用

调用http://localhost:12031/simple01

查看日志,生产的十条消息分别被三个消费者获取

img-47

订阅模式

Sending messages to many consumers at once

绑定到交换机的队列均会收到(fanout)交换机中的消息。

img-03

代码

  1. FanoutConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    /**
    * 广播模式
    * Fanout交换机绑定多个队列,当交换机收到消息时,会下发给与之绑定的所有队列
    * @author zsq
    * @date 2021年11月18日 17:22:58
    */
    @Configuration
    public class FanoutConfig {

    @Bean(RabbitConstant.FANOUT_QUEUE_01)
    public Queue getQueue01() {
    return new Queue(RabbitConstant.FANOUT_QUEUE_01);
    }

    @Bean(RabbitConstant.FANOUT_QUEUE_02)
    public Queue getQueue02() {
    return new Queue(RabbitConstant.FANOUT_QUEUE_02);
    }

    @Bean(RabbitConstant.FANOUT_QUEUE_03)
    public Queue getQueue03() {
    return new Queue(RabbitConstant.FANOUT_QUEUE_03);
    }

    @Bean(RabbitConstant.FANOUT_EXCHANGE_01)
    public FanoutExchange getExchange() {
    return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE_01);
    }

    @Bean
    public Binding getBinding01() {
    return BindingBuilder.bind(getQueue01()).to(getExchange());
    }

    @Bean
    public Binding getBinding02() {
    return BindingBuilder.bind(getQueue02()).to(getExchange());
    }

    @Bean
    public Binding getBinding03() {
    return BindingBuilder.bind(getQueue03()).to(getExchange());
    }

    }
  2. FanoutProducer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Component
    public class FanoutProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendFanout(JSONObject msg) {
    rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE_01, null, msg);
    }

    }
  3. FanoutConsumer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Slf4j
    @Component
    public class FanoutConsumer {

    @RabbitListener(queues = RabbitConstant.FANOUT_QUEUE_01)
    public void fanoutConsumer01(JSONObject msg) {
    Article article = JSONUtil.toBean(msg, Article.class);
    log.info("【消费者】 - 【Fanout】 - 01\n{}", article);
    }

    @RabbitListener(queues = RabbitConstant.FANOUT_QUEUE_02)
    public void fanoutConsumer02(JSONObject msg) {
    Article article = JSONUtil.toBean(msg, Article.class);
    log.info("【消费者】 - 【Fanout】 - 02\n{}", article);
    }

    @RabbitListener(queues = RabbitConstant.FANOUT_QUEUE_03)
    public void fanoutConsumer03(JSONObject msg) {
    Article article = JSONUtil.toBean(msg, Article.class);
    log.info("【消费者】 - 【Fanout】 - 03\n{}", article);
    }

    }
  4. RabbitmqController.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @Slf4j
    @RestController
    public class RabbitmqController {

    private Long getId() {
    int min = 0, max = Integer.MAX_VALUE;
    return RandomUtil.randomLong(min, max);
    }

    private Article getArticle() {
    return Article.builder().id(getId()).author("张三").title("RabbitMQ").subtitle("RabbitMQ应用").description("消息中间件-RabbitMQ")
    .cover("https://mmbiz.qpic.cn/sz_mmbiz_jpg/62VyqMwzt7fibI209vwGxtYSMo9mkNomYckXSf6GhHk2hS1oE8rs6KqBKTn1iaMCxQabfx5ep41ELEno2uO9OxGg/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1")
    .url("https://mp.weixin.qq.com/s/OKCqRSqbYQKN1kJYkU_ZeA")
    .build();
    }

    @Resource
    private FanoutProducer fanoutProducer;

    @GetMapping("/fanout01")
    public String fanout01() {
    for (int i = 0; i < 2; i++) {
    Article article = getArticle();
    fanoutProducer.sendFanout(JSONUtil.parseObj(article));
    }
    return "fanout01";
    }

    }

使用

调用http://localhost:12031/fanout01

查看日志,生产的两条消息被三个消费者都获取到

img-48

路由模式

指定路由键,交换机将根据指定的路由键给队列发送消息。消息会发送给所有绑定了指定交换机和路由键的队列中。

img-04

代码

  1. DirectConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    /**
    * 路由模式
    * DirectExchange和Queue必须由RoutingKey进行绑定。
    * 使用路由模式发送消息时,必须指定Exchange, RoutingKey进行发送。
    * @author zsq
    * @date 2021年11月18日 11:52:09
    */
    @Configuration
    public class DirectConfig {

    /**
    * 创建队列
    * @return Queue
    */
    @Bean(RabbitConstant.DIRECT_QUEUE_01)
    public Queue getQueue01() {
    return new Queue(RabbitConstant.DIRECT_QUEUE_01);
    }

    @Bean(RabbitConstant.DIRECT_QUEUE_02)
    public Queue getQueue02() {
    return new Queue(RabbitConstant.DIRECT_QUEUE_02);
    }

    /**
    * 创建交换机
    * @return DirectExchange
    */
    @Bean
    public DirectExchange getDirectExchange() {
    return new DirectExchange(RabbitConstant.DIRECT_EXCHANGE_01, true, false);
    }

    /**
    * 将交换机和队列绑定
    * @return Binding
    */
    @Bean
    public Binding getDirectBinding01() {
    return BindingBuilder.bind(getQueue01()).to(getDirectExchange()).with(RabbitConstant.DIRECT_ROUTING_KEY_01);
    }

    @Bean
    public Binding getDirectBinding02() {
    return BindingBuilder.bind(getQueue02()).to(getDirectExchange()).with(RabbitConstant.DIRECT_ROUTING_KEY_01);
    }

    }
  2. DirectProducer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Slf4j
    @Component
    public class DirectProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendDirect(JSONObject msg) {
    rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE_01, RabbitConstant.DIRECT_ROUTING_KEY_01, msg);
    }

    }
  3. DirectConsumer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    /**
    * 路由模式-消费者
    * DIRECT_QUEUE_01消费者*2
    * DIRECT_QUEUE_02消费者*1
    * @author zsq
    * @date 2021年11月18日 11:43:08
    */
    @Slf4j
    @Component
    public class DirectConsumer {

    /**
    * DIRECT_QUEUE_01消费者
    * @param msg 消息提
    */
    @RabbitListener(queues = RabbitConstant.DIRECT_QUEUE_01)
    public void directConsumer01(JSONObject msg) {
    Article article = JSONUtil.toBean(msg, Article.class);
    log.info("【消费者】 - 【Direct(01)】 - 01\n{}", article);
    }

    /**
    * DIRECT_QUEUE_01消费者
    * @param msg 消息提
    */
    @RabbitListener(queues = RabbitConstant.DIRECT_QUEUE_01)
    public void directConsumer02(JSONObject msg) {
    Article article = JSONUtil.toBean(msg, Article.class);
    log.info("【消费者】 - 【Direct(01)】 - 02\n{}", article);
    }

    /**
    * DIRECT_QUEUE_02消费者
    * @param msg 消息提
    */
    @RabbitListener(queues = RabbitConstant.DIRECT_QUEUE_02)
    public void directConsumer03(JSONObject msg) {
    Article article = JSONUtil.toBean(msg, Article.class);
    log.info("【消费者】 - 【Direct(02)】 - 01\n{}", article);
    }

    }
  4. controller/RabbitmqController.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @Slf4j
    @RestController
    public class RabbitmqController {

    private Long getId() {
    int min = 0, max = Integer.MAX_VALUE;
    return RandomUtil.randomLong(min, max);
    }

    private Article getArticle() {
    return Article.builder().id(getId()).author("张三").title("RabbitMQ").subtitle("RabbitMQ应用").description("消息中间件-RabbitMQ")
    .cover("https://mmbiz.qpic.cn/sz_mmbiz_jpg/62VyqMwzt7fibI209vwGxtYSMo9mkNomYckXSf6GhHk2hS1oE8rs6KqBKTn1iaMCxQabfx5ep41ELEno2uO9OxGg/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1")
    .url("https://mp.weixin.qq.com/s/OKCqRSqbYQKN1kJYkU_ZeA")
    .build();
    }

    @Resource
    private DirectProducer directProducer;

    @GetMapping("/direct01")
    public String direct01() {
    for (int i = 0; i < 5; i++) {
    Article article = getArticle();
    directProducer.sendDirect(JSONUtil.parseObj(article));
    }
    return "direct01";
    }

    }

使用

调用http://localhost:12031/direct01

查看日志,生产五条消息。两个队列通过DIRECT_ROUTING_KEY_01绑定了DIRECT_EXCHANGE_01,每个队列都收到了这五条消息。

img-49

主题模式

Receiving messages based on a pattern (topics)

可以根据模糊匹配的路由key,交换机根据模糊匹配的routing key给队列发送消息。

#:代表0个或多个.隔开的字符串

*:代表一个.隔开的字符串

img-27

代码

  1. TopicConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    @Configuration
    public class TopicConfig {

    @Bean(RabbitConstant.TOPIC_QUEUE_01)
    public Queue getQueue01() {
    return new Queue(RabbitConstant.TOPIC_QUEUE_01);
    }

    @Bean(RabbitConstant.TOPIC_QUEUE_02)
    public Queue getQueue02() {
    return new Queue(RabbitConstant.TOPIC_QUEUE_02);
    }

    @Bean(RabbitConstant.TOPIC_QUEUE_03)
    public Queue getQueue03() {
    return new Queue(RabbitConstant.TOPIC_QUEUE_03);
    }

    @Bean(RabbitConstant.TOPIC_EXCHANGE_01)
    public TopicExchange getExchange() {
    return new TopicExchange(RabbitConstant.TOPIC_EXCHANGE_01);
    }

    @Bean(RabbitConstant.TOPIC_ROUTING_KEY_01)
    public Binding getBinding01() {
    return BindingBuilder.bind(getQueue01()).to(getExchange()).with(RabbitConstant.TOPIC_ROUTING_KEY_01);
    }

    @Bean(RabbitConstant.TOPIC_ROUTING_KEY_02)
    public Binding getBinding02() {
    return BindingBuilder.bind(getQueue02()).to(getExchange()).with(RabbitConstant.TOPIC_ROUTING_KEY_02);
    }

    @Bean(RabbitConstant.TOPIC_ROUTING_KEY_03)
    public Binding getBinding03() {
    return BindingBuilder.bind(getQueue03()).to(getExchange()).with(RabbitConstant.TOPIC_ROUTING_KEY_03);
    }

    }
  2. TopicProducer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Component
    public class TopicProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendTopic(String routingKey, JSONObject msg) {
    rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_01, routingKey, msg);
    }

    }
  3. TopicConsumer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Slf4j
    @Component
    public class TopicConsumer {

    @RabbitListener(queues = {RabbitConstant.TOPIC_QUEUE_01})
    public void topicConsumer01(JSONObject msg) {
    Article article = JSONUtil.toBean(msg, Article.class);
    log.info("【消费者】 - 【Topic(student)】 - 01\n{}", article);
    }

    @RabbitListener(queues = {RabbitConstant.TOPIC_QUEUE_02})
    public void topicConsumer02(JSONObject msg) {
    Article article = JSONUtil.toBean(msg, Article.class);
    log.info("【消费者】 - 【Topic(teacher)】 - 02\n{}", article);
    }

    @RabbitListener(queues = {RabbitConstant.TOPIC_QUEUE_03})
    public void topicConsumer03(JSONObject msg) {
    Article article = JSONUtil.toBean(msg, Article.class);
    log.info("【消费者】 - 【Topic(#)】 - 03\n{}", article);
    }

    }
  4. controller/RabbitmqController.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    @Slf4j
    @RestController
    public class RabbitmqController {

    private Long getId() {
    int min = 0, max = Integer.MAX_VALUE;
    return RandomUtil.randomLong(min, max);
    }

    private Article getArticle() {
    return Article.builder().id(getId()).author("张三").title("RabbitMQ").subtitle("RabbitMQ应用").description("消息中间件-RabbitMQ")
    .cover("https://mmbiz.qpic.cn/sz_mmbiz_jpg/62VyqMwzt7fibI209vwGxtYSMo9mkNomYckXSf6GhHk2hS1oE8rs6KqBKTn1iaMCxQabfx5ep41ELEno2uO9OxGg/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1")
    .url("https://mp.weixin.qq.com/s/OKCqRSqbYQKN1kJYkU_ZeA")
    .build();
    }

    @Resource
    private TopicProducer topicProducer;

    @GetMapping("topic01")
    public String topic01() {
    for (int i = 0; i < 5; i++) {
    int random = RandomUtil.randomInt(2);
    String routingKey = (random == 1) ? RabbitConstant.TOPIC_ROUTING_KEY_01 : RabbitConstant.TOPIC_ROUTING_KEY_02;
    String author = (random == 1) ? "student" : "teacher";
    Article article = getArticle();
    article.setAuthor(author);
    topicProducer.sendTopic(routingKey, JSONUtil.parseObj(article));
    // log.info("【生产者】 - 【Topic】 - routingKey = {},\n{}", routingKey, article);
    }
    return "topic01";
    }

    }

使用

调用http://localhost:12031/topic01

查看日志,生产五条消息。这五条消息有两条推送到通过TOPIC_ROUTING_KEY_01绑定的队列中,有三条推送到通过TOPIC_ROUTING_KEY_02绑定队列中。可以看到Topic(#)拿到了所有消息,Topic(student)拿到了两条消息,Topic(teacher)拿到了三条消息。

img-50

RabbitMQ进阶使用

除了上面五种常用的工作模式,RabbitMQ还提供了下面四种功能:

  1. 队列设置过期时间
  2. 死信队列
  3. 消息的可靠传递
  4. 内存及磁盘监控

设置过期时间

当队列中的消息过期时,称这个消息为”dead message“。该消息无法被消费者消费

过期时间有两种设置方法:

  1. 给队列中的消息Message对象设置过期时间。
  2. 给队列声明过期时间,队列中所有消息都相同的过期时间。

队列过期遵守规则

  • 当Message和交换机都设置了过期时间时,时间短的为真实过期时间。

Queue设置过期时间

使用主题模式进行演示

  1. ExpireConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    @Configuration
    public class ExpireConfig {

    /**
    * 给消息设置过期时间
    */
    public Queue getQueue01() {
    Map<String, Object> args = CollUtil.newHashMap();
    // x-message-ttl:消息过期时间
    args.put("x-message-ttl", 5000);
    // 设置队列过期时间
    args.put("x-expires", 8000);
    return new Queue(RabbitConstant.EXPIRE_QUEUE_01, true, false, false, args);
    }

    @Bean(RabbitConstant.EXPIRE_EXCHANGE_01)
    public TopicExchange getExchange01() {
    return new TopicExchange(RabbitConstant.EXPIRE_EXCHANGE_01);
    }

    @Bean(RabbitConstant.EXPIRE_ROUTING_KEY_01)
    public Binding getBinding01() {
    return BindingBuilder.bind(getQueue01()).to(getExchange01()).with(RabbitConstant.EXPIRE_ROUTING_KEY_01);
    }

    }
  2. ExpireProducer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Slf4j
    @Component
    public class ExpireProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendExpire01(String msg) {
    rabbitTemplate.convertAndSend(RabbitConstant.EXPIRE_EXCHANGE_01, RabbitConstant.EXPIRE_QUEUE_01, message);

    }

    }
  3. ExpireConsumer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Slf4j
    @Component
    public class ExpireConsumer {

    @RabbitListener(queues = {RabbitConstant.EXPIRE_QUEUE_01})
    public void expireConsumer01(Message message) {
    String msg = new String(message.getBody());
    log.info("【消费者】 - 【Expire(01)】:msg = {}", msg);

    }

    }

当消息发送5000ms后,消息过期,无法被消费。

Message设置过期时间

生产者发送消息时,我们可以为每条消息均设置一个过期时间。我找了三个方法,这三个方法效果是一样的。一个肯定就够用了,应该还有别的方法,但是没必要知道那么多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 下面这三个方法效果相同。
*/
@Slf4j
@Component
public class ExpireProducer {

@Resource
private RabbitTemplate rabbitTemplate;

public void sendExpire02(String msg) {
String expiration = "5000";
// 消息属性配置
MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setExpiration(expiration).setContentEncoding("UTF-8").build();
// 消息配置
Message message = MessageBuilder.withBody(msg.getBytes()).andProperties(messageProperties).build();
rabbitTemplate.send(RabbitConstant.EXPIRE_EXCHANGE_01, RabbitConstant.EXPIRE_QUEUE_01, message);
}

public void sendExpire03(String msg) {
String expiration = "5000";
rabbitTemplate.convertAndSend(RabbitConstant.EXPIRE_EXCHANGE_01, RabbitConstant.EXPIRE_QUEUE_01, message -> {
message.getMessageProperties().setExpiration(expiration);
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
});
}

public void sendExpire04(String msg) {
String expiration = "5000";
MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message) {
message.getMessageProperties().setExpiration(expiration);
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitConstant.EXPIRE_EXCHANGE_01, RabbitConstant.EXPIRE_QUEUE_01, msg, messagePostProcessor);
}

}

死信队列

死信队列这一完整功能,由两个队列完成。一个延迟队列,一个普通队列,这个普通队列就是死信队列。
延迟队列中的消息过期后,会将消息传入到普通队列(死信队列)中。

  • 当仅对消息体设置过期时间expiration时,延迟队列依然遵守先进先出的原则。例如:延迟队列中第一条消息5s过期,第二条消息1s过期,第二条消息要等到第一条消息过期后,才会会进入死信队列。
  1. DeadConfig.java死信队列配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    @Configuration
    public class DeadConfig {

    /**
    * 延时队列:当队列中的消息过期时,会将消息传递给绑定的死信队列中。
    * 通过'x-dead-letter-exchange'和'x-dead-letter-routing-key'进行绑定
    * @return Queue
    */
    @Bean
    public Queue getDelayQueue() {
    Map<String, Object> args = CollUtil.newHashMap();
    args.put("x-message-ttl", 5000);
    args.put("x-dead-letter-exchange", RabbitConstant.DEAD_LETTER_EXCHANGE_01);
    args.put("x-dead-letter-routing-key", RabbitConstant.DEAD_LETTER_ROUTING_KEY_01);
    return new Queue(RabbitConstant.DELAY_QUEUE_01, true, false, false, args);
    }

    /**
    * 延时队列交换机
    */
    @Bean
    public DirectExchange getDelayExchange() {
    return new DirectExchange(RabbitConstant.DELAY_EXCHANGE_01);
    }

    /**
    * 延时队列绑定交换机和路由键
    */
    @Bean
    public Binding getDelayBinding() {
    return BindingBuilder.bind(getDelayQueue()).to(getDelayExchange()).with(RabbitConstant.DELAY_ROUTING_KEY_01);
    }

    /**
    * 声明死信队列
    */
    @Bean
    public Queue getDeadQueue() {
    return new Queue(RabbitConstant.DEAD_LETTER_QUEUE_01);
    }

    /**
    * 声明队列交换机
    */
    @Bean
    public DirectExchange getDeadExchange() {
    return new DirectExchange(RabbitConstant.DEAD_LETTER_EXCHANGE_01);
    }

    /**
    * 死信队列绑定交换机和路由键
    */
    @Bean
    public Binding getDeadBinding() {
    return BindingBuilder.bind(getDeadQueue()).to(getDeadExchange()).with(RabbitConstant.DEAD_LETTER_ROUTING_KEY_01);
    }

    }
  2. DeadProducer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Slf4j
    @Component
    public class DeadProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendDead(String msg) {
    rabbitTemplate.convertAndSend(RabbitConstant.DELAY_EXCHANGE_01, RabbitConstant.DELAY_ROUTING_KEY_01, msg);
    }

    /**
    * 为消息配置单独的过期时间
    */
    public void sendDead02(String msg) {
    int bound = 5000;
    String expiration = String.valueOf(RandomUtil.randomInt(bound));
    msg += ", expiration = " + expiration;
    MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setExpiration(expiration).setContentEncoding("UTF-8").build();
    Message message = MessageBuilder.withBody(msg.getBytes()).andProperties(messageProperties).build();
    rabbitTemplate.send(RabbitConstant.DELAY_EXCHANGE_01, RabbitConstant.DELAY_ROUTING_KEY_01, message);
    }

    }
  3. DeadConsumer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Slf4j
    @Component
    public class DeadConsumer {

    /**
    * 延时队列过期后,消息传递到该队列中,由该队列进行消费
    */
    @RabbitListener(queues = {RabbitConstant.DEAD_LETTER_QUEUE_01})
    public void consumerDead01(Message message) throws IOException {
    log.info("【消费者】 - 【DeadLetter(01)】:msg = {}", new String(message.getBody()));
    }

    }
  4. RabbitController.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    @Slf4j
    @RestController
    public class RabbitmqController {

    @Resource
    private DeadProducer deadProducer;

    @GetMapping("/dead01")
    public String dead01() {
    log.info("dead01");
    for (int i = 0; i < 10; i++) {
    String msg = String.valueOf(i);
    deadProducer.sendDead01(msg);
    }
    return "dead01";
    }

    @GetMapping("/dead02")
    public String dead02() {
    log.info("dead02");
    for (int i = 0; i < 10; i++) {
    String msg = String.valueOf(i);
    deadProducer.sendDead02(msg);
    }
    return "dead02";
    }

    }

延迟队列过期时间相同

调用/dead01接口。
消息在5s过期后,依次进入死信队列。

img-51

延迟队列过期时间不同

调用/dead02接口。
严格遵守先进先出的原则,即使后面的消息过期时间短,也要等到其之前的消息过期后,才会进入到死信队列中。

img-52

消息可靠性生产者

RabbitMQ在传递消息的过程中,可能会因为网络等因素导致消息传递失败。如生产者将消息存入队列,消费者获取队列的消息时,可能导致失败。RabbitMQ提供了保证消息可靠传递的机制,通过生产者和消费者两部分来处理。

生产者为消息的发送者。RabbitMQ提供了两种方式来确保生产者的消息可靠性

  • confirm机制:确保消息成功发送至交换机中。若在发送至队列时失败,不会执行confirm中的方法
  • return机制:确保消息从交换机发送至队列中。

confirm机制

生产者发送消息后,会异步等待接收一个 ack 应答,收到返回的 ack 确认消息后,根据 ack是 true 还是 false,调用 confirmCallback 接口进行处理

  1. application.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    spring:
    rabbitmq:
    host: api.rabbitmq.com
    port: 5672
    username: admin
    password: admin
    # 开启生产者的消息确认:
    # 1. none:禁止发布确认
    # 2. correlated:消息成功发布到交换机后会触发回调方法
    # 3. simple
    publisher-confirm-type: correlated
  2. PublisherConfirmConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Slf4j
    @Configuration
    public class PublisherConfirmConfig implements RabbitTemplate.ConfirmCallback {

    /**
    * 根据ack去判断消息是否发送成功,并作出相应操作
    * @param ack:true-消息发送成功;false-消息发送失败
    */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    log.info("【消息确认】 - confirm:参数:correlationDate = {}, ack = {}, cause = {}", correlationData, ack, cause);
    if (ack) {
    log.info("【消息确认】 - 【成功】");
    } else {
    log.info("【消息确认】 - 【失败】");
    }
    }

    }
  3. PublisherConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Configuration
    public class PublisherConfig {

    @Bean(RabbitConstant.PUBLISHER_CONFIRM_QUEUE_01)
    public Queue getQueue() {
    return new Queue(RabbitConstant.PUBLISHER_CONFIRM_QUEUE_01);
    }

    @Bean(RabbitConstant.PUBLISHER_CONFIRM_EXCHANGE_01)
    public DirectExchange getExchange() {
    return new DirectExchange(RabbitConstant.PUBLISHER_CONFIRM_EXCHANGE_01);
    }

    @Bean(RabbitConstant.PUBLISHER_CONFIRM_ROUTING_KEY_01)
    public Binding getBinding() {
    return BindingBuilder.bind(getQueue())
    .to(getExchange())
    .with(RabbitConstant.PUBLISHER_CONFIRM_ROUTING_KEY_01);
    }

    }
  4. PublisherProducer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Slf4j
    @Component
    public class PublisherProducer {

    @Resource
    private PublisherConfirmConfig publisherConfirmConfig;

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
    * 消息成功发送
    */
    public void sendPublisher01(String msg) {
    rabbitTemplate.setConfirmCallback(publisherConfirmConfig);
    rabbitTemplate.convertAndSend(RabbitConstant.PUBLISHER_CONFIRM_EXCHANGE_01, RabbitConstant.PUBLISHER_CONFIRM_ROUTING_KEY_01, msg);
    }

    public void sendPublisher02(String msg) {
    rabbitTemplate.setConfirmCallback(publisherConfirmConfig);
    rabbitTemplate.convertAndSend("随便写一个exchange", "随便写一个routing_key", msg);
    }

    }

使用:

  • 正常情况

    img-55

  • 发送失败-网络连接错误

    img-53

  • 发送失败-没有交换机

    img-54

return机制

  1. application.yml

    1
    2
    3
    4
    spring:
    rabbitmq:
    # 开启消费者的消息确认:return
    publisher-returns: true
  2. PublisherReturnConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Slf4j
    @Configuration
    public class PublisherReturnConfig implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    log.info("【消息确认】 - return:\n参数:message = {}, replyCode = {}, replyText = {}, exchange = {}, routingKey = {}", new String(message.getBody()), replyCode, replyText, exchange, routingKey);
    }

    }
  3. PublisherProducer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @Slf4j
    @Component
    public class PublisherProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private PublisherReturnConfig publisherReturnConfig;

    /**
    * 成功发送
    */
    public void sendPublisher03(String msg) {
    // 设置失败消息的处理方式:true-将失败消息发送给回调函数;false-丢弃失败消息
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnCallback(publisherReturnConfig);
    rabbitTemplate.convertAndSend(RabbitConstant.PUBLISHER_CONFIRM_EXCHANGE_01, RabbitConstant.PUBLISHER_CONFIRM_ROUTING_KEY_01, msg);
    }

    /**
    * 发送失败
    */
    public void sendPublisher04(String msg) {
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnCallback(publisherReturnConfig);
    rabbitTemplate.convertAndSend(RabbitConstant.PUBLISHER_CONFIRM_EXCHANGE_01, "随便写一个routing_key", msg);
    }

    }
  • 正常情况:不执行returnCallback 方法

  • 发送失败-路由键不存在:

    img-56

消费者可靠性

消费者消费消息时,可能出现异常,导致消息无法被正常消费。消费者可靠性,就是为了处理这些无法被正常消费的消息。RabbitMQ提供了三种方式确保消费者的消息可靠性

  • retry重试:消费者在执行方法时,如果抛出异常,方法终止,会触发RabbitMQ的重试机制。重试次数达到配置次数时如果还有异常,消息将被丢弃。该方式不能手动捕获异常,如果捕获异常的话,重试机制不生效。
  • try/catch + 手动ack

retry重试

配置文件application.yml配置的属性retry,也就是消费者执行消费操作时异常的重试配置。

  1. application.yml

    重试次数配置为3次

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    spring:
    rabbitmq:
    host: api.rabbitmq.com
    port: 5672
    username: admin
    password: admin
    listener:
    simple:
    # 消费者消息确认
    # 1. auto:自动确认(默认)
    # 2. manual:手动确认
    # 3. none:不确认,发送后自动丢弃
    acknowledge-mode: auto
    retry:
    # 开启重试
    enabled: true
    # 最大重试次数
    max-attempts: 3
    # 重试间隔时间
    initial-interval: 3000ms
    max-interval: 5000ms
  2. AcknowledgeConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Slf4j
    @Configuration
    public class AcknowledgeConfig {

    @Bean(RabbitConstant.ACKNOWLEDGE_QUEUE_01)
    public Queue getQueue() {
    return new Queue(RabbitConstant.ACKNOWLEDGE_QUEUE_01);
    }

    @Bean(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01)
    public DirectExchange getExchange() {
    return new DirectExchange(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01);
    }

    @Bean(RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01)
    public Binding getBinding() {
    return BindingBuilder.bind(getQueue())
    .to(getExchange())
    .with(RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01);
    }

    }
  3. AcknowledgeProducer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Component
    public class AcknowledgeProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendAck01(String msg) {
    rabbitTemplate.convertAndSend(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01, RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01, msg);
    }

    }
  4. AcknowledgeConsumer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Slf4j
    @Component
    public class AcknowledgeConsumer {

    /**
    * 在方法体中执行异常操作。
    * 消费者中不能手动捕获异常,会被识别为方法正常运行,重试配置就不生效了。
    */
    @RabbitListener(queues = {RabbitConstant.ACKNOWLEDGE_QUEUE_01})
    public void consumerAck01(String msg) {
    log.info("【消费者】 - 【Acknowledge(01)】:msg = {}", msg);
    double a = 1 / 0;
    System.out.println(a);
    }

    }

使用:

  • 消费者方法异常时:

    同一个消息被消费者重试3次,三次重试后,仍然有异常,将异常抛出,消息被丢弃。

    img-57

try/catch + 手动ack

手动ack时,需要在代码中规定是否进行重发消息,此时配置文件中所配置的重发数据失效。

  1. application.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    spring:
    rabbitmq:
    host: api.rabbitmq.com
    port: 5672
    username: admin
    password: admin
    listener:
    simple:
    # 消费者消息确认
    # 1. auto:自动确认(默认)
    # 2. manual:手动确认
    # 3. none:不确认,发送后自动丢弃
    acknowledge-mode: manual
  2. AcknowledgeConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Slf4j
    @Configuration
    public class AcknowledgeConfig {

    @Bean(RabbitConstant.ACKNOWLEDGE_QUEUE_01)
    public Queue getQueue() {
    return new Queue(RabbitConstant.ACKNOWLEDGE_QUEUE_01);
    }

    @Bean(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01)
    public DirectExchange getExchange() {
    return new DirectExchange(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01);
    }

    @Bean(RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01)
    public Binding getBinding() {
    return BindingBuilder.bind(getQueue())
    .to(getExchange())
    .with(RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01);
    }

    }
  3. AcknowledgeProducer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Component
    public class AcknowledgeProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendAck01(String msg) {
    rabbitTemplate.convertAndSend(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01, RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01, msg);
    }

    }
  4. AcknowledgeConsumer.java

    消费者在执行basicNack方法时,requeue = true。若消费方法内的异常一直存在,则消息会被一直重试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @Slf4j
    @Component
    public class AcknowledgeConsumer {

    /**
    * 消息无异常,执行channel.basicAck方法进行消息确认
    * 消息有异常,执行channel.basicNack方法不进行ack应答
    * multiple:true-处理多条;false-进处理被提供tag的消息
    * requeue:true-消息重入队列;false-丢弃消息
    * @param tag 队列中的唯一id
    */
    @RabbitListener(queues = {RabbitConstant.ACKNOWLEDGE_QUEUE_01})
    public void consumerAck02(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException {
    // true-处理多条;false-进处理被提供tag的消息
    boolean multiple = false;
    // true-消息重入队列;false-丢弃消息
    boolean requeue = true;
    try {
    log.info("【消费者】 - 【Acknowledge(02)】:msg = {}, tag = {}", msg, tag);
    double a = 1 / 0;
    System.out.println(a);
    channel.basicAck(tag, multiple);
    } catch (Exception e) {
    log.warn("【消费者】 - 【Acknowledge(02)】 - 【异常】:msg = {}, tag = {}", msg, tag);
    channel.basicNack(tag, multiple, requeue);
    }
    }

    }

使用:

  • 消息处理异常,消息被一直重试

    img-58

总结

  1. retrytry/catch + 手动ack都可以保证消息的可靠消费。
  2. 若消费者方法中一直存在消费异常,会发生两种情况:1. 消息重复消息;2. 消息最终被丢弃。这两种情况都有弊端。

根据上面两点总结,当消息被异常消费时,我们捕获异常并对其进行其他操作,如:异常信息存入数据库中;发邮箱通知客服等,就避免了重复消费或丢弃消息的缺点。

try/catch + 手动ack + 死信队列

需要将channel.basicNack中的requeue属性设置为false

  1. application.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    spring:
    rabbitmq:
    host: api.rabbitmq.com
    port: 5672
    username: admin
    password: admin
    listener:
    simple:
    # 消费者消息确认
    # 1. auto:自动确认(默认)
    # 2. manual:手动确认
    # 3. none:不确认,发送后自动丢弃
    acknowledge-mode: manual
  2. AcknowledgeConfig.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    @Configuration
    public class AcknowledgeConfig {

    /**
    * 如果该队列已存在,需要先删除掉这个队列,在启动项目。
    * 因为这个队列已经存在,而这个项目中的队列需要修改这个队列的配置。当队列已存在,配置无法修改
    */
    @Bean(RabbitConstant.ACKNOWLEDGE_QUEUE_01)
    public Queue getQueue() {
    Map<String, Object> args = CollUtil.newHashMap();
    args.put("x-dead-letter-exchange", RabbitConstant.ACKNOWLEDGE_DEAD_EXCHANGE_01);
    args.put("x-dead-letter-routing-key", RabbitConstant.ACKNOWLEDGE_DEAD_ROUTING_KEY_01);
    return new Queue(RabbitConstant.ACKNOWLEDGE_QUEUE_01, true, false, false, args);
    }

    @Bean(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01)
    public DirectExchange getExchange() {
    return new DirectExchange(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01);
    }

    @Bean(RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01)
    public Binding getBinding() {
    return BindingBuilder.bind(getQueue())
    .to(getExchange())
    .with(RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01);
    }

    @Bean(RabbitConstant.ACKNOWLEDGE_DEAD_QUEUE_01)
    public Queue getDeadQueue() {
    return new Queue(RabbitConstant.ACKNOWLEDGE_DEAD_QUEUE_01);
    }

    @Bean(RabbitConstant.ACKNOWLEDGE_DEAD_EXCHANGE_01)
    public DirectExchange getDeadExchange() {
    return new DirectExchange(RabbitConstant.ACKNOWLEDGE_DEAD_EXCHANGE_01);
    }

    @Bean(RabbitConstant.ACKNOWLEDGE_DEAD_ROUTING_KEY_01)
    public Binding getDeadBinding() {
    return BindingBuilder.bind(getDeadQueue())
    .to(getDeadExchange())
    .with(RabbitConstant.ACKNOWLEDGE_DEAD_ROUTING_KEY_01);
    }

    }
  3. AcknowledgeProducer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Component
    public class AcknowledgeProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendAck01(String msg) {
    rabbitTemplate.convertAndSend(RabbitConstant.ACKNOWLEDGE_EXCHANGE_01, RabbitConstant.ACKNOWLEDGE_ROUTING_KEY_01, msg);
    }

    }
  4. AcknowledgeConsumer.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    @Slf4j
    @Component
    public class AcknowledgeConsumer {

    /**
    * 消息无异常,执行channel.basicAck方法进行消息确认
    * 消息有异常,执行channel.basicNack方法不进行ack应答
    * multiple:true-处理多条;false-进处理被提供tag的消息
    * requeue:true-消息重入队列;false-丢弃消息
    * @param tag 队列中的唯一id
    */
    @RabbitListener(queues = {RabbitConstant.ACKNOWLEDGE_QUEUE_01})
    public void consumerAck02(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException {
    // true-处理多条;false-进处理被提供tag的消息
    boolean multiple = false;
    // true-消息重入队列;false-丢弃消息
    boolean requeue = false;
    try {
    log.info("【消费者】 - 【Acknowledge(02)】:msg = {}, tag = {}", msg, tag);
    double a = 1 / 0;
    System.out.println(a);
    channel.basicAck(tag, multiple);
    } catch (Exception e) {
    log.warn("【消费者】 - 【Acknowledge(02)】 - 【异常】:msg = {}, tag = {}", msg, tag);
    channel.basicNack(tag, multiple, requeue);
    }
    }

    @RabbitListener(queues = {RabbitConstant.ACKNOWLEDGE_DEAD_QUEUE_01})
    public void consumerAckDead(String msg) {
    log.info("【消费者】 - 【Acknowledge】 - 【发送邮件通知客服】:msg = {}", msg);
    }

    }

使用:

  • 消费异常的消息,交给死信队列处理

    img-59

磁盘内存的监控

内存预警

RabbitMQ的管理界面会显示其内存使用情况,当分配给RabbitMQ的内存被完全使用时,队列会进入阻塞状态,生产者无法将消息写入到队列中。并且RabbitMQ发出警报。

RabbitMQ被分配的默认内存空间是0.4,也就是Linux操作系统所有内存的40%。

内存正常情况:

  1. 主界面

    img-29

  2. 连接正常

    img-35

内存超出所分配空间时:

  1. 主界面

    img-31

  2. 连接阻塞

    img-34

修改内存的方式

内存使用是可以修改的,根据 官网文档 有两种方式:1. 指令;2. 配置文件进行修改。

img-30

指令

1
2
3
4
# 百分比控制内存
rabbitmqctl set_vm_memory_high_watermark 0.4
# 绝对值控制内存
rabbitmqctl set_vm_memory_high_watermark absolute 50M

配置文件

1
2
3
4
5
# 指令
vi /etc/rabbitmq/rabbitmq.conf
# 修改内容,有如下两种方式
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark.absolute = 400M

磁盘预警

当磁盘空间小于设定值时,就会出现磁盘预警。

内存换页

-------------本文结束感谢您的阅读-------------