昨天,有位朋友在面试的时候,问实际项目中有没有用过消息队列,这位朋友
表示自己只是学了点理论,没有在项目中用过。所以,今天就给安排上。
Rabbitmq 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 实现,是一种应用程序对应用程序的通信方法,应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。消息传递指的是应用程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此通信,直接调用通常是指远程过程调用的技术。
Simple 模式是最简单的一个模式,由一个生产者,一个队列,一个消费者组成,生产者将消息通过交换机(此时,图中并没有交换机的概念,如不定义交换机,会使用默认的交换机)把消息存储到队列,消费者从队列中取出消息进行处理。
用 Java demo 实现此模式
Productor
public class Send {
private final static String QUEUE_NAME = "queue1";
public static void main(String[] args) {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、创建连接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 3、声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello world";
// 4、发送消息到指定队列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
} catch (TimeoutException | IOException e) {
e.printStackTrace();
} finally {
// 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
Customer
public class Recv {
private final static String QUEUE_NAME = "queue1";
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setVirtualHost("/");
// 2、获取 Connection和 Channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3、声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
观察可视化界面,会看到消息先会被写入到队列中,随后又被消费者消费了。
Fanout——发布订阅模式,是一种广播机制。
此模式包括:一个生产者、一个交换机 (exchange)、多个队列、多个消费者。生产者将消息发送到交换机,交换机不存储消息,将消息存储到队列,消费者从队列中取消息。如果生产者将消息发送到没有绑定队列的交换机上,消息将丢失。
用 Java demo 实现此模式
Productor
public class Productor {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、获取连接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 消息内容
String message = "hello fanout mode";
// 指定路由key
String routeKey = "";
String type = "fanout";
// 3、声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, type);
// 4、声明队列
channel.queueDeclare("queue1", true, false, false, null);
channel.queueDeclare("queue2", true, false, false, null);
channel.queueDeclare("queue3", true, false, false, null);
channel.queueDeclare("queue4", true, false, false, null);
// 5、绑定 channel 与 queue
channel.queueBind("queue1", EXCHANGE_NAME, routeKey);
channel.queueBind("queue2", EXCHANGE_NAME, routeKey);
channel.queueBind("queue3", EXCHANGE_NAME, routeKey);
channel.queueBind("queue4", EXCHANGE_NAME, routeKey);
// 6、发布消息
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
System.out.println("消息发送成功!");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println("消息发送异常");
}finally {
// 关闭通道和连接......
}
}
}
Customer
public class Customer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 获取连接、通道
connection = factory.newConnection();
channel = connection.createChannel();
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(queueName + ":开始接收消息");
} catch (IOException |
TimeoutException e) {
e.printStackTrace();
} finally {
// 关闭通道和连接......
}
}
};
public static void main(String[] args) throws IOException, TimeoutException {
// 创建线程分别从四个队列中获取消息
new Thread(runnable, "queue1").start();
new Thread(runnable, "queue2").start();
new Thread(runnable, "queue3").start();
new Thread(runnable, "queue4").start();
}
}
执行完 Productor 发现四个队列中分别增加了一条消息,而执行完 Customer 后四个队列中的消息都被消费者消费了。
Direct 模式是在 Fanout 模式基础上添加了 routing key,Fanout(发布/订阅)模式是交换机将消息存储到所有绑定的队列中,而 Direct 模式是在此基础上,添加了过滤条件,交换机只会将消息存储到满足 routing key 的队列中。
在上图中,我们可以看到交换机绑定了两个队列,其中队列 Q1绑定的 routing key 为 “orange” ,队列Q2绑定的routing key 为 “black” 和 “green”。在这样的设置中,发布 routing key 为 “orange” 的消息将被路由到 Q1,routing key 为 “black” 或 “green” 的消息将被路由到 Q2
在 rabbitmq 中给队列绑定 routing_key,routing_key 必须是单词列表
用 Java demo 实现此模式
Productor
public class Productor {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、获取连接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 消息内容
String message = "hello direct mode";
// 指定路由key
String routeKey = "email";
String type = "direct";
// 3、声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, type);
// 4、声明队列
channel.queueDeclare("queue1", true, false, false, null);
channel.queueDeclare("queue2", true, false, false, null);
channel.queueDeclare("queue3", true, false, false, null);
// 5、绑定 channel 与 queue
channel.queueBind("queue1", EXCHANGE_NAME, "email");
channel.queueBind("queue2", EXCHANGE_NAME, "sms");
channel.queueBind("queue3", EXCHANGE_NAME, "vx");
// 6、发布消息
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
System.out.println("消息发送成功!");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println("消息发送异常");
} finally {
// 关闭通道和连接......
}
}
}
可以通过可视化页面查看,各队列绑定的 routing_key
由于设置的 routing_key为 “email”,所以,应该只有 queue1 存储了一条消息。
Customer 与上述 fanout 示例一致。
Topic 模式是生产者通过交换机将消息存储到队列后,交换机根据绑定队列的 routing key 的值进行通配符匹配,如果匹配通过,消息将被存储到该队列,如果 routing key 的值匹配到了多个队列,消息将会被发送到多个队列;如果一个队列也没匹配上,该消息将丢失。
routing_key 必须是单词列表,用点分隔,其中 * 和 # 的含义为:
用Java demo 实现此模式
Productor
public class Productor {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、获取连接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 消息内容
String message = "hello topic mode";
// 指定路由key
String routeKey = "com.order.test.xxx";
String type = "topic";
// 3、声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, type);
// 4、声明队列
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
// 5、绑定 channel 与 queue
channel.queueBind("queue5", EXCHANGE_NAME, "*.order.#");
channel.queueBind("queue6", EXCHANGE_NAME, "#.test.*");
// 6、发布消息
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
System.out.println("消息发送成功!");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println("消息发送异常");
} finally {
// 关闭通道和连接......
}
}
}
执行完 Productor 后,通过可视化页面查看到,queue 绑定的 routing_key
由于上述例子中,routing_key为:“com.order.test.xxx”,那么 queue5 和 queue6 都将接收到消息。
Customer 与上述实例一样,执行完 Customer 后,再次查看队列信息,queue5 和 queue6 的消息都被消费了。
当有多个消费者时,如何均衡消息者消费消息的多少,主要有两种模式:
在这种模式下,rabbitmq 采用轮询的方式将任务分配给多个消费者,但可能出现一种情况,当分配给某一个消费者的任务很复杂时,而有些消费者接收的任务较轻量,会出现有的消费者很忙,而有的消费者处于空闲的状态,而 rabbitmq 不会感知到这种情况的发生,rabbitmq 不考虑消费者未确认消息的数量,只是盲目的分配任务。
用 Java demo 实现此模式
Productor
public class Productor {
public static void main(String[] args) {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、获取连接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 3、向 Queue1 发布20个消息
for (int i = 0; i < 20; i++) {
String msg = "feiyangyang: " + i;
channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("消息发送成功!");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println("消息发送异常");
} finally {
// 关闭通道和连接......
}
}
}
Worker1
public class Worker1 {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.96.109");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 获取连接、通道
connection = factory.newConnection();
channel = connection.createChannel();
Channel finalChannel = channel;
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
System.out.println("Worker1 开始接收消息");
System.in.read();
} catch (IOException |
TimeoutException e) {
e.printStackTrace();
} finally {
// 关闭通道和连接......
}
}
}
Worker2 与 Worker1 相同
我们看下消息分发结果:
Worker1 开始接收消息
Worker1:收到消息是:feiyangyang: 0
Worker1:收到消息是:feiyangyang: 2
Worker1:收到消息是:feiyangyang: 4
Worker1:收到消息是:feiyangyang: 6
Worker1:收到消息是:feiyangyang: 8
Worker1:收到消息是:feiyangyang: 10
Worker1:收到消息是:feiyangyang: 12
Worker1:收到消息是:feiyangyang: 14
Worker1:收到消息是:feiyangyang: 16
Worker1:收到消息是:feiyangyang: 18
Worker2 开始接收消息
Worker2:收到消息是:feiyangyang: 1
Worker2:收到消息是:feiyangyang: 3
Worker2:收到消息是:feiyangyang: 5
Worker2:收到消息是:feiyangyang: 7
Worker2:收到消息是:feiyangyang: 9
Worker2:收到消息是:feiyangyang: 11
Worker2:收到消息是:feiyangyang: 13
Worker2:收到消息是:feiyangyang: 15
Worker2:收到消息是:feiyangyang: 17
Worker2:收到消息是:feiyangyang: 19
可以看出,轮询分发模式就是将消息均衡的分配所有消费者。
[]
为了解决 Work 轮询分发模式 这个问题,rabbitmq 使用带有 perfetchCount = 1 设置的 basicQos 方法。当消费者接受处理并确认前一条消息前,不向此消费者发送新消息,会分配给其他空闲的消费者。
Productor 代码与上述轮询模式相同,而 Customer 中稍作修改
Worker1
// Channel 使用 Qos 机制
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
try {
Thread.sleep(1000);
// 改成手动应答
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
上述实例相较于轮询分发模式,添加了 Qos 机制,设置值为1,代表消费者每次从队列中获取几条消息,将 Worker1 的 sleep 时间设置为 1s,将 Worker2 的 sleep 时间设置为 2s,查看消息分发结果
Worker1 开始接收消息
Worker1:收到消息是:feiyangyang: 0
Worker1:收到消息是:feiyangyang: 2
Worker1:收到消息是:feiyangyang: 4
Worker1:收到消息是:feiyangyang: 5
Worker1:收到消息是:feiyangyang: 7
Worker1:收到消息是:feiyangyang: 8
Worker1:收到消息是:feiyangyang: 10
Worker1:收到消息是:feiyangyang: 11
Worker1:收到消息是:feiyangyang: 13
Worker1:收到消息是:feiyangyang: 14
Worker1:收到消息是:feiyangyang: 16
Worker1:收到消息是:feiyangyang: 17
Worker1:收到消息是:feiyangyang: 19
Worker2 开始接收消息
Worker2:收到消息是:feiyangyang: 1
Worker2:收到消息是:feiyangyang: 3
Worker2:收到消息是:feiyangyang: 6
Worker2:收到消息是:feiyangyang: 9
Worker2:收到消息是:feiyangyang: 12
Worker2:收到消息是:feiyangyang: 15
Worker2:收到消息是:feiyangyang: 18
当使用 Work 公平分发模式时,要设置消费者为手动应答,并且开启 Qos 机制。
消费者完成一项任务可能需要几秒钟,如果其中一个消费者开始了一项长期任务并且只完成了部分任务而死亡,如果将 autoAck 设置为 true ,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除,在这种情况下,我们将丢失所有已分派给该特定消费者但尚未处理的消息。
[如果其中一个消费者宕了,rabbitmq 可以将其消息分配给其他消费者。为了确保消息不会丢失,rabbitmq 采用消息确认,消费者发回确认消息,告诉 rabbitmq 消息已经被接收并处理,此时,rabbitmq 可以放心的删除这条消息。]
如果消费者在没有发送 ack 的情况下宕了,rabbitmq 将理解为该条消息未被消费者处理完,如果有其他消费者在线,将迅速重新交付给其他消费者,这样就可以确保不会丢失消息了。
默认情况下rabbitmq 会启用手动消息确认,也就是 autoAck 默认为 false,一旦我们完成了一项任务,需要手动的进行消息确认,所以 autoAck 需要保持为默认值 false,并使用如下方法进行手动应答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
rabbitmq 的消息确认机制可以保证消息不会丢失,但是如果 rabbitmq 服务器停止,我们的任务仍然会丢失。
当 rabbitmq 退出或崩溃时,如果不进行持久化,队列和消息都会消失。需要做两件事来确保消息不会丢失,将队列和消息都标记为持久的。
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
2 . 设置消息持久
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
将消息标记为持久性并不能完全保证消息不会丢失,当 rabbitmq 接收到消息并且还没保存时,仍然有很短的时间窗口会使消息丢失,如果需要更强的保证,可以使用发布者确认机制。
解耦
在微服务架构体系中,微服务A需要与微服务B进行通信,传统的做法是A调用B的接口。但这样做如果系统B无法访问或连接超时,系统A需要等待,直到系统B做出响应,并且A与B存在严重的耦合现象。如果引入消息队列进行系统AB的通信,流程是这样的:
系统A将消息放到队列中,就不用关心系统B是否可以获取等其他事情了,实现了两个系统间的解耦。
使用场景:
削峰
系统A每秒请求100个,系统可以稳定运行,但如果在秒杀活动中,每秒并发达到1w个,但系统最大处理能力只能每秒处理 1000 个,所以,在秒杀活动中,系统服务器会出现宕机的现象。如果引入 MQ ,可以解决这个问题。每秒 1w个请求会导致系统崩溃,那我们让用户发送的请求都存储到队列中,由于系统最大处理能力是每秒1000个请求,让系统A每秒只从队列中拉取1000个请求,保证系统能稳定运行,在秒杀期间,请求大量进入到队列,积压到MQ中,而系统每秒只从队列中取1000个请求处理。这种短暂的高峰期积压是没问题的,因为高峰期一旦过去,每秒请求数迅速递减,而系统每秒还是从队列中取1000个请求进行处理,系统会快速将积压的消息消费掉。
使用场景:
异步
用户注册,需要发送注册邮件和注册短信,传统的做法有两种:串行、并行。
使用场景:
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8