RabbitMQ 队列相关消息 1.Provider 消息生产者,就是投递消息的程序。
2.Consumer 消息消费者,就是接受消息的程序。
3.没有使用消息队列时消息传递方式
4.使用消息队列后消息传递方式
5.什么是队列? 队列就像存放了商品的仓库或者商店,是生产商品的工厂和购买商品的用户之间的中转站。
6.队列里存储了什么? 在 rabbitMQ 中,信息流从你的应用程序出发,来到 Rabbitmq 的队列,所有信息可以只存储在一个队列中。队列可以存储很多信息,因为它基本上是一个无限制的缓冲区,前提是你的机器有足够的存储空间。
7.队列和应用程序的关系? 多个生产者可以将消息发送到同一个队列中,多个消息者也可以只从同一个队列接收数据。
RabbitMQ相关概念 1.Message 消息。消息是不具名的,它由消息头消息体组成。消息体是不透明的,而消息头则由 一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先 权)、delivery-mode(指出消息可能持久性存储)等。
2.Publisher 消息的生产者。也是一个向交换器发布消息的客户端应用程序。
3.Consumer 消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。
4.Exchange 交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 三种常用的交换器类型
direct(发布与订阅 完全匹配)
fanout(广播)
topic(主题,规则匹配)
5.Binding 绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息 队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
6.Queue 消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一 个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取 走。
7.Routing-key 路由键。RabbitMQ 决定消息该投递到哪个队列的规则。 队列通过路由键绑定到交换器。 消息发送到 MQ 服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ 也会将其 和绑定使用的路由键进行匹配。 如果相匹配,消息将会投递到该队列。 如果不匹配,消息将会进入黑洞。
8.Connection 链接。指 rabbit 服务器和服务建立的 TCP 链接。
9.Channel 信道。 1,Channel 中文叫做信道,是 TCP 里面的虚拟链接。例如:电缆相当于 TCP,信道是 一个独立光纤束,一条 TCP 连接上创建多条信道是没有问题的。 2,TCP 一旦打开,就会创建 AMQP 信道。 3,无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。
10.Virtual Host 虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证 和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有 自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定, RabbitMQ 默认的 vhost 是/
11.Borker 表示消息队列服务器实体。
12.交换器和队列的关系 交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的 路由键匹配,那么消息就会被路由到该绑定的队列中。 也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由 键匹配分发消息到具体的队列中。 路由键可以理解为匹配的规则。
13.RabbitMQ 为什么需要信道?为什么不是 TCP 直接通信?
TCP 的创建和销毁开销特别大。创建需要 3 次握手,销毁需要 4 次分手。
如果不用信道,那应用程序就会以 TCP 链接 Rabbit,高峰时每秒成千上万条链接 会造成资源巨大的浪费,而且操作系统每秒处理 TCP 链接数也是有限制的,必定造成性能 瓶颈。
信道的原理是一条线程一条通道,多条线程多条通道同用一条 TCP 链接。一条 TCP 链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。
安装RabbitMQ Windows
Linux
安装erlang:yum install erlang
,如报错No package erlang available,需要安装EPEL库。
安装wget:yum -y install wget
安装EPEL库:
启动RabbitMQ,并验证启动情况:rabbitmq-server --detached &ps aux |grep rabbitmq
以服务的方式启动:service rabbitmq-server start
检查5672端口是否打开:
重启RabbitMQ:
Docker 使用docker镜像中国下载Rabbitmq镜像,选择带有management的,因为这个是有WEB界面:
1 2 # 使用docker镜像中国下载Rabbitmq镜像,选择带有management的,因为这个是有WEB界面。 [root@docker ~]# docker pull registry.docker-cn.com/library/rabbitmq:3.7-management
选择官方的:
我选择的是这个3.7版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # 查看镜像 [root@docker ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE registry.docker-cn.com/library/rabbitmq 3.7-management 24cb552c7c00 12 days ago 212 MB# 运行容器 [root@docker ~]# docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq 24cb552c7c00# 查看进程 [root@docker ~]# docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 73943a64f336 24cb552c7c00 "docker-entrypoint..." 7 minutes ago Up 7 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbitmq [root@docker ~]# # 关闭防火墙设置开机不启动 [root@docker ~]# systemctl stop firewalld [root@docker ~]# systemctl disable firewalld
此时就可以登录Rabbitmq的WEB界面了,访问地址是[ip:15672]默认用户名和密码都是guest。
RabbitMQ交换器 Direct交换器
发布与订阅,完全匹配。
生产者 pom.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion>4.0 .0 </modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent </artifactId> <version>2.1 .4 .RELEASE</version> <relativePath/> <!-- lookup parent from repository -- > </parent> <groupId>com.lzhpo.rabbitmq</groupId> <artifactId>rabbitmq-direct-provider </artifactId> <version>0.0 .1 -SNAPSHOT </version> <name>rabbitmq-direct-provider </name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8 </java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp </artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web </artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test </artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin </artifactId> </plugin> </plugins> </build> </project>
application.properties 1 2 3 4 5 6 7 8 9 10 11 12 13 spring.rabbitmq.host =localhost spring.rabbitmq.port =5672 spring.rabbitmq.username =guest spring.rabbitmq.password =guest mq.config.exchange =log.direct mq.config.queue.info.routing.key =log.info.routing.key mq.config.queue.error.routing.key =log.error.routing.key mq.config.queue.error =log.error
Sender 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.lzhpo.rabbitmq.rabbitmqdirectprovider;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;@Component public class Sender { @Autowired private AmqpTemplate rabbitAmqpTemplate; @Value("${mq.config.exchange}") private String exchange; @Value("${mq.config.queue.error.routing.key}") private String routingkey; public void send (String msg) { this .rabbitAmqpTemplate.convertAndSend(this .exchange, this .routingkey, msg); } }
RabbitmqDirectProviderApplicationTests测试类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.lzhpo.rabbitmq.rabbitmqdirectprovider;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqDirectProviderApplicationTests { @Autowired private Sender sender; @Test public void contextLoads () throws Exception { while (true ){ Thread.sleep(1000 ); this .sender.send("Hello RabbitMQ" ); } } }
消费者 pom.xml和生产者的一样。 application.properties 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 spring.rabbitmq.host =localhost spring.rabbitmq.port =5672 spring.rabbitmq.username =guest spring.rabbitmq.password =guest mq.config.exchange =log.direct mq.config.queue.info =log.info mq.config.queue.info.routing.key =log.info.routing.key mq.config.queue.error =log.error mq.config.queue.error.routing.key =log.error.routing.key
ErrorReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.lzhpo.rabbitmq.rabbitmqdirectconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.error}",autoDelete="true"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT), key="${mq.config.queue.error.routing.key}" ) ) public class ErrorReceiver { @RabbitHandler public void process (String msg) { System.out.println("Error..........receiver: " +msg); } }
InfoReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.lzhpo.rabbitmq.rabbitmqdirectconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.info}",autoDelete="true"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT), key="${mq.config.queue.info.routing.key}" ) ) public class InfoReceiver { @RabbitHandler public void process (String msg) { System.out.println("Info........receiver: " +msg); } }
Main 1 2 3 4 5 6 7 8 9 10 11 12 13 package com.lzhpo.rabbitmq.rabbitmqdirectconsumer;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitmqDirectConsumerApplication { public static void main (String[] args) { SpringApplication.run(RabbitmqDirectConsumerApplication.class, args); } }
测试结果 先启动消费者,再运行生产者的测试类RabbitmqDirectProviderApplicationTests
测试类。
Topic交换器
主题,规则匹配。
生产者 application.properties 1 2 3 4 5 6 spring.rabbitmq.host =localhost spring.rabbitmq.port =5672 spring.rabbitmq.username =guest spring.rabbitmq.password =guest mq.config.exchange =log.topic
OrderSender 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.lzhpo.rabbitmq.rabbitmqtopicprovider;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;@Component public class OrderSender { @Autowired private AmqpTemplate rabbitAmqpTemplate; @Value("${mq.config.exchange}") private String exchange; public void send (String msg) { this .rabbitAmqpTemplate.convertAndSend(this .exchange,"order.log.debug" , "order.log.debug....." +msg); this .rabbitAmqpTemplate.convertAndSend(this .exchange,"order.log.info" , "order.log.info....." +msg); this .rabbitAmqpTemplate.convertAndSend(this .exchange,"order.log.warn" ,"order.log.warn....." +msg); this .rabbitAmqpTemplate.convertAndSend(this .exchange,"order.log.error" , "order.log.error....." +msg); } }
ProductSender 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.lzhpo.rabbitmq.rabbitmqtopicprovider;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;@Component public class ProductSender { @Autowired private AmqpTemplate rabbitAmqpTemplate; @Value("${mq.config.exchange}") private String exchange; public void send (String msg) { this .rabbitAmqpTemplate.convertAndSend(this .exchange,"product.log.debug" , "product.log.debug....." +msg); this .rabbitAmqpTemplate.convertAndSend(this .exchange,"product.log.info" , "product.log.info....." +msg); this .rabbitAmqpTemplate.convertAndSend(this .exchange,"product.log.warn" ,"product.log.warn....." +msg); this .rabbitAmqpTemplate.convertAndSend(this .exchange,"product.log.error" , "product.log.error....." +msg); } }
UserSender 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.lzhpo.rabbitmq.rabbitmqtopicprovider;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;@Component public class UserSender { @Autowired private AmqpTemplate rabbitAmqpTemplate; @Value("${mq.config.exchange}") private String exchange; public void send (String msg) { this .rabbitAmqpTemplate.convertAndSend(this .exchange,"user.log.debug" , "user.log.debug....." +msg); this .rabbitAmqpTemplate.convertAndSend(this .exchange,"user.log.info" , "user.log.info....." +msg); this .rabbitAmqpTemplate.convertAndSend(this .exchange,"user.log.warn" ,"user.log.warn....." +msg); this .rabbitAmqpTemplate.convertAndSend(this .exchange,"user.log.error" , "user.log.error....." +msg); } }
RabbitmqTopicProviderApplicationTests测试类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.lzhpo.rabbitmq.rabbitmqtopicprovider;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqTopicProviderApplicationTests { @Autowired private UserSender usersender; @Autowired private ProductSender productsender; @Autowired private OrderSender ordersender; @Test public void contextLoads () { this .usersender.send("UserSender....." ); this .productsender.send("ProductSender...." ); this .ordersender.send("OrderSender......" ); } }
消费者 application.properties 1 2 3 4 5 6 7 8 9 10 11 12 spring.rabbitmq.host =localhost spring.rabbitmq.port =5672 spring.rabbitmq.username =guest spring.rabbitmq.password =guest mq.config.exchange =log.topic mq.config.queue.info =log.info mq.config.queue.error =log.error mq.config.queue.logs =log.all
ErrorReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.lzhpo.rabbitmq.rabbitmqtopicconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.error}",autoDelete="true"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC), key="*.log.error" ) ) public class ErrorReceiver { @RabbitHandler public void process (String msg) { System.out.println("......Error........receiver: " +msg); } }
InfoReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.lzhpo.rabbitmq.rabbitmqtopicconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.info}",autoDelete="true"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC), key="*.log.info" ) ) public class InfoReceiver { @RabbitHandler public void process (String msg) { System.out.println("......Info........receiver: " +msg); } }
LogsReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.lzhpo.rabbitmq.rabbitmqtopicconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.logs}",autoDelete="true"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC), key="*.log.*" ) ) public class LogsReceiver { @RabbitHandler public void process (String msg) { System.out.println("......All........receiver: " +msg); } }
Main 1 2 3 4 5 6 7 8 9 10 11 12 13 package com.lzhpo.rabbitmq.rabbitmqtopicconsumer;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitmqTopicConsumerApplication { public static void main (String[] args) { SpringApplication.run(RabbitmqTopicConsumerApplication.class, args); } }
测试结果 先启动消费者,然后再运行RabbitmqTopicProviderApplicationTests
测试类。
Fanout交换器
广播。
生产者 application.properties 1 2 3 4 5 6 spring.rabbitmq.host =localhost spring.rabbitmq.port =5672 spring.rabbitmq.username =guest spring.rabbitmq.password =guest mq.config.exchange =order.fanout
Sender 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.lzhpo.rabbitmq.rabbitmqfanoutprovider;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;@Component public class Sender { @Autowired private AmqpTemplate rabbitAmqpTemplate; @Value("${mq.config.exchange}") private String exchange; public void send (String msg) { this .rabbitAmqpTemplate.convertAndSend(this .exchange,"" , msg); } }
RabbitmqFanoutProviderApplicationTests测试类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.lzhpo.rabbitmq.rabbitmqfanoutprovider;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqFanoutProviderApplicationTests { @Autowired private Sender sender; @Test public void contextLoads () throws Exception { while (true ){ Thread.sleep(1000 ); this .sender.send("Hello RabbitMQ" ); } } }
消费者 application.properties 1 2 3 4 5 6 7 8 9 10 spring.rabbitmq.host =localhost spring.rabbitmq.port =5672 spring.rabbitmq.username =guest spring.rabbitmq.password =guest mq.config.exchange =order.fanout mq.config.queue.sms =order.sms mq.config.queue.push =order.push
SmsReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.lzhpo.rabbitmq.rabbitmqfanoutconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.sms}",autoDelete="true"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.FANOUT) ) ) public class SmsReceiver { @RabbitHandler public void process (String msg) { System.out.println("Sms........receiver: " +msg); } }
PushReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.lzhpo.rabbitmq.rabbitmqfanoutconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.push}",autoDelete="true"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.FANOUT) ) ) public class PushReceiver { @RabbitHandler public void process (String msg) { System.out.println("Error..........receiver: " +msg); } }
Main 1 2 3 4 5 6 7 8 9 10 11 12 13 package com.lzhpo.rabbitmq.rabbitmqfanoutconsumer;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitmqFanoutConsumerApplication { public static void main (String[] args) { SpringApplication.run(RabbitmqFanoutConsumerApplication.class, args); } }
测试结果 先启动消费者,然后再启动生产者RabbitmqFanoutProviderApplicationTests
测试类。
使用 RabbitMQ 实现松耦合设计
生产者 application.properties 1 2 3 4 5 6 spring.rabbitmq.host =localhost spring.rabbitmq.port =5672 spring.rabbitmq.username =guest spring.rabbitmq.password =guest mq.config.exchange =order.fanout
Sender 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.lzhpo.rabbitmq.rabbitmqfanoutouheprovider;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;@Component public class Sender { @Autowired private AmqpTemplate rabbitAmqpTemplate; @Value("${mq.config.exchange}") private String exchange; public void send (String msg) { this .rabbitAmqpTemplate.convertAndSend(this .exchange,"" , msg); } }
RabbitmqFanoutOuheProviderApplicationTests测试类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package com.lzhpo.rabbitmq.rabbitmqfanoutouheprovider;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqFanoutOuheProviderApplicationTests { @Autowired private Sender sender; @Test public void contextLoads () { this .sender.send("Hello RabbitMQ" ); } }
消费者 application.properties 1 2 3 4 5 6 7 8 9 10 11 12 spring.rabbitmq.host =localhost spring.rabbitmq.port =5672 spring.rabbitmq.username =guest spring.rabbitmq.password =guest mq.config.exchange =order.fanout mq.config.queue.sms =order.sms mq.config.queue.push =order.push mq.config.queue.red =red
PushReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.lzhpo.rabbitmq.rabbitmqfanoutouheconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.push}",autoDelete="true"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.FANOUT) ) ) public class PushReceiver { @RabbitHandler public void process (String msg) { System.out.println("Push..........receiver: " +msg); } }
RedReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.lzhpo.rabbitmq.rabbitmqfanoutouheconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.red}",autoDelete="true"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.FANOUT) ) ) public class RedReceiver { @RabbitHandler public void process (String msg) { System.out.println("给用户发送10元红包........receiver: " +msg); } }
SmsReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.lzhpo.rabbitmq.rabbitmqfanoutouheconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.sms}",autoDelete="true"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.FANOUT) ) ) public class SmsReceiver { @RabbitHandler public void process (String msg) { System.out.println("Sms........receiver: " +msg); } }
Main 1 2 3 4 5 6 7 8 9 10 11 12 13 package com.lzhpo.rabbitmq.rabbitmqfanoutouheconsumer;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitmqFanoutOuheConsumerApplication { public static void main (String[] args) { SpringApplication.run(RabbitmqFanoutOuheConsumerApplication.class, args); } }
测试结果 先运行消费者,然后再运行生产者的RabbitmqFanoutOuheProviderApplicationTests
测试类。
RabbitMQ消息处理
消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何保证消息可靠性的呢——消息持久化。
RabbitMQ的消息持久化处理 生产者 application.properties 1 2 3 4 5 6 7 8 9 10 11 12 spring.rabbitmq.host =localhost spring.rabbitmq.port =5672 spring.rabbitmq.username =guest spring.rabbitmq.password =guest mq.config.exchange =log.direct mq.config.queue.info.routing.key =log.info.routing.key mq.config.queue.error.routing.key =log.error.routing.key mq.config.queue.error =log.error
Sender 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.lzhpo.rabbitmq.rabbitmqdurabledirectprovider;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;@Component public class Sender { @Autowired private AmqpTemplate rabbitAmqpTemplate; @Value("${mq.config.exchange}") private String exchange; @Value("${mq.config.queue.error.routing.key}") private String routingkey; public void send (String msg) { this .rabbitAmqpTemplate.convertAndSend(this .exchange, this .routingkey, msg); } }
RabbitmqDurableDirectProviderApplicationTests测试类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package com.lzhpo.rabbitmq.rabbitmqdurabledirectprovider;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqDurableDirectProviderApplicationTests { @Autowired private Sender sender; @Test public void contextLoads () throws Exception { int flag = 0 ; while (true ){ flag++; Thread.sleep(2000 ); this .sender.send("Hello RabbitMQ " +flag); } } }
消费者 application.properties 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 spring.rabbitmq.host =localhost spring.rabbitmq.port =5672 spring.rabbitmq.username =guest spring.rabbitmq.password =guest mq.config.exchange =log.direct mq.config.queue.info =log.info mq.config.queue.info.routing.key =log.info.routing.key mq.config.queue.error =log.error mq.config.queue.error.routing.key =log.error.routing.key
ErrorReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.lzhpo.rabbitmq.rabbitmqdurabledirectconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.error}",autoDelete="false"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT), key="${mq.config.queue.error.routing.key}" ) ) public class ErrorReceiver { @RabbitHandler public void process (String msg) { System.out.println("Error..........receiver: " +msg); } }
InfoReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.lzhpo.rabbitmq.rabbitmqdurabledirectconsumer;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @RabbitListener( bindings=@QueueBinding( value=@Queue(value="${mq.config.queue.info}",autoDelete="true"), exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT), key="${mq.config.queue.info.routing.key}" ) ) public class InfoReceiver { @RabbitHandler public void process (String msg) { System.out.println("Info........receiver: " +msg); } }
Main 1 2 3 4 5 6 7 8 9 10 11 12 13 package com.lzhpo.rabbitmq.rabbitmqdurabledirectconsumer;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitmqDurableDirectConsumerApplication { public static void main (String[] args) { SpringApplication.run(RabbitmqDurableDirectConsumerApplication.class, args); } }
测试结果 先启动消费者,再运行RabbitmqDurableDirectProviderApplicationTests
测试类。
RabbitMQ中的消息确认ACK机制
什么是消息确认ACK?
如果在处理消息的过程中,消费者的服务器在处理消息时出现异常,那可能这条正常在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确认ACK。
ACK的消息确认机制?
ACK机制是消费者从RabbitMQ手到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。
如果在集群的环境下:RabbitMQ会立即将这个消息推送给这个在线的其它消费者。这种机制保证了再消费者服务端故障的时候,不会丢失任何消息和任务。
消息永远不会从RabbitMQ中删除:只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
消息的ACK确认机制默认是打开的。
ACK机制的开发注意事项?
如果忘记了ACK,那么后果很严重。当Consumer退出时,Message会一直重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。
修改 Consusmer 配置文件解决 ACK 反馈问题。
在application.properties
配置文件中添加以下
1 2 3 4 5 spring.rabbitmq.listener.retry.enabled =true spring.rabbitmq.listener.retry.max-attempts =5
RabbitMQ六种消息模式 pom依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client </artifactId> <version>4.0 .2 </version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api </artifactId> <version>1.7 .10 </version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12 </artifactId> <version>1.7 .5 </version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2 .17 </version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11 </version> </dependency>
RabbitMQ的连接工具(我单独写出来了一个工具类,方便使用):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package com.lzhpo.rabbitmq;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConnectionUtils { public static Connection getConnection () throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("127.0.0.1" ); factory.setPort(5672 ); factory.setVirtualHost("/" ); factory.setUsername("guest" ); factory.setPassword("guest" ); return factory.newConnection(); } }
简单队列
简单队列:一个生产者P发送消息到队列Q,一个消费者C接收。
生产者(Send) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.lzhpo.rabbitmq.model5.simplequeues;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send { private static final String QUEUE_NAME = "test_simple_queue" ; public static void main (String[] args) throws Exception{ Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); String msg = "hello simple!" ; channel.basicPublish("" , QUEUE_NAME, null , msg.getBytes()); System.out.println("---send msg:" +msg); channel.close(); connection.close(); } }
消费者(Recv) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 package com.lzhpo.rabbitmq.model5.simplequeues;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Recv { private static final String QUEUE_NAME = "test_simple_queue" ; public static void main (String[] args) throws Exception{ newApi(); } private static void newApi () throws Exception{ Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "utf-8" ); System.out.println("new api recv:" + msg); } }; channel.basicConsume(QUEUE_NAME, true , consumer); } private static void oldApi () throws Exception{ Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); QueueingConsumer consumer = new QueueingConsumer (channel); channel.basicConsume(QUEUE_NAME, true , consumer); while (true ){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msgString = new String (delivery.getBody()); System.out.println("[recv] msg: " +msgString); } } }
工作队列 轮询分发
【轮询分发】:结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,任务总是你一个我一个的分。
生产者(Send) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 package com.lzhpo.rabbitmq.model5.workqueues.lunxun;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send { private final static String QUEUE_NAME = "test_queue_work" ; public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); for (int i = 0 ; i < 50 ; i++) { String message = "." +i; channel.basicPublish("" , QUEUE_NAME, null , message.getBytes()); System.out.println(" [x] Sent '" +message +"'" ); Thread.sleep(i*10 ); } channel.close(); connection.close(); } }
消费者1(Recv1) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package com.lzhpo.rabbitmq.model5.workqueues.lunxun;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Recv1 { private final static String QUEUE_NAME = "test_queue_work" ; public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String message = new String (body, "UTF-8" ); System.out.println(" [1] Received '" + message + "'" ); try { doWork(message); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(" [x] Done" ); } } }; boolean autoAck = true ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } private static void doWork (String task) throws InterruptedException { Thread.sleep(1000 ); } @SuppressWarnings("unused") public static void oldAPi () throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); QueueingConsumer consumer = new QueueingConsumer (channel); channel.basicConsume(QUEUE_NAME, true , consumer); while (true ) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String (delivery.getBody()); System.out.println(" [x] Received '" + message + "'" ); } } }
消费者2(Recv2) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package com.lzhpo.rabbitmq.model5.workqueues.lunxun;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Recv2 { private final static String QUEUE_NAME = "test_queue_work" ; public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String message = new String (body, "UTF-8" ); System.out.println(" [2] Received '" + message + "'" ); try { Thread.sleep(2000 ); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(" [x] Done" ); } } }; boolean autoAck = true ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
公平分发
使用公平分发,必须关闭自动应答,改为手动应答。
生产者(Send) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package com.lzhpo.rabbitmq.model5.workqueues.gongping;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send { private final static String QUEUE_NAME = "test_queue_work" ; public static void main (String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); int prefetchCount = 1 ; channel.basicQos(prefetchCount); for (int i = 0 ; i < 50 ; i++) { String message = "." + i; channel.basicPublish("" , QUEUE_NAME, null , message.getBytes()); System.out.println(" [x] Sent '" + message + "'" ); Thread.sleep(i * 10 ); } channel.close(); connection.close(); } }
消费者1(Recv1) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package com.lzhpo.rabbitmq.model5.workqueues.gongping;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Recv1 { private final static String QUEUE_NAME = "test_queue_work" ; public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.basicQos(1 ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String message = new String (body, "UTF-8" ); System.out.println(" [1] Received '" + message + "'" ); try { doWork(message); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(" [x] Done" ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } private static void doWork (String task) throws InterruptedException { Thread.sleep(1000 ); } }
消费者2(Recv2) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 package com.lzhpo.rabbitmq.model5.workqueues.gongping;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Recv2 { private final static String QUEUE_NAME = "test_queue_work" ; public static void main (String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.basicQos(1 ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String message = new String (body, "UTF-8" ); System.out.println(" [2] Received '" + message + "'" ); try { doWork(message); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(" [x] Done" ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } private static void doWork (String task) throws InterruptedException { Thread.sleep(2000 ); } public static void oldAPi () throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.basicQos(1 ); QueueingConsumer consumer = new QueueingConsumer (channel); channel.basicConsume(QUEUE_NAME, false , consumer); while (true ) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String (delivery.getBody()); System.out.println(" [x] Received '" + message + "'" ); Thread.sleep(1000 ); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false ); } } }
消息订阅模式
【订阅模式】:一个消息被多个消费者消费。
一个生产者,多个消费者。
每一个消费者都有自己的队列。
生产者没有直接把消息发送到队列,而是发送到了交换机、转发器exchange。
每个队列都要绑定到交换机上。
生产者发送的消息经过交换机到达队列,就能实现一个消息被多个消费者消费。
生产者(Send) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package com.lzhpo.rabbitmq.model5.subscribeModel;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout" ; public static void main (String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); String msg = "Hello PB" ; channel.basicPublish(EXCHANGE_NAME, "" , null , msg.getBytes()); System.out.println("Send: " +msg); channel.close(); connection.close(); } }
消费者1(Recv1) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package com.lzhpo.rabbitmq.model5.subscribeModel;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Recv1 { private final static String QUEUE_NAME = "test_queue_fanout_email" ; private final static String EXCHANGE_NAME = "test_exchange_fanout" ; public static void main (String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "" ); channel.basicQos(1 ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "utf-8" ); System.out.println("[1] Recv msg:" + msg); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done " ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
消费者2(Recv2) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package com.lzhpo.rabbitmq.model5.subscribeModel;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Recv2 { private final static String QUEUE_NAME = "test_queue_fanout_sms" ; private final static String EXCHANGE_NAME = "test_exchange_fanout" ; public static void main (String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "" ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "utf-8" ); System.out.println("[2] Recv msg:" + msg); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done " ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
路由模式
发送消息到交换机并且要指定路由key 。
消费者将队列绑定到交换机时需要指定路由key。
生产者(Send) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package com.lzhpo.rabbitmq.model5.routingModel;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send { private final static String EXCHANGE_NAME = "test_exchange_direct" ; public static void main (String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct" ); String msg = "hello direct!" ; String routingKey = "warning" ; channel.basicPublish(EXCHANGE_NAME, routingKey, null , msg.getBytes()); System.out.println("-------------send: " +msg); channel.close(); connection.close(); } }
消费者1(Recv1) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 package com.lzhpo.rabbitmq.model5.routingModel;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Recv1 { private final static String QUEUE_NAME = "test_queue_direct_1" ; private final static String EXCHANGE_NAME = "test_exchange_direct" ; public static void main (String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error" ); channel.basicQos(1 ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "utf-8" ); System.out.println("[1] Recv msg:" + msg); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done " ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
消费者2(Recv2) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 package com.lzhpo.rabbitmq.model5.routingModel;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Recv2 { private final static String QUEUE_NAME = "test_queue_direct_2" ; private final static String EXCHANGE_NAME = "test_exchange_direct" ; public static void main (String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error" ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info" ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning" ); channel.basicQos(1 ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "utf-8" ); System.out.println("[2] Recv msg:" + msg); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done " ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
主题模式
Topic主题模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。
生产者(Send) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.lzhpo.rabbitmq.model5.topicModel;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send { private final static String EXCHANGE_NAME = "test_exchange_topic" ; public static void main (String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic" ); String message = "id=1001" ; channel.basicPublish(EXCHANGE_NAME, "item.delete" , null , message.getBytes()); System.out.println(" [x] Sent '" + message + "'" ); channel.close(); connection.close(); } }
消费者1(Recv1) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package com.lzhpo.rabbitmq.model5.topicModel;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Recv1 { private final static String QUEUE_NAME = "test_queue_topic_1" ; private final static String EXCHANGE_NAME = "test_exchange_topic" ; public static void main (String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update" ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete" ); channel.basicQos(1 ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "utf-8" ); System.out.println("[2] Recv msg:" + msg); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done " ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
消费者2(Recv2) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package com.lzhpo.rabbitmq.model5.topicModel;import com.lzhpo.rabbitmq.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;public class Recv2 { private final static String QUEUE_NAME = "test_queue_topic_2" ; private final static String EXCHANGE_NAME = "test_exchange_topic" ; public static void main (String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#" ); channel.basicQos(1 ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "utf-8" ); System.out.println("[2] Recv msg:" + msg); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done " ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
RPC远程调用模式
前面学习了如何使用work队列在多个worker之间分配任务,但是如果需要在远程机器上运行个函数并等待结果,就需要使用RPC(远程过程调用)模式来实现。
参考官网教程【模拟RPC服务来返回斐波那契数列】:https://www.rabbitmq.com/tutorials/tutorial-six-java.html